Skip to content

Instantly share code, notes, and snippets.

@mosquito
Last active March 1, 2024 16:46
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mosquito/d54547f1dc8d2eb32fd8b41c4690cde3 to your computer and use it in GitHub Desktop.
Save mosquito/d54547f1dc8d2eb32fd8b41c4690cde3 to your computer and use it in GitHub Desktop.
Asyncio PIPE
import asyncio
import fcntl
import os
from collections import deque
from functools import partial
class AsyncPIPE:
    """An OS pipe whose reads and writes are awaited through an asyncio loop.

    ``read()`` and ``write()`` queue a request and return a future. The loop
    callbacks ``on_read`` / ``on_write`` serve the queued requests in FIFO
    order once the matching descriptor is ready. A descriptor is watched by
    the loop only while requests are pending for it; the write end of a pipe
    is almost always ready, so watching it permanently would make the loop
    spin.
    """

    @staticmethod
    def create_pipe():
        """Create an OS pipe and return ``(read_fd, write_fd)``, both non-blocking."""
        read_fd, write_fd = os.pipe()
        try:
            for fd in (read_fd, write_fd):
                # Adds O_NONBLOCK while keeping the other status flags.
                os.set_blocking(fd, False)
        except BaseException:
            # Do not leak the descriptors if they cannot be configured.
            os.close(read_fd)
            os.close(write_fd)
            raise
        return read_fd, write_fd

    def __init__(self, loop=None):
        """Open the pipe.

        Args:
            loop: event loop used for readiness callbacks and futures;
                defaults to ``asyncio.get_event_loop()``.
        """
        # Assigned first so close() / __del__ stay safe if anything below fails.
        self.read_fd = self.write_fd = None
        self.loop = loop or asyncio.get_event_loop()
        self._write_futures = deque()  # pending (future, memoryview) pairs
        self._read_futures = deque()   # pending (future, size) pairs
        self.read_fd, self.write_fd = self.create_pipe()

    @staticmethod
    def _discard_done(queue):
        """Drop requests at the head of *queue* whose future is already done."""
        while queue and queue[0][0].done():
            queue.popleft()

    def _ensure_open(self):
        """Raise ``ValueError`` if the pipe has been closed."""
        if self.read_fd is None:
            raise ValueError("I/O operation on closed pipe")

    def on_read(self):
        """Loop callback: serve the oldest pending read request."""
        if self.read_fd is None:
            return
        # Cancelled requests must not consume data from the pipe.
        self._discard_done(self._read_futures)
        if self._read_futures:
            f, size = self._read_futures[0]
            try:
                buff = os.read(self.read_fd, size)
            except (BlockingIOError, InterruptedError):
                return  # spurious wake-up; stay registered and retry later
            except Exception as exc:
                self._read_futures.popleft()
                f.set_exception(exc)
            else:
                self._read_futures.popleft()
                f.set_result(buff)
        if not self._read_futures:
            self.loop.remove_reader(self.read_fd)

    def on_write(self):
        """Loop callback: push the oldest pending write request into the pipe."""
        if self.write_fd is None:
            return
        self._discard_done(self._write_futures)
        if self._write_futures:
            f, data = self._write_futures[0]
            try:
                written = os.write(self.write_fd, data)
            except (BlockingIOError, InterruptedError):
                return  # pipe is full; stay registered and retry later
            except Exception as exc:
                self._write_futures.popleft()
                f.set_exception(exc)
            else:
                if written < len(data):
                    # Partial write: keep the unsent tail at the head of the
                    # queue so the payload is delivered completely, in order.
                    self._write_futures[0] = (f, data[written:])
                else:
                    self._write_futures.popleft()
                    f.set_result(True)
        if not self._write_futures:
            self.loop.remove_writer(self.write_fd)

    def __del__(self):
        self.close()

    def close(self):
        """Unregister both ends from the loop, close them, cancel pending requests.

        Safe to call more than once.
        """
        if getattr(self, "read_fd", None) is None:
            return
        read_fd, write_fd = self.read_fd, self.write_fd
        self.read_fd = self.write_fd = None
        try:
            self.loop.remove_reader(read_fd)
            self.loop.remove_writer(write_fd)
        finally:
            try:
                os.close(read_fd)
            finally:
                os.close(write_fd)
        # Requests still queued can never complete now; cancel them so that
        # awaiters are released. Futures of a closed loop are left untouched
        # because they can no longer schedule their callbacks.
        loop_closed = self.loop.is_closed()
        for queue in (self._read_futures, self._write_futures):
            while queue:
                f, _ = queue.popleft()
                if not (loop_closed or f.done()):
                    f.cancel()

    def write(self, data: bytes):
        """Queue *data* for writing.

        Returns:
            A future that resolves to ``True`` once all of *data* has been
            written to the pipe, or fails with the error raised by ``os.write``.

        Raises:
            ValueError: if the pipe is closed.
        """
        self._ensure_open()
        # A memoryview lets a partial write be resumed without copying the tail.
        view = memoryview(data)
        f = self.loop.create_future()
        if not self._write_futures:
            self.loop.add_writer(self.write_fd, self.on_write)
        self._write_futures.append((f, view))
        return f

    def read(self, size):
        """Queue a read of at most *size* bytes.

        Returns:
            A future that resolves to the bytes returned by ``os.read``, or
            fails with the error it raised.

        Raises:
            ValueError: if the pipe is closed.
        """
        self._ensure_open()
        f = self.loop.create_future()
        if not self._read_futures:
            self.loop.add_reader(self.read_fd, self.on_read)
        self._read_futures.append((f, size))
        return f
import asyncio


async def main():
    """Round-trip a payload through an AsyncPIPE and print what is read back."""
    # Inside a coroutine get_event_loop() returns the loop that is running it.
    pipe = AsyncPIPE(asyncio.get_event_loop())
    try:
        await pipe.write(b'foo' * 1024)
        print(await pipe.read(1024))
    finally:
        # Release the descriptors even if the write or the read fails.
        pipe.close()


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # run_until_complete() returns once main() is done; run_forever()
        # would keep the process alive until it is interrupted.
        loop.run_until_complete(main())
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment