Last active
October 26, 2022 03:29
-
-
Save mitbailey/3ffd1193d5177fd18b54aef22d747513 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# @file net.py
# @author Mit Bailey (mitbailey@outlook.com)
# @brief A simple test-enabling prototype for the MMC GUI/Interface <==> Middleware network communications handling layer.
# @version See Git tags for version information.
# @date 2022.10.25
#
# @copyright Copyright (c) 2022
#
#
import socket | |
import threading | |
import signal | |
import time | |
import sys | |
# Handles when the user presses ^C. | |
def sighandler(signal, x):
    """Signal callback: announce the interrupt and request shutdown.

    Registered for SIGINT by the ``__main__`` section. Both arguments are
    supplied by the interpreter when the signal fires and are not used here.
    """
    global done

    print("KeyboardInterrupt\n^C")

    # The main thread polls this module-level flag in its sleep loop; raising
    # it lets that loop fall through so the program can begin exiting.
    done = True
class Net:
    """Interactive UDP text link on localhost.

    Constructing an instance prompts on stdin for a transmit and a receive
    port offset (each is added to 52000), binds a datagram socket to the
    receive port, and starts two worker threads:

    * ``transmitter`` reads lines from stdin and sends each one, UTF-8
      encoded, to the transmit port.
    * ``receiver`` prints every datagram that arrives on the receive port.

    Both threads run until ``done`` becomes true. Call ``close()`` (or set
    ``done = True``) to stop them.
    """

    # Value the user-entered offsets are added to.
    BASE_PORT = 52000
    # Maximum number of bytes read per datagram.
    RX_BUFFER_SIZE = 4096
    # Longest time (seconds) the receiver blocks before re-checking ``done``.
    RX_TIMEOUT_S = 0.5

    def __init__(self):
        """Prompt for the port offsets, bind the socket, start the workers.

        Raises:
            ValueError: if an entered offset is not an integer.
            OSError: if the socket cannot be bound to the receive port.
        """
        self.done = False
        self.address = 'localhost'

        self.tx_port: int = self.BASE_PORT + int(input('TX on 52000 + '))
        print(self.tx_port)
        self.rx_port: int = self.BASE_PORT + int(input('RX on 52000 + '))
        print(self.rx_port)
        print()

        self.skt = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.skt.bind((self.address, self.rx_port))
        # Use a timeout instead of non-blocking mode: the receiver then waits
        # inside recvfrom() rather than spinning at full CPU, while still
        # waking up regularly to notice a shutdown request.
        self.skt.settimeout(self.RX_TIMEOUT_S)

        self.tx_tid = threading.Thread(target=self.transmitter)
        self.rx_tid = threading.Thread(target=self.receiver)
        self.tx_tid.start()
        self.rx_tid.start()

    def __del__(self):
        """Best-effort cleanup when the instance is garbage collected."""
        self.close()

    def close(self):
        """Request shutdown, wait for the worker threads, close the socket.

        Safe to call more than once, and safe on a partially constructed
        instance (for example when ``__init__`` raised before the threads
        were created). Note that the transmitter only notices the request
        after its pending ``input()`` call returns, so this may block until
        the user enters a line or stdin is closed.
        """
        self.done = True
        if getattr(self, '_closed', False):
            return
        self._closed = True

        current = threading.current_thread()
        workers = (
            ('tx_tid', 'Transmitter thread terminated.'),
            ('rx_tid', 'Receiver thread terminated.'),
        )
        for attr, message in workers:
            worker = getattr(self, attr, None)
            # Skip threads that were never created or started, and never try
            # to join the calling thread itself (join() would raise).
            if worker is None or worker.ident is None or worker is current:
                continue
            worker.join()
            print(message)

        skt = getattr(self, 'skt', None)
        if skt is not None:
            skt.close()

    def transmitter(self):
        """Forward lines typed on stdin to the transmit port until stopped.

        Any failure to read a line (end of input, closed stdin, ...) ends the
        session: ``done`` is set so the receiver stops as well.
        """
        while not self.done:
            try:
                msg = input('')
            except Exception:
                # No line was read, so there is nothing to send; leave the
                # loop instead of falling through with an unbound or stale
                # message.
                self.done = True
                break
            self.skt.sendto(msg.encode('utf-8'), (self.address, self.tx_port))

    def receiver(self):
        """Print each datagram received on the bound socket until stopped."""
        while not self.done:
            try:
                msg, _addr = self.skt.recvfrom(self.RX_BUFFER_SIZE)
            except OSError:
                # Covers the periodic receive timeout as well as transient
                # socket errors; just re-check ``done`` and try again.
                continue
            # Datagrams can come from any sender; substitute undecodable
            # bytes rather than letting a bad packet kill this thread.
            print(msg.decode('utf-8', errors='replace'))
if __name__ == '__main__':
    # Module-level stop flag; sighandler() sets it to True when ^C arrives.
    done = False

    # Register our ^C callback.
    signal.signal(signal.SIGINT, sighandler)

    nethandler = Net()

    # Idle here until the signal handler raises the flag.
    while not done:
        time.sleep(1)

    # Pass the stop request on to the worker threads. They poll
    # nethandler.done, not the module-level flag, and they are non-daemon
    # threads, so without this the interpreter would wait on them forever at
    # exit. The transmitter notices the flag once its pending input() call
    # returns.
    nethandler.done = True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment