@ -1,225 +1,163 @@
# vim: set encoding=utf-8
# vim: set encoding=utf-8
# pylint: disable=wrong-import-position
from __future__ import print_function
import gevent
Main view (wttr.in) implementation.
from gevent.pywsgi import WSGIServer
The module is a wrapper for the modified Wego program.
from gevent.queue import Queue
from gevent.monkey import patch_all
from gevent.subprocess import Popen, PIPE, STDOUT
import sys
import sys
import os
import re
import re
import time
import hashlib
from gevent.subprocess import Popen, PIPE
sys.path.insert(0, "..")
sys.path.insert(0, "..")
from translations import get_message, SUPPORTED_LANGS
from globals import WEGO, CACHEDIR, \
error, remove_ansi
log, error, remove_ansi
def _is_invalid_location(location):
if '.png' in location:
return True
def get_wetter(parsed_query):
def get_wetter(parsed_query):
location = parsed_query['location']
location = parsed_query['location']
ip = parsed_query['ip_addr']
html = parsed_query['html_output']
html = parsed_query['html_output']
lang = parsed_query['lang']
lang = parsed_query['lang']
location_name = parsed_query['override_location_name']
full_address = parsed_query['full_address']
url = parsed_query['request_url']
local_url = url
local_location = location
def get_opengraph():
if local_url is None:
url = ""
url = local_url
if local_location is None:
location = ""
location = local_location
pic_url = url.replace('?', '_')
location_not_found = False
if location == NOT_FOUND_LOCATION:
return (
location_not_found = True
'<meta property="og:image" content="%(pic_url)s_0pq.png" />'
'<meta property="og:site_name" content="wttr.in" />'
'<meta property="og:type" content="profile" />'
'<meta property="og:url" content="%(url)s" />'
) % {
'pic_url': pic_url,
'url': url,
'location': location,
# '<meta property="og:title" content="Weather report: %(location)s" />'
# '<meta content="Partly cloudy // 6-8 °C // ↑ 9 km/h // 10 km // 0.4 mm" property="og:description" />'
def get_filename(location, lang=None, query=None, location_name=None):
location = location.replace('/', '_')
timestamp = time.strftime( "%Y%m%d%H", time.localtime() )
imperial_suffix = ''
if query.get('use_imperial', False):
imperial_suffix = '-imperial'
lang_suffix = ''
if not location_not_found:
if lang is not None:
stdout, stderr, returncode = _wego_wrapper(location, parsed_query)
lang_suffix = '-lang_%s' % lang
if query != None:
if location_not_found or returncode != 0:
query_line = "_" + "_".join("%s=%s" % (key, value) for (key, value) in query.items())
if ('Unable to find any matching weather'
' location to the parsed_query submitted') in stderr:
query_line = ""
stdout, stderr, returncode = _wego_wrapper(NOT_FOUND_LOCATION, parsed_query)
location_not_found = True
stdout += get_message('NOT_FOUND_MESSAGE', lang)
if location_name is None:
first_line, stdout = _wego_postprocessing(location, parsed_query, stdout)
location_name = ""
filename = "".join([timestamp, imperial_suffix, lang_suffix, query_line, location_name])
digest = hashlib.sha1(filename.encode('utf-8')).hexdigest()
return "%s/%s/%s" % (CACHEDIR, location, digest)
def save_weather_data(location, filename, lang=None, query=None, location_name=None, full_address=None):
if html:
return _htmlize(stdout, first_line, parsed_query)
return stdout
if _is_invalid_location( location ):
def _wego_wrapper(location, parsed_query):
error("Invalid location: %s" % location)
lang = parsed_query['lang']
while True:
location_name = parsed_query['override_location_name']
location_not_found = False
if location in [ "test-thunder" ]:
test_name = location[5:]
test_file = TEST_FILE.replace('NAME', test_name)
stdout = open(test_file, 'r').read()
stderr = ""
if location == NOT_FOUND_LOCATION:
location_not_found = True
cmd = [WEGO, '--city=%s' % location]
cmd = [WEGO, '--city=%s' % location]
if query.get('inverted_colors'):
if parsed_query.get('inverted_colors'):
cmd += ['-inverse']
cmd += ['-inverse']
if query.get('use_ms_for_wind'):
if parsed_query.get('use_ms_for_wind'):
cmd += ['-wind_in_ms']
cmd += ['-wind_in_ms']
if query.get('narrow'):
if parsed_query.get('narrow'):
cmd += ['-narrow']
cmd += ['-narrow']
if lang and lang in SUPPORTED_LANGS:
if lang and lang in SUPPORTED_LANGS:
cmd += ['-lang=%s'%lang]
cmd += ['-lang=%s'%lang]
if query.get('use_imperial', False):
if parsed_query.get('use_imperial', False):
cmd += ['-imperial']
cmd += ['-imperial']
if location_name:
if location_name:
cmd += ['-location_name', location_name]
cmd += ['-location_name', location_name]
p = Popen(cmd, stdout=PIPE, stderr=PIPE)
proc = Popen(cmd, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()
stdout, stderr = proc.communicate()
stdout = stdout.decode("utf-8")
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
stderr = stderr.decode("utf-8")
if p.returncode != 0:
return stdout, stderr, proc.returncode
print("ERROR: location not found: %s" % location)
if u'Unable to find any matching weather location to the query submitted' in stderr:
if location != NOT_FOUND_LOCATION:
NOT_FOUND_MESSAGE_HEADER = u"ERROR: %s: %s\n---\n\n" % (get_message('UNKNOWN_LOCATION', lang), location)
error(stdout + stderr)
dirname = os.path.dirname(filename)
if not os.path.exists(dirname):
if location_not_found:
def _wego_postprocessing(location, parsed_query, stdout):
stdout += get_message('NOT_FOUND_MESSAGE', lang)
full_address = parsed_query['full_address']
stdout = NOT_FOUND_MESSAGE_HEADER + stdout
lang = parsed_query['lang']
if 'days' in query:
if 'days' in parsed_query:
if query['days'] == '0':
if parsed_query['days'] == '0':
stdout = "\n".join(stdout.splitlines()[:7]) + "\n"
stdout = "\n".join(stdout.splitlines()[:7]) + "\n"
if query['days'] == '1':
if parsed_query['days'] == '1':
stdout = "\n".join(stdout.splitlines()[:17]) + "\n"
stdout = "\n".join(stdout.splitlines()[:17]) + "\n"
if query['days'] == '2':
if parsed_query['days'] == '2':
stdout = "\n".join(stdout.splitlines()[:27]) + "\n"
stdout = "\n".join(stdout.splitlines()[:27]) + "\n"
first = stdout.splitlines()[0]
first = stdout.splitlines()[0]
rest = stdout.splitlines()[1:]
rest = stdout.splitlines()[1:]
if query.get('no-caption', False):
if parsed_query.get('no-caption', False):
separator = None
if ':' in first:
if ':' in first:
separator = ':'
first = first.split(":", 1)[1]
if u':' in first:
separator = u':'
if separator:
first = first.split(separator, 1)[1]
stdout = "\n".join([first.strip()] + rest) + "\n"
stdout = "\n".join([first.strip()] + rest) + "\n"
if query.get('no-terminal', False):
if parsed_query.get('no-terminal', False):
stdout = remove_ansi(stdout)
stdout = remove_ansi(stdout)
if query.get('no-city', False):
if parsed_query.get('no-city', False):
stdout = "\n".join(stdout.splitlines()[2:]) + "\n"
stdout = "\n".join(stdout.splitlines()[2:]) + "\n"
if full_address \
if full_address \
and query.get('format', 'txt') != 'png' \
and parsed_query.get('format', 'txt') != 'png' \
and (not query.get('no-city')
and (not parsed_query.get('no-city')
and not query.get('no-caption')
and not parsed_query.get('no-caption')
and not query.get('days') == '0'):
and not parsed_query.get('days') == '0'):
line = "%s: %s [%s]\n" % (
line = "%s: %s [%s]\n" % (
get_message('LOCATION', lang),
get_message('LOCATION', lang),
stdout += line
stdout += line
if query.get('padding', False):
if parsed_query.get('padding', False):
lines = [x.rstrip() for x in stdout.splitlines()]
lines = [x.rstrip() for x in stdout.splitlines()]
max_l = max(len(remove_ansi(x)) for x in lines)
max_l = max(len(remove_ansi(x)) for x in lines)
last_line = " "*max_l + " .\n"
last_line = " "*max_l + " .\n"
stdout = " \n" + "\n".join(" %s " %x for x in lines) + "\n" + last_line
stdout = " \n" + "\n".join(" %s " %x for x in lines) + "\n" + last_line
open(filename, 'w').write(stdout)
return first, stdout
def _htmlize(ansi_output, title, parsed_query):
"""Return HTML representation of `ansi_output`.
Use `title` as the title of the page.
Format page according to query parameters from `parsed_query`."""
cmd = ["bash", ANSI2HTML, "--palette=solarized"]
cmd = ["bash", ANSI2HTML, "--palette=solarized"]
if not query.get('inverted_colors'):
if not parsed_query.get('inverted_colors'):
cmd += ["--bg=dark"]
cmd += ["--bg=dark"]
p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE )
proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate(stdout.encode("utf-8"))
stdout, stderr = proc.communicate(ansi_output.encode("utf-8"))
stdout = stdout.decode("utf-8")
stdout = stdout.decode("utf-8")
stderr = stderr.decode("utf-8")
stderr = stderr.decode("utf-8")
if p.returncode != 0:
if proc.returncode != 0:
error(stdout + stderr)
error(stdout + stderr)
if query.get('inverted_colors'):
if parsed_query.get('inverted_colors'):
stdout = stdout.replace('<body class="">', '<body class="" style="background:white;color:#777777">')
stdout = stdout.replace(
'<body class="">', '<body class="" style="background:white;color:#777777">')
title = "<title>%s</title>" % first
title = "<title>%s</title>" % title
opengraph = get_opengraph()
opengraph = _get_opengraph(parsed_query)
stdout = re.sub("<head>", "<head>" + title + opengraph, stdout)
stdout = re.sub("<head>", "<head>" + title + opengraph, stdout)
open(filename+'.html', 'w').write(stdout)
return stdout
filename = get_filename(location, lang=lang, query=parsed_query, location_name=location_name)
def _get_opengraph(parsed_query):
if not os.path.exists(filename):
"""Return OpenGraph data for `parsed_query`"""
save_weather_data(location, filename, lang=lang, query=parsed_query, location_name=location_name, full_address=full_address)
if html:
url = parsed_query['request_url'] or ""
filename += '.html'
pic_url = url.replace('?', '_')
return open(filename).read()
return (
'<meta property="og:image" content="%(pic_url)s_0pq.png" />'
'<meta property="og:site_name" content="wttr.in" />'
'<meta property="og:type" content="profile" />'
'<meta property="og:url" content="%(url)s" />'
) % {
'pic_url': pic_url,
'url': url,