Skip to content

Instantly share code, notes, and snippets.


a6kme/ Secret

Created March 26, 2023 14:02
Show Gist options
  • Save a6kme/1631aca545c229e59c62ac85d48caee4 to your computer and use it in GitHub Desktop.
Save a6kme/1631aca545c229e59c62ac85d48caee4 to your computer and use it in GitHub Desktop.
import base64
import io
import os
import pickle
from datetime import datetime
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from PyPDF2 import PdfReader, PdfWriter
# Set the destination folder and PDF password
destination_folder = "./attachments"
pdf_password = os.environ.get('PDF_PASSWORD')
# Authenticate with the Gmail API
def authenticate():
creds = None
if os.path.exists('token.pickle'):
with open('token.pickle', 'rb') as token:
creds = pickle.load(token)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', [''])
creds = flow.run_local_server(port=0)
with open('token.pickle', 'wb') as token:
pickle.dump(creds, token)
return build('gmail', 'v1', credentials=creds)
# Download and decrypt the PDF attachments
def download_attachments(service, query):
print("Fetching emails...")
result = service.users().messages().list(userId='me', q=query).execute()
messages = result.get('messages', [])
print(f"Found {len(messages)} emails with matching query")
for idx, message in enumerate(messages):
print(f"Processing email {idx + 1} of {len(messages)}")
msg = service.users().messages().get(userId='me', id=message['id'], format='full').execute()
payload = msg['payload']
# Get the email's received date
email_received_timestamp = int(msg['internalDate'])
email_received_datetime = datetime.fromtimestamp(email_received_timestamp / 1000)
destination_file_name = email_received_datetime.strftime('%B-%Y')
for part in payload.get("parts"):
filename = part.get("filename")
if filename.endswith(".pdf") and "body" in part:
print(f"Downloading attachment: {filename}")
if "attachmentId" in part["body"]:
attachment_id = part["body"]["attachmentId"]
attachment = service.users().messages().attachments().get(userId='me', messageId=message['id'], id=attachment_id).execute()
file_data = attachment["data"]
file_data = part["body"].get("data")
file_path = os.path.join(destination_folder, f"{destination_file_name}.pdf")
file_data = base64.urlsafe_b64decode(file_data)
print(f"Decrypting attachment: {filename}")
decrypt_pdf(file_data, file_path, pdf_password)
print(f"Attachment saved and decrypted: {file_path}")
def decrypt_pdf(file_data, file_path, password):
with io.BytesIO(file_data) as input_pdf:
reader = PdfReader(input_pdf, password=password)
writer = PdfWriter()
for page in reader.pages:
with open(file_path, "wb") as output_pdf:
if __name__ == '__main__':
service = authenticate()
query = ' after:2022-03-01 has:attachment'
download_attachments(service, query)
except HttpError as error:
print(f"An error occurred: {error}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment