Skip to content

Instantly share code, notes, and snippets.

@steveatinfincia
Created May 21, 2012 14:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steveatinfincia/2762699 to your computer and use it in GitHub Desktop.
Save steveatinfincia/2762699 to your computer and use it in GitHub Desktop.
Dearlytweeted's streaming filter, now with a config file for oauth credentials
import urllib
import json
import re
import sys
import os
import datetime
from time import sleep
import sys
import threading
import logging
import tweepy
import ConfigParser
# Emit everything from DEBUG upwards, tagging each line with its level and
# the name of the thread that produced it.
logging.basicConfig(
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
    level=logging.DEBUG,
)

# Make the deployed release directory importable, adding it at most once.
newpath = "/opt/dearlytweeted/releases"
if newpath not in sys.path:
    sys.path.append(newpath)
# The OAuth credentials differ per deployment, so they are loaded and verified
# before anything else happens; a missing or incomplete config must stop the
# process early instead of letting it retry authentication against twitter.
#
# Format for the config file is as follows:
#
#   [OAuth]
#   consumer_key = REPLACE
#   consumer_secret = REPLACE
#   access_token = REPLACE
#   access_token_secret = REPLACE
_oauth_config_path = '/opt/dearlytweeted/oauth.cfg'
config = ConfigParser.ConfigParser()
try:
    # read() silently skips files it cannot open and returns the list of files
    # it actually parsed, so an empty result means the config file is missing.
    if not config.read(_oauth_config_path):
        raise ConfigParser.Error("could not read %s" % _oauth_config_path)
    consumer_key = config.get('OAuth', 'consumer_key')
    consumer_secret = config.get('OAuth', 'consumer_secret')
    access_token = config.get('OAuth', 'access_token')
    access_token_secret = config.get('OAuth', 'access_token_secret')
except ConfigParser.Error as e:
    # Missing file, missing [OAuth] section, missing option or unparsable file.
    # Exit non-zero, as the uncaught exception did before, but with a readable
    # log line instead of a bare traceback.
    logging.debug("Config data not available (%s), exiting so we don't hammer twitter with auths", e)
    sys.exit(1)

if not all((consumer_key, consumer_secret, access_token, access_token_secret)):
    # At least one credential is present but empty.
    logging.debug("Config data not available, exiting so we don't hammer twitter with auths")
    sys.exit()

# NOTE: the credential values are deliberately never logged.
# NOTE: this import sits below the sys.path.append() call rather than with the
# other imports at the top -- presumably because the dearlytweeted package is
# only importable from the release directory added there (confirm against the
# deployment layout before moving it).
from dearlytweeted.database import Database
# database handle; DTStream.on_status calls save_tweet() on it for matching tweets
dearlytweeted_db = Database()
# global variable to store the last time a message was received by the twitter API;
# rebound by DTStream.on_status and read by timeout_check()
last_message_timestamp = datetime.datetime.now()
# everything related to dealing with twitter is inside this class
class DTStream(tweepy.StreamListener):
    """Stream listener that saves "Dear <subject>, ..." tweets to the database.

    Every status that arrives refreshes the module-level
    ``last_message_timestamp``, which the timeout thread reads to decide
    whether the connection has gone quiet.
    """

    # Matches tweets that open with "Dear <subject>," (case-insensitive);
    # group 1 is the subject. Compiled once instead of on every tweet.
    _SUBJECT_RE = re.compile(r'\ADear\s+([A-Za-z0-9\-_]+),', flags=re.IGNORECASE)

    def on_status(self, status):
        """Record that a message arrived and save the tweet if it matches.

        Tweets containing a link are dropped, as are tweets that do not start
        with "Dear <subject>,". Any exception is logged and swallowed so that
        one bad tweet does not end the stream.
        """
        global last_message_timestamp
        try:
            # Rebinding a module global is a single atomic operation under
            # CPython's GIL, so no lock is taken here. The lock that used to
            # be created on every call was private to that call and excluded
            # nothing.
            last_message_timestamp = datetime.datetime.now()

            text = status.text
            # check for links and simply return if we find one, avoids tons
            # of spam and useless tweets
            if "http://" in text or "https://" in text:
                return

            # find out if the tweet data actually matches the format, if not
            # just ignore it
            found = self._SUBJECT_RE.match(text)
            if found:
                # pass the tweet back to the database along with the subject
                dearlytweeted_db.save_tweet(status, found.group(1))
        except Exception as e:
            # log the exception (with traceback) so we can spot problems if
            # they come up in the future
            logging.debug("Exception during stream parsing: %s", e, exc_info=True)
        return

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status"""
        return

    def on_limit(self, track):
        """Log a limit notification sent by twitter."""
        logging.debug("Limit notification returned by twitter: %s", str(track))
        return

    def on_error(self, status_code):
        """Log the error status code and return False."""
        logging.debug("Error returned by twitter: %s", str(status_code))
        return False

    def on_timeout(self):
        """Log the time and exit by raising SystemExit."""
        logging.debug("Stream exiting at %s", str(datetime.datetime.now()))
        sys.exit()
# background thread loop that does nothing but check the last message timestamp, if
# its been more than `max_silence` seconds it means nothing has arrived from twitter recently
# which means the connection probably dropped for any of 10+ legitimate reasons that arent errors but
# must be handled by exiting and letting supervisord restart this process.
def timeout_check(max_silence=90, poll_interval=5):
    """Terminate the process when no message has arrived for too long.

    Args:
        max_silence: seconds allowed between updates of the module-level
            ``last_message_timestamp`` before the process is terminated.
        poll_interval: seconds to sleep between checks.

    This function never returns normally: it loops until the limit is
    exceeded and then terminates the whole process with exit status 1.
    """
    logging.debug("Timeout thread running")
    limit = datetime.timedelta(seconds=max_silence)
    while True:
        # Reading a module global is a single atomic operation under CPython's
        # GIL, so no lock is needed. The lock that used to be created on each
        # iteration was private to that iteration and excluded nothing.
        elapsed = datetime.datetime.now() - last_message_timestamp
        if elapsed > limit:
            logging.debug("Longer than %s seconds since last twitter message, exiting", max_silence)
            # sys.exit() only raises SystemExit in the calling thread, which
            # would end this watchdog thread and leave the stalled process
            # running. os._exit() terminates the whole process; flush the log
            # handlers first because os._exit() skips normal cleanup.
            logging.shutdown()
            os._exit(1)
        sleep(poll_interval)
# this point should never be hit until the variables used here are actually available, but beware :)
tauth = tweepy.OAuthHandler(consumer_key, consumer_secret)
tauth.set_access_token(access_token, access_token_secret)
api = tweepy.API(tauth)

# log the beginning of the stream
logging.debug("Starting request stream with %s at %s", api.me().name, str(datetime.datetime.now()))

stream = DTStream()
streamer = tweepy.Stream(auth=tauth, listener=stream, timeout=None)
filter_terms = ['dear']

# create the timeout checker thread that will run in the background and kill this process if twitter stops
# sending messages. It has to be started BEFORE streamer.filter(): filter() runs the stream in the calling
# thread and does not return while the connection is up, so a thread started after it would never get to
# watch the live stream.
# Give the watchdog a full window measured from now rather than from import time.
last_message_timestamp = datetime.datetime.now()
t = threading.Thread(target=timeout_check)
# daemon thread: once the main thread finishes (stream ended or errored) the process exits right away
# instead of being kept alive by the watchdog loop
t.daemon = True
t.start()

# start the stream; blocks until the stream ends
streamer.filter(None, filter_terms)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment