Skip to content

Instantly share code, notes, and snippets.

@HQJaTu
Created September 23, 2020 07:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save HQJaTu/345f7147065f1e10587169dc36cc1edb to your computer and use it in GitHub Desktop.
Save HQJaTu/345f7147065f1e10587169dc36cc1edb to your computer and use it in GitHub Desktop.
Python asyncio ordering test
#!/usr/bin/env python3
# vim: autoindent tabstop=4 shiftwidth=4 expandtab softtabstop=4 filetype=python
# Proof-of-Concept for https://stackoverflow.com/q/64017656/1548275
# Do Python asyncio Streams maintain order over multiple writers and readers?
import sys
import argparse
import logging
import asyncio
from datetime import datetime
from pprint import pprint
from random import randrange
log = logging.getLogger(__name__)
DEFAULT_TCP_PORT = 8888
def _setup_logger():
log_formatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s] %(message)s")
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setFormatter(log_formatter)
console_handler.propagate = False
logging.getLogger().addHandler(console_handler)
log.setLevel('DEBUG')
def run_server(loop, tcp_port):
log.info("Starting server in localhost TCP-port: %d" % tcp_port)
coro = asyncio.start_server(_server_coro, '127.0.0.1', tcp_port, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
socket_info = server.sockets[0].getsockname()
log.info("Serving on: %s:%d" % (socket_info[0], socket_info[1]))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
async def _server_coro(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
delay = randrange(3000)
log.debug("Received %s from %s" % (message, addr))
message_out = "Got: '%s'" % message
await asyncio.sleep(delay / 1000)
log.debug("Sending after delay of %d ms: %s" % (delay, message))
writer.write(message_out.encode('UTF-8'))
await writer.drain()
log.debug("Close the client socket")
writer.close()
def run_client(loop, tcp_port, count_connections):
loop.run_until_complete(_client_coro(loop, tcp_port, count_connections))
async def _client_coro(loop, tcp_port, count_connections):
tasks = [asyncio.create_task(_client_task(loop, tcp_port, conn_idx)) for conn_idx in range(count_connections)]
log.info("Running client to localhost TCP-port: %d" % tcp_port)
await asyncio.wait(tasks)
async def _client_task(loop, tcp_port, conn_idx):
message = "Test %d" % (conn_idx + 1)
reader, writer = await asyncio.open_connection('127.0.0.1', tcp_port, loop=loop)
log.debug('Send: %s' % message)
writer.write(message.encode())
data = await reader.read(100)
log.debug('Received: %s' % data.decode())
log.debug('Close the socket')
writer.close()
def main():
parser = argparse.ArgumentParser(description='Name.com DNS tool')
parser.add_argument('--server', action='store_true',
help='Run as a test server')
parser.add_argument('--client', action='store_true',
help='Run as a test client')
parser.add_argument('--port', '-p',
default=DEFAULT_TCP_PORT, type=int,
help="TCP-port for server to listen or client to connect. Default: %d" % DEFAULT_TCP_PORT)
parser.add_argument('--count', '-c',
default=1, type=int,
help="Number of client connections to make towards server. Default: 1")
args = parser.parse_args()
_setup_logger()
if not args.server and not args.client:
log.error("Need either --server or --client!")
exit(2)
# Init async I/O
async_loop = asyncio.get_event_loop()
if args.server:
run_server(async_loop, args.port)
elif args.client:
run_client(async_loop, args.port, args.count)
else:
raise ValueError("Internal: Duh?")
# In a nice and calm fashion, shut down any possible tasks that are pending.
for task in asyncio.Task.all_tasks():
task.cancel()
async_loop.run_until_complete(async_loop.shutdown_asyncgens())
async_loop.close()
log.info("Done.")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment