Skip to content

Instantly share code, notes, and snippets.

@m053m716
Created May 18, 2024 20:04
Show Gist options
  • Save m053m716/467d81521e5ea66db066c23a15b5570e to your computer and use it in GitHub Desktop.
Save m053m716/467d81521e5ea66db066c23a15b5570e to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
# Send UDP messages as UTF-8 char messages like '<channel> <state>' -- sends back '<channel> <state> <code>\r\n' where code is 1 if successful, -1 if state invalid, -2 if channel invalid.
# channel: Should be '1', '2', '3', '4', '5', '6', '7', or '8'
# state: Should be '0' (off) or '1' (on)
import RPi.GPIO as GPIO
import socket
import time
UDP_RELAY_IP = "0.0.0.0"
UDP_RELAY_PORT = 7010 # Access this UDP port for byte messages
PINOUT = [5, 6, 13, 16, 19 ,20, 21, 26] # BCM pins for Relay Hat Module
def initializeRelayPins():
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
for pin in PINOUT:
GPIO.setup(pin, GPIO.OUT)
GPIO.output(pin, False)
def myNetworkInterface(listener_ip: str = UDP_RELAY_IP, listener_port: int = UDP_RELAY_PORT):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((listener_ip, listener_port))
print(f"Listening for UDP messages on {listener_ip}:{listener_port}...")
while True:
data, addr = sock.recvfrom(8) # Buffer size is 8 bytes
message = data.decode('utf-8').strip()
print(f"Received message: {message} from {addr}")
try:
channel, state = message.split()
channel = int(channel)-1
confirmation_base = "f"{channel+1} {state}"
if 0 <= channel < len(PINOUT):
pin_number = PINOUT[channel]
if state == '1':
GPIO.output(pin_number, True)
confirmation_state = 1
elif state == '0':
GPIO.output(pin_number, False)
confirmation_state = 1
else:
print(f"Unknown state: {state}")
confirmation_state = -1
else:
print(f"Invalid channel number: {channel + 1}")
confirmation_state = -2
confirmation_message = f"{confirmation_base} {confirmation_state}\r\n"
sock.sendto(confirmation_message.encode('utf-8'),addr)
except Exception as e:
print(f"Error processing message: {e}")
if __name__ == "__main__":
initializeRelayPins()
try:
myNetworkInterface()
except KeyboardInterrupt:
for pin in PINOUT:
GPIO.output(pin,False)
GPIO.cleanup()
except Exception as e:
print(f"An error occurred: {e}")
for pin in PINOUT:
GPIO.output(pin,False)
GPIO.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment