Skip to content

Instantly share code, notes, and snippets.

@Lopi
Last active June 15, 2016 17:21
Show Gist options
  • Save Lopi/c3c52e885d425a3b98803b2650543d5c to your computer and use it in GitHub Desktop.
Save Lopi/c3c52e885d425a3b98803b2650543d5c to your computer and use it in GitHub Desktop.
Simple python script to parse ip addresses from da_667 threat-tracking list timeline tweets
#!/usr/bin/env python
import tweepy
import re
# Twitter API keys and tokens
consumer_key = ""
consumer_secret = ""
access_token = ""
access_token_secret = ""
# Setup authentication
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# Authenticate to the twitter API
api = tweepy.API(auth)
# Grab da_667's threat-tracking twitter list timeline and store the tweets in a list
tweets = [tweet.text for tweet in tweepy.Cursor(api.list_timeline, 'da_667', 'threat-tracking').items()]
# ip address regular expression to parse out ip addresses
regex = re.compile(r'(((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?))')
# Apply the regex to our list of tweets
ip_list = [m.group(0) for t in tweets for m in [regex.search(t)] if m]
# Write the ip list from the list timeline to a text file
with open('da_667_threat_tracking_ip_list.txt', 'w') as f:
for ip in ip_list:
f.write('%s\n' % ip)
# Custom stream monitor for da_667 list
class daStreamer(tweepy.StreamListener):
def on_data(self, data):
# Format the data as json
all_data = json.loads(data)
# Pull out the text of the tweet
tweet = all_data['text']
# Apply the regex to current tweet
match = regex.search(tweet)
# Write to ip to file if the regex matches
if match:
with open('da_667_threat_tracking_ip_list.txt', 'a') as f:
f.write('%s\n' % match.group(0))
print(match.group(0))
def on_error(self, status):
print status
# Grab the members of the da_667 list
members = api.list_members(owner_screen_name='da_667', slug='threat-tracking')
# Store the member id numbers in a list
member_ids = [m.id_str for m in members]
# Start the stream
stream = tweepy.Stream(auth, daStreamer())
# Stream the members of the list
stream.filter(follow=member_ids)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment