Skip to content

Instantly share code, notes, and snippets.

@adamoudad
Created January 29, 2019 07:06
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adamoudad/85a2f1d103df1fd8f731c975edb7b0ca to your computer and use it in GitHub Desktop.
Save adamoudad/85a2f1d103df1fd8f731c975edb7b0ca to your computer and use it in GitHub Desktop.
import sys
from tweepy import OAuthHandler
from tweepy import API
from tweepy import Stream
from tweepy.streaming import StreamListener
# Replace the "None"s by your own credentials
ACCESS_TOKEN = None
ACCESS_TOKEN_SECRET = None
CONSUMER_KEY = None
CONSUMER_SECRET = None
auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = API(auth, wait_on_rate_limit=True,
wait_on_rate_limit_notify=True)
class Listener(StreamListener):
def __init__(self, output_file=sys.stdout):
super(Listener,self).__init__()
self.output_file = output_file
def on_status(self, status):
print(status.text, file=self.output_file)
def on_error(self, status_code):
print(status_code)
return False
output = open('stream_output.txt', 'w')
listener = Listener(output_file=output)
stream = Stream(auth=api.auth, listener=listener)
try:
print('Start streaming.')
stream.sample(languages=['en'])
except KeyboardInterrupt:
print("Stopped.")
finally:
print('Done.')
stream.disconnect()
output.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment