Skip to content

Instantly share code, notes, and snippets.

@mitbailey
Last active October 26, 2022 03:29
Show Gist options
  • Save mitbailey/3ffd1193d5177fd18b54aef22d747513 to your computer and use it in GitHub Desktop.
Save mitbailey/3ffd1193d5177fd18b54aef22d747513 to your computer and use it in GitHub Desktop.
#
# @file net.py
# @author Mit Bailey (mitbailey@outlook.com)
# @brief A simple test-enabling prototype for the MMC GUI/Interface <==> Middleware network communications handling layer.
# @version See Git tags for version information.
# @date 2022.10.25
#
# @copyright Copyright (c) 2022
#
#
import socket
import threading
import signal
import time
import sys
# Handles when the user presses ^C.
def sighandler(signal, x):
print("KeyboardInterrupt\n^C")
# This will signal the main thread to move from busy-waiting to waiting on join().
global done
done = True
class Net:
def __init__(self):
self.done = False
self.address = 'localhost'
self.tx_port: int = 52000 + int(input('TX on 52000 + '))
print(self.tx_port)
self.rx_port: int = 52000 + int(input('RX on 52000 + '))
print(self.rx_port)
print()
pal = (self.address, self.rx_port)
self.skt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.skt.bind(pal)
self.skt.setblocking(False)
# self.skt.settimeout(1)
self.tx_tid = threading.Thread(target=self.transmitter)
self.rx_tid = threading.Thread(target=self.receiver)
self.tx_tid.start()
self.rx_tid.start()
def __del__(self):
self.done = True
self.tx_tid.join()
print('Transmitter thread terminated.')
self.rx_tid.join()
print('Receiver thread terminated.')
def transmitter(self):
while self.done == False:
try:
msg = input('')
except Exception:
self.done = True
self.skt.sendto(msg.encode('utf-8'), (self.address, self.tx_port))
def receiver(self):
while self.done == False:
try:
msg, addr = self.skt.recvfrom(4096)
except socket.error as e:
# print(e)
continue
print(msg.decode('utf-8'))
if __name__ == '__main__':
global done
done = False
# Register our ^C callback.
signal.signal(signal.SIGINT, sighandler)
nethandler = Net()
while not done:
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment