Skip to content

Instantly share code, notes, and snippets.

@impshum
Created February 8, 2018 21:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save impshum/13de240b64f1d1d4d412c8af7a6d2d7c to your computer and use it in GitHub Desktop.
Save impshum/13de240b64f1d1d4d412c8af7a6d2d7c to your computer and use it in GitHub Desktop.
Tweepy Stream Hashtag
import time
import tweepy
import json
import sys
from halo import Halo
# --- Twitter API credentials -------------------------------------------------
# Placeholders: replace with real application/user tokens before running, and
# keep the real values out of version control.
consumer_key = 'XXXX'
consumer_secret = 'XXXX'
access_key = 'XXXX-XXXX'
access_secret = 'XXXX'

# --- Bot behaviour -----------------------------------------------------------
# Hashtag tracked on the stream and searched for in each incoming tweet's text.
target_tag = '#wood'
# Text placed after the "@user" mention in every reply the bot posts.
message = 'I like wood!'
# NOTE(review): not referenced anywhere in the visible code; presumably a
# leftover delay/throttle setting (seconds?) -- confirm before removing.
timer = 30
class Colour:
    """ANSI escape sequences used to colour terminal output.

    Prefix a string with one of these to change the colour of the text that
    follows; append ``White`` to restore the terminal default afterwards.
    """

    Green = '\033[92m'
    Red = '\033[91m'
    Purple = '\033[95m'
    # Despite its name this is the SGR "reset all attributes" code (ESC[0m),
    # not a white foreground colour.
    White = '\033[0m'
    Yellow = '\033[93m'
# Build the OAuth handler from the application (consumer) credentials, then
# attach the user access token so requests are made on behalf of that account.
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
# Shared REST client: PrintListener.on_data uses it to post replies. The same
# `auth` object is also handed to tweepy.Stream in the __main__ section.
api = tweepy.API(auth)
class PrintListener(tweepy.StreamListener):
    """Stream listener that replies to original tweets containing ``target_tag``.

    Each matching tweet is echoed to the terminal in green and answered with
    ``'@<screen_name> <message>'`` through the module-level ``api`` client.
    Retweets are ignored so the bot does not reply to the same content twice.
    """

    def on_data(self, data):
        """Handle one raw message from the stream.

        Args:
            data: The raw JSON text of a single stream message.

        Returns:
            Always ``True`` so the stream keeps running; problems with a
            single message are reported (or skipped) rather than raised.
        """
        try:
            payload = json.loads(data)
        except (ValueError, TypeError) as e:
            # A malformed frame should not take the whole stream down.
            print(Colour.Red + str(e) + Colour.White)
            return True

        # Messages that are not tweets (they carry no text/user/id) are
        # skipped quietly instead of surfacing as a cryptic KeyError.
        try:
            tweet = payload['text']
            user = payload['user']['screen_name']
            tweet_id = payload['id']
        except (KeyError, TypeError):
            return True

        # A retweet embeds the original under 'retweeted_status' and its text
        # starts with "RT @". Testing for a bare 'RT' substring would also
        # discard ordinary tweets that merely contain those letters.
        is_retweet = (
            payload.get('retweeted')
            or 'retweeted_status' in payload
            or tweet.startswith('RT @')
        )
        if is_retweet or target_tag not in tweet:
            return True

        # Reset the colour afterwards so later output is not tinted.
        print(Colour.Green + tweet + Colour.White)
        reply = '@{} {}'.format(user, message)
        try:
            api.update_status(status=reply, in_reply_to_status_id=tweet_id)
        except Exception as e:
            # Listener boundary: a failed reply (duplicate status, rate
            # limit, network error, ...) is reported and the stream goes on.
            print(Colour.Red + str(e) + Colour.White)
        return True

    def on_error(self, status):
        """Report a stream error status and terminate the program.

        Args:
            status: The error status passed in by the stream.

        Raises:
            SystemExit: Always, with exit status 1 so callers (shell scripts,
                supervisors) can tell the bot stopped because of an error.
        """
        print(Colour.Red + str(status) + Colour.White)
        sys.exit(1)
if __name__ == '__main__':
    # Entry point: announce the tracked hashtag, show a spinner while waiting
    # for tweets, and block on the filtered stream until it ends or the user
    # presses Ctrl-C.
    print(Colour.Purple + '\nLooking for ' + target_tag + '\n')
    spinner = Halo(text=' ', spinner='dots')
    try:
        spinner.start()
        listener = PrintListener()
        stream = tweepy.Stream(auth, listener)
        stream.filter(track=[target_tag])
    except KeyboardInterrupt:
        print('\nExiting\n')
    finally:
        # Runs on normal return, Ctrl-C and SystemExit alike: stop the spinner
        # and reset the terminal colour so the shell is left in a clean state.
        spinner.stop()
        sys.stdout.write(Colour.White)
        sys.stdout.flush()
    sys.exit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment