Last active
November 14, 2023 20:58
-
-
Save 3v1n0/99de748e5c191e82423c69d21e793ba2 to your computer and use it in GitHub Desktop.
Extract Italian electronic invoices (fatture elettroniche) from a PEC provider IMAP server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# | |
# Copyright 2019-2022 - Marco Trevisan | |
# | |
# A Tool to export your invoices in the Italian electronic Fattura Elettronica | |
# format from any IMAP PEC provider (defaulting to Aruba PEC) to your computer | |
# | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 3 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
import argparse | |
import email.utils | |
import imaplib | |
import datetime | |
import mailbox | |
import os | |
import re | |
import string | |
import sys | |
import time | |
import urllib.parse | |
import xml.etree.ElementTree as ET | |
# IMAP connection settings; every value can be overridden from the environment.
IMAP_SERVER = os.getenv('FE_IMAP_SERVER', default='imaps.pec.aruba.it')
# Kept as a string because it is joined with the host name to build the
# default value of the --imap-server command line option.
IMAP_PORT = os.getenv('FE_IMAP_PORT', default='993')
IMAP_NAME = os.getenv('FE_IMAP_NAME', default='')
IMAP_PASSWORD = os.getenv('FE_IMAP_PASSWORD', default='')

# XML namespace used to recognise the FatturaElettronica root element.
FE_NAMESPACE = 'http://ivaservizi.agenziaentrate.gov.it/docs/xsd/fatture/v1.2'

# Locates the XML document embedded in a .p7m blob: everything from the XML
# declaration up to the closing, namespace-prefixed FatturaElettronica tag.
# The pattern is a *raw* bytes literal so that the regex escapes (\?, \w, \d)
# reach the regex engine instead of being parsed as (invalid) string escapes.
P7M_FE_CONTENTS_RE = re.compile(
    rb'<\?xml.*</[\w\d_.-]+:FatturaElettronica>', re.MULTILINE | re.DOTALL)
def peek_header(imap, id, header):
    """Return the value of one header of message *id* as text.

    The header is requested with ``BODY.PEEK[HEADER.FIELDS (...)]``, the
    IMAP fetch form that does not set the ``\\Seen`` flag on the message.

    Args:
        imap: object exposing an ``imaplib``-style ``fetch(id, parts)``.
        id: message number passed through to ``fetch``.
        header: name of the header to read (e.g. ``'Date'``).

    Returns:
        The header value without the ``Name:`` prefix, or an empty string
        when the server returns no such header.
    """
    query = 'BODY.PEEK[HEADER.FIELDS ({})]'.format(header)
    raw = imap.fetch(id, query)[1][0][1]
    # The value is only used for logging: undecodable bytes must not abort
    # the whole extraction, so they are replaced instead of raising.
    text = raw.decode('utf-8', 'replace').strip()
    name, colon, value = text.partition(':')
    # Header names are case-insensitive, so do not rely on the server
    # echoing the exact spelling that was requested.
    if colon and name.strip().lower() == header.lower():
        return value.strip()
    return text
def extract_imap_invoices(imap_folder='INBOX', filter_messages='all',
                          period=(), mark_as_read=False, output_path=None,
                          overwrite=False, organize=True):
    """Download the invoices found in an IMAP folder.

    Connects to ``IMAP_SERVER:IMAP_PORT`` with ``IMAP_NAME`` and
    ``IMAP_PASSWORD``, searches *imap_folder* for messages carrying the
    ``X-Fattura-PA: Yes`` header and hands each one to ``extract_invoice``.

    Args:
        imap_folder: name of the mailbox to select.
        filter_messages: key of ``ARGS_TO_IMAP_READ_ARGS`` ('all', 'read'
            or 'unread').
        period: either a ``(start, end)`` pair of datetimes or a single
            datetime standing for its whole year; anything else disables
            the server-side date filter.  The same value is forwarded to
            ``extract_invoice``.
        mark_as_read: when false, messages that were unread before being
            downloaded get their ``\\Seen`` flag removed again.
        output_path, overwrite, organize: forwarded to ``extract_invoice``.

    Exits the process with status 1 when the login fails or the folder
    cannot be selected.
    """
    imap = imaplib.IMAP4_SSL(IMAP_SERVER, int(IMAP_PORT))
    try:
        try:
            imap.login(IMAP_NAME, IMAP_PASSWORD)
        except imaplib.IMAP4.error:
            print('Login failed, check your credentials')
            sys.exit(1)

        status, _data = imap.select(imap_folder)
        if status != 'OK':
            # Searching without a selected mailbox would only fail later
            # with a much less helpful protocol error.
            print('Impossible to open the IMAP folder {}'.format(imap_folder))
            sys.exit(1)

        # Build the search key list piece by piece so that an empty read
        # filter or a missing period never leaves holes in the command.
        criteria = ['HEADER X-Fattura-PA Yes']
        read_filter = ARGS_TO_IMAP_READ_ARGS[filter_messages]
        if read_filter:
            criteria.append(read_filter)
        if isinstance(period, datetime.datetime):
            date_range = (datetime.datetime(period.year, 1, 1),
                          datetime.datetime(period.year + 1, 1, 1))
        else:
            date_range = tuple(period or ())
        if len(date_range) == 2:
            criteria.append(
                'SINCE {:%d-%b-%Y} BEFORE {:%d-%b-%Y}'.format(*date_range))

        _resp, [invoices] = imap.search(
            None, '({})'.format(' '.join(criteria)))
        if not invoices:
            return

        for num in invoices.split():
            flags = imaplib.ParseFlags(imap.fetch(num, '(FLAGS)')[1][0])
            was_read = b'\\Seen' in flags
            print('Getting email {} dated {} "{}"'.format(
                num.decode('utf-8'), peek_header(imap, num, 'Date'),
                peek_header(imap, num, 'Subject')))
            try:
                fullmsg = imap.fetch(num, '(RFC822)')
            finally:
                # Fetching the body may flag the message as read: undo it
                # unless it was read already or the caller asked for it.
                if not was_read and not mark_as_read:
                    imap.store(num, '-FLAGS', '\\Seen')
            message = mailbox.Message(fullmsg[1][0][1])
            extract_invoice(message, period, output_path, overwrite, organize)
    finally:
        # Always release the server connection, whatever happened above.
        try:
            imap.logout()
        except (imaplib.IMAP4.error, OSError):
            pass
def extract_p7m_invoice(file_contents):
    """Extract the XML invoice from a .p7m blob without parsing the PKCS#7.

    The XML document is located with ``P7M_FE_CONTENTS_RE`` and then cleaned
    from the non-text bytes found inside the matched span.  This is a
    heuristic: it does not validate the signature envelope.

    Args:
        file_contents: raw bytes of the .p7m file.

    Returns:
        The cleaned XML document encoded as UTF-8 bytes, or ``None`` when no
        invoice is found in *file_contents*.
    """
    match = P7M_FE_CONTENTS_RE.search(file_contents)
    if match is None:
        return None
    decoded = match.group().decode('utf-8', 'ignore')
    # Drop control characters but keep printable non-ASCII text: filtering
    # on string.printable alone would silently strip accented letters
    # (e.g. "Società") from names and descriptions.
    cleaned = ''.join(
        c for c in decoded
        if c in string.printable or (c > '\x7f' and c.isprintable()))
    return cleaned.encode('utf-8')
def check_and_extract_p7m_invoice(filename, file_contents):
    """Return the XML payload of a signed (.p7m) invoice.

    When pyOpenSSL is importable the PKCS#7 envelope is parsed with it and
    the signed content is read through ``PKCS7_verify`` called with the
    ``PKCS7_NOVERIFY`` flag (NOTE(review): per the OpenSSL documentation
    this skips the signer certificate chain verification - confirm this is
    the intended trust level).  In every other case the payload is
    recovered with ``extract_p7m_invoice``.

    Args:
        filename: attachment name, only used in the diagnostic message.
        file_contents: raw bytes of the .p7m file.

    Returns:
        The XML document as bytes, or whatever ``extract_p7m_invoice``
        returns when the manual extraction is used.
    """
    try:
        from OpenSSL import crypto
        from OpenSSL._util import (
            ffi as _ffi,
            lib as _lib,
        )
    except ImportError:
        # pyOpenSSL is an optional dependency.
        return extract_p7m_invoice(file_contents)

    try:
        p7 = crypto.load_pkcs7_data(crypto.FILETYPE_ASN1, file_contents)
        bio_out = crypto._new_mem_buf()
        res = _lib.PKCS7_verify(
            p7._pkcs7, _ffi.NULL, _ffi.NULL, _ffi.NULL, bio_out,
            _lib.PKCS7_NOVERIFY)
        extracted = crypto._bio_to_string(bio_out) if res == 1 else None
    except Exception:
        # Deliberately broad: this path relies on private pyOpenSSL helpers
        # and on an API that is not available in every release, and the
        # input may not be valid DER at all.  Whatever goes wrong, the
        # manual extraction below is still worth trying.
        extracted = None

    if extracted is not None:
        return extracted

    print('Failed to load and verify invoice {} p7m data, using manual extraction'.format(
        filename))
    return extract_p7m_invoice(file_contents)
def _message_timestamp(message):
    """Return the message Date header as a POSIX timestamp (now if unusable)."""
    parsed = email.utils.parsedate_tz(str(message['Date'] or ''))
    if parsed is None:
        return time.time()
    # mktime_tz honours the UTC offset of the header, unlike time.mktime.
    return email.utils.mktime_tz(parsed)


def _safe_folder_name(text):
    """Turn untrusted text into a single path component."""
    name = text.translate({ord('/'): '_', ord('\\'): '_', 0: '_'}).strip()
    return '_' if name in ('', '.', '..') else name


def extract_invoice(message, period=(), output_path=None, overwrite=False, organize=True):
    """Save the electronic invoices attached to *message*.

    Only multipart messages carrying the ``X-Fattura-PA: Yes`` header are
    considered.  Every ``.xml`` / ``.xml.p7m`` attachment that parses as a
    FatturaElettronica document is written below *output_path*; for signed
    attachments the extracted ``.xml`` is written next to the ``.p7m``.
    The saved files get the date of the email as modification time.

    Args:
        message: an ``email.message.Message``-like object.
        period: ``(start, end)`` datetimes selecting invoices with
            ``start <= date < end``, or a single datetime selecting its
            year; any other value disables the filter.  The invoice date
            is taken from the XML, falling back to the email date.
        output_path: destination folder (current directory when empty).
        overwrite: replace an attachment that was already saved.
        organize: store the files under ``<year>/<emitter name>/``.
    """
    if message['X-Fattura-PA'] != 'Yes':
        return
    if message.get_content_maintype() != 'multipart':
        return

    tm = _message_timestamp(message)
    expected_tag = '{{{}}}FatturaElettronica'.format(FE_NAMESPACE)

    for part in message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if not part.get('Content-Disposition'):
            continue

        filename = part.get_filename()
        if not filename:
            continue
        # The attachment name comes from the sender: keep only its last
        # component so that it can never point outside the output folder.
        filename = os.path.basename(filename.replace('\\', '/'))
        is_signed = filename.endswith('.xml.p7m')
        if not (is_signed or filename.endswith('.xml')):
            continue

        invoice_content = part.get_payload(decode=True)
        if is_signed:
            xml_content = check_and_extract_p7m_invoice(filename, invoice_content)
        else:
            xml_content = invoice_content
        if not xml_content:
            continue

        try:
            xml_root = ET.fromstring(xml_content)
        except ET.ParseError as error:
            # One broken attachment must not stop the remaining ones.
            print('Skipping attachment {}: invalid XML ({})'.format(
                filename, error))
            continue

        # Accept the document when the root is the namespaced
        # FatturaElettronica element or, failing that, when it at least
        # contains an invoice header.
        if (xml_root.tag != expected_tag and
                xml_root.find('.//FatturaElettronicaHeader') is None):
            continue

        dt = datetime.datetime.fromtimestamp(tm)
        date_node = xml_root.find(
            './/FatturaElettronicaBody//DatiGeneraliDocumento/Data')
        if date_node is not None:
            try:
                # Only the leading YYYY-MM-DD is relevant; anything that
                # follows it (e.g. a zone suffix) is ignored.
                dt = datetime.datetime.strptime(
                    (date_node.text or '').strip()[:10], '%Y-%m-%d')
            except ValueError:
                pass  # keep the email date computed above

        invoice_amount = None
        amount_node = xml_root.find(
            './/FatturaElettronicaBody//DatiGeneraliDocumento/ImportoTotaleDocumento')
        if amount_node is not None:
            try:
                invoice_amount = float(amount_node.text)
            except (TypeError, ValueError):
                invoice_amount = None  # informational only

        if isinstance(period, datetime.datetime):
            if dt.year != period.year:
                continue
        elif period and len(period) == 2:
            if not (period[0] <= dt < period[1]):
                continue

        invoice_folder = output_path if output_path else os.path.curdir
        if organize:
            invoice_folder = os.path.join(invoice_folder, str(dt.year))
            emitter_node = xml_root.find(
                './/FatturaElettronicaHeader//CedentePrestatore//Denominazione')
            if emitter_node is not None and emitter_node.text:
                # The emitter name is free text from the invoice: it must
                # stay a single folder level.
                invoice_folder = os.path.join(
                    invoice_folder, _safe_folder_name(emitter_node.text))

        outpath = os.path.join(invoice_folder, filename)
        if not overwrite and os.path.exists(outpath):
            continue

        os.makedirs(invoice_folder, mode=0o750, exist_ok=True)

        with open(outpath, 'wb') as f:
            print('Saving message "{}"'.format(message['Subject']))
            print(' sender: {}'.format(message['From']))
            print(' date: {}'.format(dt))
            print(' amount: {}'.format(invoice_amount))
            print(' output file: file://{}'.format(
                urllib.parse.quote(f.name)))
            f.write(invoice_content)
        os.utime(outpath, (tm, tm))

        if is_signed:
            # Remove the exact '.p7m' suffix (str.rstrip would strip a set
            # of characters, not a suffix).
            outpath = os.path.join(invoice_folder, filename[:-len('.p7m')])
            with open(outpath, 'wb') as f:
                print(' extracted output file: file://{}'.format(
                    urllib.parse.quote(f.name)))
                f.write(xml_content)
            os.utime(outpath, (tm, tm))
# Maps the value of the --only option to the matching IMAP SEARCH key.
# Defined at module level because extract_imap_invoices() reads it as a
# global, also when this file is imported instead of being run.
ARGS_TO_IMAP_READ_ARGS = {'unread': 'UNSEEN', 'read': 'SEEN', 'all': ''}


def parse_imap_server(value, default_port='993'):
    """Split a ``host[:port]`` string into a ``(host, port)`` pair of strings.

    *default_port* is returned when *value* carries no port.
    """
    if ':' not in value:
        return value, default_port
    host, _colon, port = value.rpartition(':')
    return host, port or default_port


def build_period(year=None, before=None, after=None):
    """Return the ``(start, end)`` datetimes selected on the command line.

    The interval defaults to 1900-01-01 .. ``datetime.max``; *year* narrows
    it to that calendar year, then *before* replaces the end and *after*
    replaces the start.
    """
    start = datetime.datetime.min.replace(year=1900)
    end = datetime.datetime.max
    if year:
        start, end = year, datetime.datetime(year.year + 1, 1, 1)
    if before:
        end = before
    if after:
        start = after
    return (start, end)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument('mboxfile',
                       nargs='?',
                       type=argparse.FileType('r'),
                       help='Path to local .mbox file')

    imapgroup = parser.add_argument_group('IMAP Settings')
    imapgroup.add_argument('-s', '--imap-server',
                           default=':'.join([IMAP_SERVER, IMAP_PORT]),
                           help='IMAP server in the form host:port (default: %(default)s)',
                           metavar="host:port")
    imapgroup.add_argument('-u', '--imap-username',
                           help='User name to access to the IMAP server',
                           metavar='user@domain.it')
    imapgroup.add_argument('-f', '--imap-folder',
                           default='INBOX',
                           help='The folder where search the invoices (default: %(default)s)',
                           metavar='FOLDER_NAME')
    imapgroup.add_argument('-r', '--mark-as-read',
                           action='store_true',
                           help='Mark the processed email as read')
    imapgroup.add_argument('-m', '--only',
                           choices=list(ARGS_TO_IMAP_READ_ARGS),
                           default='all',
                           help='Email set to process (default: %(default)s)')

    timegroup = parser.add_argument_group('Time selection')
    timegroup.add_argument('-y', '--year',
                           type=lambda s: datetime.datetime.strptime(s, '%Y'),
                           help='Only extract invoices in the year (example: 2018)')
    # --before sets the (exclusive) end of the period, --after its start.
    timegroup.add_argument('-b', '--before',
                           type=lambda s: datetime.datetime.strptime(s, '%d-%m-%Y'),
                           help='Only extract invoices dated before the given date (example: 31-05-2019)')
    timegroup.add_argument('-a', '--after',
                           type=lambda s: datetime.datetime.strptime(s, '%d-%m-%Y'),
                           help='Only extract invoices starting from the given date (example: 18-10-2019)')

    parser.add_argument('-p', '--output-path',
                        help='Where to save the output .xml files (default: %(default)s)',
                        default=os.path.realpath(os.path.join(os.path.curdir, 'Invoices')))
    parser.add_argument('-w', '--overwrite',
                        action='store_true',
                        help='Overwrite an invoice file if already existing')
    parser.add_argument('-n', '--disable-auto-organizer',
                        action='store_true',
                        help='Don\'t move the invoice to Year/Emitter Name/InvoiceFile.xml')

    ARGS = parser.parse_args()

    mbox_path = None
    if ARGS.mboxfile:
        # argparse opened the file only to validate it; the mailbox module
        # reopens it by name, so release the handle right away.
        mbox_path = ARGS.mboxfile.name
        ARGS.mboxfile.close()
    else:
        # Apply the --imap-server option (its default mirrors the values
        # taken from the environment).
        IMAP_SERVER, IMAP_PORT = parse_imap_server(ARGS.imap_server, IMAP_PORT)
        if IMAP_PORT and not IMAP_PORT.isdigit():
            parser.error('invalid IMAP port: {}'.format(IMAP_PORT))

        if ARGS.imap_username:
            IMAP_NAME = ARGS.imap_username

        if not IMAP_NAME:
            IMAP_NAME = input('User name for IMAP server {}:{}: '.format(
                IMAP_SERVER, IMAP_PORT))

        if not IMAP_NAME:
            print('Impossible to use an empty account name')
            sys.exit(1)

        if not IMAP_PASSWORD:
            from getpass import getpass
            IMAP_PASSWORD = getpass('Password for {} at {}:{}: '.format(
                IMAP_NAME, IMAP_SERVER, IMAP_PORT))

        if not IMAP_SERVER or not IMAP_PORT or not IMAP_NAME:
            print('No IMAP server configured, must pass a mbox file as input')
            sys.exit(1)

    period = build_period(ARGS.year, ARGS.before, ARGS.after)

    if mbox_path:
        for message in mailbox.mbox(mbox_path):
            extract_invoice(message, period, ARGS.output_path, ARGS.overwrite,
                            not ARGS.disable_auto_organizer)
    else:
        extract_imap_invoices(ARGS.imap_folder, ARGS.only, period,
                              ARGS.mark_as_read, ARGS.output_path, ARGS.overwrite,
                              not ARGS.disable_auto_organizer)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment