Skip to content

Instantly share code, notes, and snippets.


a6kme/ Secret

Created March 26, 2023 14:02
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
What would you like to do?
import base64
import io
import os
import pickle
from datetime import datetime
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from PyPDF2 import PdfReader, PdfWriter
# Set the destination folder and PDF password
destination_folder = "./attachments"
pdf_password = os.environ.get('PDF_PASSWORD')
# Authenticate with the Gmail API
def authenticate():
creds = None
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', [''])
creds = flow.run_local_server(port=0)
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
# Download and decrypt the PDF attachments
def download_attachments(service, query):
print("Fetching emails...")
result = service.users().messages().list(userId='me', q=query).execute()
messages = result.get('messages', [])
print(f"Found {len(messages)} emails with matching query")
for idx, message in enumerate(messages):
print(f"Processing email {idx + 1} of {len(messages)}")
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
payload = msg['payload']
# Get the email's received date
email_received_timestamp = int(msg['internalDate'])
email_received_datetime = datetime.fromtimestamp(email_received_timestamp / 1000)
destination_file_name = email_received_datetime.strftime('%B-%Y')
for part in payload.get("parts"):
filename = part.get("filename")
if filename.endswith(".pdf") and "body" in part:
print(f"Downloading attachment: {filename}")
if "attachmentId" in part["body"]:
attachment_id = part["body"]["attachmentId"]
attachment = service.users().messages().attachments().get(userId='me', messageId=message['id'], id=attachment_id).execute()
file_data = attachment["data"]
file_data = part["body"].get("data")
file_path = os.path.join(destination_folder, f"{destination_file_name}.pdf")
file_data = base64.urlsafe_b64decode(file_data)
print(f"Decrypting attachment: {filename}")
decrypt_pdf(file_data, file_path, pdf_password)
print(f"Attachment saved and decrypted: {file_path}")
def decrypt_pdf(file_data, file_path, password):
with io.BytesIO(file_data) as input_pdf:
reader = PdfReader(input_pdf, password=password)
writer = PdfWriter()
for page in reader.pages:
with open(file_path, "wb") as output_pdf:
if __name__ == '__main__':
service = authenticate()
query = ' after:2022-03-01 has:attachment'
download_attachments(service, query)
except HttpError as error:
print(f"An error occurred: {error}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment