Skip to content

Instantly share code, notes, and snippets.

@pavel-a
Last active December 23, 2019 20:12
Show Gist options
  • Save pavel-a/da9f87da5aae5b3956e67ca881e128dc to your computer and use it in GitHub Desktop.
Save pavel-a/da9f87da5aae5b3956e67ca881e128dc to your computer and use it in GitHub Desktop.
Simple TCP, UDP Echo clients in Python. Generate random messages of random length. To test some bare metal embedded thingy.
#!/usr/bin/python3
"""
Echo client / TCP
Parameters: set below: server IP address, port=7, timeouts
Message random generated, binary; length and data random : see make_random_msg()
NOTE: First message takes very long time, don't set REPLY_TIMEOUT too short!
pa01 23-dec-2019
"""
import socket
import sys, time
import random
PORT = 7
SERVER = "10.1.1.21"
CONN_TIMEOUT = 1.0 # connect timeout, sec
REPLY_TIMEOUT = 0.5 # device reply timeout, sec
N_ITER = 10 # Number of repeats
print_OK = False # If print OK results. False=print only errors
def make_random_msg():
# Create random echo test message
nbytes = random.randrange(8, 300)
x = bytearray( [ random.randrange(256) for i in range(nbytes) ] )
return x
print("TCP echo client -> IP: %s port: %u, repeat: %u" % (SERVER,PORT, N_ITER))
# Create TCP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connect the socket to the port where the server is listening
server_address = (SERVER, PORT)
sock.settimeout(CONN_TIMEOUT)
print('Connecting to %s port %s' % server_address)
try:
sock.connect(server_address)
except OSError as e:
print("Connect failed: %r" % e)
max_delay, min_delay = 0.0, 0.0 # ns
first_delay = 0.0
num_errs = 0
for iter in range(1, N_ITER+1):
try:
# Send data & wait reply
sock.settimeout(REPLY_TIMEOUT)
message = make_random_msg()
amount_expected = len(message)
amount_received = 0
reply = bytearray()
#print('sending "%r"' % message)
sock.sendall(message)
# Wait for the response
time_sent = time.monotonic_ns()
while amount_received < amount_expected:
frag = sock.recv(1024) # arg=buffer size, power of 2: 1024
amount_received += len(frag)
reply += frag
#print('Received "%r"' % reply)
delay = (time.monotonic_ns() - time_sent) # reply latency, ns
# First message takes long time
if iter > 1:
max_delay = max(max_delay, delay)
min_delay = min(min_delay, delay)
else:
first_delay = delay
if reply != message:
print("Echo #%u error: data mismatch" % iter)
num_errs += 1
elif print_OK:
print("Reply #%u OK, delay=%.1f ms" % (iter, (delay*1.0e-6)) )
except OSError as e:
print("RX #%u failed: %r" % (iter, e))
num_errs += 1
except Exception as e:
print("Unhandled exception:", e)
print('Closing socket')
sock.close()
# print stats
print("Errors: %u" % num_errs)
print("1st.delay= %.1f Max.delay= %.1f Min.delay= %.1f ms" % (first_delay*1.0e-6, max_delay*1.0e-6, min_delay*1.0e-6))
#!/usr/bin/python3
"""
Echo client / UDP
Parameters: set below: server IP address, port=7, timeouts
Message random generated, binary; length and data random : see make_random_msg()
pa01 23-dec-2019
"""
import socket
import sys, time
import random
PORT = 7
SERVER = "10.1.1.21"
#SERVER,PORT='localhost',10000
REPLY_TIMEOUT = 0.3 # device reply timeout, sec
N_ITER = 200 # Number of repeats
print_OK = False # If print OK results. False=print only errors
def make_random_msg():
# Create random echo test message
nbytes = random.randrange(8, 300)
x = bytearray( [ random.randrange(256) for i in range(nbytes) ] )
return x
print("UDP echo client random -> IP: %s port: %u, repeat: %u" % (SERVER,PORT, N_ITER))
# Create UDP socket
server_address = (SERVER, PORT)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
max_delay, min_delay = 0.0, 0.0 # ns
first_delay = 0.0
num_errs = 0
sock.settimeout(REPLY_TIMEOUT)
time_start = time.monotonic()
for iter in range(1, N_ITER+1):
#time.sleep(1.0)
try:
# Send data & wait reply
#message = b'This is the message. It will be repeated.'
message = make_random_msg()
amount_expected = len(message)
amount_received = 0
#print('\n\nSending len=%u "%r"' % (amount_expected, message))
sock.sendto(message, server_address)
# Wait for the response
time_sent = time.monotonic_ns()
reply, host_from = sock.recvfrom(2048) # arg=buffer size, power of 2:
amount_received = len(reply)
#print('\nReceived "%r"' % reply)
#print("From:", host_from)
delay = (time.monotonic_ns() - time_sent) # reply latency, ns
if iter > 1:
max_delay = max(max_delay, delay)
min_delay = min(min_delay, delay)
else:
first_delay = delay
if reply != message:
print("Echo #%u error: data mismatch len=%u" % (iter,amount_received) )
num_errs += 1
elif print_OK:
print("Reply #%u OK, delay=%.1f ms" % (iter, (delay*1.0e-6)) )
except OSError as e:
print("\nRX #%u failed: %r\n\n" % (iter, e))
num_errs += 1
except Exception as e:
print("Unhandled exception:", e)
time_total = time.monotonic() - time_start # seconds
print('Closing socket')
sock.close()
# print stats
print("Errors: %u" % num_errs)
print("1st.delay= %.1f Max.delay= %.1f Min.delay= %.1f ms" % (first_delay*1.0e-6, max_delay*1.0e-6, min_delay*1.0e-6))
print("Total time, ms=", int(time_total * 1000))
#!/usr/bin/python3
# Echo UDP server for PC - reference
import socket
import sys
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Bind the socket to the port
server_address = ('localhost', 10000)
print('UDP echo server on %s port %s' % server_address)
sock.bind(server_address)
while True:
data, address = sock.recvfrom(4096)
print('Received %s bytes from %s' % (len(data), address))
print(data)
if data:
sent = sock.sendto(data, address)
print('sent %s bytes back to %s' % (sent, address))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment