Created
October 28, 2020 21:50
-
-
Save jgibbard/f728dc4e253efeef1d9218bba2a4edd7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import select, socket, sys, queue, time | |
from threading import Thread | |
from threading import Event | |
import logging | |
class TcpClient(Thread): | |
def __init__(self, ipAddress, port, recvQueue, sendQueue, stopEvent): | |
Thread.__init__(self) | |
self.sendQueue = sendQueue | |
self.recvQueue = recvQueue | |
self.stopEvent = stopEvent | |
self.ipAddress = ipAddress | |
self.port = port | |
self.connected = False | |
self.logger = logging.getLogger(__name__) | |
def run(self): | |
while not self.stopEvent.is_set(): | |
# Keep trying to connect to the server | |
if not self.connected: | |
try: | |
self.logger.debug("Attempting to connect to {}:{}".format(self.ipAddress, self.port)) | |
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.sock.connect((self.ipAddress, self.port)) | |
# Set socket to not block | |
self.sock.setblocking(0) | |
self.connected = True | |
except: | |
self.logger.debug("Connecting to {}:{} failed".format(self.ipAddress, self.port)) | |
# Retry every 1 second | |
self.connected = False | |
time.sleep(1) | |
else: | |
self.logger.info("Successfully connected to slave TCP server {}:{}".format(self.ipAddress, self.port)) | |
# Once connected send and recieve data as required | |
# Keep socket open | |
else: | |
# Always check for new data | |
inList = [self.sock] | |
if not self.sendQueue.empty(): | |
outList = [self.sock] | |
else: | |
outList = [] | |
# Time out after 0.01 seconds so it doesn't block indefinitely | |
readable, writable, exceptional = select.select(inList, outList, inList, 0.01) | |
if readable: | |
# Get data from socket | |
try: | |
data = readable[0].recv(1024) | |
except: | |
self.logger.error("Connection to slave TCP server {}:{} terminated unexpectedly".format(self.ipAddress, self.port)) | |
readable[0].close() | |
self.connected = False | |
continue | |
if data: | |
# Add data to received queue | |
self.recvQueue.put_nowait(data) | |
self.logger.debug("Data Rx [{}:{}]: {}".format(self.ipAddress, self.port,data.hex())) | |
# If there is no data then server has disconnected | |
else: | |
self.logger.info("Slave TCP server {}:{} disconnected".format(self.ipAddress, self.port)) | |
readable[0].close() | |
self.connected = False | |
continue | |
if writable: | |
# If there is any data to send | |
if not self.sendQueue.empty(): | |
# Get data from the queue - Non blocking | |
sendData = self.sendQueue.get_nowait() | |
# Double check that we got data | |
if sendData: | |
# Sent data to server | |
try: | |
writable[0].send(sendData) | |
self.logger.debug("Data Tx [{}:{}]: {}".format(self.ipAddress, self.port,sendData.hex())) | |
except: | |
self.logger.error("Connection to slave TCP server {}:{} terminated unexpectedly".format(self.ipAddress, self.port)) | |
writable[0].close() | |
self.connected = False | |
continue | |
if exceptional: | |
self.logger.error("Slave TCP socket returned exception!") | |
exceptional[0].close() | |
self.connected = False | |
continue | |
# Done, close all socket | |
if self.connected: | |
self.sock.close() | |
if __name__ == '__main__': | |
# Set up default logging settings | |
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.DEBUG) | |
logging.info("Example TCP Client") | |
rxQueue = queue.Queue() | |
txQueue = queue.Queue() | |
stopEvent = Event() | |
server = TcpClient("localhost", 50000, rxQueue, txQueue, stopEvent) | |
server.start() | |
try: | |
while True: | |
while not rxQueue.empty(): | |
data = rxQueue.get(block=True, timeout=0.5) | |
txQueue.put("Hello, World!".encode('utf-8'), block=True, timeout=0.5) | |
time.sleep(0.1) | |
except KeyboardInterrupt: | |
logging.debug("Terminating TCP Client...") | |
stopEvent.set() | |
server.join() | |
logging.info("Done") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import select, socket, sys, queue, time | |
from threading import Thread | |
from threading import Event | |
import logging | |
class ThreadedServer(Thread): | |
def __init__(self, port, recvQueue, sendQueue, stopEvent): | |
Thread.__init__(self) | |
self.sendQueue = sendQueue | |
self.recvQueue = recvQueue | |
self.stopEvent = stopEvent | |
self.port = port | |
# Start listening for connections | |
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
self.server.setblocking(0) | |
self.server.bind(('', self.port)) | |
self.server.listen(5) | |
# List of all clients | |
self.clients = [] | |
# Dict to store client addresses | |
self.clientMap = {} | |
self.logger = logging.getLogger(__name__) | |
def run(self): | |
self.logger.info("TCP server running on port {}".format(self.port)) | |
while not self.stopEvent.is_set(): | |
# Check for new clients and data from existing clients | |
inList = [self.server] + self.clients | |
# Check existing clients for possibility to send data | |
if not self.sendQueue.empty(): | |
outList = self.clients | |
else: | |
outList = [] | |
# Time out after 0.01 seconds so it doesn't block indefinitely | |
readable, writable, exceptional = select.select(inList, outList, inList, 0.01) | |
for s in readable: | |
# Check for new connections | |
if s is self.server: | |
# Add any new connections to the input list | |
connection, client_address = s.accept() | |
# Store the address of the client in a dictionary | |
self.clientMap[connection] = client_address[0] | |
# Non blocking | |
connection.setblocking(0) | |
# Add new client to list of clients | |
self.clients.append(connection) | |
self.logger.info("Client connected: {}".format(client_address[0])) | |
# Receive any data from each of the connections | |
else: | |
clientIp = self.lookupIP(s) | |
if not clientIp: | |
break | |
# Receive data from clients | |
try: | |
data = s.recv(1024) | |
except (ConnectionAbortedError, ConnectionResetError): | |
self.logger.info("Client Disconnected: {}".format(clientIp)) | |
self.clients.remove(s) | |
del self.clientMap[s] | |
s.close() | |
continue | |
if data: | |
# Add tuple of ip and data to received queue | |
self.recvQueue.put_nowait((clientIp, data)) | |
self.logger.debug("Data Rx [{}]: {}".format(clientIp, data.hex())) | |
# If there is no data then client has disconnected | |
else: | |
self.logger.info("Client Disconnected: {}".format(clientIp)) | |
s.close() | |
self.clients.remove(s) | |
del self.clientMap[s] | |
continue | |
# If there are any sockets available to send data | |
if writable: | |
# If there is any data to send | |
if not self.sendQueue.empty(): | |
# Get data from the queue - Non blocking | |
sendData = self.sendQueue.get_nowait() | |
# Double check that we got data | |
if sendData: | |
# Send the same data to all writable sockets | |
for s in writable: | |
clientIp = self.lookupIP(s) | |
if not clientIp: | |
break | |
try: | |
s.send(sendData) | |
self.logger.debug("Data Tx [{}]: {}".format(clientIp, sendData.hex())) | |
except: | |
self.logger.info("Client Disconnected: {}".format(clientIp)) | |
s.close() | |
self.clients.remove(s) | |
del self.clientMap[s] | |
continue | |
for s in exceptional: | |
s.close() | |
self.clients.remove(s) | |
del self.clientMap[s] | |
# Done, close all sockets | |
for client in self.clients: | |
client.close() | |
self.server.close() | |
def lookupIP(self, socketToLookup): | |
# Look up IP address of client | |
try: | |
return self.clientMap[socketToLookup] | |
except KeyError: | |
self.logger.critical("Client not in address lookup table.") | |
return None | |
if __name__ == '__main__': | |
# Set up default logging settings | |
logging.basicConfig(format='[%(levelname)s] %(message)s', level=logging.DEBUG) | |
logging.info("Example TCP Server") | |
rxQueue = queue.Queue() | |
txQueue = queue.Queue() | |
stopEvent = Event() | |
server = ThreadedServer(50000, rxQueue, txQueue, stopEvent) | |
server.start() | |
try: | |
while True: | |
if txQueue.empty(): | |
txQueue.put("THIS IS A TEST!".encode('utf-8'), block=True, timeout=0.5) | |
while not rxQueue.empty(): | |
address, data = rxQueue.get(block=True, timeout=0.5) | |
time.sleep(0.5) | |
except KeyboardInterrupt: | |
logging.debug("Terminating TCP server...") | |
stopEvent.set() | |
server.join() | |
logging.info("Done") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment