Created
September 10, 2017 13:35
-
-
Save planglois925/5df8473b49253a5f10ba7bdd196fc92f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import tweepy | |
from tweepy.streaming import StreamListener | |
import os | |
from urlparse import urlparse | |
# TODO: alert if the API doesn't connect
def main():
    """Build the Twitter auth/stream objects and start filtering on keywords.

    Prints a start banner, wires an OAuth handler into a tweepy API object,
    hands that API to a ``Listener``, creates the stream, prints the
    "connected" banner and finally starts filtering on the keyword list.
    """
    print("[] Twitter monitoring starting")

    # Values associated with both the application and the consumer.
    # They are empty placeholders in this listing.
    consumer_key = ""
    consumer_secret = ""
    access_token = ""
    access_secret = ""

    # Interim keyword list; the original note says the final version will
    # pull this value from a file.
    keywords = ['test', 'awesome', 'hello', 'world', "xsssdsf123"]

    # Auth object from the client key + secret, then the application
    # token + secret are attached to it.
    oauth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    oauth.set_access_token(access_token, access_secret)

    # The API object is created first, then the listener that wraps it,
    # then the stream that combines auth and listener.
    stream = tweepy.Stream(auth=oauth, listener=Listener(tweepy.API(oauth)))
    print('[] Twitter successfully connected')

    stream.filter(track=keywords)
# We need to extend Tweepy's StreamListener class and build upon it
class Listener(StreamListener):
    """Stream listener that flags statuses touching watched domains/accounts.

    Attributes:
        api: the tweepy API object passed in, or a fresh ``tweepy.API()``.
        domains: lower-cased domains returned by ``load_domains()``.
        twitter_accounts: lower-cased screen names returned by
            ``load_twitter_accounts()``.
    """

    def __init__(self, api=None):
        """Store the API object and load the values used by the tests.

        Args:
            api: optional tweepy API object; a default one is created when
                omitted or falsy.
        """
        self.api = api or tweepy.API()
        self.domains = load_domains()
        self.twitter_accounts = load_twitter_accounts()

    # simple proof of concept
    def on_status(self, status):
        """Check one incoming status against the domain and mention tests.

        ``twitter_hit`` is called at most once per status, even when both
        the domain test and the user-mention test match, so a single tweet
        is never reported twice.

        Args:
            status: status object exposing an ``entities`` mapping.
        """
        if self.domain_test(status=status) or self.user_mention(status=status):
            twitter_hit(status)

    def domain_test(self, status):
        """Return True if any URL in the status points at a watched domain.

        The expanded URL is used rather than the displayed one; per the
        original notes, Twitter rewrites most URLs through its ``t.co``
        shortener but keeps the full expanded URL on the status.

        Args:
            status: status object exposing an ``entities`` mapping.

        Returns:
            bool: True when the lower-cased network location of at least
            one expanded URL is in ``self.domains``.
        """
        # .get() tolerates a status whose entities lack a 'urls' key
        for url in status.entities.get('urls') or []:
            expanded = url.get('expanded_url')
            if not expanded:
                continue
            # No str() coercion here: on Python 2 the URL is unicode and
            # str() raises UnicodeEncodeError for non-ASCII host names.
            if urlparse(expanded).netloc.lower() in self.domains:
                return True
        return False

    def user_mention(self, status):
        """Return True if the status mentions any watched account.

        Matching is done on ``screen_name``, lower-cased to line up with the
        lower-cased account list; this assumes the list holds screen names
        rather than numeric ids.

        Args:
            status: status object exposing an ``entities`` mapping.

        Returns:
            bool: True when at least one mentioned screen name is in
            ``self.twitter_accounts``.
        """
        # Multiple users can be mentioned in one tweet, so check them all
        mentions = status.entities.get('user_mentions') or []
        return any(
            user['screen_name'].lower() in self.twitter_accounts
            for user in mentions
        )
def twitter_hit(status):
    """Announce that a matching status was found.

    This is a placeholder indicator: ``status`` is accepted but not
    inspected, and a fixed message is printed.
    """
    hit_message = '[x] Hit found \n'
    print(hit_message)
def load_domains():
    """Load the watched domains from ``data/domains.txt``.

    Each line of the file is one domain. Surrounding whitespace (including
    carriage returns and newlines) is removed and the value is lower-cased.
    Blank lines are skipped so an empty string never ends up in the list,
    where it would match URLs that have no network location.

    Returns:
        list: lower-cased domain strings in file order; an empty list when
        the file does not exist.
    """
    domains = []
    # A little status information for us
    print("[] Loading domains")
    # os.path.join keeps the relative path platform neutral
    file_location = os.path.join('data', 'domains.txt')

    # Guard clause: report and bail out early when the file is missing
    if not os.path.isfile(file_location):
        print('][ Failed to load domains, File not found')
        return domains

    with open(file_location) as f:
        # Iterate the file lazily instead of materialising readlines()
        for line in f:
            dom = line.strip().lower()
            if dom:
                domains.append(dom)

    # Tell the operator how many domains we ended up with
    print("[x] %s Domains Loaded" % len(domains))
    return domains
def load_twitter_accounts():
    """Load the watched Twitter accounts from ``data/twitter.txt``.

    Each line of the file is one account. Surrounding whitespace (including
    carriage returns and newlines) is removed and the value is lower-cased.
    Blank lines are skipped so they are neither stored nor counted.

    Returns:
        list: lower-cased account strings in file order; an empty list when
        the file does not exist.
    """
    twitter_accounts = []
    print("[] Loading twitter accounts (the good ones)")
    # os.path.join keeps the relative path platform neutral
    file_location = os.path.join('data', 'twitter.txt')

    # Guard clause: report and bail out early when the file is missing
    if not os.path.isfile(file_location):
        print("][ Failed to load accounts: File not found")
        return twitter_accounts

    with open(file_location) as f:
        # Iterate the file lazily instead of materialising readlines()
        for line in f:
            acc = line.strip().lower()
            if acc:
                twitter_accounts.append(acc)

    print("[x] %s Accounts Loaded" % len(twitter_accounts))
    return twitter_accounts
# Start the monitor only when this file is run as a script
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment