Skip to content

Instantly share code, notes, and snippets.

@anandbaburajan
Last active July 12, 2021 16:43
Show Gist options
  • Save anandbaburajan/ab1777a646b0a8567fd1d6ce4eb542db to your computer and use it in GitHub Desktop.
Save anandbaburajan/ab1777a646b0a8567fd1d6ce4eb542db to your computer and use it in GitHub Desktop.
K2 simulator
import click
import struct
import socket
# Largest datagram read per recvfrom() call, in bytes (0x5758 == 22360).
BLOCK_SIZE = 0x5758
# IPv4 multicast group the receiver joins.
GROUP = "225.1.1.1"
@click.command()
@click.option("--port", type=int, required=True)
def main(port):
    """Join the multicast group on PORT and drain incoming datagrams.

    Binds a UDP socket to all interfaces on ``port``, joins ``GROUP`` and
    keeps reading datagrams of up to ``BLOCK_SIZE`` bytes, discarding their
    contents. The loop ends when a zero-length datagram is received.
    """
    # The context manager closes the socket even if bind/setsockopt/recvfrom
    # raises, which the previous explicit close() at the end did not.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) as sock:
        sock.bind(("0.0.0.0", port))
        # struct ip_mreq is exactly two packed IPv4 addresses (8 bytes):
        # the group and the local interface. A native-aligned "4sL" format
        # is padded to 16 bytes on LP64 platforms, so pack both fields as
        # 4-byte strings instead. "0.0.0.0" is INADDR_ANY.
        mreq = struct.pack(
            "4s4s", socket.inet_aton(GROUP), socket.inet_aton("0.0.0.0")
        )
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        while True:
            data, _sender = sock.recvfrom(BLOCK_SIZE)
            if not data:
                break


if __name__ == "__main__":
    main()
import os
import click
import glob
import time
import socket
import threading
# UDP destination ports; K2Simulator pairs them one-to-one with the input files.
PORTS = [2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008]
# Bytes read from a file and sent per datagram (0x5758 == 22360).
BLOCK_SIZE = 0x5758
# IPv4 multicast group every datagram is addressed to.
GROUP = '225.1.1.1'
class SectorRunner:
    """Stream one binary file to the multicast group on a single UDP port.

    The file is read in ``BLOCK_SIZE`` chunks and every chunk is sent as one
    datagram to ``(GROUP, port)``. The runner owns both the file handle and
    the socket; ``run()`` releases them when it finishes, whether it
    succeeded or failed.
    """

    def __init__(self, filename, port):
        """Open ``filename`` for reading and create the sending socket.

        Args:
            filename: Path of the binary file to stream.
            port: UDP destination port within the multicast group.

        Raises:
            OSError: If the file cannot be opened or the socket cannot be
                created/configured. Nothing is left open in that case.
        """
        self._filename = filename
        self._port = port
        self._sock = None
        # Touch the file before creating the socket so that a missing or
        # unreadable file cannot leak a socket.
        self._filesize = os.stat(self._filename).st_size
        self._file = open(self._filename, "rb")
        try:
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            # TTL 1 keeps the multicast traffic on the local network segment.
            self._sock.setsockopt(
                socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 1
            )
        except OSError:
            self.close()
            raise

    def run(self):
        """Send the whole file, print the achieved rate, and clean up.

        The printed figure is ``filesize / (BLOCK_SIZE * 32) / elapsed``,
        i.e. the code counts 32 blocks as one frame.

        Raises:
            OSError: Propagated from ``sendto``/``read``; the file and the
                socket are closed before the exception leaves this method.
        """
        try:
            # perf_counter() is monotonic and high-resolution, unlike the
            # wall clock, so the interval cannot be negative.
            started = time.perf_counter()
            for block in iter(lambda: self._file.read(BLOCK_SIZE), b""):
                self._sock.sendto(block, (GROUP, self._port))
            elapsed = time.perf_counter() - started
        finally:
            self.close()
        frames = self._filesize / (BLOCK_SIZE * 32)
        # Guard against a zero-length interval (e.g. an empty file).
        throughput = frames / elapsed if elapsed > 0 else 0.0
        print("Throughput: %dfps" % (throughput))

    def close(self):
        """Close the socket and the file; safe to call more than once."""
        try:
            if self._sock is not None:
                self._sock.close()
        finally:
            self._file.close()
class K2Simulator:
    """Replay a set of ``.bin`` files in parallel, one sender thread per file.

    The files are discovered from ``gtg_path``: its extension is stripped and
    every ``<prefix>*.bin`` match is paired, in sorted order, with an entry of
    ``PORTS``.
    """

    def __init__(self, gtg_path):
        """Locate the input files and prepare one runner and thread for each.

        Args:
            gtg_path: Path whose extension-less form is the common prefix of
                the ``.bin`` files to stream.

        Raises:
            OSError: If a runner cannot open its file or socket. Runners
                created before the failure are closed again.
        """
        self._gtg_path = gtg_path
        self._files = self._get_files()
        self._sectors = []
        try:
            # zip() stops at the shorter input, so files beyond len(PORTS)
            # are not streamed.
            for file, port in zip(self._files, PORTS):
                self._sectors.append(SectorRunner(file, port))
        except Exception:
            # Do not leak the files/sockets of the runners already built.
            for sector in self._sectors:
                sector.close()
            raise
        self._threads = [
            threading.Thread(target=sector.run) for sector in self._sectors
        ]

    def _get_files(self):
        """Return the sorted ``<prefix>*.bin`` paths derived from the input path.

        Sorting makes the file-to-port pairing deterministic (glob order is
        arbitrary), and the prefix is escaped so glob metacharacters in the
        path are matched literally.
        """
        prefix, _ext = os.path.splitext(self._gtg_path)
        pattern = "%s*.bin" % glob.escape(prefix)
        return sorted(glob.glob(pattern))

    def run(self):
        """Start every sender thread and wait for all of them to finish."""
        started = []
        try:
            for thread in self._threads:
                thread.start()
                started.append(thread)
        finally:
            # Joining a thread that never started raises RuntimeError and
            # would mask the original failure, so only join started ones.
            for thread in started:
                thread.join()
@click.command()
@click.argument("gtg_path", type=click.Path(exists=True))
def main(gtg_path):
    # Command-line entry point: build the simulator for the given path and
    # run it to completion. Kept as a comment rather than a docstring because
    # click would otherwise surface the text in the --help output.
    K2Simulator(gtg_path).run()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment