Skip to content

Instantly share code, notes, and snippets.

@georgewhewell
Created March 3, 2013 21:50
Show Gist options
  • Save georgewhewell/5078471 to your computer and use it in GitHub Desktop.
Save georgewhewell/5078471 to your computer and use it in GitHub Desktop.
import functools
import logging
import time

import pusher
import tweepy
# Twitter OAuth credentials, passed to tweepy.OAuthHandler and
# set_access_token further down. They are empty placeholders here.
CONSUMER_KEY = ''
CONSUMER_SECRET = ''
ACCESS_TOKEN = ''
ACCESS_TOKEN_SECRET = ''
# Pusher credentials, assigned as attributes on the pusher module itself.
# NOTE(review): presumably pusher.Pusher() reads these module attributes as
# its defaults, so they must be set before the client below is created --
# confirm against the installed pusher version.
pusher.app_id = ''
pusher.key = ''
pusher.secret = ''
# Channel and event names used by PushTweet when triggering a push.
pusher_channel = 'bynd-includes-demo'
pusher_event = 'bieber'
# Shared Pusher client; PushTweet indexes it by channel name.
p = pusher.Pusher()
def RateLimited(maxPerSecond):
    """Build a decorator that drops calls arriving faster than a given rate.

    The wrapped function runs only when more than ``1 / maxPerSecond``
    seconds have passed since the last call that was let through. Calls
    arriving sooner are discarded (not queued or delayed) and return
    ``None``.

    Args:
        maxPerSecond: Maximum number of calls per second to let through.
            Must be non-zero; a zero rate raises ``ZeroDivisionError``.

    Returns:
        A decorator. Each function it decorates gets its own independent
        timer, and keeps its original name and docstring.
    """
    minInterval = 1.0 / float(maxPerSecond)

    def decorate(func):
        # One-element list so the inner function can update the timestamp
        # without needing ``nonlocal``.
        lastTimeCalled = [0.0]

        @functools.wraps(func)
        def rateLimitedFunction(*args, **kargs):
            # Read the clock once so the comparison and the stored
            # timestamp refer to the same instant.
            now = time.time()
            elapsed = now - lastTimeCalled[0]
            # A negative elapsed time means the wall clock was set back;
            # let the call through and restart the timer rather than
            # dropping every call until the clock catches up.
            if elapsed > minInterval or elapsed < 0:
                lastTimeCalled[0] = now
                return func(*args, **kargs)
            # Too soon after the previous accepted call: drop this one.
            return None

        return rateLimitedFunction

    return decorate
@RateLimited(1)  # let through at most one push per second
def PushTweet(text):
    """Trigger the configured Pusher event with ``text`` as its payload.

    The event ``pusher_event`` is triggered on channel ``pusher_channel``
    of the shared client ``p``. Because of the rate limit, calls made
    within a second of the last accepted one are dropped.
    """
    channel = p[pusher_channel]
    channel.trigger(pusher_event, text)
def main():
    """Relay tweets matching 'bieber' to Pusher, reconnecting forever.

    Authenticates with the module-level Twitter credentials, opens a
    filtered stream and forwards each status text to ``PushTweet``. When
    the stream raises, the error is logged and a new connection is
    attempted after one second. ``KeyboardInterrupt`` and ``SystemExit``
    are not caught, so the script can be stopped normally.
    """

    # Defined once, outside the loop, instead of on every reconnect.
    class PushBieber(tweepy.StreamListener):
        """Stream listener that forwards each status text to PushTweet."""

        def on_status(self, status):
            PushTweet(status.text)

    while True:
        try:
            auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
            auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
            streaming_api = tweepy.streaming.Stream(
                auth, PushBieber(), timeout=60
            )
            streaming_api.filter(track=['bieber'])
        except Exception:
            # Catch only ordinary errors (a bare ``except:`` would also
            # swallow Ctrl-C), and record them so persistent failures
            # such as bad credentials are visible.
            logging.exception("Twitter stream failed; reconnecting in 1 second")
            time.sleep(1)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment