Skip to content

Instantly share code, notes, and snippets.

@erget
Created November 19, 2017 14:33
Show Gist options
  • Save erget/2d1789fe3263e275caffee4bbc2ebf7f to your computer and use it in GitHub Desktop.
Save erget/2d1789fe3263e275caffee4bbc2ebf7f to your computer and use it in GitHub Desktop.
An (incomplete) example of how to control the FSE 2017 robot over a network connection.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of FSE 2017.
#
# FSE 2017 is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# FSE 2017 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with FSE 2017. If not, see <http://www.gnu.org/licenses/>.
"""Client for sending commands to remote server controlling robot."""
import socket
import time
from logging import debug
from threading import Thread
import pygame
IP_ADDRESS = "192.168.0.102"
IP_PORT = 22000
KEY_TEXT = {
pygame.K_UP: "forward",
pygame.K_DOWN: "reverse",
pygame.K_LEFT: "left",
pygame.K_RIGHT: "right"
}
class KeyListener(Thread):
"""Listener for key entries."""
BUFFER_SIZE = 4096
END_OF_MESSAGE_INDICATOR = "\0"
def __init__(self):
"""Try to establish connection to remote server."""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
debug("Connecting...")
try:
self.sock.connect((IP_ADDRESS, IP_PORT))
self.connected = True
except socket.gaierror as error:
debug("Connection failed.")
raise RuntimeError(
"Connection to {}:{:d} failed".format(IP_ADDRESS, IP_PORT))
def send_command(self, cmd):
"""Send text to remote server."""
debug("sendCommand() with cmd = " + cmd)
try:
self.sock.sendall(cmd + self.END_OF_MESSAGE_INDICATOR)
except socket.error:
debug("Exception in sendCommand()")
def __enter__(self):
return self
def __exit__(self, *args):
debug("Closing socket")
self.connected = False
self.sock.close()
if __name__ == "__main__":
try:
pygame.init()
pygame.display.set_mode((1, 1))
pygame.key.set_repeat(100, 100)
with KeyListener() as listener:
print("Connection established")
time.sleep(1)
while True:
for event in pygame.event.get():
if event.type == pygame.KEYUP:
listener.send_command("stop")
elif event.type == pygame.KEYDOWN:
listener.send_command(KEY_TEXT[event.key])
else:
pass
except KeyboardInterrupt:
pass
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of FSE 2017.
#
# FSE 2017 is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# FSE 2017 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with FSE 2017. If not, see <http://www.gnu.org/licenses/>.
"""Server to run on a robot which is remotely controlled by network."""
import socket
from logging import debug
from threading import Thread
IP_PORT = 22000
class SocketHandler(Thread):
"""Server to handle incoming commands from remote control agent."""
command_terminator = "\0"
def __init__(self, conn):
Thread.__init__(self)
self.conn = conn
self.connected = False
def run(self):
debug("SocketHandler started")
while True:
cmd = ""
try:
cmd = self.conn.recv(1024)
except socket.error:
debug("Connection was reset by peer.")
break
debug("Received cmd: " + cmd + " len: " + str(len(cmd)))
if not len(cmd):
break
self.execute(cmd)
connection.close()
print("Client disconnected. Waiting for next client...")
self.connected = False
debug("SocketHandler terminated")
def execute(self, cmd):
print("Sending command: {}".format(cmd))
self.conn.sendall(cmd)
cmd = cmd[:-1]
print(cmd)
if __name__ == "__main__":
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Close port when process exits
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
debug("Socket created")
HOSTNAME = "" # Symbolic name for all available interfaces
server_socket.bind((HOSTNAME, IP_PORT))
server_socket.listen(10)
print("Waiting for a connecting client...")
while True:
debug("Calling blocking accept()...")
connection, address = server_socket.accept()
print("Connected with client at " + address[0])
socketHandler = SocketHandler(connection)
# Terminate socket handler with parent thread
socketHandler.setDaemon(True)
socketHandler.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment