Skip to content

Instantly share code, notes, and snippets.

@giuliom95
Created June 12, 2020 15:05
Show Gist options
  • Save giuliom95/a763896422d76c64c4f9c7e4c70971d8 to your computer and use it in GitHub Desktop.
Save giuliom95/a763896422d76c64c4f9c7e4c70971d8 to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
import serial
import socket
import spidev
import sys
# Replies sent back to the TCP client after each command.
TCP_OK_CODE = b'ok'
TCP_ERR_CODE = b'err'
# Address handed to socket.bind(); the empty string binds on all interfaces.
TCP_IP = ''
TCP_PORT = 5005
# Maximum number of bytes read from the client per recv() call.
TCP_BUFFER_SIZE = 20
# Frame appended after every command that genAapCmd writes to the serial port.
# NOTE(review): by its name this is presumably the "button released" frame of
# the accessory protocol -- confirm against the protocol documentation.
AAP_RELEASE_CMD = b'\xff\x55\x03\x02\x00\x00\xFB'
# Current volume level, shared by the volume handlers, which clamp it to 0..127.
volume = 127
def genAapCmd(cmd: bytes, ser: serial.Serial):
    """Build a command handler that sends a fixed frame over the serial port.

    Args:
        cmd: Raw frame to transmit when the handler is invoked.
        ser: Open serial port the frame is written to.

    Returns:
        A callable taking the raw bytes received from the client (ignored)
        and returning ``TCP_OK_CODE``.
    """
    def _send_frame(remotecmd: bytes):
        # The command frame is always followed by the release frame, both
        # sent in a single write.
        payload = cmd + AAP_RELEASE_CMD
        ser.write(payload)
        return TCP_OK_CODE

    return _send_frame
def genVolCmd(offset: int, spi: spidev.SpiDev):
    """Build a command handler that changes the volume by a fixed step.

    Args:
        offset: Amount added to the global ``volume`` on every invocation
            (negative values lower the volume).
        spi: Open SPI device the new level is written to.

    Returns:
        A callable taking the raw bytes received from the client (ignored)
        and returning ``TCP_OK_CODE``.
    """
    def _step_volume(remotecmd: bytes):
        global volume
        # Apply the step and keep the result inside the 0..127 range.
        volume = max(0, min(127, volume + offset))
        # NOTE(review): 0x13 is presumably the write-command byte of the
        # volume chip -- confirm against the device datasheet.
        spi.writebytes(b'\x13' + bytes([volume]))
        return TCP_OK_CODE

    return _step_volume
def genVolSetCmd(spi: spidev.SpiDev):
    """Build a command handler that sets the volume to an absolute level.

    Args:
        spi: Open SPI device the new level is written to.

    Returns:
        A callable taking the raw bytes received from the client, expected
        in the form ``b'volset<number>'``. It returns ``TCP_OK_CODE`` after
        applying the level (clamped to 0..127), or ``TCP_ERR_CODE`` when the
        numeric argument is missing or malformed; in that case the global
        ``volume`` and the SPI device are left untouched.
    """
    # Number of leading bytes taken up by the command name itself.
    prefix_len = len(b'volset')

    def _genVolSetCmd(remotecmd: bytes):
        global volume
        try:
            # int() accepts bytes and tolerates surrounding whitespace, so a
            # trailing newline from the client is fine.
            requested = int(remotecmd[prefix_len:])
        except ValueError:
            # Report a bad argument to the client instead of letting the
            # exception propagate to the caller.
            return TCP_ERR_CODE
        volume = max(0, min(127, requested))
        # NOTE(review): 0x13 is presumably the write-command byte of the
        # volume chip -- confirm against the device datasheet.
        spi.writebytes(b'\x13' + bytes([volume]))
        return TCP_OK_CODE

    return _genVolSetCmd
if __name__ == "__main__":
    # Usage: <script> <serial-port> <x>,<y>
    # The second argument is split on ',' and passed to spi.open().
    spi = spidev.SpiDev()

    try:
        ser_port = sys.argv[1]
        ser = serial.Serial(ser_port, 19200)
    except Exception as e:
        print('Give me a valid serial port address. Error message:', e)
        sys.exit(1)

    try:
        spi_addr = list(map(int, sys.argv[2].split(',')))
        spi.open(*spi_addr)
        spi.max_speed_hz = 50000
    except Exception as e:
        print('Give me a valid SPI address in the form `<x>,<y>`. Error message:', e)
        # The serial port is already open at this point; release it.
        ser.close()
        sys.exit(1)

    s = None
    try:
        # Push the initial (maximum) volume to the hardware at startup.
        genVolSetCmd(spi)(b'volset' + bytes(str(127), 'utf8'))

        # Command name -> handler. A command is selected when the received
        # data starts with its name; no name is a prefix of another one.
        actions = {
            b'play': genAapCmd(b'\xff\x55\x03\x02\x00\x01\xFA', ser),
            b'next': genAapCmd(b'\xff\x55\x03\x02\x00\x08\xF3', ser),
            b'prev': genAapCmd(b'\xff\x55\x03\x02\x00\x10\xEB', ser),
            b'on': genAapCmd(b'\xff\x55\x05\x02\x00\x00\x00\x08\xF1', ser),
            b'off': genAapCmd(b'\xff\x55\x05\x02\x00\x00\x00\x04\xF5', ser),
            b'voldown': genVolCmd(-5, spi),
            b'volup': genVolCmd(+5, spi),
            b'volset': genVolSetCmd(spi),
            # sys.exit raises SystemExit, which the loop below turns into a
            # clean shutdown after acknowledging the client.
            b'quit': sys.exit,
        }

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((TCP_IP, TCP_PORT))
        s.listen()
        print('Listening on ' + str(TCP_IP) + ':' + str(TCP_PORT) + '...')

        gottaquit = False
        # Clients are served one at a time until a 'quit' command arrives.
        while not gottaquit:
            conn, addr = s.accept()
            print('Connected by', addr)
            # The context manager closes the client socket even on errors.
            with conn:
                while not gottaquit:
                    try:
                        data = conn.recv(TCP_BUFFER_SIZE)
                    except OSError as e:
                        # A broken client must not take the server down.
                        print('Connection error:', e)
                        break
                    if not data:
                        # Empty read: the client closed the connection.
                        break

                    key = next((k for k in actions if data.startswith(k)), None)
                    if key is None:
                        # Unknown command.
                        response = TCP_ERR_CODE
                    else:
                        try:
                            response = actions[key](data)
                        except ValueError:
                            # The handler rejected its argument (for example
                            # a non-numeric volume level).
                            response = TCP_ERR_CODE
                        except SystemExit:
                            response = TCP_OK_CODE
                            gottaquit = True

                    try:
                        conn.sendall(response)
                    except OSError as e:
                        print('Connection error:', e)
                        break
    finally:
        # Release the hardware and the listening socket on every exit path.
        if s is not None:
            s.close()
        spi.close()
        ser.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment