Skip to content

Instantly share code, notes, and snippets.

@hiromu
Created October 7, 2012 12:07
Show Gist options
  • Save hiromu/3848178 to your computer and use it in GitHub Desktop.
Save hiromu/3848178 to your computer and use it in GitHub Desktop.
Test for using Twitter Streaming API
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import tweepy
import json
import pymongo
db_host = 'localhost'
db_port = 27017
consumer_token = ''
consumer_secret = ''
access_token = ''
access_secret = ''
class CustomStreamListener(tweepy.StreamListener):
def __init__(self):
connection = pymongo.Connection(db_host, db_port)
self.db = connection.death
def on_data(self, data):
print data
if data.startswith("{"):
p = json.loads(data)
if u'text' in p:
for i in xrange(6):
p[[u'day', u'month', u'dait', u'time', u'zone', u'year'][i]] = p[u'created_at'].split()[i]
if self.db.tweets.find({u'id': p[u'id']}).count() == 0:
self.db.tweets.insert(p)
def on_error(self, status_code):
print >> sys.stderr, 'Encounted Exception with status code:', status_code
return True
def on_timeout(self):
print >> sys.stderr, 'Timeout...'
return True
if __name__ == '__main__':
auth = tweepy.OAuthHandler(consumer_token, consumer_secret)
auth.set_access_token(access_token, access_secret)
streaming_api = tweepy.Stream(auth, CustomStreamListener(), timeout=None)
streaming_api.filter(follow = None, track = ['突然の', '人人人'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment