Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@3v1n0
Last active November 14, 2023 20:58
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 3v1n0/99de748e5c191e82423c69d21e793ba2 to your computer and use it in GitHub Desktop.
Save 3v1n0/99de748e5c191e82423c69d21e793ba2 to your computer and use it in GitHub Desktop.
Extract Italian electronic invoices (fatture elettroniche) from a PEC provider IMAP server
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2019-2022 - Marco Trevisan
#
# A Tool to export your invoices in the Italian electronic Fattura Elettronica
# format from any IMAP PEC provider (defaulting to Aruba PEC) to your computer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import argparse
import email.utils
import imaplib
import datetime
import mailbox
import os
import re
import string
import sys
import time
import urllib.parse
import xml.etree.ElementTree as ET
# IMAP connection settings, each overridable through an environment variable.
# The port is deliberately kept as a string: it is joined with the host name
# to build the default value of the --imap-server command line option.
IMAP_SERVER = os.getenv('FE_IMAP_SERVER', default='imaps.pec.aruba.it')
IMAP_PORT = os.getenv('FE_IMAP_PORT', default='993')
IMAP_NAME = os.getenv('FE_IMAP_NAME', default='')
IMAP_PASSWORD = os.getenv('FE_IMAP_PASSWORD', default='')

# Namespace URI compared against the root tag of each extracted invoice.
FE_NAMESPACE = 'http://ivaservizi.agenziaentrate.gov.it/docs/xsd/fatture/v1.2'

# Locates the XML document embedded in the raw bytes of a .p7m attachment:
# everything from the XML declaration up to the closing, namespace-prefixed
# FatturaElettronica tag.
# This must be a raw bytes literal: `\?`, `\w` and `\d` are regex escapes, not
# Python string escapes, and a non-raw literal triggers an invalid escape
# sequence warning on recent Python versions.
P7M_FE_CONTENTS_RE = re.compile(
    rb'<\?xml.*</[\w\d_.-]+:FatturaElettronica>', re.MULTILINE | re.DOTALL)
def peek_header(imap, id, header):
    """Return the value of a single header of an IMAP message.

    The header is requested with a ``BODY.PEEK[HEADER.FIELDS (...)]`` fetch,
    decoded as UTF-8 and stripped; whatever follows the first
    ``"<header>: "`` marker is returned. When the marker is not present the
    whole stripped text is returned unchanged.

    Args:
        imap: Object exposing an imaplib-style ``fetch(id, parts)`` method.
        id: Identifier of the message to inspect.
        header: Name of the header to read (for example ``'Subject'``).
    """
    query = 'BODY.PEEK[HEADER.FIELDS ({})]'.format(header)
    response = imap.fetch(id, query)
    # response[1] is the data list; its first item is an (envelope, payload)
    # pair whose payload holds the raw header bytes.
    raw_header = response[1][0][1]
    text = raw_header.decode('utf-8').strip()
    marker = '{}: '.format(header)
    return text.split(marker, 1)[-1]
def extract_imap_invoices(imap_folder='INBOX', filter_messages='all',
                          period=(), mark_as_read=False, output_path=None,
                          overwrite=False, organize=True):
    """Download every invoice e-mail from the IMAP server and extract it.

    Connects to ``IMAP_SERVER``/``IMAP_PORT`` with the ``IMAP_NAME`` and
    ``IMAP_PASSWORD`` module settings, searches *imap_folder* for messages
    carrying the ``X-Fattura-PA: Yes`` header and hands each of them to
    ``extract_invoice``.

    Args:
        imap_folder: Mailbox folder to search.
        filter_messages: Key of the module-level ``ARGS_TO_IMAP_READ_ARGS``
            mapping ('unread', 'read' or 'all').
        period: ``(since, before)`` pair of datetimes used both for the
            server-side search and for the per-invoice check; when empty no
            date restriction is applied.
        mark_as_read: When false, messages that were unread before the
            download get their Seen flag removed again.
        output_path: Base folder for the saved files.
        overwrite: Replace already existing invoice files.
        organize: Store the invoices in per-year / per-emitter sub-folders.

    Exits the process with status 1 when the login fails.
    """
    period = period or ()

    # The context manager logs out and closes the connection on every exit
    # path, including the early returns and sys.exit() below.
    with imaplib.IMAP4_SSL(IMAP_SERVER, IMAP_PORT) as imap:
        try:
            imap.login(IMAP_NAME, IMAP_PASSWORD)
        except (imaplib.IMAP4.error, OSError):
            print('Login failed, check your credentials')
            sys.exit(1)

        imap.select(imap_folder)

        # Build the search keys one by one so that an empty read-state filter
        # or a missing period does not leave stray blanks in the command.
        criteria = ['HEADER X-Fattura-PA Yes']
        read_filter = ARGS_TO_IMAP_READ_ARGS[filter_messages]
        if read_filter:
            criteria.append(read_filter)
        if period:
            criteria.append(
                'SINCE {:%d-%b-%Y} BEFORE {:%d-%b-%Y}'.format(*period))

        resp, [invoices] = imap.search(None, '({})'.format(' '.join(criteria)))
        if not invoices:
            return

        for msg_id in invoices.split():
            flags = imaplib.ParseFlags(imap.fetch(msg_id, '(FLAGS)')[1][0])
            was_read = b'\\Seen' in flags

            print('Getting email {} dated {} "{}"'.format(
                msg_id.decode('utf-8'), peek_header(imap, msg_id, 'Date'),
                peek_header(imap, msg_id, 'Subject')))

            try:
                fullmsg = imap.fetch(msg_id, '(RFC822)')
            finally:
                # Fetching the full message marks it as read on the server;
                # restore the previous state unless asked otherwise.
                if not was_read and not mark_as_read:
                    imap.store(msg_id, '-FLAGS', '\\Seen')

            message = mailbox.Message(fullmsg[1][0][1])
            extract_invoice(message, period, output_path, overwrite, organize)
def extract_p7m_invoice(file_contents):
    """Best-effort extraction of the invoice XML from raw .p7m bytes.

    The bytes matched by ``P7M_FE_CONTENTS_RE`` are decoded as UTF-8
    (undecodable bytes are dropped) and every character that is not in
    ``string.printable`` is removed, which discards the binary data of the
    signature envelope interleaved with the document.

    Args:
        file_contents: Raw bytes of the .p7m attachment.

    Returns:
        The cleaned XML document as UTF-8 bytes, or ``None`` when no
        FatturaElettronica document is found.
    """
    match = P7M_FE_CONTENTS_RE.search(file_contents)
    if match is None:
        return None

    decoded = match.group().decode('utf-8', 'ignore')
    printable = frozenset(string.printable)
    return ''.join(c for c in decoded if c in printable).encode('utf-8')
def check_and_extract_p7m_invoice(filename, file_contents):
    """Extract the invoice XML from a .p7m attachment.

    pyOpenSSL is used to unwrap the PKCS#7 envelope when it is available
    (signatures are not verified: ``PKCS7_NOVERIFY`` is passed). Whenever
    that is not possible the regex based ``extract_p7m_invoice`` is used
    instead.

    Args:
        filename: Attachment name, only used in the diagnostic message.
        file_contents: Raw bytes of the .p7m attachment.

    Returns:
        The embedded XML document as bytes, or whatever
        ``extract_p7m_invoice`` returns when the fallback is used.
    """
    try:
        from OpenSSL import crypto
        from OpenSSL._util import (
            ffi as _ffi,
            lib as _lib,
        )
    except Exception:
        # pyOpenSSL is optional; a missing or broken installation must not
        # prevent the manual extraction.
        return extract_p7m_invoice(file_contents)

    try:
        p7 = crypto.load_pkcs7_data(crypto.FILETYPE_ASN1, file_contents)
        bio_out = crypto._new_mem_buf()
        res = _lib.PKCS7_verify(
            p7._pkcs7, _ffi.NULL, _ffi.NULL, _ffi.NULL, bio_out,
            _lib.PKCS7_NOVERIFY)
        if res == 1:
            return crypto._bio_to_string(bio_out)
    except Exception:
        # The data may not be a DER encoded PKCS#7 structure, and the
        # legacy/private pyOpenSSL entry points used above may be missing
        # from the installed version: fall through to the manual extraction.
        res = 0

    print('Failed to load and verify invoice {} p7m data, using manual extraction'.format(
        filename))
    return extract_p7m_invoice(file_contents)
def _safe_path_component(name):
    """Return *name* made usable as a single, non-traversing path component."""
    for separator in (os.sep, os.altsep):
        if separator:
            name = name.replace(separator, '_')
    name = name.strip()
    return '' if name in ('.', '..') else name


def extract_invoice(message, period=(), output_path=None, overwrite=False, organize=True):
    """Save the electronic invoices attached to an e-mail message.

    Only multipart messages carrying the ``X-Fattura-PA: Yes`` header are
    considered. Every ``.xml`` or ``.xml.p7m`` attachment that contains a
    FatturaElettronica document is written below *output_path*; for signed
    ``.xml.p7m`` attachments the extracted XML is saved next to the original
    file. The modification time of the written files is set to the date of
    the e-mail.

    Args:
        message: ``email``/``mailbox`` message to inspect.
        period: Either a ``(since, before)`` pair of datetimes (the invoice
            must satisfy ``since <= date < before``) or a single datetime
            (the invoice year must match); when empty no date filter is
            applied.
        output_path: Base folder for the saved files; the current directory
            when empty.
        overwrite: Replace an already existing invoice file.
        organize: Save below ``<year>/<emitter name>/`` sub-folders.
    """
    if message['X-Fattura-PA'] != 'Yes':
        return
    if message.get_content_maintype() != 'multipart':
        return

    base_folder = output_path if output_path else os.path.curdir

    # Timestamp of the e-mail, honouring the time zone of the Date header;
    # the current time is used when the header is missing or unparsable.
    date_header = message['Date']
    parsed_date = email.utils.parsedate_tz(str(date_header)) if date_header else None
    tm = email.utils.mktime_tz(parsed_date) if parsed_date else time.time()
    mail_dt = datetime.datetime.fromtimestamp(tm)

    for part in message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if not part.get('Content-Disposition'):
            continue

        filename = part.get_filename()
        if not filename or not filename.endswith(('.xml', '.xml.p7m')):
            continue
        # The attachment name comes from the e-mail: never let it select a
        # directory outside of the invoice folder.
        filename = os.path.basename(filename)
        is_signed = filename.endswith('.xml.p7m')

        invoice_content = part.get_payload(decode=True)
        if is_signed:
            xml_content = check_and_extract_p7m_invoice(filename, invoice_content)
        else:
            xml_content = invoice_content
        if not xml_content:
            continue

        # NOTE: xml.etree is not hardened against maliciously crafted XML.
        try:
            xml_root = ET.fromstring(xml_content)
        except ET.ParseError as err:
            # One broken attachment must not abort the whole export.
            print('Skipping attachment {}: invalid XML ({})'.format(filename, err))
            continue

        # '{{{}}}' renders as '{<namespace>}', the ElementTree qualified tag.
        if (xml_root.tag != '{{{}}}FatturaElettronica'.format(FE_NAMESPACE) and
                xml_root.find('.//FatturaElettronicaHeader') is None):
            continue

        dt = mail_dt
        invoice_date = xml_root.find(
            './/FatturaElettronicaBody//DatiGeneraliDocumento/Data')
        if invoice_date is not None and invoice_date.text:
            try:
                dt = datetime.datetime.strptime(
                    invoice_date.text.strip(), '%Y-%m-%d')
            except ValueError:
                # Not a plain YYYY-MM-DD date: keep the e-mail date.
                dt = mail_dt

        invoice_amount = xml_root.find(
            './/FatturaElettronicaBody//DatiGeneraliDocumento/ImportoTotaleDocumento')
        if invoice_amount is not None:
            try:
                invoice_amount = float(invoice_amount.text)
            except (TypeError, ValueError):
                # The amount is only reported to the user, never required.
                invoice_amount = None

        if isinstance(period, datetime.datetime):
            if dt.year != period.year:
                continue
        elif period and len(period) == 2:
            if not (period[0] <= dt < period[1]):
                continue

        invoice_folder = base_folder
        if organize:
            invoice_folder = os.path.join(invoice_folder, str(dt.year))
            emitter = xml_root.find(
                './/FatturaElettronicaHeader//CedentePrestatore//Denominazione')
            if emitter is not None and emitter.text:
                # The emitter name comes from the invoice: neutralise path
                # separators before using it as a folder name.
                emitter_name = _safe_path_component(emitter.text)
                if emitter_name:
                    invoice_folder = os.path.join(invoice_folder, emitter_name)

        outpath = os.path.join(invoice_folder, filename)
        if not overwrite and os.path.exists(outpath):
            continue

        os.makedirs(invoice_folder, mode=0o750, exist_ok=True)

        with open(outpath, 'wb') as f:
            print('Saving message "{}"'.format(message['Subject']))
            print(' sender: {}'.format(message['From']))
            print(' date: {}'.format(dt))
            print(' amount: {}'.format(invoice_amount))
            print(' output file: file://{}'.format(
                urllib.parse.quote(f.name)))
            f.write(invoice_content)
        # Set the times once the file is closed so the write cannot bump them.
        os.utime(outpath, (tm, tm))

        if is_signed:
            # Drop exactly the '.p7m' suffix to name the extracted XML file.
            outpath = os.path.join(invoice_folder, filename[:-len('.p7m')])
            with open(outpath, 'wb') as f:
                print(' extracted output file: file://{}'.format(
                    urllib.parse.quote(f.name)))
                f.write(xml_content)
            os.utime(outpath, (tm, tm))
if __name__ == "__main__":
    # Command line entry point: parse the options, collect the IMAP
    # credentials when no mbox file is given, then run the extraction.
    parser = argparse.ArgumentParser()

    group = parser.add_mutually_exclusive_group()
    group.add_argument('mboxfile',
                       nargs='?',
                       type=argparse.FileType('r'),
                       help='Path to local .mbox file')

    imapgroup = parser.add_argument_group('IMAP Settings')
    imapgroup.add_argument('-s', '--imap-server',
                           default=':'.join([IMAP_SERVER, IMAP_PORT]),
                           help='IMAP server in the form host:port (default: %(default)s)',
                           metavar="host:port")
    imapgroup.add_argument('-u', '--imap-username',
                           help='User name to access to the IMAP server',
                           metavar='user@domain.it')
    imapgroup.add_argument('-f', '--imap-folder',
                           default='INBOX',
                           help='The folder where search the invoices (default: %(default)s)',
                           metavar='FOLDER_NAME')
    imapgroup.add_argument('-r', '--mark-as-read',
                           action='store_true',
                           help='Mark the processed email as read')

    # Maps the --only choices to IMAP search keys; extract_imap_invoices
    # looks this name up as a module global.
    ARGS_TO_IMAP_READ_ARGS = {'unread': 'UNSEEN', 'read': 'SEEN', 'all': ''}
    imapgroup.add_argument('-m', '--only',
                           choices=ARGS_TO_IMAP_READ_ARGS.keys(),
                           default='all',
                           help='Email set to process (default: %(default)s)')

    timegroup = parser.add_argument_group('Time selection')
    timegroup.add_argument('-y', '--year',
                           type=lambda s: datetime.datetime.strptime(s, '%Y'),
                           help='Only extract invoices in the year (example: 2018)')
    timegroup.add_argument('-b', '--before',
                           type=lambda s: datetime.datetime.strptime(s, '%d-%m-%Y'),
                           help='Only extract invoices dated before the given date (example: 18-10-2019)')
    timegroup.add_argument('-a', '--after',
                           type=lambda s: datetime.datetime.strptime(s, '%d-%m-%Y'),
                           help='Only extract invoices starting from the given date (example: 31-05-2019)')

    parser.add_argument('-p', '--output-path',
                        help='Where to save the output .xml files (default: %(default)s)',
                        default=os.path.realpath(os.path.join(os.path.curdir, 'Invoices')))
    parser.add_argument('-w', '--overwrite',
                        action='store_true',
                        help='Overwrite an invoice file if already existing')
    parser.add_argument('-n', '--disable-auto-organizer',
                        action='store_true',
                        help='Don\'t move the invoice to Year/Emitter Name/InvoiceFile.xml')

    ARGS = parser.parse_args()

    if not ARGS.mboxfile:
        # Apply --imap-server: without this the option would be accepted but
        # the environment/default server would always be used.
        host, separator, port = ARGS.imap_server.rpartition(':')
        if separator:
            IMAP_SERVER, IMAP_PORT = host, port
        else:
            IMAP_SERVER = ARGS.imap_server

        if ARGS.imap_username:
            IMAP_NAME = ARGS.imap_username

        if not IMAP_NAME:
            IMAP_NAME = input('User name for IMAP server {}:{}: '.format(
                IMAP_SERVER, IMAP_PORT))

        if not IMAP_NAME:
            print('Impossible to use an empty account name')
            sys.exit(1)

        if not IMAP_PASSWORD:
            from getpass import getpass
            IMAP_PASSWORD = getpass('Password for {} at {}:{}: '.format(
                IMAP_NAME, IMAP_SERVER, IMAP_PORT))

        if not IMAP_SERVER or not IMAP_PORT or not IMAP_NAME:
            print('No IMAP server configured, must pass a mbox file as input')
            sys.exit(1)

    # Default period: everything from 1900 onwards; narrowed by the options.
    period = (datetime.datetime.min.replace(year=1900), datetime.datetime.max)

    if ARGS.year:
        period = (ARGS.year, datetime.datetime(ARGS.year.year+1, 1, 1))

    if ARGS.before:
        period = (period[0], ARGS.before)

    if ARGS.after:
        period = (ARGS.after, period[1])

    if ARGS.mboxfile:
        # argparse already opened the file, but only its path is needed.
        mbox_path = ARGS.mboxfile.name
        ARGS.mboxfile.close()
        for message in mailbox.mbox(mbox_path):
            extract_invoice(message, period, ARGS.output_path, ARGS.overwrite,
                            not ARGS.disable_auto_organizer)
    else:
        extract_imap_invoices(ARGS.imap_folder, ARGS.only, period,
                              ARGS.mark_as_read, ARGS.output_path, ARGS.overwrite,
                              not ARGS.disable_auto_organizer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment