Skip to content

Instantly share code, notes, and snippets.

@mosquito
Created September 27, 2017 11:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mosquito/e37e44bf951e9a926c1952c755fcc119 to your computer and use it in GitHub Desktop.
Save mosquito/e37e44bf951e9a926c1952c755fcc119 to your computer and use it in GitHub Desktop.
AsyncIO PIPE
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import asyncio\n",
"import sys\n",
"from typing import Tuple\n",
"from asyncio.streams import StreamWriter, FlowControlMixin\n",
"\n",
"reader, writer = None, None\n",
"\n",
"\n",
"READ_FD, WRITE_FD = int, int\n",
"\n",
"\n",
"class AIOPipe:\n",
"    def __init__(self, fds: Tuple[READ_FD, WRITE_FD]=None, loop=None):\n",
"        self.loop = loop or asyncio.get_event_loop()\n",
"\n",
"        self._read_fd, self._write_fd = map(os.fdopen, fds if fds else os.pipe())\n",
"\n",
"        self.reader = None\n",
"        self.writer = None\n",
"        self._reader_transport = None\n",
"\n",
"        self._closed = False\n",
"\n",
"    def fds(self) -> Tuple[READ_FD, WRITE_FD]:\n",
"        return self._read_fd.fileno(), self._write_fd.fileno()\n",
"\n",
"    @asyncio.coroutine\n",
"    def initialize(self):\n",
"        self.reader = asyncio.StreamReader(loop=self.loop)\n",
"        reader_protocol = asyncio.StreamReaderProtocol(self.reader, loop=self.loop)\n",
"\n",
"        writer_transport, writer_protocol = yield from self.loop.connect_write_pipe(\n",
"            FlowControlMixin,\n",
"            self._write_fd\n",
"        )\n",
"\n",
"        self.writer = StreamWriter(writer_transport, writer_protocol, None, loop=self.loop)\n",
"\n",
"        self._reader_transport, _ = yield from self.loop.connect_read_pipe(\n",
"            lambda: reader_protocol, self._read_fd\n",
"        )\n",
"\n",
"    def close(self):\n",
"        if self._closed:\n",
"            return\n",
"\n",
"        self._closed = True\n",
"\n",
"        if self.writer is not None:\n",
"            self.writer.close()\n",
"        if self._reader_transport is not None:\n",
"            self._reader_transport.close()\n",
"\n",
"    def __del__(self):\n",
"        self.close()\n",
"\n",
" \n",
"@asyncio.coroutine\n",
"def create_pipe(loop=None) -> AIOPipe:\n",
"    aio_pipe = AIOPipe(loop=loop)\n",
"    yield from aio_pipe.initialize()\n",
"    return aio_pipe\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"b'one two \\n'\n"
]
}
],
"source": [
"import asyncio\n",
"\n",
"\n",
"async def main(loop):\n",
"    pipe = await create_pipe(loop=loop)\n",
"\n",
"    async def writer():\n",
"        pipe.writer.write(b'one ')\n",
"        await pipe.writer.drain()\n",
"\n",
"        await asyncio.sleep(1, loop=loop)\n",
"\n",
"        pipe.writer.write(b'two ')\n",
"        await pipe.writer.drain()\n",
"\n",
"        await asyncio.sleep(1, loop=loop)\n",
"\n",
"        pipe.writer.write(b'\\n')\n",
"        await pipe.writer.drain()\n",
"\n",
"    loop.create_task(writer())\n",
"\n",
"    print(await pipe.reader.readline())\n",
"\n",
"\n",
"loop = asyncio.get_event_loop()\n",
"loop.run_until_complete(main(loop))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment