Skip to content

Instantly share code, notes, and snippets.

@gersande
Created August 15, 2013 18:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gersande/6243305 to your computer and use it in GitHub Desktop.
Save gersande/6243305 to your computer and use it in GitHub Desktop.
Broken bot supposed to use python-twitter and redis but it ain't working because blarg Stack overflow post still unanswered, will eventually get around to figuring out why my os hates redis http://stackoverflow.com/questions/18224929/python-twitter-bot-code-is-not-working-redis-server-port?noredirect=1#comment26717815_18224929
#!/usr/bin/env python
import sys
import pprint
from time import sleep
import twitter
import redis
API_CREDENTIALS = {
'consumer_key': '',
'consumer_secret': '',
'access_token_key': '',
'access_token_secret': '',
}
TWITTERBOT_KEYWORDS = ('Gersande', 'Gersande La Fleche')
TWITTERBOT_LIST = "gersande_list_twitbot"
BLACKLISTED_USERS = ['gersandelf']
RESPONSE = """pas de reponses pour l'instant"""
def auth_to_twitter(api_credentials):
"""Returns an authenticated twitter session object"""
api = twitter.Api(consumer_key=API_CREDENTIALS['consumer_key'],
consumer_secret=API_CREDENTIALS['consumer_secret'],
access_token_key=API_CREDENTIALS['access_token_key'],
access_token_secret=API_CREDENTIALS['access_token_secret'])
if (api.VerifyCredentials() is not None):
return api
return
def get_since_id(redis, key):
""""""
# Fetching last value in keyword redis place
since_id = redis.get(key + ":last_since_id")
return since_id if since_id else None
def update_since_id(redis, key):
""""""
stored_since_id = redis.get(key + ":last_since_id")
try:
last_tweet_id = str(redis.lrange(key, 0, 0)[0])
except IndexError:
last_tweet_id = None
if last_tweet_id and (last_tweet_id != stored_since_id):
redis.set(key + ":last_since_id", last_tweet_id)
return True
def update_search_stack(api_session, tweet_stack, keyword):
"""Searches for a specific term on twitter public timeline"""
# Storing last fetched id in order to make fewer requests
since_id = get_since_id(redis, "%s:%s" % (TWITTERBOT_LIST, keyword))
search_tweet = api_session.GetSearch(term=keyword, since_id=since_id)
for t in search_tweet:
computed_tweet = {
"keyword": keyword,
"username": t.user.screen_name,
"created_at": t.created_at,
"text": t.text,
}
sys.stdout.write("adding tweet with id %s by user %s to database\n" % (str(t.id), str(t.user.screen_name)))
if (computed_tweet["username"] not in BLACKLISTED_USERS):
redis.rpush((LOLCOIFFEURS_LIST + ":%s" % (keyword)), t.id)
redis.hmset("%s:tweet:%s" % (TWITTERBOT_LIST, t.id), computed_tweet)
print "Last since id en vigueur pour ce mot cle : %s" % since_id
return
def tweet_and_shout(api_session, redis, key, timeout=600):
""""""
for tweet_id in redis.lrange("%s:%s" % (TWITTERBOT_LIST, key), 0, -1):
tweet_dict = redis.hgetall("%s:tweet:%s" % (TWITTERBOT_LIST, tweet_id))
# Tracking answered tweets in a brand new set, and posting
# a reply to it
print "replying tweet : %s" % (tweet_id)
redis.sadd((TWITTERBOT_LIST + ":%s:answered" % (key)), tweet_id)
# api_session.PostUpdate("@%s %s" % (tweet_dict["username"], RESPONSE), in_reply_to_status_id=tweet_id)
# Popping out element from the left of the list
# as we answer it
redis.rpop("%s:%s" % (TWITTERBOT_LIST, key))
# Wait timeout before replying again
sleep(timeout)
return
def run_bot(api_session, redis, keywords):
""""""
while 42:
for key in keywords:
update_search_stack(api_session, redis, key)
update_since_id(redis, "%s:%s" % (TWITTERBOT_LIST, key))
tweet_and_shout(api_session, redis, key, timeout=1)
print "Waiting for one minute\n\n\n"
sleep(60)
if __name__ == "__main__":
twitter_session = auth_to_twitter(API_CREDENTIALS)
redis = redis.StrictRedis(host='localhost', port=6379, db=0)
run_bot(twitter_session, redis, TWITTERBOT_KEYWORDS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment