Skip to content

Instantly share code, notes, and snippets.

@Devwarlt
Last active August 16, 2021 20:49
Show Gist options
  • Save Devwarlt/e2c499489959aeeb1709235062e7535b to your computer and use it in GitHub Desktop.
Save Devwarlt/e2c499489959aeeb1709235062e7535b to your computer and use it in GitHub Desktop.
A load balancer algorithm for thread balancing in order to avoid overwhelming processing and CPU exhaustion.
# Author: Devwarlt
# Date: 14 Aug 2021
#
# About this algorithm:
# A load balancer algorithm for thread balancing in order to avoid
# overwhelming processing and CPU exhaustion.
#
from typing import (
Callable, Iterable, Any,
List, Tuple, Union
)
from warnings import warn
from queue import Queue
from math import floor
from threading import Thread
from multiprocessing import cpu_count
class LoadBalancer(object):
    """Run a callback over queued argument tuples, one batch per thread.

    Items are registered with :meth:`attach` and dispatched with
    :meth:`begin`.  Each worker thread receives up to ``per_thread`` items
    and invokes ``callback(*item)`` for every item of its batch, in order.

    The number of threads is ``ceil(total / per_thread)``.  It is compared
    against a machine-dependent range derived from the CPU core count:
    below the range a ``UserWarning`` is emitted, at or above the upper
    bound the constructor raises ``OverflowError``.
    """

    # For more details, see following references:
    # - "StackOverflow: Optimal number of threads per core",
    # available at https://stackoverflow.com/a/1718522/13190706
    # - "Quora: How many Python threads can I run?",
    # available at https://qr.ae/pGgLJD
    # (lower, upper) multipliers applied to the number of CPU cores.
    __THREADS_PER_CORE: Tuple[int, int] = (36, 40)

    def __init__(
        self, per_thread: int, total: int, callback: Callable[..., None]
    ) -> None:
        """Validate the configuration and compute the number of threads.

        Args:
            per_thread: Maximum number of items handled by one thread.
            total: Total number of items expected to be attached.
            callback: Callable invoked as ``callback(*item)`` for each
                attached item.

        Raises:
            ZeroDivisionError: If ``per_thread`` is zero.
            OverflowError: If the resulting thread count reaches the upper
                bound of the recommended range.
        """
        super().__init__()

        if per_thread == 0:
            raise ZeroDivisionError(
                "Attribute 'per_thread' couldn't be zero."
            )

        self.__per_thread: int = per_thread
        self.__total: int = total
        self.__callback: Callable[..., None] = callback
        # Bounded queue: ``attach`` blocks once this capacity is reached.
        self.__queue: Queue = Queue(per_thread * total)

        # Ceiling division done with exact integer arithmetic.
        count_threads, remainder = divmod(self.__total, self.__per_thread)
        if remainder != 0:
            count_threads += 1

        max_min_threads, max_max_threads \
            = LoadBalancer.__measure_optimal_num_threads()

        tutorial: str = (
            "\n\t<<Tutorial>>\tConsider to change "
            "number of threads to the optimal threshold value. "
            f"Values can vary between {max_min_threads} to "
            f"{max_max_threads}. For more details, proceed "
            "changing number of items per thread according "
            "formula:\n\t<<Formula>>\tNumber of Threads = "
            "Total Items / Items Per Thread"
        )

        if count_threads < max_min_threads:
            msg: str = "You can allocate more threads if you want."
            msg += f"\n{tutorial}"
            warn(msg, category=UserWarning)

        # The upper bound is exclusive: reaching it is already an error.
        if count_threads >= max_max_threads:
            msg = (
                "You are allocating more threads than usual "
                "and it can be harmful for your environment!"
            )
            msg += f"\n{tutorial}"
            raise OverflowError(msg)

        self.__count_threads: int = count_threads

    @staticmethod
    def __measure_optimal_num_threads() -> Tuple[int, int]:
        """Return the (lower, upper) thread-count bounds for this machine.

        Each bound is the matching ``__THREADS_PER_CORE`` multiplier times
        the number of CPU cores reported by ``cpu_count()``.
        """
        num_cores: int = cpu_count()
        lower, upper = LoadBalancer.__THREADS_PER_CORE
        # A plain tuple: ``typing.Tuple[...]`` aliases cannot be called.
        return lower * num_cores, upper * num_cores

    def attach(self, *args: Any) -> None:
        """Queue one work item.

        The positional arguments are stored as a single tuple and later
        unpacked into the callback as ``callback(*args)``.
        """
        self.__queue.put(args)

    def begin(self, *, wait: bool = False) -> None:
        """Dispatch the queued items to worker threads.

        Up to the precomputed number of daemon threads are started, each
        one processing a batch of at most ``per_thread`` items.  No thread
        is started once the queue has been drained.

        Args:
            wait: When true, block until every started thread has
                finished.  Defaults to false, in which case the method
                returns right after starting the threads.
        """
        started: List[Thread] = []
        for _ in range(self.__count_threads):
            next_pendings = self.__fetch_pendings()
            if not next_pendings:
                # Nothing left to process; remaining threads would be idle.
                break
            thread: Thread = Thread(
                target=self.__callback_cluster,
                kwargs={'items': next_pendings},
                daemon=True
            )
            thread.start()
            started.append(thread)

        if wait:
            for thread in started:
                thread.join()

    def __callback_cluster(self, items: List[Tuple[Any, ...]]) -> None:
        """Invoke the callback once per item of a batch, in order."""
        for item in items:
            self.__callback(*item)

    def __fetch_pendings(self) -> List[Tuple[Any, ...]]:
        """Pop up to ``per_thread`` queued items and return them as a list.

        Fewer items (possibly none) are returned when the queue runs out.
        """
        pending_items: List[Tuple[Any, ...]] = []
        for _ in range(self.__per_thread):
            if self.__queue.empty():
                break
            pending_items.append(self.__queue.get())
        return pending_items
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment