Skip to content

Instantly share code, notes, and snippets.

@temoto
Last active October 22, 2018 13:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save temoto/72cb25f924ef0e7c52a360ab600fe822 to your computer and use it in GitHub Desktop.
Save temoto/72cb25f924ef0e7c52a360ab600fe822 to your computer and use it in GitHub Desktop.
Automate light color temperature via an MQTT-enabled dimmer (such as a Sonoff). Temporary location; will be moved to a full repository.
#!/usr/bin/env python2
import os, sys, time
import astral, datetime
import paho.mqtt.client
# EDIT HERE TO CONFIGURE + run with env `mqtt_broker=addr:port mqtt_topic=cmnd/house/ct script.py`
# Schedule text: whitespace-separated `time action` pairs, read in order by
# color_temp(). Times used here are names it resolves from today's sunrise and
# sunset; actions are 'scale_up', 'scale_down' or a percentage such as '40%'.
config = 'midnight scale_down sunrise 0% noon scale_up sunset 100%'
# Coordinates passed to astral's sunrise/sunset calculation in sun_minutes().
lat, lng = (58.0, 56.316) # Perm
def utc_to_local(utc):
    """Shift a UTC datetime by this machine's local UTC offset.

    The offset is the difference between the local and the UTC rendering of
    the timestamp obtained by passing ``utc``'s time tuple to ``time.mktime``.
    Only the clock fields move: the result is ``utc`` plus a timedelta, so any
    ``tzinfo`` on the argument is carried over unchanged.
    """
    stamp = time.mktime(utc.timetuple())
    local_view = datetime.datetime.fromtimestamp(stamp)
    utc_view = datetime.datetime.utcfromtimestamp(stamp)
    return utc + (local_view - utc_view)
def sun_minutes():
    """Return today's (sunrise, sunset) as local minutes since midnight.

    Both instants come from astral for the module-level ``lat``/``lng`` and
    are shifted to local clock time with ``utc_to_local`` before being
    reduced to ``hour * 60 + minute``.
    """
    def minute_of_day(moment):
        return moment.hour * 60 + moment.minute

    calc = astral.Astral()
    # The Astral instance is configured for the 'nautical' depression;
    # the original author also noted 'civil' and 'astronomic' as alternatives.
    calc.solar_depression = 'nautical'
    day = datetime.date.today()
    rise = utc_to_local(calc.sunrise_utc(day, lat, lng))
    dusk = utc_to_local(calc.sunset_utc(day, lat, lng))
    return minute_of_day(rise), minute_of_day(dusk)
def scale(v, vrange, target, reverse=False):
    """Linearly map ``v`` from ``vrange`` onto the integer interval ``target``.

    v       -- value to map; must satisfy vlo <= v <= vhi.
    vrange  -- (vlo, vhi) source interval.
    target  -- (tlo, thi) destination interval.
    reverse -- when true the mapping is flipped: vlo lands on thi, vhi on tlo.

    Returns ``tlo`` plus the truncated offset into the target interval.
    Raises AssertionError when ``v`` lies outside ``vrange``.
    """
    # Local names are deliberately stable: the assertion messages embed locals().
    (vlo, vhi) = vrange
    assert vlo <= v <= vhi, "config error. fail {}<={}<={}".format(vlo, v, vhi)
    # Fraction of the way through the source interval (float division).
    vk = (v - vlo) / float(vhi - vlo)
    assert 0 <= vk <= 1, "code bug. fail vk=[0;1] locals={}".format(locals())
    vk = (1 - vk) if reverse else vk
    (tlo, thi) = target
    # int() truncates, so the result never overshoots thi.
    t = tlo + int(vk * (thi - tlo))
    assert tlo <= t <= thi, "code bug. fail {}<={}<={} locals={}".format(tlo, t, thi, locals())
    return t
def act(action, v, vrange, target):
    """Resolve one schedule action to a concrete output value.

    action -- a number (returned unchanged), 'scale_up', 'scale_down',
              or a percentage string such as '40%'.
    v      -- current position; used only by the scale_* actions.
    vrange -- (low, high) interval containing ``v``.
    target -- (low, high) output interval passed on to scale().

    Returns the number itself, or the result of scale().
    Raises Exception when ``action`` matches none of the accepted forms.
    """
    if isinstance(action, (float, int)):
        return action
    if action == 'scale_up':
        return scale(v, vrange, target, reverse=False)
    if action == 'scale_down':
        return scale(v, vrange, target, reverse=True)
    # hasattr guard: non-string junk (e.g. None) should reach the explicit
    # error below instead of dying with AttributeError on .endswith.
    if hasattr(action, 'endswith') and action.endswith('%'):
        return scale(int(action[:-1]), (0, 100), target)
    # Report the offending action value, not the function object.
    raise Exception("code bug. invalid action={} locals={}".format(action, locals()))
def color_temp(config_text, minute, sun_range):
    """Compute the colour-temperature value for one minute of the day.

    config_text -- schedule as whitespace-separated ``time action`` pairs.
                   A time is a name from the table below (sunrise, sunset,
                   noon, midnight and aliases) or an explicit
                   ``T<hour>:<minute>``. An action is 'scale_up',
                   'scale_down' or a percentage such as '40%'.
    minute      -- minutes since local midnight.
    sun_range   -- (sunrise, sunset), both in minutes since local midnight.

    Each action applies from its own time up to the next listed time; the
    last action applies from the last time, across midnight, up to the first.

    Returns whatever act() yields for the segment containing ``minute``.
    Raises Exception on malformed configuration or when no segment matches.
    """
    # Output interval handed to act(); presumably the dimmer's accepted
    # colour-temperature range -- TODO confirm against the device.
    target = (153, 500)
    sunrise, sunset = sun_range
    mid_day = sunrise + int((sunset - sunrise) / 2)
    # Solar midnight is half a day away from solar noon.
    mid_night = (mid_day + 720) % 1440
    time_name_map = {
        'sunrise': sunrise,
        'sunset': sunset,
        'night': mid_night,
        'midnight': mid_night,
        'mid-night': mid_night,
        'noon': mid_day,
        'midday': mid_day,
        'mid-day': mid_day,
    }

    def parse_config_time(s):
        """Turn one time word into minutes since midnight."""
        named = time_name_map.get(s)
        if named is not None:
            return named
        if s.startswith('T'):
            try:
                h, m = map(int, s[1:].split(':', 1))
            except ValueError:
                # Covers both non-numeric parts and a missing ':'.
                raise Exception("config error. invalid time={}".format(s))
            return h * 60 + m
        raise Exception("config error. invalid time={}".format(s))

    # parse config text
    words = config_text.split()
    if not words:
        raise Exception("config error. empty config")
    if len(words) % 2 != 0:
        raise Exception("config error, word count must be even (x%2==0) text: {}".format(config_text))
    times = tuple(map(parse_config_time, words[0::2]))
    actions = tuple(words[1::2])
    for a in actions:
        if not (a in ('scale_up', 'scale_down') or a.endswith('%')):
            raise Exception("config error. invalid action={}".format(a))

    time_low = times[0]
    time_high = times[-1]
    # handle over-night wrap: the last action covers [time_high, time_low + 1 day)
    wrap_range = (time_high, time_low + 1440)
    if minute < time_low:
        return act(actions[-1], minute + 1440, wrap_range, target)
    if minute >= time_high:
        # '>=' so that the exact minute of the last listed time is covered too.
        return act(actions[-1], minute, wrap_range, target)

    # all other values for `minute` fall inside consecutive (lo, hi) pairs;
    # zip() stops at the shorter sequence, so the last action is not paired here.
    for action, lo, hi in zip(actions, times, times[1:]):
        if lo <= minute < hi:
            return act(action, minute, (lo, hi), target)
    raise Exception("likely config error. range not found. minute={} locals={}".format(minute, locals()))
def parse_net_addr(s):
    """Split ``'host:port'`` at its last colon into ``(host, int(port))``."""
    pieces = s.rsplit(':', 1)
    return (pieces[0], int(pieces[1]))
def main():
    """Compute the current colour temperature and publish it once over MQTT.

    Reads the broker address from the ``mqtt_broker`` environment variable
    (default ``localhost:1883``) and the topic from ``mqtt_topic``. A status
    line is written to stderr before publishing. Exits with status 1 when no
    value was computed or when ``mqtt_topic`` is unset or empty.
    """
    now = time.localtime()
    minute = now.tm_hour * 60 + now.tm_min
    sunrise, sunset = sun_minutes()
    ct = color_temp(config, minute, (sunrise, sunset))

    status = "minute={minute} sunrise={sunrise} sunset={sunset} ct={ct}\n".format(
        minute=minute, sunrise=sunrise, sunset=sunset, ct=ct)
    sys.stderr.write(status)
    if ct is None:
        sys.stderr.write("wrong interval\n")
        sys.exit(1)

    host, port = parse_net_addr(os.environ.get('mqtt_broker', 'localhost:1883'))
    topic = os.environ.get('mqtt_topic')
    if not topic:
        sys.stderr.write("must define mqtt_topic environment\n")
        sys.exit(1)

    client = paho.mqtt.client.Client("color-temp-controller-v0")
    client.connect(host, port)
    # Published with the retain flag set.
    client.publish(topic, ct, retain=True)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment