Fix formatting of lib/location.py

pull/1062/head
Igor Chubin 1 month ago
parent 2dd407fb5c
commit 981836fe3c

@ -46,8 +46,18 @@ import geoip2.database
import pycountry import pycountry
import requests import requests
from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \ from globals import (
ALIASES, BLACKLIST, IATA_CODES_FILE, IPLOCATION_ORDER, IPINFO_TOKEN GEOLITE,
GEOLOCATOR_SERVICE,
IP2LCACHE,
IP2LOCATION_KEY,
NOT_FOUND_LOCATION,
ALIASES,
BLACKLIST,
IATA_CODES_FILE,
IPLOCATION_ORDER,
IPINFO_TOKEN,
)
GEOIP_READER = geoip2.database.Reader(GEOLITE) GEOIP_READER = geoip2.database.Reader(GEOLITE)
COUNTRY_MAP = {"Russian Federation": "Russia"} COUNTRY_MAP = {"Russian Federation": "Russia"}
@ -56,7 +66,7 @@ COUNTRY_MAP = {"Russian Federation": "Russia"}
def _debug_log(s): def _debug_log(s):
if os.environ.get("WTTR_DEBUG_LOCATION"): if os.environ.get("WTTR_DEBUG_LOCATION"):
with open("/tmp/location-debug.log", "a") as f: with open("/tmp/location-debug.log", "a") as f:
f.write("%s %s\n" % (datetime.datetime.now(),s)) f.write("%s %s\n" % (datetime.datetime.now(), s))
def _is_ip(ip_addr): def _is_ip(ip_addr):
@ -82,13 +92,13 @@ def _location_normalize(location):
""" """
Normalize location name `location` Normalize location name `location`
""" """
#translation_table = dict.fromkeys(map(ord, '!@#$*;'), None) # translation_table = dict.fromkeys(map(ord, '!@#$*;'), None)
def _remove_chars(chars, string): def _remove_chars(chars, string):
return ''.join(x for x in string if x not in chars) return "".join(x for x in string if x not in chars)
location = location.lower().replace('_', ' ').replace('+', ' ').strip() location = location.lower().replace("_", " ").replace("+", " ").strip()
if not location.startswith('moon@'): if not location.startswith("moon@"):
location = _remove_chars(r'!@#$*;:\\', location) location = _remove_chars(r"!@#$*;:\\", location)
return location return location
@ -100,9 +110,11 @@ def _geolocator(location):
try: try:
if random.random() < 0: if random.random() < 0:
geo = requests.get('%s/%s' % (GEOLOCATOR_SERVICE, location)).text geo = requests.get("%s/%s" % (GEOLOCATOR_SERVICE, location)).text
else: else:
geo = requests.get("http://127.0.0.1:8085/:geo-location?location=%s" % location).text geo = requests.get(
"http://127.0.0.1:8085/:geo-location?location=%s" % location
).text
except requests.exceptions.ConnectionError as exception: except requests.exceptions.ConnectionError as exception:
print("ERROR: %s" % exception) print("ERROR: %s" % exception)
return None return None
@ -111,7 +123,7 @@ def _geolocator(location):
return None return None
try: try:
answer = json.loads(geo.encode('utf-8')) answer = json.loads(geo.encode("utf-8"))
if "error" in answer: if "error" in answer:
return None return None
return answer return answer
@ -123,38 +135,40 @@ def _geolocator(location):
def _ipcachewrite(ip_addr, location): def _ipcachewrite(ip_addr, location):
""" Write a retrieved ip+location into cache """Write a retrieved ip+location into cache
Can stress some filesystems after long term use, see Can stress some filesystems after long term use, see
https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory
Expects a location of the form: Expects a location of the form:
`(city, region, country, country_code, <lat>, <long>)` `(city, region, country, country_code, <lat>, <long>)`
Writes a cache entry of the form: Writes a cache entry of the form:
`country_code;country;region;city;<lat>;<long>` `country_code;country;region;city;<lat>;<long>`
The latitude and longitude are optional elements. The latitude and longitude are optional elements.
""" """
return return
cachefile = os.path.join(IP2LCACHE, ip_addr) cachefile = os.path.join(IP2LCACHE, ip_addr)
if not os.path.exists(IP2LCACHE): if not os.path.exists(IP2LCACHE):
os.makedirs(IP2LCACHE) os.makedirs(IP2LCACHE)
with open(cachefile, 'w') as file: with open(cachefile, "w") as file:
# like ip2location format # like ip2location format
file.write(location[3] + ';' + location[2] + ';' + location[1] + ';' + location[0]) file.write(
location[3] + ";" + location[2] + ";" + location[1] + ";" + location[0]
)
if len(location) > 4: if len(location) > 4:
file.write(';' + ';'.join(map(str, location[4:]))) file.write(";" + ";".join(map(str, location[4:])))
def _ipcache(ip_addr): def _ipcache(ip_addr):
""" Retrieve a location from cache by ip addr """Retrieve a location from cache by ip addr
Returns a triple of (CITY, REGION, COUNTRY) or None Returns a triple of (CITY, REGION, COUNTRY) or None
TODO: When cache becomes more robust, transition to using latlong TODO: When cache becomes more robust, transition to using latlong
""" """
## Use Geo IP service when available ## Use Geo IP service when available
r = requests.get("http://127.0.0.1:8085/:geo-ip-get?ip=%s" % ip_addr) r = requests.get("http://127.0.0.1:8085/:geo-ip-get?ip=%s" % ip_addr)
if r.status_code == 200 and ";" in r.text: if r.status_code == 200 and ";" in r.text:
_, country, region, city, *_ = r.text.split(';') _, country, region, city, *_ = r.text.split(";")
return city, region, country return city, region, country
return None return None
@ -175,12 +189,12 @@ def _ipcache(ip_addr):
def _ip2location(ip_addr): def _ip2location(ip_addr):
""" Convert IP address `ip_addr` to a location name using ip2location. """Convert IP address `ip_addr` to a location name using ip2location.
Return list of location data fields: Return list of location data fields:
[ccode, country, region, city, rest...] [ccode, country, region, city, rest...]
Return `None` if an error occured. Return `None` if an error occured.
""" """
# if IP2LOCATION_KEY is not set, do not query, # if IP2LOCATION_KEY is not set, do not query,
@ -190,12 +204,13 @@ def _ip2location(ip_addr):
try: try:
_debug_log("[_ip2location] %s search" % ip_addr) _debug_log("[_ip2location] %s search" % ip_addr)
r = requests.get( r = requests.get(
'http://api.ip2location.com/?ip=%s&key=%s&package=WS3' # WS5 provides latlong "http://api.ip2location.com/?ip=%s&key=%s&package=WS3" # WS5 provides latlong
% (ip_addr, IP2LOCATION_KEY)) % (ip_addr, IP2LOCATION_KEY)
)
r.raise_for_status() r.raise_for_status()
location = r.text location = r.text
parts = location.split(';') parts = location.split(";")
if len(parts) >= 4: if len(parts) >= 4:
# ccode, country, region, city, (rest) # ccode, country, region, city, (rest)
_debug_log("[_ip2location] %s found" % ip_addr) _debug_log("[_ip2location] %s found" % ip_addr)
@ -209,14 +224,17 @@ def _ipinfo(ip_addr):
if not IPINFO_TOKEN: if not IPINFO_TOKEN:
return None return None
try: try:
r = requests.get( r = requests.get("https://ipinfo.io/%s/json?token=%s" % (ip_addr, IPINFO_TOKEN))
'https://ipinfo.io/%s/json?token=%s'
% (ip_addr, IPINFO_TOKEN))
r.raise_for_status() r.raise_for_status()
r_json = r.json() r_json = r.json()
# can't do two unpackings on one line # can't do two unpackings on one line
city, region, country, ccode = r_json["city"], r_json["region"], '', r_json["country"], city, region, country, ccode = (
lat, long = r_json["loc"].split(',') r_json["city"],
r_json["region"],
"",
r_json["country"],
)
lat, long = r_json["loc"].split(",")
# NOTE: ipinfo only provides ISO codes for countries # NOTE: ipinfo only provides ISO codes for countries
country = pycountry.countries.get(alpha_2=ccode).name country = pycountry.countries.get(alpha_2=ccode).name
except (requests.exceptions.RequestException, ValueError): except (requests.exceptions.RequestException, ValueError):
@ -230,12 +248,26 @@ def _geoip(ip_addr):
_debug_log("[_geoip] %s search" % ip_addr) _debug_log("[_geoip] %s search" % ip_addr)
response = GEOIP_READER.city(ip_addr) response = GEOIP_READER.city(ip_addr)
# print(response.subdivisions) # print(response.subdivisions)
city, region, country, ccode, lat, long = response.city.name, response.subdivisions[0].names["en"], response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude city, region, country, ccode, lat, long = (
response.city.name,
response.subdivisions[0].names["en"],
response.country.name,
response.country.iso_code,
response.location.latitude,
response.location.longitude,
)
_debug_log("[_geoip] %s found" % ip_addr) _debug_log("[_geoip] %s found" % ip_addr)
except IndexError: except IndexError:
# Tuple error # Tuple error
try: try:
city, region, country, ccode, lat, long = response.city.name, None, response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude city, region, country, ccode, lat, long = (
response.city.name,
None,
response.country.name,
response.country.iso_code,
response.location.latitude,
response.location.longitude,
)
except IndexError: except IndexError:
return None return None
except (geoip2.errors.AddressNotFoundError): except (geoip2.errors.AddressNotFoundError):
@ -261,11 +293,11 @@ def _get_location(ip_addr):
# location from iplocators have the following order: # location from iplocators have the following order:
# (CITY, REGION, COUNTRY, CCODE, LAT, LONG) # (CITY, REGION, COUNTRY, CCODE, LAT, LONG)
for method in IPLOCATION_ORDER: for method in IPLOCATION_ORDER:
if method == 'geoip': if method == "geoip":
location = _geoip(ip_addr) location = _geoip(ip_addr)
elif method == 'ip2location': elif method == "ip2location":
location = _ip2location(ip_addr) location = _ip2location(ip_addr)
elif method == 'ipinfo': elif method == "ipinfo":
location = _ipinfo(ip_addr) location = _ipinfo(ip_addr)
else: else:
print("ERROR: invalid iplocation method specified: %s" % method) print("ERROR: invalid iplocation method specified: %s" % method)
@ -281,7 +313,7 @@ def _get_location(ip_addr):
# temporary disabled it because of geoip services capcacity # temporary disabled it because of geoip services capcacity
# #
#if city is None and response.location: # if city is None and response.location:
# coord = "%s, %s" % (response.location.latitude, response.location.longitude) # coord = "%s, %s" % (response.location.latitude, response.location.longitude)
# try: # try:
# location = geolocator.reverse(coord, language='en') # location = geolocator.reverse(coord, language='en')
@ -307,12 +339,12 @@ def _load_aliases(aliases_filename):
Load aliases from the aliases file Load aliases from the aliases file
""" """
aliases_db = {} aliases_db = {}
with open(aliases_filename, 'r') as f_aliases: with open(aliases_filename, "r") as f_aliases:
for line in f_aliases.readlines(): for line in f_aliases.readlines():
try: try:
from_, to_ = line.decode('utf-8').split(':', 1) from_, to_ = line.decode("utf-8").split(":", 1)
except AttributeError: except AttributeError:
from_, to_ = line.split(':', 1) from_, to_ = line.split(":", 1)
aliases_db[_location_normalize(from_)] = _location_normalize(to_) aliases_db[_location_normalize(from_)] = _location_normalize(to_)
return aliases_db return aliases_db
@ -322,7 +354,7 @@ def _load_iata_codes(iata_codes_filename):
""" """
Load IATA codes from the IATA codes file Load IATA codes from the IATA codes file
""" """
with open(iata_codes_filename, 'r') as f_iata_codes: with open(iata_codes_filename, "r") as f_iata_codes:
result = [] result = []
for line in f_iata_codes.readlines(): for line in f_iata_codes.readlines():
result.append(line.strip()) result.append(line.strip())
@ -330,7 +362,7 @@ def _load_iata_codes(iata_codes_filename):
LOCATION_ALIAS = _load_aliases(ALIASES) LOCATION_ALIAS = _load_aliases(ALIASES)
LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, 'r').readlines()] LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, "r").readlines()]
IATA_CODES = _load_iata_codes(IATA_CODES_FILE) IATA_CODES = _load_iata_codes(IATA_CODES_FILE)
@ -359,8 +391,8 @@ def _get_hemisphere(location):
def _fully_qualified_location(location, region, country): def _fully_qualified_location(location, region, country):
""" Return fully qualified location name with `region` and `country`, """Return fully qualified location name with `region` and `country`,
as a string. as a string.
""" """
# If country is not specified, location stays as is # If country is not specified, location stays as is
@ -388,8 +420,7 @@ def _fully_qualified_location(location, region, country):
def location_processing(location, ip_addr): def location_processing(location, ip_addr):
""" """ """
"""
# if location is starting with ~ # if location is starting with ~
# or has non ascii symbols # or has non ascii symbols
@ -397,23 +428,23 @@ def location_processing(location, ip_addr):
override_location_name = None override_location_name = None
full_address = None full_address = None
hide_full_address = False hide_full_address = False
force_show_full_address = location is not None and location.startswith('~') force_show_full_address = location is not None and location.startswith("~")
# location ~ means that it should be detected automatically, # location ~ means that it should be detected automatically,
# and shown in the location line below the report # and shown in the location line below the report
if location == '~': if location == "~":
location = None location = None
if location and location.lstrip('~ ').startswith('@'): if location and location.lstrip("~ ").startswith("@"):
try: try:
if (location.lstrip('~ ')[1:] == ""): if location.lstrip("~ ")[1:] == "":
location, region, country = NOT_FOUND_LOCATION, None, None location, region, country = NOT_FOUND_LOCATION, None, None
else: else:
location, region, country = _get_location( location, region, country = _get_location(
socket.gethostbyname( socket.gethostbyname(location.lstrip("~ ")[1:])
location.lstrip('~ ')[1:])) )
location = '~' + location location = "~" + location
location = _fully_qualified_location(location, region, country) location = _fully_qualified_location(location, region, country)
hide_full_address = not force_show_full_address hide_full_address = not force_show_full_address
@ -425,11 +456,11 @@ def location_processing(location, ip_addr):
# For moon queries, hemisphere must be found # For moon queries, hemisphere must be found
# True for North, False for South # True for North, False for South
hemisphere = False hemisphere = False
if location is not None and (location.lower()+"@").startswith("moon@"): if location is not None and (location.lower() + "@").startswith("moon@"):
hemisphere = _get_hemisphere(query_source_location) hemisphere = _get_hemisphere(query_source_location)
country = None country = None
if not location or location == 'MyLocation': if not location or location == "MyLocation":
location = ip_addr location = ip_addr
if _is_ip(location): if _is_ip(location):
@ -438,11 +469,11 @@ def location_processing(location, ip_addr):
# here too # here too
if location: if location:
location = '~' + location location = "~" + location
location = _fully_qualified_location(location, region, country) location = _fully_qualified_location(location, region, country)
hide_full_address = not force_show_full_address hide_full_address = not force_show_full_address
if location and not location.startswith('~'): if location and not location.startswith("~"):
tmp_location = _location_canonical_name(location) tmp_location = _location_canonical_name(location)
if tmp_location != location: if tmp_location != location:
override_location_name = location override_location_name = location
@ -452,38 +483,44 @@ def location_processing(location, ip_addr):
# contains some unicode symbols # contains some unicode symbols
# here we resolve them # here we resolve them
if location is not None and location != NOT_FOUND_LOCATION: if location is not None and location != NOT_FOUND_LOCATION:
location = "~" + location.lstrip('~ ') location = "~" + location.lstrip("~ ")
if not override_location_name: if not override_location_name:
override_location_name = location.lstrip('~') override_location_name = location.lstrip("~")
# if location is not None and location.upper() in IATA_CODES: # if location is not None and location.upper() in IATA_CODES:
# location = '~%s' % location # location = '~%s' % location
if location is not None and not location.startswith("~-,") and location.startswith('~'): if (
location is not None
and not location.startswith("~-,")
and location.startswith("~")
):
geolocation = _geolocator(_location_canonical_name(location[1:])) geolocation = _geolocator(_location_canonical_name(location[1:]))
if geolocation is not None: if geolocation is not None:
if not override_location_name: if not override_location_name:
override_location_name = location[1:].replace('+', ' ') override_location_name = location[1:].replace("+", " ")
location = "%s,%s" % (geolocation['latitude'], geolocation['longitude']) location = "%s,%s" % (geolocation["latitude"], geolocation["longitude"])
country = None country = None
if not hide_full_address: if not hide_full_address:
full_address = geolocation['address'] full_address = geolocation["address"]
else: else:
full_address = None full_address = None
else: else:
location = NOT_FOUND_LOCATION #location[1:] location = NOT_FOUND_LOCATION # location[1:]
return location, \ return (
override_location_name, \ location,
full_address, \ override_location_name,
country, \ full_address,
query_source_location, \ country,
hemisphere query_source_location,
hemisphere,
)
def _main_(): def _main_():
""" Validate cache entries. Print names of invalid cache entries """Validate cache entries. Print names of invalid cache entries
and move it to the "broken-entries" directory.""" and move it to the "broken-entries" directory."""
import glob import glob
import shutil import shutil
@ -495,7 +532,11 @@ def _main_():
city, region, country = data city, region, country = data
if any(x in city for x in "0123456789"): if any(x in city for x in "0123456789"):
print(city) print(city)
shutil.move(filename, os.path.join("/wttr.in/cache/ip2l-broken-format", ip_address)) shutil.move(
filename,
os.path.join("/wttr.in/cache/ip2l-broken-format", ip_address),
)
def _trace_ip(): def _trace_ip():
@ -503,7 +544,8 @@ def _trace_ip():
print(_get_location("108.5.186.108")) print(_get_location("108.5.186.108"))
print(location_processing("", "108.5.186.108")) print(location_processing("", "108.5.186.108"))
if __name__ == "__main__": if __name__ == "__main__":
_trace_ip() _trace_ip()
#_main_() # _main_()
#print(_geoip("173.216.90.56")) # print(_geoip("173.216.90.56"))

Loading…
Cancel
Save