Skip to content

Instantly share code, notes, and snippets.

@iissme
Created December 26, 2018 08:32
Show Gist options
  • Save iissme/33d685ded4ef0fdddc09f0dc3882d261 to your computer and use it in GitHub Desktop.
Save iissme/33d685ded4ef0fdddc09f0dc3882d261 to your computer and use it in GitHub Desktop.
Simple asyncio interval runner
import asyncio
import logging
from abc import ABCMeta, abstractmethod
from datetime import datetime, timedelta
from functools import partial
from typing import Optional, Any
from uuid import uuid4
from dateutil import tz
from ntech.asyncio_utils import async_task
# Module-level logger named after this module; the interval runner below
# reports scheduling, task launches and task outcomes through it.
log = logging.getLogger(__name__)
class ABCIntervalRunner(metaclass=ABCMeta):
    """Abstract helper that launches :meth:`run` on an event loop at a fixed interval.

    Subclasses implement the :meth:`run` coroutine.  Every ``interval``
    seconds a timer registered with the loop fires, a new task for
    :meth:`run` is launched, and (budget permitting) the next timer is armed.

    A new run is launched whether or not the previous one has finished;
    only the most recently launched task is tracked and can be cancelled
    through :meth:`cancel`.
    """

    def __init__(self,
                 loop: asyncio.AbstractEventLoop,
                 interval: int,
                 schedule_times: Optional[int] = None,
                 run_at_start: Optional[bool] = False):
        """Create the runner and immediately start it.

        Args:
            loop: Event loop used for timers and passed to ``async_task``.
            interval: Seconds between consecutive launches.
            schedule_times: Total number of launches allowed, or ``None``
                for no limit.  A value of zero or less launches nothing.
            run_at_start: When true, launch the first run right away
                instead of waiting one interval.
        """
        self._loop = loop
        # Wall-clock bookkeeping (timezone-aware, local tz) for observers.
        self.last_run_at = None
        self.next_run_at = None
        self._interval = interval
        # Launches that may still be scheduled; None means unlimited.
        self.schedule_times = schedule_times
        # Handle of the pending timer; None whenever no timer is waiting.
        self._timer_handle = None
        # Set when a run task finishes, cleared whenever a timer is armed.
        try:
            self._run_event = asyncio.Event(loop=loop)
        except TypeError:
            # Python 3.10+ removed the ``loop`` argument from asyncio.Event.
            self._run_event = asyncio.Event()
        self._run_task = None

        if self._is_exhausted():
            # Zero (or fewer) launches requested: never arm a timer.  The
            # counter would otherwise go negative and never stop the cycle.
            return
        if run_at_start:
            if self.schedule_times is not None:
                self.schedule_times -= 1
            self.create_task()
        else:
            self.schedule_next_run()

    def _is_exhausted(self) -> bool:
        """Return True when a finite launch budget has been used up."""
        return self.schedule_times is not None and self.schedule_times <= 0

    @property
    def interval(self):
        """Seconds between consecutive launches."""
        return self._interval

    @interval.setter
    def interval(self, interval: int):
        """Change the interval and restart the wait using the new value.

        A pending timer is replaced without consuming a launch from the
        budget.  When no timer is pending and the budget is exhausted the
        new value is stored but nothing is scheduled.
        """
        self._interval = interval
        if self._timer_handle is not None:
            self._timer_handle.cancel()
            self._timer_handle = None
            if self.schedule_times is not None:
                # Give back the launch that the cancelled timer had taken,
                # because _arm_timer() below takes it again.
                self.schedule_times += 1
            # The cancelled timer never ran, so base the new wall-clock
            # estimate on the current time rather than on its fire time.
            restart_at = datetime.now(tz.tzlocal()) + timedelta(seconds=interval)
            self._arm_timer(restart_at)
        elif not self._is_exhausted():
            self.schedule_next_run()

    def schedule_next_run(self) -> None:
        """Arm a timer that launches the next run one interval from now.

        Updates ``last_run_at`` / ``next_run_at`` and consumes one launch
        from ``schedule_times`` when a limit is set.
        """
        if self.last_run_at:
            # The previously announced run time becomes the last run time.
            self.last_run_at = self.next_run_at
        else:
            self.last_run_at = datetime.now(tz.tzlocal())
        self._arm_timer(self.last_run_at + timedelta(seconds=self._interval))

    def _arm_timer(self, next_run_at: datetime) -> None:
        """Register the loop timer, record ``next_run_at`` and log the schedule."""
        self.next_run_at = next_run_at
        delay = timedelta(seconds=self._interval).total_seconds()
        self._run_event.clear()
        # The timer is expressed in loop time, independent of the wall
        # clock values stored above.
        self._timer_handle = self._loop.call_at(self._loop.time() + delay,
                                                self._on_timer)

        log_msg = 'Interval runner scheduled next run at: {}'.format(self.next_run_at.isoformat())
        if self.schedule_times is not None:
            log_msg += ' Remaining runs: {}'.format(self.schedule_times)
            self.schedule_times -= 1
        log.info(log_msg)

    def _on_timer(self) -> None:
        """Timer callback: forget the fired handle, then launch a run."""
        # Clearing the handle first keeps "handle is not None" equivalent
        # to "a timer is still pending" for the setter and cancel().
        self._timer_handle = None
        self.create_task()

    def create_task(self) -> None:
        """Launch :meth:`run` as a task and schedule the following run."""
        def task_cb(tid, task):
            # Completion callback: report the outcome and signal the event.
            try:
                task.result()
                log.info('Interval runner task %s finished successfully', tid)
            except asyncio.CancelledError:
                # Since Python 3.8 CancelledError is a BaseException, so it
                # must be handled explicitly or it escapes this callback.
                log.info('Interval runner task %s was cancelled', tid)
            except Exception as e:
                log.warning('Interval runner task %s failed with:\n', tid, exc_info=e)
            finally:
                self._run_event.set()

        task_id = uuid4().hex
        log.info('Launching interval runner task %s', task_id)
        # NOTE(review): async_task is a project helper; from its use here it
        # is assumed to start self.run as a task on self._loop and to call
        # each callback with the finished task - confirm in asyncio_utils.
        self._run_task = async_task(callback_list=[partial(task_cb, task_id)],
                                    disable_log_cb=True,  # we log excs in task_cb
                                    _loop=self._loop)(self.run)()

        if not self._is_exhausted():
            self.schedule_next_run()

    def cancel(self) -> None:
        """Cancel the pending timer and the most recent unfinished run task."""
        if self._timer_handle is not None:
            self._timer_handle.cancel()
            self._timer_handle = None
        if self._run_task and not self._run_task.done():
            self._run_task.cancel()

    @abstractmethod
    async def run(self):
        """Coroutine executed on every interval; implemented by subclasses."""
        raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment