Skip to content

Instantly share code, notes, and snippets.

@teeks99
Created March 10, 2023 18:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save teeks99/9213d80fa61532f1d30ee6a397cd5f0a to your computer and use it in GitHub Desktop.
Save teeks99/9213d80fa61532f1d30ee6a397cd5f0a to your computer and use it in GitHub Desktop.
Multicast test send and receive
import socket
import struct
import time
test_groups = ['224.1.1.1', '239.255.255.255']
test_interfaces = ['127.0.0.1', '192.168.42.42']
port = 15007
receivers = []
for rcv_interface in test_interfaces:
rcv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
rcv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
rcv.bind((rcv_interface, port))
for rcv_group in test_groups:
mreq = struct.pack(
"4s4s", socket.inet_aton(rcv_group),
socket.inet_aton(rcv_interface))
rcv.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
rcv.settimeout(0)
receivers.append(rcv)
def receive():
received = False
for rcv in receivers:
try:
data, address = rcv.recvfrom(10240)
received = True
print(f"From: {address}, received: {data}")
except socket.timeout:
pass
except BlockingIOError:
pass
if not received:
print("No data received")
def send(group, interface):
#print(f"Test from {group} {interface}")
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
sock.setsockopt(
socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(interface))
mreq = struct.pack(
"4s4s", socket.inet_aton(group),
socket.inet_aton(interface))
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
data = f"multicast send - group: {group} interface: {interface}"
data = data.encode()
try:
sock.sendto(data, (group, port))
except socket.timeout:
pass
except OSError as e:
print(e)
receive()
for test_group in test_groups:
for test_interface in test_interfaces:
send(test_group, test_interface)
time.sleep(0.5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment