Created
October 18, 2022 04:26
-
-
Save tudorelu/71bca07e15b96fa659e1d2acd87920d1 to your computer and use it in GitHub Desktop.
Takes a bunch of async functions and calls them periodically, in a separate thread
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # --------------------------------------------- | |
| # | |
| # Class which takes a bunch of async functions | |
| # and calls them periodically in a separate | |
| # thread | |
| # | |
| # --------------------------------------------- | |
| # --------------------------------------------- | |
| # std imports | |
| # --------------------------------------------- | |
| import asyncio | |
| from time import sleep, monotonic | |
| from threading import Thread | |
| from typing import Callable, Dict, List | |
| from concurrent.futures import Future | |
| # --------------------------------------------- | |
| def _start_loop(loop): | |
| '''Sets `loop` as event loop calls `run_forever()` on it.''' | |
| asyncio.set_event_loop(loop) | |
| loop.run_forever() | |
| class TaskExitException(Exception): | |
| pass | |
| class _PeriodicTaskAsync: | |
| ''' Wrapper around a callable (function) enabling it to be called periodically ''' | |
| def __init__(self, function:Callable, repeat_seconds:float, | |
| max_exception_count:int=0, *args, **kwargs): | |
| ''' | |
| `function` : function to be periodically called | |
| `repeat_seconds` : seconds between function calls | |
| `max_exception_count` : number of times to skip raised exceptions | |
| before raising an exception | |
| `args` : arguments to be passed to function | |
| `kwargs` : keyword arguments to be passed to function | |
| ''' | |
| print(f'function {function}') | |
| print(f'repeat_seconds {repeat_seconds}') | |
| print(f'args {args}') | |
| print(f'kwargs {kwargs}') | |
| print(f'max_exception_count {max_exception_count}') | |
| self._func : Callable = function | |
| self._name : str = function.__name__ | |
| self._func_args : List = args | |
| self._func_kwargs : Dict = kwargs | |
| self._repeat_seconds : float = repeat_seconds | |
| self._keep_alive : bool = True | |
| self._exceptions_raised : List[Exception] = [] | |
| self._max_exception_cnt : int = max_exception_count | |
| print(f'Initialised task {self._name}') | |
| print(f'Seconds {self._repeat_seconds}') | |
| print(f'Kwargs {self._func_kwargs}') | |
| async def __call__(self, loop:asyncio.AbstractEventLoop): | |
| async def async_run_for_seconds(fn:Callable, secs:float, *args, **kwargs): | |
| ''' Calls an async function `fn`. If `fn` took less than `secs` seconds | |
| to compute, sleep until `secs` seconds have elapsed in total, so that | |
| this function takes a total of `secs` seconds to complete. ''' | |
| print(f'Calling task {fn.__name__}') | |
| print(f'Seconds {secs}') | |
| print(f'Kwargs {kwargs}') | |
| st_ = monotonic() | |
| ret = await fn(*args, **kwargs) | |
| elapsed = monotonic() - st_ | |
| await asyncio.sleep(max(0, secs - elapsed)) | |
| return ret | |
| while loop.is_running() and self._keep_alive: | |
| try: | |
| await async_run_for_seconds(self._func, self._repeat_seconds, *self._func_args, **self._func_kwargs) | |
| except Exception as e: | |
| self._exceptions_raised.append(e) | |
| if len(self._exceptions_raised) > self._max_exception_cnt: | |
| raise TaskExitException(f'Task {self._name} exited.') | |
| def stop(self): | |
| self._keep_alive = False | |
| class TaskHandler: | |
| ''' | |
| Class that runs a bunch of async functions periodically, in a separate thread. | |
| ''' | |
| def __init__(self, loop:asyncio.AbstractEventLoop=None): | |
| self._periodic_tasks : List[_PeriodicTaskAsync] = [] | |
| self._running_futures : List[Future] = [] | |
| self._started : bool = False | |
| if not loop: | |
| loop = asyncio.new_event_loop() | |
| self.loop = loop | |
| def add_task(self, task:Callable, every_seconds:float, *args, **kwargs): | |
| ''' Initialises periodic task.''' | |
| print(f'task {task}') | |
| print(f'every_seconds {every_seconds}') | |
| print(f'args {args}') | |
| print(f'kwargs {kwargs}') | |
| _task = _PeriodicTaskAsync(task, repeat_seconds=every_seconds, *args, **kwargs) | |
| self._periodic_tasks.append(_task) | |
| def start(self): | |
| ''' Starts executing the tasks. ''' | |
| if self._started: | |
| return | |
| # starts loop in other thread | |
| if not self.loop.is_running(): | |
| self._thread = Thread(target=_start_loop, args=(self.loop,), daemon=True) | |
| self._thread.start() | |
| # submits tasks to the loop | |
| for task in self._periodic_tasks: | |
| rf = asyncio.run_coroutine_threadsafe(task(self.loop), self.loop) | |
| self._running_futures.append(rf) | |
| self._started = True | |
| def stop(self): | |
| ''' Stops the execution of tasks. ''' | |
| if not self._started: | |
| return | |
| # stopping loop | |
| while self.loop.is_running(): | |
| self.loop.stop() | |
| sleep(1) | |
| # for pt in self._periodic_tasks: | |
| # pt.stop() | |
| self._thread.join() | |
| self.loop.close() | |
| self._started = False | |
| def check_for_exceptions(self) -> bool: | |
| ''' | |
| Checks whether any running future raised an exception. | |
| Returns: `True` if any of the running futures raised exceptions, `False` otherwise.''' | |
| for rt in self._running_futures: | |
| if rt._exception: | |
| return True | |
| return False |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment