#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""`Twitter Streaming API`_ consumer based on tweepy_.

The consumer runs asynchronously in a thread.  Exposes a Tornado_
web server with two request handlers:

* ``/update`` either starts or restarts the consumer
* ``/stop`` stops the consumer

When the consumer is running, it dumps the statuses in a queue
using ``torque.client.add_task``.

The implementation handles all of the requirements for backing off,
reconnecting etc. as specified in the `Twitter Streaming API`_ docs.

.. _`Twitter Streaming API`: http://apiwiki.twitter.com/Streaming-API-Documentation
.. _tweepy: http://github.com/joshthecoder/tweepy
.. _Tornado: http://www.tornadoweb.org/
"""
import logging
import math
import httplib
import sys
import time
import urllib

from socket import timeout, error as socket_error
from threading import Thread, Timer

from tornado import httpserver, ioloop, web
from tornado import options as tornado_options
from tornado.options import define, options

define("debug", default=False, help="debug mode")
define('port', default=8890, help='port to run on')

from tweepy.auth import BasicAuthHandler
from tweepy.streaming import Stream, StreamListener

define("username", default='...', help="Twitter username")
define('password', default='...', help='Twitter password')

making_an_update = 'restarting consumer due to filter predicate changes'
handling_an_exit = 'restarting a consumer in response to an unexpected error'

define('min_update_delay', default=120, help='min secs before %s' % making_an_update)
define('max_update_delay', default=600, help='max secs before %s' % making_an_update)
define('min_exit_delay', default=0.25, help='min wait before %s' % handling_an_exit)
define('max_exit_delay', default=16, help='max wait before %s' % handling_an_exit)

from torque.client import add_task

from model import Predicate
from predicates import get_predicates
from utils import do_nothing, json_encode, json_decode, generate_hash

### Streaming API Consumer

TWITTER_SENDS_KEEP_ALIVES_EVERY = 30 # seconds

class Consumer(Stream):
    """Extends the tweepy default with:
    
    #. linear and exponential backoff that strictly follows the docs_
    #. a unique ID that gets passed to two additional listener hooks:
       ``on_connect`` and ``on_exit``
    
    .. _docs: http://apiwiki.twitter.com/Streaming-API-Documentation#Connecting
    """
    
    def __init__(
            self,
            listener,
            username=None,
            password=None,
            timeout=TWITTER_SENDS_KEEP_ALIVES_EVERY + 1,
            retry_count=None,
            min_tcp_ip_delay=0.25,
            max_tcp_ip_delay=16,
            min_http_delay=10,
            max_http_delay=240,
            buffer_size=1500
        ):
        self.listener = listener
        username = username and username or options.username
        password = password and password or options.password
        self.auth = BasicAuthHandler(username, password)
        self.timeout = timeout
        self.retry_count = retry_count
        self.min_tcp_ip_delay = min_tcp_ip_delay
        self.max_tcp_ip_delay = max_tcp_ip_delay
        self.min_http_delay = min_http_delay
        self.max_http_delay = max_http_delay
        self.buffer_size = buffer_size
        self.headers = {}
        self.body = None
        self.conn = None
        self.running = False
        self.id = generate_hash()
    
    def _incr_tcp_ip_delay(self, delay):
        """When a network error (TCP/IP level) is encountered,
        back off linearly.
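        
        e.g. with the default ``min_tcp_ip_delay=0.25`` and
        ``max_tcp_ip_delay=16`` (an illustrative sketch, in the same
        informal style as the ``_incr_http_delay`` doctest below):
        
        >>> d = _incr_tcp_ip_delay(0)
        >>> d
        0.25
        >>> d = _incr_tcp_ip_delay(d)
        >>> d
        0.5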
""" | |
min_ = self.min_tcp_ip_delay | |
max_ = self.max_tcp_ip_delay | |
delay += min_ | |
if delay > max_: | |
delay = max_ | |
if delay == max_: | |
logging.warning('Consumer reached max tcp ip delay') | |
return delay | |
    
    def _incr_http_delay(self, delay):
        """When an http error (> 200) is returned, back off
        exponentially.
        
        >>> d = _incr_http_delay(0)
        >>> d
        10
        >>> d = _incr_http_delay(d)
        >>> d
        30
        >>> d = _incr_http_delay(d)
        >>> d
        70
        >>> d = _incr_http_delay(d)
        >>> d
        150
        >>> d = _incr_http_delay(d)
        >>> d
        240
        >>> d = _incr_http_delay(d)
        >>> d
        240
        """
        min_ = self.min_http_delay
        max_ = self.max_http_delay
        delay = min_ + min_ * delay / 5
        if delay > max_:
            delay = max_
        if delay == max_:
            logging.warning('Consumer reached max http delay')
        return delay
    
    def _run(self):
        # setup
        self.auth.apply_auth(None, None, self.headers, None)
        # enter loop
        error_counter = 0
        tcp_ip_delay = 0
        http_delay = 0
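        # keep reconnecting until told to stop: HTTP error responses back
        # off exponentially and network (TCP/IP) errors back off linearly
        # (see ``_incr_http_delay`` and ``_incr_tcp_ip_delay`` above)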
        while self.running:
            if self.retry_count and error_counter > self.retry_count:
                # quit if error count greater than retry count
                break
            try:
                self.conn = httplib.HTTPConnection(self.host)
                self.conn.connect()
                self.conn.sock.settimeout(self.timeout)
                self.conn.request('POST', self.url, self.body, headers=self.headers)
                resp = self.conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    self.conn.close()
                    error_counter += 1
                    http_delay = self._incr_http_delay(http_delay)
                    time.sleep(http_delay)
                else:
                    if self.listener.on_connect(self.id) is False:
                        break
                    error_counter = 0
                    tcp_ip_delay = 0
                    http_delay = 0
                    self._read_loop(resp)
            except (timeout, socket_error):
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                self.conn.close()
                error_counter += 1
                tcp_ip_delay = self._incr_tcp_ip_delay(tcp_ip_delay)
                time.sleep(tcp_ip_delay)
            except Exception, err:
                logging.warning('Fatal exception in Consumer')
                logging.warning(err, exc_info=True)
                break
        # cleanup
        self.running = False
        if self.conn:
            self.conn.close()
        # flag up
        self.listener.on_exit(self.id)
    
    def _read_loop(self, resp):
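        # the stream is length delimited: each status is preceded by a line
        # giving its length in bytes; blank keep-alive lines fail the
        # ``isdigit()`` check below and are skipped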
        data = ''
        while self.running:
            if resp.isclosed():
                break
            # read length
            length = ''
            while True:
                c = resp.read(1)
                if c == '\n':
                    break
                length += c
            length = length.strip()
            if length.isdigit():
                length = int(length)
            else:
                continue
            # read data and pass into listener
            data = resp.read(length)
            if self.listener.on_data(data) is False:
                self.running = False
            time.sleep(0.001)


class BaseManager(StreamListener):
    """Manages running consumer instances, ensuring that
    we don't update the connection too often and that,
    when we do, we use a `low latency restart`_.
    
    Consumers run in their own threads, providing notifications
    via ``on_connect``, ``on_data``, ``on_exit``, etc.
    
    .. _`low latency restart`: http://bit.ly/idOra
    """
    
    # we keep a dictionary of consumers, using the consumer.id
    # as the dictionary key
    consumers = {}
    active_consumer_id = None
    
    # ``threading.Timer`` objects
    scheduled_exit_update = None
    scheduled_predicate_update = None
    
    # we keep a record of when we last re-started and of the
    # number of updates pending, so that we can calculate
    # when to restart the connection in response to a predicate
    # change
    last_update_timestamp = 0.0
    updates_pending = 0
    
    # we back off from repeated unexpected exits
    exit_delay = 0
    
    # when a predicate update is scheduled in less than two seconds,
    # we ignore other requests, to avoid starting two consumers
    # at the same time
    predup_schd_in_less_than_two_secs = False
    
    # ctl flags
    is_stopping = False
    accept_updates = True
    
    def __init__(
            self,
            min_update_delay=None,
            max_update_delay=None,
            min_exit_delay=None,
            max_exit_delay=None
        ):
        """How frequently do we want to update the consumer in response
        to a change in filter predicates and how much should we back
        off in response to unexpected errors?
        """
        # no matter how many changes there have been,
        # don't restart more than every...
        self.min_update_delay = min_update_delay is None and \
            options.min_update_delay or min_update_delay
        # if there's been at least one change, update every...
        self.max_update_delay = max_update_delay is None and \
            options.max_update_delay or max_update_delay
        # what are the minimum and maximum delays when consumers are erroring?
        self.min_exit_delay = min_exit_delay is None and \
            options.min_exit_delay or min_exit_delay
        self.max_exit_delay = max_exit_delay is None and \
            options.max_exit_delay or max_exit_delay
    
    def _incr_exit_delay(self, delay):
        """When a consumer exits unexpectedly, back off linearly.
        """
        min_ = self.min_exit_delay
        max_ = self.max_exit_delay
        delay += min_
        if delay > max_:
            delay = max_
        if delay == max_:
            logging.warning('Manager reached max unexpected exit delay')
        return delay
    
    def _calculate_delay(self, n):
        """Lower the delay more for the first few changes than for the last.
        
        >>> m = BaseManager(min_update_delay=120, max_update_delay=600)
        >>> [m._calculate_delay(i) for i in range(0, 100, 10)]
        [600, 440, 370, 320, 280, 240, 210, 180, 150, 120]
        
        Or to put it pictorially::
        
            |.
            |  .
            |    .
          d |       .
            |           .
            |                .
           _|___________________________________________________________
            |                              n
        """
        # max_delay_in_minutes
        maxm = self.max_update_delay / 60
        mins = self.min_update_delay
        d = int(60 - 60 * math.sqrt(n) / 12) * maxm
        if d < mins:
            d = mins
        return d
    
    def _get_update_delay(self):
        """The more changes pending, the lower the delay.
""" | |
# no delay the first time | |
if self.last_update_timestamp is None: | |
return 0 | |
# otherwise calculate the delay | |
n = self.updates_pending | |
delay = self._calculate_delay(n) | |
# less the elapsed time | |
elapsed = time.time() - self.last_update_timestamp | |
delay = delay - elapsed | |
if delay < 0: | |
delay = 0 | |
# return the number of seconds to delay an update for | |
return delay | |
    
    def _update_consumer(self):
        """Actually start a new consumer.
        """
        logging.info('starting a new consumer')
        logging.info(time.time())
        # store the timestamp
        self.last_update_timestamp = time.time()
        # flag that there are no updates pending
        self.scheduled_exit_update = None
        self.scheduled_predicate_update = None
        self.updates_pending = 0
        self.predup_schd_in_less_than_two_secs = False
        # get the latest filter predicates
        follow, track = self.get_predicates()
        # create the new consumer
        consumer = Consumer(listener=self)
        # put it in self.consumers
        self.consumers[consumer.id] = consumer
        logging.info(self.consumers)
        # block with the consumer's run loop
        consumer.filter(follow=follow, track=track)
    
    def _schedule_exit_update(self):
        """In response to an exit, we cancel any updates pending
        and schedule a new update.
        """
        if self.scheduled_predicate_update is not None:
            self.scheduled_predicate_update.cancel()
        if self.scheduled_exit_update is not None:
            self.scheduled_exit_update.cancel()
        self.exit_delay = self._incr_exit_delay(self.exit_delay)
        logging.info('scheduling exit update in %s seconds' % self.exit_delay)
        self.scheduled_exit_update = Timer(self.exit_delay, self._update_consumer)
        self.scheduled_exit_update.start()
    
    def on_error(self, status_code):
        """An error code in the 400s means we're doing something
        wrong that isn't going to get better if we keep pinging
        the thing:
        
        #. 401 Unauthorized
        #. 404 Resource does not exist
        #. 406 Not Acceptable (means our filter predicates are wrong)
        #. 413 Too Long
        #. 416 Range Unacceptable
        
        Whereas responses in the 500s are likely to be temporary
        and are worth retrying:
        
        #. 500 Server Internal Error
        #. 503 Service Overloaded
        """
        # we're doing something wrong
        if 400 < status_code < 500:
            logging.warning('consumer error: %s' % status_code)
            return False
        # Twitter's doing something wrong
        return True
    
    def on_connect(self, consumer_id):
        """Close any other active consumers immediately_.
        
        .. _immediately: http://bit.ly/idOra
        """
        logging.info('on_connect %s' % consumer_id)
        logging.info(self.consumers)
        # a successful connection clears the exit delay
        self.exit_delay = 0
        # the consumer is dead, long live the consumer
        self.active_consumer_id = consumer_id
        for id_, consumer in self.consumers.iteritems():
            if consumer.running and not id_ == consumer_id:
                consumer.disconnect()
    
    def on_exit(self, consumer_id):
        """If the exit wasn't scheduled, start again.
        """
        logging.info('on_exit %s' % consumer_id)
        logging.info(self.consumers)
        # remove it from the dict of consumers we're maintaining
        del self.consumers[consumer_id]
        # if it exited unexpectedly
        if consumer_id == self.active_consumer_id:
            logging.info('active consumer exited unexpectedly')
            self._schedule_exit_update()
        elif not self.is_stopping and not self.consumers:
            logging.info('only consumer exited without a stop request')
            self._schedule_exit_update()
    
    def on_data(self, data):
        """Override to do something_ with the data.
        
        .. _something: http://bit.ly/2NDL32
        """
        raise NotImplementedError
    
    def get_predicates(self):
        """Override to return either a ``list`` or ``None`` for
        ``follow_ids`` and ``track_terms``, e.g.::
        
            follow_ids = None # [96574766, ...]
            track_terms = None # ['bananas', ...]
            return follow_ids, track_terms
        """
        raise NotImplementedError
    
    def update(self):
        """If there isn't an exit update scheduled, schedule
        a predicate update at the earliest opportunity.
        """
        logging.info('manager.update()')
        if not self.accept_updates:
            return
        # exit updates take precedence
        if self.scheduled_exit_update is not None:
            logging.info('exit update took precedence')
            return
        # as do any updates scheduled with zero delay,
        # until they've been completed
        if self.predup_schd_in_less_than_two_secs:
            logging.info('predicate update already scheduled in < 2s')
            return
        # cancel any previous updates
        if self.scheduled_predicate_update is not None:
            logging.info('canceling previous update')
            self.scheduled_predicate_update.cancel()
        # work out how long to wait
        delay = self._get_update_delay()
        logging.info('scheduling predicate update in %s seconds' % delay)
        # safeguard against multiple requests at the crucial moment
        if delay < 2:
            logging.info('delay is less than 2s')
            self.predup_schd_in_less_than_two_secs = True
        # update the number of updates pending
        self.updates_pending += 1
        logging.info('updates_pending is now: %s' % self.updates_pending)
        # schedule the update
        self.scheduled_predicate_update = Timer(delay, self._update_consumer)
        self.scheduled_predicate_update.start()
    
    def stop(self, accept_updates=True):
        """Close the connection, which will trigger the end of the
        line for the consumer.
        """
        self.is_stopping = True
        if not accept_updates:
            self.accept_updates = False
        # cancel any pending updates
        if self.scheduled_exit_update is not None:
            self.scheduled_exit_update.cancel()
        if self.scheduled_predicate_update is not None:
            self.scheduled_predicate_update.cancel()
        # make sure no consumer thinks it's active
        self.active_consumer_id = None
        for consumer in self.consumers.itervalues():
            consumer.disconnect()


class Manager(BaseManager):
    """Generate the filter predicates and handle the data.
    """
    
    def get_predicates(self):
        return get_predicates()
    
    def on_data(self, data):
        """Queue each status to be processed asynchronously.
        """
        add_task('/hooks/handle_status', params={'s': data})


manager = Manager()


### Web Hooks

class UpdateHandler(web.RequestHandler):
    """Tell the manager to (re)start with updated filter predicates.
    """
    
    def post(self):
        logging.info('/update')
        manager.update()


class StopHandler(web.RequestHandler):
    """Tell the manager to stop consuming.
    """
    
    def post(self):
        logging.info('/stop')
        manager.stop()


# url mapping
mapping = [(
        r'/update\/?',
        UpdateHandler,
    ), (
        r'/stop\/?',
        StopHandler
    )
]

def main():
    # hack around a tornado bug
    tornado_options.enable_pretty_logging = do_nothing
    tornado_options.parse_command_line()
    # create the tornado web server
    application = web.Application(mapping, debug=options.debug)
    http_server = httpserver.HTTPServer(application)
    http_server.bind(options.port)
    # start the consumer running
    manager.update()
    try:
        # start the http server running in one process
        http_server.start(1)
        # start the async ioloop
        ioloop.IOLoop.instance().start()
    except KeyboardInterrupt, err:
        manager.stop()
        raise err


if __name__ == "__main__":
    main()