Skip to content

Instantly share code, notes, and snippets.

@andiwand
Created November 12, 2018 18:35
Show Gist options
  • Save andiwand/36df175bc9b332a471ab1bae7e530fd0 to your computer and use it in GitHub Desktop.
Save andiwand/36df175bc9b332a471ab1bae7e530fd0 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import argparse
import socket
import struct
import threading
def arguments(argv=None):
    """Parse the command-line options of the multicast-to-TCP relay.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. The default ``None`` keeps the original
            behaviour of reading ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with the attributes ``tcp_port``,
        ``multicast_address``, ``multicast_port``, ``max_packet_size`` and
        ``max_cache_size``.
    """
    parser = argparse.ArgumentParser(
        description='Cache UDP multicast packets and replay them to TCP clients.',
        # Show each option's default value in the --help output.
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--tcp-port', type=int, default=5004,
        help='TCP port on which clients connect',
    )
    parser.add_argument(
        '--multicast-address', type=str, default='239.255.42.42',
        help='IPv4 multicast group to join',
    )
    parser.add_argument(
        '--multicast-port', type=int, default=5004,
        help='UDP port on which multicast packets are received',
    )
    parser.add_argument(
        '--max-packet-size', type=int, default=65507,
        help='receive buffer size used for each datagram, in bytes',
    )
    parser.add_argument(
        '--max-cache-size', type=int, default=4 * 1024 ** 2,
        help='maximum total size of the cached packets, in bytes',
    )
    return parser.parse_args(argv)
def configuration(args):
    """Convert the parsed command-line namespace into a plain settings dict.

    Args:
        args: Object exposing the attributes ``tcp_port``,
            ``multicast_address``, ``multicast_port``, ``max_packet_size``
            and ``max_cache_size``.

    Returns:
        dict mapping each of those five names to the attribute's value.
    """
    option_names = (
        'tcp_port',
        'multicast_address',
        'multicast_port',
        'max_packet_size',
        'max_cache_size',
    )
    return {name: getattr(args, name) for name in option_names}
def initialisation(config):
    """Open the relay's sockets and build the shared state dict.

    Creates a UDP socket bound to ``config['multicast_port']`` that joins the
    multicast group ``config['multicast_address']``, and a TCP socket
    listening on ``config['tcp_port']`` on all interfaces.

    Args:
        config: dict with the keys ``'multicast_address'``,
            ``'multicast_port'`` and ``'tcp_port'``.

    Returns:
        dict with the keys ``'multicastsocket'`` (UDP socket),
        ``'serversocket'`` (listening TCP socket), ``'cache'`` (empty list),
        ``'cache_size'`` (0) and ``'lock'`` (a ``threading.Lock``).

    Raises:
        OSError: if a socket cannot be created, configured or bound. Every
            socket opened so far is closed before the error propagates.
    """
    multicastsocket = socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    try:
        multicastsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        multicastsocket.bind(('', config['multicast_port']))
        # Membership request: the group address followed by the local
        # interface address; INADDR_ANY leaves the interface choice to the OS.
        mreq = struct.pack(
            '=4sl',
            socket.inet_aton(config['multicast_address']),
            socket.INADDR_ANY,
        )
        multicastsocket.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

        serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Allow rebinding right after a restart, while connections from
            # the previous run may still be lingering in TIME_WAIT.
            serversocket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            serversocket.bind(('0.0.0.0', config['tcp_port']))
            serversocket.listen(1)
        except Exception:
            # Do not leak the TCP socket when bind/listen fails.
            serversocket.close()
            raise
    except Exception:
        # Do not leak the UDP socket when any later setup step fails.
        multicastsocket.close()
        raise

    return {
        'multicastsocket': multicastsocket,
        'serversocket': serversocket,
        'cache': [],
        'cache_size': 0,
        'lock': threading.Lock(),
    }
def destroy(state):
    """Close both sockets held in the relay state.

    Args:
        state: dict with the keys ``'multicastsocket'`` and
            ``'serversocket'``; each value must provide ``close()``.
    """
    # Same order as before: the multicast receiver first, then the listener.
    for socket_key in ('multicastsocket', 'serversocket'):
        state[socket_key].close()
def producer(config, state):
    """Receive datagrams in a loop and keep the most recent ones cached.

    Each non-empty datagram is appended to ``state['cache']``. Whenever the
    total cached size exceeds ``config['max_cache_size']``, the oldest
    packets are dropped until it fits again (a single packet larger than the
    limit therefore leaves the cache empty).

    Args:
        config: dict with the keys ``'max_packet_size'`` (receive buffer size
            in bytes) and ``'max_cache_size'`` (cache limit in bytes).
        state: dict with the keys ``'multicastsocket'``, ``'cache'`` (list),
            ``'cache_size'`` (int) and ``'lock'``; ``'cache'`` and
            ``'cache_size'`` are only modified while the lock is held.

    Returns:
        None, once the socket has been closed. Otherwise the loop never ends.

    Raises:
        OSError: if receiving fails while the socket is still open.
    """
    sock = state['multicastsocket']
    lock = state['lock']
    bufsize = config['max_packet_size']
    limit = config['max_cache_size']

    while True:
        try:
            data = sock.recvfrom(bufsize)[0]
        except OSError:
            # A closed socket reports fileno() == -1: that is the shutdown
            # signal sent by closing the socket, so stop quietly. Any other
            # failure is a real error and must stay visible.
            if sock.fileno() == -1:
                return
            raise
        if not data:
            continue

        # "with" guarantees the lock is released even if the update raises.
        with lock:
            cache = state['cache']
            cache.append(data)
            size = state['cache_size'] + len(data)
            # Count how many of the oldest packets have to go, then remove
            # them with one slice deletion instead of repeated pop(0) calls,
            # each of which shifts the whole list.
            stale = 0
            while stale < len(cache) and size > limit:
                size -= len(cache[stale])
                stale += 1
            del cache[:stale]
            state['cache_size'] = size
def consumer(config, state):
    """Accept TCP clients in a loop and send each one the cached packets.

    Every accepted connection is served on its own thread: the thread copies
    the cache while holding the lock, writes the copied packets to the client
    in order, and then closes the connection.

    Args:
        config: not used by this function.
        state: dict with the keys ``'serversocket'`` (listening socket),
            ``'cache'`` (list of bytes objects) and ``'lock'``.

    Raises:
        Whatever ``accept()`` raises; the exception ends the loop and
        propagates to the caller.
    """
    def worker(state, connection):
        """Send a snapshot of the cache to one client, then disconnect."""
        try:
            # Hold the lock only for the copy so that a slow client cannot
            # block the thread that fills the cache.
            with state['lock']:
                snapshot = list(state['cache'])
            for data in snapshot:
                # sendall() keeps writing until the whole packet is out;
                # send() may transmit only part of it and drop the rest.
                connection.sendall(data)
        finally:
            connection.close()

    while True:
        connection, _address = state['serversocket'].accept()
        threading.Thread(target=worker, args=(state, connection)).start()
def main():
    """Run the relay until the client-accepting loop stops.

    Parses the command line, opens the sockets, starts the packet receiver
    on a background thread and serves TCP clients on the calling thread.
    The sockets are closed when the serving loop ends for any reason.
    """
    args = arguments()
    config = configuration(args)
    state = initialisation(config)
    try:
        # Daemon thread: the receiver blocks in recvfrom() indefinitely, and
        # a non-daemon thread would keep the interpreter alive after the
        # serving loop has ended (e.g. on Ctrl-C), so the process would hang.
        receiver = threading.Thread(
            target=producer, args=(config, state), daemon=True)
        receiver.start()
        consumer(config, state)
    finally:
        destroy(state)
# Script entry point: start the relay only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment