Skip to content

Instantly share code, notes, and snippets.

@steveatinfincia
Created May 16, 2012 17:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save steveatinfincia/2712239 to your computer and use it in GitHub Desktop.
Save steveatinfincia/2712239 to your computer and use it in GitHub Desktop.
Dearly Tweeted's stream filter
import pycurl
import urllib
import json
import re
import sys
import datetime
from time import sleep
import sys
import threading
import logging
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] (%(threadName)-10s) %(message)s',)
newpath = "/opt/dearlytweeted/releases"
if newpath not in sys.path:
sys.path.append(newpath)
from dearlytweeted.database import Database
STREAM_URL = "https://stream.twitter.com/1/statuses/filter.json"
USER = "USER_HERE"
PASS = "PASSWORD_HERE"
# global variable to store the last time a message was received by the twitter API
last_message_timestamp = datetime.datetime.now()
# background thread loop that does nothing but check the last message timestamp, if
# its been more than 30 seconds it means the twitter heartbeat signal (or a tweet) hasnt arrived recently
# which means the connection probably dropped for any of 10+ legitimate reasons that arent errors but
# must be handled by exiting and letting supervisord restart this process.
def timeout_check():
logging.debug("Timeout thread running")
while True:
current_time = datetime.datetime.now()
elapsed = current_time - last_message_timestamp
if elapsed > datetime.timedelta(seconds=30):
logging.debug("Longer than 30 seconds since last twitter message, exiting")
sys.exit()
else:
logging.debug("< 30 seconds since last twitter keepalive, continuing")
pass
sleep(5)
return
# everything related to dealing with twitter is inside this class
class Stream:
def __init__(self):
# setup our filter, all we need to do is POST the word "dear" to the streaming filter API
filter = [ ('track',"dear") ]
post_data = urllib.urlencode(filter)
# a new database handler (same class the web workers use)
self.database = Database()
# empty buffer to store potentially incomplete returned bytes
self.buffer = ""
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.USERPWD, "%s:%s" % (USER, PASS))
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.POSTFIELDS, post_data)
self.conn.setopt(pycurl.POST, 1)
self.conn.setopt(pycurl.WRITEFUNCTION, self.on_receive)
self.conn.perform()
def on_receive(self, data):
global last_message_timestamp
last_message_timestamp = datetime.datetime.now()
self.buffer += data
if data.endswith("\r\n") and self.buffer.strip():
try:
tweet = json.loads(self.buffer)
self.buffer = ""
if "text" in tweet:
text = tweet['text']
if "http://" in text or "https://" in text:
return
if re.match('^Dear\s+[A-Za-z0-9]+,',text,flags=re.IGNORECASE):
logging.debug("Message: " + str(datetime.datetime.now()))
subject = re.match('^Dear\s+([A-Za-z0-9]+),',text,flags=re.IGNORECASE).group(1)
self.database.save_tweet(tweet,subject)
else:
logging.debug("No match: " + str(datetime.datetime.now()))
except Exception as e:
self.buffer = ""
logging.debug("Exception during stream parsing: %s" % e.__str__())
t = threading.Thread(target=timeout_check)
t.start()
logging.debug("Starting request stream")
stream = Stream()
logging.debug("Request stream ended")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment