#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""`Twitter Streaming API`_ consumer based on tweepy_.

The consumer runs asynchronously in a thread. Exposes a Tornado_
web server with two request handlers:

* ``/update`` either starts or restarts the consumer
* ``/stop`` stops the consumer

When the consumer is running, it dumps the statuses in a queue
using ``torque.client.add_task``.

The implementation handles all of the requirements for backing off,
reconnecting etc. as specified in the `Twitter Streaming API`_ docs.

.. _`Twitter Streaming API`: http://apiwiki.twitter.com/Streaming-API-Documentation
.. _tweepy: http://github.com/joshthecoder/tweepy
.. _Tornado: http://www.tornadoweb.org/
"""
import logging
import math
import httplib
import sys
import time
import urllib
from socket import timeout, error as socket_error
from threading import Thread, Timer
from tornado import httpserver, ioloop, web
from tornado import options as tornado_options
from tornado.options import define, options
define("debug", default=False, help="debug mode")
define('port', default=8890, help='port to run on')
from tweepy.auth import BasicAuthHandler
from tweepy.streaming import Stream, StreamListener
define("username", default='...', help="Twitter username")
define('password', default='...', help='Twitter password')
making_an_update = 'restarting consumer due to filter predicate changes'
handling_an_exit = 'restarting a consumer in response to an unexpected error'
define('min_update_delay', default=120, help='min secs before %s' % making_an_update)
define('max_update_delay', default=600, help='max secs before %s' % making_an_update)
define('min_exit_delay', default=0.25, help='min wait before %s' % handling_an_exit)
define('max_exit_delay', default=16, help='max wait before %s' % handling_an_exit)
from torque.client import add_task
from model import Predicate
from predicates import get_predicates
from utils import do_nothing, json_encode, json_decode, generate_hash
### Streaming API Consumer
TWITTER_SENDS_KEEP_ALIVES_EVERY = 30 # seconds
class Consumer(Stream):
    """Extends the tweepy default with:
    
    #. linear and exponential backoff that strictly follows the docs_
    #. a unique ID that gets passed to two additional listener hooks:
       ``on_connect`` and ``on_exit``
    
    .. _docs: http://apiwiki.twitter.com/Streaming-API-Documentation#Connecting
    """
    def __init__(
        self,
        listener,
        username=None,
        password=None,
        timeout=TWITTER_SENDS_KEEP_ALIVES_EVERY + 1,
        retry_count=None,
        min_tcp_ip_delay=0.25,
        max_tcp_ip_delay=16,
        min_http_delay=10,
        max_http_delay=240,
        buffer_size=1500
    ):
        self.listener = listener
        username = username or options.username
        password = password or options.password
        self.auth = BasicAuthHandler(username, password)
        self.timeout = timeout
        self.retry_count = retry_count
        self.min_tcp_ip_delay = min_tcp_ip_delay
        self.max_tcp_ip_delay = max_tcp_ip_delay
        self.min_http_delay = min_http_delay
        self.max_http_delay = max_http_delay
        self.buffer_size = buffer_size
        self.headers = {}
        self.body = None
        self.conn = None
        self.running = False
        self.id = generate_hash()
    def _incr_tcp_ip_delay(self, delay):
        """When a network error (TCP/IP level) is encountered,
        back off linearly.
        """
        min_ = self.min_tcp_ip_delay
        max_ = self.max_tcp_ip_delay
        delay += min_
        if delay > max_:
            delay = max_
        if delay == max_:
            logging.warning('Consumer reached max tcp ip delay')
        return delay
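    # With the default bounds above, repeated network errors sleep
    # 0.25, 0.5, 0.75, ... seconds, capping out at 16: a linear ramp,
    # in contrast to the exponential HTTP backoff below.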
    def _incr_http_delay(self, delay):
        """When an http error (> 200) is returned, back off
        exponentially.
        
        >>> d = _incr_http_delay(0)
        >>> d
        10
        >>> d = _incr_http_delay(d)
        >>> d
        30
        >>> d = _incr_http_delay(d)
        >>> d
        70
        >>> d = _incr_http_delay(d)
        >>> d
        150
        >>> d = _incr_http_delay(d)
        >>> d
        240
        >>> d = _incr_http_delay(d)
        >>> d
        240
        
        """
        min_ = self.min_http_delay
        max_ = self.max_http_delay
        delay = min_ + min_ * delay / 5
        if delay > max_:
            delay = max_
        if delay == max_:
            logging.warning('Consumer reached max http delay')
        return delay
    def _run(self):
        # setup
        self.auth.apply_auth(None, None, self.headers, None)
        # enter loop
        error_counter = 0
        tcp_ip_delay = 0
        http_delay = 0
        while self.running:
            if self.retry_count and error_counter > self.retry_count:
                # quit if error count greater than retry count
                break
            try:
                self.conn = httplib.HTTPConnection(self.host)
                self.conn.connect()
                self.conn.sock.settimeout(self.timeout)
                self.conn.request('POST', self.url, self.body, headers=self.headers)
                resp = self.conn.getresponse()
                if resp.status != 200:
                    if self.listener.on_error(resp.status) is False:
                        break
                    self.conn.close()
                    error_counter += 1
                    http_delay = self._incr_http_delay(http_delay)
                    time.sleep(http_delay)
                else:
                    if self.listener.on_connect(self.id) is False:
                        break
                    error_counter = 0
                    tcp_ip_delay = 0
                    http_delay = 0
                    self._read_loop(resp)
            except (timeout, socket_error):
                if self.listener.on_timeout() is False:
                    break
                if self.running is False:
                    break
                self.conn.close()
                error_counter += 1
                tcp_ip_delay = self._incr_tcp_ip_delay(tcp_ip_delay)
                time.sleep(tcp_ip_delay)
            except Exception, err:
                logging.warning('Fatal exception in Consumer')
                logging.warning(err, exc_info=True)
                break
        # cleanup
        self.running = False
        if self.conn:
            self.conn.close()
        # flag up
        self.listener.on_exit(self.id)
    def _read_loop(self, resp):
        data = ''
        while self.running:
            if resp.isclosed():
                break
            # read the length of the next delimited status
            length = ''
            while True:
                c = resp.read(1)
                if not c:
                    # remote end closed the connection mid-read
                    return
                if c == '\n':
                    break
                length += c
            length = length.strip()
            if length.isdigit():
                length = int(length)
            else:
                # blank keep-alive line: go round again
                continue
            # read data and pass into listener
            data = resp.read(length)
            if self.listener.on_data(data) is False:
                self.running = False
            time.sleep(0.001)
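
# ``Consumer._read_loop`` above parses the stream's length-delimited
# format: each status is preceded by its length in bytes on its own line,
# with blank keep-alive lines in between. An illustrative (made-up)
# fragment of the wire format:
#
#     1234
#     {"text": "...", "user": {...}, ...}
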
class BaseManager(StreamListener):
    """Manages running consumer instances, ensuring that we don't update
    the connection too often and that, when we do, we use a
    `low latency restart`_.
    
    Consumers run in their own thread, providing notifications via
    ``on_connect``, ``on_data``, ``on_exit``, etc.
    
    .. _`low latency restart`: http://bit.ly/idOra
    """
    
    # we keep a dictionary of consumers, using the consumer.id
    # as the dictionary key
    consumers = {}
    active_consumer_id = None
    # ``threading.Timer`` objects
    scheduled_exit_update = None
    scheduled_predicate_update = None
    # we keep a record of when we last re-started and of the
    # number of updates pending, so that we can calculate
    # when to restart the connection in response to a predicate
    # change (``None`` means we've never restarted, so the first
    # update goes through with no delay)
    last_update_timestamp = None
    updates_pending = 0
    # we back off from repeated unexpected exits
    exit_delay = 0
    # when a predicate update is scheduled in less than two seconds,
    # we ignore other requests, to avoid starting two consumers
    # at the same time
    predup_schd_in_less_than_two_secs = False
    # ctl flags
    is_stopping = False
    accept_updates = True
    def __init__(
        self,
        min_update_delay=None,
        max_update_delay=None,
        min_exit_delay=None,
        max_exit_delay=None
    ):
        """How frequently do we want to update the consumer in response
        to a change in filter predicates and how much should we back
        off in response to unexpected errors?
        """
        # no matter how many changes there have been,
        # don't restart more than every...
        self.min_update_delay = options.min_update_delay \
            if min_update_delay is None else min_update_delay
        # if there's been at least one change, update every...
        self.max_update_delay = options.max_update_delay \
            if max_update_delay is None else max_update_delay
        # what are the minimum and maximum delays when consumers are erroring?
        self.min_exit_delay = options.min_exit_delay \
            if min_exit_delay is None else min_exit_delay
        self.max_exit_delay = options.max_exit_delay \
            if max_exit_delay is None else max_exit_delay
    def _incr_exit_delay(self, delay):
        """When a consumer exits unexpectedly, back off linearly.
        """
        min_ = self.min_exit_delay
        max_ = self.max_exit_delay
        delay += min_
        if delay > max_:
            delay = max_
        if delay == max_:
            logging.warning('Manager reached max unexpected exit delay')
        return delay
    def _calculate_delay(self, n):
        """Lower the delay more for the first few changes than for the last.
        
        >>> m = BaseManager(min_update_delay=120, max_update_delay=600)
        >>> [m._calculate_delay(i) for i in range(0, 100, 10)]
        [600, 440, 370, 320, 280, 240, 210, 180, 150, 120]
        
        Or to put it pictorially::
        
              |.
              |  .
              |     .
            d |        .
              |            .
              |                 .
             _|_______________________________________________
              |                      n
        """
        # max delay in minutes
        maxm = self.max_update_delay / 60
        mins = self.min_update_delay
        d = int(60 - 60 * math.sqrt(n) / 12) * maxm
        if d < mins:
            d = mins
        return d
    def _get_update_delay(self):
        """The more changes pending, the lower the delay.
        """
        # no delay the first time
        if self.last_update_timestamp is None:
            return 0
        # otherwise calculate the delay
        n = self.updates_pending
        delay = self._calculate_delay(n)
        # less the elapsed time
        elapsed = time.time() - self.last_update_timestamp
        delay = delay - elapsed
        if delay < 0:
            delay = 0
        # return the number of seconds to delay an update for
        return delay
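    # Worked example with the default 120s/600s bounds: the first update
    # after startup returns 0 (no delay); later, with 10 updates pending
    # and 100 seconds elapsed since the last restart, _calculate_delay(10)
    # returns 440, so the update is delayed by a further 340 seconds.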
    def _update_consumer(self):
        """Actually start a new consumer.
        """
        logging.info('starting a new consumer')
        logging.info(time.time())
        # store the timestamp
        self.last_update_timestamp = time.time()
        # flag that there are no updates pending
        self.scheduled_exit_update = None
        self.scheduled_predicate_update = None
        self.updates_pending = 0
        self.predup_schd_in_less_than_two_secs = False
        # get the latest filter predicates
        follow, track = self.get_predicates()
        # create the new consumer
        consumer = Consumer(listener=self)
        # put it in self.consumers
        self.consumers[consumer.id] = consumer
        logging.info(self.consumers)
        # block with the consumer's run loop
        consumer.filter(follow=follow, track=track)
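    # Note that ``consumer.filter(...)`` blocks until the consumer stops;
    # that's fine here because ``_update_consumer`` is only ever invoked
    # via a ``threading.Timer``, so it blocks a timer thread, not the
    # main thread.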
    def _schedule_exit_update(self):
        """In response to an exit, we cancel any updates pending
        and schedule a new update.
        """
        if self.scheduled_predicate_update is not None:
            self.scheduled_predicate_update.cancel()
        if self.scheduled_exit_update is not None:
            self.scheduled_exit_update.cancel()
        self.exit_delay = self._incr_exit_delay(self.exit_delay)
        logging.info('scheduling exit update in %s seconds' % self.exit_delay)
        self.scheduled_exit_update = Timer(self.exit_delay, self._update_consumer)
        self.scheduled_exit_update.start()
    def on_error(self, status_code):
        """An error code in the 400s means we're doing something
        wrong that isn't going to get better if we keep pinging
        the thing:
        
        #. 401 Unauthorized
        #. 404 Resource does not exist
        #. 406 Not Acceptable (means our filter predicates are wrong)
        #. 413 Too Long
        #. 416 Range Unacceptable
        
        Whereas responses in the 500s are likely to be temporary
        and are worth retrying:
        
        #. 500 Server Internal Error
        #. 503 Service Overloaded
        """
        # we're doing something wrong
        if 400 <= status_code < 500:
            logging.warning('consumer error: %s' % status_code)
            return False
        # Twitter's doing something wrong
        return True
    def on_connect(self, consumer_id):
        """Close any other active consumers immediately_.
        
        .. _immediately: http://bit.ly/idOra
        """
        logging.info('on_connect %s' % consumer_id)
        logging.info(self.consumers)
        # a successful connection clears the exit delay
        self.exit_delay = 0
        # the consumer is dead, long live the consumer
        self.active_consumer_id = consumer_id
        # iterate over a copy: consumers may be removed from the dict
        # by ``on_exit`` in another thread while we're disconnecting
        for id_, consumer in self.consumers.items():
            if consumer.running and not id_ == consumer_id:
                consumer.disconnect()
    def on_exit(self, consumer_id):
        """If the exit wasn't scheduled, start again.
        """
        logging.info('on_exit %s' % consumer_id)
        logging.info(self.consumers)
        # remove it from the dict of consumers we're maintaining
        del self.consumers[consumer_id]
        # if it exited unexpectedly
        if consumer_id == self.active_consumer_id:
            logging.info('active consumer exited unexpectedly')
            self._schedule_exit_update()
        elif not self.is_stopping and not self.consumers:
            logging.info('only consumer exited without a stop request')
            self._schedule_exit_update()
    def on_data(self, data):
        """Override to do something_ with the data.
        
        .. _something: http://bit.ly/2NDL32
        """
        raise NotImplementedError
    
    def get_predicates(self):
        """Override to return either a ``list`` or ``None`` for
        ``follow_ids`` and ``track_terms``, e.g.::
        
            follow_ids = None # [96574766, ...]
            track_terms = None # ['bananas', ...]
            return follow_ids, track_terms
        
        """
        raise NotImplementedError
    def update(self):
        """If there isn't an exit update scheduled, schedule
        a predicate update at the earliest opportunity.
        """
        logging.info('manager.update()')
        if not self.accept_updates:
            return
        # exit updates take precedence
        if self.scheduled_exit_update is not None:
            logging.info('exit update took precedence')
            return
        # as do any updates scheduled with zero delay,
        # until they've been completed
        if self.predup_schd_in_less_than_two_secs:
            logging.info('predicate update already scheduled in < 2s')
            return
        # cancel any previous updates
        if self.scheduled_predicate_update is not None:
            logging.info('canceling previous update')
            self.scheduled_predicate_update.cancel()
        # work out how long to wait
        delay = self._get_update_delay()
        logging.info('scheduling predicate update in %s seconds' % delay)
        # safeguard against multiple requests at the crucial moment
        if delay < 2:
            logging.info('delay is less than 2s')
            self.predup_schd_in_less_than_two_secs = True
        # update the number of pending updates
        self.updates_pending += 1
        logging.info('updates_pending is now: %s' % self.updates_pending)
        # schedule the update
        self.scheduled_predicate_update = Timer(delay, self._update_consumer)
        self.scheduled_predicate_update.start()
    def stop(self, accept_updates=True):
        """Close the connection, which will trigger the end of the
        line for the consumer.
        """
        self.is_stopping = True
        if not accept_updates:
            self.accept_updates = False
        # cancel any pending updates
        if self.scheduled_exit_update is not None:
            self.scheduled_exit_update.cancel()
        if self.scheduled_predicate_update is not None:
            self.scheduled_predicate_update.cancel()
        # make sure no consumer thinks it's active
        self.active_consumer_id = None
        # again, iterate over a copy in case ``on_exit`` mutates the dict
        for consumer in self.consumers.values():
            consumer.disconnect()

class Manager(BaseManager):
    """Generate the filter predicates and handle the data.
    """
    
    def get_predicates(self):
        return get_predicates()
    
    def on_data(self, data):
        """Queue each status to be processed asynchronously.
        """
        add_task('/hooks/handle_status', params={'s': data})

manager = Manager()

### Web Hooks
class UpdateHandler(web.RequestHandler):
    """Tell the manager to (re)start with updated filter predicates.
    """
    
    def post(self):
        logging.info('/update')
        manager.update()

class StopHandler(web.RequestHandler):
    """Tell the manager to stop consuming.
    """
    
    def post(self):
        logging.info('/stop')
        manager.stop()
# url mapping
mapping = [
    (r'/update/?', UpdateHandler),
    (r'/stop/?', StopHandler),
]
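
# Example requests (hypothetical host, default port) once the server is
# running::
#
#     curl -X POST http://localhost:8890/update
#     curl -X POST http://localhost:8890/stop
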
def main():
    # hack around a tornado bug
    tornado_options.enable_pretty_logging = do_nothing
    tornado_options.parse_command_line()
    # create the tornado web server
    application = web.Application(mapping, debug=options.debug)
    http_server = httpserver.HTTPServer(application)
    http_server.bind(options.port)
    # start the consumer running
    manager.update()
    try:
        # start the http server running in one process
        http_server.start(1)
        # start the async ioloop
        ioloop.IOLoop.instance().start()
    except KeyboardInterrupt, err:
        manager.stop()
        raise err

if __name__ == "__main__":
    main()