Skip to content

Instantly share code, notes, and snippets.

@justinatomatic
Last active April 8, 2016 00:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinatomatic/def5550ffcfe1736c817360180d75ab5 to your computer and use it in GitHub Desktop.
Save justinatomatic/def5550ffcfe1736c817360180d75ab5 to your computer and use it in GitHub Desktop.
import datetime
import json
import re
import socket
import time
import unicodedata

import mosquitto
from TwitterAPI import TwitterAPI

from webpurify import *
'''
Query Twitter, sanitise the results and publish them to MQTT.

Dependencies
    TwitterAPI  - pip install TwitterAPI
    webpurify   - local module file
    mosquitto   - MQTT client module

TODO
    Logging
'''
# Comma-separated phrases sent as the 'track' parameter of the
# 'statuses/filter' stream request made further down in this script.
TRACK_TERM = '#SyPy,#myotherhashtagisaporsche'
# Twitter credentials handed to TwitterAPI(). The values here are redacted
# placeholders and must be replaced before the script can run.
CONSUMER_KEY = '****'
CONSUMER_SECRET = '****'
ACCESS_TOKEN_KEY = '****'
ACCESS_TOKEN_SECRET = '****'
# WebPurify client; its check() result decides which MQTT topic a tweet is
# published to. The constructor argument is redacted -- presumably the
# WebPurify API key (TODO confirm against the local webpurify module).
web_purifier = WebPurify('****')
# --- MQTT Callbacks ---
def on_connect(mosq, obj, rc):
    """Report the outcome of a broker connection attempt.

    Prints a success line when ``rc`` is 0 and a failure line for any other
    value. ``mosq`` and ``obj`` are accepted for the callback signature but
    are not used.
    """
    if rc != 0:
        print("Connected failed.")
        return
    print("Connected successfully.")
def on_disconnect(mosq, obj, rc):
    """Report a disconnection from the broker.

    ``rc`` is 0 when the disconnect was requested by this client; any other
    value means the connection was lost unexpectedly, which is reported as
    such instead of being printed as a success. ``mosq`` and ``obj`` are
    accepted for the callback signature but are not used.
    """
    if rc == 0:
        print("Disconnected successfully.")
    else:
        print("Disconnected unexpectedly (rc=" + str(rc) + ").")
def on_publish(mosq, obj, mid):
    """Print a confirmation line carrying the published message id ``mid``.

    ``mosq`` and ``obj`` are accepted for the callback signature but are not
    used.
    """
    notice = "".join(["Message ", str(mid), " published."])
    print(notice)
def on_message(mosq, obj, msg):
    """Print the payload and topic of a message received from the broker.

    ``msg`` is expected to expose ``topic`` and ``payload`` attributes, as the
    MQTT client's message objects do. Printing ``str(msg)`` only showed the
    object's repr, so the useful fields are printed instead. ``mosq`` and
    ``obj`` are accepted for the callback signature but are not used.
    """
    print("Message " + str(msg.payload) + " received on " + str(msg.topic) + ".")
def on_subscribe(mosq, obj, mid, qos_list):
    """Print an acknowledgement line for the subscription request ``mid``.

    ``mosq``, ``obj`` and ``qos_list`` are accepted for the callback
    signature but are not used.
    """
    notice = "".join(["Subscribe with mid ", str(mid), " received."])
    print(notice)
def connectToBroker():
    """Register the last-will message and connect ``client`` to localhost.

    Uses the module-level ``client``. A socket-level failure is reported on
    stdout and followed by a five second pause; it is not re-raised, so the
    caller continues either way. Returns None.
    """
    # The will must be set before connecting so the broker can announce an
    # unclean drop of this client on 'SyPy/clients'.
    client.will_set('SyPy/clients', payload="twitter Stream Fail", qos=0)
    try:
        # A falsy return value (0) from connect() is treated as success.
        if not client.connect("localhost"):
            print("Connected to Broker")
    except socket.error as err:
        # `except X as e` is valid on Python 2.6+ and 3. The exception is not
        # unpacked into (errno, message) because not every socket error
        # carries exactly two arguments.
        print("Could not open socket: " + str(err))
        # NOTE(review): the source's indentation was lost; the pause is
        # assumed to belong to the failure path as a back-off -- confirm.
        time.sleep(5)
# Module-level MQTT client shared by connectToBroker() and the publishing
# code; every callback defined above is attached under its matching
# attribute name.
client = mosquitto.Mosquitto("SyPy_TweetStream")
for _handler in (on_connect, on_disconnect, on_publish, on_message, on_subscribe):
    setattr(client, _handler.__name__, _handler)
del _handler
# Patterns used to reduce a tweet to the plain words checked by WebPurify.
_URL_RE = re.compile(r'\w+:\/{2}[\d\w-]+(\.[\d\w-]+)*(?:(?:\/[^\s/]*))*')
_HASHTAG_RE = re.compile(r'#([^\\\n\r\s]*)')
_MENTION_RE = re.compile(r'@([^\\ ]*)')
_NEWLINE_RE = re.compile(r'\r?\n|\r')
# Word boundary so that only a standalone "RT " marker is removed, not the
# tail of a word such as "SMART ".
_RETWEET_RE = re.compile(r'\bRT ')


def _to_ascii(text):
    """Return *text* NFKD-normalised with every non-ASCII character dropped."""
    folded = unicodedata.normalize('NFKD', text)
    # Decode back to text so the value can be passed to json.dumps on
    # Python 3 as well as Python 2.
    return folded.encode('ascii', 'ignore').decode('ascii')


def _clean_tweet_text(text):
    """Return *text* without URLs, hashtags, @mentions and retweet markers.

    Line breaks become spaces, the HTML entity ``&amp;`` is decoded to ``&``
    and surrounding whitespace is trimmed, so a tweet that contained nothing
    but links/hashtags/mentions comes back as the empty string.
    """
    cleaned = _URL_RE.sub('', text)
    cleaned = _HASHTAG_RE.sub('', cleaned)
    cleaned = _MENTION_RE.sub('', cleaned)
    cleaned = _NEWLINE_RE.sub(' ', cleaned)
    cleaned = cleaned.replace('&amp;', '&')
    cleaned = _RETWEET_RE.sub('', cleaned)
    return cleaned.strip()


def _build_payload(item, ascii_text):
    """Return the JSON string published for the stream message *item*."""
    return json.dumps({
        'id': item['id'],
        'time': item['created_at'],
        'tweet': ascii_text,
        'type': "tweet",
        'replyto': item['user']['screen_name'],
    })


def main():
    """Stream tweets matching TRACK_TERM and publish each one over MQTT.

    Tweets that WebPurify reports as clean (``found == 0``) are published to
    'SyPy/TweetStream'; the others go to 'SyPy/droppedTweets'. Stream items
    without a 'text' field, and tweets with no words left after cleaning,
    are skipped.
    """
    api = TwitterAPI(
        CONSUMER_KEY,
        CONSUMER_SECRET,
        ACCESS_TOKEN_KEY,
        ACCESS_TOKEN_SECRET)
    stream = api.request('statuses/filter', {'track': TRACK_TERM})

    # Running total of tweets seen. It was incremented without ever being
    # initialised, which raised NameError on the first tweet.
    count = 0
    for item in stream:
        if 'text' not in item:
            continue
        ascii_text = _to_ascii(item['text'])
        count += 1
        print(ascii_text)

        words = _clean_tweet_text(item['text'])
        if not words:
            # Nothing left to check, so the tweet is not published.
            continue

        purity = web_purifier.check(words)
        print("Message Purity " + str(purity))
        payload = _build_payload(item, ascii_text)
        print(payload)

        if purity["found"] == 0:
            print("Send Tweet to Queue")
            topic = "SyPy/TweetStream"
        else:
            print("Un pure!!!")
            topic = "SyPy/droppedTweets"
        # The broker connection is (re)opened before every publish.
        connectToBroker()
        client.publish(topic, payload)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment