Last active
December 23, 2019 20:12
-
-
Save pavel-a/da9f87da5aae5b3956e67ca881e128dc to your computer and use it in GitHub Desktop.
Simple TCP, UDP Echo clients in Python. Generate random messages of random length. To test some bare metal embedded thingy.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
Echo client / TCP | |
Parameters: set below: server IP address, port=7, timeouts | |
Message random generated, binary; length and data random : see make_random_msg() | |
NOTE: First message takes very long time, don't set REPLY_TIMEOUT too short! | |
pa01 23-dec-2019 | |
""" | |
import socket | |
import sys, time | |
import random | |
PORT = 7 | |
SERVER = "10.1.1.21" | |
CONN_TIMEOUT = 1.0 # connect timeout, sec | |
REPLY_TIMEOUT = 0.5 # device reply timeout, sec | |
N_ITER = 10 # Number of repeats | |
print_OK = False # If print OK results. False=print only errors | |
def make_random_msg(): | |
# Create random echo test message | |
nbytes = random.randrange(8, 300) | |
x = bytearray( [ random.randrange(256) for i in range(nbytes) ] ) | |
return x | |
print("TCP echo client -> IP: %s port: %u, repeat: %u" % (SERVER,PORT, N_ITER)) | |
# Create TCP socket | |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
# Connect the socket to the port where the server is listening | |
server_address = (SERVER, PORT) | |
sock.settimeout(CONN_TIMEOUT) | |
print('Connecting to %s port %s' % server_address) | |
try: | |
sock.connect(server_address) | |
except OSError as e: | |
print("Connect failed: %r" % e) | |
max_delay, min_delay = 0.0, 0.0 # ns | |
first_delay = 0.0 | |
num_errs = 0 | |
for iter in range(1, N_ITER+1): | |
try: | |
# Send data & wait reply | |
sock.settimeout(REPLY_TIMEOUT) | |
message = make_random_msg() | |
amount_expected = len(message) | |
amount_received = 0 | |
reply = bytearray() | |
#print('sending "%r"' % message) | |
sock.sendall(message) | |
# Wait for the response | |
time_sent = time.monotonic_ns() | |
while amount_received < amount_expected: | |
frag = sock.recv(1024) # arg=buffer size, power of 2: 1024 | |
amount_received += len(frag) | |
reply += frag | |
#print('Received "%r"' % reply) | |
delay = (time.monotonic_ns() - time_sent) # reply latency, ns | |
# First message takes long time | |
if iter > 1: | |
max_delay = max(max_delay, delay) | |
min_delay = min(min_delay, delay) | |
else: | |
first_delay = delay | |
if reply != message: | |
print("Echo #%u error: data mismatch" % iter) | |
num_errs += 1 | |
elif print_OK: | |
print("Reply #%u OK, delay=%.1f ms" % (iter, (delay*1.0e-6)) ) | |
except OSError as e: | |
print("RX #%u failed: %r" % (iter, e)) | |
num_errs += 1 | |
except Exception as e: | |
print("Unhandled exception:", e) | |
print('Closing socket') | |
sock.close() | |
# print stats | |
print("Errors: %u" % num_errs) | |
print("1st.delay= %.1f Max.delay= %.1f Min.delay= %.1f ms" % (first_delay*1.0e-6, max_delay*1.0e-6, min_delay*1.0e-6)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
""" | |
Echo client / UDP | |
Parameters: set below: server IP address, port=7, timeouts | |
Message random generated, binary; length and data random : see make_random_msg() | |
pa01 23-dec-2019 | |
""" | |
import socket | |
import sys, time | |
import random | |
PORT = 7 | |
SERVER = "10.1.1.21" | |
#SERVER,PORT='localhost',10000 | |
REPLY_TIMEOUT = 0.3 # device reply timeout, sec | |
N_ITER = 200 # Number of repeats | |
print_OK = False # If print OK results. False=print only errors | |
def make_random_msg(): | |
# Create random echo test message | |
nbytes = random.randrange(8, 300) | |
x = bytearray( [ random.randrange(256) for i in range(nbytes) ] ) | |
return x | |
print("UDP echo client random -> IP: %s port: %u, repeat: %u" % (SERVER,PORT, N_ITER)) | |
# Create UDP socket | |
server_address = (SERVER, PORT) | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
max_delay, min_delay = 0.0, 0.0 # ns | |
first_delay = 0.0 | |
num_errs = 0 | |
sock.settimeout(REPLY_TIMEOUT) | |
time_start = time.monotonic() | |
for iter in range(1, N_ITER+1): | |
#time.sleep(1.0) | |
try: | |
# Send data & wait reply | |
#message = b'This is the message. It will be repeated.' | |
message = make_random_msg() | |
amount_expected = len(message) | |
amount_received = 0 | |
#print('\n\nSending len=%u "%r"' % (amount_expected, message)) | |
sock.sendto(message, server_address) | |
# Wait for the response | |
time_sent = time.monotonic_ns() | |
reply, host_from = sock.recvfrom(2048) # arg=buffer size, power of 2: | |
amount_received = len(reply) | |
#print('\nReceived "%r"' % reply) | |
#print("From:", host_from) | |
delay = (time.monotonic_ns() - time_sent) # reply latency, ns | |
if iter > 1: | |
max_delay = max(max_delay, delay) | |
min_delay = min(min_delay, delay) | |
else: | |
first_delay = delay | |
if reply != message: | |
print("Echo #%u error: data mismatch len=%u" % (iter,amount_received) ) | |
num_errs += 1 | |
elif print_OK: | |
print("Reply #%u OK, delay=%.1f ms" % (iter, (delay*1.0e-6)) ) | |
except OSError as e: | |
print("\nRX #%u failed: %r\n\n" % (iter, e)) | |
num_errs += 1 | |
except Exception as e: | |
print("Unhandled exception:", e) | |
time_total = time.monotonic() - time_start # seconds | |
print('Closing socket') | |
sock.close() | |
# print stats | |
print("Errors: %u" % num_errs) | |
print("1st.delay= %.1f Max.delay= %.1f Min.delay= %.1f ms" % (first_delay*1.0e-6, max_delay*1.0e-6, min_delay*1.0e-6)) | |
print("Total time, ms=", int(time_total * 1000)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Echo UDP server for PC - reference | |
import socket | |
import sys | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
# Bind the socket to the port | |
server_address = ('localhost', 10000) | |
print('UDP echo server on %s port %s' % server_address) | |
sock.bind(server_address) | |
while True: | |
data, address = sock.recvfrom(4096) | |
print('Received %s bytes from %s' % (len(data), address)) | |
print(data) | |
if data: | |
sent = sock.sendto(data, address) | |
print('sent %s bytes back to %s' % (sent, address)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment