Minor code cleanup

gregdan3/master
Igor Chubin 4 years ago
parent 41d4d3d789
commit 1b25e16a37

@ -31,38 +31,30 @@ Exports:
location_processing location_processing
is_location_blocked is_location_blocked
""" """
from __future__ import print_function from __future__ import print_function
import sys
import os
import json import json
import os
import socket import socket
import requests import sys
import geoip2.database import geoip2.database
import pycountry import pycountry
import requests
from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \ from globals import GEOLITE, GEOLOCATOR_SERVICE, IP2LCACHE, IP2LOCATION_KEY, NOT_FOUND_LOCATION, \
ALIASES, BLACKLIST, IATA_CODES_FILE, IPLOCATION_ORDER, IPINFO_TOKEN ALIASES, BLACKLIST, IATA_CODES_FILE, IPLOCATION_ORDER, IPINFO_TOKEN
GEOIP_READER = geoip2.database.Reader(GEOLITE) GEOIP_READER = geoip2.database.Reader(GEOLITE)
COUNTRY_MAP = {"Russian Federation": "Russia"} COUNTRY_MAP = {"Russian Federation": "Russia"}
def _debug_log(s): def _debug_log(s):
with open("/tmp/debug.log", "a") as f: with open("/tmp/debug.log", "a") as f:
f.write(s+"\n") f.write(s+"\n")
def ascii_only(string):
"Check if `string` contains only ASCII symbols"
try: def _is_ip(ip_addr):
for _ in range(5):
string = string.encode('utf-8')
return True
except UnicodeDecodeError:
return False
def is_ip(ip_addr):
""" """
Check if `ip_addr` looks like an IP Address Check if `ip_addr` looks like an IP Address
""" """
@ -80,7 +72,8 @@ def is_ip(ip_addr):
except socket.error: except socket.error:
return False return False
def location_normalize(location):
def _location_normalize(location):
""" """
Normalize location name `location` Normalize location name `location`
""" """
@ -94,8 +87,7 @@ def location_normalize(location):
return location return location
def _geolocator(location):
def geolocator(location):
""" """
Return a GPS pair for specified `location` or None Return a GPS pair for specified `location` or None
if nothing can be found if nothing can be found
@ -120,7 +112,7 @@ def geolocator(location):
return None return None
def ipcachewrite(ip_addr, location): def _ipcachewrite(ip_addr, location):
""" Write a retrieved ip+location into cache """ Write a retrieved ip+location into cache
Can stress some filesystems after long term use, see Can stress some filesystems after long term use, see
https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory https://stackoverflow.com/questions/466521/how-many-files-can-i-put-in-a-directory
@ -132,7 +124,8 @@ def ipcachewrite(ip_addr, location):
file.write(location[3] + ';' + location[2] + ';' + location[1] + ';' + location[0] + ';' + location[4] + ';' + location[5]) file.write(location[3] + ';' + location[2] + ';' + location[1] + ';' + location[0] + ';' + location[4] + ';' + location[5])
# like ip2location format # like ip2location format
def ipcache(ip_addr):
def _ipcache(ip_addr):
""" Retrieve a location from cache by ip addr """ Retrieve a location from cache by ip addr
Returns a triple of (CITY, REGION, COUNTRY) or None Returns a triple of (CITY, REGION, COUNTRY) or None
TODO: When cache becomes more robust, transition to using latlong TODO: When cache becomes more robust, transition to using latlong
@ -152,7 +145,7 @@ def ipcache(ip_addr):
return None return None
def ip2location(ip_addr): def _ip2location(ip_addr):
"""Convert IP address `ip_addr` to a location name""" """Convert IP address `ip_addr` to a location name"""
# if IP2LOCATION_KEY is not set, do not query, # if IP2LOCATION_KEY is not set, do not query,
# because the query wont be processed anyway # because the query wont be processed anyway
@ -171,7 +164,7 @@ def ip2location(ip_addr):
return city, region, country, ccode, lat, long return city, region, country, ccode, lat, long
def ipinfo(ip_addr): def _ipinfo(ip_addr):
if not IPINFO_TOKEN: if not IPINFO_TOKEN:
return None return None
try: try:
@ -191,7 +184,7 @@ def ipinfo(ip_addr):
return city, region, country, ccode, lat, long return city, region, country, ccode, lat, long
def geoip(ip_addr): def _geoip(ip_addr):
try: try:
response = GEOIP_READER.city(ip_addr) response = GEOIP_READER.city(ip_addr)
city, region, country, ccode, lat, long = response.city.name, response.subdivisions.name, response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude city, region, country, ccode, lat, long = response.city.name, response.subdivisions.name, response.country.name, response.country.iso_code, response.location.latitude, response.location.longitude
@ -200,17 +193,18 @@ def geoip(ip_addr):
return city, region, country, ccode, lat, long return city, region, country, ccode, lat, long
def workaround(country): def _country_name_workaround(country):
# workaround for strange bug with the country name # workaround for strange bug with the country name
# maybe some other countries has this problem too # maybe some other countries has this problem too
country = COUNTRY_MAP.get(country) or country country = COUNTRY_MAP.get(country) or country
return country return country
def get_location(ip_addr):
def _get_location(ip_addr):
""" """
Return location triple (CITY, REGION, COUNTRY) for `ip_addr` Return location triple (CITY, REGION, COUNTRY) for `ip_addr`
""" """
location = ipcache(ip_addr) location = _ipcache(ip_addr)
if location: if location:
return location return location
@ -218,20 +212,20 @@ def get_location(ip_addr):
# (CITY, REGION, COUNTRY, CCODE, LAT, LONG) # (CITY, REGION, COUNTRY, CCODE, LAT, LONG)
for method in IPLOCATION_ORDER: for method in IPLOCATION_ORDER:
if method == 'geoip': if method == 'geoip':
location = geoip(ip_addr) location = _geoip(ip_addr)
elif method == 'ip2location': elif method == 'ip2location':
location = ip2location(ip_addr) location = _ip2location(ip_addr)
elif method == 'ipinfo': elif method == 'ipinfo':
location = ipinfo(ip_addr) location = _ipinfo(ip_addr)
else: else:
print("ERROR: invalid iplocation method specified: %s" % method) print("ERROR: invalid iplocation method specified: %s" % method)
if location is not None: if location is not None:
break break
if location is not None and all(location): if location is not None and all(location):
ipcachewrite(ip_addr, location) _ipcachewrite(ip_addr, location)
# cache write used to happen before workaround, preserve that # cache write used to happen before workaround, preserve that
location[2] = workaround(location[2]) location[2] = _country_name_workaround(location[2])
return location[:3] # city, region, country return location[:3] # city, region, country
# ccode is cached but not needed for location # ccode is cached but not needed for location
@ -249,15 +243,15 @@ def get_location(ip_addr):
return NOT_FOUND_LOCATION, None, None return NOT_FOUND_LOCATION, None, None
def location_canonical_name(location): def _location_canonical_name(location):
"Find canonical name for `location`" "Find canonical name for `location`"
location = location_normalize(location) location = _location_normalize(location)
if location.lower() in LOCATION_ALIAS: if location.lower() in LOCATION_ALIAS:
return LOCATION_ALIAS[location.lower()] return LOCATION_ALIAS[location.lower()]
return location return location
def load_aliases(aliases_filename): def _load_aliases(aliases_filename):
""" """
Load aliases from the aliases file Load aliases from the aliases file
""" """
@ -269,10 +263,10 @@ def load_aliases(aliases_filename):
except AttributeError: except AttributeError:
from_, to_ = line.split(':', 1) from_, to_ = line.split(':', 1)
aliases_db[location_normalize(from_)] = location_normalize(to_) aliases_db[_location_normalize(from_)] = _location_normalize(to_)
return aliases_db return aliases_db
def load_iata_codes(iata_codes_filename): def _load_iata_codes(iata_codes_filename):
""" """
Load IATA codes from the IATA codes file Load IATA codes from the IATA codes file
""" """
@ -282,9 +276,9 @@ def load_iata_codes(iata_codes_filename):
result.append(line.strip()) result.append(line.strip())
return set(result) return set(result)
LOCATION_ALIAS = load_aliases(ALIASES) LOCATION_ALIAS = _load_aliases(ALIASES)
LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, 'r').readlines()] LOCATION_BLACK_LIST = [x.strip() for x in open(BLACKLIST, 'r').readlines()]
IATA_CODES = load_iata_codes(IATA_CODES_FILE) IATA_CODES = _load_iata_codes(IATA_CODES_FILE)
def is_location_blocked(location): def is_location_blocked(location):
""" """
@ -294,7 +288,7 @@ def is_location_blocked(location):
return location is not None and location.lower() in LOCATION_BLACK_LIST return location is not None and location.lower() in LOCATION_BLACK_LIST
def get_hemisphere(location): def _get_hemisphere(location):
""" """
Return hemisphere of the location (True = North, False = South). Return hemisphere of the location (True = North, False = South).
Assume North and return True if location can't be found. Assume North and return True if location can't be found.
@ -302,11 +296,12 @@ def get_hemisphere(location):
if all(location): if all(location):
location_string = ", ".join(location) location_string = ", ".join(location)
geolocation = geolocator(location_string) geolocation = _geolocator(location_string)
if geolocation is None: if geolocation is None:
return True return True
return geolocation["latitude"] > 0 return geolocation["latitude"] > 0
def location_processing(location, ip_addr): def location_processing(location, ip_addr):
""" """
""" """
@ -326,7 +321,7 @@ def location_processing(location, ip_addr):
if location and location.lstrip('~ ').startswith('@'): if location and location.lstrip('~ ').startswith('@'):
try: try:
location, region, country = get_location( location, region, country = _get_location(
socket.gethostbyname( socket.gethostbyname(
location.lstrip('~ ')[1:])) location.lstrip('~ ')[1:]))
location = '~' + location location = '~' + location
@ -336,20 +331,20 @@ def location_processing(location, ip_addr):
except: except:
location, region, country = NOT_FOUND_LOCATION, None, None location, region, country = NOT_FOUND_LOCATION, None, None
query_source_location = get_location(ip_addr) query_source_location = _get_location(ip_addr)
# For moon queries, hemisphere must be found # For moon queries, hemisphere must be found
# True for North, False for South # True for North, False for South
hemisphere = False hemisphere = False
if location is not None and (location.lower()+"@").startswith("moon@"): if location is not None and (location.lower()+"@").startswith("moon@"):
hemisphere = get_hemisphere(query_source_location) hemisphere = _get_hemisphere(query_source_location)
country = None country = None
if not location or location == 'MyLocation': if not location or location == 'MyLocation':
location = ip_addr location = ip_addr
if is_ip(location): if _is_ip(location):
location, region, country = get_location(location) location, region, country = _get_location(location)
# location is just city here # location is just city here
# here too # here too
@ -360,7 +355,7 @@ def location_processing(location, ip_addr):
hide_full_address = not force_show_full_address hide_full_address = not force_show_full_address
if location and not location.startswith('~'): if location and not location.startswith('~'):
tmp_location = location_canonical_name(location) tmp_location = _location_canonical_name(location)
if tmp_location != location: if tmp_location != location:
override_location_name = location override_location_name = location
location = tmp_location location = tmp_location
@ -368,7 +363,7 @@ def location_processing(location, ip_addr):
# up to this point it is possible that the name # up to this point it is possible that the name
# contains some unicode symbols # contains some unicode symbols
# here we resolve them # here we resolve them
if location is not None: # and not ascii_only(location): if location is not None:
location = "~" + location.lstrip('~ ') location = "~" + location.lstrip('~ ')
if not override_location_name: if not override_location_name:
override_location_name = location.lstrip('~') override_location_name = location.lstrip('~')
@ -377,7 +372,7 @@ def location_processing(location, ip_addr):
# location = '~%s' % location # location = '~%s' % location
if location is not None and not location.startswith("~-,") and location.startswith('~'): if location is not None and not location.startswith("~-,") and location.startswith('~'):
geolocation = geolocator(location_canonical_name(location[1:])) geolocation = _geolocator(_location_canonical_name(location[1:]))
if geolocation is not None: if geolocation is not None:
if not override_location_name: if not override_location_name:
override_location_name = location[1:].replace('+', ' ') override_location_name = location[1:].replace('+', ' ')
@ -390,7 +385,6 @@ def location_processing(location, ip_addr):
else: else:
location = NOT_FOUND_LOCATION #location[1:] location = NOT_FOUND_LOCATION #location[1:]
return location, \ return location, \
override_location_name, \ override_location_name, \
full_address, \ full_address, \

Loading…
Cancel
Save