Skip to content

Instantly share code, notes, and snippets.

@claws
Created May 4, 2016 12:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save claws/213d0f86a6ef9f6f92b82c5fcb97ae95 to your computer and use it in GitHub Desktop.
Save claws/213d0f86a6ef9f6f92b82c5fcb97ae95 to your computer and use it in GitHub Desktop.
Demonstrates a minor difference between the asyncio and uvloop create_server handling of the port argument.
#!/usr/bin/env python3.5
'''
This script can be used to demonstrate a minor difference
between acceptable port argument passed to the `create_server`
method on the `asyncio` and `uvloop` event loop.
When the script is run without any args it uses `asyncio` to
create a server socket using some candidate `port` tokens that
instruct the event loop to create a server on an ephemeral port.
..code-block:: console
$ create_server.py
Creating server using loop:asyncio, port_candidate:0
Server was bound on port: 51862
Creating server using loop:asyncio, port_candidate:None
Server was bound on port: 51863
When the script is run with the `--uvloop` arg it uses `uvloop`
to run the same checks.
..code-block:: console
$ create_server.py --uvloop
Creating server using loop:uvloop, port_candidate:0
Server was bound on port: 55882
Creating server using loop:uvloop, port_candidate:None
Failed to bind server using loop:uvloop, port_candidate:None
'''
import argparse
import asyncio
import socket
async def main(loop, loop_name):
'''
Bind a server to an ephemeral port using a range of
port candidate tokens.
'''
port_candidates = [0, None]
for port_candidate in port_candidates:
try:
# create server
print(
'Creating server using loop:{}, port_candidate:{}'.format(
loop_name, port_candidate))
listener = await loop.create_server(
asyncio.Protocol,
'127.0.0.1',
port_candidate,
family=socket.AF_INET)
# fetch details
host, port = listener.sockets[0].getsockname()
print('Server was bound on port:', port)
# close server
listener.close()
await listener.wait_closed()
except:
print(
'Failed to bind server using loop:{}, '
'port_candidate:{}'.format(
loop_name, port_candidate))
ARGS = argparse.ArgumentParser(description='Create server tester')
ARGS.add_argument(
'--uvloop', action="store_true",
help="Use the uvloop instead of default asyncio event loop")
if __name__ == '__main__':
args = ARGS.parse_args()
loop_name = 'asyncio'
if args.uvloop:
import uvloop
asyncio.DefaultEventLoopPolicy = uvloop.EventLoopPolicy
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
loop_name = 'uvloop'
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(loop, loop_name))
except KeyboardInterrupt:
pass
finally:
loop.stop()
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment