
@claws
Last active July 16, 2022 13:30
Python asyncio SO_REUSEPORT OSX issue


Overview

My simple service discovery mechanism is built upon Python 3.4 and the asyncio module. It uses a UDP socket bound to a specified discovery port to allow applications to discover network services. System applications running on the same node bind to the same discovery port in order to receive discovery messages broadcast onto the network.

It does not work on OSX, because the current asyncio implementation provides no convenient way for developers to set the SO_REUSEPORT socket option before the socket is bound.

asyncio's create_datagram_endpoint method automatically sets the SO_REUSEADDR socket option before binding. On Linux this allows two (or more) UDP sockets to bind to exactly the same address and port combination. On OSX, however, the SO_REUSEPORT socket option must also be set to provide the same functionality.
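To illustrate why the options have to be in place before bind() is called, here is a minimal sketch using only the standard socket module. It assumes a loopback address and lets the OS pick the port for the first socket; the helper name make_shared_udp_socket is hypothetical. SO_REUSEPORT is only set where the socket module defines it.

```python
import socket


def make_shared_udp_socket(port, host='127.0.0.1'):
    """Hypothetical helper: a UDP socket that can share (host, port).

    Both options are set before bind(); setting them afterwards would not
    change whether the bind succeeds.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # SO_REUSEPORT is not defined on every platform (e.g. Windows).
    if hasattr(socket, 'SO_REUSEPORT'):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind((host, port))
    return sock


first = make_shared_udp_socket(0)        # let the OS choose a free port
port = first.getsockname()[1]
second = make_shared_udp_socket(port)    # same address/port combination
names = (first.getsockname(), second.getsockname())
first.close()
second.close()
print(names[0] == names[1])  # True
```

Setting both options mirrors what the patched loop does when reuse_address and reuse_port are both requested.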

The patch.py script in this gist attempts to implement the solution suggested in issue 23972 by overriding the event loop's create_datagram_endpoint method with a modified version. The modified method can set extra socket options controlled by new keyword arguments (reuse_address, reuse_port and allow_broadcast). With this patch my simple service discovery mechanism works on OSX too.
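For comparison, later Python releases added reuse_port and allow_broadcast as keyword arguments of the standard create_datagram_endpoint, so on those versions no patch is needed. The sketch below assumes Python 3.7 or newer (for asyncio.run and get_running_loop) and a loopback address; bind_twice is a hypothetical name. The standard reuse_address argument was later disabled for security reasons, so only reuse_port is used here.

```python
import asyncio


async def bind_twice(host='127.0.0.1'):
    loop = asyncio.get_running_loop()
    # reuse_port=True makes asyncio set SO_REUSEPORT before bind(); it
    # raises ValueError on platforms whose socket module lacks SO_REUSEPORT.
    t1, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=(host, 0), reuse_port=True)
    port = t1.get_extra_info('sockname')[1]
    # Without reuse_port this second bind would normally fail (EADDRINUSE).
    t2, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=(host, port), reuse_port=True)
    names = (t1.get_extra_info('sockname'), t2.get_extra_info('sockname'))
    t1.close()
    t2.close()
    await asyncio.sleep(0)  # let connection_lost run so the sockets close
    return names


names = asyncio.run(bind_twice())
print(names[0] == names[1])  # True
```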

This issue is mentioned here too.

Also see here for a good overview of the socket options being used in this implementation.

Demo

The demo script, demo.py, attempts to create two UDP sockets that both bind to the same port. When run on OSX with the standard event loop, the demo is expected to fail when the second socket is bound. When run on OSX with the patched event loop, the demo is expected to complete. Once the sockets are established, each sends a message to the broadcast address. When both sockets have received two messages (their own and the one from the other socket), they fire a wait_done Future, which ends the demo script.

Run the demo using the standard event loop:

$ python3 demo.py

When run on Linux this is expected to complete, but on OSX it is expected to fail with an OSError: [Errno 48] Address already in use.
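The errno value in that message is platform specific: 48 is EADDRINUSE on OSX, while Linux uses 98 for the same condition. When checking for this failure in code, comparing against errno.EADDRINUSE is more portable than matching the number. A minimal sketch with plain sockets and no reuse options set (second_bind_fails is a hypothetical helper):

```python
import errno
import socket


def second_bind_fails(host='127.0.0.1'):
    """Hypothetical helper: bind two plain UDP sockets to one port.

    Returns True if the second bind() fails with EADDRINUSE.
    """
    first = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        first.bind((host, 0))                 # OS picks a free port
        port = first.getsockname()[1]
        try:
            second.bind((host, port))         # no reuse options set
        except OSError as exc:
            return exc.errno == errno.EADDRINUSE
        return False
    finally:
        first.close()
        second.close()


print(second_bind_fails())  # True on Linux and OSX
```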

Run the demo using the workaround event loop patched in:

$ python3 demo.py --workaround

When run on Linux and OSX this is expected to complete.

demo.py

'''
This example tests the event loop patch implementation by attempting to create
two UDP sockets that both bind to the same port.

When run on OSX with the standard event loop the demo is expected to fail when
the second socket is attempting to bind to the same address/port combination.
When run on OSX with the patched event loop the demo is expected to complete.

Once the sockets are established they will both send a message to the
broadcast address. When both sockets receive two messages (their own and the
one from the other socket) they fire their wait_done Future.

Run the demo using the standard event loop:

.. code-block:: console

    $ python3 demo.py

When run on Linux this is expected to complete but on OSX it is expected to
fail with an OSError: [Errno 48] Address already in use.

Run the demo using the workaround event loop patched in:

.. code-block:: console

    $ python3 demo.py --workaround

When run on Linux and OSX this is expected to complete.
'''

import argparse
import asyncio
import signal
import socket


class ExampleProtocol(asyncio.DatagramProtocol):
    '''
    A trivial protocol that each endpoint will use to:

    - print debug information; and,
    - fire the wait_done Future indicating that the test is complete
      after it receives two messages (i.e. b"Hello" and b"World").
    '''

    def __init__(self, name):
        self.name = name
        self.wait_ready = asyncio.Future()
        self.wait_done = asyncio.Future()
        self.wait_closed = asyncio.Future()
        self.expected_msgs = set([b'Hello', b'World'])

    def connection_made(self, transport):
        print('{} connection_made'.format(self.name))
        self.transport = transport
        self.wait_ready.set_result(True)

    def connection_lost(self, exc):
        print('{} connection_lost{}'.format(
            self.name, ': {}'.format(exc) if exc else ''))
        self.wait_closed.set_result(True)

    def datagram_received(self, data, addr):
        print('{} datagram_received from {}: {}'.format(self.name, addr, data))
        self.expected_msgs.discard(data)
        # Guard against extra datagrams arriving after completion, which
        # would otherwise raise InvalidStateError on the finished Future.
        if not self.expected_msgs and not self.wait_done.done():
            self.wait_done.set_result(True)

    def sendto(self, data, address):
        print('{} sending {} to {}'.format(self.name, data, address))
        self.transport.sendto(data, address)


@asyncio.coroutine
def demo(loop, args):

    prot1 = ExampleProtocol('prot1')
    if args.workaround:
        _transport1, _p1 = yield from loop.create_datagram_endpoint(
            lambda: prot1, local_addr=(args.address, args.port),
            reuse_address=True, reuse_port=True,
            allow_broadcast=True)
    else:
        _transport1, _p1 = yield from loop.create_datagram_endpoint(
            lambda: prot1, local_addr=(args.address, args.port))

    # Bind another socket to the same address/port combination. This action
    # is expected to trigger the issue when run on OSX platform using the
    # standard event loop. When the workaround is used this should succeed.
    prot2 = ExampleProtocol('prot2')
    if args.workaround:
        _transport2, _p2 = yield from loop.create_datagram_endpoint(
            lambda: prot2, local_addr=(args.address, args.port),
            reuse_address=True, reuse_port=True,
            allow_broadcast=True)
    else:
        try:
            _transport2, _p2 = yield from loop.create_datagram_endpoint(
                lambda: prot2, local_addr=(args.address, args.port))
        except Exception:
            # Release the first socket, then re-raise the bind error as-is.
            _transport1.close()
            raise

    yield from asyncio.wait([prot1.wait_ready, prot2.wait_ready])

    if not args.workaround:
        # When using the standard (non-patched) loop the broadcast socket
        # option must be set manually.
        prot1.transport.get_extra_info('socket').setsockopt(
            socket.SOL_SOCKET, socket.SO_BROADCAST, True)
        prot2.transport.get_extra_info('socket').setsockopt(
            socket.SOL_SOCKET, socket.SO_BROADCAST, True)

    # Send a message to the broadcast address. All messages should be
    # received by each endpoint.
    target_addr = args.broadcast_addr
    prot1.sendto(b'Hello', (target_addr, args.port))
    prot2.sendto(b'World', (target_addr, args.port))

    yield from asyncio.wait([prot1.wait_done, prot2.wait_done])

    prot1.transport.close()
    prot2.transport.close()

    yield from asyncio.wait([prot1.wait_closed, prot2.wait_closed])

    print('done')


ARGS = argparse.ArgumentParser('asyncio patch demo')
ARGS.add_argument(
    '--workaround', action='store_true', default=False,
    help="Patch the loop before running the demo")
ARGS.add_argument(
    '--address', type=str, default='0.0.0.0',
    help="The address to use for the demonstration")
ARGS.add_argument(
    '--port', type=int, default=7000,
    help="The port to use for the demonstration")


if __name__ == '__main__':

    ip_addr = socket.gethostbyname(socket.gethostname())
    # crude guess of the network broadcast address (assumes a /24 subnet)
    subnet_broadcast_addr = '{}.255'.format('.'.join(ip_addr.split('.')[:-1]))

    args = ARGS.parse_args()
    args.broadcast_addr = subnet_broadcast_addr

    if args.workaround:
        import patch
        (patch, )  # silence unused import warning

    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, loop.stop)
    try:
        loop.run_until_complete(demo(loop, args))
    finally:
        loop.close()
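The broadcast address in demo.py is a crude guess that only holds for a /24 subnet. If the prefix length is known, the standard ipaddress module (Python 3.3+) can compute it properly. The sketch below assumes the caller supplies the prefix length, since the demo does not look up the interface netmask; broadcast_for is a hypothetical name.

```python
import ipaddress


def broadcast_for(ip, prefixlen):
    """Hypothetical helper: directed broadcast address of ip/prefixlen."""
    iface = ipaddress.ip_interface('{}/{}'.format(ip, prefixlen))
    return str(iface.network.broadcast_address)


print(broadcast_for('192.168.1.42', 24))  # 192.168.1.255
print(broadcast_for('10.0.5.7', 16))      # 10.0.255.255
```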
patch.py

'''
This module attempts to implement the solution suggested in:
https://bugs.python.org/issue23972
'''

import asyncio
import collections
import socket
import sys

from asyncio import futures
from asyncio.log import logger


version = sys.version_info
PY_3_5_plus = version > (3, 5, )


class PatchedLoop(asyncio.SelectorEventLoop):
    '''
    The create_datagram_endpoint method below is a modified version of the
    original in Python 3.6.0.a0 asyncio/base_events.py. However my local
    version of Python is 3.4.1 so the later (3.5+) additions are wrapped in
    a PY_3_5_plus check.

    This version can set the SO_REUSEPORT socket option prior to the call to
    bind. This capability is controlled by some new keyword arguments (e.g.
    reuse_address, reuse_port, and allow_broadcast).
    '''

    @asyncio.coroutine
    def create_datagram_endpoint(self, protocol_factory,
                                 local_addr=None, remote_addr=None, *,
                                 family=0, proto=0, flags=0,
                                 reuse_address=None, reuse_port=None,
                                 allow_broadcast=None, sock=None):
        """Create datagram connection."""
        if sock is None:
            if not (local_addr or remote_addr):
                if family == 0:
                    raise ValueError('unexpected address family')
                addr_pairs_info = (((family, proto), (None, None)),)
            else:
                # join address by (family, protocol)
                addr_infos = collections.OrderedDict()
                for idx, addr in ((0, local_addr), (1, remote_addr)):
                    if addr is not None:
                        assert isinstance(addr, tuple) and len(addr) == 2, (
                            '2-tuple is expected')

                        infos = yield from self.getaddrinfo(
                            *addr, family=family, type=socket.SOCK_DGRAM,
                            proto=proto, flags=flags)
                        if not infos:
                            raise OSError('getaddrinfo() returned empty list')

                        for fam, _, pro, _, address in infos:
                            key = (fam, pro)
                            if key not in addr_infos:
                                addr_infos[key] = [None, None]
                            addr_infos[key][idx] = address

                # each addr has to have info for each (family, proto) pair
                addr_pairs_info = [
                    (key, addr_pair) for key, addr_pair in addr_infos.items()
                    if not ((local_addr and addr_pair[0] is None) or
                            (remote_addr and addr_pair[1] is None))]

                if not addr_pairs_info:
                    raise ValueError('can not get address information')

            exceptions = []

            for ((family, proto),
                 (local_address, remote_address)) in addr_pairs_info:
                sock = None
                r_addr = None
                try:
                    sock = socket.socket(
                        family=family, type=socket.SOCK_DGRAM, proto=proto)
                    # The extra socket options must be set before bind().
                    if reuse_address:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    if reuse_port:
                        # SO_REUSEPORT is not available on every platform.
                        if 'SO_REUSEPORT' in vars(socket):
                            sock.setsockopt(
                                socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
                    if allow_broadcast:
                        sock.setsockopt(
                            socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
                    sock.setblocking(False)

                    if local_addr:
                        sock.bind(local_address)
                    if remote_addr:
                        yield from self.sock_connect(sock, remote_address)
                        r_addr = remote_address
                except OSError as exc:
                    if sock is not None:
                        sock.close()
                    exceptions.append(exc)
                except:
                    if sock is not None:
                        sock.close()
                    raise
                else:
                    break
            else:
                raise exceptions[0]
        else:
            if local_addr or remote_addr:
                raise ValueError(
                    'local_addr/remote_addr and sock can not be specified '
                    'at the same time')
            # A caller-supplied socket has no remote address recorded here
            # (r_addr would otherwise be undefined below) and must be
            # non-blocking for use with the event loop.
            sock.setblocking(False)
            r_addr = None

        protocol = protocol_factory()
        if PY_3_5_plus:
            waiter = futures.Future(loop=self)
            transport = self._make_datagram_transport(sock, protocol, r_addr,
                                                      waiter)  # python 3.6
        else:
            transport = self._make_datagram_transport(sock, protocol, r_addr)
        if self._debug:
            if local_addr:
                logger.info("Datagram endpoint local_addr=%r remote_addr=%r "
                            "created: (%r, %r)",
                            local_addr, remote_addr, transport, protocol)
            else:
                logger.debug("Datagram endpoint remote_addr=%r created: "
                             "(%r, %r)",
                             remote_addr, transport, protocol)

        if PY_3_5_plus:
            try:
                yield from waiter
            except:
                transport.close()
                raise

        return transport, protocol


class PatchedEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    _loop_factory = PatchedLoop


# Now patch asyncio so we can create UDP sockets that are capable of binding
# to the same port on OSX.
asyncio.SelectorEventLoop = PatchedLoop

# Explicitly set the event loop policy to ensure the patched event
# loop is used. This patch file should really be imported before any
# calls to asyncio.get_event_loop().
#
asyncio.DefaultEventLoopPolicy = PatchedEventLoopPolicy
asyncio.set_event_loop_policy(PatchedEventLoopPolicy())
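Later Python releases also accept a sock argument to create_datagram_endpoint. That gives another route around the problem: create and configure the socket yourself, bind it, and hand it to asyncio, which then uses it as-is. This works for any socket option, not only the ones exposed as keywords. A minimal sketch, assuming Python 3.7+ and a loopback address; the names make_socket and Receiver are hypothetical. The endpoint sends a datagram to its own address to show that the pre-configured socket is live.

```python
import asyncio
import socket


class Receiver(asyncio.DatagramProtocol):
    """Hypothetical protocol that resolves a future with the first datagram."""

    def __init__(self, received):
        self.received = received

    def datagram_received(self, data, addr):
        if not self.received.done():
            self.received.set_result(data)


def make_socket(host='127.0.0.1', port=0):
    # All options are applied before bind(), just like in the patched loop.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    if hasattr(socket, 'SO_REUSEPORT'):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.bind((host, port))
    return sock


async def main():
    loop = asyncio.get_running_loop()
    received = loop.create_future()
    transport, _ = await loop.create_datagram_endpoint(
        lambda: Receiver(received), sock=make_socket())
    # Send a datagram to our own address to show the socket is usable.
    transport.sendto(b'ping', transport.get_extra_info('sockname'))
    data = await asyncio.wait_for(received, timeout=5)
    transport.close()
    await asyncio.sleep(0)  # let connection_lost run so the socket closes
    return data


result = asyncio.run(main())
print(result)  # b'ping'
```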
@sonvirgo

sonvirgo commented Mar 12, 2018

Experiencing the same issue with MiniDLNA and GmediaRender-Resurect on High Sierra, but I can run multiple instances of gmediarender at the same time.
