Skip to content

Instantly share code, notes, and snippets.

@justinvw
Created June 12, 2013 11:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinvw/5764625 to your computer and use it in GitHub Desktop.
Save justinvw/5764625 to your computer and use it in GitHub Desktop.
"""
Dead simple classes to consume the Twitter Streaming API
Last updated: 2013-06-12
:copyright: (c) 2013 by Justin van Wees
"""
import requests
import json
from requests_oauthlib import OAuth1
class BaseStream(object):
def __init__(self, consumer_key, consumer_secret, access_token,
access_token_secret, url, post_data, timeout=90,
parse_json=True):
self.url = url
self.auth = OAuth1(consumer_key, consumer_secret, access_token,
access_token_secret)
self.post_data = post_data
self.parse_json = parse_json
self.timeout = timeout
def connect(self):
self.connection = requests.post(self.url, data=self.post_data,
auth=self.auth, stream=True,
timeout=self.timeout)
def stream(self):
self.connect()
for line in self.connection.iter_lines():
if not line:
continue
if self.parse_json:
tweet = json.loads(line)
else:
tweet = line
yield tweet
class FilterStream(BaseStream):
def __init__(self, consumer_key, consumer_secret, access_token,
access_token_secret, track=None, follow=None, timeout=90,
parse_json=True):
url = 'https://stream.twitter.com/1.1/statuses/filter.json'
self.track = track
self.follow = follow
post_data = self.format_params()
BaseStream.__init__(self, consumer_key, consumer_secret, access_token,
access_token_secret, url, post_data,
timeout=timeout, parse_json=parse_json)
def format_params(self):
params = {}
if self.track:
params['track'] = ','.join(self.track)
if self.follow:
params['follow'] = ','.join(self.follow)
return params
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment