Skip to content

Instantly share code, notes, and snippets.

@bonzini
Last active March 2, 2022 09:32
Show Gist options
  • Save bonzini/b917e978959c08ebe6898aea12ec569b to your computer and use it in GitHub Desktop.
Save bonzini/b917e978959c08ebe6898aea12ec569b to your computer and use it in GitHub Desktop.
termostato con zigbee2mqtt
#! /usr/bin/env python3
from collections import defaultdict
import asyncio
import paho.mqtt.client
import json
import weakref
class ActiveSubscription:
    """Link between one subscriber object, a topic and an MQTT client.

    The subscriber is held only through a weak reference whose callback is
    :meth:`unsubscribe`, so the subscription is dropped from the client as
    soon as the subscriber object is garbage collected.
    """

    def __init__(self, topic, obj, client):
        # Store the plain attributes first; the weak reference is created
        # last so that its callback always finds a fully set up instance.
        self.client = client
        self.topic = topic
        self.wr = weakref.ref(obj, self.unsubscribe)

    def unsubscribe(self, wr):
        """Detach from the client; calls after the first one do nothing.

        ``wr`` is the weak reference used as key by the client.  It is
        forwarded unchanged to ``client.do_unsubscribe``.
        """
        if self.wr is not None:
            # Clear the marker before calling out so the call happens once.
            self.wr = None
            self.client.do_unsubscribe(self.topic, wr)
class MqttClient:
    """Bridge between a paho MQTT client and asyncio-side subscriber objects.

    Messages arrive on the thread started by ``loop_start`` and are handed to
    the asyncio loop with ``call_soon_threadsafe``.  There they are
    dispatched to every live object subscribed to the message's exact topic
    string.  Subscribers are tracked through weak references, with at most
    one subscription per object.

    Attributes:
        subscribers: topic -> set of weak references to subscribed objects.
        subscriptions: weak reference -> its ``ActiveSubscription``.
    """

    def __init__(self, host, port=1883, keepalive=60, bind_address="",
                 username=None, password=None,
                 client_id="", transport="tcp"):
        """Connect to ``host`` and start the paho network loop.

        ``client_id`` is passed to paho as given; the fallback ``'mqtt'`` is
        only used as a label in the log lines printed by this class.
        """
        self.loop = asyncio.get_event_loop()
        self.client_id = client_id or 'mqtt'
        self.subscribers = defaultdict(set)
        self.subscriptions = dict()
        # NOTE(review): paho-mqtt 2.x expects a callback_api_version as the
        # first constructor argument -- confirm which paho version is in use.
        self.client = paho.mqtt.client.Client(client_id, transport=transport)
        # paho invokes this callback on its own thread; hop to the loop.
        self.client.on_message = lambda client, userdata, message: \
            self.loop.call_soon_threadsafe(self.on_message, message)
        if username:
            self.client.username_pw_set(username, password)
        self.client.connect(host, port, keepalive, bind_address)
        self.client.loop_start()

    def on_message(self, message):
        """Deliver ``message`` to every live subscriber of its topic."""
        # Use .get() rather than indexing: indexing the defaultdict would
        # insert an empty set for every topic nobody is subscribed to.
        # The copy allows subscribers to unsubscribe while being notified.
        for wr in list(self.subscribers.get(message.topic, ())):
            obj = wr()
            if obj is not None:
                obj.on_message(message)

    def will_set(self, topic, payload=None, qos=0, retain=False):
        """Forward to the paho client's ``will_set``."""
        self.client.will_set(topic, payload, qos, retain)

    def publish(self, topic, payload=None, qos=0, retain=False):
        """Forward to the paho client's ``publish``."""
        self.client.publish(topic, payload, qos, retain)

    def is_subscribed(self, obj):
        """Return True if ``obj`` currently has a subscription."""
        return weakref.ref(obj) in self.subscriptions

    def subscribe(self, obj, topic):
        """Subscribe ``obj`` to ``topic``; ``obj.on_message`` gets the messages.

        Raises:
            Exception: if ``obj`` already has a subscription.
        """
        if self.is_subscribed(obj):
            raise Exception("two subscriptions for the same object")
        # Only the first subscriber of a topic triggers an MQTT SUBSCRIBE.
        need_subscribe = not self.subscribers[topic]
        proxy = ActiveSubscription(topic, obj, self)
        self.subscribers[topic].add(proxy.wr)
        self.subscriptions[proxy.wr] = proxy
        if need_subscribe:
            self.client.subscribe(topic)
            print(f'[{self.client_id}] subscribe', topic)

    def do_unsubscribe(self, topic, wr):
        """Remove the bookkeeping for ``wr``; unsubscribe when it was the last."""
        self.subscribers[topic].remove(wr)
        del self.subscriptions[wr]
        if not self.subscribers[topic]:
            # Drop the empty set so that unused topics do not accumulate.
            del self.subscribers[topic]
            self.client.unsubscribe(topic)
            print(f'[{self.client_id}] unsubscribe', topic)

    def unsubscribe(self, obj):
        """Cancel the subscription of ``obj``.

        Raises:
            KeyError: if ``obj`` is not subscribed.
        """
        wr = weakref.ref(obj)
        proxy = self.subscriptions[wr]
        proxy.unsubscribe(wr)
class Topic:
    """Keep the most recent payload received on one MQTT topic.

    The instance subscribes itself to ``topic`` on ``client``.  When the
    ``json`` flag is true, payloads are parsed as JSON before being stored.
    """

    def __init__(self, client, topic, json=False):
        self.client = client
        self.topic = topic
        self.json = json
        self.loop = asyncio.get_event_loop()
        # Completed by the first message; get() waits on it.
        self.future = self.loop.create_future()
        self.client.subscribe(self, topic)

    def on_message(self, msg):
        """Store the decoded payload of ``msg`` and wake up pending get()s."""
        text = msg.payload.decode()
        if self.json:
            self.value = json.loads(text)
        else:
            self.value = text
        if self.future.done():
            return
        self.future.set_result(True)

    async def get(self):
        """Return the latest value, waiting until a first message has arrived."""
        await self.future
        return self.value

    def close(self):
        """Unsubscribe this object from the client."""
        self.client.unsubscribe(self)
class Subscription:
    """Push every message received on a topic into a queue.

    Each message is stored as a ``(topic, value)`` tuple, where ``value`` is
    the decoded payload, parsed as JSON when the ``json`` flag is true.
    """

    def __init__(self, client, topic, queue, json=False):
        self.json = json
        self.queue = queue
        self.client = client
        self.client.subscribe(self, topic)

    def on_message(self, msg):
        """Decode ``msg`` and enqueue it without waiting."""
        text = msg.payload.decode()
        if self.json:
            item = (msg.topic, json.loads(text))
        else:
            item = (msg.topic, text)
        self.queue.put_nowait(item)

    def close(self):
        """Unsubscribe this object from the client."""
        self.client.unsubscribe(self)
#! /usr/bin/env python3
import argparse
import asyncio
import atexit
import contextlib
import time
from mqtt import MqttClient, Topic, Subscription
from utils import repeat, run_then_cancel
# Interval, in seconds, passed to repeat() for the periodic copy in pubblica().
SECONDI_COPIA = 3600
# Interval, in seconds, passed to repeat() for each control loop in controllo().
SECONDI_CONTROLLO = 60
class Termostato:
    """Compute the target temperature from the mode and the two set points.

    The mode and the set points ``t1`` and ``t2`` are read from the
    ``controllo-temperatura/*`` topics of ``client``.
    """

    def __init__(self, client):
        self.modo = Topic(client, 'controllo-temperatura/modo')
        self.t1 = Topic(client, 'controllo-temperatura/t1')
        self.t2 = Topic(client, 'controllo-temperatura/t2')

    async def temp_obiettivo(self):
        """Return the target temperature for the current local hour.

        * mode ``'spento'``: the fixed value 6;
        * mode ``'basso'``, or mode ``'acceso'`` before 6 or after hour 20:
          the ``t1`` set point;
        * anything else: the ``t2`` set point.
        """
        hour = time.localtime().tm_hour
        mode = await self.modo.get()
        if mode == 'spento':
            return 6
        night = hour < 6 or hour > 20
        use_t1 = mode == 'basso' or (mode == 'acceso' and night)
        source = self.t1 if use_t1 else self.t2
        return float(await source.get())
class Controllo:
    """On/off controller with hysteresis for one sensor and one plug.

    ``in_base`` is the topic of the temperature sensor and ``out_base`` the
    topic of the plug; both carry JSON payloads.  Commands are published on
    ``out_base + '/set/state'``.  ``controllo`` must provide an awaitable
    ``temp_obiettivo()`` returning the target temperature.
    """

    def __init__(self, client, controllo, in_base, out_base):
        self.client = client
        self.controllo = controllo
        self.sensore = Topic(client, in_base, json=True)
        self.presa = Topic(client, out_base, json=True)
        self.out_topic = out_base + '/set/state'
        # Nothing decided yet; the counter starts above the threshold so
        # that the very first run always takes a decision.
        self.obiettivo = None
        self.stato_desiderato = None
        self.tempo = 100

    async def una_volta(self):
        """Run one control step.

        A new decision is taken when the plug state differs from the desired
        one, when the target changed, or when 10 steps passed since the last
        decision.  The plug switches off above target + 0.5 and on below
        target - 0.5.
        """
        self.tempo += 1
        stato = (await self.presa.get())['state']
        sensore = (await self.sensore.get())['temperature']
        obiettivo = await self.controllo.temp_obiettivo()

        nothing_to_do = (stato == self.stato_desiderato
                         and obiettivo == self.obiettivo
                         and self.tempo < 10)
        if nothing_to_do:
            return

        self.obiettivo = obiettivo
        if stato == 'ON':
            too_warm = sensore > obiettivo + 0.5
            self.stato_desiderato = 'OFF' if too_warm else 'ON'
        elif stato == 'OFF':
            too_cold = sensore < obiettivo - 0.5
            self.stato_desiderato = 'ON' if too_cold else 'OFF'
        print(self.out_topic, self.tempo, sensore, stato, obiettivo, self.stato_desiderato)
        if self.stato_desiderato != stato:
            self.client.publish(self.out_topic, self.stato_desiderato)
        self.tempo = 0
class CopiaPeriodica:
    """Republish the latest value of a set of topics on another client.

    A ``Topic`` is created on ``in_client`` for each entry of ``topics``;
    :meth:`una_volta` copies their current values to ``out_client``.
    ``prefix`` is only stored, as a label for debugging output.
    """

    def __init__(self, in_client, out_client, prefix, topics):
        self.out_client = out_client
        self.prefix = prefix
        self.topics = [Topic(in_client, x) for x in topics]

    async def una_volta(self):
        """Publish every topic's latest value once, with the retain flag set.

        Waits for each topic until it has received at least one message.
        """
        for t in self.topics:
            # Await the value once and publish exactly what was read.
            payload = await t.get()
            self.out_client.publish(t.topic, payload, retain=True)
class Copia:
    """Forward every message of a set of topics from one client to another.

    A ``Subscription`` per topic on ``in_client`` feeds a shared queue;
    :meth:`vai` drains the queue and republishes on ``out_client``.
    ``prefix`` is only stored, as a label for debugging output.
    """

    def __init__(self, in_client, out_client, prefix, topics):
        self.prefix = prefix
        self.out_client = out_client
        self.queue = asyncio.Queue()
        self.subscriptions = [Subscription(in_client, x, self.queue) for x in topics]

    async def vai(self):
        """Republish queued messages, retained, until the task is cancelled.

        Cancellation is absorbed, so the coroutine returns normally.
        """
        try:
            while True:
                topic, payload = await self.queue.get()
                self.out_client.publish(topic, payload, retain=True)
        except asyncio.CancelledError:
            pass
def leggi(esterno, locale):
    """Return a coroutine copying the thermostat settings to ``locale``.

    The mode and the two set points published on ``esterno`` are forwarded
    to ``locale`` as they arrive.
    """
    topics = [
        'controllo-temperatura/modo',
        'controllo-temperatura/t1',
        'controllo-temperatura/t2',
    ]
    return Copia(esterno, locale, '<<', topics).vai()
def pubblica(locale, esterno):
    """Return a coroutine republishing sensor and plug state on ``esterno``.

    The latest values seen on ``locale`` are copied once every
    ``SECONDI_COPIA`` seconds.
    """
    topics = [
        'zigbee2mqtt/temp1', 'zigbee2mqtt/presa1',
        'zigbee2mqtt/temp2', 'zigbee2mqtt/presa2',
    ]
    copier = CopiaPeriodica(locale, esterno, '>>', topics)
    return repeat(copier.una_volta, SECONDI_COPIA)
def controllo(locale, in_base, out_base):
    """Return a coroutine running one control loop on ``locale``.

    The loop reads the sensor at ``in_base``, drives the plug at
    ``out_base`` and runs one step every ``SECONDI_CONTROLLO`` seconds.
    """
    regolatore = Controllo(locale, Termostato(locale), in_base, out_base)
    return repeat(regolatore.una_volta, SECONDI_CONTROLLO)
def main(loop):
    """Parse the command line and schedule all tasks on ``loop``.

    Two control loops always run against the broker on ``localhost``.  When
    a HOST argument is given, a second client connects to it: settings are
    copied from it, sensor data is published to it, and the retained
    ``controllo-temperatura/attivo`` topic is used as an alive marker.
    """
    cli = argparse.ArgumentParser(description='termostato')
    cli.add_argument('-u', '--username', metavar='USER', help='username')
    cli.add_argument('server', metavar='HOST', nargs='?', help='host')
    opts = cli.parse_args()

    locale = MqttClient('localhost', client_id='locale')
    loop.create_task(controllo(locale, 'zigbee2mqtt/temp1', 'zigbee2mqtt/presa1'),
                     name='locale1')
    loop.create_task(controllo(locale, 'zigbee2mqtt/temp2', 'zigbee2mqtt/presa2'),
                     name='locale2')

    if not opts.server:
        return

    esterno = MqttClient(opts.server, client_id='esterno',
                         username=opts.username, keepalive=60)
    # Alive marker: '1' while running, '0' through the last will and at exit.
    alive_topic = 'controllo-temperatura/attivo'
    esterno.will_set(alive_topic, '0', retain=True)
    esterno.publish(alive_topic, '1', retain=True)
    atexit.register(esterno.publish, alive_topic, '0', retain=True)
    loop.create_task(leggi(esterno, locale), name='leggi')
    loop.create_task(pubblica(locale, esterno), name='pubblica')
if __name__ == '__main__':
    # Script entry point: schedule the tasks on the event loop, then run it
    # until run_then_cancel() stops it and cancels whatever is still pending.
    loop = asyncio.get_event_loop()
    main(loop)
    run_then_cancel(loop)
#! /usr/bin/env python3
import asyncio
import contextlib
import signal
async def repeat(func, interval):
    """Await ``func()`` over and over, one run every ``interval`` seconds.

    The interval is measured from the start of a run: the timer is armed
    before ``func`` is awaited, so a run that takes longer than ``interval``
    is followed immediately by the next one.

    Args:
        func: callable taking no arguments and returning an awaitable.
        interval: seconds between the starts of two consecutive runs.

    Cancelling the surrounding task ends the loop and is absorbed, so the
    coroutine returns normally.  Any other exception raised by ``func``
    propagates to the caller.
    """
    with contextlib.suppress(asyncio.CancelledError):
        while True:
            loop = asyncio.get_running_loop()
            timer = loop.create_future()
            handle = loop.call_later(interval, timer.set_result, True)
            try:
                await func()
                await timer
            finally:
                # Always disarm the timer, also on cancellation (which is a
                # BaseException and would slip past ``except Exception``).
                # Otherwise the callback would fire later and call
                # set_result() on a future that was already cancelled.
                handle.cancel()
                if not timer.done():
                    timer.cancel()
def run_then_cancel(loop):
    """Run ``loop`` until SIGTERM or SIGINT, then cancel its tasks and close it.

    After the loop stops, every task that has not finished yet is cancelled
    and the loop is run again until all of them have completed.  Tasks that
    end with ``CancelledError`` or another exception do not interrupt the
    shutdown, and the loop is always closed.
    """
    loop.add_signal_handler(signal.SIGTERM, loop.stop)
    loop.add_signal_handler(signal.SIGINT, loop.stop)
    try:
        loop.run_forever()
    finally:
        try:
            pending = [t for t in asyncio.all_tasks(loop=loop) if not t.done()]
            for t in pending:
                t.cancel()
            if pending:
                # return_exceptions=True collects CancelledError (and any
                # other failure) instead of raising it out of the cleanup.
                loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True))
        finally:
            loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment