Skip to content

Instantly share code, notes, and snippets.

@inkychris
Last active April 19, 2020 14:09
Show Gist options
  • Save inkychris/7eae2dd00098503feff20858a141a3b3 to your computer and use it in GitHub Desktop.
Save inkychris/7eae2dd00098503feff20858a141a3b3 to your computer and use it in GitHub Desktop.
UDP Broadcast Server and Client
import socket
import time
def serve(port):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
while True:
sock.sendto(bytes(time.ctime(), encoding='utf-8'), ('<broadcast>', port))
time.sleep(1)
def monitor(port):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
sock.bind(('', port))
while True:
content, address = sock.recvfrom(64)
print(f'\r> {address[0]}:{address[1]}: {content.decode("utf-8")}', end='')
if __name__ == '__main__':
import argparse
import multiprocessing
parser = argparse.ArgumentParser(description='serve and/or monitor UDP broadcast traffic')
parser.add_argument('-p', '--port', type=int, default=5606)
parser.add_argument('-s', '--serve', action='store_true', help='broadcast traffic')
parser.add_argument('-m', '--monitor', action='store_true', help='listen for traffic')
args = parser.parse_args()
if not (args.serve or args.monitor):
parser.error('no action specified: expected at least one of --serve, --monitor')
processes = []
if args.serve:
processes.append(multiprocessing.Process(target=serve, args=(args.port,)))
if args.monitor:
processes.append(multiprocessing.Process(target=monitor, args=(args.port,)))
for process in processes:
process.start()
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print('\nstopping...')
for process in processes:
process.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment