Last active
August 16, 2021 20:49
-
-
Save Devwarlt/e2c499489959aeeb1709235062e7535b to your computer and use it in GitHub Desktop.
A load balancer algorithm for thread balancing in order to avoid overwhelming processing and CPU exhaustion.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Author: Devwarlt | |
# Date: 14 Aug 2021 | |
# | |
# About this algorithm: | |
# A load balancer algorithm for thread balancing in order to avoid | |
# overwhelming processing and CPU exhaustion. | |
# | |
from typing import ( | |
Callable, Iterable, Any, | |
List, Tuple, Union | |
) | |
from warnings import warn | |
from queue import Queue | |
from math import floor | |
from threading import Thread | |
from multiprocessing import cpu_count | |
class LoadBalancer(object): | |
# For more details, see following references: | |
# - "StackOverflow: Optimal number of threads per core", | |
# available at https://stackoverflow.com/a/1718522/13190706 | |
# - "Quora: How many Python threads can I run?", | |
# available at https://qr.ae/pGgLJD | |
__THREADS_PER_CORE: Tuple[int, int] = (36, 40) | |
def __init__( | |
self, per_thread: int, total: int, callback: Callable[..., None] | |
) -> None: | |
super().__init__() | |
if per_thread == 0: | |
raise ZeroDivisionError( | |
"Attribute 'per_thread' couldn't be zero." | |
) | |
self.__per_thread: int = per_thread | |
self.__total: int = total | |
self.__callback: Callable[..., None] = callback | |
self.__queue: Queue = Queue(per_thread * total) | |
count_threads: int = floor(self.__total / self.__per_thread) | |
if self.__total % self.__per_thread != 0: | |
count_threads += 1 | |
max_min_threads: int = None | |
max_max_threads: int = None | |
max_min_threads, max_max_threads\ | |
= LoadBalancer.__measure_optimal_num_threads() | |
tutorial: str = "\n\t<<Tutorial>>\tConsider to change "\ | |
"number of threads to the optimal threshold value. "\ | |
f"Values can vary between {max_min_threads} to "\ | |
f"{max_max_threads}. For more details, proceed "\ | |
"changing number of items per thread according "\ | |
"formula:\n\t<<Formula>>\tNumber of Threads = "\ | |
"Total Items / Items Per Thread" | |
if count_threads < max_min_threads: | |
msg: str = "You can allocate more threads if you want." | |
msg += f"\n{tutorial}" | |
warn(msg, category=UserWarning) | |
if count_threads >= max_max_threads: | |
msg: str = "You are allocating more threads than usual "\ | |
"and it can be harmful for your environment!" | |
msg += f"\n{tutorial}" | |
raise OverflowError(msg) | |
self.__count_threads: int = count_threads | |
@staticmethod | |
def __measure_optimal_num_threads() -> Tuple[int, int]: | |
num_cores: int = cpu_count() | |
optimal_num_threads: Tuple[int, int]\ | |
= Tuple[int, int]([ | |
i * num_cores for i in LoadBalancer.__THREADS_PER_CORE | |
]) | |
return optimal_num_threads | |
def attach( | |
self, *args: Iterable[Union[List[Any], Tuple[Any]]] | |
) -> None: | |
self.__queue.put(args) | |
def begin(self) -> None: | |
for _ in range(self.__count_threads): | |
next_pendings: Iterable[Union[List[Any], Tuple[Any]]]\ | |
= self.__fetch_pendings() | |
thread: Thread = Thread( | |
target=self.__callback_cluster, | |
kwargs={'items': next_pendings}, | |
daemon=True | |
) | |
thread.start() | |
def __callback_cluster( | |
self, items: List[Tuple[Iterable[Union[List[Any], Tuple[Any]]]]] | |
) -> None: | |
for item in items: | |
self.__callback(*item) | |
def __fetch_pendings( | |
self | |
) -> List[Tuple[Iterable[Union[List[Any], Tuple[Any]]]]]: | |
pending_items: List[Tuple[Iterable[Union[List[Any], Tuple[Any]]]]]\ | |
= [] | |
for _ in range(self.__per_thread): | |
if self.__queue.empty(): | |
break | |
pending_item: Tuple[Iterable[Union[List[Any], Tuple[Any]]]]\ | |
= self.__queue.get() | |
pending_items.append(pending_item) | |
return pending_items |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment