Skip to content

Instantly share code, notes, and snippets.

@jramirez857
Last active Sep 14, 2022
Embed
What would you like to do?
Gist for getting the top email senders from your most recent n emails. Requires a Gmail API project set up with its credentials saved in a credentials.json file. Usage: 1. `python -m venv venv` 2. `source venv/bin/activate` 3. `pip install -r requirements.txt` 4. `python top_senders.py --help`
"""
This module contains the GmailService class. It is used to authenticate with
Gmail and build the Gmail API service object.
"""
import argparse
import os.path as p
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
# OAuth scopes requested for the user. SCOPES is passed both when loading a
# stored token and when running the installed-app authorization flow, so the
# two must agree; after changing it, delete the saved token file so a new one
# is issued.
# NOTE(review): "https://mail.google.com/" is presumably the full-access Gmail
# scope; a read-only scope would likely be enough for this script - confirm.
SCOPES = ["https://mail.google.com/"]
# Human-readable application name. Not referenced by the code visible in this
# module.
APPLICATION_NAME = "Gmail API Python"
class GmailService:
    """
    Authenticates against the Gmail API and builds the service object.

    :param credentials: Path to the OAuth client secrets file
        (defaults to ``credentials.json``).
    """

    def __init__(self, **kwargs):
        self.credentials_path = kwargs.get("credentials", "credentials.json")

    def _get_credentials(self, token_file="token.json"):
        """
        Returns valid user credentials, creating or refreshing them if needed.

        A stored token in ``token_file`` is used as-is while it is valid. An
        expired token that carries a refresh token is refreshed. In every
        other case (no token file, or a token that cannot be refreshed) the
        interactive installed-app flow is run with the client secrets file.
        Whenever the credentials change they are written back to
        ``token_file`` for the next run.

        :param token_file: Path of the file the user token is cached in.
        :return: The user credentials.
        """
        user_credentials = None
        if p.exists(token_file):
            user_credentials = Credentials.from_authorized_user_file(
                token_file, SCOPES
            )

        # Nothing to do (and nothing to rewrite) when the cached token works.
        if user_credentials is not None and user_credentials.valid:
            return user_credentials

        if (
            user_credentials is not None
            and user_credentials.expired
            and user_credentials.refresh_token
        ):
            user_credentials.refresh(Request())
        else:
            # Either there is no cached token or it cannot be refreshed, so
            # the user has to grant access again.
            flow = InstalledAppFlow.from_client_secrets_file(
                self.credentials_path, SCOPES
            )
            user_credentials = flow.run_local_server(port=0)

        # Persist the new or refreshed token so the next run can reuse it.
        with open(token_file, "w") as token:
            token.write(user_credentials.to_json())
        return user_credentials

    def get(self):
        """
        Returns the service object for the Gmail API (version ``v1``).
        """
        return build("gmail", "v1", credentials=self._get_credentials())
cachetools==4.2.4
certifi==2021.10.8
charset-normalizer==2.0.7
google-api-core==2.2.2
google-api-python-client==2.29.0
google-auth==2.3.3
google-auth-httplib2==0.1.0
google-auth-oauthlib==0.4.6
googleapis-common-protos==1.53.0
httplib2==0.20.2
idna==3.3
oauthlib==3.1.1
progressbar2==3.55.0
protobuf==3.19.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pydantic==1.8.2
pyparsing==3.0.5
requests==2.26.0
requests-oauthlib==1.3.0
rsa==4.7.2
six==1.16.0
typing-extensions==3.10.0.2
uritemplate==4.1.1
urllib3==1.26.7
"""
This module implements the TopSenders class. This class is used to
get the list of top senders from a Gmail account for a given number of emails.
"""
from collections import defaultdict, OrderedDict
import pprint
import logging
import argparse
import progressbar
from gmail import GmailService
from pydantic import BaseModel
# Shared pretty-printer; the script entry point uses it to print the result.
pp = pprint.PrettyPrinter(indent=4)
class GMail(BaseModel):
    """
    Minimal record of one fetched email: its message id and its sender.
    """

    # Message id, taken from the "id" field of the fetched message.
    id: str
    # Value of the message's "From" header, or a placeholder when absent.
    sender: str
class TopSenders:
    """
    Finds the most frequent senders among the most recent emails of the
    logged in user.

    :param gmail_service: Optional, already built Gmail API service. When it
        is omitted a new one is created with ``GmailService().get()``.
    :param log_level: The logging level handed to ``logging.basicConfig``
        (defaults to ``logging.DEBUG``).
    """

    def __init__(self, **kwargs):
        # Build the default service lazily: evaluating it as the default of
        # kwargs.get() would run the whole authentication flow even when the
        # caller supplies a service.
        service = kwargs.get("gmail_service")
        if service is None:
            service = GmailService().get()
        self.gmail = service
        logging.basicConfig(
            level=kwargs.get("log_level", logging.DEBUG),
            format="%(funcName)s():%(lineno)i: %(levelname)s: %(message)s",
        )

    def _parse_email(self, message) -> "GMail":
        """
        Fetches the full email for a message stub and extracts its sender.

        :param message: A message stub; only its ``"id"`` key is read.
        :return: A GMail object holding the message id and sender.
        """
        _email = self.get_email_by_id(message["id"])
        _sender = self._get_sender(_email)
        return GMail(id=_email["id"], sender=_sender)

    def _extract_email_info(self, messages: list) -> list:
        """
        Extracts the needed information for every message stub given.

        :param messages: The message stubs to resolve (any iterable works).
        :return: A list of GMail objects, one per message stub.
        """
        return [self._parse_email(message) for message in messages]

    def _get_response(self, token=None) -> dict:
        """
        Gets one page of message stubs from the Gmail API for the logged in
        user.

        :param token: Page token of the page to fetch; the first page is
            fetched when it is empty or None.
        :return: The decoded API response.
        """
        params = {"userId": "me"}
        if token:
            params["pageToken"] = token
        return self.gmail.users().messages().list(**params).execute()

    def _get_num_emails(self, num: int) -> list:
        """
        Gets up to ``num`` of the emails listed first for the logged in user.

        Message stubs are collected page by page first; only the first
        ``num`` of them are then fetched individually, so no message beyond
        the requested count is downloaded.

        :param num: The maximum number of emails to retrieve.
        :return: A list of GMail objects (shorter than ``num`` when the
            mailbox runs out of messages).
        """
        logging.info("Getting %d emails", num)
        stubs = []
        token = None
        while len(stubs) < num:
            response = self._get_response(token)
            # A page without results may not carry a "messages" key at all.
            stubs.extend(response.get("messages", []))
            token = response.get("nextPageToken")
            if not token:
                break
        if len(stubs) < num:
            logging.warning("No more messages")
        stubs = stubs[:num]
        if not stubs:
            logging.info("Successfully retrieved %d messages", 0)
            return []
        emails = self._extract_email_info(progressbar.progressbar(stubs))
        logging.info("Successfully retrieved %d messages", len(emails))
        return emails

    def _get_sender(self, email) -> str:
        """
        Extracts the sender from the headers of an email.

        :param email: The email to extract the sender from.
        :return: The value of the ``From`` header, or ``"Unknown Sender"``
            when the email has no such header.
        """
        sender = "Unknown Sender"
        headers = email.get("payload", {}).get("headers", [])
        for header in headers:
            # Header names are case-insensitive, so do not rely on "From".
            if header.get("name", "").lower() == "from":
                return header.get("value", sender)
        return sender

    def get_email_by_id(self, message_id, user="me") -> dict:
        """
        Gets the email by the message id for specified user.

        :param message_id: The id of the message to get the email for.
        :param user: The user to get the email for.
        :return: The decoded API response for that message.
        """
        return self.gmail.users().messages().get(userId=user, id=message_id).execute()

    def get(self, num_emails: int, num_senders: int = 10) -> list:
        """
        Gets a list of top senders from the Gmail API for the logged in user.

        :param num_emails: The number of emails to get the senders for.
        :param num_senders: The number of senders to return.
        :return: A list of at most ``num_senders`` senders, most frequent
            first.
        """
        senders = defaultdict(int)
        emails = self._get_num_emails(num=num_emails)
        logging.info("Getting top senders for %d number of emails.", len(emails))
        for email in progressbar.progressbar(emails):
            senders[email.sender] += 1
        top_senders = sorted(senders, key=senders.get, reverse=True)[:num_senders]
        logging.info("top senders: %s", top_senders)
        return top_senders
if __name__ == "__main__":
    # Command-line entry point: read the two optional counts, compute the
    # top senders at INFO log level and pretty-print the resulting list.
    cli = argparse.ArgumentParser(
        description="Get the senders for a given number of emails."
    )
    cli.add_argument(
        "-n",
        "--num_emails",
        type=int,
        default=1000,
        help="The number of emails to get the senders for.",
    )
    cli.add_argument(
        "-s",
        "--num_senders",
        type=int,
        default=10,
        help="The number of top senders to return.",
    )
    options = cli.parse_args()
    finder = TopSenders(log_level=logging.INFO)
    top_senders = finder.get(
        num_emails=options.num_emails,
        num_senders=options.num_senders,
    )
    pp.pprint(top_senders)
@illucent
Copy link

illucent commented Sep 6, 2022

TypeError: 'module' object is not callable

@hectormz
Copy link

hectormz commented Sep 14, 2022

@illucent try uninstalling progressbar and install progressbar2

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment