Skip to content

Instantly share code, notes, and snippets.

@jacobcase
Created December 17, 2015 23:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jacobcase/2626e2c2bca14b49996a to your computer and use it in GitHub Desktop.
Save jacobcase/2626e2c2bca14b49996a to your computer and use it in GitHub Desktop.
import asyncio
import codecs
import signal
import socket
import sys
from concurrent.futures import ProcessPoolExecutor
async def read_client(client, wid, sid):
    """Copy everything received on *client* to stdout until the peer closes.

    Args:
        client: Connected socket. It must be in non-blocking mode because it
            is read with ``loop.sock_recv``.
        wid: Worker id; only used in the log message.
        sid: Per-worker connection number; only used in the log message.

    The socket is shut down and closed before this coroutine finishes, even
    if reading fails or the task is cancelled.
    """
    loop = asyncio.get_event_loop()
    # recv() boundaries are arbitrary, so a multi-byte UTF-8 character may be
    # split across two chunks. An incremental decoder buffers the partial
    # sequence instead of raising; invalid bytes from the peer are replaced
    # rather than killing the task.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    try:
        while True:
            chunk = await loop.sock_recv(client, 2048)
            if not chunk:
                # Empty read means the peer closed its side.
                break
            print(decoder.decode(chunk), end="")
        # Flush whatever incomplete sequence was still buffered at EOF.
        print(decoder.decode(b"", final=True), end="")
    finally:
        print("WORKER {}: Shutting down socke connection {}".format(wid, sid))
        try:
            client.shutdown(socket.SHUT_RDWR)
        except OSError:
            # The peer may already have torn the connection down.
            pass
        client.close()
def worker(wid, server_sock):
    """Accept connections on *server_sock* and read each one concurrently.

    Intended to run as the body of a worker process. It blocks until SIGINT
    or SIGTERM is received, then cancels the accept loop and any client
    readers that are still running, and closes its event loop.

    Args:
        wid: Worker id; only used in log messages.
        server_sock: Listening socket shared with the other workers.
    """
    # Use a private loop rather than whatever loop object this process may
    # have inherited from its parent.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # loop.sock_accept() requires a non-blocking socket.
    server_sock.setblocking(False)

    # Strong references to in-flight reader tasks so they can be cancelled on
    # shutdown and are not garbage-collected while running.
    clients = set()

    async def serve():
        sid = 0
        while True:
            print("WORKER {}: Waiting for next socket connection".format(wid))
            client, _addr = await loop.sock_accept(server_sock)
            print("WORKER {}: Scheduling socket connection {} for reading".format(wid, sid))
            task = asyncio.ensure_future(read_client(client, wid, sid))
            clients.add(task)
            task.add_done_callback(clients.discard)
            sid += 1

    main_task = loop.create_task(serve())

    def handler():
        print("WORKER {}: Exit signal received".format(wid))
        # Cancelling the accept loop makes run_until_complete() return.
        main_task.cancel()

    for signum in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(signum, handler)

    try:
        loop.run_until_complete(main_task)
    except asyncio.CancelledError:
        # Normal shutdown path triggered by the signal handler.
        pass
    finally:
        pending = list(clients)
        for task in pending:
            task.cancel()
        if pending:
            # Let the cancelled readers unwind before the loop goes away.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
        loop.close()
async def run_workers(count, executor=None, host=None, port=4000):
    """Open the listening socket and run *count* workers until they all exit.

    Args:
        count: Number of ``worker`` calls to submit. Zero is allowed.
        executor: Executor the workers are submitted to. ``None`` uses the
            event loop's default executor.
        host: Address to bind. ``None`` binds ``socket.gethostname()``.
        port: TCP port to bind.

    Worker failures are reported on stdout instead of being dropped, and the
    listening socket is closed when the coroutine finishes.
    """
    loop = asyncio.get_event_loop()
    bind_host = socket.gethostname() if host is None else host
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listener:
        # Allow an immediate restart while old connections sit in TIME_WAIT.
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((bind_host, port))
        listener.listen(5)

        print("MASTER: Starting workers")
        futures = [
            loop.run_in_executor(executor, worker, wid, listener)
            for wid in range(count)
        ]
        print("MASTER: All workers started")

        # asyncio.wait() raises ValueError on an empty collection.
        if futures:
            await asyncio.wait(futures)
            for wid, fut in enumerate(futures):
                if fut.cancelled():
                    continue
                error = fut.exception()
                if error is not None:
                    print("MASTER: Worker {} failed: {!r}".format(wid, error))
    print("MASTER: All workers exited")


if __name__ == '__main__':
    WORKER_COUNT = 10
    executor = ProcessPoolExecutor(max_workers=WORKER_COUNT)
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    def handler():
        print("MASTER: Shutting down executor")
        # wait=False: a blocking shutdown here would stall the event loop
        # that is waiting for the worker futures to complete.
        executor.shutdown(wait=False)

    loop.add_signal_handler(signal.SIGINT, handler)
    print("MASTER: Running until complete")
    try:
        # The pool is passed explicitly rather than installed as the loop's
        # default executor, which newer asyncio versions only accept for
        # thread pools.
        loop.run_until_complete(run_workers(WORKER_COUNT, executor=executor))
    finally:
        print("MASTER: Closing Loop")
        loop.close()
        executor.shutdown()
    print("MASTER: Done, Exiting")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment