Skip to content

Instantly share code, notes, and snippets.

@kostja
Last active November 19, 2019 15:32
Show Gist options
  • Save kostja/9333f4e4fcbdfda613c30b962c162e1f to your computer and use it in GitHub Desktop.
Save kostja/9333f4e4fcbdfda613c30b962c162e1f to your computer and use it in GitHub Desktop.
test.py
kostja@hulk ~ % ./test.py
^CShutdown requested
/usr/lib/python3.7/asyncio/unix_events.py:861: RuntimeWarning: A loop is being detached from a child watcher with pending handlers
RuntimeWarning)
/usr/lib/python3.7/asyncio/unix_events.py:137: RuntimeWarning: coroutine 'setup_signal_handlers.<locals>.shutdown' was never awaited
del self._signal_handlers[sig]
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
File "./test.py", line 59, in <module>
asyncio.run(main())
File "/usr/lib/python3.7/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
concurrent.futures._base.CancelledError
kostja@hulk ~ %
#!python3
import asyncio
import io
import os
import signal
import functools
def setup_signal_handlers():
loop = asyncio.get_event_loop()
async def shutdown():
print("Shutdown requested")
tasks = [
task
for task in asyncio.Task.all_tasks()
if task is not asyncio.tasks.Task.current_task()
]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
loop.add_signal_handler(signal.SIGINT, asyncio.create_task, shutdown())
loop.add_signal_handler(signal.SIGTERM, asyncio.create_task, shutdown())
async def run_test():
process = None
try:
process = await asyncio.create_subprocess_exec(
"/usr/bin/sleep",
"5",
stderr=asyncio.subprocess.STDOUT,
stdout=asyncio.subprocess.PIPE,
env=dict(os.environ, KEY="value"),
preexec_fn=os.setsid,
)
stdout, _ = await asyncio.wait_for(process.communicate(), 10)
except (asyncio.TimeoutError, CancelledError) as e:
process.terminate()
stdout, _ = await process.communicate()
print("Process exit status: {}".format(process.returncode))
print(stdout.decode())
async def main():
loop = asyncio.get_event_loop()
setup_signal_handlers()
tasks = []
for i in range(1, 10):
tasks.append(loop.create_task(run_test()))
while len(tasks):
_, tasks = await asyncio.wait(tasks)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment