Last active
October 20, 2023 15:35
-
-
Save eruffaldi/b0154187e2ee19d3ae0dc4e39d9cbe4f to your computer and use it in GitHub Desktop.
Simple UDP packet recorder and playback
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Emanuele Ruffaldi 2018 | |
import argparse,socket,socket,json,time | |
def str2bool(v):
    """Convert a command-line token into a boolean (usable as argparse ``type=``).

    Accepted spellings are matched case-insensitively:
    'yes', 'true', 't', 'y', '1' map to True and
    'no', 'false', 'f', 'n', '0' map to False.
    A value that is already a ``bool`` is returned unchanged, so the
    function can also be applied to programmatic defaults.

    Raises:
        argparse.ArgumentTypeError: if the text is not one of the accepted
            spellings.
    """
    if isinstance(v, bool):
        return v
    token = v.lower()  # lower-case once instead of once per comparison
    if token in ('yes', 'true', 't', 'y', '1'):
        return True
    if token in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
def timedplayer(filename, timescale, loop):
    """Replay a recording, pacing the records by their stored timestamps.

    The recording holds one JSON object per line, each with an
    ``absolutetime`` number and a ``data`` string. A blank line or the end
    of the file ends the current pass.

    Args:
        filename: path of the recording to read.
        timescale: playback speed factor; values above 1.0 play faster than
            the recording, values below 1.0 slower. Must be positive.
        loop: when true, start again from the top of the file after the
            last record; when false, stop after a single pass.

    Yields:
        Tuples ``(payload, read_time, stored_time)`` where ``payload`` is
        the ``data`` string encoded as latin-1 bytes, ``read_time`` is
        ``time.time()`` taken right after the record was read (before any
        pacing sleep) and ``stored_time`` is the record's ``absolutetime``.

    Raises:
        ValueError: if ``timescale`` is not positive (raised when the
            generator is first advanced).
    """
    if timescale <= 0:
        raise ValueError("timescale must be positive")
    while True:
        produced = False
        with open(filename, "rb") as infile:
            # Reference instants of this pass: the wall-clock time and the
            # stored time of its first record.
            start_real = None
            start_stored = None
            while True:
                line = infile.readline().strip()
                if not line:
                    break
                record = json.loads(line)
                stored = record["absolutetime"]
                now = time.time()
                if start_stored is None:
                    # The first record is emitted immediately and anchors
                    # the timeline of this pass.
                    start_real = now
                    start_stored = stored
                else:
                    # Position reached on the recorded timeline.
                    virtual_now = (now - start_real) * timescale + start_stored
                    ahead = stored - virtual_now
                    if ahead > 0:
                        # 'ahead' is in recorded seconds; convert it to
                        # wall-clock seconds before sleeping.
                        time.sleep(ahead / timescale)
                # The JSON text is unicode; latin-1 maps each code point
                # below 256 back to a single byte.
                payload = record["data"].encode("latin1")
                produced = True
                yield payload, now, stored
        # Use the caller's flag (not a global) and keep going only when
        # looping was requested. An empty pass also stops, so that an empty
        # recording cannot turn into a busy loop.
        if not loop or not produced:
            break
if __name__ == "__main__":
    # Command-line entry point of the player: reads a recording and sends
    # every stored payload as one UDP datagram to host:port.
    parser = argparse.ArgumentParser(description='UDP data player')
    parser.add_argument("input", help="input file")
    parser.add_argument('--port', default=9763, type=int)
    parser.add_argument('--host', default="localhost")
    parser.add_argument('--verbose', action="store_true")
    parser.add_argument('--loop', type=str2bool, nargs='?',
                        const=True, default=True,
                        help="Activate loop mode.")
    parser.add_argument('--timescale', default=1.0, type=float, help="> 1.0 for real faster")
    # 'args' is deliberately left at module level.
    args = parser.parse_args()

    sock = socket.socket(socket.AF_INET,     # Internet
                         socket.SOCK_DGRAM)  # UDP
    destination = (args.host, args.port)
    try:
        for data, realtime, virtualtime in timedplayer(args.input, args.timescale, args.loop):
            sock.sendto(data, destination)
            if args.verbose:
                # Formatted print gives the same output on Python 2 and 3.
                print("%d %s" % (len(data), virtualtime))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop a looping playback.
        pass
    finally:
        sock.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Test Record | |
# nc 127.0.0.1 9763 -u | |
# python udp_recorder.py x | |
# cat x | |
# nc 127.0.0.1 -ul 9763  (listen on the player's default port)
# python udp_player.py x | |
# Emanuele Ruffaldi 2018 | |
import argparse,socket,socket,json,time | |
if __name__ == "__main__":
    # Command-line entry point of the recorder: binds a UDP socket and
    # appends one JSON line per received datagram to the output file.
    parser = argparse.ArgumentParser(description='UDP data recorder')
    parser.add_argument("output", help="output file")
    parser.add_argument('--port', default=9763, type=int)
    parser.add_argument('--host', default="localhost")
    parser.add_argument('--source', default="", help="limit to source IP address")
    args = parser.parse_args()

    sock = socket.socket(socket.AF_INET,     # Internet
                         socket.SOCK_DGRAM)  # UDP
    try:
        sock.bind((args.host, args.port))
        with open(args.output, "wb") as of:
            t0 = time.time()
            while True:
                # 64 KiB buffer: large enough for any single UDP datagram.
                data, addr = sock.recvfrom(32768 * 2)
                # addr is a (host, port) pair; the filter applies to the
                # host part only.
                if args.source != "" and addr[0] != args.source:
                    continue
                abstime = time.time()
                dt = abstime - t0
                # Formatted print gives the same output on Python 2 and 3.
                print("%s %d %s" % (dt, len(data), addr))
                # latin-1 maps every byte to exactly one code point, so any
                # payload survives the JSON round trip; the player reverses
                # this with encode("latin1").
                out = dict(absolutetime=abstime, reltime_seconds=dt,
                           data=data.decode("latin1"), addr=addr)
                # json.dumps escapes non-ASCII by default, so the line can
                # be written to the binary file as ASCII bytes.
                of.write(json.dumps(out).encode("ascii") + b"\n")
    except KeyboardInterrupt:
        # Ctrl-C is the only way to end a recording; exit without a
        # traceback once the file has been closed.
        pass
    finally:
        sock.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment