Skip to content

Instantly share code, notes, and snippets.

@tudorelu
Created October 18, 2022 04:26
Show Gist options
  • Select an option

  • Save tudorelu/71bca07e15b96fa659e1d2acd87920d1 to your computer and use it in GitHub Desktop.

Select an option

Save tudorelu/71bca07e15b96fa659e1d2acd87920d1 to your computer and use it in GitHub Desktop.
Takes a bunch of async functions and calls them periodically, in a separate thread
# ---------------------------------------------
#
# Class which takes a bunch of async functions
# and calls them periodically in a separate
# thread
#
# ---------------------------------------------
# ---------------------------------------------
# std imports
# ---------------------------------------------
import asyncio
from time import sleep, monotonic
from threading import Thread
from typing import Callable, Dict, List
from concurrent.futures import Future
# ---------------------------------------------
def _start_loop(loop):
    '''Thread entry point: make `loop` the calling thread's event loop, then run it.

    `loop` : event loop to install and run

    Blocks in `run_forever()` and only returns once the loop has been stopped.
    '''
    # Bind the loop to this thread before running it.
    asyncio.set_event_loop(loop)
    loop.run_forever()
class TaskExitException(Exception):
    '''Raised when a periodic task exits because it exceeded its allowed exception count.'''
class _PeriodicTaskAsync:
    ''' Wrapper around an async callable enabling it to be called periodically. '''

    def __init__(self, function:Callable, repeat_seconds:float,
                 max_exception_count:int=0, *args, **kwargs):
        '''
        `function` : async function to be periodically called
        `repeat_seconds` : seconds between the starts of consecutive calls
        `max_exception_count` : number of raised exceptions to tolerate;
        the next one after that makes the task exit
        `args` : arguments to be passed to function
        `kwargs` : keyword arguments to be passed to function
        '''
        print(f'function {function}')
        print(f'repeat_seconds {repeat_seconds}')
        print(f'args {args}')
        print(f'kwargs {kwargs}')
        print(f'max_exception_count {max_exception_count}')
        self._func : Callable = function
        # functools.partial objects and callable instances have no `__name__`,
        # so fall back to their repr instead of raising AttributeError.
        self._name : str = getattr(function, '__name__', repr(function))
        self._func_args : tuple = args
        self._func_kwargs : Dict = kwargs
        self._repeat_seconds : float = repeat_seconds
        self._keep_alive : bool = True
        self._exceptions_raised : List[Exception] = []
        self._max_exception_cnt : int = max_exception_count
        print(f'Initialised task {self._name}')
        print(f'Seconds {self._repeat_seconds}')
        print(f'Kwargs {self._func_kwargs}')

    async def __call__(self, loop:asyncio.AbstractEventLoop):
        '''
        Calls the wrapped function repeatedly for as long as `loop` is running
        and `stop()` has not been called.

        Each iteration lasts at least `repeat_seconds`: when the call finishes
        (or fails with a tolerated exception) earlier, the remainder is slept.

        Raises: `TaskExitException` (chained to the triggering exception) once
        more than `max_exception_count` exceptions have been raised in total.
        '''
        while loop.is_running() and self._keep_alive:
            print(f'Calling task {self._name}')
            print(f'Seconds {self._repeat_seconds}')
            print(f'Kwargs {self._func_kwargs}')
            started = monotonic()
            try:
                await self._func(*self._func_args, **self._func_kwargs)
            except Exception as e:
                self._exceptions_raised.append(e)
                if len(self._exceptions_raised) > self._max_exception_cnt:
                    raise TaskExitException(f'Task {self._name} exited.') from e
            # Sleep out the rest of the period after a tolerated failure too,
            # so a failing function is retried on schedule rather than in a
            # tight loop that uses up the whole exception budget at once.
            elapsed = monotonic() - started
            await asyncio.sleep(max(0, self._repeat_seconds - elapsed))

    def stop(self):
        '''
        Asks the task to finish. The flag is only checked at the top of each
        iteration, so the current call and its sleep complete first.
        '''
        self._keep_alive = False
class TaskHandler:
    '''
    Class that runs a bunch of async functions periodically, in a separate thread.

    When the supplied loop is not already running, `start()` runs it in a
    daemon thread owned by this handler and `stop()` shuts that thread down
    and closes the loop. A loop that was already running at `start()` is
    treated as belonging to the caller and is never stopped or closed here.
    '''

    # Upper bound on how long `stop()` waits for cancelled tasks to unwind.
    _SHUTDOWN_GRACE_SECONDS = 5.0

    def __init__(self, loop:asyncio.AbstractEventLoop=None):
        '''
        `loop` : event loop to schedule the tasks on; a new one is created
        when omitted
        '''
        self._periodic_tasks : List[_PeriodicTaskAsync] = []
        self._running_futures : List[Future] = []
        self._started : bool = False
        # Set only while this handler runs the loop in its own thread.
        self._thread = None
        if loop is None:
            loop = asyncio.new_event_loop()
        self.loop = loop

    def add_task(self, task:Callable, every_seconds:float, *args, **kwargs):
        '''
        Initialises a periodic task.

        `task` : async function to call periodically
        `every_seconds` : seconds between calls
        `args` : positional arguments passed to `task`
        `kwargs` : keyword arguments passed to `task`; a `max_exception_count`
        keyword is consumed here and sets the task's exception tolerance

        When the handler is already started the task is scheduled right away.
        '''
        print(f'task {task}')
        print(f'every_seconds {every_seconds}')
        print(f'args {args}')
        print(f'kwargs {kwargs}')
        # Fill every leading parameter positionally so that `*args` reaches the
        # task itself instead of colliding with `repeat_seconds` or being
        # swallowed by `max_exception_count`.
        max_exception_count = kwargs.pop('max_exception_count', 0)
        _task = _PeriodicTaskAsync(task, every_seconds, max_exception_count,
                                   *args, **kwargs)
        self._periodic_tasks.append(_task)
        if self._started:
            self._submit(_task)

    def _submit(self, task):
        ''' Schedules one periodic task on the loop and records its future. '''
        future = asyncio.run_coroutine_threadsafe(task(self.loop), self.loop)
        self._running_futures.append(future)

    async def _shutdown(self):
        ''' Runs on the loop: cancels every other pending task, then stops the loop. '''
        current = asyncio.current_task()
        pending = [t for t in asyncio.all_tasks() if t is not current]
        for pending_task in pending:
            pending_task.cancel()
        if pending:
            # Bounded wait so a task that ignores cancellation cannot block stop().
            await asyncio.wait(pending, timeout=self._SHUTDOWN_GRACE_SECONDS)
        self.loop.stop()

    def start(self):
        ''' Starts executing the tasks. Does nothing if already started. '''
        if self._started:
            return
        # A previous stop() closed the loop; a closed loop cannot be rerun.
        if self.loop.is_closed():
            self.loop = asyncio.new_event_loop()
        # Futures from an earlier run are stale.
        self._running_futures = []
        # starts loop in other thread
        if not self.loop.is_running():
            self._thread = Thread(target=_start_loop, args=(self.loop,), daemon=True)
            self._thread.start()
        # submits tasks to the loop
        for task in self._periodic_tasks:
            self._submit(task)
        self._started = True

    def stop(self):
        ''' Stops the execution of tasks. Does nothing if not started. '''
        if not self._started:
            return
        if self._thread is None:
            # The loop was already running at start(): it is not ours to stop,
            # so only cancel the work this handler submitted.
            for future in self._running_futures:
                future.cancel()
        else:
            if self._thread.is_alive():
                # `loop.stop()` is not thread-safe; have the loop cancel its
                # tasks and stop itself from inside its own thread.
                asyncio.run_coroutine_threadsafe(self._shutdown(), self.loop)
                self._thread.join()
            if not self.loop.is_closed():
                self.loop.close()
            self._thread = None
        self._started = False

    def check_for_exceptions(self) -> bool:
        '''
        Checks whether any running future raised an exception.
        Returns: `True` if any of the running futures raised exceptions, `False` otherwise.'''
        return any(
            future.done() and not future.cancelled() and future.exception() is not None
            for future in self._running_futures
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment