Skip to content

Instantly share code, notes, and snippets.

@niklasf
Last active September 23, 2019 19:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niklasf/b9fbe22e207184a23304f676ae7acd13 to your computer and use it in GitHub Desktop.
Save niklasf/b9fbe22e207184a23304f676ae7acd13 to your computer and use it in GitHub Desktop.
A ChildWatcher that does not require the asyncio loop to be running in the main thread
#!/usr/bin/python
import asyncio
import os
import sys
import threading
import time
import warnings
def setup_loop():
if sys.platform == "win32" or threading.current_thread() == threading.main_thread():
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
class PollingChildWatcher(asyncio.SafeChildWatcher):
def __init__(self):
super().__init__()
self._poll_handle = None
def attach_loop(self, loop):
assert loop is None or isinstance(loop, asyncio.AbstractEventLoop)
if self._loop is not None and loop is None and self._callbacks:
warnings.warn("A loop is being detached from a child watcher with pending handlers", RuntimeWarning)
if self._poll_handle is not None:
self._poll_handle.cancel()
self._loop = loop
if loop is not None:
self._poll_handle = self._loop.call_soon(self._poll)
# Prevent a race condition in case a child terminated
# during the switch.
self._do_waitpid_all()
def _poll(self):
if self._loop:
self._do_waitpid_all()
self._poll_handle = self._loop.call_later(0.2, self._poll)
policy = asyncio.get_event_loop_policy()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
watcher = PollingChildWatcher()
watcher.attach_loop(loop)
policy.set_child_watcher(watcher)
return loop
class ExampleProtocol(asyncio.SubprocessProtocol):
def __init__(self):
self.returncode = asyncio.get_running_loop().create_future()
def connection_made(self, transport):
print("Connection made")
self.transport = transport
def connection_lost(self, exc):
print("Connection lost")
self.returncode.set_result(self.transport.get_returncode())
def pipe_data_received(self, fd, data):
print("{}: {}".format(fd, data))
async def example_coroutine(command):
loop = asyncio.get_running_loop()
transport, protocol = await loop.subprocess_shell(ExampleProtocol, command)
code = await protocol.returncode
print("Exited with code", code)
def example():
loop = setup_loop()
try:
loop.run_until_complete(example_coroutine(sys.argv[1]))
finally:
loop.close()
def main():
thread = threading.Thread(target=example)
thread.start()
print("Running thread ...")
thread.join()
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage:", sys.argv[0], "<shell command>")
sys.exit(128)
else:
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment