package processor import ( "log" "net/http" "sync" "time" "github.com/robfig/cron" ) func (rp *RequestProcessor) startPeakHandling() error { var err error c := cron.New() // cronTime := fmt.Sprintf("%d,%d * * * *", 30-prefetchInterval/60, 60-prefetchInterval/60) err = c.AddFunc( "24 * * * *", func() { rp.prefetchPeakRequests(&rp.peakRequest30) }, ) if err != nil { return err } err = c.AddFunc( "54 * * * *", func() { rp.prefetchPeakRequests(&rp.peakRequest60) }, ) if err != nil { return err } c.Start() return nil } // registerPeakRequest registers requests coming in the peak time. // Such requests can be prefetched afterwards just before the peak time comes. func (rp *RequestProcessor) savePeakRequest(cacheDigest string, r *http.Request) { if _, min, _ := time.Now().Clock(); min == 30 { rp.peakRequest30.Store(cacheDigest, *r) } else if min == 0 { rp.peakRequest60.Store(cacheDigest, *r) } } func (rp *RequestProcessor) prefetchRequest(r *http.Request) error { _, err := rp.ProcessRequest(r) return err } func syncMapLen(sm *sync.Map) int { count := 0 f := func(key, value interface{}) bool { // Not really certain about this part, don't know for sure // if this is a good check for an entry's existence if key == "" { return false } count++ return true } sm.Range(f) return count } func (rp *RequestProcessor) prefetchPeakRequests(peakRequestMap *sync.Map) { peakRequestLen := syncMapLen(peakRequestMap) if peakRequestLen == 0 { return } log.Printf("PREFETCH: Prefetching %d requests\n", peakRequestLen) sleepBetweenRequests := time.Duration(rp.config.Uplink.PrefetchInterval*1000/peakRequestLen) * time.Millisecond peakRequestMap.Range(func(key interface{}, value interface{}) bool { req, ok := value.(http.Request) if !ok { log.Println("missing value for:", key) return true } go func(r http.Request) { err := rp.prefetchRequest(&r) if err != nil { log.Println("prefetch request:", err) } }(req) peakRequestMap.Delete(key) time.Sleep(sleepBetweenRequests) return true }) }