Skip to content

Instantly share code, notes, and snippets.

@basuke
Created May 7, 2013 07:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save basuke/5530835 to your computer and use it in GitHub Desktop.
Save basuke/5530835 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import RPi.GPIO as GPIO
import mosquitto
import sys
from datetime import datetime
# GPIO channels to monitor.  main() selects BCM numbering, configures each
# channel as an input and creates one Sensor per entry, in this order.
PINS = (18, 23, 24, 25)
# Address handed to MQTTServer, which passes it to the mosquitto client's
# connect() call.
SERVER = "sample.example.com"
def main():
    """Publish the state of every monitored GPIO pin over MQTT.

    Connects to ``SERVER``, configures each channel in ``PINS`` as an input,
    publishes every sensor's initial state, and then loops forever: each pass
    services the MQTT client once and publishes a new event for every sensor
    whose reading changed.

    The function only returns by way of an exception (for example
    KeyboardInterrupt, or SystemExit raised from the connection callback);
    the GPIO channels are released on the way out.
    """
    # setup MQTT
    conn = MQTTServer(SERVER)
    conn.connect()

    # setup GPIO
    GPIO.setmode(GPIO.BCM)
    try:
        for pin in PINS:
            GPIO.setup(pin, GPIO.IN)

        # Labels are the 1-based positions within PINS; Sensor puts the label
        # into its topic name.
        sensors = [Sensor(label, pin) for label, pin in enumerate(PINS, 1)]

        # send initial state
        for sensor in sensors:
            conn.publish(*sensor.event())

        while True:
            conn.loop()
            for sensor in sensors:
                if sensor.check():
                    conn.publish(*sensor.event())
    finally:
        # Release the channels no matter how the loop ends.
        GPIO.cleanup()
class Sensor(object):
    """A single GPIO input whose state is reported on its own topic."""

    def __init__(self, label, pin):
        """Store the channel, take a first reading and build the topic.

        label -- value substituted at the end of "home/kitchen/gas/<label>"
        pin   -- channel number passed to GPIO.input()
        """
        self.pin = pin
        self.state = GPIO.input(pin)
        self.topic = "home/kitchen/gas/%s" % label

    def check(self):
        """Re-read the channel; return True if the reading differs from the
        previously stored one.  The stored state is always refreshed."""
        previous, self.state = self.state, GPIO.input(self.pin)
        return previous != self.state

    def event(self):
        """Return a (topic, message) pair describing the stored state.

        The message is "ON" or "OFF" followed by a space and the value of
        utctimestamp().
        """
        if self.state:
            word = "ON"
        else:
            word = "OFF"
        return (self.topic, "%s %s" % (word, utctimestamp()))
class MQTTServer(object):
    """Wrapper around a mosquitto client that tracks whether it is connected."""

    def __init__(self, server):
        """Create the client for *server* and install its callbacks."""
        self.server = server
        self.client = mosquitto.Mosquitto("Daidokoro-monitor", obj=self)
        self.connected = False

        # NOTE(review): both callbacks take (obj, rc).  Presumably the library
        # hands back the ``obj`` given to the constructor above, i.e. this
        # instance -- confirm against the installed mosquitto version.
        def connected_cb(owner, rc):
            # rc == 0 is treated as success; anything else ends the process.
            if rc == 0:
                log("Connected successfully.")
                owner.connected = True
                return
            log("Cannot connect to server. Exit.")
            sys.exit(255)

        def disconnected_cb(owner, rc):
            # Mark the link as down first so that reconnect() really waits.
            log("Disconnected. Try to connect again.")
            owner.connected = False
            owner.reconnect()

        self.client.on_connect = connected_cb
        self.client.on_disconnect = disconnected_cb

    def connect(self):
        """Start a connection to the server and block until it is up."""
        log("Connecting to MQTT server.")
        self.client.connect(self.server)
        self.wait_connection()

    def reconnect(self):
        """Ask the client to reconnect and block until it is up."""
        log("Reconnecting to MQTT server.")
        self.client.reconnect()
        self.wait_connection()

    def publish(self, topic, payload):
        """Log and publish *payload* on *topic* with the retain flag set."""
        line = "Publish %s => %s" % (topic, payload)
        log(line)
        self.client.publish(topic, payload, retain=True)

    def loop(self, timeout=-1):
        """Run the client's loop() once, forwarding *timeout* unchanged."""
        self.client.loop(timeout=timeout)

    def wait_connection(self):
        """Keep calling loop() until the connected flag becomes true."""
        while True:
            if self.connected:
                break
            self.loop()
def utctimestamp():
    """Return the current UTC time as ISO 8601 text with a trailing 'Z'.

    Microseconds are dropped, so the result has whole-second resolution.
    """
    whole_seconds = datetime.utcnow().replace(microsecond=0)
    return "%sZ" % whole_seconds.isoformat()
def log(msg):
    """Write *msg* to standard output followed by a newline.

    The call form of print is used because, for a single argument, it
    produces the same output under Python 2 and also parses under Python 3.
    """
    print(msg)
# Start the monitor only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment