diff --git a/internal/config/config.go b/internal/config/config.go index 23c90d1..f96df4a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -50,8 +50,17 @@ type Server struct { // Uplink configuration. type Uplink struct { - // Address contains address of the uplink server in form IP:PORT. - Address string `yaml:"address,omitempty"` + // Address1 contains address of the uplink server in form IP:PORT + // for format=j1 queries. + Address1 string `yaml:"address1,omitempty"` + + // Address2 contains address of the uplink server in form IP:PORT + // for format=* queries. + Address2 string `yaml:"address2,omitempty"` + + // Address3 contains address of the uplink server in form IP:PORT + // for all other queries. + Address3 string `yaml:"address3,omitempty"` // Timeout for upstream queries. Timeout int `yaml:"timeout,omitempty"` @@ -85,7 +94,7 @@ type Geo struct { LocationCacheType types.CacheType `yaml:"locationCacheType,omitempty"` - Nominatim []Nominatim + Nominatim []Nominatim `yaml:"nominatim"` } type Nominatim struct { @@ -140,7 +149,9 @@ func Default() *Config { TLSKeyFile: "/wttr.in/etc/privkey.pem", }, Uplink{ - Address: "127.0.0.1:9002", + Address1: "127.0.0.1:9002", + Address2: "127.0.0.1:9002", + Address3: "127.0.0.1:9002", Timeout: 30, PrefetchInterval: 300, }, diff --git a/internal/geo/ip/ip.go b/internal/geo/ip/ip.go index c494bd4..c098f5d 100644 --- a/internal/geo/ip/ip.go +++ b/internal/geo/ip/ip.go @@ -10,12 +10,13 @@ import ( "strconv" "strings" + "github.com/samonzeweb/godb" + "github.com/samonzeweb/godb/adapters/sqlite" + "github.com/chubin/wttr.in/internal/config" "github.com/chubin/wttr.in/internal/routing" "github.com/chubin/wttr.in/internal/types" "github.com/chubin/wttr.in/internal/util" - "github.com/samonzeweb/godb" - "github.com/samonzeweb/godb/adapters/sqlite" ) // Address information. diff --git a/internal/processor/processor.go b/internal/processor/processor.go index ee5b553..5c254ef 100644 --- a/internal/processor/processor.go +++ b/internal/processor/processor.go @@ -3,7 +3,6 @@ package processor import ( "context" "fmt" - "io/ioutil" "log" "math/rand" "net" @@ -52,15 +51,17 @@ type ResponseWithHeader struct { // RequestProcessor handles incoming requests. type RequestProcessor struct { - peakRequest30 sync.Map - peakRequest60 sync.Map - lruCache *lru.Cache - stats *stats.Stats - router routing.Router - upstreamTransport *http.Transport - config *config.Config - geoIPCache *geoip.Cache - geoLocation *geoloc.Cache + peakRequest30 sync.Map + peakRequest60 sync.Map + lruCache *lru.Cache + stats *stats.Stats + router routing.Router + upstreamTransport1 *http.Transport + upstreamTransport2 *http.Transport + upstreamTransport3 *http.Transport + config *config.Config + geoIPCache *geoip.Cache + geoLocation *geoloc.Cache } // NewRequestProcessor returns new RequestProcessor. @@ -76,9 +77,19 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) { DualStack: true, } - transport := &http.Transport{ + transport1 := &http.Transport{ DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { - return dialer.DialContext(ctx, network, config.Uplink.Address) + return dialer.DialContext(ctx, network, config.Uplink.Address1) + }, + } + transport2 := &http.Transport{ + DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { + return dialer.DialContext(ctx, network, config.Uplink.Address2) + }, + } + transport3 := &http.Transport{ + DialContext: func(ctx context.Context, network, _ string) (net.Conn, error) { + return dialer.DialContext(ctx, network, config.Uplink.Address3) }, } @@ -93,12 +104,14 @@ func NewRequestProcessor(config *config.Config) (*RequestProcessor, error) { } rp := &RequestProcessor{ - lruCache: lruCache, - stats: stats.New(), - upstreamTransport: transport, - config: config, - geoIPCache: geoCache, - geoLocation: geoLocation, + lruCache: lruCache, + stats: stats.New(), + upstreamTransport1: transport1, + upstreamTransport2: transport2, + upstreamTransport3: transport3, + config: config, + geoIPCache: geoCache, + geoLocation: geoLocation, } // Initialize routes. @@ -142,7 +155,7 @@ func (rp *RequestProcessor) ProcessRequest(r *http.Request) (*ResponseWithHeader if dontCache(r) { rp.stats.Inc("uncached") - return get(r, rp.upstreamTransport) + return getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3) } // processing cached request @@ -173,8 +186,9 @@ func (rp *RequestProcessor) processRequestFromCache(r *http.Request) *ResponseWi return nil } - // if after all attempts we still have no answer, - // we try to make the query on our own + // If after all attempts we still have no answer, + // respond with an error message. + // (WAS: we try to make the query on our own) for attempts := 0; attempts < 300; attempts++ { if !ok || !cacheEntry.InProgress { break @@ -223,10 +237,7 @@ func (rp *RequestProcessor) processUncachedRequest(r *http.Request) (*ResponseWi rp.stats.Inc("geoip") } - // Indicate, that the request is being handled. - rp.lruCache.Add(cacheDigest, ResponseWithHeader{InProgress: true}) - - response, err = get(r, rp.upstreamTransport) + response, err = getAny(r, rp.upstreamTransport1, rp.upstreamTransport2, rp.upstreamTransport3) if err != nil { return nil, err }