Skip to content

Instantly share code, notes, and snippets.

@rfinnie
Last active December 23, 2017 09:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rfinnie/dc7505babd5967c27798b4d0374a5619 to your computer and use it in GitHub Desktop.
Save rfinnie/dc7505babd5967c27798b4d0374a5619 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import socketserver
import socket
import time
import struct
class MCastUDPServer(socketserver.UDPServer):
# Usage:
# MCastUDPServer(([
# ('239.192.9.9', '0.0.0.0'),
# ('239.192.9.10', '127.0.0.1'),
# ], 12345), SampleHandler)
# MCastUDPServer(([
# ('ff08::909', 0),
# ('ff08::90a', 1),
# ], 12345), SampleHandler, ipv6=True)
# allow_reuse_port is used by default on supported platforms
def __init__(
self, server_address, RequestHandlerClass, bind_and_activate=True,
allow_reuse_address=True, allow_reuse_port=True, ipv6=False
):
mcast_subscriptions = server_address[0]
server_address = ('', server_address[1])
self.allow_reuse_address = allow_reuse_address
if ipv6:
self.address_family = socket.AF_INET6
socketserver.UDPServer.__init__(
self, server_address, RequestHandlerClass, bind_and_activate
)
if hasattr(socket, 'SO_REUSEPORT'):
self.socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT, int(bool(allow_reuse_port))
)
if ipv6:
for (mcast_addr, interface) in mcast_subscriptions:
self.socket.setsockopt(
socket.IPPROTO_IPV6,
socket.IPV6_JOIN_GROUP,
(
socket.inet_pton(self.address_family, mcast_addr) +
struct.pack('=l', interface)
),
)
else:
for (mcast_addr, bind_addr) in mcast_subscriptions:
self.socket.setsockopt(
socket.IPPROTO_IP,
socket.IP_ADD_MEMBERSHIP,
(
socket.inet_pton(self.address_family, mcast_addr) +
socket.inet_pton(self.address_family, bind_addr)
),
)
class SampleHandler(socketserver.BaseRequestHandler):
def handle(self):
data = self.request[0]
print('{} {} < {} bytes: {}'.format(
time.strftime('%c'),
self.client_address[0:2],
len(data),
data,
), flush=True)
if __name__ == '__main__':
server = MCastUDPServer(
([
('239.192.9.9', '0.0.0.0'), ('239.192.9.10', '127.0.0.1')
], 12345), SampleHandler
)
# server = MCastUDPServer(
# ([
# ('ff08::909', 0), ('ff08::90a', 1)
# ], 12345), SampleHandler, ipv6=True
# )
try:
server.serve_forever()
except KeyboardInterrupt:
server.server_close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment