Skip to content

Instantly share code, notes, and snippets.

@hrszanini
Created February 11, 2021 06:11
Show Gist options
  • Save hrszanini/9cddd8a72cb912034ce305664d68f7ec to your computer and use it in GitHub Desktop.
Save hrszanini/9cddd8a72cb912034ce305664d68f7ec to your computer and use it in GitHub Desktop.
ContinuousReference
from .structure import (
ContinuousReference
)
__all__ = ["ContinuousReference"]
from . import thread
class ContinuousReference:
reference_dictionary = {}
def __init__(self, thread_pool_size: int = 0, callback_function=None, **vargs):
self.thread_pool_size = thread_pool_size
for num in range(0, thread_pool_size):
self._add_thread(_id=num, callback_function=callback_function, **vargs)
def add_pool_thread(self, callback_function, thread_pool_size: int = 1, **vargs):
self.thread_pool_size += thread_pool_size
for num in range(len(self.reference_dictionary.keys()), self.thread_pool_size+1):
self._add_thread(_id=num, callback_function=callback_function, **vargs)
def _add_thread(self, _id, callback_function, **vargs):
key = f'thread-{_id}'
calling_thread = thread.ContinuosThread(
reference_dict=self.reference_dictionary,
key=key,
callback_function=callback_function,
**vargs
)
self.reference_dictionary[key] = calling_thread
self.reference_dictionary[key].start()
def __str__(self):
response = {}
for key, value in self.reference_dictionary.items():
response[key] = {
'function': value.callback_function.__name__,
'vargs': value.vargs
}
return str(response)
from threading import Thread
from loguru import logger
class ContinuosThread(Thread):
def __init__(self, reference_dict: dict, key: str, callback_function, **vargs):
super().__init__(name=key)
self.reference_dict = reference_dict
self.key = key
self.callback_function = callback_function
self.vargs = vargs
self.alive = True
def run(self):
try:
callback_return = self.callback_function(**self.vargs)
if callback_return:
callback_function = callback_return.pop('callback_function')
if type(callback_function) == type(self.callback_function):
self.callback_function = callback_function
self.vargs = callback_return
except Exception as e:
logger.error(e)
try:
self.reference_dict[self.key] = ContinuosThread(
self.reference_dict,
self.key,
self.callback_function,
**self.vargs
)
self.reference_dict[self.key].start()
except Exception:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment