Last active
September 14, 2022 01:54
-
-
Save jramirez857/9b81e7e89b1a4ffc366080d023dad76f to your computer and use it in GitHub Desktop.
Gist for getting the top email senders from your most recent n emails. Requires a Gmail API project set up with its credentials saved in a credentials.json file. Usage: 1. `python -m venv venv` 2. `source venv/bin/activate` 3. `pip install -r requirements.txt` 4. `python top_senders.py --help`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module contains the GmailService class. It is used to authenticate with
Gmail and to build the Gmail API service object.
""" | |
import argparse | |
import os.path as p | |
from googleapiclient.discovery import build | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
# OAuth scopes requested for the user's credentials. GmailService passes this
# list both when loading a stored token and when running the installed-app flow.
# NOTE(review): this looks like the full-access Gmail scope, while the visible
# code only lists and reads messages -- confirm whether a narrower scope would do.
SCOPES = ["https://mail.google.com/"]
# Application name for the project.
# NOTE(review): not referenced anywhere in the visible source -- confirm it is
# still needed.
APPLICATION_NAME = "Gmail API Python"
class GmailService:
    """
    The GmailService class authenticates with the Gmail API and builds the
    service object used to talk to it.

    :param credentials: Path to the OAuth client secrets file. Defaults to
        ``credentials.json``.
    """

    def __init__(self, **kwargs):
        self.credentials_path = kwargs.get("credentials", "credentials.json")

    def _get_credentials(self, token_file="token.json"):
        """
        Returns user credentials, reusing the stored token whenever possible.

        If ``token_file`` exists and its credentials are still valid they are
        returned as-is. If they are expired but carry a refresh token they are
        refreshed. Otherwise the installed-app flow is run with the client
        secrets file. Refreshed or newly obtained credentials are written to
        ``token_file`` for the next run.

        :param token_file: Path of the file the user token is cached in.
        :return: The credentials to build the service with.
        """
        user_credentials = None
        if p.exists(token_file):
            user_credentials = Credentials.from_authorized_user_file(token_file, SCOPES)

        # A stored token that is still valid needs neither a refresh nor a new
        # consent flow, and the token file is already up to date.
        if user_credentials is not None and user_credentials.valid:
            return user_credentials

        if (
            user_credentials is not None
            and user_credentials.expired
            and user_credentials.refresh_token
        ):
            user_credentials.refresh(Request())
        else:
            # No usable token (missing file, or nothing to refresh with): ask
            # the user to authorize the application.
            flow = InstalledAppFlow.from_client_secrets_file(
                self.credentials_path, SCOPES
            )
            user_credentials = flow.run_local_server(port=0)

        with open(token_file, "w") as token:
            token.write(user_credentials.to_json())
        return user_credentials

    def get(self):
        """
        Returns the service object for the email API.
        """
        return build("gmail", "v1", credentials=self._get_credentials())
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
cachetools==4.2.4 | |
certifi==2021.10.8 | |
charset-normalizer==2.0.7 | |
google-api-core==2.2.2 | |
google-api-python-client==2.29.0 | |
google-auth==2.3.3 | |
google-auth-httplib2==0.1.0 | |
google-auth-oauthlib==0.4.6 | |
googleapis-common-protos==1.53.0 | |
httplib2==0.20.2 | |
idna==3.3 | |
oauthlib==3.1.1 | |
progressbar==2.5 | |
protobuf==3.19.1 | |
pyasn1==0.4.8 | |
pyasn1-modules==0.2.8 | |
pydantic==1.8.2 | |
pyparsing==3.0.5 | |
requests==2.26.0 | |
requests-oauthlib==1.3.0 | |
rsa==4.7.2 | |
six==1.16.0 | |
typing-extensions==3.10.0.2 | |
uritemplate==4.1.1 | |
urllib3==1.26.7 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This module implements the TopSenders class. This class is used to
get the top senders from a Gmail account for a given number of emails.
""" | |
from collections import defaultdict, OrderedDict | |
import pprint | |
import logging | |
import argparse | |
import progressbar | |
from gmail import GmailService | |
from pydantic import BaseModel | |
# Shared pretty-printer; the script entry point uses it to print the result.
pp = pprint.PrettyPrinter(indent=4)


class GMail(BaseModel):
    """
    The pieces of an email message this module keeps: its id and its sender.
    """

    # The "id" field of the fetched message.
    id: str
    # The value of the message's "From" header, or "Unknown Sender" when the
    # message has no such header.
    sender: str
class TopSenders:
    """
    This class is used to get the top senders for a given number of emails.

    :param gmail_service: Optional Gmail API service object. When omitted (or
        ``None``) one is created with ``GmailService().get()``.
    :param log_level: The logging level passed to ``logging.basicConfig``.
        Defaults to ``logging.DEBUG``.
    """

    def __init__(self, **kwargs):
        # Build the default service lazily: creating it may start the OAuth
        # flow, which must not happen when the caller supplied a service.
        service = kwargs.get("gmail_service")
        if service is None:
            service = GmailService().get()
        self.gmail = service
        logging.basicConfig(
            level=kwargs.get("log_level", logging.DEBUG),
            format="%(funcName)s():%(lineno)i: %(levelname)s: %(message)s",
        )

    def _parse_email(self, message) -> "GMail":
        """
        Fetches the full message for a list entry and returns it as a GMail object.

        :param message: A message entry from a list response; only its "id" is used.
        :return: A GMail object holding the message id and sender.
        """
        _email = self.get_email_by_id(message["id"])
        _sender = self._get_sender(_email)
        return GMail(id=_email["id"], sender=_sender)

    def _extract_email_info(self, messages) -> list:
        """
        Extracts the needed information for each message entry.

        :param messages: An iterable of message entries from list responses.
        :return: A list of GMail objects, one per entry, in the same order.
        """
        return [self._parse_email(message) for message in messages]

    def _get_response(self, token=None) -> dict:
        """
        Gets one page of the message list from the Gmail API for the logged in user.

        :param token: The page token of the page to fetch; the first page when falsy.
        :return: The list response as returned by the API.
        """
        params = {"userId": "me"}
        if token:
            params["pageToken"] = token
        return self.gmail.users().messages().list(**params).execute()

    def _list_message_stubs(self, num: int) -> list:
        """
        Collects at most ``num`` message entries by walking the list pages.

        :param num: The maximum number of entries to collect.
        :return: A list of message entries; shorter than ``num`` when the
            mailbox holds fewer messages, empty when ``num`` is not positive.
        """
        stubs = []
        if num <= 0:
            return stubs
        response = self._get_response()
        while True:
            # A response without results has no "messages" key at all. The
            # slice keeps the last page from overshooting the requested count.
            stubs.extend(response.get("messages", [])[: num - len(stubs)])
            if len(stubs) >= num:
                break
            next_token = response.get("nextPageToken")
            if next_token is None:
                logging.warning("No more messages")
                break
            response = self._get_response(next_token)
        return stubs

    def _get_num_emails(self, num: int) -> list:
        """
        Gets the ``num`` most recently listed emails for the logged in user.

        :param num: The number of emails to get.
        :return: A list of at most ``num`` GMail objects.
        """
        logging.info("Getting %d emails", num)
        stubs = self._list_message_stubs(num)
        if not stubs:
            logging.info("Successfully retrieved %d messages", 0)
            return []
        # Fetching each message is the slow part, so that is what the
        # progress bar tracks.
        emails = self._extract_email_info(progressbar.progressbar(stubs))
        logging.info("Successfully retrieved %d messages", len(emails))
        return emails

    def _get_sender(self, email) -> str:
        """
        Extracts the sender from the email's headers.

        :param email: The message to extract the sender from.
        :return: The value of the first "From" header (the header name is
            matched case-insensitively), or "Unknown Sender" when there is none.
        """
        headers = email.get("payload", {}).get("headers", [])
        for header in headers:
            if header.get("name", "").lower() == "from":
                return header.get("value", "Unknown Sender")
        return "Unknown Sender"

    def get_email_by_id(self, message_id, user="me") -> dict:
        """
        Gets the email by the message id for specified user.

        :param message_id: The id of the message to get.
        :param user: The user to get the email for.
        :return: The message as returned by the API.
        """
        return self.gmail.users().messages().get(userId=user, id=message_id).execute()

    def get(self, num_emails: int, num_senders: int = 10) -> list:
        """
        Gets a list of top senders from the Gmail API for the logged in user.

        :param num_emails: The number of emails to get the senders for.
        :param num_senders: The number of senders to return.
        :return: A list of at most num_senders senders, most frequent first.
        """
        senders = defaultdict(int)
        emails = self._get_num_emails(num=num_emails)
        logging.info("Getting top senders for %d number of emails.", len(emails))
        for email in progressbar.progressbar(emails):
            senders[email.sender] += 1
        # The sort is stable, so senders with equal counts keep the order in
        # which they were first seen. A negative num_senders yields an empty
        # list rather than silently dropping entries from the end.
        ranked = sorted(senders, key=senders.get, reverse=True)
        top_senders = ranked[: max(num_senders, 0)]
        logging.info("top senders: %s", top_senders)
        return top_senders
if __name__ == "__main__":
    # Command-line entry point: read the two counts, rank the senders of the
    # most recent emails and pretty-print the result.
    cli = argparse.ArgumentParser(
        description="Get the senders for a given number of emails."
    )
    cli.add_argument(
        "-n",
        "--num_emails",
        default=1000,
        type=int,
        help="The number of emails to get the senders for.",
    )
    cli.add_argument(
        "-s",
        "--num_senders",
        default=10,
        type=int,
        help="The number of top senders to return.",
    )
    options = cli.parse_args()

    ranking = TopSenders(log_level=logging.INFO).get(
        num_emails=options.num_emails,
        num_senders=options.num_senders,
    )
    pp.pprint(ranking)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment