Skip to content

Instantly share code, notes, and snippets.

@wallyqs
Created September 30, 2020 16:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wallyqs/22032793c22eea26243b60dabb4c6930 to your computer and use it in GitHub Desktop.
Save wallyqs/22032793c22eea26243b60dabb4c6930 to your computer and use it in GitHub Desktop.
threads-asyncio.py
import asyncio
import nats
import concurrent.futures
import time
def train(msg):
print("->> Train task started...", msg.data)
time.sleep(1) # seconds
print("<<- Train task done...", msg.data)
return f"DONE: {msg.data.decode()}"
async def run(loop):
nc = await nats.connect(servers=["nats://localhost:4222"])
# Executor for tasks in parallel.
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=5,
)
async def train_handler(msg):
print(f"Received '{msg.subject}' request...")
async def new_task(msg):
result = await loop.run_in_executor(executor, train, msg)
await nc.publish(msg.reply, result.encode())
# TODO: This is a future that has to be gathered before closing
# the connection with nc.close()
asyncio.create_task(new_task(msg))
await nc.subscribe("train", cb=train_handler)
print("Listening for train tasks...")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment