@pirogoeth
Last active February 22, 2019 17:26
a stdlib replacement for twisted's LoopingCall.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
__doc__ = """
This module contains a polling caller, built on top of Python's
stdlib `threading` module. It can be used as an alternative to
twisted's `LoopingCall`.
Usage:
- Instantiate a `threading.Event` to use as the poller signal:
```
keep_fetching = threading.Event()
keep_fetching.set() # We keep the flag _set_ as the poller should be running.
```
- Subclass the `PollingCall` class, providing your business
logic within `_execute`:
```
class FetchLoop(PollingCall):

    def __init__(self, poller_flag: threading.Event, url: str, cadence: timedelta):
        super(FetchLoop, self).__init__(poller_flag, cadence)
        self._url = url

    def _execute(self):
        data = requests.get(self._url)
        # do something with `data`
```
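- Start the `PollingCall` instance. Call `start()` rather than `run()`;
calling `run()` directly would execute the loop in the calling thread and block it:
```
# 30 seconds is an arbitrary example cadence
fetcher = FetchLoop(keep_fetching, "https://example.org/api/my_endpoint", timedelta(seconds=30))
fetcher.start()
```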
- When "someCondition" happens, stop the poller by clearing the `keep_fetching` flag:
```
keep_fetching.clear()
# Since `fetcher` is a `threading.Thread` subclass, you can `join()` on it to wait until it halts
fetcher.join(timeout=5)
```
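- Note that clearing the flag does not interrupt a `time.sleep` that is already
in progress, so with a cadence longer than the `join()` timeout the thread may
still be alive when `join()` returns. One possible variation, shown here only as
a sketch, waits on an event instead of sleeping. `poll_until_stopped` and
`stop_requested` are hypothetical names that are not part of this module, and the
flag is inverted relative to `keep_fetching` (set means "stop"):
```python
import threading
import time


def poll_until_stopped(work, stop_requested, cadence):
    # Hypothetical helper: call `work` every `cadence` seconds
    # until the `stop_requested` event is set.
    while not stop_requested.is_set():
        work()
        # Unlike time.sleep(), Event.wait() returns as soon as the flag is set.
        stop_requested.wait(cadence)


stop_requested = threading.Event()
calls = []

worker = threading.Thread(
    target=poll_until_stopped,
    args=(lambda: calls.append(time.time()), stop_requested, 60),
)
worker.daemon = True
worker.start()

time.sleep(0.2)          # give the worker time to run its first iteration
stop_requested.set()     # wakes the worker out of its 60 second wait
worker.join(timeout=5)   # returns promptly despite the long cadence
```
Setting `stop_requested` wakes the waiting thread immediately, so `join()`
does not have to wait out the rest of the cadence.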
"""
import logging
import threading
import time
from datetime import timedelta
log = logging.getLogger(__name__)
class PollingCall(threading.Thread):
    """ Thread that calls `_execute` every `cadence` seconds while `should_poll` is set.
    """

    def __init__(self, should_poll, cadence, *args, **kw):
        super(PollingCall, self).__init__(*args, **kw)

        # threading.Event instance that controls thread lifecycle
        self._should_poll = should_poll

        # Polling cadence in seconds, given as either an integer or timedelta
        if isinstance(cadence, timedelta):
            self._cadence = cadence.total_seconds()
        elif isinstance(cadence, int):
            self._cadence = cadence
        else:
            raise TypeError("Expected `cadence` to be either `int` or `datetime.timedelta`")

        # Make this a daemon thread so it will die with the main thread
        self.daemon = True

    def run(self):
        """ Actually run the poller loop.
        """

        while self._should_poll.is_set():
            try:
                self._execute()
            except Exception:
                log.exception("poller encountered an exception")
            finally:
                # Sleep after failures too, so a failing `_execute` cannot busy-loop
                time.sleep(self._cadence)

        log.info("poller run loop has exited")

    def _execute(self):
        """ Execute a poller iteration
        """

        # insert poller logic here
        pass