Skip to content

Instantly share code, notes, and snippets.

@jdleslie
Last active November 8, 2017 02:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdleslie/d087bcbeffa9d1776b4b394e21c6818d to your computer and use it in GitHub Desktop.
Save jdleslie/d087bcbeffa9d1776b4b394e21c6818d to your computer and use it in GitHub Desktop.
Forecast-responsive control script for radiant floors
import logging
import os
from datetime import datetime, timedelta
import requests
import xmltodict
import pytz
# --- Settings read from the environment once, at import time ----------------

# Time zone name handed to pytz when formatting schedule times for the log.
TZ = os.getenv("RADIANT_TZ", "UTC")

# Forecast location. No default is supplied, so leaving either variable unset
# makes float(None) raise TypeError during import.
LAT = float(os.getenv("RADIANT_LAT"))
LON = float(os.getenv("RADIANT_LON"))

# Temperature threshold passed to get_demand_schedule() as max_temp
# (presumably "warm weather shut down", in degrees F -- confirm).
WWSD_TEMP = int(os.getenv("RADIANT_WWSD", 60))

# Thresholds, in forecast periods, used when merging/dropping demand ranges.
MIN_ON = int(os.getenv("RADIANT_MIN_ON", 3))
MIN_OFF = int(os.getenv("RADIANT_MIN_OFF", 3))

# Relay controller endpoint and the name of the relay field to read/write.
webrelay_url = "http://{}/stateFull.xml".format(
    os.getenv("RADIANT_WEBRELAY_HOST", "boiler2")
)
webrelay_target = os.getenv("RADIANT_WEBRELAY_TARGET", "relay2state")

# Timestamped, level-tagged log lines at INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)-8s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
# Example of one forecast period as returned by get_forecast():
#
#   {'detailedForecast': '',
#    'endTime': '2017-10-03T15:00:00-04:00',
#    'icon': 'https://api.weather.gov/icons/land/day/skc?size=small',
#    'isDaytime': True,
#    'name': '',
#    'number': 50,
#    'shortForecast': 'Sunny',
#    'startTime': '2017-10-03T14:00:00-04:00',
#    'temperature': 64,
#    'temperatureTrend': None,
#    'temperatureUnit': 'F',
#    'windDirection': 'SSE',
#    'windSpeed': '5 mph'}
#
def get_forecast(lat, lon, hours=None, timeout=30):
    """Fetch the hourly forecast periods for a location from api.weather.gov.

    Args:
        lat: Latitude in decimal degrees (sent with three decimal places).
        lon: Longitude in decimal degrees (sent with three decimal places).
        hours: Maximum number of leading periods to return; None returns all.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of period dictionaries shaped like the example above.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
    """
    # NOTE(review): confirm api.weather.gov still serves this point-based
    # hourly endpoint; it is kept exactly as the original script requested it.
    url = "https://api.weather.gov/points/{:.3f},{:.3f}/forecast/hourly".format(lat, lon)
    headers = {
        "User-Agent": "boiler-control <boiler-control@example.com>",
        "Accept": "application/ld+json",
    }
    # Without a timeout a stalled connection would block the control loop
    # indefinitely.
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail with the HTTP error itself rather than a KeyError on 'periods'.
    response.raise_for_status()
    return response.json()['periods'][:hours]
def is_radiant_running(timeout=30):
    """Report whether the relay controller shows the target relay as on.

    Fetches ``webrelay_url``, parses the XML reply and compares the
    ``webrelay_target`` field against the string "1".

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when the target field equals "1", otherwise False.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: if the controller does not report its units as "F".
    """
    # A timeout keeps an unreachable controller from hanging the loop forever.
    response = requests.get(webrelay_url, timeout=timeout)
    response.raise_for_status()
    datavalues = xmltodict.parse(response.content)["datavalues"]
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    # NOTE(review): presumably a sanity check on the device configuration.
    if datavalues["units"] != "F":
        raise ValueError(
            "relay controller reports units {!r}, expected 'F'".format(datavalues["units"])
        )
    return datavalues[webrelay_target] == "1"
def get_demand_schedule(lat, lon, max_temp, periods=None, min_on=None, min_off=None):
    """Build the list of upcoming heat-demand ranges from the hourly forecast.

    A forecast period counts as "demand" when its temperature, adjusted for
    solar gain and wind, is at or below ``max_temp``. Adjacent demand periods
    are grouped into inclusive ``(start, end)`` index ranges, where index 0 is
    the current period; nearby ranges are merged and short later ranges are
    dropped.

    Args:
        lat: Latitude passed to get_forecast().
        lon: Longitude passed to get_forecast().
        max_temp: Adjusted temperature (degrees F) at or below which a period
            is treated as needing heat.
        periods: Optional pre-fetched list of forecast period dictionaries.
            When None, get_forecast(lat, lon) is called.
        min_on: A later range is kept only if ``end - start`` exceeds this.
            Defaults to the module-level MIN_ON.
        min_off: Ranges separated by this many periods or fewer are merged.
            Defaults to the module-level MIN_OFF.

    Returns:
        A flat list of ``(start, end)`` tuples in forecast order; an empty
        list when no period needs heat.

    Raises:
        ValueError: if a period's temperature is not reported in Fahrenheit.
    """
    if periods is None:
        periods = get_forecast(lat, lon)
    if min_on is None:
        min_on = MIN_ON
    if min_off is None:
        min_off = MIN_OFF

    # Shift the 1-based period number so that zero is the current period.
    demand_hours = [
        period['number'] - 1
        for period in periods
        if _effective_temperature(period) <= max_temp
    ]
    return _merge_demand_ranges(_group_ranges(demand_hours), min_on, min_off)


# Degrees F added to daytime periods, keyed by the forecast's short description.
_SOLAR_GAIN_F = {"Sunny": 7, "Mostly Sunny": 5, "Clear": 5}


def _effective_temperature(period):
    """Return the period's temperature adjusted for solar gain and wind."""
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if period["temperatureUnit"] != "F":
        raise ValueError(
            "forecast temperature unit is {!r}, expected 'F'".format(period["temperatureUnit"])
        )
    temp = period["temperature"]

    # Solar gain only applies while the sun is up.
    if period["isDaytime"]:
        temp += _SOLAR_GAIN_F.get(period["shortForecast"], 0)

    # windSpeed looks like "5 mph"; only the leading number is used.
    wind = int(period['windSpeed'].split()[0])
    if wind > 20:
        temp -= 7
    elif wind > 15:
        temp -= 5
    elif wind > 10:
        temp -= 3
    return temp


def _group_ranges(hours):
    """Combine runs of consecutive period indices into (start, end) tuples."""
    ranges = []
    for hour in hours:
        if ranges and hour == ranges[-1][1] + 1:
            ranges[-1] = (ranges[-1][0], hour)
        else:
            ranges.append((hour, hour))
    return ranges


def _merge_demand_ranges(ranges, min_on, min_off):
    """Merge ranges separated by <= min_off and drop later ranges <= min_on long."""
    schedule = []
    for start, end in ranges:
        if not schedule:
            # The first range is always kept as-is, matching the original
            # behaviour; is_radiant_needed() decides whether it is worth
            # acting on.
            schedule.append((start, end))
            continue
        prev_start, prev_end = schedule[-1]
        if start - prev_end <= min_off:
            # Gap too short to bother shutting down: extend the previous range.
            schedule[-1] = (prev_start, end)
        elif end - start > min_on:
            schedule.append((start, end))
        # Otherwise the range is too short to be worth a cycle and is dropped.
    return schedule
def is_radiant_needed(demand_start, demand_end):
    """Decide whether the radiant relay should be on for a demand range.

    Args:
        demand_start: Period index at which the demand range begins
            (0 is the current period).
        demand_end: Period index at which the demand range ends.

    Returns:
        True if the relay should be on now, otherwise False. The relay is
        only queried when the range starts within the next two periods.
    """
    # Demand that starts further out needs no action yet.
    if demand_start > 2:
        return False

    if is_radiant_running():
        # Already running: keep going while the range ends at least two
        # periods from now.
        return demand_end >= 2

    # Currently off: start a cycle only if the range spans at least three
    # periods.
    return demand_end - demand_start >= 3
def set_radiant_state(run, timeout=30):
    """Switch the target relay on or off via the relay controller.

    Sends a GET to ``webrelay_url`` with ``webrelay_target`` set to 1 (on)
    or 0 (off).

    Args:
        run: Truthy to switch the relay on, falsy to switch it off.
        timeout: Seconds to wait for the HTTP request before giving up.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
    """
    state = {webrelay_target: 1 if run else 0}
    # A timeout keeps an unreachable controller from hanging the loop forever.
    requests.get(webrelay_url, params=state, timeout=timeout).raise_for_status()
def log_state(radiant_state, schedule):
    """Log the relay state and a human-readable summary of the schedule.

    Args:
        radiant_state: Truthy if the relay was set on, falsy if off.
        schedule: Iterable of (demand_start, demand_end) period-index pairs.
    """
    # Current time in the configured zone; range starts are offsets from it.
    local_now = datetime.now(pytz.utc).astimezone(pytz.timezone(TZ))

    def describe(start, end):
        """Render one demand range as a short phrase."""
        span = end - start
        unit = "hours" if span > 1 else "hour"
        if start == 0:
            return "Now for {} {}".format(span, unit)
        begins_at = local_now + timedelta(hours=start)
        return "{:%a at %-I %p} for {} {}".format(begins_at, span, unit)

    phrases = [describe(start, end) for start, end in schedule]
    if not phrases:
        phrases = ["No demand"]

    state_word = "on" if radiant_state else "off"
    logging.info("Radiant is {}, schedule: {}".format(state_word, ", ".join(phrases)))
def main():
    """Run the control loop forever: forecast, decide, switch, log, sleep."""
    import time
    from random import randint

    while True:
        schedule = get_demand_schedule(LAT, LON, WWSD_TEMP)

        # Only the earliest demand range decides the relay state on this pass.
        should_run = is_radiant_needed(*schedule[0]) if schedule else False

        set_radiant_state(should_run)
        log_state(should_run, schedule)

        # About once an hour, with up to 30 seconds of jitter either way.
        time.sleep(3600 + randint(-30, 30))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment