Skip to content

Instantly share code, notes, and snippets.

@iklobato
Last active February 22, 2024 14:55
Show Gist options
  • Save iklobato/59e1a99415225647d4b9b28a5662ea70 to your computer and use it in GitHub Desktop.
Save iklobato/59e1a99415225647d4b9b28a5662ea70 to your computer and use it in GitHub Desktop.
elements_distribution.py
from time import time, sleep
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from itertools import cycle, product
from queue import Queue
from typing import Union
from tabulate import tabulate
# Upper bound of the notebook counts explored by main(); also the number of
# notebooks populate_notebooks() puts into a single queue.
NUMBER_OF_NOTEBOOKS = 20
# Upper bound of the queue counts explored by main(); also the number of
# groups make_distribution() creates when it is given an empty group list.
NUMBER_OF_QUEUES = 4
# Upper bound of the ThreadPoolExecutor worker counts explored by main().
NUMBER_OF_THREADS = 16
@dataclass
class Notebook:
    """
    A unit of work that gets distributed over groups/queues and processed
    """

    # Numeric identifier of the notebook.
    id: int
    # Human-readable label of the notebook.
    name: str
    # Starts as False; process_notebook() sets it to True.
    processed: bool = False
def make_distribution(ids_to_distribute, group: list, parsed: bool = False) -> list:
    """
    Distributes ids round-robin over a list of groups as Notebook objects

    The ids are only read: the caller's ``ids_to_distribute`` is never
    emptied, so any iterable of ids is accepted.

    :param ids_to_distribute: iterable of notebook ids to spread over the groups
    :param group: list of lists, filled in place; when empty,
        NUMBER_OF_QUEUES fresh empty lists are created and used instead
    :param parsed: when False nothing is distributed and the groups are
        returned untouched
    :return: the list of groups
    """
    if not group:
        group = [[] for _ in range(NUMBER_OF_QUEUES)]
    # Nothing to do when distribution is disabled, or when there is no group
    # at all (NUMBER_OF_QUEUES == 0) to receive the notebooks.
    if not parsed or not group:
        return group
    # zip() stops as soon as the ids run out, which ends the otherwise
    # endless cycle over the groups.
    for num, bucket in zip(ids_to_distribute, cycle(group)):
        bucket.append(Notebook(id=num, name=f"Notebook {num}"))
    return group
def iter_distribution(values_to_distribute, group, parsed=False):
    """
    Yields, one at a time, each group returned by make_distribution

    :param values_to_distribute: forwarded unchanged to make_distribution
    :param group: forwarded unchanged to make_distribution
    :param parsed: forwarded unchanged to make_distribution
    :return: generator over the groups
    """
    yield from make_distribution(values_to_distribute, group, parsed)
def populate_notebooks(_queue: Union[Queue, list]):
    """
    Populates the queues with notebooks

    A single queue receives NUMBER_OF_NOTEBOOKS notebooks. A list of queues
    receives one notebook per queue, in list order.

    :param _queue: a queue, or a list of queues, to put Notebook objects into
    :return: None
    """
    # NOTE(review): ids are 1-based while the name carries the 0-based
    # index (id 1 is "Notebook 0") - confirm this is intended.
    if isinstance(_queue, list):
        # Iterate over the queues themselves; range() cannot take a list.
        for i, target in enumerate(_queue):
            target.put(Notebook(id=i + 1, name=f"Notebook {i}"))
        return
    for i in range(NUMBER_OF_NOTEBOOKS):
        _queue.put(Notebook(id=i + 1, name=f"Notebook {i}"))
def process_notebook(n: "Notebook", delay: float = 1) -> "Notebook":
    """
    Simulates processing a notebook by sleeping, then flags it as processed

    :param n: notebook to process; its ``processed`` attribute is set to True
    :param delay: seconds to sleep to simulate the work; defaults to 1
    :return: the same notebook instance that was passed in
    """
    sleep(delay)
    n.processed = True
    return n
def process_combination(combination, output_dict: dict) -> dict:
    """
    Runs one benchmark scenario and records how long it took

    :param combination: (notebooks, queues, executors) triple describing the scenario
    :param output_dict: mapping updated in place with the elapsed seconds,
        keyed by the (notebooks, queues, executors) tuple
    :return: the same ``output_dict``
    """
    notebook_no, queue_no, executors_no = combination
    print(f'Creating scenario: {notebook_no} notebooks, {queue_no} queues, {executors_no} executors')
    # Pre-build one empty group per requested queue so the scenario really
    # uses queue_no groups; passing an empty list would make
    # make_distribution fall back to NUMBER_OF_QUEUES.
    empty_groups = [[] for _ in range(queue_no)]
    groups = make_distribution(list(range(notebook_no)), empty_groups, True)
    futures = []
    start_time = time()
    # Leaving the with-block waits for every submitted task, so the elapsed
    # time covers the whole scenario.
    with ThreadPoolExecutor(max_workers=executors_no) as executor:
        for g in groups:
            for n in g:
                futures.append(executor.submit(process_notebook, n))
    elapsed_time = time() - start_time
    # Surface any exception raised in a worker instead of recording a timing
    # for a scenario that silently failed.
    for future in futures:
        future.result()
    output_dict[(notebook_no, queue_no, executors_no)] = elapsed_time
    return output_dict
def main():
    """
    Times the first eleven scenarios of the notebooks x queues x threads
    grid and prints the ten fastest as a table
    """
    notebook_counts = range(1, NUMBER_OF_NOTEBOOKS + 1)
    queue_counts = range(1, NUMBER_OF_QUEUES + 1)
    thread_counts = range(1, NUMBER_OF_THREADS + 1)
    scenarios = product(notebook_counts, queue_counts, thread_counts)

    timings = {}
    # Only the scenarios at index 0..10 (eleven in total) are executed.
    for _, scenario in zip(range(11), scenarios):
        timings = process_combination(scenario, timings)

    by_elapsed = sorted(timings.items(), key=lambda entry: entry[1])
    print(tabulate(by_elapsed[:10], headers=["Combination", "Time"]))
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment