Last active
August 4, 2016 03:36
-
-
Save kylethebaker/8cb5a24ce18e18427e52e347ca71e09e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import socketserver | |
import logging | |
from random import randint | |
HOST, PORT = "", 999 | |
class ArduinoLED(object): | |
LEDS_AVAILABLE = ["red", "green", "blue"] | |
LED_PINS = {"red": 14, "green": 13, "blue": 12} | |
@classmethod | |
def get_random_led(cls): | |
return str(cls.LED_PINS[cls.LEDS_AVAILABLE[randint(0, 2)]]) | |
@classmethod | |
def get_random_delay(cls): | |
return str(randint(100, 2000)) | |
@classmethod | |
def get_random_highlow(cls): | |
return str(randint(0, 1)) | |
class ArduinoTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): | |
daemon_threads = True | |
allow_reuse_address = True | |
def __init__(self, server_address, RequestHandlerClass): | |
socketserver.TCPServer.__init__( | |
self, server_address, | |
RequestHandlerClass | |
) | |
class ArduinoTCPHandler(socketserver.BaseRequestHandler): | |
def _log_client_action(self, msg): | |
logging.info("[{}] {}".format(self.client_address[0], msg)) | |
def setup(self): | |
logging.info("[{}] Connected".format(self.client_address[0])) | |
def handle(self): | |
# handle the connection until an explicit exit is received | |
while True: | |
# if the connection has been closed by the client recv will | |
# contain an empty string. if this happens we close the | |
# connection on our end too. | |
self.data = self.request.recv(1024) | |
if not self.data: return True | |
self.data = self.data.strip() | |
self._log_client_action("Received: {}".format(self.data)) | |
if self.data == b"PING": | |
self._log_client_action("Sending PONG") | |
self.request.sendall(b"PONG\n") | |
elif self.data == b"GETLED": | |
led = ArduinoLED.get_random_led() | |
direction = ArduinoLED.get_random_highlow() | |
delay = ArduinoLED.get_random_delay() | |
led_code = "{} {} {}".format(led, direction, delay) | |
self._log_client_action("Sending LED Code: {}".format(led_code)) | |
self.request.sendall("{}\n".format(led_code).encode()) | |
elif self.data == b"GOODBYE": | |
return True | |
else: | |
self._log_client_action("{} is unrecognized".format(self.data)) | |
self.request.sendall(b"I don't recognize that.\n") | |
def finish(self): | |
self._log_client_action("Closing Connection") | |
self.request.sendall(b"Bye!\n") | |
if __name__ == "__main__": | |
# setup basic logging | |
logformat = "[%(asctime)s] [%(levelname)s] %(message)s " | |
datefmt = "%m/%d/%y %H:%M:%S" | |
logging.basicConfig(format=logformat, level=logging.DEBUG, datefmt=datefmt) | |
logging.info("Starting server on port " + str(PORT)) | |
# start the server and keep it running until a keyboard interrupt is | |
# received, in which case cleanly shut down | |
server = ArduinoTCPServer((HOST, PORT), ArduinoTCPHandler) | |
try: | |
server.serve_forever() | |
except KeyboardInterrupt: | |
server.shutdown() | |
server.server_close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment