#!/bin/env python # vim: fileencoding=utf-8 from datetime import datetime, timedelta import json import logging import os import re import sys import timezonefinder from pytz import timezone from constants import WWO_CODE logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO")) logger = logging.getLogger(__name__) def metno_request(path, query_string): # We'll need to sanitize the inbound request - ideally the # premium/v1/weather.ashx portion would have always been here, though # it seems as though the proxy was built after the majority of the app # and not refactored. For WAPI we'll strip this and the API key out, # then manage it on our own. logger.debug("Original path: " + path) logger.debug("Original query: " + query_string) path = path.replace( "premium/v1/weather.ashx", "weatherapi/locationforecast/2.0/complete" ) query_string = re.sub(r"key=[^&]*&", "", query_string) query_string = re.sub(r"format=[^&]*&", "", query_string) days = int(re.search(r"num_of_days=([0-9]+)&", query_string).group(1)) query_string = re.sub(r"num_of_days=[0-9]+&", "", query_string) # query_string = query_string.replace('key=', '?key=' + WAPI_KEY) # TP is for hourly forecasting, which isn't available in the free api. query_string = re.sub(r"tp=[0-9]*&", "", query_string) # This assumes lang=... is at the end. Also note that the API doesn't # localize, and we're not either. TODO: add language support query_string = re.sub(r"lang=[^&]*$", "", query_string) query_string = re.sub(r"&$", "", query_string) logger.debug("qs: " + query_string) # Deal with coordinates. Need to be rounded to 4 decimals for metno ToC # and in a different query string format coord_match = re.search(r"q=[^&]*", query_string) coords_str = coord_match.group(0) coords = re.findall(r"[-0-9.]+", coords_str) lat = str(round(float(coords[0]), 4)) lng = str(round(float(coords[1]), 4)) logger.debug("lat: " + lat) logger.debug("lng: " + lng) query_string = re.sub(r"q=[^&]*", "lat=" + lat + "&lon=" + lng + "&", query_string) logger.debug("Return path: " + path) logger.debug("Return query: " + query_string) return path, query_string, days def celsius_to_f(celsius): return round((1.8 * celsius) + 32, 1) def to_weather_code(symbol_code): logger.debug(symbol_code) code = re.sub(r"_.*", "", symbol_code) logger.debug(code) # symbol codes: https://api.met.no/weatherapi/weathericon/2.0/documentation # they also have _day, _night and _polartwilight variants # See json from https://api.met.no/weatherapi/weathericon/2.0/legends # WWO codes: https://github.com/chubin/wttr.in/blob/master/lib/constants.py # http://www.worldweatheronline.com/feed/wwoConditionCodes.txt weather_code_map = { "clearsky": 113, "cloudy": 119, "fair": 116, "fog": 143, "heavyrain": 302, "heavyrainandthunder": 389, "heavyrainshowers": 305, "heavyrainshowersandthunder": 386, "heavysleet": 314, # There's a ton of 'LightSleet' in WWO_CODE... "heavysleetandthunder": 377, "heavysleetshowers": 362, "heavysleetshowersandthunder": 374, "heavysnow": 230, "heavysnowandthunder": 392, "heavysnowshowers": 371, "heavysnowshowersandthunder": 392, "lightrain": 266, "lightrainandthunder": 200, "lightrainshowers": 176, "lightrainshowersandthunder": 386, "lightsleet": 281, "lightsleetandthunder": 377, "lightsleetshowers": 284, "lightsnow": 320, "lightsnowandthunder": 392, "lightsnowshowers": 368, "lightssleetshowersandthunder": 365, "lightssnowshowersandthunder": 392, "partlycloudy": 116, "rain": 293, "rainandthunder": 389, "rainshowers": 299, "rainshowersandthunder": 386, "sleet": 185, "sleetandthunder": 392, "sleetshowers": 263, "sleetshowersandthunder": 392, "snow": 329, "snowandthunder": 392, "snowshowers": 230, "snowshowersandthunder": 392, } if code not in weather_code_map: logger.debug("not found") return -1 # not found logger.debug(weather_code_map[code]) return weather_code_map[code] def to_description(symbol_code): desc = WWO_CODE[str(to_weather_code(symbol_code))] logger.debug(desc) return desc def to_16_point(degrees): # 360 degrees / 16 = 22.5 degrees of arc or 11.25 degrees around the point if degrees > (360 - 11.25) or degrees <= 11.25: return "N" if degrees > 11.25 and degrees <= (11.25 + 22.5): return "NNE" if degrees > (11.25 + (22.5 * 1)) and degrees <= (11.25 + (22.5 * 2)): return "NE" if degrees > (11.25 + (22.5 * 2)) and degrees <= (11.25 + (22.5 * 3)): return "ENE" if degrees > (11.25 + (22.5 * 3)) and degrees <= (11.25 + (22.5 * 4)): return "E" if degrees > (11.25 + (22.5 * 4)) and degrees <= (11.25 + (22.5 * 5)): return "ESE" if degrees > (11.25 + (22.5 * 5)) and degrees <= (11.25 + (22.5 * 6)): return "SE" if degrees > (11.25 + (22.5 * 6)) and degrees <= (11.25 + (22.5 * 7)): return "SSE" if degrees > (11.25 + (22.5 * 7)) and degrees <= (11.25 + (22.5 * 8)): return "S" if degrees > (11.25 + (22.5 * 8)) and degrees <= (11.25 + (22.5 * 9)): return "SSW" if degrees > (11.25 + (22.5 * 9)) and degrees <= (11.25 + (22.5 * 10)): return "SW" if degrees > (11.25 + (22.5 * 10)) and degrees <= (11.25 + (22.5 * 11)): return "WSW" if degrees > (11.25 + (22.5 * 11)) and degrees <= (11.25 + (22.5 * 12)): return "W" if degrees > (11.25 + (22.5 * 12)) and degrees <= (11.25 + (22.5 * 13)): return "WNW" if degrees > (11.25 + (22.5 * 13)) and degrees <= (11.25 + (22.5 * 14)): return "NW" if degrees > (11.25 + (22.5 * 14)) and degrees <= (11.25 + (22.5 * 15)): return "NNW" def meters_to_miles(meters): return round(meters * 0.00062137, 2) def mm_to_inches(mm): return round(mm / 25.4, 2) def hpa_to_mb(hpa): return hpa def hpa_to_in(hpa): return round(hpa * 0.02953, 2) def hpa_to_mmHg(hpa): return round(hpa * 0.75006157584566, 3) def group_hours_to_days(lat, lng, hourlies, days_to_return): tf = timezonefinder.TimezoneFinder() timezone_str = tf.certain_timezone_at(lat=lat, lng=lng) logger.debug("got TZ: " + timezone_str) tz = timezone(timezone_str) start_day_gmt = datetime.fromisoformat(hourlies[0]["time"].replace("Z", "+00:00")) start_day_local = start_day_gmt.astimezone(tz) end_day_local = (start_day_local + timedelta(days=days_to_return - 1)).date() logger.debug("series starts at gmt time: " + str(start_day_gmt)) logger.debug("series starts at local time: " + str(start_day_local)) logger.debug("series ends on day: " + str(end_day_local)) days = {} for hour in hourlies: current_day_gmt = datetime.fromisoformat(hour["time"].replace("Z", "+00:00")) current_local = current_day_gmt.astimezone(tz) current_day_local = current_local.date() if current_day_local > end_day_local: continue if current_day_local not in days: days[current_day_local] = {"hourly": []} hour["localtime"] = current_local.time() days[current_day_local]["hourly"].append(hour) # Need a second pass to build the min/max/avg data for date, day in days.items(): minTempC = -999 maxTempC = 1000 avgTempC = None n = 0 maxUvIndex = 0 for hour in day["hourly"]: temp = hour["data"]["instant"]["details"]["air_temperature"] if temp > minTempC: minTempC = temp if temp < maxTempC: maxTempC = temp if avgTempC is None: avgTempC = temp n = 1 else: avgTempC = ((avgTempC * n) + temp) / (n + 1) n = n + 1 uv = hour["data"]["instant"]["details"] if "ultraviolet_index_clear_sky" in uv: if uv["ultraviolet_index_clear_sky"] > maxUvIndex: maxUvIndex = uv["ultraviolet_index_clear_sky"] day["maxtempC"] = str(maxTempC) day["maxtempF"] = str(celsius_to_f(maxTempC)) day["mintempC"] = str(minTempC) day["mintempF"] = str(celsius_to_f(minTempC)) day["avgtempC"] = str(round(avgTempC, 1)) day["avgtempF"] = str(celsius_to_f(avgTempC)) # day["totalSnow_cm": "not implemented", # day["sunHour": "12", # This would come from astonomy data day["uvIndex"] = str(maxUvIndex) return days def _convert_hour(hour): # Whatever is upstream is expecting data in the shape of WWO. This method will # morph from metno to hourly WWO response format. # Note that WWO is providing data every 3 hours. Metno provides every hour # { # "time": "0", # "tempC": "19", # "tempF": "66", # "windspeedMiles": "6", # "windspeedKmph": "9", # "winddirDegree": "276", # "winddir16Point": "W", # "weatherCode": "119", # "weatherIconUrl": [ # { # "value": "http://cdn.worldweatheronline.com/images/wsymbols01_png_64/wsymbol_0003_white_cloud.png" # } # ], # "weatherDesc": [ # { # "value": "Cloudy" # } # ], # "precipMM": "0.0", # "precipInches": "0.0", # "humidity": "62", # "visibility": "10", # "visibilityMiles": "6", # "pressure": "1017", # "pressureInches": "31", # "cloudcover": "66", # "HeatIndexC": "19", # "HeatIndexF": "66", # "DewPointC": "12", # "DewPointF": "53", # "WindChillC": "19", # "WindChillF": "66", # "WindGustMiles": "8", # "WindGustKmph": "13", # "FeelsLikeC": "19", # "FeelsLikeF": "66", # "chanceofrain": "0", # "chanceofremdry": "93", # "chanceofwindy": "0", # "chanceofovercast": "89", # "chanceofsunshine": "18", # "chanceoffrost": "0", # "chanceofhightemp": "0", # "chanceoffog": "0", # "chanceofsnow": "0", # "chanceofthunder": "0", # "uvIndex": "1" details = hour["data"]["instant"]["details"] if "next_1_hours" in hour["data"]: next_hour = hour["data"]["next_1_hours"] elif "next_6_hours" in hour["data"]: next_hour = hour["data"]["next_6_hours"] elif "next_12_hours" in hour["data"]: next_hour = hour["data"]["next_12_hours"] else: next_hour = {} # Need to dig out symbol_code and precipitation_amount symbol_code = "clearsky_day" # Default to sunny if "summary" in next_hour and "symbol_code" in next_hour["summary"]: symbol_code = next_hour["summary"]["symbol_code"] precipitation_amount = 0 # Default to no rain if "details" in next_hour and "precipitation_amount" in next_hour["details"]: precipitation_amount = next_hour["details"]["precipitation_amount"] uvIndex = 0 # default to 0 index if "ultraviolet_index_clear_sky" in details: uvIndex = details["ultraviolet_index_clear_sky"] localtime = "" if "localtime" in hour: localtime = "{h:02.0f}".format(h=hour["localtime"].hour) + "{m:02.0f}".format( m=hour["localtime"].minute ) logger.debug(str(hour["localtime"])) # time property is local time, 4 digit 24 hour, with no :, e.g. 2100 return { "time": localtime, "observation_time": hour["time"], # Need to figure out WWO TZ # temp_C is used in we-lang.go calcs in such a way # as to expect a whole number "temp_C": str(int(round(details["air_temperature"], 0))), # temp_F can be more precise - not used in we-lang.go calcs "temp_F": str(celsius_to_f(details["air_temperature"])), "weatherCode": str(to_weather_code(symbol_code)), "weatherIconUrl": [ { "value": "not yet implemented", } ], "weatherDesc": [ { "value": to_description(symbol_code), } ], # similiarly, windspeedMiles is not used by we-lang.go, but kmph is "windspeedMiles": str(meters_to_miles(details["wind_speed"])), "windspeedKmph": str(int(round(details["wind_speed"], 0))), "winddirDegree": str(details["wind_from_direction"]), "winddir16Point": to_16_point(details["wind_from_direction"]), "precipMM": str(precipitation_amount), "precipInches": str(mm_to_inches(precipitation_amount)), "humidity": str(details["relative_humidity"]), "visibility": "not yet implemented", # str(details['vis_km']), "visibilityMiles": "not yet implemented", # str(details['vis_miles']), "pressure": str(hpa_to_mb(details["air_pressure_at_sea_level"])), "pressure_mmHg": str(hpa_to_mmHg(details["air_pressure_at_sea_level"])), "pressureInches": str(hpa_to_in(details["air_pressure_at_sea_level"])), "cloudcover": "not yet implemented", # Convert from cloud_area_fraction?? str(details['cloud']), # metno doesn't have FeelsLikeC, but we-lang.go is using it in calcs, # so we shall set it to temp_C "FeelsLikeC": str(int(round(details["air_temperature"], 0))), "FeelsLikeF": "not yet implemented", # str(details['feelslike_f']), "uvIndex": str(uvIndex), } def _convert_hourly(hours): converted_hours = [] for hour in hours: converted_hours.append(_convert_hour(hour)) return converted_hours # Whatever is upstream is expecting data in the shape of WWO. This method will # morph from metno to WWO response format. def create_standard_json_from_metno(content, days_to_return): try: forecast = json.loads(content) # pylint: disable=invalid-name except (ValueError, TypeError) as exception: logger.error("---") logger.error(exception) logger.error("---") return {}, "" hourlies = forecast["properties"]["timeseries"] current = hourlies[0] # We are assuming these units: # "units": { # "air_pressure_at_sea_level": "hPa", # "air_temperature": "celsius", # "air_temperature_max": "celsius", # "air_temperature_min": "celsius", # "cloud_area_fraction": "%", # "cloud_area_fraction_high": "%", # "cloud_area_fraction_low": "%", # "cloud_area_fraction_medium": "%", # "dew_point_temperature": "celsius", # "fog_area_fraction": "%", # "precipitation_amount": "mm", # "relative_humidity": "%", # "ultraviolet_index_clear_sky": "1", # "wind_from_direction": "degrees", # "wind_speed": "m/s" # } content = { "data": { "request": [ { "type": "feature", "query": str(forecast["geometry"]["coordinates"][1]) + "," + str(forecast["geometry"]["coordinates"][0]), } ], "current_condition": [_convert_hour(current)], "weather": [], } } days = group_hours_to_days( forecast["geometry"]["coordinates"][1], forecast["geometry"]["coordinates"][0], hourlies, days_to_return, ) # TODO: Astronomy needs to come from this: # https://api.met.no/weatherapi/sunrise/2.0/.json?lat=40.7127&lon=-74.0059&date=2020-10-07&offset=-05:00 # and obviously can be cached for a while # https://api.met.no/weatherapi/sunrise/2.0/documentation # Note that full moon/new moon/first quarter/last quarter aren't returned # and the moonphase value should match these from WWO: # New Moon # Waxing Crescent # First Quarter # Waxing Gibbous # Full Moon # Waning Gibbous # Last Quarter # Waning Crescent for date, day in days.items(): content["data"]["weather"].append( { "date": str(date), "astronomy": [], "maxtempC": day["maxtempC"], "maxtempF": day["maxtempF"], "mintempC": day["mintempC"], "mintempF": day["mintempF"], "avgtempC": day["avgtempC"], "avgtempF": day["avgtempF"], "totalSnow_cm": "not implemented", "sunHour": "12", # This would come from astonomy data "uvIndex": day["uvIndex"], "hourly": _convert_hourly(day["hourly"]), } ) # for day in forecast. return json.dumps(content) if __name__ == "__main__": # if len(sys.argv) == 1: # for deg in range(0, 360): # print('deg: ' + str(deg) + '; 16point: ' + to_16_point(deg)) if len(sys.argv) == 2: req = sys.argv[1].split("?") # to_description(sys.argv[1]) metno_request(req[0], req[1]) elif len(sys.argv) == 3: with open(sys.argv[1], "r") as contentf: content = create_standard_json_from_metno(contentf.read(), int(sys.argv[2])) print(content) else: print("usage: metno <content file> <days>")