Skip to content

Instantly share code, notes, and snippets.

@RedmondLee
Created Dec 7, 2021
Embed
What would you like to do?
import asyncio
import subprocess
from collections import deque
from functools import partial
from selectors import DefaultSelector, EVENT_READ
class SelectorManager:
    """Context manager that watches a subprocess's stdout for readability.

    Entering registers ``proc.stdout`` for ``EVENT_READ`` on a fresh
    ``DefaultSelector`` and returns that selector.  Leaving tears the
    selector down and terminates the subprocess.

    Args:
        proc: A ``subprocess.Popen``-like object exposing ``stdout``,
            ``kill()`` and ``wait()``.
    """

    def __init__(self, proc):
        self.proc = proc
        self.fileobj = proc.stdout
        self._selector = DefaultSelector()

    def __enter__(self):
        """Register the stdout pipe and hand back the selector."""
        self._selector.register(self.fileobj, EVENT_READ, None)
        return self._selector

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the selector, then kill and reap the subprocess.

        Exceptions raised inside the ``with`` body are not suppressed.
        """
        try:
            self._selector.unregister(self.fileobj)
        finally:
            # Runs even when unregister fails (e.g. the pipe was already
            # closed) so the selector and the child are never leaked.
            self._selector.close()
            self.proc.kill()
            # Reap the child; a killed-but-unwaited process stays a zombie.
            self.proc.wait()
class EpolledTailFile:
    """Expose the output of a ``ping`` subprocess as awaitable lines.

    Despite its name, the class does not tail a file: ``start_listen``
    spawns ``ping <file_name>`` and a worker thread forwards each output
    line to the asyncio loop, where ``upstream`` hands them out one by one.

    Only one coroutine at a time should await ``upstream``; a single
    ``read_wait`` future is used to wake the consumer.

    Args:
        file_name: Host name or address passed to ``ping``.
    """

    # Upper bound for a single read from the subprocess pipe.
    _CHUNK_SIZE = 65536

    def __init__(self, file_name):
        self.file_name = file_name
        self.ready_lines = deque()   # lines received but not yet consumed
        self.read_wait = None        # future the consumer is parked on, if any
        self.loop = None             # event loop captured by start_listen()
        self.proc = None             # subprocess started by start_listen()
        self._eof = False            # set once the subprocess closed stdout
        self._listener = None        # executor future running the listener

    @staticmethod
    def _split_lines(buffer):
        """Split *buffer* into complete lines and an unterminated tail.

        Returns:
            ``(lines, remainder)`` where every item of ``lines`` ends with
            ``b'\\n'`` and ``remainder`` is the trailing partial line
            (possibly empty).
        """
        *complete, remainder = buffer.split(b'\n')
        return [line + b'\n' for line in complete], remainder

    def _listener_daemon(self, proc):
        """Worker-thread loop: read the pipe and forward lines to the loop.

        All blocking work (``select`` and the read) happens here, so the
        event loop thread only ever receives already-read data.  The read
        also drains the readiness condition before the next ``select``,
        which keeps the loop from spinning on a still-readable pipe.
        """
        pending = b''
        try:
            with SelectorManager(proc) as selector:
                while True:
                    if not selector.select():
                        continue
                    # read1 performs at most one raw read, so it returns as
                    # soon as the data announced by select() is available.
                    chunk = proc.stdout.read1(self._CHUNK_SIZE)
                    if not chunk:
                        # Empty read: the subprocess closed its stdout.
                        break
                    lines, pending = self._split_lines(pending + chunk)
                    for line in lines:
                        self.loop.call_soon_threadsafe(self._enqueue, line)
            if pending:
                # Deliver a final line that lacked a trailing newline.
                self.loop.call_soon_threadsafe(self._enqueue, pending)
        finally:
            # Always release the consumer, even if the listener failed.
            self.loop.call_soon_threadsafe(self._mark_eof)

    def _wake_reader(self):
        """Resolve the consumer's pending future, if there is one."""
        waiter = self.read_wait
        if waiter is not None and not waiter.done():
            waiter.set_result(None)

    def _enqueue(self, line):
        """Queue *line* and wake the consumer (call on the loop thread)."""
        self.ready_lines.append(line)
        self._wake_reader()

    def _mark_eof(self):
        """Record end of stream and wake the consumer (loop thread)."""
        self._eof = True
        self._wake_reader()

    def addline(self, fileobj):
        """Read one line from *fileobj* and queue it.

        Kept for backward compatibility.  ``fileobj.readline()`` may block,
        so avoid calling this on the event loop thread with a live pipe.
        """
        self._enqueue(fileobj.readline())

    async def upstream(self):
        """Return the next output line as ``bytes``.

        Waits until a line is available.  Once the subprocess has closed
        its stdout and the queue is drained, returns ``b''`` on every call.
        """
        while not self.ready_lines:
            if self._eof:
                return b''
            self.read_wait = asyncio.get_running_loop().create_future()
            try:
                await self.read_wait
            finally:
                self.read_wait = None
        return self.ready_lines.popleft()

    def start_listen(self):
        """Spawn ``ping`` and start forwarding its output.

        Must be called while an event loop is running.

        Raises:
            RuntimeError: If there is no running event loop.
        """
        # Resolve the loop first so that a missing loop cannot leave an
        # orphaned subprocess behind.
        self.loop = asyncio.get_running_loop()
        self._eof = False
        # An argument list (no shell) means kill() reaches ping itself and
        # the host string is never interpreted by a shell.
        self.proc = subprocess.Popen(
            ['ping', self.file_name],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.PIPE,
            close_fds=True,
        )
        # Keep the future so listener failures are not silently discarded.
        self._listener = self.loop.run_in_executor(
            None, self._listener_daemon, self.proc
        )

    def stop_listen(self):
        """Kill the subprocess if it is still running.

        The listener thread is expected to finish once it observes the
        resulting end of stream on the pipe.
        """
        if self.proc is not None and self.proc.poll() is None:
            self.proc.kill()
async def main():
    """Print every line produced by the reader for ``www.baidu.com``.

    Stops once ``upstream`` yields an empty value, which is what a read at
    end of stream produces; without that check the loop would never end.
    """
    async_reader = EpolledTailFile('www.baidu.com')
    async_reader.start_listen()
    while True:
        line = await async_reader.upstream()
        if not line:
            # Empty read: the subprocess closed its output, nothing more
            # will arrive.
            break
        print(line)
# Only start the event loop when executed as a script, so that importing
# this module does not spawn a subprocess as a side effect.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment