Skip to content

Instantly share code, notes, and snippets.

@AndreiPashkin
Last active January 7, 2020 17:08
Show Gist options
  • Save AndreiPashkin/04c287def6d165fc2832 to your computer and use it in GitHub Desktop.
Save AndreiPashkin/04c287def6d165fc2832 to your computer and use it in GitHub Desktop.
Python asyncio coroutines pipelining and exception propagation sandbox
#!/usr/bin/env python
# coding=utf-8
"""Client that demonstrates processing pipeline."""
import asyncio
from functools import partial
import logging
logging.basicConfig(level='DEBUG')
class Pipeline(object):
def __init__(self, *units):
in_ = output = self.input = asyncio.Queue()
tasks = []
for unit in units:
output = asyncio.Queue()
tasks.append(asyncio.async(unit(in_, output)))
in_ = output
self.output = output
self.future = asyncio.gather(*tasks)
self.future.add_done_callback(lambda _: self.future.cancel())
@asyncio.coroutine
def put(self, item):
yield from self.input.put(item)
@staticmethod
def _propagate_exc(future, target_future):
if future.cancelled():
target_future.cancel()
elif future.exception():
target_future.set_exception(future.exception())
else:
target_future.set_excpetion(
RuntimeError('One of the unit was prematurely '
'stopped execution.'))
@asyncio.coroutine
def _waiter(self, target):
result = (yield from self.output.get())
target.set_result(result)
@asyncio.coroutine
def get(self):
try:
result = self.output.get_nowait()
except asyncio.QueueEmpty:
result = asyncio.Future()
self.future.add_done_callback(partial(self._propagate_exc,
target_future=result))
asyncio.async(self._waiter(result))
return result
def empty(self):
return self.output.empty()
@asyncio.coroutine
def upper(input, output):
while True:
char = yield from input.get()
print('Got char: ', char)
yield from output.put(char.upper())
@asyncio.coroutine
def words(input, output):
"""Raises after collecting 3 words."""
counter = 0
word = []
while True:
if counter >= 3:
print('Oops!..')
raise RuntimeError('Oops!..')
e = yield from input.get()
if e.isalnum():
word.append(e)
print('Current word: ', word)
elif len(word) > 0:
yield from output.put(''.join(word))
counter += 1
word = []
@asyncio.coroutine
def tcp_echo_client(loop):
reader, writer = yield from asyncio.open_connection('127.0.0.1', 5555,
loop=loop)
pipeline = Pipeline(upper, words)
@asyncio.coroutine
def read():
while True:
data = yield from reader.read(100)
data = data.decode('utf-8')
print('Data: ', data)
for byte in data:
yield from pipeline.put(byte)
asyncio.async(read())
while True:
print('Pipeline output: ', (yield from pipeline.get()))
print('Close the socket')
writer.close()
@asyncio.coroutine
def background_stuff():
while True:
yield from asyncio.sleep(3)
print('Other background stuff...')
loop = asyncio.get_event_loop()
asyncio.async(background_stuff())
loop.run_until_complete(tcp_echo_client(loop))
loop.close()
#!/usr/bin/env python
# coding=utf-8
"""Server that accepts a client and send it strings from user input."""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = ''
port = 5555
s.bind((host, port))
s.listen(1)
print('Listening...')
conn, addr = s.accept()
print('Client ({}) connected.'.format(addr))
while True:
conn.send(bytes(input('Enter data to send: '), 'UTF-8'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment