Skip to content

Instantly share code, notes, and snippets.

@brad-anton
Created September 21, 2017 16:05
Show Gist options
  • Save brad-anton/58b01c51b647d35a924a0fd82743e4fc to your computer and use it in GitHub Desktop.
Save brad-anton/58b01c51b647d35a924a0fd82743e4fc to your computer and use it in GitHub Desktop.
Checks your mailbox for lots of 'Confirm' messages and then pulls info from them
import base64
import email
import os
import re

import httplib2
from apiclient import discovery
from apiclient import errors
from oauth2client import client
from oauth2client import tools
from oauth2client.file import Storage

try:  # Python 3
    from urllib.parse import urlparse, urlunsplit
except ImportError:  # Python 2
    from urlparse import urlparse, urlunsplit
# OAuth scope requested during authorisation (read-only Gmail access).
SCOPES = "https://www.googleapis.com/auth/gmail.readonly"

# Client-secrets file handed to the OAuth flow.
CLIENT_SECRET_FILE = "client_secret.json"

# Reported as the OAuth flow's user agent.
APPLICATION_NAME = "Gmail API Python Quickstart"
class SpamRegurg:
    """Pull 'confirm' e-mails from a Gmail mailbox and extract artifacts.

    On construction the instance authorises against the Gmail API, runs the
    default search query and stores every fetched message (as an
    ``email.message.Message``) in ``self.messages``.  ``get_artifacts()``
    then reports, per message, the URLs found in the body, the
    scheme://host part of each URL and any ``X-AntiAbuse`` sender domains.
    """

    # Gmail search used by get_messages() when no query is supplied.
    DEFAULT_QUERY = 'subject:confirm after:2017/9/20 before:2017/9/23'

    # Prefix of the X-AntiAbuse header line that carries the sender domain.
    _ANTIABUSE_PREFIX = 'Sender Address Domain - '

    # Compiled once at class-definition time instead of on every call.
    # Every group is non-capturing, so findall() returns whole matches.
    _URL_RE = re.compile(
        r'(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[\-A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[\-A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
        r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
        r'(?::\d+)?'  # optional port
        r'(?:/?)?'  # URI Slash
        r'(?:[a-z0-9\-._~:/?#%@!$&\'()*+,;=]+[a-z0-9\-._~:/?#%\[\]@!$&\'()*+,;=]+)?',  # URI
        re.IGNORECASE)

    def __init__(self, user_id='me'):
        """Authorise, build the Gmail service and load matching messages.

        Args:
            user_id: Gmail user id passed as ``userId`` to every API call.
                Defaults to ``'me'``.
        """
        credentials = self.get_credentials()
        http = credentials.authorize(httplib2.Http())
        self.service = discovery.build('gmail', 'v1', http=http)
        # Honour the caller's value (previously hard-coded to 'me').
        self.user_id = user_id
        self.messages = []
        self.get_messages()

    @staticmethod
    def blob2url(blob):
        """Return a list of strings that resemble a URL within blob.

        Args:
            blob: String of text to find URLs in.

        Returns:
            List of matched URL strings in order of appearance (possibly
            empty, possibly containing duplicates).
        """
        return SpamRegurg._URL_RE.findall(blob)

    @staticmethod
    def get_credentials(credential_dir='.'):
        """Get valid user credentials from storage.

        If nothing has been stored, or if the stored credentials are invalid,
        the OAuth2 flow is completed to obtain the new credentials.

        Args:
            credential_dir: Directory holding 'gmail-python-quickstart.json'.

        Returns:
            Credentials, the obtained credential.
        """
        credential_path = os.path.join(credential_dir,
                                       'gmail-python-quickstart.json')
        store = Storage(credential_path)
        credentials = store.get()
        if not credentials or credentials.invalid:
            flow = client.flow_from_clientsecrets(CLIENT_SECRET_FILE, SCOPES)
            flow.user_agent = APPLICATION_NAME
            # 'flags' was referenced but never defined; build it here.
            # parse_known_args() tolerates unrelated command-line arguments
            # instead of exiting the process on them.
            try:
                import argparse
                flags = argparse.ArgumentParser(
                    parents=[tools.argparser]).parse_known_args()[0]
            except ImportError:
                flags = None
            if flags:
                credentials = tools.run_flow(flow, store, flags)
            else:  # Needed only for compatibility with Python 2.6
                credentials = tools.run(flow, store)
            print('Storing credentials to ' + credential_path)
        return credentials

    def ListMessagesMatchingQuery(self, query=''):
        """List all Messages of the user's mailbox matching the query.

        Args:
            query: String used to filter messages returned.
                Eg.- 'from:user@some_domain.com' for Messages from a
                particular sender.

        Returns:
            List of Messages that match the criteria of the query. Note that
            the returned list contains Message IDs, you must use get with the
            appropriate ID to get the details of a Message.  If the API
            raises an HttpError the error is printed and whatever was
            collected so far (possibly an empty list) is returned, so the
            result is always iterable.
        """
        messages = []
        request_args = {'userId': self.user_id, 'q': query}
        try:
            while True:
                response = self.service.users().messages().list(
                    **request_args).execute()
                # A page may legitimately carry no 'messages' key.
                messages.extend(response.get('messages', []))
                page_token = response.get('nextPageToken')
                if not page_token:
                    break
                request_args['pageToken'] = page_token
        except errors.HttpError as error:
            print('An error occurred: {}'.format(error))
        return messages

    def GetMimeMessage(self, msg_id):
        """Get a Message and use it to create a MIME Message.

        Args:
            msg_id: The ID of the Message required.

        Returns:
            A MIME Message, consisting of data from Message, or None when
            the API call raises an HttpError (the error is printed).
        """
        try:
            message = self.service.users().messages().get(
                userId=self.user_id, id=msg_id, format='raw').execute()
        except errors.HttpError as error:
            print('An error occurred: %s' % error)
            return None
        raw = base64.urlsafe_b64decode(message['raw'].encode('ASCII'))
        # b64decode yields bytes on Python 3; prefer the bytes parser when
        # the running interpreter provides one.
        parse = getattr(email, 'message_from_bytes',
                        email.message_from_string)
        return parse(raw)

    def get_messages(self, query=None):
        """Fetch every message matching query and append it to self.messages.

        Args:
            query: Gmail search string; defaults to ``DEFAULT_QUERY``.
        """
        if query is None:
            query = self.DEFAULT_QUERY
        for ref in self.ListMessagesMatchingQuery(query):
            mime_msg = self.GetMimeMessage(ref['id'])
            # Skip messages whose download failed so later processing never
            # sees None.
            if mime_msg is not None:
                self.messages.append(mime_msg)

    def get_confirmation(self, message):
        """Return every URL-like string found in the message body.

        Args:
            message: An ``email.message.Message``.

        Returns:
            List of URL strings, in order of appearance, from every
            non-multipart part of the message (nested parts included).
        """
        results = []
        # walk() yields the message itself and, recursively, all sub-parts,
        # which covers plain, multipart and nested multipart messages alike.
        for part in message.walk():
            if part.is_multipart():
                continue  # container: its payload is a list, not text
            payload = part.get_payload()
            if payload:
                results += self.blob2url(payload)
        return results

    def get_domain(self, message):
        """Return the scheme://host portion of each distinct URL.

        Args:
            message: An ``email.message.Message``.

        Returns:
            List with one 'scheme://netloc' string per distinct URL found by
            get_confirmation(), in first-seen order.
        """
        results = []
        seen = set()
        for url in self.get_confirmation(message):
            if url in seen:
                continue
            seen.add(url)
            try:
                parts = urlparse(url)
            except ValueError:
                # The URL pattern allows unbalanced '[' / ']' which the
                # parser rejects; ignore such candidates.
                continue
            results.append(urlunsplit([parts[0], parts[1], '', '', '']))
        return results

    def get_antiabuse(self, message):
        """Return sender domains advertised in X-AntiAbuse headers.

        Args:
            message: An ``email.message.Message``.

        Returns:
            List of the text following 'Sender Address Domain - ' for each
            X-AntiAbuse header that starts with it; empty when there is none.
        """
        prefix = self._ANTIABUSE_PREFIX
        # Supplying a default avoids iterating None when the header is absent.
        return [item[len(prefix):]
                for item in message.get_all('X-AntiAbuse', [])
                if item.startswith(prefix)]

    def get_artifacts(self):
        """Return one artifact dict per stored message.

        Returns:
            List of dicts with keys 'confirmations', 'domains' and
            'antiabuse', in the same order as self.messages.
        """
        results = []
        for message in self.messages:
            results.append({
                'confirmations': self.get_confirmation(message),
                'domains': self.get_domain(message),
                'antiabuse': self.get_antiabuse(message),
            })
        return results
if __name__ == '__main__':
    # Script entry point: authorise, fetch the matching messages and dump
    # the extracted artifacts.  print() is called as a function so the line
    # parses on both Python 2 and Python 3.
    s = SpamRegurg()
    print(s.get_artifacts())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment