Skip to content

Instantly share code, notes, and snippets.

@freol35241
Last active February 23, 2021 13:31
Show Gist options
  • Save freol35241/704d3126cbbbf89bd2ab9eca8e7d8f32 to your computer and use it in GitHub Desktop.
Save freol35241/704d3126cbbbf89bd2ab9eca8e7d8f32 to your computer and use it in GitHub Desktop.
import enum
import time
import uuid
import logging
from typing import Any, Callable, Dict
from concurrent.futures import ProcessPoolExecutor, Future, Executor
class Status(enum.Enum):
ONGOING = "ongoing"
QUEUED = "queued"
FAILED = "failed"
SUCCESSFUL = "successful"
### RESULTS "DB" ###
RESULTS: Dict[uuid.UUID, Any] = dict()
def set_result(job_id: uuid.UUID, result: Any):
RESULTS[job_id] = result
def get_result(job_id: uuid.UUID) -> Any:
return RESULTS[job_id]
###
class Dispatcher:
jobs: Dict[uuid.UUID, Future] = dict()
def __init__(self, pool: Executor = ProcessPoolExecutor()):
self.pool = pool
def submit(self, fn: Callable, *args, **kwargs) -> uuid.UUID:
job_id = uuid.uuid4()
future = self.pool.submit(fn, *args, **kwargs)
def _callback(fut: Future):
try:
set_result(job_id, fut.result())
except Exception:
logging.exception(f"Job with job_id {job_id} failed!")
future.add_done_callback(_callback)
self.jobs[job_id] = future
return job_id
def status(self, job_id: uuid.UUID) -> Status:
future: Future = self.jobs[job_id] # Will raise KeyError if job_id is invalid
if not future.done():
if future.running():
return Status.ONGOING
return Status.QUEUED
if future.exception():
return Status.FAILED
return Status.SUCCESSFUL
if __name__ == "__main__":
# Blocking dummy task
def task(sleep):
time.sleep(sleep)
if sleep > 3:
raise ValueError
return f"Slept for {sleep} seconds!"
# Dispatcher with default ProcessPoolExecutor
d = Dispatcher()
# Submit 5 jobs
job_ids = []
for i in range(5):
job_ids.append(d.submit(task, sleep=i))
# Check if the last task has been started yet
print(Dispatcher().status(job_ids[-1]))
# Wait until finished
time.sleep(5)
# Print job statuses
for job_id in job_ids:
status = d.status(job_id)
print(f"{job_id}: {status}")
if status == Status.SUCCESSFUL:
print(get_result(job_id))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment