Skip to content

Instantly share code, notes, and snippets.

@kylethebaker
Last active August 4, 2016 03:36
Show Gist options
  • Save kylethebaker/8cb5a24ce18e18427e52e347ca71e09e to your computer and use it in GitHub Desktop.
Save kylethebaker/8cb5a24ce18e18427e52e347ca71e09e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import socketserver
import logging
from random import randint
# Address the server listens on; passed as (HOST, PORT) to ArduinoTCPServer
# in the __main__ block.
# NOTE(review): an empty host presumably means "all interfaces", and a port
# below 1024 usually needs elevated privileges on Unix -- confirm.
HOST, PORT = "", 999
class ArduinoLED(object):
    """Produce random LED command fields for the Arduino client.

    Every helper returns its value as a ``str`` so the caller can join the
    fields directly into a text reply.
    """

    # Names of the LEDs a command may target.
    LEDS_AVAILABLE = ["red", "green", "blue"]
    # Pin value associated with each LED name.
    LED_PINS = {"red": 14, "green": 13, "blue": 12}

    @classmethod
    def get_random_led(cls):
        """Return the ``LED_PINS`` value of a randomly chosen LED, as a string.

        The index range is derived from ``LEDS_AVAILABLE`` rather than being
        hard-coded, so adding or removing an LED (here or in a subclass)
        needs no other change.
        """
        last_index = len(cls.LEDS_AVAILABLE) - 1
        led_name = cls.LEDS_AVAILABLE[randint(0, last_index)]
        return str(cls.LED_PINS[led_name])

    @classmethod
    def get_random_delay(cls):
        """Return a random integer in [100, 2000] as a string.

        NOTE(review): presumably a delay in milliseconds -- confirm against
        the client sketch.
        """
        return str(randint(100, 2000))

    @classmethod
    def get_random_highlow(cls):
        """Return ``"0"`` or ``"1"`` at random.

        NOTE(review): presumably maps to LOW/HIGH on the client -- confirm.
        """
        return str(randint(0, 1))
class ArduinoTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that hands each client connection to its own thread."""

    # socketserver option: allow the listening address to be reused.
    allow_reuse_address = True
    # ThreadingMixIn option: run request threads as daemon threads.
    daemon_threads = True

    def __init__(self, server_address, RequestHandlerClass):
        """Create the server for *server_address*.

        Args:
            server_address: ``(host, port)`` pair to listen on.
            RequestHandlerClass: handler class instantiated per connection.
        """
        # ThreadingMixIn defines no __init__, so this resolves to
        # TCPServer.__init__ with its default arguments.
        super().__init__(server_address, RequestHandlerClass)
class ArduinoTCPHandler(socketserver.BaseRequestHandler):
    """Serve one client connection using a small text command protocol.

    Recognized commands (surrounding whitespace is ignored):

    * ``PING``    -- reply ``PONG``.
    * ``GETLED``  -- reply three space-separated random fields from
      ``ArduinoLED``: LED, high/low flag, delay.
    * ``GOODBYE`` -- end the session.

    Anything else gets an "I don't recognize that." reply.  Every reply ends
    with a newline, and ``Bye!`` is sent when the session finishes.

    NOTE: each ``recv()`` chunk is compared as a whole against the command
    names, so commands that arrive coalesced into one chunk (or split across
    two) are reported as unrecognized.
    """

    def _log_client_action(self, msg):
        """Log *msg* at INFO level, prefixed with the client's address."""
        logging.info("[{}] {}".format(self.client_address[0], msg))

    def _send(self, payload):
        """Send *payload* (bytes) to the client.

        Returns:
            True if the data was handed to the socket, False if the peer is
            gone (the OSError is logged instead of propagating).
        """
        try:
            self.request.sendall(payload)
        except OSError as err:
            self._log_client_action("Send failed: {}".format(err))
            return False
        return True

    def setup(self):
        """Log the new connection."""
        self._log_client_action("Connected")

    def handle(self):
        """Answer commands until the client says GOODBYE or disconnects.

        Returns:
            True when the session is over.
        """
        while True:
            # recv() returns empty bytes once the client has closed its end;
            # a reset or aborted connection surfaces as OSError. Either way
            # the session is over and we stop handling.
            try:
                self.data = self.request.recv(1024)
            except OSError as err:
                self._log_client_action("Receive failed: {}".format(err))
                return True
            if not self.data:
                return True

            self.data = self.data.strip()
            self._log_client_action("Received: {}".format(self.data))

            if self.data == b"GOODBYE":
                return True

            if self.data == b"PING":
                self._log_client_action("Sending PONG")
                reply = b"PONG\n"
            elif self.data == b"GETLED":
                led = ArduinoLED.get_random_led()
                direction = ArduinoLED.get_random_highlow()
                delay = ArduinoLED.get_random_delay()
                led_code = "{} {} {}".format(led, direction, delay)
                self._log_client_action("Sending LED Code: {}".format(led_code))
                reply = "{}\n".format(led_code).encode()
            else:
                self._log_client_action("{} is unrecognized".format(self.data))
                reply = b"I don't recognize that.\n"

            # Stop serving as soon as the client can no longer be reached.
            if not self._send(reply):
                return True

    def finish(self):
        """Log the end of the session and send a farewell if still possible."""
        self._log_client_action("Closing Connection")
        # The client may already have disconnected (that is one of the ways
        # handle() ends), so a failed send here must not raise.
        self._send(b"Bye!\n")
if __name__ == "__main__":
    # setup basic logging
    logformat = "[%(asctime)s] [%(levelname)s] %(message)s "
    datefmt = "%m/%d/%y %H:%M:%S"
    logging.basicConfig(format=logformat, level=logging.DEBUG, datefmt=datefmt)
    logging.info("Starting server on port %s", PORT)

    # Run the server until a keyboard interrupt is received. The listening
    # socket is released in `finally` so it is closed on every exit path,
    # not only on Ctrl-C.
    server = ArduinoTCPServer((HOST, PORT), ArduinoTCPHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # serve_forever() has already stopped at this point. shutdown() is
        # only meant to be called from another thread while the loop is
        # still running, so it is not called here.
        logging.info("Keyboard interrupt received, shutting down")
    finally:
        server.server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment