Last active
October 3, 2017 21:16
-
-
Save saintsGrad15/d835038162d181f914ebc6ed9a70fa82 to your computer and use it in GitHub Desktop.
A heartbeater class that implements the context manager interface.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from threading import Event, Thread
from time import sleep
class Heartbeater(Thread):
    """
    A thread that periodically runs a callable and also works as a context manager.

    It can be driven through the regular Thread interface (start() / stop() / join())
    or with the 'with' statement, which starts the heartbeat on entry and asks it
    to stop on exit:

        with Heartbeater(work=ping, period=5):
            do_something_slow()
    """

    def __init__(self, work=None, work_args=None, work_kwargs=None, period=1, daemon=False):
        """
        Configure the heartbeat. The thread does not run until start() is called
        (directly, or implicitly by entering a 'with' block).

        :param work: A function that will be run at each heartbeat.
                     Defaults to printing "Heartbeat...".
        :param work_args: A list of positional arguments that will be passed to 'work()' at each invocation
        :param work_kwargs: A dictionary of keyword arguments that will be passed to 'work()' at each invocation
        :param period: The number of seconds to wait between each "heartbeat".
        :param daemon: Stored as the thread's 'daemon' flag. A daemon thread does not keep
                       the interpreter alive once all non-daemon threads have finished.
        """
        super(Heartbeater, self).__init__()
        self.daemon = daemon

        # Set once the thread has been asked to stop. An Event (rather than a plain
        # boolean) lets run() block for 'period' seconds yet wake up immediately on stop().
        self._stop_event = Event()

        self.work = Heartbeater._default_work if work is None else work
        self.work_args = [] if work_args is None else work_args
        self.work_kwargs = {} if work_kwargs is None else work_kwargs
        self.period = period

    @property
    def active(self):
        """
        The flag indicating if the thread should continue to run.

        :return: True until stop() is called (or 'active' is assigned a falsy value).
        """
        return not self._stop_event.is_set()

    @active.setter
    def active(self, value):
        # Kept writable so existing callers that assign 'heartbeater.active = False' still work.
        if value:
            self._stop_event.clear()
        else:
            self._stop_event.set()

    def run(self):
        """
        Called asynchronously when <heartbeater_instance>.start() is called.

        Waits 'period' seconds, runs 'work', and repeats until the instance is stopped.
        An exception raised by 'work' is not caught here and ends the thread.

        :return: None
        """
        # Event.wait() returns False when the timeout elapses (time for a heartbeat)
        # and True as soon as a stop has been requested, which ends the loop
        # without running 'work' again.
        while not self._stop_event.wait(self.period):
            self.work(*self.work_args, **self.work_kwargs)

    def stop(self):
        """
        Ask the running thread to terminate.

        Returns immediately; a 'work' call already in progress is allowed to finish.
        Use join() to wait for the thread to actually exit.

        :return: None
        """
        self.active = False

    def __enter__(self):
        """
        Called when entering a context manager. Starts the heartbeat thread.

        :return: This Heartbeater instance.
        """
        self.start()
        return self

    def __exit__(self, *args):
        """
        Called when exiting a context manager. Requests the heartbeat thread to stop.

        :param args: Used to swallow the unused exception type, value and traceback.
        :return: None, so an exception raised inside the 'with' block propagates.
        """
        self.stop()

    @staticmethod
    def _default_work(*args, **kwargs):
        """
        The heartbeat action used when no 'work' function is supplied.

        :param args: Used to swallow unused positional arguments.
        :param kwargs: Used to swallow unused keyword arguments.
        :return: None
        """
        # Function-call form with a single argument prints identically on Python 2 and 3.
        print("Heartbeat...")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment