Skip to content

Instantly share code, notes, and snippets.

@bsodhi
Last active November 10, 2021 16:21
Show Gist options
  • Save bsodhi/7a7bcbc9626a4fbbc5357786c622d886 to your computer and use it in GitHub Desktop.
Save bsodhi/7a7bcbc9626a4fbbc5357786c622d886 to your computer and use it in GitHub Desktop.
from multiprocessing import Process, JoinableQueue, Value, Lock
import time
import logging
from abc import ABC, abstractmethod
class Runnable(ABC):
    """Interface for units of work that the Parrun executor can execute.

    Concrete tasks subclass this type and implement ``run``. Instances
    are placed on a multiprocessing queue by the executor, so they are
    expected to be picklable.
    """

    @abstractmethod
    def run(self):
        """Perform the task's work.

        The executor adds a truthy return value to its running
        ``items_done`` counter; returning ``None`` (or any falsy value)
        leaves the counter unchanged.
        """
class Parrun(object):
    """
    Implements a simple task executor backed by a JoinableQueue
    queue which can have a bounded capacity. This is somewhat similar
    to the java.util.concurrent.Executor.

    Tasks are Runnable instances put on the queue by ``submit_item`` and
    executed by worker processes running ``run_task``. A ``None`` item on
    the queue is the sentinel that tells one worker to exit; ``shutdown``
    enqueues one sentinel per configured worker.
    """

    def __init__(self, queue_size=100, worker_count=4, status_callback=None):
        """Constructor for the class.

        Keyword Arguments:
            queue_size {int} -- Size of the task queue (default: {100})
            worker_count {int} -- No. of worker processes to create (default: {4})
            status_callback -- Callback function for updating status back
                to caller. It is invoked from inside the worker process,
                while the executor's lock is held, with the current value
                of the shared ``items_done`` counter.
        """
        self.q = JoinableQueue(queue_size)
        self.worker_count = worker_count
        self.status_callback = status_callback
        # Plain per-process flag: it gates submit_item()/shutdown() in the
        # process that owns the executor. Changes made after the workers
        # have been started are not visible inside those workers.
        self.closing = False
        # Shared integer counter, updated by the workers under self.lock.
        self.items_done = Value("i", 0)
        self.lock = Lock()
        # Handles of the started worker processes, kept so that
        # shutdown(wait=True) can join them.
        self._workers = []

    def __getstate__(self):
        """Return the instance state used when the executor is pickled.

        With start methods that pickle the process target (the bound
        ``run_task`` method, and therefore this instance), the handles of
        already-started workers must not travel along: they are only
        meaningful in the parent process.
        """
        state = self.__dict__.copy()
        state["_workers"] = []
        return state

    def submit_item(self, item: Runnable):
        """Adds a Runnable instance to the task queue. This method
        blocks if the queue is full.

        Items submitted after ``shutdown`` has been called are rejected
        with a debug log message instead of being queued.

        Arguments:
            item {Runnable} -- Task object to run.

        Raises:
            ValueError: If the supplied item is not an instance of
                Runnable class, an exception is raised.
        """
        if not isinstance(item, Runnable):
            raise ValueError("Only instances of Runnable can be submitted.")
        if self.closing:
            logging.debug("Runner closing. Rejecting the item.")
            return
        self.q.put(item)

    def run_task(self):
        """Each of the worker processes will execute this method.

        The method keeps looping to get items off the task queue and
        blocks while the queue is empty. It returns when the item taken
        from the queue is None (the shutdown sentinel), or when this
        process's own ``closing`` flag is set.

        A truthy value returned by ``item.run()`` is added to the shared
        ``items_done`` counter, after which the status callback (if any)
        is called with the counter's current value. Exceptions raised
        while handling an item are logged and the loop continues with the
        next item.
        """
        while not self.closing:
            try:
                runnable = self.q.get()
                try:
                    # Identity check: a task may define its own __eq__.
                    if runnable is None:
                        break
                    done_count = runnable.run()
                    with self.lock:
                        if done_count:
                            self.items_done.value += done_count
                        if self.status_callback is not None:
                            self.status_callback(self.items_done.value)
                finally:
                    # Every item taken off a JoinableQueue (sentinels
                    # included) must be acknowledged, otherwise
                    # self.q.join() can never return.
                    self.q.task_done()
            except Exception:
                logging.exception("Error when handling task.")

    def start_workers(self):
        """Starts the worker processes.

        ``worker_count`` processes are created, each running
        ``run_task``, and their handles are remembered so that
        ``shutdown(wait=True)`` can join them.
        """
        for _ in range(self.worker_count):
            p = Process(target=self.run_task)
            p.start()
            self._workers.append(p)
        logging.debug("Started %s consumers.", self.worker_count)

    def shutdown(self, wait=False):
        """Asks the workers to exit once the queued work has been consumed.

        One None sentinel per configured worker is appended to the queue
        (this blocks while the queue is full), then the queue is closed
        for further puts from this process. Calling the method again is
        harmless: sentinels are only enqueued the first time.

        Keyword Arguments:
            wait {bool} -- When True, block until every worker process
                started through ``start_workers`` has exited
                (default: {False})
        """
        if not self.closing:
            logging.info("Shutting down workers.")
            # Set the flag first so no new item can be queued behind the
            # sentinels, where no worker would ever pick it up.
            self.closing = True
            for _ in range(self.worker_count):
                self.q.put(None)
            self.q.close()
        if wait:
            for p in self._workers:
                p.join()
            self._workers = []
class MyRunnable(Runnable):
    """
    Minimal demo task: waits two seconds, then prints its identifier.
    """

    def __init__(self, id):
        # Identifier echoed by run(); any value that str() accepts.
        self.id = id

    def run(self):
        """Sleep for two seconds and print this task's id.

        Returns None, so the executor's item counter is left unchanged.
        """
        time.sleep(2)
        print(f"Running task: {self.id!s}")
def main():
    """Demo: run ten MyRunnable tasks on a default-configured Parrun."""
    executor = Parrun()
    executor.start_workers()
    for task_id in range(10):
        executor.submit_item(MyRunnable(task_id))
    # Enqueue the stop sentinels behind the ten tasks.
    executor.shutdown()


# Run the demo only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment