You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
wttr.in/lib/spark.py

548 lines
15 KiB

# vim: fileencoding=utf-8
# vim: foldmethod=marker foldenable:
"""
[X] emoji
[ ] wego icon
[ ] v2.wttr.in
[X] astronomical (sunset)
[X] time
[X] frames
[X] colorize rain data
[ ] date + locales
[X] wind color
[ ] highlight current date
[ ] bind to real site
[ ] max values: temperature
[X] max value: rain
[ ] comment github
[ ] commit
"""
import sys
import re
import math
import json
import datetime
import StringIO
import requests
import diagram
import pyjq
import pytz
import numpy as np
from astral import Astral, Location
from scipy.interpolate import interp1d
from babel.dates import format_datetime
from globals import WWO_KEY
import constants
import translations
import wttr_line
if not sys.version_info >= (3, 0):
reload(sys) # noqa: F821
sys.setdefaultencoding("utf-8")
# data processing {{{
def get_data(config):
"""
Fetch data for `query_string`
"""
url = (
'http://'
'localhost:5001/premium/v1/weather.ashx'
'?key=%s'
'&q=%s&format=json&num_of_days=3&tp=3&lang=None'
) % (WWO_KEY, config["location"])
text = requests.get(url).text
parsed_data = json.loads(text)
return parsed_data
def interpolate_data(input_data, max_width):
"""
Resample `input_data` to number of `max_width` counts
"""
x = list(range(len(input_data)))
y = input_data
xvals = np.linspace(0, len(input_data)-1, max_width)
yinterp = interp1d(x, y, kind='cubic')
return yinterp(xvals)
def jq_query(query, data_parsed):
"""
Apply `query` to structued data `data_parsed`
"""
pyjq_data = pyjq.all(query, data_parsed)
data = map(float, pyjq_data)
return data
# }}}
# utils {{{
def colorize(string, color_code):
return "\033[%sm%s\033[0m" % (color_code, string)
# }}}
# draw_spark {{{
def draw_spark(data, height, width, color_data):
"""
Spark-style visualize `data` in a region `height` x `width`
"""
_BARS = u' _▁▂▃▄▅▇█'
def _box(height, row, value, max_value):
row_height = 1.0 * max_value / height
if row_height * row >= value:
return _BARS[0]
if row_height * (row+1) <= value:
return _BARS[-1]
return _BARS[int(1.0*(value - row_height*row)/(row_height*1.0)*len(_BARS))]
max_value = max(data)
output = ""
color_code = 20
for i in range(height):
for j in range(width):
character = _box(height, height-i-1, data[j], max_value)
if data[j] != 0:
chance_of_rain = color_data[j]/100.0 * 2
if chance_of_rain > 1:
chance_of_rain = 1
color_index = int(5*chance_of_rain)
color_code = 16 + color_index # int(math.floor((20-16) * 1.0 * (height-1-i)/height*(max_value/data[j])))
output += "\033[38;5;%sm%s\033[0m" % (color_code, character)
output += "\n"
# labeling max value
if max_value == 0:
max_line = " "*width
else:
max_line = ""
for j in range(width):
if data[j] == max_value:
max_line = "%3.2fmm|%s%%" % (max_value, int(color_data[j]))
orig_max_line = max_line
# aligning it
if len(max_line)/2 < j and len(max_line)/2 + j < width:
spaces = " "*(j - len(max_line)/2)
max_line = spaces + max_line # + spaces
max_line = max_line + " "*(width - len(max_line))
elif len(max_line)/2 + j >= width:
max_line = " "*(width - len(max_line)) + max_line
max_line = max_line.replace(orig_max_line, colorize(orig_max_line, "38;5;33"))
break
if max_line:
output = "\n" + max_line + "\n" + output + "\n"
return output
# }}}
# draw_diagram {{{
def draw_diagram(data, height, width):
option = diagram.DOption()
option.size = diagram.Point([width, height])
option.mode = 'g'
stream = StringIO.StringIO()
gram = diagram.DGWrapper(
data=[list(data), range(len(data))],
dg_option=option,
ostream=stream)
gram.show()
return stream.getvalue()
# }}}
# draw_date {{{
def draw_date(config, geo_data):
"""
"""
tzinfo = pytz.timezone(geo_data["timezone"])
locale = config.get("locale", "en_US")
datetime_day_start = datetime.datetime.utcnow()
answer = ""
for day in range(3):
datetime_ = datetime_day_start + datetime.timedelta(hours=24*day)
date = format_datetime(datetime_, "EEE dd MMM", locale=locale, tzinfo=tzinfo)
spaces = ((24-len(date))/2)*" "
date = spaces + date + spaces
date = " "*(24-len(date)) + date
answer += date
answer += "\n"
for _ in range(3):
answer += " "*23 + u""
return answer[:-1] + " "
# }}}
# draw_time {{{
def draw_time(geo_data):
"""
"""
tzinfo = pytz.timezone(geo_data["timezone"])
line = ["", ""]
for _ in range(3):
part = u""*5 + u"" + u""*5
line[0] += part + u"" + part + u""
line[0] += "\n"
for _ in range(3):
line[1] += " 6 12 18 "
line[1] += "\n"
# highlight current time
hour_number = \
(datetime.datetime.now(tzinfo)
- datetime.datetime.now(tzinfo).replace(hour=0, minute=0, second=0, microsecond=0)
).seconds//3600
for line_number, _ in enumerate(line):
line[line_number] = \
line[line_number][:hour_number] \
+ colorize(line[line_number][hour_number], "46") \
+ line[line_number][hour_number+1:]
return "".join(line)
# }}}
# draw_astronomical {{{
def draw_astronomical(city_name, geo_data):
datetime_day_start = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
a = Astral()
a.solar_depression = 'civil'
city = Location()
city.latitude = geo_data["latitude"]
city.longitude = geo_data["longitude"]
city.timezone = geo_data["timezone"]
answer = ""
moon_line = ""
for time_interval in range(72):
current_date = (
datetime_day_start
+ datetime.timedelta(hours=1*time_interval)).replace(tzinfo=pytz.timezone(geo_data["timezone"]))
sun = city.sun(date=current_date, local=False)
dawn = sun['dawn'] # .replace(tzinfo=None)
dusk = sun['dusk'] # .replace(tzinfo=None)
sunrise = sun['sunrise'] # .replace(tzinfo=None)
sunset = sun['sunset'] # .replace(tzinfo=None)
if current_date < dawn:
char = " "
elif current_date > dusk:
char = " "
elif dawn < current_date and current_date < sunrise:
char = u""
elif sunset < current_date and current_date < dusk:
char = u""
elif sunrise < current_date and current_date < sunset:
char = u""
answer += char
# moon
if time_interval % 3 == 0:
moon_phase = city.moon_phase(
date=datetime_day_start + datetime.timedelta(hours=time_interval))
moon_phase_emoji = constants.MOON_PHASES[int(math.floor(moon_phase*1.0/28.0*8+0.5)) % len(constants.MOON_PHASES)]
if time_interval in [0, 24, 48, 69]:
moon_line += moon_phase_emoji + " "
else:
moon_line += " "
answer = moon_line + "\n" + answer + "\n"
answer += "\n"
return answer
# }}}
# draw_emoji {{{
def draw_emoji(data):
answer = ""
for i in data:
emoji = constants.WEATHER_SYMBOL.get(
constants.WWO_CODE.get(
str(int(i)), "Unknown"))
space = " "*(3-constants.WEATHER_SYMBOL_WIDTH_VTE.get(emoji))
answer += emoji + space
answer += "\n"
return answer
# }}}
# draw_wind {{{
def draw_wind(data, color_data):
def _color_code_for_wind_speed(wind_speed):
color_codes = [
(3, 241), # 82
(6, 242), # 118
(9, 243), # 154
(12, 246), # 190
(15, 250), # 226
(19, 253), # 220
(23, 214),
(27, 208),
(31, 202),
(-1, 196)
]
for this_wind_speed, this_color_code in color_codes:
if wind_speed <= this_wind_speed:
return this_color_code
return color_codes[-1][1]
answer = ""
answer_line2 = ""
for j, degree in enumerate(data):
degree = int(degree)
if degree:
wind_direction = constants.WIND_DIRECTION[((degree+22)%360)/45]
else:
wind_direction = ""
color_code = "38;5;%s" % _color_code_for_wind_speed(int(color_data[j]))
answer += " %s " % colorize(wind_direction, color_code)
# wind_speed
wind_speed = int(color_data[j])
wind_speed_str = colorize(str(wind_speed), color_code)
if wind_speed < 10:
wind_speed_str = " " + wind_speed_str + " "
elif wind_speed < 100:
wind_speed_str = " " + wind_speed_str
answer_line2 += wind_speed_str
answer += "\n"
answer += answer_line2 + "\n"
return answer
# }}}
# panel implementation {{{
def add_frame(output, width, config):
"""
Add frame arond `output` that has width `width`
"""
empty_line = " "*width
output = "\n".join(u""+(x or empty_line)+u"" for x in output.splitlines()) + "\n"
weather_report = \
translations.CAPTION[config["lang"]] \
+ " " \
+ (config["override_location"] or config["location"])
caption = u"" + " " + weather_report + " " + u""
output = u"" + caption + u""*(width-len(caption)) + u"\n" \
+ output + \
u"" + u""*width + u"\n"
return output
def generate_panel(data_parsed, geo_data, config):
"""
"""
max_width = 72
precip_mm_query = "[.data.weather[] | .hourly[]] | .[].precipMM"
precip_chance_query = "[.data.weather[] | .hourly[]] | .[].chanceofrain"
feels_like_query = "[.data.weather[] | .hourly[]] | .[].FeelsLikeC"
weather_code_query = "[.data.weather[] | .hourly[]] | .[].weatherCode"
wind_direction_query = "[.data.weather[] | .hourly[]] | .[].winddirDegree"
wind_speed_query = "[.data.weather[] | .hourly[]] | .[].windspeedKmph"
output = ""
output += "\n\n"
output += draw_date(config, geo_data)
output += "\n"
output += "\n"
output += "\n"
data = jq_query(feels_like_query, data_parsed)
data_interpolated = interpolate_data(data, max_width)
output += draw_diagram(data_interpolated, 10, max_width)
output += "\n"
output += draw_time(geo_data)
data = jq_query(precip_mm_query, data_parsed)
color_data = jq_query(precip_chance_query, data_parsed)
data_interpolated = interpolate_data(data, max_width)
color_data_interpolated = interpolate_data(color_data, max_width)
output += draw_spark(data_interpolated, 5, max_width, color_data_interpolated)
output += "\n"
data = jq_query(weather_code_query, data_parsed)
output += draw_emoji(data)
data = jq_query(wind_direction_query, data_parsed)
color_data = jq_query(wind_speed_query, data_parsed)
output += draw_wind(data, color_data)
output += "\n"
output += draw_astronomical(config["location"], geo_data)
output += "\n"
output = add_frame(output, max_width, config)
return output
# }}}
# textual information {{{
def textual_information(data_parsed, geo_data, config):
"""
Add textual information about current weather and
astronomical conditions
"""
def _shorten_full_location(full_location, city_only=False):
def _count_runes(string):
return len(string.encode('utf-16-le')) // 2
words = full_location.split(",")
output = words[0]
if city_only:
return output
for word in words[1:]:
if _count_runes(output + "," + word) > 50:
return output
output += "," + word
return output
city = Location()
city.latitude = geo_data["latitude"]
city.longitude = geo_data["longitude"]
city.timezone = geo_data["timezone"]
output = []
timezone = city.timezone
datetime_day_start = datetime.datetime.now()\
.replace(hour=0, minute=0, second=0, microsecond=0)
sun = city.sun(date=datetime_day_start, local=True)
format_line = "%c %C, %t, %h, %w, %P"
current_condition = data_parsed['data']['current_condition'][0]
query = {}
weather_line = wttr_line.render_line(format_line, current_condition, query)
output.append('Weather: %s' % weather_line)
output.append('Timezone: %s' % timezone)
tmp_output = []
tmp_output.append(' Now: %%{{NOW(%s)}}' % timezone)
tmp_output.append('Dawn: %s'
% str(sun['dawn'].strftime("%H:%M:%S")))
tmp_output.append('Sunrise: %s'
% str(sun['sunrise'].strftime("%H:%M:%S")))
tmp_output.append(' Zenith: %s'
% str(sun['noon'].strftime("%H:%M:%S ")))
tmp_output.append('Sunset: %s'
% str(sun['sunset'].strftime("%H:%M:%S")))
tmp_output.append('Dusk: %s'
% str(sun['dusk'].strftime("%H:%M:%S")))
tmp_output = [
re.sub("^([A-Za-z]*:)", lambda m: colorize(m.group(1), "2"), x)
for x in tmp_output]
output.append(
"%20s" % tmp_output[0] \
+ " | %20s " % tmp_output[1] \
+ " | %20s" % tmp_output[2])
output.append(
"%20s" % tmp_output[3] \
+ " | %20s " % tmp_output[4] \
+ " | %20s" % tmp_output[5])
city_only = False
suffix = ""
if "Simferopol" in timezone:
city_only = True
suffix = ", Крым"
if config["full_address"]:
output.append('Location: %s%s [%5.4f,%5.4f]' \
% (
_shorten_full_location(config["full_address"], city_only=city_only),
suffix,
geo_data["latitude"],
geo_data["longitude"],
))
output = [
re.sub("^( *[A-Za-z]*:)", lambda m: colorize(m.group(1), "2"),
re.sub("^( +[A-Za-z]*:)", lambda m: colorize(m.group(1), "2"),
re.sub(r"(\|)", lambda m: colorize(m.group(1), "2"), x)))
for x in output]
return "".join("%s\n" % x for x in output)
# }}}
# get_geodata {{{
def get_geodata(location):
text = requests.get("http://localhost:8004/%s" % location).text
return json.loads(text)
# }}}
def main(location, override_location=None, data=None, full_address=None, view=None):
config = {
"lang": "en",
"locale": "en_US",
"location": location,
"override_location": override_location,
"full_address": full_address,
"view": view,
}
geo_data = get_geodata(location)
if data is None:
data_parsed = get_data(config)
else:
data_parsed = data
output = generate_panel(data_parsed, geo_data, config)
output += textual_information(data_parsed, geo_data, config)
return output
if __name__ == '__main__':
sys.stdout.write(main(sys.argv[1]))