Skip to content

Instantly share code, notes, and snippets.

@swinton
Created September 12, 2012 20:26
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save swinton/3709635 to your computer and use it in GitHub Desktop.
Save swinton/3709635 to your computer and use it in GitHub Desktop.
Example of connecting to the Twitter user stream using Requests.
# OAuth credentials -- placeholder values; substitute your own before running.

# Consumer key pair.
CONSUMER_KEY = "YOUR_CONSUMER_KEY"
CONSUMER_SECRET = "YOUR_CONSUMER_SECRET"

# Access token pair.
ACCESS_TOKEN = "YOUR_ACCESS_TOKEN"
ACCESS_TOKEN_SECRET = "YOUR_ACCESS_TOKEN_SECRET"
#!/usr/bin/env python
"""
Example of connecting to the Twitter user stream using Requests.
"""
import sys
import json
import requests
from oauth_hook import OAuthHook
def userstream(access_token, access_token_secret, consumer_key, consumer_secret):
    """
    Generate decoded JSON objects read from the Twitter user stream.

    An ``OAuthHook`` is built from the four credentials (with
    ``header_auth=True``) and registered as the ``pre_request`` hook of a
    Requests session. The session POSTs ``delimited=length`` to the user
    stream endpoint with ``prefetch=False`` and the response is then read
    line by line.

    Empty lines and lines consisting solely of digits are skipped; every other
    line is parsed with ``json.loads`` and yielded. (The digit-only lines are
    presumably the length prefixes requested via ``delimited=length`` --
    confirm against the streaming API documentation.)

    Being a generator, nothing is sent until the first item is requested.
    """
    signer = OAuthHook(
        access_token=access_token,
        access_token_secret=access_token_secret,
        consumer_key=consumer_key,
        consumer_secret=consumer_secret,
        header_auth=True,
    )

    # The session's ``verbose`` config option is pointed at stderr.
    session = requests.session(
        hooks={"pre_request": signer},
        config={"verbose": sys.stderr},
    )

    response = session.post(
        "https://userstream.twitter.com/2/user.json",
        data={"delimited": "length"},
        prefetch=False,
    )

    # TODO detect disconnection somehow
    # https://github.com/kennethreitz/requests/pull/200/files#L13R169
    # Use a timeout? http://pguides.net/python-tutorial/python-timeout-a-function/
    for line in response.iter_lines(chunk_size=1):
        # Ignore blank lines and purely numeric lines.
        if not line or line.isdigit():
            continue
        yield json.loads(line)
if __name__ == "__main__":
    # Script entry point: pretty-print every object yielded by userstream().
    import pprint
    import settings

    # Credentials are taken from attributes of the ``settings`` module.
    credentials = {
        "access_token": settings.ACCESS_TOKEN,
        "access_token_secret": settings.ACCESS_TOKEN_SECRET,
        "consumer_key": settings.CONSUMER_KEY,
        "consumer_secret": settings.CONSUMER_SECRET,
    }

    for message in userstream(**credentials):
        pprint.pprint(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment