Skip to content

Instantly share code, notes, and snippets.

@jgibbard
Created October 28, 2020 21:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgibbard/f728dc4e253efeef1d9218bba2a4edd7 to your computer and use it in GitHub Desktop.
Save jgibbard/f728dc4e253efeef1d9218bba2a4edd7 to your computer and use it in GitHub Desktop.
import select, socket, sys, queue, time
from threading import Thread
from threading import Event
import logging
class TcpClient(Thread):
def __init__(self, ipAddress, port, recvQueue, sendQueue, stopEvent):
Thread.__init__(self)
self.sendQueue = sendQueue
self.recvQueue = recvQueue
self.stopEvent = stopEvent
self.ipAddress = ipAddress
self.port = port
self.connected = False
self.logger = logging.getLogger(__name__)
def run(self):
while not self.stopEvent.is_set():
# Keep trying to connect to the server
if not self.connected:
try:
self.logger.debug("Attempting to connect to {}:{}".format(self.ipAddress, self.port))
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.ipAddress, self.port))
# Set socket to not block
self.sock.setblocking(0)
self.connected = True
except:
self.logger.debug("Connecting to {}:{} failed".format(self.ipAddress, self.port))
# Retry every 1 second
self.connected = False
time.sleep(1)
else:
self.logger.info("Successfully connected to slave TCP server {}:{}".format(self.ipAddress, self.port))
# Once connected send and recieve data as required
# Keep socket open
else:
# Always check for new data
inList = [self.sock]
if not self.sendQueue.empty():
outList = [self.sock]
else:
outList = []
# Time out after 0.01 seconds so it doesn't block indefinitely
readable, writable, exceptional = select.select(inList, outList, inList, 0.01)
if readable:
# Get data from socket
try:
data = readable[0].recv(1024)
except:
self.logger.error("Connection to slave TCP server {}:{} terminated unexpectedly".format(self.ipAddress, self.port))
readable[0].close()
self.connected = False
continue
if data:
# Add data to received queue
self.recvQueue.put_nowait(data)
self.logger.debug("Data Rx [{}:{}]: {}".format(self.ipAddress, self.port,data.hex()))
# If there is no data then server has disconnected
else:
self.logger.info("Slave TCP server {}:{} disconnected".format(self.ipAddress, self.port))
readable[0].close()
self.connected = False
continue
if writable:
# If there is any data to send
if not self.sendQueue.empty():
# Get data from the queue - Non blocking
sendData = self.sendQueue.get_nowait()
# Double check that we got data
if sendData:
# Sent data to server
try:
writable[0].send(sendData)
self.logger.debug("Data Tx [{}:{}]: {}".format(self.ipAddress, self.port,sendData.hex()))
except:
self.logger.error("Connection to slave TCP server {}:{} terminated unexpectedly".format(self.ipAddress, self.port))
writable[0].close()
self.connected = False
continue
if exceptional:
self.logger.error("Slave TCP socket returned exception!")
exceptional[0].close()
self.connected = False
continue
# Done, close all socket
if self.connected:
self.sock.close()
if __name__ == '__main__':
# Set up default logging settings
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.DEBUG)
logging.info("Example TCP Client")
rxQueue = queue.Queue()
txQueue = queue.Queue()
stopEvent = Event()
server = TcpClient("localhost", 50000, rxQueue, txQueue, stopEvent)
server.start()
try:
while True:
while not rxQueue.empty():
data = rxQueue.get(block=True, timeout=0.5)
txQueue.put("Hello, World!".encode('utf-8'), block=True, timeout=0.5)
time.sleep(0.1)
except KeyboardInterrupt:
logging.debug("Terminating TCP Client...")
stopEvent.set()
server.join()
logging.info("Done")
import select, socket, sys, queue, time
from threading import Thread
from threading import Event
import logging
class ThreadedServer(Thread):
def __init__(self, port, recvQueue, sendQueue, stopEvent):
Thread.__init__(self)
self.sendQueue = sendQueue
self.recvQueue = recvQueue
self.stopEvent = stopEvent
self.port = port
# Start listening for connections
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.setblocking(0)
self.server.bind(('', self.port))
self.server.listen(5)
# List of all clients
self.clients = []
# Dict to store client addresses
self.clientMap = {}
self.logger = logging.getLogger(__name__)
def run(self):
self.logger.info("TCP server running on port {}".format(self.port))
while not self.stopEvent.is_set():
# Check for new clients and data from existing clients
inList = [self.server] + self.clients
# Check existing clients for possibility to send data
if not self.sendQueue.empty():
outList = self.clients
else:
outList = []
# Time out after 0.01 seconds so it doesn't block indefinitely
readable, writable, exceptional = select.select(inList, outList, inList, 0.01)
for s in readable:
# Check for new connections
if s is self.server:
# Add any new connections to the input list
connection, client_address = s.accept()
# Store the address of the client in a dictionary
self.clientMap[connection] = client_address[0]
# Non blocking
connection.setblocking(0)
# Add new client to list of clients
self.clients.append(connection)
self.logger.info("Client connected: {}".format(client_address[0]))
# Receive any data from each of the connections
else:
clientIp = self.lookupIP(s)
if not clientIp:
break
# Receive data from clients
try:
data = s.recv(1024)
except (ConnectionAbortedError, ConnectionResetError):
self.logger.info("Client Disconnected: {}".format(clientIp))
self.clients.remove(s)
del self.clientMap[s]
s.close()
continue
if data:
# Add tuple of ip and data to received queue
self.recvQueue.put_nowait((clientIp, data))
self.logger.debug("Data Rx [{}]: {}".format(clientIp, data.hex()))
# If there is no data then client has disconnected
else:
self.logger.info("Client Disconnected: {}".format(clientIp))
s.close()
self.clients.remove(s)
del self.clientMap[s]
continue
# If there are any sockets available to send data
if writable:
# If there is any data to send
if not self.sendQueue.empty():
# Get data from the queue - Non blocking
sendData = self.sendQueue.get_nowait()
# Double check that we got data
if sendData:
# Send the same data to all writable sockets
for s in writable:
clientIp = self.lookupIP(s)
if not clientIp:
break
try:
s.send(sendData)
self.logger.debug("Data Tx [{}]: {}".format(clientIp, sendData.hex()))
except:
self.logger.info("Client Disconnected: {}".format(clientIp))
s.close()
self.clients.remove(s)
del self.clientMap[s]
continue
for s in exceptional:
s.close()
self.clients.remove(s)
del self.clientMap[s]
# Done, close all sockets
for client in self.clients:
client.close()
self.server.close()
def lookupIP(self, socketToLookup):
# Look up IP address of client
try:
return self.clientMap[socketToLookup]
except KeyError:
self.logger.critical("Client not in address lookup table.")
return None
if __name__ == '__main__':
# Set up default logging settings
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.DEBUG)
logging.info("Example TCP Server")
rxQueue = queue.Queue()
txQueue = queue.Queue()
stopEvent = Event()
server = ThreadedServer(50000, rxQueue, txQueue, stopEvent)
server.start()
try:
while True:
if txQueue.empty():
txQueue.put("THIS IS A TEST!".encode('utf-8'), block=True, timeout=0.5)
while not rxQueue.empty():
address, data = rxQueue.get(block=True, timeout=0.5)
time.sleep(0.5)
except KeyboardInterrupt:
logging.debug("Terminating TCP server...")
stopEvent.set()
server.join()
logging.info("Done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment