Skip to content

Instantly share code, notes, and snippets.

@Sennevds
Created January 2, 2020 15:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sennevds/98e2f2340eac7323f2723de353fcdd9f to your computer and use it in GitHub Desktop.
Save Sennevds/98e2f2340eac7323f2723de353fcdd9f to your computer and use it in GitHub Desktop.
#!/bin/python3
# Standard library.
import json
import sys
import threading, time, signal
# Time helpers from the standard library.
from time import sleep

# Third-party: MQTT client, Raspberry Pi GPIO access, YAML parsing.
import paho.mqtt.client as mqtt
import RPi.GPIO as GPIO
import yaml
# --- MQTT configuration -------------------------------------------------
broker_url = "192.168.0.190"  # MQTT server IP
broker_port = 1883  # MQTT server port
deviceName = "Switches"  # Name of your PI
client = mqtt.Client()
# 1 while the MQTT connection is up, 0 otherwise; written by the
# on_connect / on_disconnect callbacks and polled by the main loop.
flag_connected = 0

# --- GPIO configuration -------------------------------------------------
# Turn GPIO warnings off.
GPIO.setwarnings(False)
# Address pins by their Broadcom SOC channel numbers.
GPIO.setmode(GPIO.BCM)
class ProgramKilled(Exception):
    """Exception used to unwind the program when a termination signal arrives."""
def signal_handler(signum, frame):
    """Signal callback that aborts the program by raising ``ProgramKilled``.

    Both parameters follow the handler signature expected by
    ``signal.signal`` and are not inspected.
    """
    raise ProgramKilled()
def printstdout(value):
    """Print ``value`` to standard output and flush it straight away."""
    print(value, flush=True)
class Job(threading.Thread):
    """Non-daemon thread that calls ``execute`` once per ``interval``.

    ``interval`` must provide ``total_seconds()`` (for example a
    ``datetime.timedelta``). Any extra positional and keyword arguments
    are forwarded to ``execute`` on every call.
    """

    def __init__(self, interval, execute, *args, **kwargs):
        super().__init__(daemon=False)
        self.interval = interval
        self.execute = execute
        self.args = args
        self.kwargs = kwargs
        # Set by stop(); also serves as the interruptible wait in run().
        self.stopped = threading.Event()

    def stop(self):
        """Request termination and block until the thread has finished."""
        self.stopped.set()
        self.join()

    def run(self):
        """Wait one interval, invoke the callback, repeat until stopped."""
        while True:
            period = self.interval.total_seconds()
            if self.stopped.wait(period):
                break
            self.execute(*self.args, **self.kwargs)
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: record whether the connection was accepted.

    Args:
        client: the MQTT client instance invoking the callback (unused).
        userdata: user data attached to the client (unused).
        flags: response flags sent by the broker (unused).
        rc: connection result code; 0 means the broker accepted the
            connection, any other value means it was refused.

    Sets the module-level ``flag_connected`` to 1 only on success, so a
    refused connection is not mistaken for a live one.
    """
    global flag_connected
    flag_connected = 1 if rc == 0 else 0
def on_disconnect(client, userdata, rc):
    """MQTT disconnect callback: mark the connection as down.

    All three arguments are supplied by the MQTT client and are ignored;
    the module-level ``flag_connected`` is reset to 0.
    """
    global flag_connected
    flag_connected = 0
# Runs when a button press has been detected.
def pressed(pin):
    """Publish an ON state message for the port configured under ``pin``.

    Args:
        pin: key into ``settings["ports"]`` identifying the input that
            fired.

    Inputs without an entry in the settings are logged and ignored
    instead of raising ``KeyError`` from inside the GPIO callback.
    """
    port = settings["ports"].get(pin)
    if port is None:
        printstdout("No port configured for input {}; ignoring".format(pin))
        return
    topic = port["topic"]
    printstdout("Er is gedrukt!, interrupt op pin: {}".format(str(topic)))
    client.publish("custom/switches/{}/state".format(topic), '{ "value": "ON" }')
def buttonPressed(pinRpi, pinHat):
    """GPIO edge callback: log the Raspberry Pi pin, then handle the press.

    Args:
        pinRpi: channel number reported by the GPIO event.
        pinHat: input number forwarded to ``pressed``.
    """
    message = "Er is gedrukt!, interrupt op pin: {}".format(str(pinRpi))
    printstdout(message)
    pressed(pinHat)
# BCM pin for each button input. The position in the tuple (counted from 1)
# is the input number handed to buttonPressed as its pinHat argument.
_BUTTON_PINS = (17, 27, 23, 22, 24, 10, 25, 9, 11, 5)

# Configure every button pin as an input with the pull-down enabled.
for _bcm_pin in _BUTTON_PINS:
    GPIO.setup(_bcm_pin, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

# Register a falling-edge callback per pin.
for _hat_input, _bcm_pin in enumerate(_BUTTON_PINS, start=1):
    GPIO.add_event_detect(
        _bcm_pin,
        GPIO.FALLING,
        # Bind the input number as a default argument; a plain closure
        # would only ever see the last value of the loop variable.
        callback=lambda x, hat=_hat_input: buttonPressed(x, hat),
        bouncetime=400,
    )
if __name__ == "__main__":
    # Entry point: load the port settings, announce every port through
    # Home Assistant MQTT discovery, then service the MQTT connection until
    # the process is told to stop.

    # Turn SIGTERM / SIGINT into ProgramKilled so the GPIO pins are released
    # on shutdown. The handler was previously never installed, which left
    # the cleanup branch unreachable.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    with open("/home/pi/24V/settings.yaml") as f:
        # safe_load never constructs arbitrary Python objects from the file.
        settings = yaml.safe_load(f)

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect

    try:
        client.connect(broker_url, broker_port)
        # One network-loop pass so the broker's reply to the connect request
        # can be processed before the discovery messages are published.
        client.loop()

        for port_cfg in settings["ports"].values():
            topic = port_cfg["topic"]
            # NOTE(review): the topic that pressed() publishes state to is
            # registered here as "command_topic" -- confirm whether
            # "state_topic" was intended.
            discovery = {
                "name": str(port_cfg["name"]),
                "command_topic": "custom/switches/{}/state".format(topic),
                "value_template": "{{ value_json.value }}",
                "unique_id": "switches_{}".format(topic),
                "device": {
                    "identifiers": ["switches"],
                    "name": "Switches",
                    "model": "B2",
                    "manufacturer": "Raspberry PI",
                },
            }
            # json.dumps escapes quotes and backslashes in names and topics,
            # which the earlier string template did not.
            client.publish(
                "homeassistant/switch/{}/config".format(topic),
                json.dumps(discovery),
                retain=True,
            )

        while True:
            if flag_connected != 1:
                try:
                    client.connect(broker_url, broker_port)
                except OSError as err:
                    # Broker unreachable: keep the process alive and retry.
                    printstdout("MQTT connect failed, retrying: {}".format(err))
                    time.sleep(1)
                    continue
            # Always run the network loop, also right after a reconnect:
            # on_connect is only invoked while the loop processes the
            # broker's reply, so skipping it would leave flag_connected at 0
            # forever.
            client.loop()
            time.sleep(1)
    except ProgramKilled:
        printstdout("Program killed: running cleanup code")
    finally:
        # Release the GPIO pins on every exit path, including crashes.
        GPIO.cleanup()
# settings.yaml -- button port configuration read by the script at startup.
#
# Each key under `ports` is the input number the script passes to pressed().
#   topic: MQTT topic segment used for the state and discovery topics
#   name:  name announced in the discovery message
#   pin:   BCM pin of the input (only referenced by commented-out code in
#          the script; the active code hard-codes its pin numbers)
ports:
  1:
    topic: allOff
    name: All off
    pin: 17
  2:
    topic: toilet
    name: Toilet
    pin: 27
  3:
    topic: hallway
    name: Hallway
    pin: 23
  4:
    topic: hall
    name: Hall
    pin: 22
  # 5:
  #   topic: hall
  #   name: Hall
  #   pin: 24
  # 6:
  #   topic: test6
  #   name: test6
  #   pin: 10
  # 7:
  #   topic: test7
  #   name: test7
  #   pin: 25
  # 8:
  #   topic: test8
  #   name: test8
  #   pin: 9
  # 9:
  #   topic: test9
  #   name: test9
  #   pin: 11
  # 10:
  #   topic: test10
  #   name: test10
  #   pin: 5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment