Skip to content

Instantly share code, notes, and snippets.

@Zakarialabib
Last active April 22, 2023 20:41
Show Gist options
  • Save Zakarialabib/32dfc62db79faaa0f4bdd3cd02b133d5 to your computer and use it in GitHub Desktop.
Save Zakarialabib/32dfc62db79faaa0f4bdd3cd02b133d5 to your computer and use it in GitHub Desktop.
Invoices Classification

The bot can then connect to the email account, search for emails with invoices, extract client names, and save the invoices to the appropriate folders.

0 9,17 * * * /path/to/python /path/to/bot.py --server example.com --username user --password pass --keywords invoice facture bon de commande --output-dir /path/to/output

This cron job runs the command /path/to/python /path/to/bot.py twice a day, at 9:00 AM and 5:00 PM, every day of the week (schedule `0 9,17 * * *`: minute 0, hours 9 and 17, any day of the month, any month, any day of the week). The command-line arguments for the bot are specified after the command. In this example, the email server is example.com, the username is user, the password is pass, the keywords to search for in the email subject or body are invoice facture bon de commande, and the output directory for saving invoices is /path/to/output.

python invoice_bot.py --server imap.gmail.com --username user@gmail.com --password mypassword --keywords invoice facture bon de commande --output-dir /path/to/output/directory

import imaplib
import email
import re
import os
import datetime
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from argparse import ArgumentParser
def extract_text_from_email(email_message):
    """Return the concatenated plain-text content of an email message.

    Every ``text/plain`` part found by walking the message is decoded from
    its transfer encoding (base64, quoted-printable, ...) and from its
    declared charset, so the result is readable text rather than the raw
    encoded payload.

    Args:
        email_message: An ``email.message.Message`` (or subclass).

    Returns:
        The decoded text of all ``text/plain`` parts joined in walk order,
        or an empty string when the message has none.
    """
    chunks = []
    for part in email_message.walk():
        if part.get_content_type() != "text/plain":
            continue
        # decode=True undoes the Content-Transfer-Encoding and yields bytes.
        payload = part.get_payload(decode=True)
        if payload is None:
            continue
        charset = part.get_content_charset() or "utf-8"
        try:
            chunks.append(payload.decode(charset, errors="replace"))
        except LookupError:
            # The message declared a charset Python does not know about.
            chunks.append(payload.decode("utf-8", errors="replace"))
    return "".join(chunks)
def extract_client_name(text):
    """Extract the client name from the email text.

    Looks for the first line of the form ``Bill To: Client Name``
    (case-insensitive) and returns what follows the label.

    Args:
        text: The plain-text body of the email.

    Returns:
        The client name with surrounding whitespace removed, or ``None``
        when no ``Bill To:`` line is present or the name is blank.
    """
    match = re.search(r"Bill To: (.+)", text, re.IGNORECASE)
    if match is None:
        return None
    # ``.`` matches "\r", so CRLF-terminated mail would otherwise leave a
    # carriage return (and any trailing spaces) inside the name.
    name = match.group(1).strip()
    return name or None
def create_client_folder(output_dir, client_name):
    """Create (if needed) and return the folder for a client.

    The client name originates from email content, so it is reduced to a
    single safe path component first: path separators and other characters
    that are invalid in file names are replaced with ``_`` and leading or
    trailing dots and spaces are dropped. This keeps the folder inside
    ``output_dir`` even for names such as ``../../etc``.

    Args:
        output_dir: Directory under which client folders live. It is
            created, together with any missing parents, when absent.
        client_name: Name of the client as extracted from the email.

    Returns:
        The path of the client's folder, which exists on return.
    """
    safe_name = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", client_name).strip(" .")
    if not safe_name:
        # Nothing usable was left (e.g. the name was ".." or only spaces).
        safe_name = "_"
    client_dir = os.path.join(output_dir, safe_name)
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(client_dir, exist_ok=True)
    return client_dir
def save_attachment(attachment, output_filename):
    """Write the decoded payload of an email part to ``output_filename``.

    The destination is opened in binary mode and truncated if it already
    exists; the part's payload is decoded from its transfer encoding
    before being written.
    """
    destination = open(output_filename, "wb")
    try:
        destination.write(attachment.get_payload(decode=True))
    finally:
        destination.close()
def process_email(email_message, keywords, output_dir):
    """Process an email message and save any invoices to the output directory.

    The client name is read from the message text; messages without one are
    ignored. Otherwise the client's folder is created and every attachment
    is saved into it when a keyword occurs in the attachment's file name or
    in the message text. Matching is case-insensitive.

    Args:
        email_message: The parsed ``email.message.Message``.
        keywords: Iterable of keyword strings identifying invoices.
        output_dir: Directory that holds one sub-folder per client.
    """
    text = extract_text_from_email(email_message)
    client_name = extract_client_name(text)
    if not client_name:
        return
    client_dir = create_client_folder(output_dir, client_name)

    # Lowercase both sides once; the haystacks are lowercased, so a keyword
    # given as "Invoice" could otherwise never match.
    lowered_keywords = [keyword.lower() for keyword in keywords]
    lowered_text = text.lower()
    text_matches = any(keyword in lowered_text for keyword in lowered_keywords)

    for part in email_message.walk():
        filename = part.get_filename()
        if not filename or part.is_multipart():
            # Multipart containers have no decodable payload to write.
            continue
        # The file name is sender-controlled: keep only its last component
        # so it cannot escape the client folder.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            continue
        lowered_name = filename.lower()
        if text_matches or any(keyword in lowered_name for keyword in lowered_keywords):
            save_attachment(part, os.path.join(client_dir, safe_name))
def search_emails(server, username, password, keywords, output_dir=None):
    """Search the INBOX for emails matching the keywords and process them.

    Connects to ``server`` over IMAP with SSL, logs in, and for each keyword
    searches for messages whose subject or body contains it. Every matching
    message is fetched once and handed to ``process_email``.

    Args:
        server: Host name of the IMAP server.
        username: Account name used to log in.
        password: Password used to log in.
        keywords: Iterable of keyword strings to search for.
        output_dir: Directory in which invoices are stored. Defaults to the
            current working directory when omitted.

    Raises:
        imaplib.IMAP4.error: If the login or an IMAP command is rejected.
        OSError: If the connection cannot be established.
    """
    if output_dir is None:
        output_dir = os.getcwd()

    imap = imaplib.IMAP4_SSL(server)
    try:
        imap.login(username, password)
        imap.select("INBOX")
        # A message matching several keywords must only be processed once.
        handled_ids = set()
        for keyword in keywords:
            # Escape the characters that would terminate the quoted string.
            quoted = keyword.replace("\\", "\\\\").replace('"', '\\"')
            result, data = imap.search(
                None, f'(OR SUBJECT "{quoted}" BODY "{quoted}")'
            )
            if result != "OK":
                continue
            for email_id in data[0].split():
                if email_id in handled_ids:
                    continue
                handled_ids.add(email_id)
                result, fetched = imap.fetch(email_id, "(RFC822)")
                # A message removed in the meantime yields no (header, body) tuple.
                if result != "OK" or not fetched or not isinstance(fetched[0], tuple):
                    continue
                email_message = email.message_from_bytes(fetched[0][1])
                process_email(email_message, keywords, output_dir)
    finally:
        # Always release the connection, even when a command failed.
        imap.logout()
def generate_report(output_dir):
    """Generate a PDF report of the invoices processed.

    Each sub-directory of ``output_dir`` is treated as one client and the
    entries inside it as that client's invoices. The report lists the total
    number of invoices and the number per client; it is written to
    ``report.pdf`` in the current working directory and echoed to the
    console.

    Args:
        output_dir: Directory that holds one sub-folder per client.
    """
    # Get the current date and time
    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Count invoices per client once. Plain files in output_dir (for example
    # a previous report.pdf) are not clients and would make listdir() fail.
    invoice_counts = []
    for entry in sorted(os.listdir(output_dir)):
        entry_path = os.path.join(output_dir, entry)
        if os.path.isdir(entry_path):
            invoice_counts.append((entry, len(os.listdir(entry_path))))
    total_invoices = sum(count for _, count in invoice_counts)

    # Create a new PDF file
    pdf_filename = "report.pdf"
    pdf = canvas.Canvas(pdf_filename, pagesize=letter)

    # Add a title to the first page of the PDF
    pdf.setFont("Helvetica-Bold", 16)
    pdf.drawString(100, 700, "Invoice Report")
    pdf.setFont("Helvetica", 12)
    pdf.drawString(100, 670, f"Generated on: {now}")
    pdf.drawString(100, 640, f"Number of invoices processed: {total_invoices}")

    # Add a table of clients and the number of invoices for each client
    y = 600
    for client, count in invoice_counts:
        if y < 50:
            # Out of room: continue on a fresh page. showPage() resets the
            # drawing state, so the font has to be selected again.
            pdf.showPage()
            pdf.setFont("Helvetica", 12)
            y = 750
        pdf.drawString(100, y, f"{client}: {count} invoices")
        y -= 20

    # Save the PDF file
    pdf.save()

    # Print the report to the console
    print(f"Report generated on {now}")
    print(f"Number of invoices processed: {total_invoices}")
    for client, count in invoice_counts:
        print(f"{client}: {count} invoices")
import os
import smtplib
import sqlite3
from email.message import EmailMessage

from PyQt5.QtCore import QObject, QThread, pyqtSignal
from PyQt5.QtWidgets import (
    QApplication,
    QWidget,
    QVBoxLayout,
    QHBoxLayout,
    QLabel,
    QLineEdit,
    QPushButton,
    QFileDialog,
    QMessageBox
)
# SQLite database file that holds the ``email_credentials`` table. The path is
# relative, so it resolves against the process's current working directory.
DB_FILE = "email_credentials.db"
class _SearchWorker(QObject):
    """Run one mailbox search off the GUI thread.

    The worker is moved to a ``QThread``; ``run`` calls the supplied search
    callable, records whether it succeeded in ``succeeded`` and then emits
    ``finished``.
    """

    finished = pyqtSignal()

    def __init__(self, search, keywords, output_dir):
        super().__init__()
        self._search = search
        self._keywords = keywords
        self._output_dir = output_dir
        self.succeeded = False

    def run(self):
        """Execute the search and always emit ``finished`` afterwards."""
        try:
            self.succeeded = bool(self._search(self._keywords, self._output_dir))
        except Exception as exc:
            # Thread boundary: an escaping exception would leave the GUI
            # disabled forever, so report it and signal failure instead.
            print(exc)
            self.succeeded = False
        finally:
            self.finished.emit()


class EmailBot(QWidget):
    """Window that collects mail settings and saves matching emails to disk.

    The form asks for a server, user name, password, comma-separated
    keywords and an output directory. Pressing *Start* stores the
    credentials in the SQLite file ``DB_FILE`` and searches the INBOX for
    unread messages whose subject contains a keyword, on a background
    thread so the window stays responsive.
    """

    def __init__(self):
        super().__init__()
        # Background search state; populated by start_email_bot().
        self._search_thread = None
        self._search_worker = None
        # Initialize UI
        self.init_ui()

    def init_ui(self):
        """Build the form, create the credentials table and show the window."""
        # Server input
        server_label = QLabel("Server:")
        self.server_input = QLineEdit()
        server_layout = QHBoxLayout()
        server_layout.addWidget(server_label)
        server_layout.addWidget(self.server_input)

        # Username input
        username_label = QLabel("Username:")
        self.username_input = QLineEdit()
        username_layout = QHBoxLayout()
        username_layout.addWidget(username_label)
        username_layout.addWidget(self.username_input)

        # Password input
        password_label = QLabel("Password:")
        self.password_input = QLineEdit()
        self.password_input.setEchoMode(QLineEdit.Password)
        password_layout = QHBoxLayout()
        password_layout.addWidget(password_label)
        password_layout.addWidget(self.password_input)

        # Keywords input
        keywords_label = QLabel("Keywords:")
        self.keywords_input = QLineEdit()
        keywords_layout = QHBoxLayout()
        keywords_layout.addWidget(keywords_label)
        keywords_layout.addWidget(self.keywords_input)

        # Output directory input
        output_dir_label = QLabel("Output Directory:")
        self.output_dir_input = QLineEdit()
        output_dir_layout = QHBoxLayout()
        output_dir_layout.addWidget(output_dir_label)
        output_dir_layout.addWidget(self.output_dir_input)
        output_dir_button = QPushButton("Browse")
        output_dir_button.clicked.connect(self.browse_output_dir)
        output_dir_layout.addWidget(output_dir_button)

        # Start button; kept on self because start_email_bot() disables it.
        self.start_button = QPushButton("Start")
        self.start_button.clicked.connect(self.start_email_bot)

        # Main layout
        main_layout = QVBoxLayout()
        main_layout.addLayout(server_layout)
        main_layout.addLayout(username_layout)
        main_layout.addLayout(password_layout)
        main_layout.addLayout(keywords_layout)
        main_layout.addLayout(output_dir_layout)
        main_layout.addWidget(self.start_button)
        self.setLayout(main_layout)

        # Initialize database
        self.init_db()
        self.setWindowTitle("Email Bot")
        self.show()

    def init_db(self):
        """Create the ``email_credentials`` table if it does not exist yet."""
        conn = sqlite3.connect(DB_FILE)
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS email_credentials (
                        server TEXT NOT NULL,
                        username TEXT NOT NULL,
                        password TEXT NOT NULL,
                        port INTEGER,
                        use_ssl INTEGER,
                        use_tls INTEGER
                    )
                """)
        finally:
            conn.close()

    def save_email_credentials(self, server, username, password, port, use_ssl, use_tls):
        """Append a credentials row to the database.

        NOTE(review): the password is stored in plain text.
        """
        conn = sqlite3.connect(DB_FILE)
        try:
            with conn:
                conn.execute("""
                    INSERT INTO email_credentials (server, username, password, port, use_ssl, use_tls)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (server, username, password, port, use_ssl, use_tls))
        finally:
            conn.close()

    def get_email_credentials(self):
        """Return the most recently saved credentials row, or ``None``.

        The row is a tuple ``(server, username, password, port, use_ssl,
        use_tls)``.
        """
        conn = sqlite3.connect(DB_FILE)
        try:
            # Newest row first: save_email_credentials() only ever appends,
            # so an unordered fetchone() would keep returning stale values.
            row = conn.execute(
                "SELECT server, username, password, port, use_ssl, use_tls "
                "FROM email_credentials ORDER BY rowid DESC LIMIT 1"
            ).fetchone()
        finally:
            conn.close()
        return row

    def browse_output_dir(self):
        """Let the user pick the output directory with a folder dialog."""
        output_dir = QFileDialog.getExistingDirectory(self, "Select Directory")
        if output_dir:
            # An empty string means the dialog was cancelled; keep the old value.
            self.output_dir_input.setText(output_dir)

    def _set_inputs_disabled(self, disabled):
        """Enable or disable every input widget and the start button."""
        for widget in (
            self.server_input,
            self.username_input,
            self.password_input,
            self.keywords_input,
            self.output_dir_input,
            self.start_button,
        ):
            widget.setDisabled(disabled)

    def start_email_bot(self):
        """Validate the form, store the credentials and start the search."""
        # Get input values
        server = self.server_input.text().strip()
        username = self.username_input.text().strip()
        password = self.password_input.text()
        keywords = self.keywords_input.text().strip()
        output_dir = self.output_dir_input.text().strip()

        # Validate user input
        if not all((server, username, password, keywords, output_dir)):
            QMessageBox.warning(self, "Error", "Please fill in all fields.")
            return
        if not os.path.isdir(output_dir):
            QMessageBox.warning(self, "Error", "Invalid output directory.")
            return

        # search_emails() reads its connection settings from the database,
        # so persist what was typed (only when it changed, to avoid piling up
        # duplicate rows). The form has no port/SSL fields: assume IMAP over
        # SSL on the standard port.
        # NOTE(review): send_email() reads the same row for SMTP, where this
        # port is not appropriate -- confirm how SMTP settings should be stored.
        stored = self.get_email_credentials()
        if stored is None or tuple(stored[:3]) != (server, username, password):
            self.save_email_credentials(
                server, username, password, imaplib.IMAP4_SSL_PORT, 1, 0
            )

        # Disable input fields and start button
        self._set_inputs_disabled(True)

        # Start email bot in a new thread
        self._search_thread = QThread()
        self._search_worker = _SearchWorker(self.search_emails, keywords, output_dir)
        self._search_worker.moveToThread(self._search_thread)
        self._search_thread.started.connect(self._search_worker.run)
        self._search_worker.finished.connect(self._search_thread.quit)
        self._search_thread.finished.connect(self.on_thread_finished)
        self._search_thread.start()

    def on_thread_finished(self):
        """Re-enable the form and report the outcome of the search."""
        self._set_inputs_disabled(False)
        worker = self._search_worker
        if worker is not None and worker.succeeded:
            QMessageBox.information(self, "Done", "Email search finished.")
        else:
            QMessageBox.warning(self, "Error", "Email search failed.")

    def send_email(self, recipient, subject, body):
        """Send a plain-text email using the stored credentials.

        Returns:
            ``True`` when the message was handed to the SMTP server,
            ``False`` when no credentials are stored or sending failed.
        """
        # Get email credentials
        credentials = self.get_email_credentials()
        if not credentials:
            return False
        server, username, password, port, use_ssl, use_tls = credentials

        # Create message object
        msg = EmailMessage()
        msg.set_content(body)
        msg["Subject"] = subject
        msg["From"] = username
        msg["To"] = recipient

        try:
            # Connect to SMTP server (port 0 selects the protocol default)
            if use_ssl:
                smtp_server = smtplib.SMTP_SSL(server, port or 0)
            else:
                smtp_server = smtplib.SMTP(server, port or 0)
            # The context manager sends QUIT and closes the socket even when
            # login or sending fails.
            with smtp_server:
                # STARTTLS only makes sense on a connection that is not
                # already wrapped in SSL.
                if use_tls and not use_ssl:
                    smtp_server.starttls()
                smtp_server.login(username, password)
                smtp_server.send_message(msg)
            return True
        except Exception as e:
            # Best-effort API: callers only get a boolean, so report and fail.
            print(e)
            return False

    @staticmethod
    def _decode_subject(raw_subject):
        """Return the Subject header as readable text ('' when missing)."""
        from email.errors import HeaderParseError
        from email.header import decode_header, make_header

        if raw_subject is None:
            return ""
        try:
            # Resolves RFC 2047 encoded words such as "=?utf-8?b?...?=".
            return str(make_header(decode_header(str(raw_subject))))
        except (HeaderParseError, LookupError, UnicodeError, ValueError):
            return str(raw_subject)

    @staticmethod
    def _unique_path(directory, stem, suffix):
        """Return a path in ``directory`` that does not exist yet."""
        candidate = os.path.join(directory, stem + suffix)
        counter = 2
        while os.path.exists(candidate):
            candidate = os.path.join(directory, f"{stem} ({counter}){suffix}")
            counter += 1
        return candidate

    def search_emails(self, keywords, output_dir):
        """Save unread INBOX emails whose subject contains a keyword.

        Args:
            keywords: Comma-separated keywords; matching is case-insensitive
                and blank entries are ignored.
            output_dir: Directory in which each matching email is written as
                ``<subject>.txt``.

        Returns:
            ``True`` when the mailbox was searched, ``False`` when no
            credentials are stored or the server refused the request.

        Raises:
            imaplib.IMAP4.error: If the login or an IMAP command fails.
            OSError: If the connection or a file write fails.
        """
        # Get email credentials
        credentials = self.get_email_credentials()
        if not credentials:
            return False
        server, username, password, port, use_ssl, use_tls = credentials

        # An empty keyword is a substring of every subject, so drop blanks.
        wanted = [part.strip().lower() for part in keywords.split(",")]
        wanted = [keyword for keyword in wanted if keyword]

        # Connect to IMAP server
        if use_ssl:
            imap_server = imaplib.IMAP4_SSL(server, port or imaplib.IMAP4_SSL_PORT)
        else:
            imap_server = imaplib.IMAP4(server, port or imaplib.IMAP4_PORT)
        try:
            if use_tls and not use_ssl:
                imap_server.starttls()
            # Login to IMAP server
            imap_server.login(username, password)
            # Select INBOX folder
            status, _ = imap_server.select("INBOX")
            if status != "OK":
                return False
            try:
                # Search for emails
                status, messages = imap_server.search(None, "UNSEEN")
                if status != "OK":
                    return False
                # Loop through emails
                for msg_id in messages[0].split():
                    # BODY.PEEK[] returns the full message without setting
                    # \Seen, so mail that does not match stays unread.
                    status, msg_data = imap_server.fetch(msg_id, "(BODY.PEEK[])")
                    if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                        continue
                    raw_message = msg_data[0][1]
                    email_message = email.message_from_bytes(raw_message)

                    # Check if email subject contains any of the keywords
                    subject = self._decode_subject(email_message["Subject"])
                    lowered_subject = subject.lower()
                    if not any(keyword in lowered_subject for keyword in wanted):
                        continue

                    # The subject is sender-controlled: strip characters that
                    # are path separators or invalid in file names.
                    stem = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", subject).strip(" .")
                    filepath = self._unique_path(output_dir, stem or "message", ".txt")

                    # Write the message exactly as received; bytes avoid any
                    # decoding failure on non-UTF-8 mail.
                    with open(filepath, "wb") as f:
                        f.write(raw_message)

                    # Mark email as read
                    imap_server.store(msg_id, "+FLAGS", "\\Seen")
            finally:
                # Close the mailbox
                imap_server.close()
        finally:
            # Log out even when a command above failed
            imap_server.logout()
        return True
if __name__ == "__main__":
    import sys

    # A QApplication must exist before any widget is created, and its event
    # loop has to run for the window to stay open and react to input.
    app = QApplication(sys.argv)
    # EmailBot shows itself at the end of init_ui().
    bot = EmailBot()
    sys.exit(app.exec_())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment