Skip to content

Instantly share code, notes, and snippets.

@tino
Created December 5, 2018 20:24
Show Gist options
  • Save tino/89891af0c336d8b27fa80a031815e285 to your computer and use it in GitHub Desktop.
Save tino/89891af0c336d8b27fa80a031815e285 to your computer and use it in GitHub Desktop.
Master with worker processes in Python asyncio
#!/usr/bin/env python3.6
# or higher
import asyncio
import random
import signal
import subprocess
import sys
NUM_WORKERS = 2
_stop = False
_workers = []
_worker_log_readers = []
async def start_workers():
global _worker, _worker_log_reader
for i in range(1, NUM_WORKERS + 1):
worker = await asyncio.create_subprocess_exec(
"python",
"-u",
"test.py",
"worker",
str(i),
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
_workers.append(worker)
_worker_log_readers.append(loop.create_task(stdout_reader(worker.stdout, i)))
async def worker():
global _stop
print("worker started")
while True:
await asyncio.sleep(2)
print("heartbeat")
if _stop and random.random() > 0.3: # Perhaps it hangs!
break
print("👋")
async def master(loop):
global _stop
await start_workers()
while True:
await asyncio.sleep(0.2)
if _stop:
await stop_workers(loop)
break
print("👋")
async def stop_workers(loop):
global _workers, _worker_log_readers
async def stop_worker(i):
print("Shutting down worker... ")
_workers[i].terminate()
try:
await asyncio.wait_for(_workers[i].wait(), timeout=3)
except asyncio.TimeoutError:
print(f"Worker {i + 1} didn't terminate, sending kill... 💀")
_workers[i].kill()
_worker_log_readers[i].cancel()
await asyncio.gather(*[stop_worker(i) for i in range(NUM_WORKERS)])
loop.stop()
print("done.")
async def stdout_reader(stdout, i):
while True:
line = await stdout.readline()
if not line:
print(f"worker {i} gone")
break
print(f"[worker-{i}]: {line.decode('utf-8')}", end="")
def stop(signum, stack,):
global _stop
print(f"Recieved {signum}, shutting down")
_stop = True
def noop(signum, stack):
print(f"Received {signum}, ignoring")
if __name__ == "__main__":
# Run `python main.py` to start the master. Ctrl-C to shut down gracefully
loop = asyncio.get_event_loop()
try:
if len(sys.argv) == 1:
print("starting master")
signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)
loop.create_task(master(loop))
loop.run_forever()
else:
print("starting worker")
# Ignoring SIGING (Ctrl-C) is necessary, as the worker will receive
# the signal too when the master is ctrl-c'ed. It will then die
# before proper shutdown.
signal.signal(signal.SIGINT, noop)
signal.signal(signal.SIGTERM, stop)
loop.run_until_complete(worker())
finally:
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment