Skip to content

Instantly share code, notes, and snippets.

@steveatinfincia
Created May 20, 2012 23:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steveatinfincia/2759887 to your computer and use it in GitHub Desktop.
Save steveatinfincia/2759887 to your computer and use it in GitHub Desktop.
Dearlytweeted's streaming filter, updated to use tweepy and an actual reason to have a class :)
import urllib
import json
import re
import sys
import datetime
from time import sleep
import sys
import threading
import logging
import tweepy
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

# Make the deployed dearlytweeted package importable; only append the
# release directory if it is not already on the import path.
newpath = "/opt/dearlytweeted/releases"
if newpath not in sys.path:
    sys.path.append(newpath)

from dearlytweeted.database import Database

# Database handle; the stream listener passes matching tweets to its save_tweet().
dearlytweeted_db = Database()

# Time the most recent message arrived from the Twitter stream. The stream
# listener writes it and the timeout watchdog reads it; it starts at load time.
last_message_timestamp = datetime.datetime.now()
# everything related to dealing with twitter is inside this class
class DTStream(tweepy.StreamListener):
    """Tweepy stream listener that saves "Dear <subject>," tweets.

    Callbacks that receive a message from the stream refresh the module-level
    ``last_message_timestamp`` so the timeout watchdog can tell the connection
    is still delivering data.
    """

    # A tweet must begin with "Dear", whitespace, a subject made of letters,
    # digits, '-' or '_', then a comma; group 1 captures the subject.
    # Raw string: '\A' and '\s' are invalid escapes in a plain string literal.
    DEAR_PATTERN = re.compile(r'\ADear\s+([A-Za-z0-9\-_]+),', re.IGNORECASE)

    @staticmethod
    def _mark_alive():
        """Record the current time as the moment the last stream message arrived."""
        global last_message_timestamp
        # A single rebinding of a module global is atomic under CPython's GIL,
        # and the watchdog only needs the latest value, so no lock is needed.
        # (A threading.Lock() created inside this call would never be shared
        # with the watchdog and could not synchronize anything anyway.)
        last_message_timestamp = datetime.datetime.now()

    def on_status(self, status):
        """Save *status* if its text starts with "Dear <subject>," and has no link.

        Any exception raised while handling the tweet is logged (with its
        traceback) and swallowed, so one bad tweet cannot stop the stream.
        Always returns None.
        """
        self._mark_alive()
        try:
            text = status.text
            # tweets containing links are overwhelmingly spam or useless; skip them
            if "http://" in text or "https://" in text:
                return
            match = self.DEAR_PATTERN.match(text)
            if match is None:
                # not in the "Dear <subject>," format; nothing to do
                return
            # pass the tweet back to the database along with the subject
            dearlytweeted_db.save_tweet(status, match.group(1))
        except Exception as e:
            # keep the stream alive, but record the full traceback for diagnosis
            logging.exception("Exception during stream parsing: %s", e)

    def on_delete(self, status_id, user_id):
        """Called when a delete notice arrives for a status; counts as stream activity."""
        self._mark_alive()
        return

    def on_limit(self, track):
        """Log a limit notification from Twitter; counts as stream activity."""
        self._mark_alive()
        logging.debug("Limit notification returned by twitter: %s", track)
        return

    def on_error(self, status_code):
        """Log an error status code from Twitter and return False.

        NOTE(review): per tweepy's StreamListener convention, returning False
        disconnects the stream -- confirm against the installed tweepy version.
        """
        logging.debug("Error returned by twitter: %s", status_code)
        return False

    def on_timeout(self):
        """Log the time and raise SystemExit when tweepy reports a stream timeout."""
        logging.debug("Stream exiting at %s", datetime.datetime.now())
        sys.exit()
# Background watchdog loop: it does nothing but compare the current time with
# last_message_timestamp. If the stream has been silent for longer than
# max_silence seconds, the connection has probably dropped for one of the many
# legitimate reasons that are not reported as errors. The recovery is to exit
# the process and let supervisord restart it.
def timeout_check(max_silence=90, poll_interval=5):
    """Terminate the whole process once the stream has been silent too long.

    :param max_silence: seconds without a stream message before exiting
        (default 90).
    :param poll_interval: seconds to sleep between checks (default 5).

    Never returns normally; it either loops forever or ends the process.
    """
    import os

    logging.debug("Timeout thread running")
    limit = datetime.timedelta(seconds=max_silence)
    while True:
        # Reading a module global is atomic under CPython's GIL; no lock needed.
        elapsed = datetime.datetime.now() - last_message_timestamp
        if elapsed > limit:
            logging.warning(
                "Longer than %s seconds since last twitter message, exiting",
                max_silence,
            )
            # sys.exit() here would only raise SystemExit in this worker thread
            # and silently end it, leaving the stalled stream running forever.
            # os._exit() ends the whole process immediately. The status is
            # non-zero so that a supervisor configured to restart only on
            # unexpected exits still restarts it (verify against the
            # supervisord config).
            os._exit(1)
        sleep(poll_interval)
# == OAuth Authentication ==
#
# This mode of authentication is the new preferred way
# of authenticating with Twitter.
# The consumer keys can be found on your application's Details
# page located at https://dev.twitter.com/apps (under "OAuth settings")
consumer_key = "REPLACE"
consumer_secret = "REPLACE"

# The access tokens can be found on your application's Details
# page located at https://dev.twitter.com/apps (located
# under "Your access token")
access_token = "REPLACE"
access_token_secret = "REPLACE"

tauth = tweepy.OAuthHandler(consumer_key, consumer_secret)
tauth.set_access_token(access_token, access_token_secret)
api = tweepy.API(tauth)

# log the beginning of the stream
logging.debug("Starting request stream with %s at %s" % (api.me().name, str(datetime.datetime.now())))

stream = DTStream()
streamer = tweepy.Stream(auth=tauth, listener=stream, timeout=None)
filter_terms = ['dear']

# Restart the silence clock so the time spent authenticating above is not
# counted against the watchdog's limit.
last_message_timestamp = datetime.datetime.now()

# Start the watchdog BEFORE calling filter(). In tweepy's default (non-async)
# mode, filter() blocks this thread while the stream is connected, so a thread
# started after it would only begin once streaming had already ended. The
# thread is a daemon so it cannot keep the process alive by itself after the
# main thread finishes.
t = threading.Thread(target=timeout_check, name="watchdog")
t.daemon = True
t.start()

# track the filter terms; returns when the stream disconnects
streamer.filter(track=filter_terms)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment