Skip to content

Instantly share code, notes, and snippets.

@kmkmjhyiiiu
Last active January 2, 2019 13:10
Show Gist options
  • Save kmkmjhyiiiu/2fc5c0a55c5810c3049185c5f47759db to your computer and use it in GitHub Desktop.
Save kmkmjhyiiiu/2fc5c0a55c5810c3049185c5f47759db to your computer and use it in GitHub Desktop.
Limit Number Of Process to run at same time. (Python Multiprocessing)
from multiprocessing import Process as Mp
from typing import Tuple
from time import sleep
class Process(Mp):
"""
Added Custom method for checking if process has started.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_started = False
def start(self):
super().start()
self._is_started = True
def set_daemon(self, daemon: bool):
self.daemon = daemon
def is_started(self) -> bool:
"""
:return: bool
"""
return self._is_started
class MultiProcess(object):
"""
Limit Number Of Process to run at same time.
"""
def __init__(self, limit: int = None, delay: float = None):
"""
:param limit: Number of limits of process to be run at same.
:param delay: Sets delay between running each process.
"""
self._limit = limit
self._processes = []
self._no_of_started_process = 0
self._started_process = []
self.delay = delay
def set_delay(self, delay: float):
"""
Sets delay between running each process.
:param delay:
:return:
"""
self.delay = delay
def add_process(self, func: callable, args: Tuple = (), kwargs: dict={}, group=None, daemon: bool = False):
"""
:param daemon:
:param func: function name to run
:param args: function positional arguments.
:param kwargs: function keyword arguments.
:param group: group inherited from base `Process` class.
:return:
"""
process_no = len(self._processes) + 1
name = "Process No.%s" % process_no
process_: Process = Process(target=func, args=args, kwargs=kwargs, group=group,
name=name)
process_.set_daemon(daemon)
self._processes.append(process_)
def set_limit(self, limit: int):
"""
You can set limit to run at a time with help of this function.
:param limit:
:return:
"""
self._limit = limit
def wait_for_processes(self):
"""
This gonna wait for started process to be finished.
:return:
"""
if self._no_of_started_process >= self._limit:
for s_process in self._started_process:
s_process.join()
self._no_of_started_process = 0
def wait_for_all_processes(self):
"""
This will wait for all processes to be completed.
:return:
"""
for pro in self._processes:
pro.join()
def start(self):
"""
This method starts the process and handle the rest. No death
:return:
"""
for process in self._processes:
self._started_process.append(process)
is_started = process.is_started()
if not is_started:
process.start()
self._no_of_started_process += 1
if self.delay is not None:
sleep(self.delay)
self.wait_for_processes()
self.wait_for_all_processes()
# set all processes to empty because they are done.
self._processes = []
@kmkmjhyiiiu
Copy link
Author

My concepts are still unclear regarding to multiprocessing in python. So there is any mistake or something to be improved. Then feel free to correct me. I have deadline of project in few hours so that's why my concepts are still unclear. :P

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment