Created
June 12, 2020 15:05
-
-
Save giuliom95/a763896422d76c64c4f9c7e4c70971d8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import serial | |
import socket | |
import spidev | |
import sys | |
# Replies sent back to the TCP client after each command.
TCP_OK_CODE = b'ok'
TCP_ERR_CODE = b'err'

# Listening endpoint. An empty host string makes bind() listen on all
# available interfaces.
TCP_IP = ''
TCP_PORT = 5005
# Maximum number of bytes read from a client per recv() call.
TCP_BUFFER_SIZE = 20

# Frame appended after every AAP command written to the serial port.
# NOTE(review): presumably the "button released" frame of the protocol --
# confirm against the protocol specification.
AAP_RELEASE_CMD = b'\xff\x55\x03\x02\x00\x00\xFB'

# Current volume level; the volume handlers keep it within 0..127.
volume = 127
def genAapCmd(cmd: bytes, ser: serial.Serial):
    """Return a command handler that sends a fixed AAP frame over serial.

    Args:
        cmd: Raw frame to transmit each time the handler is invoked.
        ser: Serial port the frame is written to.

    Returns:
        A callable taking the raw remote command (ignored) and returning
        ``TCP_OK_CODE`` once the frame has been written.
    """
    def _send_frame(remotecmd: bytes):
        # The command frame is always followed by the release frame,
        # both written in a single call.
        payload = cmd + AAP_RELEASE_CMD
        ser.write(payload)
        return TCP_OK_CODE

    return _send_frame
def genVolCmd(offset: int, spi: spidev.SpiDev):
    """Return a command handler that shifts the volume by a fixed step.

    Args:
        offset: Amount added to the module-level ``volume`` on every call
            (negative values lower it).
        spi: SPI device the new level is written to.

    Returns:
        A callable taking the raw remote command (ignored) and returning
        ``TCP_OK_CODE`` after the new level has been written.
    """
    def _step_volume(remotecmd: bytes):
        global volume
        # Apply the step and clamp the result to the 0..127 range.
        volume = min(127, max(0, volume + offset))
        # NOTE(review): 0x13 is presumably the write-command byte of the
        # volume chip -- confirm against its datasheet.
        level_frame = b'\x13' + bytes([volume])
        spi.writebytes(level_frame)
        return TCP_OK_CODE

    return _step_volume
def genVolSetCmd(spi: spidev.SpiDev):
    """Return a command handler that sets the volume to an absolute level.

    The handler expects a remote command of the form ``b'volset<N>'`` where
    ``<N>`` is a decimal integer. The value is clamped to 0..127, stored in
    the module-level ``volume`` and written to the SPI device.

    Args:
        spi: SPI device the new level is written to.

    Returns:
        A callable taking the raw remote command and returning
        ``TCP_OK_CODE`` on success, or ``TCP_ERR_CODE`` when the payload
        after the ``volset`` prefix is missing or not a valid integer (the
        volume is left unchanged in that case).
    """
    def _genVolSetCmd(remotecmd: bytes):
        global volume
        try:
            # Everything after the 6-byte b'volset' prefix is the level.
            requested = int(remotecmd[6:])
        except ValueError:
            # Malformed request: reject it instead of letting the exception
            # propagate and take the whole server down.
            return TCP_ERR_CODE
        # Clamp to the 0..127 range.
        volume = max(0, min(127, requested))
        # NOTE(review): 0x13 is presumably the write-command byte of the
        # volume chip -- confirm against its datasheet.
        spi.writebytes(b'\x13' + bytes([volume]))
        return TCP_OK_CODE

    return _genVolSetCmd
if __name__ == "__main__":
    # Usage: <script> <serial-port> <spi-bus>,<spi-device>
    #
    # Opens the serial port and the SPI device, then serves one TCP client
    # at a time. Each received message is dispatched to the handler whose
    # key is a prefix of the message; the handler's reply is sent back.
    spi = spidev.SpiDev()

    try:
        ser_port = sys.argv[1]
        ser = serial.Serial(ser_port, 19200)
    except Exception as e:
        print('Give me a valid serial port address. Error message:', e)
        sys.exit(1)

    try:
        spi_addr = list(map(int, sys.argv[2].split(',')))
        spi.open(*spi_addr)
        spi.max_speed_hz = 50000
    except Exception as e:
        print('Give me a valid SPI address in the form `<x>,<y>`. Error message:', e)
        # The serial port is already open at this point; release it.
        ser.close()
        sys.exit(1)

    try:
        # Push the initial volume level to the SPI device.
        genVolSetCmd(spi)(b'volset127')

        # Command prefix -> handler. Every handler receives the raw message.
        actions = {
            b'play': genAapCmd(b'\xff\x55\x03\x02\x00\x01\xFA', ser),
            b'next': genAapCmd(b'\xff\x55\x03\x02\x00\x08\xF3', ser),
            b'prev': genAapCmd(b'\xff\x55\x03\x02\x00\x10\xEB', ser),
            b'on': genAapCmd(b'\xff\x55\x05\x02\x00\x00\x00\x08\xF1', ser),
            b'off': genAapCmd(b'\xff\x55\x05\x02\x00\x00\x00\x04\xF5', ser),
            b'voldown': genVolCmd(-5, spi),
            b'volup': genVolCmd(+5, spi),
            b'volset': genVolSetCmd(spi),
            b'quit': sys.exit,
        }

        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((TCP_IP, TCP_PORT))
            s.listen()
            print(f'Listening on {TCP_IP}:{TCP_PORT}...')

            gottaquit = False
            while not gottaquit:
                conn, addr = s.accept()
                with conn:
                    print('Connected by', addr)
                    try:
                        while not gottaquit:
                            data = conn.recv(TCP_BUFFER_SIZE)
                            if not data:
                                # Client closed the connection.
                                break

                            # First key (in insertion order) that prefixes
                            # the message, or None for an unknown command.
                            key = next(
                                (k for k in actions if data.startswith(k)),
                                None,
                            )
                            if key is None:
                                response = TCP_ERR_CODE
                            else:
                                try:
                                    response = actions[key](data)
                                except ValueError:
                                    # Handler could not parse its payload.
                                    response = TCP_ERR_CODE
                                except SystemExit:
                                    # 'quit' maps to sys.exit: acknowledge
                                    # first, then leave both loops.
                                    response = TCP_OK_CODE
                                    gottaquit = True

                            conn.sendall(response)
                    except ConnectionError as e:
                        # A client dropping the link must not stop the
                        # server; go back to accepting connections.
                        print('Connection lost:', e)
    finally:
        # Release the hardware handles on every exit path.
        spi.close()
        ser.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment