Last active
November 10, 2021 16:21
-
-
Save bsodhi/7a7bcbc9626a4fbbc5357786c622d886 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from multiprocessing import Process, JoinableQueue, Value, Lock | |
import time | |
import logging | |
from abc import ABC, abstractmethod | |
class Runnable(ABC): | |
"""Abstract base class for tasks that can be submitted for | |
execution to the {Parrun} executor. | |
Arguments: | |
ABC {[type]} -- [description] | |
""" | |
@abstractmethod | |
def run(self): | |
pass | |
class Parrun(object): | |
""" | |
Implements a simple task executor backed by a JoinableQueue | |
queue which can have a bounded capacity. This is somewhat similar | |
to the java.util.concurrent.Executor. | |
""" | |
def __init__(self, queue_size=100, worker_count=4, status_callback=None): | |
"""Constructor for the class. | |
Keyword Arguments: | |
queue_size {int} -- Size of the task queue (default: {100}) | |
worker_count {int} -- No. of worker processes to create (default: {4}) | |
status_callback -- Callback function for updating status back to caller | |
""" | |
self.q = JoinableQueue(queue_size) | |
self.worker_count = worker_count | |
self.status_callback = status_callback | |
self.closing = False | |
self.items_done = Value("i", 0) | |
self.lock = Lock() | |
def submit_item(self, item: Runnable): | |
"""Adds a Runnable instance to the task queue. This method | |
blocks if the queue is full. | |
Arguments: | |
item {Runnable} -- Task object to run. | |
Raises: | |
ValueError: If the supplied item is not an instance of | |
Runnable class, an exception is raised. | |
""" | |
if not isinstance(item, Runnable): | |
raise ValueError("Only instances of Runnable can be submitted.") | |
if not self.closing: | |
self.q.put(item) | |
else: | |
logging.debug("Runner closing. Rejecting the item.") | |
def run_task(self): | |
"""Each of the worker processes will execute this method. | |
Until the executor is shutdown, this method keeps looping | |
to get the items off the task queue. The worker processes | |
block when the queue is empty. The method returns either | |
when the item in the queue is None, or when the executor is | |
shut down. | |
""" | |
while not self.closing: | |
try: | |
runnable = self.q.get() | |
if runnable == None: | |
break | |
done_count = runnable.run() | |
with self.lock: | |
if done_count: | |
self.items_done.value += done_count | |
if self.status_callback is not None: | |
self.status_callback(self.items_done.value) | |
except Exception as ex: | |
logging.exception("Error when handling task.") | |
def start_workers(self): | |
"""Starts the worker processes. | |
""" | |
for w in range(0, self.worker_count): | |
p = Process(target=self.run_task) | |
p.start() | |
logging.debug("Started {} consumers.".format(self.worker_count)) | |
def shutdown(self): | |
logging.info("Shutting down workers.") | |
for w in range(0, self.worker_count): | |
self.q.put(None) | |
self.q.close() | |
self.closing = True | |
class MyRunnable(Runnable): | |
""" | |
Simple example for testing/demo. | |
""" | |
def __init__(self, id): | |
self.id = id | |
def run(self): | |
time.sleep(2) | |
print("Running task: "+str(self.id)) | |
def main(): | |
pr = Parrun() | |
pr.start_workers() | |
for s in range(0, 10): | |
pr.submit_item(MyRunnable(s)) | |
pr.shutdown() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment