Skip to content

Instantly share code, notes, and snippets.

@mmmunk

mmmunk/receive_commands.py

Last active Jan 12, 2021
Embed
What would you like to do?
Simple UDP packet commands from one program to another
import sys
import socket
import random
# Create and bind UDP socket
bind_ip = sys.argv[1]
bind_port = int(sys.argv[2])
msgsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
msgsock.bind((bind_ip, bind_port))
print('Waiting for commands on', bind_ip, 'port', bind_port)
# Endless loop
while True:
# Blocking wait to receive UDP packet
data, address = msgsock.recvfrom(2048)
print('Received', data, 'from', address)
# Check size
if len(data) != 20:
print('Packet size is wrong for a command')
continue
# Check type
packet_type = int.from_bytes(data[0:4], 'little', signed=False)
print('Packet type is', packet_type)
if packet_type != 1:
print('Packet type is not a command')
continue
# Extract packet data
command_number = int.from_bytes(data[4:8], 'little', signed=False)
unique_id = int.from_bytes(data[8:12], 'little', signed=False)
argument = int.from_bytes(data[12:20], 'little', signed=True)
print('Command number =', command_number, '- Unique ID =', unique_id, '- argument =', argument)
# Build result (answer) including same command number and unique id
result_data = [int(2).to_bytes(4, 'little', signed=False)]
result_data.append(command_number.to_bytes(4, 'little', signed=False))
result_data.append(unique_id.to_bytes(4, 'little', signed=False))
todays_result = random.choice([(-1, 'Error'), (0, 'OK'), (1, 'Yes'), (2, 'No'), (3, 'Maybe or not')])
result_data.append(todays_result[0].to_bytes(4, 'little', signed=True))
result_data.append(len(todays_result[1]).to_bytes(1, 'little', signed=False))
result_data.append(bytes(todays_result[1], 'ascii'))
# Send result back to sender
result_data = b''.join(result_data)
print('Sent', result_data, 'to', address)
msgsock.sendto(result_data, address)
import sys
import socket
import random
# Command line arguments
command_ip = sys.argv[1]
command_port = int(sys.argv[2])
command_number = int(sys.argv[3])
argument = int(sys.argv[4])
# Build command data structure
command = [int(1).to_bytes(4, 'little', signed=False)]
command.append(command_number.to_bytes(4, 'little', signed=False))
unique_id = random.randrange(4200000000)
command.append(unique_id.to_bytes(4, 'little', signed=False))
command.append(argument.to_bytes(8, 'little', signed=True))
# Send command
command = b''.join(command)
print('Sent', command, 'to', command_ip, 'port', command_port)
msgsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
msgsock.sendto(command, (command_ip, command_port))
# Wait for result (answer)
result, address = msgsock.recvfrom(2048)
print('Received', result, 'from', address)
# Check sender port
if address[1] != command_port:
print('Packet is not sent from command port')
exit()
# Check size
if len(result) < 17:
print('Packet size is wrong for a result')
exit()
# Check packet type
if int.from_bytes(result[0:4], 'little', signed=False) != 2:
print('Packet type is not a result')
exit()
# Check command number
if int.from_bytes(result[4:8], 'little', signed=False) != command_number:
print('Command numbers are not identical')
exit()
# Check unique ID
if int.from_bytes(result[8:12], 'little', signed=False) != unique_id:
print('Unique IDs are not identical')
exit()
# Extract result data
result_code = int.from_bytes(result[12:16], 'little', signed=True)
result_str_len = result[16]
result_str = result[17:17+result_str_len].decode('cp1252', 'ignore')
print('Result code:', result_code)
print('Result string length:', result_str_len)
print('Result string:', result_str)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment