Created
December 26, 2018 08:32
-
-
Save iissme/33d685ded4ef0fdddc09f0dc3882d261 to your computer and use it in GitHub Desktop.
Simple asyncio interval runner
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import logging
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from functools import partial
from typing import Optional, Any
from uuid import uuid4

from dateutil import tz

from ntech.asyncio_utils import async_task

# Module-level logger used for all scheduling and task-outcome messages below.
log = logging.getLogger(__name__)
class ABCIntervalRunner(metaclass=ABCMeta):
    """Abstract base class that launches :meth:`run` repeatedly on a timer.

    Subclasses implement the ``run`` coroutine. A timer registered on the
    supplied event loop with ``call_at`` invokes :meth:`create_task`, which
    launches ``run`` through the project helper ``async_task`` and then
    immediately arms the timer for the following run.

    Public attributes:
        last_run_at: aware ``datetime`` used as the base of the current
            schedule (``None`` until something has been scheduled).
        next_run_at: aware ``datetime`` the next run is expected at
            (``None`` until something has been scheduled).
        schedule_times: number of runs that may still be *scheduled*, or
            ``None`` for an unlimited number. It is decremented when a run
            is scheduled, not when it finishes.
    """

    def __init__(self,
                 loop: asyncio.AbstractEventLoop,
                 interval: int,
                 schedule_times: Optional[int] = None,
                 run_at_start: Optional[bool] = False):
        """Create the runner and schedule (or immediately launch) the first run.

        :param loop: event loop the timer is registered on; also handed to
            ``async_task``.
        :param interval: seconds between two consecutive launches.
        :param schedule_times: total number of runs, ``None`` for unlimited.
            A value of zero or less schedules nothing.
        :param run_at_start: launch the first run right away instead of
            waiting one interval.
        """
        self._loop = loop
        self.last_run_at = None
        self.next_run_at = None
        self._interval = interval
        self.schedule_times = schedule_times
        self._timer_handle = None
        try:
            self._run_event = asyncio.Event(loop=loop)
        except TypeError:
            # Python 3.10 removed the ``loop`` argument from asyncio
            # primitives; there the event binds to the running loop lazily.
            self._run_event = asyncio.Event()
        self._run_task = None

        if not self._has_runs_left():
            # An exhausted budget must not start anything: decrementing it
            # further would make the counter negative.
            return

        if run_at_start:
            if self.schedule_times is not None:
                # The immediate run consumes one slot of the budget.
                self.schedule_times -= 1
            self.create_task()
        else:
            self.schedule_next_run()

    @property
    def interval(self):
        """Seconds between two consecutive launches."""
        return self._interval

    @interval.setter
    def interval(self, interval: int):
        """Change the interval and re-arm the timer with the new value.

        A timer that is still armed is cancelled and replaced by one that
        fires ``interval`` seconds from now. The cancelled run never
        happened, so its slot is returned to ``schedule_times`` and
        ``last_run_at`` is left untouched. When no timer is armed, a new
        schedule is started only if runs remain.
        """
        self._interval = interval
        pending = self._timer_handle
        if pending is None:
            if self._has_runs_left():
                self.schedule_next_run()
            return

        pending.cancel()
        self._timer_handle = None
        if self.schedule_times is not None:
            # Give back the slot taken when the cancelled timer was armed.
            self.schedule_times += 1
        self._arm_timer(datetime.now(tz.tzlocal()))

    def _has_runs_left(self) -> bool:
        """Return True when another run may be scheduled."""
        return self.schedule_times is None or self.schedule_times > 0

    def _arm_timer(self, base: datetime) -> None:
        """Arm the loop timer one interval from now and consume one run slot.

        ``next_run_at`` is reported as ``base`` plus the interval, while the
        timer itself is set relative to the loop's own clock.
        """
        delta = timedelta(seconds=self._interval)
        self.next_run_at = base + delta
        run_at = self._loop.time() + delta.total_seconds()
        self._run_event.clear()
        self._timer_handle = self._loop.call_at(run_at, self.create_task)

        log_msg = 'Interval runner scheduled next run at: {}'.format(self.next_run_at.isoformat())
        if self.schedule_times is not None:
            log_msg += ' Remaining runs: {}'.format(self.schedule_times)
            self.schedule_times -= 1
        log.info(log_msg)

    def schedule_next_run(self) -> None:
        """Schedule the next run one interval after the previous target.

        The first call anchors the schedule on the current local time; later
        calls treat the previous ``next_run_at`` as the last run time.
        """
        if self.last_run_at:
            self.last_run_at = self.next_run_at
        else:
            self.last_run_at = datetime.now(tz.tzlocal())
        self._arm_timer(self.last_run_at)

    def create_task(self) -> None:
        """Launch one execution of :meth:`run` and schedule the following one.

        NOTE(review): ``async_task`` is a project helper not visible here; it
        is assumed to start ``run`` as a task on the loop and to call each
        entry of ``callback_list`` with the finished task - confirm.
        """
        # Whatever timer led here has fired (or this is the run-at-start
        # call), so nothing is pending until the next one is armed below.
        self._timer_handle = None

        def task_cb(tid, task):
            try:
                task.result()
            except asyncio.CancelledError:
                # Cancellation through cancel() is an expected outcome. On
                # Python 3.8+ CancelledError is a BaseException, so it would
                # otherwise escape from this callback.
                log.info('Interval runner task %s was cancelled', tid)
            except Exception as e:
                log.warning('Interval runner task %s failed with:\n', tid, exc_info=e)
            else:
                log.info('Interval runner task %s finished successfully', tid)
            finally:
                self._run_event.set()

        task_id = uuid4().hex
        log.info('Launching interval runner task %s', task_id)
        self._run_task = async_task(callback_list=[partial(task_cb, task_id)],
                                    disable_log_cb=True,  # we log excs in task_cb
                                    _loop=self._loop)(self.run)()

        if self._has_runs_left():
            self.schedule_next_run()

    def cancel(self) -> None:
        """Cancel the pending timer and the current run task, if any.

        The cancelled handle is kept on purpose so that a later interval
        change can tell a run was scheduled but never executed.
        """
        if self._timer_handle:
            self._timer_handle.cancel()

        if self._run_task and not self._run_task.done():
            self._run_task.cancel()

    @abstractmethod
    async def run(self):
        """Do the periodic work; must be implemented by subclasses."""
        raise NotImplementedError
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment