Last active

Embed URL

HTTPS clone URL

SSH clone URL

You can clone with HTTPS or SSH.

Download Gist

a client for /bin/cat in Python 3.4 asyncio

View catclient.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#!/usr/bin/env python3.4
 
 
import asyncio
 
 
class CatClient(asyncio.SubprocessProtocol):
    """Meow at a subprocess (e.g. /bin/cat) once a second and print its output.

    The protocol writes ``b'meow'`` to the child's stdin every second and
    prints every chunk the child sends back. Pinging stops as soon as the
    child's stdin pipe closes, the child exits, or the transport is lost, so
    no data is written to a dead pipe and no timer is left pending.
    """

    # Class-level defaults so the attributes exist even before
    # connection_made() has been called.
    stdin = None          # write transport for the child's stdin (fd 0)
    _ping_handle = None   # handle of the pending call_later() timer, if any

    def ping(self):
        """Send data to the process and schedule the next ping.

        Does nothing when there is no usable stdin pipe (not connected yet,
        or the subprocess is already gone).
        """
        if self.stdin is None:
            return
        data = b'meow'
        print('writing data %r to subprocess' % (data,))
        self.stdin.write(data)
        # Schedule another ping call
        self._schedule_ping()

    def _schedule_ping(self):
        """Schedule a ping call for 1 second in the future."""
        # Keep the handle so the timer can be cancelled on shutdown.
        self._ping_handle = asyncio.get_event_loop().call_later(1, self.ping)

    def _stop_pinging(self):
        """Cancel any pending ping and forget the stdin pipe."""
        handle = self._ping_handle
        self._ping_handle = None
        if handle is not None:
            handle.cancel()
        self.stdin = None

    def connection_made(self, transport):
        """Called when the process is ready to be conversed with."""
        # This is the subprocess' stdin, not ours. We write to it
        self.stdin = transport.get_pipe_transport(0)
        # Schedule the first ping call
        self._schedule_ping()

    def pipe_data_received(self, fd, data):
        """Print data the subprocess wrote to pipe ``fd``."""
        print('received %r on fd %d' % (data, fd))

    def pipe_connection_lost(self, fd, exc):
        """Stop pinging once the subprocess' stdin pipe (fd 0) is closed."""
        if fd == 0:
            self._stop_pinging()

    def process_exited(self):
        """Stop pinging when the subprocess has exited."""
        self._stop_pinging()

    def connection_lost(self, exc):
        """Stop pinging when the subprocess transport is closed."""
        self._stop_pinging()
 
 
# Create and install the event loop explicitly: calling
# asyncio.get_event_loop() with no current loop is deprecated and fails on
# recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

try:
    # This coroutine will return when the subprocess has started
    coro = loop.subprocess_exec(CatClient, '/bin/cat')
    transport, protocol = loop.run_until_complete(coro)

    try:
        # Now communicate with it
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this demo; exit quietly.
        pass
    finally:
        # Close the subprocess transport so the child and its pipes are
        # released.
        transport.close()
finally:
    loop.close()
 
 
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.