From 981836fe3cf7fa0f96c134b6d25b9d0745aad18d Mon Sep 17 00:00:00 2001 From: Igor Chubin Date: Mon, 3 Feb 2025 14:04:50 +0100 Subject: [PATCH] Fix formatting of lib/location.py --- lib/location.py | 206 +++++++++++++++++++++++++++++------------------- 1 file changed, 124 insertions(+), 82 deletions(-) diff --git a/lib/location.py b/lib/location.py index 7224d77..087463f 100644 --- a/lib/location.py +++ b/lib/location.py @@ -46,8 +46,18 @@ import geoip2.database import pycountry import requests -from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \ - ALIASES, BLACKLIST, IATA_CODES_FILE, IPLOCATION_ORDER, IPINFO_TOKEN +from globals import ( + GEOLITE, + GEOLOCATOR_SERVICE, + IP2LCACHE, + IP2LOCATION_KEY, + NOT_FOUND_LOCATION, + ALIASES, + BLACKLIST, + IATA_CODES_FILE, + IPLOCATION_ORDER, + IPINFO_TOKEN, +) GEOIP_READER = geoip2.database.Reader(GEOLITE) COUNTRY_MAP = {"Russian Federation": "Russia"} @@ -56,7 +66,7 @@ COUNTRY_MAP = {"Russian Federation": "Russia"} def _debug_log(s): if os.environ.get("WTTR_DEBUG_LOCATION"): with open("/tmp/location-debug.log", "a") as f: - f.write("%s %s\n" % (datetime.datetime.now(),s)) + f.write("%s %s\n" % (datetime.datetime.now(), s)) def _is_ip(ip_addr): @@ -82,13 +92,13 @@ def _location_normalize(location): """ Normalize location name `location` """ - #translation_table = dict.fromkeys(map(ord, '!@#$*;'), None) + # translation_table = dict.fromkeys(map(ord, '!@#$*;'), None) def _remove_chars(chars, string): - return ''.join(x for x in string if x not in chars) + return "".join(x for x in string if x not in chars) - location = location.lower().replace('_', ' ').replace('+', ' ').strip() - if not location.startswith('moon@'): - location = _remove_chars(r'!@#$*;:\\', location) + location = location.lower().replace("_", " ").replace("+", " ").strip() + if not location.startswith("moon@"): + location = _remove_chars(r"!@#$*;:\\", location) return location @@ -100,9 +110,11 @@ def _geolocator(location): try: if random.random() < 0: - geo = requests.get('%s/%s' % (GEOLOCATOR_SERVICE, location)).text + geo = requests.get("%s/%s" % (GEOLOCATOR_SERVICE, location)).text else: - geo = requests.get("http://127.0.0.1:8085/:geo-location?location=%s" % location).text + geo = requests.get( + "http://127.0.0.1:8085/:geo-location?location=%s" % location + ).text except requests.exceptions.ConnectionError as exception: print("ERROR: %s" % exception) return None @@ -111,7 +123,7 @@ def _geolocator(location): return None try: - answer = json.loads(geo.encode('utf-8')) + answer = json.loads(geo.encode("utf-8")) if "error" in answer: return None return answer @@ -123,38 +135,40 @@ def _geolocator(location): def _ipcachewrite(ip_addr, location): - """ Write a retrieved ip+location into cache - Can stress some filesystems after long term use, see - https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory + """Write a retrieved ip+location into cache + Can stress some filesystems after long term use, see + https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory - Expects a location of the form: - `(city, region, country, country_code, , )` - Writes a cache entry of the form: - `country_code;country;region;city;;` + Expects a location of the form: + `(city, region, country, country_code, , )` + Writes a cache entry of the form: + `country_code;country;region;city;;` - The latitude and longitude are optional elements. + The latitude and longitude are optional elements. """ return cachefile = os.path.join(IP2LCACHE, ip_addr) if not os.path.exists(IP2LCACHE): os.makedirs(IP2LCACHE) - with open(cachefile, 'w') as file: + with open(cachefile, "w") as file: # like ip2location format - file.write(location[3] + ';' + location[2] + ';' + location[1] + ';' + location[0]) + file.write( + location[3] + ";" + location[2] + ";" + location[1] + ";" + location[0] + ) if len(location) > 4: - file.write(';' + ';'.join(map(str, location[4:]))) + file.write(";" + ";".join(map(str, location[4:]))) def _ipcache(ip_addr): - """ Retrieve a location from cache by ip addr - Returns a triple of (CITY, REGION, COUNTRY) or None - TODO: When cache becomes more robust, transition to using latlong + """Retrieve a location from cache by ip addr + Returns a triple of (CITY, REGION, COUNTRY) or None + TODO: When cache becomes more robust, transition to using latlong """ ## Use Geo IP service when available r = requests.get("http://127.0.0.1:8085/:geo-ip-get?ip=%s" % ip_addr) if r.status_code == 200 and ";" in r.text: - _, country, region, city, *_ = r.text.split(';') + _, country, region, city, *_ = r.text.split(";") return city, region, country return None @@ -175,12 +189,12 @@ def _ipcache(ip_addr): def _ip2location(ip_addr): - """ Convert IP address `ip_addr` to a location name using ip2location. - Return list of location data fields: + """Convert IP address `ip_addr` to a location name using ip2location. + Return list of location data fields: - [ccode, country, region, city, rest...] + [ccode, country, region, city, rest...] - Return `None` if an error occured. + Return `None` if an error occured. """ # if IP2LOCATION_KEY is not set, do not query, @@ -190,12 +204,13 @@ def _ip2location(ip_addr): try: _debug_log("[_ip2location] %s search" % ip_addr) r = requests.get( - 'http://api.ip2location.com/?ip=%s&key=%s&package=WS3' # WS5 provides latlong - % (ip_addr, IP2LOCATION_KEY)) + "http://api.ip2location.com/?ip=%s&key=%s&package=WS3" # WS5 provides latlong + % (ip_addr, IP2LOCATION_KEY) + ) r.raise_for_status() location = r.text - parts = location.split(';') + parts = location.split(";") if len(parts) >= 4: # ccode, country, region, city, (rest) _debug_log("[_ip2location] %s found" % ip_addr) @@ -209,14 +224,17 @@ def _ipinfo(ip_addr): if not IPINFO_TOKEN: return None try: - r = requests.get( - 'https://ipinfo.io/%s/json?token=%s' - % (ip_addr, IPINFO_TOKEN)) + r = requests.get("https://ipinfo.io/%s/json?token=%s" % (ip_addr, IPINFO_TOKEN)) r.raise_for_status() r_json = r.json() # can't do two unpackings on one line - city, region, country, ccode = r_json["city"], r_json["region"], '', r_json["country"], - lat, long = r_json["loc"].split(',') + city, region, country, ccode = ( + r_json["city"], + r_json["region"], + "", + r_json["country"], + ) + lat, long = r_json["loc"].split(",") # NOTE: ipinfo only provides ISO codes for countries country = pycountry.countries.get(alpha_2=ccode).name except (requests.exceptions.RequestException, ValueError): @@ -230,12 +248,26 @@ def _geoip(ip_addr): _debug_log("[_geoip] %s search" % ip_addr) response = GEOIP_READER.city(ip_addr) # print(response.subdivisions) - city, region, country, ccode, lat, long = response.city.name, response.subdivisions[0].names["en"], response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude + city, region, country, ccode, lat, long = ( + response.city.name, + response.subdivisions[0].names["en"], + response.country.name, + response.country.iso_code, + response.location.latitude, + response.location.longitude, + ) _debug_log("[_geoip] %s found" % ip_addr) except IndexError: # Tuple error try: - city, region, country, ccode, lat, long = response.city.name, None, response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude + city, region, country, ccode, lat, long = ( + response.city.name, + None, + response.country.name, + response.country.iso_code, + response.location.latitude, + response.location.longitude, + ) except IndexError: return None except (geoip2.errors.AddressNotFoundError): @@ -261,11 +293,11 @@ def _get_location(ip_addr): # location from iplocators have the following order: # (CITY, REGION, COUNTRY, CCODE, LAT, LONG) for method in IPLOCATION_ORDER: - if method == 'geoip': + if method == "geoip": location = _geoip(ip_addr) - elif method == 'ip2location': + elif method == "ip2location": location = _ip2location(ip_addr) - elif method == 'ipinfo': + elif method == "ipinfo": location = _ipinfo(ip_addr) else: print("ERROR: invalid iplocation method specified: %s" % method) @@ -281,7 +313,7 @@ def _get_location(ip_addr): # temporary disabled it because of geoip services capcacity # - #if city is None and response.location: + # if city is None and response.location: # coord = "%s, %s" % (response.location.latitude, response.location.longitude) # try: # location = geolocator.reverse(coord, language='en') @@ -307,12 +339,12 @@ def _load_aliases(aliases_filename): Load aliases from the aliases file """ aliases_db = {} - with open(aliases_filename, 'r') as f_aliases: + with open(aliases_filename, "r") as f_aliases: for line in f_aliases.readlines(): try: - from_, to_ = line.decode('utf-8').split(':', 1) + from_, to_ = line.decode("utf-8").split(":", 1) except AttributeError: - from_, to_ = line.split(':', 1) + from_, to_ = line.split(":", 1) aliases_db[_location_normalize(from_)] = _location_normalize(to_) return aliases_db @@ -322,7 +354,7 @@ def _load_iata_codes(iata_codes_filename): """ Load IATA codes from the IATA codes file """ - with open(iata_codes_filename, 'r') as f_iata_codes: + with open(iata_codes_filename, "r") as f_iata_codes: result = [] for line in f_iata_codes.readlines(): result.append(line.strip()) @@ -330,7 +362,7 @@ def _load_iata_codes(iata_codes_filename): LOCATION_ALIAS = _load_aliases(ALIASES) -LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, 'r').readlines()] +LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, "r").readlines()] IATA_CODES = _load_iata_codes(IATA_CODES_FILE) @@ -359,8 +391,8 @@ def _get_hemisphere(location): def _fully_qualified_location(location, region, country): - """ Return fully qualified location name with `region` and `country`, - as a string. + """Return fully qualified location name with `region` and `country`, + as a string. """ # If country is not specified, location stays as is @@ -388,8 +420,7 @@ def _fully_qualified_location(location, region, country): def location_processing(location, ip_addr): - """ - """ + """ """ # if location is starting with ~ # or has non ascii symbols @@ -397,23 +428,23 @@ def location_processing(location, ip_addr): override_location_name = None full_address = None hide_full_address = False - force_show_full_address = location is not None and location.startswith('~') + force_show_full_address = location is not None and location.startswith("~") # location ~ means that it should be detected automatically, # and shown in the location line below the report - if location == '~': + if location == "~": location = None - if location and location.lstrip('~ ').startswith('@'): + if location and location.lstrip("~ ").startswith("@"): try: - if (location.lstrip('~ ')[1:] == ""): + if location.lstrip("~ ")[1:] == "": location, region, country = NOT_FOUND_LOCATION, None, None else: location, region, country = _get_location( - socket.gethostbyname( - location.lstrip('~ ')[1:])) - location = '~' + location + socket.gethostbyname(location.lstrip("~ ")[1:]) + ) + location = "~" + location location = _fully_qualified_location(location, region, country) hide_full_address = not force_show_full_address @@ -425,11 +456,11 @@ def location_processing(location, ip_addr): # For moon queries, hemisphere must be found # True for North, False for South hemisphere = False - if location is not None and (location.lower()+"@").startswith("moon@"): + if location is not None and (location.lower() + "@").startswith("moon@"): hemisphere = _get_hemisphere(query_source_location) country = None - if not location or location == 'MyLocation': + if not location or location == "MyLocation": location = ip_addr if _is_ip(location): @@ -438,11 +469,11 @@ def location_processing(location, ip_addr): # here too if location: - location = '~' + location + location = "~" + location location = _fully_qualified_location(location, region, country) hide_full_address = not force_show_full_address - if location and not location.startswith('~'): + if location and not location.startswith("~"): tmp_location = _location_canonical_name(location) if tmp_location != location: override_location_name = location @@ -452,38 +483,44 @@ def location_processing(location, ip_addr): # contains some unicode symbols # here we resolve them if location is not None and location != NOT_FOUND_LOCATION: - location = "~" + location.lstrip('~ ') + location = "~" + location.lstrip("~ ") if not override_location_name: - override_location_name = location.lstrip('~') + override_location_name = location.lstrip("~") # if location is not None and location.upper() in IATA_CODES: # location = '~%s' % location - if location is not None and not location.startswith("~-,") and location.startswith('~'): + if ( + location is not None + and not location.startswith("~-,") + and location.startswith("~") + ): geolocation = _geolocator(_location_canonical_name(location[1:])) if geolocation is not None: if not override_location_name: - override_location_name = location[1:].replace('+', ' ') - location = "%s,%s" % (geolocation['latitude'], geolocation['longitude']) + override_location_name = location[1:].replace("+", " ") + location = "%s,%s" % (geolocation["latitude"], geolocation["longitude"]) country = None if not hide_full_address: - full_address = geolocation['address'] + full_address = geolocation["address"] else: full_address = None else: - location = NOT_FOUND_LOCATION #location[1:] + location = NOT_FOUND_LOCATION # location[1:] - return location, \ - override_location_name, \ - full_address, \ - country, \ - query_source_location, \ - hemisphere + return ( + location, + override_location_name, + full_address, + country, + query_source_location, + hemisphere, + ) def _main_(): - """ Validate cache entries. Print names of invalid cache entries - and move it to the "broken-entries" directory.""" + """Validate cache entries. Print names of invalid cache entries + and move it to the "broken-entries" directory.""" import glob import shutil @@ -495,7 +532,11 @@ def _main_(): city, region, country = data if any(x in city for x in "0123456789"): print(city) - shutil.move(filename, os.path.join("/wttr.in/cache/ip2l-broken-format", ip_address)) + shutil.move( + filename, + os.path.join("/wttr.in/cache/ip2l-broken-format", ip_address), + ) + def _trace_ip(): @@ -503,7 +544,8 @@ def _trace_ip(): print(_get_location("108.5.186.108")) print(location_processing("", "108.5.186.108")) + if __name__ == "__main__": _trace_ip() - #_main_() - #print(_geoip("173.216.90.56")) + # _main_() + # print(_geoip("173.216.90.56"))