Skip to content

Instantly share code, notes, and snippets.

@taikedz
Last active March 6, 2023 00:00
Show Gist options
  • Save taikedz/46fab0ae3b0fc02a96e91bd4a9b05df4 to your computer and use it in GitHub Desktop.
Save taikedz/46fab0ae3b0fc02a96e91bd4a9b05df4 to your computer and use it in GitHub Desktop.
Little UDP test

UDP test

(Note: taken further in a proper repo: https://github.com/taikedz/udp_test)

This is a quick thing I put together for testing UDP packet sending behaviours.

Interestingly, if a message is delayed by the server and sent after the client times out, the message is still received client side. An actual client would need to proactively manage response validity.

I did try randomly generating multiple identical repsonses from the server, that only caused confusion for this simple client. Writing a more complex client capable of recognising and discarding duplicate server packets, and tracking those for which no response was received, would become necessary.

NAT

The other exercise I wanted to attempt was NAT traversal. To mock this out, I run the client from within a docker container:

The Dockerfile (replacing <HOST IP> with the IP of my machine):

FROM ubuntu:22.04
RUN apt-get update && apt-get install -y python3 busybox
ADD udp_test.py /udp_test.py

CMD python3 /udp_test.py client <HOST IP>

The test:

docker build -t nattest:latest .
docker run -d --rm --name=nattest_container nattest:latest
python3 udp_test.py server
docker stop nattest_container

The server responds successfully to the client (with the appropriate firewall port opened!).

import socket
import random
import time
import sys
def server(port):
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print(f"Listening on port {port}")
server_socket.bind(('0.0.0.0', port))
while True:
message, address = server_socket.recvfrom(1024)
message = str(message, 'utf-8')
delay = 0.1 * random.randint(1,14)
print(f"Response in {delay:.2f}s | {message} from {address}")
time.sleep(delay)
response = bytes(f"I got your '{message}'", 'utf-8')
server_socket.sendto(response, address)
def client(host, port):
TICK = 1/5.0
emitting = True
messages = ["Hi", "hello", "ciao", "aloha", "salutations", "greetings", "yo"]
print(f"Using {host}:{port}")
addr = (host, port)
client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.settimeout(1)
i = 0
while True:
try:
if emitting:
i += 1
message = bytes(f"{i}: {random.choice(messages)}", 'utf-8')
client_socket.sendto(message, addr)
try:
data, server = client_socket.recvfrom(1024)
print(f"{data} returned from {server}")
except TimeoutError:
if emitting:
print(f"(Dropped {message})")
if emitting:
time.sleep(TICK)
except KeyboardInterrupt:
if emitting:
print("\nStopped emitting new messages. Here are the remainder responses:\n")
emitting = False
else:
raise
def main():
try:
if not sys.argv[1:]:
print("Specify 'client' or 'server'")
elif sys.argv[1] == "server":
port = int(sys.argv[2]) if sys.argv[2:] else 12000
server(port)
elif sys.argv[1] == "client":
host = sys.argv[2] if sys.argv[2:] else "127.0.0.1"
port = int(sys.argv[3]) if sys.argv[3:] else 12000
client(host, port)
else:
print("Unknown mode")
except KeyboardInterrupt:
print("\nQuit.")
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment