-
-
Save Sennevds/98e2f2340eac7323f2723de353fcdd9f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/python3 | |
# importeer de GPIO bibliotheek. | |
import RPi.GPIO as GPIO | |
import yaml | |
import sys | |
import paho.mqtt.client as mqtt | |
import threading, time, signal | |
# Importeer de time biblotheek voor tijdfuncties. | |
from time import sleep | |
flag_connected = 0 | |
broker_url = "192.168.0.190" # MQTT server IP | |
broker_port = 1883 # MQTT server port | |
client = mqtt.Client() | |
deviceName = "Switches" # Name of your PI | |
# Zet de pinmode op Broadcom SOC. | |
GPIO.setmode(GPIO.BCM) | |
# Zet waarschuwingen uit. | |
GPIO.setwarnings(False) | |
class ProgramKilled(Exception): | |
pass | |
def signal_handler(signum, frame): | |
raise ProgramKilled | |
def printstdout(value): | |
print(value) | |
sys.stdout.flush() | |
class Job(threading.Thread): | |
def __init__(self, interval, execute, *args, **kwargs): | |
threading.Thread.__init__(self) | |
self.daemon = False | |
self.stopped = threading.Event() | |
self.interval = interval | |
self.execute = execute | |
self.args = args | |
self.kwargs = kwargs | |
def stop(self): | |
self.stopped.set() | |
self.join() | |
def run(self): | |
while not self.stopped.wait(self.interval.total_seconds()): | |
self.execute(*self.args, **self.kwargs) | |
def on_connect(client, userdata, flags, rc): | |
global flag_connected | |
flag_connected = 1 | |
def on_disconnect(client, userdata, rc): | |
global flag_connected | |
flag_connected = 0 | |
# Deze functie wordt uitgevoerd als er op de knop gedrukt is. | |
def pressed(pin): | |
topic = settings["ports"][pin]["topic"] | |
printstdout("Er is gedrukt!, interrupt op pin: {}".format(str(topic))) | |
client.publish("custom/switches/{}/state".format(topic), '{ "value": "ON" }') | |
def buttonPressed(pinRpi, pinHat): | |
printstdout("Er is gedrukt!, interrupt op pin: {}".format(str(pinRpi))) | |
pressed(pinHat) | |
GPIO.setup(17, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(27, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(22, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(24, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(10, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(25, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(9, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(11, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.setup(5, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
GPIO.add_event_detect( | |
17, GPIO.FALLING, callback=lambda x: buttonPressed(x, 1), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
27, GPIO.FALLING, callback=lambda x: buttonPressed(x, 2), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
23, GPIO.FALLING, callback=lambda x: buttonPressed(x, 3), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
22, GPIO.FALLING, callback=lambda x: buttonPressed(x, 4), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
24, GPIO.FALLING, callback=lambda x: buttonPressed(x, 5), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
10, GPIO.FALLING, callback=lambda x: buttonPressed(x, 6), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
25, GPIO.FALLING, callback=lambda x: buttonPressed(x, 7), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
9, GPIO.FALLING, callback=lambda x: buttonPressed(x, 8), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
11, GPIO.FALLING, callback=lambda x: buttonPressed(x, 9), bouncetime=400 | |
) | |
GPIO.add_event_detect( | |
5, GPIO.FALLING, callback=lambda x: buttonPressed(x, 10), bouncetime=400 | |
) | |
if __name__ == "__main__": | |
with open("/home/pi/24V/settings.yaml") as f: | |
# use safe_load instead load | |
settings = yaml.safe_load(f) | |
client.on_connect = on_connect | |
client.on_disconnect = on_disconnect | |
client.connect(broker_url, broker_port) | |
client.loop() | |
for port in settings["ports"]: | |
# GPIO.setup(settings["ports"][port]["pin"], GPIO.IN, pull_up_down=GPIO.PUD_DOWN) | |
# GPIO.add_event_detect(settings["ports"][port]["pin"], GPIO.FALLING, callback=lambda x: buttonPressed(x, port), bouncetime=400) | |
publishString = """{{ | |
"name": "{0}", | |
"command_topic": "custom/switches/{1}/state", | |
"value_template": "{{{{ value_json.value }}}}", | |
"unique_id": "switches_{1}", | |
"device": {{ | |
"identifiers": [ | |
"switches" | |
], | |
"name": "Switches", | |
"model": "B2", | |
"manufacturer": "Raspberry PI" | |
}} | |
}}""".format( | |
settings["ports"][port]["name"], settings["ports"][port]["topic"] | |
) | |
client.publish( | |
"homeassistant/switch/{}/config".format(settings["ports"][port]["topic"]), | |
publishString, | |
retain=True, | |
) | |
while True: | |
try: | |
if flag_connected == 1: | |
client.loop() | |
else: | |
client.connect(broker_url, broker_port) | |
time.sleep(1) | |
except ProgramKilled: | |
printstdout("Program killed: running cleanup code") | |
GPIO.cleanup() | |
sys.stdout.flush() | |
break | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ports: | |
1: | |
topic: allOff | |
name: All off | |
pin: 17 | |
2: | |
topic: toilet | |
name: Toilet | |
pin: 27 | |
3: | |
topic: hallway | |
name: Hallway | |
pin: 23 | |
4: | |
topic: hall | |
name: Hall | |
pin: 22 | |
# 5: | |
# topic: hall | |
# name: Hall | |
# pin: 24 | |
# 6: | |
# topic: test6 | |
# name: test6 | |
# pin: 10 | |
# 7: | |
# topic: test7 | |
# name: test7 | |
# pin: 25 | |
# 8: | |
# topic: test8 | |
# name: test8 | |
# pin: 9 | |
# 9: | |
# topic: test9 | |
# name: test9 | |
# pin: 11 | |
# 10: | |
# topic: test10 | |
# name: test10 | |
# pin: 5 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment