Skip to content

Instantly share code, notes, and snippets.

@eruffaldi
Last active Oct 15, 2021
Embed
What would you like to do?
Simple UDP packet recorder and playback
# Emanuele Ruffaldi 2018
import argparse,socket,socket,json,time
def str2bool(v):
    """Convert a command-line style truth value to a bool.

    Intended as an argparse ``type=`` callable.

    v: either a bool (returned unchanged) or a string. Accepted strings,
       case-insensitive: yes/true/t/y/1 and no/false/f/n/0.

    Returns True or False.
    Raises argparse.ArgumentTypeError for any other string.
    """
    # A real bool can arrive when the function is called programmatically;
    # it has no .lower(), so pass it through untouched.
    if isinstance(v, bool):
        return v
    lowered = v.lower()
    if lowered in ('yes', 'true', 't', 'y', '1'):
        return True
    if lowered in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
def timedplayer(filename, timescale, loop):
    """Replay a recorded packet file, pacing packets by their timestamps.

    The file holds one JSON object per line with at least the keys
    "absolutetime" (a number, seconds) and "data" (a string whose code
    points are the payload bytes, i.e. latin-1 text).

    filename:  path of the recording to read.
    timescale: playback speed factor; values > 1.0 play faster than recorded.
               Must be positive.
    loop:      when true, restart from the beginning after the last packet;
               when false, stop after one pass.

    Yields (payload_bytes, real_time, stored_time) tuples, where real_time
    is time.time() sampled when the record was read (before any pacing
    sleep) and stored_time is the record's "absolutetime".

    Raises ValueError if timescale is not positive.
    """
    if timescale <= 0:
        raise ValueError("timescale must be positive")
    while True:
        yielded_any = False
        with open(filename, "rb") as infile:
            # Reference points, reset on every pass so a looped replay
            # starts immediately instead of waiting on stale timestamps.
            real_start = None
            virtual_start = None
            for raw_line in infile:
                line = raw_line.strip()
                if not line:
                    # Tolerate blank lines instead of ending playback early.
                    continue
                record = json.loads(line)
                stored_time = record["absolutetime"]
                now = time.time()
                if virtual_start is None:
                    real_start = now
                    virtual_start = stored_time
                else:
                    # Where we are on the recording's clock right now.
                    virtual_now = (now - real_start) * timescale + virtual_start
                    ahead = stored_time - virtual_now
                    if ahead > 0:
                        # 'ahead' is measured in recorded (virtual) seconds;
                        # convert to real seconds before sleeping.
                        time.sleep(ahead / timescale)
                # The JSON string is unicode; latin-1 maps each code point
                # back to the original 8-bit byte.
                payload = record["data"].encode("latin1")
                yielded_any = True
                yield payload, now, stored_time
        # Use the 'loop' parameter (not a global) and stop when looping is
        # off; also stop on an empty recording to avoid spinning forever.
        if not loop or not yielded_any:
            break
# Script entry point for the player: parse options, then send every recorded
# packet to host:port over UDP with the pacing provided by timedplayer().
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='UDP data player')
    parser.add_argument("input",help="input file")
    parser.add_argument('--port',default=9763,type=int)
    parser.add_argument('--host',default="localhost")
    parser.add_argument('--verbose',action="store_true")
    parser.add_argument('--loop',type=str2bool, nargs='?',
                        const=True, default=True,
                        help="Activate loop mode.")
    parser.add_argument('--timescale',default=1.0,type=float,help="> 1.0 for real faster")
    # 'args' stays at module level on purpose: other code in this file may
    # read it as a global.
    args = parser.parse_args()

    sock = socket.socket(socket.AF_INET,      # Internet
                         socket.SOCK_DGRAM)   # UDP
    try:
        for data, realtime, virtualtime in timedplayer(args.input, args.timescale, args.loop):
            sock.sendto(data, (args.host, args.port))
            if args.verbose:
                # Single pre-formatted string: prints the same text under
                # Python 2 and Python 3.
                print("%s %s" % (len(data), virtualtime))
    finally:
        # Release the socket even when playback is interrupted (Ctrl-C) or
        # the recording turns out to be malformed.
        sock.close()
# Test Record
# nc 127.0.0.1 9763 -u
# python udp_recorder.py x
# cat x
# Test Playback (listen on the player's default port, 9763)
# nc 127.0.0.1 -ul 9763
# python udp_player.py x
# Emanuele Ruffaldi 2018
import argparse,socket,socket,json,time
# Script entry point for the recorder: bind a UDP socket and append every
# received datagram to the output file as one JSON object per line with the
# keys absolutetime, reltime_seconds, data and addr.
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='UDP data recorder')
    parser.add_argument("output",help="output file")
    parser.add_argument('--port',default=9763,type=int)
    parser.add_argument('--host',default="localhost")
    parser.add_argument('--source',default="",help="limit to source IP address")
    args = parser.parse_args()

    sock = socket.socket(socket.AF_INET,      # Internet
                         socket.SOCK_DGRAM)   # UDP
    try:
        sock.bind((args.host, args.port))
        with open(args.output,"wb") as of:
            t0 = time.time()
            while True:
                # 65536 bytes is enough for any single UDP datagram.
                data, addr = sock.recvfrom(32768*2)
                # addr is an (ip, port) pair; only the IP is compared with
                # --source (comparing the whole pair to a string never matches).
                if args.source != "" and addr[0] != args.source:
                    continue
                abstime = time.time()
                dt = abstime-t0
                # Single pre-formatted string: prints the same text under
                # Python 2 and Python 3.
                print("%s %s %s" % (dt, len(data), addr))
                # latin-1 maps every byte 0-255 to one code point, so arbitrary
                # binary payloads survive JSON and can be restored with
                # .encode("latin1") on playback.
                out = dict(absolutetime=abstime,reltime_seconds=dt,
                           data=data.decode("latin1"),addr=addr)
                # json.dumps emits pure ASCII by default, so the encode cannot
                # fail and the file can stay in binary mode.
                of.write((json.dumps(out) + "\n").encode("ascii"))
    finally:
        # Release the port even when recording is stopped with Ctrl-C.
        sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment