Created
September 30, 2020 16:18
-
-
Save wallyqs/22032793c22eea26243b60dabb4c6930 to your computer and use it in GitHub Desktop.
threads-asyncio.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import nats | |
import concurrent.futures | |
import time | |
def train(msg, delay=1):
    """Simulate a blocking training job for one request message.

    The call blocks the calling thread for ``delay`` seconds, so it should
    be dispatched to a worker thread rather than run on the event loop.

    Args:
        msg: Request message; only its ``data`` attribute is read, and it
            must support ``.decode()`` (e.g. ``bytes``).
        delay: Seconds to sleep to simulate the work. Defaults to 1, the
            value that used to be hard-coded.

    Returns:
        The string ``"DONE: <payload>"``, where the payload is ``msg.data``
        decoded with the default codec.
    """
    print("->> Train task started...", msg.data)
    time.sleep(delay)  # seconds
    print("<<- Train task done...", msg.data)
    return f"DONE: {msg.data.decode()}"
async def run(loop):
    """Connect to NATS and serve "train" requests through a thread pool.

    Every message received on the "train" subject is passed to ``train`` on
    a worker thread, so the blocking work does not stall the event loop.
    The result is then published to the message's reply subject.

    This coroutine returns as soon as the subscription is set up; the
    caller must keep the event loop running for requests to be handled.

    Args:
        loop: Event loop whose ``run_in_executor`` is used to offload
            ``train`` to the thread pool.
    """
    nc = await nats.connect(servers=["nats://localhost:4222"])

    # Executor for tasks in parallel.
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=5,
    )

    # Strong references to in-flight tasks. The event loop only keeps weak
    # references to tasks, so a fire-and-forget task may be garbage
    # collected before it completes unless something else holds on to it.
    pending = set()

    def task_finished(task):
        # Drop our reference and surface failures that would otherwise go
        # unnoticed, since nobody awaits these tasks.
        pending.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            print(f"Train task failed: {error!r}")

    async def train_handler(msg):
        print(f"Received '{msg.subject}' request...")

        async def new_task(msg):
            result = await loop.run_in_executor(executor, train, msg)
            await nc.publish(msg.reply, result.encode())

        # TODO: The tasks in `pending` have to be gathered before closing
        # the connection with nc.close()
        task = asyncio.create_task(new_task(msg))
        pending.add(task)
        task.add_done_callback(task_finished)

    await nc.subscribe("train", cb=train_handler)
    print("Listening for train tasks...")
if __name__ == '__main__':
    # Create the loop explicitly: calling asyncio.get_event_loop() with no
    # running loop is deprecated and fails on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # run() only sets up the subscription and returns, so the loop has
        # to keep running afterwards to process incoming requests.
        loop.run_until_complete(run(loop))
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop this long-running script.
        pass
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment