diff --git a/.golangci.yaml b/.golangci.yaml index e516efd..7057a7c 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -17,12 +17,7 @@ linters: - ireturn - gosec - noctx - - funlen - - nestif - - gocognit - interfacer - - revive - - cyclop # deprecated: - scopelint diff --git a/internal/processor/peak.go b/internal/processor/peak.go index c03beb2..05752df 100644 --- a/internal/processor/peak.go +++ b/internal/processor/peak.go @@ -35,6 +35,8 @@ func (rp *RequestProcessor) startPeakHandling() error { return nil } +// registerPeakRequest registers requests coming in the peak time. +// Such requests can be prefetched afterwards just before the peak time comes. func (rp *RequestProcessor) savePeakRequest(cacheDigest string, r *http.Request) { if _, min, _ := time.Now().Clock(); min == 30 { rp.peakRequest30.Store(cacheDigest, *r) diff --git a/internal/processor/processor.go b/internal/processor/processor.go index 1c97a50..5623c5c 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -39,7 +39,7 @@ func plainTextAgents() []string { } } -type responseWithHeader struct { +type ResponseWithHeader struct { InProgress bool // true if the request is being processed Expires time.Time // expiration time of the cache entry @@ -105,14 +105,12 @@ func (rp *RequestProcessor) Start() error { return rp.startPeakHandling() } -func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*responseWithHeader, error) { +func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader, error) { var ( - response *responseWithHeader - cacheEntry responseWithHeader - err error + response *ResponseWithHeader + ip = util.ReadUserIP(r) ) - ip := util.ReadUserIP(r) if ip != "127.0.0.1" { rp.stats.Inc("total") } @@ -137,46 +135,68 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*responseWithHeader return get(r, rp.upstreamTransport) } + // processing cached request cacheDigest := getCacheDigest(r) - foundInCache := false - rp.savePeakRequest(cacheDigest, r) - cacheBody, ok := rp.lruCache.Get(cacheDigest) - if ok { - cacheEntry, ok = cacheBody.(responseWithHeader) + response = rp.processRequestFromCache(r) + if response != nil { + return response, nil } - if ok { - rp.stats.Inc("cache1") - // if after all attempts we still have no answer, - // we try to make the query on our own - for attempts := 0; attempts < 300; attempts++ { - if !ok || !cacheEntry.InProgress { - break - } - time.Sleep(30 * time.Millisecond) - cacheBody, ok = rp.lruCache.Get(cacheDigest) - if ok && cacheBody != nil { - if v, ok := cacheBody.(responseWithHeader); ok { - cacheEntry = v - } - } - } - if cacheEntry.InProgress { - log.Printf("TIMEOUT: %s\n", cacheDigest) + return rp.processUncachedRequest(r) +} + +// processRequestFromCache processes requests using the cache. +// If no entry in cache found, nil is returned. +func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWithHeader { + var ( + cacheEntry ResponseWithHeader + cacheDigest = getCacheDigest(r) + ok bool + ) + + cacheBody, _ := rp.lruCache.Get(cacheDigest) + cacheEntry, ok = cacheBody.(ResponseWithHeader) + if !ok { + return nil + } + + // if after all attempts we still have no answer, + // we try to make the query on our own + for attempts := 0; attempts < 300; attempts++ { + if !ok || !cacheEntry.InProgress { + break } - if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) { - response = &cacheEntry - foundInCache = true + time.Sleep(30 * time.Millisecond) + cacheBody, _ = rp.lruCache.Get(cacheDigest) + v, ok := cacheBody.(ResponseWithHeader) + if ok { + cacheEntry = v } } + if cacheEntry.InProgress { + log.Printf("TIMEOUT: %s\n", cacheDigest) + } + if ok && !cacheEntry.InProgress && cacheEntry.Expires.After(time.Now()) { + rp.stats.Inc("cache1") - if foundInCache { - return response, nil + return &cacheEntry } + return nil +} + +// processUncachedRequest processes requests that were not found in the cache. +func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWithHeader, error) { + var ( + cacheDigest = getCacheDigest(r) + ip = util.ReadUserIP(r) + response *ResponseWithHeader + err error + ) + // Response was not found in cache. // Starting real handling. format := r.URL.Query().Get("format") @@ -187,13 +207,14 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*responseWithHeader } } - // How many IP addresses are known. + // Count, how many IP addresses are known. _, err = rp.geoIPCache.Read(ip) if err == nil { rp.stats.Inc("geoip") } - rp.lruCache.Add(cacheDigest, responseWithHeader{InProgress: true}) + // Indicate, that the request is being handled. + rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true}) response, err = get(r, rp.upstreamTransport) if err != nil { @@ -209,7 +230,7 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*responseWithHeader return response, nil } -func get(req *http.Request, transport *http.Transport) (*responseWithHeader, error) { +func get(req *http.Request, transport *http.Transport) (*ResponseWithHeader, error) { client := &http.Client{ Transport: transport, } @@ -245,7 +266,7 @@ func get(req *http.Request, transport *http.Transport) (*responseWithHeader, err return nil, err } - return &responseWithHeader{ + return &ResponseWithHeader{ InProgress: false, Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), Body: body, @@ -282,7 +303,7 @@ func dontCache(req *http.Request) bool { // Insecure queries are marked by the frontend web server // with X-Forwarded-Proto header: // `proxy_set_header X-Forwarded-Proto $scheme;`. -func redirectInsecure(req *http.Request) (*responseWithHeader, bool) { +func redirectInsecure(req *http.Request) (*ResponseWithHeader, bool) { if isPlainTextAgent(req.Header.Get("User-Agent")) { return nil, false } @@ -304,7 +325,7 @@ The document has moved `, target)) - return &responseWithHeader{ + return &ResponseWithHeader{ InProgress: false, Expires: time.Now().Add(time.Duration(randInt(1000, 1500)) * time.Second), Body: body, @@ -340,8 +361,8 @@ func ipFromAddr(s string) string { } // fromCadre converts Cadre into a responseWithHeader. -func fromCadre(cadre *routing.Cadre) *responseWithHeader { - return &responseWithHeader{ +func fromCadre(cadre *routing.Cadre) *ResponseWithHeader { + return &ResponseWithHeader{ Body: cadre.Body, Expires: cadre.Expires, StatusCode: 200, diff --git a/srv.go b/srv.go index 8f4e88d..25c11a1 100644 --- a/srv.go +++ b/srv.go @@ -79,7 +79,9 @@ func serve(conf *config.Config) error { mux = http.NewServeMux() // logger is optimized requests logger. - logger *logging.RequestLogger + logger = logging.NewRequestLogger( + conf.Logging.AccessLog, + time.Duration(conf.Logging.Interval)*time.Second) rp *processor.RequestProcessor @@ -89,25 +91,18 @@ func serve(conf *config.Config) error { // numberOfServers started. If 0, exit. numberOfServers int - errorsLog *logging.LogSuppressor + errorsLog = logging.NewLogSuppressor( + conf.Logging.ErrorsLog, + []string{ + "error reading preface from client", + "TLS handshake error from", + }, + logLineStart, + ) err error ) - // logger is optimized requests logger. - logger = logging.NewRequestLogger( - conf.Logging.AccessLog, - time.Duration(conf.Logging.Interval)*time.Second) - - errorsLog = logging.NewLogSuppressor( - conf.Logging.ErrorsLog, - []string{ - "error reading preface from client", - "TLS handshake error from", - }, - logLineStart, - ) - rp, err = processor.NewRequestProcessor(conf) if err != nil { return fmt.Errorf("log processor initialization: %w", err) @@ -123,11 +118,32 @@ func serve(conf *config.Config) error { return err } - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + mux.HandleFunc("/", mainHandler(rp, logger)) + + if conf.Server.PortHTTP != 0 { + go serveHTTP(mux, conf.Server.PortHTTP, errorsLog, errs) + numberOfServers++ + } + if conf.Server.PortHTTPS != 0 { + go serveHTTPS(mux, conf.Server.PortHTTPS, conf.Server.TLSCertFile, conf.Server.TLSKeyFile, errorsLog, errs) + numberOfServers++ + } + if numberOfServers == 0 { + return types.ErrNoServersConfigured + } + + return <-errs // block until one of the servers writes an error +} + +func mainHandler( + rp *processor.RequestProcessor, + logger *logging.RequestLogger, +) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { if err := logger.Log(r); err != nil { log.Println(err) } - // printStat() + response, err := rp.ProcessRequest(r) if err != nil { log.Println(err) @@ -147,21 +163,7 @@ func serve(conf *config.Config) error { if err != nil { log.Println(err) } - }) - - if conf.Server.PortHTTP != 0 { - go serveHTTP(mux, conf.Server.PortHTTP, errorsLog, errs) - numberOfServers++ - } - if conf.Server.PortHTTPS != 0 { - go serveHTTPS(mux, conf.Server.PortHTTPS, conf.Server.TLSCertFile, conf.Server.TLSKeyFile, errorsLog, errs) - numberOfServers++ - } - if numberOfServers == 0 { - return types.ErrNoServersConfigured } - - return <-errs // block until one of the servers writes an error } func main() { @@ -192,29 +194,12 @@ func main() { return } - if cli.ConvertGeoIPCache { - geoIPCache, err := geoip.NewCache(conf) - if err != nil { - ctx.FatalIfErrorf(err) - } - - ctx.FatalIfErrorf(geoIPCache.ConvertCache()) - - return - } - - if cli.ConvertGeoLocationCache { - geoLocCache, err := geoloc.NewCache(conf) - if err != nil { - ctx.FatalIfErrorf(err) - } - - ctx.FatalIfErrorf(geoLocCache.ConvertCache()) - - return - } - - if cli.GeoResolve != "" { + switch { + case cli.ConvertGeoIPCache: + ctx.FatalIfErrorf(convertGeoIPCache(conf)) + case cli.ConvertGeoLocationCache: + ctx.FatalIfErrorf(convertGeoLocationCache(conf)) + case cli.GeoResolve != "": sr := geoloc.NewSearcher(conf) loc, err := sr.Search(cli.GeoResolve) ctx.FatalIfErrorf(err) @@ -222,8 +207,26 @@ func main() { //nolint:forbidigo fmt.Println(*loc) } + default: + err = serve(conf) + ctx.FatalIfErrorf(err) + } +} + +func convertGeoIPCache(conf *config.Config) error { + geoIPCache, err := geoip.NewCache(conf) + if err != nil { + return err + } + + return geoIPCache.ConvertCache() +} + +func convertGeoLocationCache(conf *config.Config) error { + geoLocCache, err := geoloc.NewCache(conf) + if err != nil { + return err } - err = serve(conf) - ctx.FatalIfErrorf(err) + return geoLocCache.ConvertCache() }