Last active
February 22, 2019 17:26
-
-
Save pirogoeth/ea7448f5b3642fac9998782186dbd64a to your computer and use it in GitHub Desktop.
a stdlib replacement for twisted's LoopingCall.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import unicode_literals | |
__doc__ = """ | |
This module contains a polling caller, built on top of Python's | |
stdlib `threading` module. It can be used as an alternative to | |
twisted's `LoopingCall`. | |
Usage: | |
- Instantiate a `threading.Event` to use as the poller signal | |
``` | |
keep_fetching = threading.Event() | |
keep_fetching.set() # We keep the flag _set_ as the poller should be running. | |
``` | |
- Subclass the `PollingCall` class, providing your business | |
logic within `_execute`: | |
``` | |
class FetchLoop(PollingCall): | |
def __init__(self, poller_flag: threading.Event, url: str, cadence: timedelta): | |
super(FetchLoop, self).__init__(poller_flag, cadence) | |
self._url = url | |
def _execute(self): | |
data = requests.get(self._url) | |
# do something with `data` | |
``` | |
- Run the `PollingCall` instance: | |
``` | |
fetcher = FetchLoop("https://example.org/api/my_endpoint") | |
fetcher.run() | |
``` | |
- When "someCondition" happens, stop the poller by clearing the `keep_fetching` flag | |
``` | |
keep_fetching.clear() | |
# Since `fetcher` is a `threading.Thread` subclass, you can `join()` on it to wait until it halts | |
fetcher.join(timeout=5) | |
``` | |
""" | |
import logging | |
import threading | |
import time | |
from datetime import timedelta | |
log = logging.getLogger(__name__) | |
class PollingCall(threading.Thread): | |
def __init__(self, should_poll, cadence, *args, **kw): | |
super(_Poller, self).__init__(*args, **kw) | |
# threading.Event instance that controls thread lifecycle | |
self._should_poll = should_poll | |
# Polling cadence, as either an integer or timedelta | |
if isinstance(cadence, timedelta): | |
self._cadence = cadence.total_seconds() | |
elif isinstance(cadence, int): | |
self._cadence = cadence | |
else: | |
raise TypeError("Expected `cadence` to be either `int` or `datetime.timedelta`") | |
# Make this a daemon thread so it will die with the main thread | |
self.daemon = True | |
def run(self): | |
""" Actually run the poller loop. | |
""" | |
while self._should_poll.isSet(): | |
try: | |
self._execute() | |
except Exception: | |
log.exception("poller encountered an exception") | |
finally: | |
time.sleep(self._cadence) | |
log.info("poller run loop has exited") | |
def _execute(self): | |
""" Execute a poller iteration | |
""" | |
# insert poller logic here | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment