Skip to content

Instantly share code, notes, and snippets.

@pirogoeth pirogoeth/poller.py
Last active Feb 22, 2019

Embed
What would you like to do?
a stdlib replacement for twisted's LoopingCall.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
__doc__ = """
This module contains a polling caller, built on top of Python's
stdlib `threading` module. It can be used as an alternative to
twisted's `LoopingCall`.
Usage:
- Instantiate a `threading.Event` to use as the poller signal
```
keep_fetching = threading.Event()
keep_fetching.set() # We keep the flag _set_ as the poller should be running.
```
- Subclass the `PollingCall` class, providing your business
logic within `_execute`:
```
class FetchLoop(PollingCall):
def __init__(self, poller_flag: threading.Event, url: str, cadence: timedelta):
super(FetchLoop, self).__init__(poller_flag, cadence)
self._url = url
def _execute(self):
data = requests.get(self._url)
# do something with `data`
```
- Run the `PollingCall` instance:
```
fetcher = FetchLoop("https://example.org/api/my_endpoint")
fetcher.run()
```
- When "someCondition" happens, stop the poller by clearing the `keep_fetching` flag
```
keep_fetching.clear()
# Since `fetcher` is a `threading.Thread` subclass, you can `join()` on it to wait until it halts
fetcher.join(timeout=5)
```
"""
import logging
import threading
import time
from datetime import timedelta
log = logging.getLogger(__name__)
class PollingCall(threading.Thread):
def __init__(self, should_poll, cadence, *args, **kw):
super(_Poller, self).__init__(*args, **kw)
# threading.Event instance that controls thread lifecycle
self._should_poll = should_poll
# Polling cadence, as either an integer or timedelta
if isinstance(cadence, timedelta):
self._cadence = cadence.total_seconds()
elif isinstance(cadence, int):
self._cadence = cadence
else:
raise TypeError("Expected `cadence` to be either `int` or `datetime.timedelta`")
# Make this a daemon thread so it will die with the main thread
self.daemon = True
def run(self):
""" Actually run the poller loop.
"""
while self._should_poll.isSet():
try:
self._execute()
except Exception:
log.exception("poller encountered an exception")
finally:
time.sleep(self._cadence)
log.info("poller run loop has exited")
def _execute(self):
""" Execute a poller iteration
"""
# insert poller logic here
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.