Fix formatting in lib/wttr_srv.py

pull/1062/head
Igor Chubin 1 month ago
parent 6b7ca7d60a
commit 6964912fb0

@ -15,14 +15,27 @@ from flask import render_template, send_file, make_response
import fmt.png import fmt.png
import parse_query import parse_query
from translations import get_message, FULL_TRANSLATION, PARTIAL_TRANSLATION, SUPPORTED_LANGS from translations import (
get_message,
FULL_TRANSLATION,
PARTIAL_TRANSLATION,
SUPPORTED_LANGS,
)
from buttons import add_buttons from buttons import add_buttons
from globals import get_help_file, remove_ansi, TRANSLATION_TABLE, \ from globals import (
BASH_FUNCTION_FILE, TRANSLATION_FILE, LOG_FILE, \ get_help_file,
NOT_FOUND_LOCATION, \ remove_ansi,
MALFORMED_RESPONSE_HTML_PAGE, \ TRANSLATION_TABLE,
PLAIN_TEXT_AGENTS, PLAIN_TEXT_PAGES, \ BASH_FUNCTION_FILE,
MY_EXTERNAL_IP, QUERY_LIMITS TRANSLATION_FILE,
LOG_FILE,
NOT_FOUND_LOCATION,
MALFORMED_RESPONSE_HTML_PAGE,
PLAIN_TEXT_AGENTS,
PLAIN_TEXT_PAGES,
MY_EXTERNAL_IP,
QUERY_LIMITS,
)
from location import is_location_blocked, location_processing from location import is_location_blocked, location_processing
from limits import Limits from limits import Limits
from view.wttr import get_wetter from view.wttr import get_wetter
@ -33,49 +46,53 @@ import cache
if not os.path.exists(os.path.dirname(LOG_FILE)): if not os.path.exists(os.path.dirname(LOG_FILE)):
os.makedirs(os.path.dirname(LOG_FILE)) os.makedirs(os.path.dirname(LOG_FILE))
logging.basicConfig(filename=LOG_FILE, level=logging.INFO, format='%(asctime)s %(message)s') logging.basicConfig(
filename=LOG_FILE, level=logging.INFO, format="%(asctime)s %(message)s"
)
LIMITS = Limits(whitelist=[MY_EXTERNAL_IP], limits=QUERY_LIMITS) LIMITS = Limits(whitelist=[MY_EXTERNAL_IP], limits=QUERY_LIMITS)
TASKS = ThreadPool(25) TASKS = ThreadPool(25)
def show_text_file(name, lang): def show_text_file(name, lang):
""" """
show static file `name` for `lang` show static file `name` for `lang`
""" """
text = "" text = ""
if name == ":help": if name == ":help":
text = open(get_help_file(lang), 'r').read() text = open(get_help_file(lang), "r").read()
text = text.replace('FULL_TRANSLATION', ' '.join(FULL_TRANSLATION)) text = text.replace("FULL_TRANSLATION", " ".join(FULL_TRANSLATION))
text = text.replace('PARTIAL_TRANSLATION', ' '.join(PARTIAL_TRANSLATION)) text = text.replace("PARTIAL_TRANSLATION", " ".join(PARTIAL_TRANSLATION))
elif name == ":bash.function": elif name == ":bash.function":
text = open(BASH_FUNCTION_FILE, 'r').read() text = open(BASH_FUNCTION_FILE, "r").read()
elif name == ":iterm2": elif name == ":iterm2":
text = open("share/iterm2.txt", 'r').read() text = open("share/iterm2.txt", "r").read()
elif name == ":translation": elif name == ":translation":
text = open(TRANSLATION_FILE, 'r').read() text = open(TRANSLATION_FILE, "r").read()
text = text\ text = text.replace("NUMBER_OF_LANGUAGES", str(len(SUPPORTED_LANGS))).replace(
.replace('NUMBER_OF_LANGUAGES', str(len(SUPPORTED_LANGS)))\ "SUPPORTED_LANGUAGES", " ".join(SUPPORTED_LANGS)
.replace('SUPPORTED_LANGUAGES', ' '.join(SUPPORTED_LANGS)) )
return text return text
def _client_ip_address(request): def _client_ip_address(request):
"""Return client ip address for flask `request`. """Return client ip address for flask `request`."""
"""
if request.headers.getlist("X-PNG-Query-For"): if request.headers.getlist("X-PNG-Query-For"):
ip_addr = request.headers.getlist("X-PNG-Query-For")[0] ip_addr = request.headers.getlist("X-PNG-Query-For")[0]
if ip_addr.startswith('::ffff:'): if ip_addr.startswith("::ffff:"):
ip_addr = ip_addr[7:] ip_addr = ip_addr[7:]
elif request.headers.getlist("X-Forwarded-For"): elif request.headers.getlist("X-Forwarded-For"):
ip_addr = request.headers.getlist("X-Forwarded-For")[0] ip_addr = request.headers.getlist("X-Forwarded-For")[0]
if ip_addr.startswith('::ffff:'): if ip_addr.startswith("::ffff:"):
ip_addr = ip_addr[7:] ip_addr = ip_addr[7:]
else: else:
ip_addr = request.remote_addr ip_addr = request.remote_addr
return ip_addr return ip_addr
def _parse_language_header(header): def _parse_language_header(header):
""" """
>>> _parse_language_header("en-US,en;q=0.9") >>> _parse_language_header("en-US,en;q=0.9")
@ -103,18 +120,18 @@ def _parse_language_header(header):
return locale_q_pairs return locale_q_pairs
def _find_supported_language(accepted_languages): def _find_supported_language(accepted_languages):
def supported_langs(): def supported_langs():
"""Yields all pairs in the Accept-Language header """Yields all pairs in the Accept-Language header
supported in SUPPORTED_LANGS or None if 'en' is the preferred""" supported in SUPPORTED_LANGS or None if 'en' is the preferred"""
for lang_tuple in accepted_languages: for lang_tuple in accepted_languages:
lang = lang_tuple[0] lang = lang_tuple[0]
if '-' in lang: if "-" in lang:
lang = lang.split('-', 1)[0] lang = lang.split("-", 1)[0]
if lang in SUPPORTED_LANGS: if lang in SUPPORTED_LANGS:
yield lang, lang_tuple[1] yield lang, lang_tuple[1]
elif lang == 'en': elif lang == "en":
yield None, lang_tuple[1] yield None, lang_tuple[1]
try: try:
return max(supported_langs(), key=lambda lang_tuple: lang_tuple[1])[0] return max(supported_langs(), key=lambda lang_tuple: lang_tuple[1])[0]
except ValueError: except ValueError:
@ -122,6 +139,7 @@ def _parse_language_header(header):
return _find_supported_language(_parse_accept_language(header)) return _find_supported_language(_parse_accept_language(header))
def get_answer_language_and_view(request): def get_answer_language_and_view(request):
""" """
Return preferred answer language based on Return preferred answer language based on
@ -130,24 +148,25 @@ def get_answer_language_and_view(request):
lang = None lang = None
view_name = None view_name = None
hostname = request.headers['Host'] hostname = request.headers["Host"]
if hostname != 'wttr.in' and hostname.endswith('.wttr.in'): if hostname != "wttr.in" and hostname.endswith(".wttr.in"):
lang = hostname[:-8] lang = hostname[:-8]
if lang.startswith("v2") or lang.startswith("v3"): if lang.startswith("v2") or lang.startswith("v3"):
view_name = lang view_name = lang
lang = None lang = None
if 'lang' in request.args: if "lang" in request.args:
lang = request.args.get('lang') lang = request.args.get("lang")
if lang.lower() == 'none': if lang.lower() == "none":
lang = None lang = None
header_accept_language = request.headers.get('Accept-Language', '') header_accept_language = request.headers.get("Accept-Language", "")
if lang is None and header_accept_language: if lang is None and header_accept_language:
lang = _parse_language_header(header_accept_language) lang = _parse_language_header(header_accept_language)
return lang, view_name return lang, view_name
def get_output_format(query, parsed_query): def get_output_format(query, parsed_query):
""" """
Return preferred output format: ansi, text, html or png Return preferred output format: ansi, text, html or png
@ -155,23 +174,28 @@ def get_output_format(query, parsed_query):
Return new location (can be rewritten) Return new location (can be rewritten)
""" """
if ('view' in query if (
(
"view" in query
and not query["view"].startswith("v2") and not query["view"].startswith("v2")
and not query["view"].startswith("v3")) \ and not query["view"].startswith("v3")
or parsed_query.get("png_filename") \ )
or query.get('force-ansi'): or parsed_query.get("png_filename")
or query.get("force-ansi")
):
return False return False
user_agent = parsed_query.get("user_agent", "").lower() user_agent = parsed_query.get("user_agent", "").lower()
html_output = not any(agent in user_agent for agent in PLAIN_TEXT_AGENTS) html_output = not any(agent in user_agent for agent in PLAIN_TEXT_AGENTS)
return html_output return html_output
def _cyclic_location_selection(locations, period): def _cyclic_location_selection(locations, period):
"""Return one of `locations` (: separated list) """Return one of `locations` (: separated list)
basing on the current time and query interval `period` basing on the current time and query interval `period`
""" """
locations = locations.split(':') locations = locations.split(":")
max_len = max(len(x) for x in locations) max_len = max(len(x) for x in locations)
locations = [x.rjust(max_len) for x in locations] locations = [x.rjust(max_len) for x in locations]
@ -180,7 +204,7 @@ def _cyclic_location_selection(locations, period):
except ValueError: except ValueError:
period = 1 period = 1
index = int(time.time()/period) % len(locations) index = int(time.time() / period) % len(locations)
return locations[index] return locations[index]
@ -195,31 +219,32 @@ def _response(parsed_query, query, fast_mode=False):
parsed_query["user_agent"], parsed_query["user_agent"],
parsed_query["request_url"], parsed_query["request_url"],
parsed_query["ip_addr"], parsed_query["ip_addr"],
parsed_query["lang"]) parsed_query["lang"],
)
answer = cache.get(cache_signature) answer = cache.get(cache_signature)
if parsed_query['orig_location'] in PLAIN_TEXT_PAGES: if parsed_query["orig_location"] in PLAIN_TEXT_PAGES:
answer = show_text_file(parsed_query['orig_location'], parsed_query['lang']) answer = show_text_file(parsed_query["orig_location"], parsed_query["lang"])
if parsed_query['html_output']: if parsed_query["html_output"]:
answer = render_template('index.html', body=answer) answer = render_template("index.html", body=answer)
if answer or fast_mode: if answer or fast_mode:
return answer return answer
# at this point, we could not handle the query fast, # at this point, we could not handle the query fast,
# so we handle it with all available logic # so we handle it with all available logic
loc = (parsed_query['orig_location'] or "").lower() loc = (parsed_query["orig_location"] or "").lower()
if parsed_query.get("view"): if parsed_query.get("view"):
if not parsed_query.get("location"): if not parsed_query.get("location"):
parsed_query["location"] = loc parsed_query["location"] = loc
output = wttr_line(query, parsed_query) output = wttr_line(query, parsed_query)
elif loc == 'moon' or loc.startswith('moon@'): elif loc == "moon" or loc.startswith("moon@"):
output = get_moon(parsed_query) output = get_moon(parsed_query)
else: else:
output = get_wetter(parsed_query) output = get_wetter(parsed_query)
if parsed_query.get('png_filename'): if parsed_query.get("png_filename"):
if parsed_query.get("view") != "v3": if parsed_query.get("view") != "v3":
# originally it was just a usual function call, # originally it was just a usual function call,
# but it was a blocking call, so it was moved # but it was a blocking call, so it was moved
@ -227,24 +252,29 @@ def _response(parsed_query, query, fast_mode=False):
# #
# output = fmt.png.render_ansi( # output = fmt.png.render_ansi(
# output, options=parsed_query) # output, options=parsed_query)
result = TASKS.spawn(fmt.png.render_ansi, cache._update_answer(output), options=parsed_query) result = TASKS.spawn(
fmt.png.render_ansi, cache._update_answer(output), options=parsed_query
)
output = result.get() output = result.get()
else: else:
if query.get('days', '3') != '0' \ if (
and not query.get('no-follow-line') \ query.get("days", "3") != "0"
and ((parsed_query.get("view") or "v2")[:2] in ["v2", "v3"]): and not query.get("no-follow-line")
if parsed_query['html_output']: and ((parsed_query.get("view") or "v2")[:2] in ["v2", "v3"])
):
if parsed_query["html_output"]:
output = add_buttons(output) output = add_buttons(output)
else: else:
message = get_message('FOLLOW_ME', parsed_query['lang']) message = get_message("FOLLOW_ME", parsed_query["lang"])
if parsed_query.get('no-terminal', False): if parsed_query.get("no-terminal", False):
message = remove_ansi(message) message = remove_ansi(message)
if parsed_query.get('dumb', False): if parsed_query.get("dumb", False):
message = message.translate(TRANSLATION_TABLE) message = message.translate(TRANSLATION_TABLE)
output += '\n' + message + '\n' output += "\n" + message + "\n"
return cache.store(cache_signature, output) return cache.store(cache_signature, output)
def parse_request(location, request, query, fast_mode=False): def parse_request(location, request, query, fast_mode=False):
"""Parse request and provided extended information for the query, """Parse request and provided extended information for the query,
including location data, language, output format, view, etc. including location data, language, output format, view, etc.
@ -279,14 +309,14 @@ def parse_request(location, request, query, fast_mode=False):
if location is not None and location.lower().endswith(".png"): if location is not None and location.lower().endswith(".png"):
png_filename = location png_filename = location
location = location[:-4] location = location[:-4]
if location and ':' in location and location[0] != ":": if location and ":" in location and location[0] != ":":
location = _cyclic_location_selection(location, query.get('period', 1)) location = _cyclic_location_selection(location, query.get("period", 1))
parsed_query = { parsed_query = {
'ip_addr': _client_ip_address(request), "ip_addr": _client_ip_address(request),
'user_agent': request.headers.get('User-Agent', '').lower(), "user_agent": request.headers.get("User-Agent", "").lower(),
'request_url': request.url, "request_url": request.url,
} }
if png_filename: if png_filename:
parsed_query["png_filename"] = png_filename parsed_query["png_filename"] = png_filename
@ -302,24 +332,35 @@ def parse_request(location, request, query, fast_mode=False):
parsed_query["html_output"] = get_output_format(query, parsed_query) parsed_query["html_output"] = get_output_format(query, parsed_query)
parsed_query["json_output"] = (parsed_query.get("view", "") or "").startswith("j") parsed_query["json_output"] = (parsed_query.get("view", "") or "").startswith("j")
if not fast_mode: # not png_filename and not fast_mode: if not fast_mode: # not png_filename and not fast_mode:
location, override_location_name, full_address, country, query_source_location, hemisphere = \ (
location_processing(parsed_query["location"], parsed_query["ip_addr"]) location,
override_location_name,
us_ip = query_source_location[2] in ["United States", "United States of America"] \ full_address,
and 'slack' not in parsed_query['user_agent'] country,
query_source_location,
hemisphere,
) = location_processing(parsed_query["location"], parsed_query["ip_addr"])
us_ip = (
query_source_location[2] in ["United States", "United States of America"]
and "slack" not in parsed_query["user_agent"]
)
query = parse_query.metric_or_imperial(query, lang, us_ip=us_ip) query = parse_query.metric_or_imperial(query, lang, us_ip=us_ip)
if country and location != NOT_FOUND_LOCATION: if country and location != NOT_FOUND_LOCATION:
location = "%s,%s" % (location, country) location = "%s,%s" % (location, country)
parsed_query.update({ parsed_query.update(
'location': location, {
'override_location_name': override_location_name, "location": location,
'full_address': full_address, "override_location_name": override_location_name,
'country': country, "full_address": full_address,
'query_source_location': query_source_location, "country": country,
'hemisphere': hemisphere}) "query_source_location": query_source_location,
"hemisphere": hemisphere,
}
)
parsed_query.update(query) parsed_query.update(query)
return parsed_query return parsed_query
@ -332,21 +373,23 @@ def wttr(location, request):
""" """
def _wrap_response(response_text, html_output, json_output, png_filename=None): def _wrap_response(response_text, html_output, json_output, png_filename=None):
if not isinstance(response_text, str) and \ if not isinstance(response_text, str) and not isinstance(response_text, bytes):
not isinstance(response_text, bytes):
return response_text return response_text
if png_filename: if png_filename:
response = make_response(send_file( response = make_response(
io.BytesIO(response_text), send_file(
attachment_filename=png_filename, io.BytesIO(response_text),
mimetype='image/png')) attachment_filename=png_filename,
mimetype="image/png",
)
)
for key, value in { for key, value in {
'Cache-Control': 'no-cache, no-store, must-revalidate', "Cache-Control": "no-cache, no-store, must-revalidate",
'Pragma': 'no-cache', "Pragma": "no-cache",
'Expires': '0', "Expires": "0",
}.items(): }.items():
response.headers[key] = value response.headers[key] = value
else: else:
response = make_response(response_text) response = make_response(response_text)
@ -359,12 +402,12 @@ def wttr(location, request):
return response return response
if is_location_blocked(location): if is_location_blocked(location):
return ("", 403) # Forbidden return ("", 403) # Forbidden
try: try:
LIMITS.check_ip(_client_ip_address(request)) LIMITS.check_ip(_client_ip_address(request))
except RuntimeError as exception: except RuntimeError as exception:
return (str(exception), 429) # Too many requests return (str(exception), 429) # Too many requests
query = parse_query.parse_query(request.args) query = parse_query.parse_query(request.args)
@ -408,29 +451,37 @@ def wttr(location, request):
if not response: if not response:
parsed_query = parse_request(location, request, query) parsed_query = parse_request(location, request, query)
response = _response(parsed_query, query) response = _response(parsed_query, query)
#if not response or (isinstance(response, str) and not response.strip()): # if not response or (isinstance(response, str) and not response.strip()):
# return RuntimeError("Empty answer") # return RuntimeError("Empty answer")
if parsed_query["location"] == NOT_FOUND_LOCATION: if parsed_query["location"] == NOT_FOUND_LOCATION:
http_code = 404 http_code = 404
# pylint: disable=broad-except # pylint: disable=broad-except
except Exception as exception: except Exception:
logging.error("Exception has occured", exc_info=1) logging.error("Exception has occured", exc_info=1)
if parsed_query['html_output']: if parsed_query["html_output"]:
response = MALFORMED_RESPONSE_HTML_PAGE response = MALFORMED_RESPONSE_HTML_PAGE
http_code = 500 # Internal Server Error http_code = 500 # Internal Server Error
else: else:
response = get_message('CAPACITY_LIMIT_REACHED', parsed_query['lang']) response = get_message("CAPACITY_LIMIT_REACHED", parsed_query["lang"])
http_code = 503 # Service Unavailable http_code = 503 # Service Unavailable
# if exception is occured, we return not a png file but text # if exception is occured, we return not a png file but text
if "png_filename" in parsed_query: if "png_filename" in parsed_query:
del parsed_query["png_filename"] del parsed_query["png_filename"]
return (_wrap_response( return (
response, parsed_query['html_output'], parsed_query['json_output'], _wrap_response(
png_filename=parsed_query.get('png_filename')), http_code) response,
parsed_query["html_output"],
parsed_query["json_output"],
png_filename=parsed_query.get("png_filename"),
),
http_code,
)
if __name__ == "__main__": if __name__ == "__main__":
import doctest import doctest
doctest.testmod() doctest.testmod()

Loading…
Cancel
Save