Created
July 20, 2016 12:41
-
-
Save deeplook/9ac8386710d9c12b0e750ad35eb78f45 to your computer and use it in GitHub Desktop.
A class wrapping a callable to be called periodically inside a thread.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import threading | |
class PeriodicTimer(object):
    """
    A class wrapping a callable to be called periodically inside a thread.

    The wrapped function is called the first time immediately (and
    synchronously, in the creating thread) while an instance of this class
    is being constructed. Every subsequent call happens on a fresh
    ``threading.Timer`` thread, ``period`` seconds after the previous one
    was scheduled.

    Example:

        import datetime

        def foo():
            print(datetime.datetime.now().isoformat())

        rt = PeriodicTimer(foo, 10, max_time=30)
        rt.stop()
    """

    def __init__(self,
                 a_callable, period, max_time=None, stop_dt=None,
                 verbose=False, args=None, kwargs=None):
        """
        Constructor for creating a PeriodicTimer object.

        The period is the number of seconds after which to call the given
        callable (a_callable, a function or method), again. If the period
        is not > 0 the callable is called exactly once (the immediate first
        run) and no timer is ever scheduled.

        If max_time is a number > 0 it is taken as the maximum number of
        seconds after which the callable will not be called anymore.

        If stop_dt is a datetime object it is taken as the moment in time
        after which the callable will not be called anymore. Only one of
        max_time or stop_dt can be set; passing both raises AssertionError.

        If verbose is true, progress messages are printed to stdout.

        args and kwargs are a fixed list and dictionary with positional
        and named parameters, respectively, to be passed to the callable
        with every call. Both default to empty.
        """
        if max_time is not None and stop_dt is not None:
            msg = 'Only one of either max_time or stop_dt can be given, but not both.'
            raise AssertionError(msg)

        self.a_callable = a_callable
        self.period = period
        self.max_time = max_time
        self.stop_dt = stop_dt
        self.verbose = verbose
        # Fresh containers per instance: a literal [] / {} as the parameter
        # default would be shared by every instance created without them.
        self.args = [] if args is None else args
        self.kwargs = {} if kwargs is None else kwargs
        self._timer = None
        self.is_running = False
        # NOTE(review): this flag is stored but never applied to the Timer
        # threads created in start(), so they remain non-daemon threads.
        self.daemon = True
        self.start_time = datetime.datetime.now()

        self.first_run()
        self.start()

    def first_run(self):
        """Call the wrapped callable once, synchronously, in this thread."""
        self.is_running = True
        if self.verbose:
            print("** first_run:", self.args, self.kwargs)
        self.a_callable(*self.args, **self.kwargs)
        self.is_running = False

    def _run(self):
        """
        Timer callback: stop if a limit was reached, else re-arm and call.

        The next timer is scheduled *before* the callable is invoked, so
        the period is measured between call starts rather than from the end
        of one call to the start of the next.
        """
        now = datetime.datetime.now()
        if self.max_time:
            elapsed = now - self.start_time
            if elapsed.total_seconds() >= self.max_time:
                self.stop()
                if self.verbose:
                    print("** _run: stopped b/c max_time reached", self.max_time)
                return
        elif self.stop_dt:
            if now >= self.stop_dt:
                self.stop()
                if self.verbose:
                    print("** _run: stopped b/c stop_dt reached", self.stop_dt)
                return

        # The timer that fired is spent; clear the flag so start() re-arms.
        self.is_running = False
        self.start()
        if self.verbose:
            print("** _run:", self.args, self.kwargs)
        self.a_callable(*self.args, **self.kwargs)

    def start(self):
        """Schedule the next call unless one is pending or period is not > 0."""
        if self.period > 0 and not self.is_running:
            self._timer = threading.Timer(self.period, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Cancel the pending timer, if any, and mark the object as stopped."""
        # _timer is still None when no timer was ever scheduled
        # (period not > 0); cancelling must then be a harmless no-op.
        if self._timer is not None:
            self._timer.cancel()
        self.is_running = False
        if self.verbose:
            print("** stop:")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment