Skip to content

Instantly share code, notes, and snippets.

@glemaitre
Created November 21, 2018 17:09
Show Gist options
  • Save glemaitre/b0f0dddcadd9ebe7df39b657c777ac50 to your computer and use it in GitHub Desktop.
Save glemaitre/b0f0dddcadd9ebe7df39b657c777ac50 to your computer and use it in GitHub Desktop.
import asyncio
import random
import time
from concurrent.futures import ProcessPoolExecutor
def simulator_submission():
    """Simulate a poll for a new submission.

    Returns
    -------
    int or None
        Either a random submission id drawn from ``[0, 1000]`` (both bounds
        included) or ``None``; ``random.choice`` picks between the two.
    """
    candidate_id = random.randint(0, 1000)
    outcomes = [candidate_id, None]
    return random.choice(outcomes)
async def launch_submission(submission_queue, process_queue):
    """Endlessly poll for submissions and start a remote job for each one.

    Each iteration pushes the value returned by ``simulator_submission()``
    onto ``submission_queue`` and immediately takes one item back. When the
    item is ``None`` the coroutine only yields to the event loop; otherwise
    it spawns a shell subprocess (an ``ssh`` call running ``sleep`` for a
    random 0-3 seconds) and puts the process object onto ``process_queue``.

    Parameters
    ----------
    submission_queue : asyncio queue
        Queue used to hand over the generated submission id (or ``None``).
    process_queue : asyncio queue
        Queue receiving the launched subprocess objects.

    Notes
    -----
    This coroutine never returns.
    """
    command_template = 'ssh glemaitre@anakim.u-bourgogne.fr "sleep {}"'
    while True:
        # to be replaced by the database query
        await submission_queue.put(simulator_submission())
        # A non-blocking get is enough here: the queue holds at least the
        # item that was just added (``None`` or a submission id).
        submission_id = submission_queue.get_nowait()
        if submission_id is not None:
            print(f'launch the training of the submission {submission_id}')
            sleep_seconds = random.randint(0, 3)
            proc = await asyncio.subprocess.create_subprocess_shell(
                command_template.format(sleep_seconds),
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,
            )
            await process_queue.put(proc)
            print(f'Queuing the process {proc}')
        else:
            # Nothing to launch: give the other coroutines a chance to run.
            await asyncio.sleep(0)
async def collect_result(submission_queue, process_queue):
    """Endlessly poll queued subprocesses and collect the finished ones.

    A process whose ``returncode`` is still ``None`` is put back onto
    ``process_queue`` and the coroutine yields to the event loop. A process
    that has a return code is drained with ``communicate()`` and followed by
    a random 0-3 second pause.

    Parameters
    ----------
    submission_queue : asyncio queue
        Not used by this coroutine.
    process_queue : asyncio queue
        Queue holding the subprocess objects to monitor.

    Notes
    -----
    This coroutine never returns. The output returned by ``communicate()``
    is discarded.
    """
    while True:
        proc = await process_queue.get()
        if proc.returncode is not None:
            print(f'collect the log of the submission {proc}')
            await proc.communicate()
            # just to simulate different time processing of collection.
            collection_delay = random.randint(0, 3)
            await asyncio.sleep(collection_delay)
            continue
        # Still running: requeue without awaiting. The original author noted
        # that ``await process_queue.put(proc)`` kept ``proc.returncode``
        # from changing status, hence the non-blocking put.
        process_queue.put_nowait(proc)
        await asyncio.sleep(0)
async def _run_pipeline():
    """Create the two bounded queues and run launcher and collector together.

    The queues are created inside the coroutine, i.e. while the event loop
    is running, and without the ``loop`` argument: that argument was
    deprecated in Python 3.8 and removed in Python 3.10, where passing it
    raises ``TypeError``.
    """
    submission_queue = asyncio.LifoQueue(maxsize=5)
    process_queue = asyncio.LifoQueue(maxsize=5)
    launcher = launch_submission(submission_queue, process_queue)
    collecter = collect_result(submission_queue, process_queue)
    await asyncio.gather(launcher, collecter)


if __name__ == "__main__":
    # Create and register the loop explicitly instead of relying on
    # ``asyncio.get_event_loop()`` to create one implicitly, which is
    # deprecated on recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(_run_pipeline())
    finally:
        # Release the loop resources even if the run is interrupted.
        loop.close()
    print('loop completed')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment