Skip to content

Instantly share code, notes, and snippets.

@dpallot
Created May 13, 2015 06:04
Show Gist options
  • Save dpallot/3c5f3ed2631c1ebd1a40 to your computer and use it in GitHub Desktop.
Save dpallot/3c5f3ed2631c1ebd1a40 to your computer and use it in GitHub Desktop.
Pipes using objects, queues and asyncio
import asyncio
class Pipe:
    """Base class for one stage of a pipeline assembled with ``|``.

    Joining two stages with ``|`` creates an ``asyncio.Queue`` that carries
    items from the left stage to the right one.  Subclasses override
    :meth:`run` and use :meth:`get` / :meth:`put` to read from the upstream
    stage and write to the downstream stage.
    """

    # Queue this stage reads from; assigned by the upstream stage's __or__.
    _in_q = None
    # Queue this stage writes to; created by this stage's own __or__.
    _out_q = None
    # Becomes True once this stage has been piped into another stage.
    _chained = False

    def __init__(self, *args, **kwargs):
        """Schedule :meth:`run` as a task on the current event loop.

        The positional and keyword arguments are accepted but not used.
        The resulting task is kept in ``self.future``.
        """
        # ``asyncio.async()`` no longer exists (``async`` is a reserved word
        # since Python 3.7); ``ensure_future()`` is its replacement.
        self.future = asyncio.ensure_future(self.run())

    def __or__(self, other):
        """Connect this stage's output to ``other``'s input.

        A new queue is created on every call, so piping the same stage a
        second time replaces its previous output queue.

        Returns:
            ``other``, so that ``a | b | c`` chains from left to right.
        """
        self._chained = True
        self._out_q = asyncio.Queue()
        other._in_q = self._out_q
        return other

    async def get(self):
        """Wait for and return the next item from the upstream stage.

        Raises:
            RuntimeError: if no stage has been piped into this one, so
                there is no input queue to read from.
        """
        if self._in_q is None:
            raise RuntimeError(
                "{} has no upstream stage to read from".format(
                    type(self).__name__
                )
            )
        return await self._in_q.get()

    async def put(self, item):
        """Send ``item`` to the downstream stage.

        If this stage has not been piped into another one, the item is
        discarded.
        """
        if self._chained:
            await self._out_q.put(item)

    async def run(self):
        """Stage body; the base implementation does nothing."""
class TextGenerator(Pipe):
    """Source stage that emits a fixed set of sample lines downstream."""

    async def run(self):
        """Send each sample line downstream, in order, then finish."""
        # Native coroutine: the ``@asyncio.coroutine`` decorator used
        # previously was removed in Python 3.11.
        lines = (
            'Hello world!',
            'A pipe example',
            'using asyncio',
            'and some magic functions.',
        )
        for line in lines:
            await self.put(line)
class Grep(Pipe):
    """Filter stage that forwards only the items that contain ``pattern``.

    Matching uses the ``in`` operator (a plain substring test for strings),
    not a regular expression.
    """

    # Value looked for in every incoming item; set per instance in __init__.
    pattern = None

    def __init__(self, pattern):
        """Store ``pattern`` and then run the base-class initializer."""
        # Assigned first so the attribute exists before the base
        # initializer is invoked.
        self.pattern = pattern
        super().__init__()

    async def run(self):
        """Forever read items from upstream and forward the matching ones."""
        # Native coroutine: the ``@asyncio.coroutine`` decorator used
        # previously was removed in Python 3.11.
        while True:
            item = await self.get()
            if self.pattern in item:
                await self.put(item)
class Printer(Pipe):
    """Sink stage that prints every item it receives."""

    async def run(self):
        """Forever read items from upstream and print each one."""
        # Native coroutine: the ``@asyncio.coroutine`` decorator used
        # previously was removed in Python 3.11.
        while True:
            item = await self.get()
            print(item)
# Create and install the event loop explicitly before building any stage:
# relying on asyncio.get_event_loop() to create one implicitly is deprecated
# and raises RuntimeError on recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Demo pipeline: only lines containing all five vowels reach the printer.
(
    TextGenerator()
    | Grep('a')
    | Grep('e')
    | Grep('i')
    | Grep('o')
    | Grep('u')
    | Printer()
)

# The filter and printer stages loop forever, so run until interrupted.
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment