Skip to content

Instantly share code, notes, and snippets.

@aayla-secura
Last active November 30, 2020 07:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aayla-secura/02684e8599bfcd1ef1af89d36fa8b8c1 to your computer and use it in GitHub Desktop.
Save aayla-secura/02684e8599bfcd1ef1af89d36fa8b8c1 to your computer and use it in GitHub Desktop.
Search Gmail for emails matching a query and extract regex matches from them
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle
import base64
import os.path
import sys
import re
import argparse
import logging
# OAuth scopes requested from Google.
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Logger shared by every function in this script.
logger = logging.getLogger('search_gmail')

# Command-line options, in the order they are registered: each entry is
# (option strings, keyword arguments for ``add_argument``).
_CLI_OPTIONS = (
    (('--query', '-q'), {'help': 'Search query for emails'}),
    (('--regex', '-r'),
     {'help': 'Regex to search for; if none, entire email is shown'}),
    (('--regex-group', '-G'),
     {'default': 0, 'type': int,
      'help': 'Regex capture group to display'}),
    (('--regex-flags', '-f'),
     {'default': '',
      'help': 'Regex flags, single string with one letter per-flag'}),
    (('--search', '-s'),
     {'default': ['body'], 'nargs': '+',
      'choices': ['body', 'subject'],  # TODO more, e.g. attachments
      'help': 'Where to search for string'}),
    (('--first-only', '-1'),
     {'default': False, 'action': 'store_true',
      'help': 'Print first match only'}),
    (('--mime-type', '-M'),
     {'default': 'text/plain',
      'help': 'Preferred MIME type to search in'}),
    (('--verbose', '-v'),
     {'action': 'count',
      'help': 'Be verbose; can be given multiple times'}),
)

parser = argparse.ArgumentParser(
    description='Search in gmail emails',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
for _option_strings, _settings in _CLI_OPTIONS:
    parser.add_argument(*_option_strings, **_settings)
del _option_strings, _settings
class Message:
    """A Gmail message reduced to the pieces this script searches.

    Wraps the message dict passed in as ``msg`` (``process_msgs`` passes the
    result of ``get_msg``) and extracts its headers, subject, snippet, date
    and one decoded body part.

    Attributes:
        mime_type: MIME type of the part stored in ``body``; None when the
            message has no part carrying data.
        headers: Header values keyed by lower-cased header name.
        body: Decoded text of the chosen part, or None.
        subject: Value of the ``subject`` header, or None if absent.
        snippet: ``msg['snippet']``.
        date: ``msg['internalDate']`` divided by 1000, as a float.
            NOTE(review): Gmail documents internalDate as epoch
            milliseconds, so this is presumably epoch seconds - confirm.
        matches: Results of ``search`` calls, keyed by the regex given.
        attachments: Always empty for now; attachments are skipped.
    """

    def __init__(self, msg, preferred_mime_type='text/plain'):
        """Store *msg* and immediately extract its searchable fields.

        Args:
            msg: Message dict with at least ``payload``, ``snippet`` and
                ``internalDate`` keys.
            preferred_mime_type: MIME type of the part to use as body when
                several parts are available.
        """
        self._msg = msg
        self._preferred_mime_type = preferred_mime_type
        self.mime_type = None
        self.headers = {}
        self.body = None
        self.subject = None
        self.snippet = None
        self.date = None
        self.matches = {}
        self.attachments = []  # TODO
        self._save_data()
        logger.debug('Message subject = "{}", snippet = "{}"'.format(
            self.subject, self.snippet))

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        """Return the snippet, truncated to 20 characters."""
        return str_trunc(self.snippet, 20)

    def _get_parts(self, payload=None):
        """Return every non-attachment part that carries data, depth first.

        Args:
            payload: Payload (or sub-part) dict to walk; defaults to the
                message's top-level payload.

        Returns:
            A list of ``{'data': ..., 'mimeType': ...}`` dicts. Empty when
            the payload is an attachment or has neither data nor sub-parts.
        """
        if payload is None:
            payload = self._msg['payload']
        if payload.get('filename'):
            # TODO
            logger.debug('Skipping attachment')
            return []

        logger.debug('Processing payload')
        body = payload.get('body', {})
        if 'data' in body:
            logger.debug('Body is single-part')
            return [{'data': body['data'],
                     'mimeType': payload.get('mimeType')}]

        logger.debug('Body is multi-part')
        result = []
        # A payload with no data and no 'parts' (e.g. an empty body) simply
        # contributes nothing instead of raising KeyError.
        for part in payload.get('parts', []):
            result.extend(self._get_parts(part))
        return result

    def _save_data(self):
        """Populate snippet, date, headers, subject, body and mime_type."""
        self.snippet = self._msg['snippet']
        self.date = float(self._msg['internalDate']) / 1000
        for hdr in self._msg['payload']['headers']:
            self.headers[hdr['name'].lower()] = hdr['value']
        self.subject = self.headers.get('subject')

        # Pick the first part of the preferred MIME type; if there is none,
        # fall back to the last part found. Only the chosen part is decoded.
        chosen = None
        for part in self._get_parts():
            chosen = part
            if part['mimeType'] == self._preferred_mime_type:
                break
        if chosen is not None:
            self.body = decode_data(chosen['data'])
            self.mime_type = chosen['mimeType']

    @staticmethod
    def _parse_flags(regex_flags):
        """Combine one-letter ``re`` flag names (e.g. ``'IM'``) into a bitmask.

        Letters are case-insensitive. An unknown letter raises
        AttributeError from the ``re`` module lookup.
        """
        flags = 0
        for letter in regex_flags or '':
            flags |= getattr(re, letter.upper())
        return flags

    @staticmethod
    def _find_matches(pattern, text, group, first_only):
        """Return capture group *group* of each match of *pattern* in *text*.

        A missing field (``text`` is None) yields no matches. With
        *first_only* the scan stops after the first match.
        """
        if text is None:
            return []
        found = []
        for match in pattern.finditer(text):
            found.append(match.group(group))
            if first_only:
                logger.debug('Returning first match only')
                break
        return found

    def search(self,
               regex=None,
               regex_group=0,
               regex_flags='',
               first_only=False,
               search_in=('body', 'subject')):
        """Search the subject and/or body and record the matches.

        Args:
            regex: Pattern string or compiled pattern. None disables the
                search.
            regex_group: Capture group whose text is collected per match.
            regex_flags: One letter per ``re`` flag; ignored (with a
                warning) when *regex* is already compiled.
            first_only: Keep at most one match per searched field.
            search_in: Container naming the fields to search: ``'subject'``
                and/or ``'body'``. The subject is searched first.

        Returns:
            The list of matches, also stored in ``self.matches[regex]``, or
            None when *regex* is None.
        """
        if regex is None:
            return None

        if isinstance(regex, re.Pattern):
            pattern = regex
            if regex_flags:
                logger.warning(
                    'Ignoring regex flags with already compiled pattern')
        else:
            flags = self._parse_flags(regex_flags)
            pattern = re.compile(regex, flags)
            logger.debug('Using regex {} wih flags {!r}'.format(
                regex, flags))

        found = []
        for field in ('subject', 'body'):
            if field not in search_in:
                continue
            hits = self._find_matches(
                pattern, getattr(self, field), regex_group, first_only)
            logger.debug('Matched in {}: {}'.format(field, hits))
            found.extend(hits)
        self.matches[regex] = found
        return found
def decode_data(data):
    """Decode a base64 string into text.

    Args:
        data: Base64 text in either the standard (``+``, ``/``) or the
            URL-safe (``-``, ``_``) alphabet. Trailing ``=`` padding may be
            missing.

    Returns:
        The decoded bytes read as UTF-8; bytes that are not valid UTF-8 are
        kept as backslash escapes.
    """
    # Restore stripped padding: the decoder rejects input whose length is
    # not a multiple of four.
    padded = data + '=' * (-len(data) % 4)
    # The URL-safe decoder only maps '-' and '_' onto '+' and '/', so it
    # handles standard-alphabet input as well.
    raw = base64.urlsafe_b64decode(padded)
    return raw.decode('utf-8', errors='backslashreplace')
def get_service():
    """Return a Gmail v1 API client, authenticating first if needed.

    Credentials cached in ``token.pickle`` are reused when still valid.
    Otherwise they are refreshed (when expired and a refresh token exists)
    or obtained through the installed-app flow using ``credentials.json``
    and a local server on port 50001. The result is written back to
    ``token.pickle`` either way.
    """
    token_file = 'token.pickle'
    credentials = None
    if os.path.exists(token_file):
        # NOTE: unpickling runs arbitrary code; token.pickle must be a file
        # this script wrote itself.
        with open(token_file, 'rb') as stored:
            credentials = pickle.load(stored)

    if credentials and credentials.valid:
        return build('gmail', 'v1', credentials=credentials)

    refreshable = (
        credentials and credentials.expired and credentials.refresh_token)
    if refreshable:
        credentials.refresh(Request())
    else:
        auth_flow = InstalledAppFlow.from_client_secrets_file(
            'credentials.json', SCOPES)
        credentials = auth_flow.run_local_server(port=50001)

    # Cache the new or refreshed credentials for the next run.
    with open(token_file, 'wb') as stored:
        pickle.dump(credentials, stored)
    return build('gmail', 'v1', credentials=credentials)
def get_msgs(service, query=None):
    """Collect every message entry matching *query*, across all pages.

    Args:
        service: Gmail API client (as returned by ``get_service``).
        query: Search string passed as ``q``; None applies no filter.

    Returns:
        The concatenated ``messages`` lists of all result pages.
    """
    collected = []
    token = None
    while True:
        logger.debug('Fetching messages, pageToken={}'.format(
            token))
        request = service.users().messages().list(
            userId='me',
            pageToken=token,
            q=query)
        page = request.execute()
        collected.extend(page.get('messages', []))
        token = page.get('nextPageToken')
        if not token:
            # No further page was announced.
            break
    return collected
def get_msg(service, msg_id):
    """Fetch the message with id *msg_id* for the authenticated user.

    Args:
        service: Gmail API client (as returned by ``get_service``).
        msg_id: Id of the message to fetch.

    Returns:
        Whatever the executed ``messages().get`` request returns.
    """
    messages_api = service.users().messages()
    request = messages_api.get(userId='me', id=msg_id)
    return request.execute()
def process_msgs(service,
                 preferred_mime_type=None,
                 query=None,
                 **search_kwargs):
    """Fetch the messages matching *query* and run the regex search on each.

    Args:
        service: Gmail API client (as returned by ``get_service``).
        preferred_mime_type: Passed through to ``Message``. None means no
            part is preferred, so the last data-carrying part is used.
        query: Gmail search string; None lists all messages.
        **search_kwargs: Forwarded unchanged to ``Message.search``.

    Returns:
        ``Message`` objects in reverse listing order. When a ``regex`` is
        given only messages with at least one match are kept; without a
        regex every message is kept.
    """
    messages = get_msgs(service, query=query)
    logger.info('Found {} messages'.format(len(messages)))

    # Filter only when a regex was actually requested. Callers may pass
    # regex=None explicitly, so a non-empty search_kwargs is not enough.
    searching = search_kwargs.get('regex') is not None

    result = []
    for entry in messages:
        msg = Message(get_msg(service, entry['id']),
                      preferred_mime_type=preferred_mime_type)
        logger.info('~~~~~~~~~~ {}'.format(msg))
        msg.search(**search_kwargs)
        for regex, matches in msg.matches.items():
            logger.info('{} -> {}'.format(
                regex, set(matches)))
        # msg.matches holds an (empty) list even when nothing matched, so
        # test the lists themselves rather than the dict.
        if not searching or any(msg.matches.values()):
            result.append(msg)
    result.reverse()
    return result
def setup_logger(verbosity=None):
    """Set the script logger's level and send its records to stderr.

    Args:
        verbosity: Number of ``-v`` flags. None, 0 or negative selects
            WARNING, 1 INFO, 2 DEBUG, and anything above 2 level 1 (every
            record).

    Calling this more than once does not stack duplicate stderr handlers.
    """
    if verbosity is None or verbosity <= 0:
        level = logging.WARNING
    elif verbosity > 2:
        level = 1
    elif verbosity > 1:
        level = logging.DEBUG
    else:
        level = logging.INFO
    logger.setLevel(level)

    already_attached = any(
        isinstance(existing, logging.StreamHandler)
        and getattr(existing, 'stream', None) is sys.stderr
        for existing in logger.handlers)
    if not already_attached:
        handler = logging.StreamHandler(sys.stderr)
        # The handler lets everything through; filtering is done by the
        # logger level set above.
        handler.setLevel(1)
        logger.addHandler(handler)
def str_trunc(s, size):
    """Shorten *s* to at most *size* items, marking the cut with ``...``.

    Args:
        s: A ``str`` or ``bytes`` value.
        size: Maximum length of the result.

    Returns:
        *s* unchanged when it already fits. Otherwise its beginning followed
        by an ellipsis of the same type as *s*, or a plain prefix of *s*
        when *size* is too small to hold the ellipsis.
    """
    if len(s) <= size:
        return s
    suffix = b'...' if isinstance(s, bytes) else '...'
    if size < len(suffix):
        # No room for the ellipsis. Slicing with ``size - 3`` here would use
        # a negative bound and keep almost all of s.
        return s[:max(size, 0)]
    return s[:size - len(suffix)] + suffix
def main():
    """Parse the command line, authenticate and process the messages."""
    args = parser.parse_args()
    setup_logger(args.verbose)
    service = get_service()
    # Options forwarded untouched to Message.search via process_msgs.
    search_options = {
        'regex': args.regex,
        'regex_group': args.regex_group,
        'regex_flags': args.regex_flags,
        'first_only': args.first_only,
        'search_in': args.search,
    }
    process_msgs(service,
                 preferred_mime_type=args.mime_type,
                 query=args.query,
                 **search_options)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment