Last active
November 30, 2020 07:41
-
-
Save aayla-secura/02684e8599bfcd1ef1af89d36fa8b8c1 to your computer and use it in GitHub Desktop.
Search Gmail for emails matching a query and extract regex matches from them
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from googleapiclient.discovery import build | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
import pickle | |
import base64 | |
import os.path | |
import sys | |
import re | |
import argparse | |
import logging | |
# If modifying these scopes, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Module-wide logger; handlers and level are attached by setup_logger().
logger = logging.getLogger('search_gmail')

# Command-line interface. Options are declared as (flags, keyword options)
# pairs and registered in order, so --help lists them in this sequence.
parser = argparse.ArgumentParser(
    description='Search in gmail emails',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter)
for _flags, _options in (
        (('--query', '-q'),
         {'help': 'Search query for emails'}),
        (('--regex', '-r'),
         {'help': 'Regex to search for; if none, entire email is shown'}),
        (('--regex-group', '-G'),
         {'default': 0, 'type': int,
          'help': 'Regex capture group to display'}),
        (('--regex-flags', '-f'),
         {'default': '',
          'help': 'Regex flags, single string with one letter per-flag'}),
        (('--search', '-s'),
         {'default': ['body'], 'nargs': '+',
          'choices': ['body', 'subject'],  # TODO more, e.g. attachments
          'help': 'Where to search for string'}),
        (('--first-only', '-1'),
         {'default': False, 'action': 'store_true',
          'help': 'Print first match only'}),
        (('--mime-type', '-M'),
         {'default': 'text/plain',
          'help': 'Preferred MIME type to search in'}),
        (('--verbose', '-v'),
         {'action': 'count',
          'help': 'Be verbose; can be given multiple times'}),
):
    parser.add_argument(*_flags, **_options)
del _flags, _options  # loop temporaries only; keep the module namespace clean
class Message:
    """One fetched e-mail: headers, a decoded body and regex search results.

    Args:
        msg: Message dict providing the ``'snippet'``, ``'internalDate'``
            and ``'payload'`` keys (the value returned by ``get_msg``).
        preferred_mime_type: MIME type of the part to use as ``body`` when
            the message has several parts. If no part has this type, the
            last non-attachment part is used.

    Attributes:
        headers: Header values keyed by lower-cased header name.
        subject: Value of the ``subject`` header, or ``None`` if absent.
        body: Decoded text of the selected part, or ``None`` if the message
            has no decodable part.
        mime_type: MIME type of the part stored in ``body``.
        snippet: The message's ``snippet`` field.
        date: ``internalDate`` divided by 1000 (presumably epoch
            milliseconds converted to seconds -- confirm against the API).
        matches: Results of :meth:`search`, keyed by the regex searched for.
        attachments: Not populated yet (TODO).
    """

    def __init__(self, msg, preferred_mime_type='text/plain'):
        self._msg = msg
        self._preferred_mime_type = preferred_mime_type
        self.mime_type = None
        self.headers = {}
        self.body = None
        self.subject = None
        self.snippet = None
        self.date = None
        self.matches = {}
        self.attachments = []  # TODO
        self._save_data()
        logger.debug('Message subject = "{}", snippet = "{}"'.format(
            self.subject, self.snippet))

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return str_trunc(self.snippet, 20)

    def _get_parts(self, payload=None):
        """Flatten a (possibly nested) payload into its leaf parts.

        Args:
            payload: Payload dict to walk; defaults to the message's
                top-level payload.

        Returns:
            A list of ``{'data': ..., 'mimeType': ...}`` dicts, one per
            non-attachment part that carries inline data, in document order.
        """
        if payload is None:
            payload = self._msg['payload']
        # Use .get(): a part without a 'filename' key is simply not an
        # attachment and must not abort parsing with a KeyError.
        if payload.get('filename'):
            # TODO
            logger.debug('Skipping attachment')
            return []
        logger.debug('Processing payload')
        data = payload.get('body', {}).get('data')
        if data is not None:
            logger.debug('Body is single-part')
            return [{'data': data, 'mimeType': payload.get('mimeType')}]
        logger.debug('Body is multi-part')
        result = []
        # A part with neither inline data nor sub-parts (e.g. an empty
        # body) contributes nothing instead of raising KeyError.
        for part in payload.get('parts', []):
            result.extend(self._get_parts(part))
        return result

    def _save_data(self):
        """Populate the public attributes from the raw message dict."""
        self.snippet = self._msg['snippet']
        self.date = float(self._msg['internalDate']) / 1000
        for hdr in self._msg['payload'].get('headers', []):
            self.headers[hdr['name'].lower()] = hdr['value']
        self.subject = self.headers.get('subject')
        parts = self._get_parts()
        if not parts:
            return
        # First part of the preferred type wins; otherwise fall back to the
        # last part. Only the selected part is decoded.
        chosen = parts[-1]
        for part in parts:
            if part['mimeType'] == self._preferred_mime_type:
                chosen = part
                break
        self.body = decode_data(chosen['data'])
        self.mime_type = chosen['mimeType']

    def search(self,
               regex=None,
               regex_group=0,
               regex_flags='',
               first_only=False,
               search_in=('body', 'subject')):
        """Search the subject and/or body for ``regex`` and record matches.

        Args:
            regex: Pattern string or compiled ``re.Pattern``. If ``None``
                nothing is searched and ``None`` is returned.
            regex_group: Capture group whose text is collected per match.
            regex_flags: String of ``re`` flag names, one letter each
                (e.g. ``'IM'``); ignored with a warning when ``regex`` is
                already compiled.
            first_only: Keep only the first match of each searched field
                (so up to one from the subject and one from the body).
            search_in: Fields to search; any of ``'subject'``, ``'body'``.
                The subject is always searched before the body.

        Returns:
            The list of matched group values, also stored in
            ``self.matches[regex]`` (replacing any earlier result for the
            same regex), or ``None`` when ``regex`` is ``None``.

        Raises:
            AttributeError: If ``regex_flags`` contains a letter that is
                not an attribute of the ``re`` module.
        """
        if regex is None:
            return None

        if isinstance(regex, re.Pattern):
            regex_c = regex
            if regex_flags:
                logger.warning(
                    'Ignoring regex flags with already compiled pattern')
        else:
            flags = 0
            for f in regex_flags:
                flags |= getattr(re, f)
            regex_c = re.compile(regex, flags)
            logger.debug('Using regex {} wih flags {!r}'.format(
                regex, flags))

        def _find(text):
            # A missing field (no Subject header, no decodable body) has
            # nothing to match; finditer(None) would raise TypeError.
            if text is None:
                return []
            found = []
            for m in regex_c.finditer(text):
                found.append(m.group(regex_group))
                if first_only:
                    logger.debug('Returning first match only')
                    break
            return found

        collected = self.matches[regex] = []
        for field in ('subject', 'body'):
            if field not in search_in:
                continue
            found = _find(getattr(self, field))
            logger.debug('Matched in {}: {}'.format(field, found))
            collected.extend(found)
        return collected
def decode_data(data):
    """Decode a base64 string (standard or URL-safe alphabet) into text.

    Args:
        data: Base64 text. Both the standard (``+/``) and the URL-safe
            (``-_``) alphabets are accepted, with or without trailing
            ``=`` padding.

    Returns:
        The decoded bytes interpreted as UTF-8; bytes that are not valid
        UTF-8 are rendered as backslash escapes instead of raising.

    Raises:
        binascii.Error: If ``data`` is not decodable even after padding
            (e.g. one stray character beyond a multiple of four).
    """
    # Map the URL-safe alphabet onto the standard one so a single decoder
    # handles both variants.
    normalized = data.replace('-', '+').replace('_', '/')
    # URL-safe encoders commonly strip the '=' padding; restore it so the
    # decoder does not reject the input with "Incorrect padding".
    normalized += '=' * (-len(normalized) % 4)
    return base64.b64decode(normalized).decode(
        'utf-8', errors='backslashreplace')
def get_service(token_path='token.pickle',
                credentials_path='credentials.json',
                port=50001):
    """Build a Gmail API client, obtaining or refreshing credentials.

    Cached credentials are loaded from ``token_path`` when that file
    exists. If they are missing or not valid they are refreshed (when
    expired and a refresh token is available) or obtained through the
    installed-app flow, and then written back to ``token_path``.

    Args:
        token_path: Pickle file used to cache the credentials.
        credentials_path: Client secrets file passed to the OAuth flow.
        port: Port handed to ``flow.run_local_server``.

    Returns:
        The object returned by ``build('gmail', 'v1', credentials=...)``.
    """
    creds = None
    if os.path.exists(token_path):
        # NOTE(security): unpickling executes code embedded in the file.
        # This is only acceptable because the cache is a local file written
        # by this function; never point token_path at untrusted data.
        with open(token_path, 'rb') as token:
            creds = pickle.load(token)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                credentials_path, SCOPES)
            creds = flow.run_local_server(port=port)
        # Persist the new/refreshed credentials for the next run.
        with open(token_path, 'wb') as token:
            pickle.dump(creds, token)
    return build('gmail', 'v1', credentials=creds)
def get_msgs(service, query=None):
    """Collect every message entry matching ``query``, following paging.

    Args:
        service: API client exposing ``users().messages().list(...)``.
        query: Search string forwarded as ``q``; ``None`` sends no filter.

    Returns:
        The concatenated ``'messages'`` lists of all result pages, in the
        order the pages were returned.
    """
    collected = []
    token = None
    while True:
        logger.debug('Fetching messages, pageToken={}'.format(
            token))
        request = service.users().messages().list(
            userId='me',
            pageToken=token,
            q=query)
        page = request.execute()
        collected.extend(page.get('messages', []))
        token = page.get('nextPageToken')
        # No (or empty) nextPageToken means this was the last page.
        if not token:
            break
    return collected
def get_msg(service, msg_id):
    """Fetch a single message by id for user ``'me'``.

    Args:
        service: API client exposing ``users().messages().get(...)``.
        msg_id: Identifier of the message to fetch.

    Returns:
        Whatever executing the ``get`` request returns.
    """
    request = service.users().messages().get(userId='me', id=msg_id)
    return request.execute()
def process_msgs(service,
                 preferred_mime_type=None,
                 query=None,
                 **search_kwargs):
    """Fetch the messages matching ``query`` and run a regex search on each.

    Args:
        service: API client passed on to ``get_msgs`` and ``get_msg``.
        preferred_mime_type: Forwarded to ``Message``; ``None`` expresses
            no preference, in which case the last non-attachment part of
            each message is used as its body.
        query: Search string selecting the messages to fetch.
        **search_kwargs: Forwarded unchanged to ``Message.search``
            (``regex``, ``regex_group``, ``regex_flags``, ``first_only``,
            ``search_in``).

    Returns:
        A list of ``Message`` objects in the reverse of the listing order.
        Without a ``regex`` every fetched message is returned; with one,
        only messages that produced at least one match.
    """
    messages = get_msgs(service, query=query)
    logger.info('Found {} messages'.format(len(messages)))
    # Callers may pass the search options with regex=None; only an actual
    # regex turns filtering on.
    searching = search_kwargs.get('regex') is not None
    result = []
    for m in messages:
        msg = Message(get_msg(service, m['id']),
                      preferred_mime_type=preferred_mime_type)
        logger.info('~~~~~~~~~~ {}'.format(msg))
        msg.search(**search_kwargs)
        for regex, matches in msg.matches.items():
            logger.info('{} -> {}'.format(
                regex, set(matches)))
        # msg.matches holds a (possibly empty) list per searched regex, so
        # the dict's own truthiness says nothing about whether anything
        # matched; inspect the lists instead.
        if not searching or any(msg.matches.values()):
            result.append(msg)
    result.reverse()
    return result
def setup_logger(verbosity=None):
    """Attach a stderr handler to the module logger and set its level.

    Args:
        verbosity: ``None`` selects WARNING; 1 selects INFO, 2 selects
            DEBUG and anything above 2 selects level 1. For values of 0
            or below the logger level is left untouched.
    """
    stderr_handler = logging.StreamHandler(sys.stderr)
    # The handler itself lets everything through; filtering is done by the
    # logger's level.
    stderr_handler.setLevel(1)
    if verbosity is None:
        logger.setLevel(logging.WARNING)
    else:
        # Checked from most to least verbose; the first threshold exceeded
        # decides the level.
        for threshold, level in ((2, 1),
                                 (1, logging.DEBUG),
                                 (0, logging.INFO)):
            if verbosity > threshold:
                logger.setLevel(level)
                break
    logger.addHandler(stderr_handler)
def str_trunc(s, size):
    """Shorten ``s`` to at most ``size`` items, marking the cut with dots.

    Args:
        s: ``str`` or ``bytes`` to shorten.
        size: Maximum length of the result; values below zero are treated
            as zero.

    Returns:
        ``s`` unchanged if it already fits. Otherwise the leading part of
        ``s`` followed by ``'...'`` (``b'...'`` for bytes), ``size`` items
        long in total. When ``size`` is too small to hold any content the
        result is just the first ``size`` dots.
    """
    if len(s) <= size:
        return s
    suff = b'...' if isinstance(s, bytes) else '...'
    if size <= len(suff):
        # s[:size - 3] would be a negative slice here and keep almost all
        # of s, producing a result longer than requested.
        return suff[:max(size, 0)]
    return s[:size - len(suff)] + suff
if __name__ == '__main__':
    # Script entry point: parse the command line, configure logging,
    # authenticate, then fetch and search the messages.
    args = parser.parse_args()
    setup_logger(args.verbose)
    service = get_service()
    # Options forwarded verbatim to Message.search via process_msgs.
    search_options = {
        'regex': args.regex,
        'regex_group': args.regex_group,
        'regex_flags': args.regex_flags,
        'first_only': args.first_only,
        'search_in': args.search,
    }
    # NOTE: the returned list is not used here; results are only visible
    # through the log output, so run with -v to see them.
    process_msgs(
        service,
        preferred_mime_type=args.mime_type,
        query=args.query,
        **search_options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment