Last active
June 15, 2016 17:21
-
-
Save Lopi/c3c52e885d425a3b98803b2650543d5c to your computer and use it in GitHub Desktop.
Simple Python script to parse IP addresses from tweets in da_667's threat-tracking list timeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
import json
import re

import tweepy
# Twitter API keys and tokens (left blank here; supply real credentials)
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""

# Setup authentication
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

# Authenticate to the twitter API
api = tweepy.API(auth)

# Grab da_667's threat-tracking twitter list timeline and store the text of
# each tweet in a list. The list is identified by keyword, the same way the
# list_members call further down identifies it.
tweets = [
    tweet.text
    for tweet in tweepy.Cursor(
        api.list_timeline,
        owner_screen_name='da_667',
        slug='threat-tracking',
    ).items()
]
# IPv4 address regular expression used to parse addresses out of tweet text.
# Each octet is limited to 0-255. The separating dots are escaped so only a
# literal "." is accepted between octets (an unescaped "." matches any
# character), and the digit look-arounds keep a match from starting or ending
# in the middle of a longer run of digits.
regex = re.compile(
    r'(?<!\d)'
    r'(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}'
    r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)'
    r'(?!\d)'
)
# Apply the regex to our list of tweets, keeping the first address found in
# each tweet that contains one.
ip_list = []
for text in tweets:
    found = regex.search(text)
    if found:
        ip_list.append(found.group(0))

# Write the ip list from the list timeline to a text file, one address per
# line. Mode 'w' replaces any existing file of that name.
with open('da_667_threat_tracking_ip_list.txt', 'w') as f:
    for ip in ip_list:
        f.write('%s\n' % ip)
# Custom stream monitor for da_667 list | |
class daStreamer(tweepy.StreamListener):
    """Stream listener that records IP addresses found in streamed tweets.

    Every address matched by the module-level ``regex`` is appended to
    ``da_667_threat_tracking_ip_list.txt`` and printed to stdout.
    """

    def on_data(self, data):
        """Handle one raw stream message.

        ``data`` is the JSON text of the message. If it carries a ``text``
        field containing an IP address, the first address is appended to the
        output file and printed; otherwise the message is ignored.
        """
        # Format the data as json
        all_data = json.loads(data)
        # Pull out the text of the tweet. A message without a 'text' key
        # (presumably a non-tweet notice from the stream) is skipped rather
        # than allowed to raise KeyError.
        tweet = all_data.get('text')
        if not tweet:
            return
        # Apply the regex to current tweet
        match = regex.search(tweet)
        # Write the ip to file if the regex matches
        if match:
            ip = match.group(0)
            with open('da_667_threat_tracking_ip_list.txt', 'a') as f:
                f.write('%s\n' % ip)
            print(ip)

    def on_error(self, status):
        """Print the error status passed in by the stream."""
        # Function-call form works on both Python 2 and 3 and matches the
        # print() call used in on_data.
        print(status)
# Grab the members of the da_667 list. Cursor walks every page of the
# membership, so the whole list is collected rather than one page of it.
members = list(
    tweepy.Cursor(
        api.list_members,
        owner_screen_name='da_667',
        slug='threat-tracking',
    ).items()
)

# Store the member id numbers in a list
member_ids = [m.id_str for m in members]

# Start the stream
stream = tweepy.Stream(auth, daStreamer())

# Stream the members of the list
stream.filter(follow=member_ids)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment