Skip to content

Instantly share code, notes, and snippets.

@djdunc
Created October 20, 2022 16:29
Show Gist options
  • Save djdunc/816373cca73e6a7980936052d673391c to your computer and use it in GitHub Desktop.
Save djdunc/816373cca73e6a7980936052d673391c to your computer and use it in GitHub Desktop.
script to run on RPi robot in lab to listen for tweets mentioning the lab and send them to the lab mqtt stream
# script to run on RPi robot in lab to listen for tweets mentioning the lab and
# send them to the lab mqtt stream
# D.Wilson Oct 2022
## settings.py file also needed in the same folder with these details:
# API_KEY=""
# API_SECRET=""
# ACCESS_TOKEN=""
# ACCESS_TOKEN_SECRET=""
# MQTTUSER=""
# MQTTPASS=""
import tweepy
import paho.mqtt.client as mqtt
from settings import * # Passwords in seperate file
client = mqtt.Client("CASAUCLTwitterBot") # create client object
client.username_pw_set(username=MQTTUSER, password=MQTTPASS)
print("Started. Waiting for messages.")
# Subclass Stream to print IDs of Tweets received
class TweetPrinter(tweepy.Stream):
def on_status(self, status):
print("User.screen_name: " + status.user.screen_name)
print("Status text: " + status.text)
print("Status id: " + status.id_str)
try:
print("publishing to mqtt")
print("Connecting...")
client.connect("mqtt.cetools.org", port=1884) #establish connection
print("Connected")
client.publish("personal/ucjtdjw/twitterbot/CASAUCL/id", str(status.id_str))
client.publish("personal/ucjtdjw/twitterbot/CASAUCL/name", str(status.user.screen_name))
client.publish("personal/ucjtdjw/twitterbot/CASAUCL/text", str(status.text))
except:
print("no connection")
# Initialize instance of the subclass
printer = TweetPrinter(API_KEY, API_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
# Filter realtime Tweets by keyword
printer.filter(track=["@CASAUCL","@djdunc","#CE_Lab","#CASAmonkey"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment