Skip to content

Instantly share code, notes, and snippets.

@ahihi
Created May 16, 2020 14:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ahihi/299693c355acc2b320d5ca16bdacd582 to your computer and use it in GitHub Desktop.
Save ahihi/299693c355acc2b320d5ca16bdacd582 to your computer and use it in GitHub Desktop.
sääkissa
import argparse
from datetime import datetime, timedelta
from functools import reduce
from lxml import etree
import math
import random
import sys
import time
from owslib.wfs import WebFeatureService
from pythonosc.udp_client import SimpleUDPClient
colors = [
(-20, (0.5, 0.0, 1.0)),
(-10, (0.0, 0.0, 1.0)),
(0, (0.0, 0.5, 1.0)),
(10, (1.0, 1.0, 1.0)),
(20, (1.0, 0.5, 0.0)),
(30, (1.0, 0.0, 0.0))
]
noise_len = 7
fade_var = (0.5, 1.0)
def get_clipped(x, in_min, in_max, out_min, out_max, clip="minmax"):
if clip in ("min", "minmax") and x <= in_min:
return out_min
elif clip in ("max", "minmax") and x >= in_max:
return out_max
else:
return None
def linlin(x, in_min, in_max, out_min, out_max, clip="minmax"):
clipped = get_clipped(x, in_min, in_max, out_min, out_max, clip)
if clipped != None:
return clipped
return (x - in_min) / (in_max - in_min) * (out_max - out_min) + out_min
def linexp(x, in_min, in_max, out_min, out_max, clip="minmax"):
clipped = get_clipped(x, in_min, in_max, out_min, out_max, clip)
if clipped != None:
return clipped
return pow(out_max / out_min, (x - in_min) / (in_max - in_min)) * out_min
def lincurve(x, in_min=0, in_max=1, out_min=0, out_max=1, curve=-4, clip="minmax"):
clipped = get_clipped(x, in_min, in_max, out_min, out_max, clip)
if clipped != None:
return clipped
grow = math.exp(curve)
a = (out_max - out_min) / (1.0 - grow)
b = out_min + a
scaled = (x - in_min) / (in_max - in_min)
return b - (a * pow(grow, scaled))
def get_color_seq(temp, rain):
def _acc(s, m):
s_i, s_temp = s
m_temp, m_color = m
if s_temp != None:
return s
elif temp < m_temp:
return (s_i, m_temp)
else:
return (s_i+1, s_temp)
next_i, next_temp = reduce(_acc, colors, (0, None))
if next_temp == None: # temp is above maximum
color = colors[-1][1]
elif next_i == 0: # below minimum
color = colors[next_i][1]
else: # in between
prev_temp, prev_color = colors[next_i - 1]
next_color = colors[next_i][1]
k = linlin(temp, prev_temp, next_temp, 0.0, 1.0)
color = tuple(map(lambda p, n: (1.0 - k) * p + k * n, prev_color, next_color))
rain_interval = lincurve(rain, 0.0, 1.0, 2000, 300, -3.0)
rain_v = lincurve(rain, 0.0, 2.0, 1.0, 0.9, -4.0)
attack = 50
def _raindrop(i):
fade = round(rain_interval * linexp(random.uniform(0.0, 1.0), 0.0, 1.0, fade_var[0], fade_var[1]))
v = rain_v * random.uniform(linlin(rain, 0.0, 1.0, 1.0, 0.95), 1.0)
r, g, b = color
return [
r, g, b, 1, fade - attack,
v*r, v*g, v*b, 1, attack
]
rain_colors = reduce(lambda a, b: a + b, map(_raindrop, range(noise_len)), [])
return rain_colors
def log(*args, **kwargs):
print(time.strftime("%Y-%m-%d %H:%M:%S"), *args, **kwargs)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--host", required=True)
parser.add_argument("--port", type=int, required=True)
parser.add_argument("--poll_interval", type=int, default=1800)
args = parser.parse_args()
wfs = WebFeatureService(url='https://opendata.fmi.fi/wfs', version='2.0.0')
if not wfs:
log("connection failed")
sys.exit(1)
log("connected:", wfs.identification.title)
log("poll interval:", args.poll_interval, "s")
osc = SimpleUDPClient(args.host, args.port)
latest = {}
while True:
now = datetime.now()
response = wfs.getfeature(
storedQueryID="fmi::observations::weather::simple",
storedQueryParams={
"place": "Helsinki",
"parameters": "temperature,r_1h",
"timestep": 30,
"starttime": (now - timedelta(minutes=60)).astimezone().replace(microsecond=0).isoformat()
}
)
data = response.read()
xml = etree.XML(data)
elements = reduce(lambda a, b: a+b, map(lambda e: e.findall("{*}BsWfsElement"), xml.findall("{*}member")), [])
for element in elements:
#time = element.find("{*}Time").text
name = element.find("{*}ParameterName").text
value = float(element.find("{*}ParameterValue").text)
if not math.isnan(value):
latest[name] = value
log("latest data:", latest)
if "temperature" in latest and "r_1h" in latest:
osc_addr = "/rgb"
osc_args = [1, *get_color_seq(latest["temperature"], latest["r_1h"])]
log("sending OSC:", osc_addr, osc_args)
osc.send_message(osc_addr, osc_args)
time.sleep(args.poll_interval)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment