Skip to content

Instantly share code, notes, and snippets.

@maxfischer2781
Last active June 22, 2021 16:37
Show Gist options
  • Save maxfischer2781/38e7af51cc25b5a19d2b0f451a6b5dc3 to your computer and use it in GitHub Desktop.
Save maxfischer2781/38e7af51cc25b5a19d2b0f451a6b5dc3 to your computer and use it in GitHub Desktop.
Utility to record/replay UDP datagrams
#!/usr/bin/env python3
from typing import BinaryIO
import socket
import argparse
import pathlib
import struct
import time
CLI = argparse.ArgumentParser(
description=(
"Utility to record/replay UDP datagrams\n"
r" ___________ " "\n"
r" --\ |.UDP.Tape..| --\ " "\n"
r" --/ | ()___() | --/ " "\n"
r" |__/_____\__|"
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
CLI.add_argument(
"PORT",
help="Local UDP port where packets arrive",
type=int,
)
CLI.add_argument(
"STORE",
help="Path at which to store packets",
type=pathlib.Path,
)
CLI.add_argument(
"DIRECTION",
help="Whether to 'record' or 'replay' packets",
choices=["record", "replay"],
)
CLI.add_argument(
"-ff",
"--fast-forward",
help="Ignore delays during replay",
action="store_true",
)
CLI.add_argument(
"--duration",
help="How long to record/replay [default: inf]",
type=float,
default=float('inf'),
)
def main():
options = CLI.parse_args()
port, store, direction = options.PORT, options.STORE, options.DIRECTION
duration, fast_forward = options.duration, options.fast_forward
try:
if direction == "record":
record(port, store, duration)
elif direction == "replay":
replay(port, store, duration, fast_forward)
else:
raise ValueError(f"Unknown direction {direction:r}")
except KeyboardInterrupt:
pass
# Header for each message:
# - packet size (uint16)
# - packet delay (double)
HEADER = struct.Struct("!Hd")
def record(port: int, location: pathlib.Path, duration: float):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.bind(('localhost', port))
with location.open(mode="ab") as out_stream:
previous = time.monotonic()
until = previous + duration
while previous < until:
message = sock.recv(64*1024) # unlikely, but maximum size
current = time.monotonic()
# make sure there is only one write for both size + payload
out_stream.write(
HEADER.pack(len(message), current-previous) + message
)
previous = current
if __debug__:
print('record', message[:96], '...' if len(message) > 96 else '')
def read_messages(in_stream: BinaryIO):
header, header_size = HEADER, HEADER.size
try:
size, delay = header.unpack(in_stream.read(header_size))
delay = 0 # start immediately
for message in iter(lambda: in_stream.read(size), b''):
yield delay, message
size, delay = header.unpack(in_stream.read(header_size))
except struct.error: # raised when there are no further headers
pass
def replay(port: int, location: pathlib.Path, duration: float, fast_forward: bool):
address = ('localhost', port)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
with location.open(mode="rb") as in_stream:
total_delay = 0.0
for delay, message in read_messages(in_stream):
total_delay += delay
if total_delay > duration:
break
if not fast_forward:
time.sleep(delay)
if __debug__:
print('replay', len(message), message[:96], '...' if len(message) > 96 else '')
sock.sendto(message, address)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment