diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/l10n_it_edi/models/ir_mail_server.py | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/l10n_it_edi/models/ir_mail_server.py')
| -rw-r--r-- | addons/l10n_it_edi/models/ir_mail_server.py | 434 |
1 files changed, 434 insertions, 0 deletions
diff --git a/addons/l10n_it_edi/models/ir_mail_server.py b/addons/l10n_it_edi/models/ir_mail_server.py new file mode 100644 index 00000000..e74e6467 --- /dev/null +++ b/addons/l10n_it_edi/models/ir_mail_server.py @@ -0,0 +1,434 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import zipfile +import io +import re +import logging +import email +import email.policy +import dateutil +import pytz + +from lxml import etree +from datetime import datetime +from xmlrpc import client as xmlrpclib + +from odoo import api, fields, models, tools, _ +from odoo.exceptions import ValidationError, UserError +from odoo.addons.l10n_it_edi.tools.remove_signature import remove_signature + + +_logger = logging.getLogger(__name__) + +class FetchmailServer(models.Model): + _name = 'fetchmail.server' + _inherit = 'fetchmail.server' + + l10n_it_is_pec = fields.Boolean('PEC server', help="If PEC Server, only mail from '...@pec.fatturapa.it' will be processed.") + l10n_it_last_uid = fields.Integer(string='Last message UID', default=1) + + def _search_edi_invoice(self, att_name, send_state=False): + """ Search sent l10n_it_edi fatturaPA invoices """ + + conditions = [ + ('move_id', "!=", False), + ('edi_format_id.code', '=', 'fattura_pa'), + ('attachment_id.name', '=', att_name), + ] + if send_state: + conditions.append(('move_id.l10n_it_send_state', '=', send_state)) + + return self.env['account.edi.document'].search(conditions, limit=1).move_id + + @api.constrains('l10n_it_is_pec', 'server_type') + def _check_pec(self): + for record in self: + if record.l10n_it_is_pec and record.server_type != 'imap': + raise ValidationError(_("PEC mail server must be of type IMAP.")) + + def fetch_mail(self): + """ WARNING: meant for cron usage only - will commit() after each email! """ + + MailThread = self.env['mail.thread'] + for server in self.filtered(lambda s: s.l10n_it_is_pec): + _logger.info('start checking for new emails on %s PEC server %s', server.server_type, server.name) + + count, failed = 0, 0 + imap_server = None + try: + imap_server = server.connect() + imap_server.select() + + # Only download new emails + email_filter = ['(UID %s:*)' % (server.l10n_it_last_uid)] + + # The l10n_it_edi.fatturapa_bypass_incoming_address_filter prevents the sender address check on incoming email. + bypass_incoming_address_filter = self.env['ir.config_parameter'].get_param('l10n_it_edi.bypass_incoming_address_filter', False) + if not bypass_incoming_address_filter: + email_filter.append('(FROM "@pec.fatturapa.it")') + + data = imap_server.uid('search', None, *email_filter)[1] + + new_max_uid = server.l10n_it_last_uid + for uid in data[0].split(): + if int(uid) <= server.l10n_it_last_uid: + # We get always minimum 1 message. If no new message, we receive the newest already managed. + continue + + result, data = imap_server.uid('fetch', uid, '(RFC822)') + + if not data[0]: + continue + message = data[0][1] + + # To leave the mail in the state in which they were. + if "Seen" not in data[1].decode("utf-8"): + imap_server.uid('STORE', uid, '+FLAGS', '\\Seen') + else: + imap_server.uid('STORE', uid, '-FLAGS', '\\Seen') + + # See details in message_process() in mail_thread.py + if isinstance(message, xmlrpclib.Binary): + message = bytes(message.data) + if isinstance(message, str): + message = message.encode('utf-8') + msg_txt = email.message_from_bytes(message, policy=email.policy.SMTP) + + try: + self._attachment_invoice(msg_txt) + new_max_uid = max(new_max_uid, int(uid)) + except Exception: + _logger.info('Failed to process mail from %s server %s.', server.server_type, server.name, exc_info=True) + failed += 1 + self._cr.commit() + count += 1 + server.write({'l10n_it_last_uid': new_max_uid}) + _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", count, server.server_type, server.name, (count - failed), failed) + except Exception: + _logger.info("General failure when trying to fetch mail from %s server %s.", server.server_type, server.name, exc_info=True) + finally: + if imap_server: + imap_server.close() + imap_server.logout() + server.write({'date': fields.Datetime.now()}) + return super(FetchmailServer, self.filtered(lambda s: not s.l10n_it_is_pec)).fetch_mail() + + def _attachment_invoice(self, msg_txt): + parsed_values = self.env['mail.thread']._message_parse_extract_payload(msg_txt) + body, attachments = parsed_values['body'], parsed_values['attachments'] + from_address = msg_txt.get('from') + for attachment in attachments: + split_attachment = attachment.fname.rpartition('.') + if len(split_attachment) < 3: + _logger.info('E-invoice filename not compliant: %s', attachment.fname) + continue + attachment_name = split_attachment[0] + attachment_ext = split_attachment[2] + split_underscore = attachment_name.rsplit('_', 2) + if len(split_underscore) < 2: + _logger.info('E-invoice filename not compliant: %s', attachment.fname) + continue + + if attachment_ext != 'zip': + if split_underscore[1] in ['RC', 'NS', 'MC', 'MT', 'EC', 'SE', 'NE', 'DT']: + # we have a receipt + self._message_receipt_invoice(split_underscore[1], attachment) + else: + att_filename = attachment.fname + match = re.search("([A-Z]{2}[A-Za-z0-9]{2,28}_[A-Za-z0-9]{0,5}.(xml.p7m|xml))", att_filename) + # If match, we have an invoice. + if match: + # If it's signed, the content has a bytes type and we just remove the signature's envelope + if match.groups()[1] == 'xml.p7m': + att_content_data = remove_signature(attachment.content) + # If the envelope cannot be removed, the remove_signature returns None, so we skip + if not att_content_data: + _logger.warning("E-invoice couldn't be read: %s", att_filename) + continue + att_filename = att_filename.replace('.xml.p7m', '.xml') + else: + # Otherwise, it should be an utf-8 encoded XML string + att_content_data = attachment.content.encode() + self._create_invoice_from_mail(att_content_data, att_filename, from_address) + else: + if split_underscore[1] == 'AT': + # Attestazione di avvenuta trasmissione della fattura con impossibilità di recapito + self._message_AT_invoice(attachment) + else: + _logger.info('New E-invoice in zip file: %s', attachment.fname) + self._create_invoice_from_mail_with_zip(attachment, from_address) + + def _create_invoice_from_mail(self, att_content_data, att_name, from_address): + """ Creates an invoice from the content of an email present in ir.attachments + + :param att_content_data: The 'utf-8' encoded bytes string representing the content of the attachment. + :param att_name: The attachment's file name. + :param from_address: The sender address of the email. + """ + + invoices = self.env['account.move'] + + # Check if we already imported the email as an attachment + existing = self.env['ir.attachment'].search([('name', '=', att_name), ('res_model', '=', 'account.move')]) + if existing: + _logger.info('E-invoice already exist: %s', att_name) + return invoices + + # Create the new attachment for the file + attachment = self.env['ir.attachment'].create({ + 'name': att_name, + 'raw': att_content_data, + 'res_model': 'account.move', + 'type': 'binary'}) + + # Decode the file. + try: + tree = etree.fromstring(att_content_data) + except Exception: + _logger.info('The xml file is badly formatted: %s', att_name) + return invoices + + invoices = self.env.ref('l10n_it_edi.edi_fatturaPA')._create_invoice_from_xml_tree(att_name, tree) + if not invoices: + _logger.info('E-invoice not found in file: %s', att_name) + return invoices + invoices.l10n_it_send_state = "new" + invoices.invoice_source_email = from_address + for invoice in invoices: + invoice.with_context(no_new_invoice=True, default_res_id=invoice.id) \ + .message_post(body=(_("Original E-invoice XML file")), attachment_ids=[attachment.id]) + + self._cr.commit() + + _logger.info('New E-invoices (%s), ids: %s', att_name, [x.id for x in invoices]) + return invoices + + def _create_invoice_from_mail_with_zip(self, attachment_zip, from_address): + with zipfile.ZipFile(io.BytesIO(attachment_zip.content)) as z: + for att_name in z.namelist(): + existing = self.env['ir.attachment'].search([('name', '=', att_name), ('res_model', '=', 'account.move')]) + if existing: + # invoice already exist + _logger.info('E-invoice in zip file (%s) already exist: %s', attachment_zip.fname, att_name) + continue + att_content = z.open(att_name).read() + + self._create_invoice_from_mail(att_content, att_name, from_address) + + def _message_AT_invoice(self, attachment_zip): + with zipfile.ZipFile(io.BytesIO(attachment_zip.content)) as z: + for attachment_name in z.namelist(): + split_name_attachment = attachment_name.rpartition('.') + if len(split_name_attachment) < 3: + continue + split_underscore = split_name_attachment[0].rsplit('_', 2) + if len(split_underscore) < 2: + continue + if split_underscore[1] == 'AT': + attachment = z.open(attachment_name).read() + _logger.info('New AT receipt for: %s', split_underscore[0]) + try: + tree = etree.fromstring(attachment) + except: + _logger.info('Error in decoding new receipt file: %s', attachment_name) + return + + elements = tree.xpath('//NomeFile') + if elements and elements[0].text: + filename = elements[0].text + else: + return + + related_invoice = self._search_edi_invoice(filename) + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', filename) + return + + related_invoice.l10n_it_send_state = 'failed_delivery' + info = self._return_multi_line_xml(tree, ['//IdentificativoSdI', '//DataOraRicezione', '//MessageId', '//PecMessageId', '//Note']) + related_invoice.message_post( + body=(_("ES certify that it has received the invoice and that the file \ + could not be delivered to the addressee. <br/>%s") % (info)) + ) + + def _message_receipt_invoice(self, receipt_type, attachment): + + try: + tree = etree.fromstring(attachment.content.encode()) + except: + _logger.info('Error in decoding new receipt file: %s', attachment.fname) + return {} + + elements = tree.xpath('//NomeFile') + if elements and elements[0].text: + filename = elements[0].text + else: + return {} + + if receipt_type == 'RC': + # Delivery receipt + # This is the receipt sent by the ES to the transmitting subject to communicate + # delivery of the file to the addressee + related_invoice = self._search_edi_invoice(filename, 'sent') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + related_invoice.l10n_it_send_state = 'delivered' + info = self._return_multi_line_xml(tree, ['//IdentificativoSdI', '//DataOraRicezione', '//DataOraConsegna', '//Note']) + related_invoice.message_post( + body=(_("E-Invoice is delivery to the destinatory:<br/>%s") % (info)) + ) + + elif receipt_type == 'NS': + # Rejection notice + # This is the receipt sent by the ES to the transmitting subject if one or more of + # the checks carried out by the ES on the file received do not have a successful result. + related_invoice = self._search_edi_invoice(filename, 'sent') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + related_invoice.l10n_it_send_state = 'invalid' + error = self._return_error_xml(tree) + related_invoice.message_post( + body=(_("Errors in the E-Invoice :<br/>%s") % (error)) + ) + related_invoice.activity_schedule( + 'mail.mail_activity_data_todo', + summary='Rejection notice', + user_id=related_invoice.invoice_user_id.id if related_invoice.invoice_user_id else self.env.user.id) + + elif receipt_type == 'MC': + # Failed delivery notice + # This is the receipt sent by the ES to the transmitting subject if the file is not + # delivered to the addressee. + related_invoice = self._search_edi_invoice(filename, 'sent') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + info = self._return_multi_line_xml(tree, [ + '//IdentificativoSdI', + '//DataOraRicezione', + '//Descrizione', + '//MessageId', + '//Note']) + related_invoice.message_post( + body=(_("The E-invoice is not delivered to the addressee. The Exchange System is\ + unable to deliver the file to the Public Administration. The Exchange System will\ + contact the PA to report the problem and request that they provide a solution. \ + During the following 15 days, the Exchange System will try to forward the FatturaPA\ + file to the Administration in question again. More information:<br/>%s") % (info)) + ) + + elif receipt_type == 'NE': + # Outcome notice + # This is the receipt sent by the ES to the invoice sender to communicate the result + # (acceptance or refusal of the invoice) of the checks carried out on the document by + # the addressee. + related_invoice = self._search_edi_invoice(filename, 'delivered') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + elements = tree.xpath('//Esito') + if elements and elements[0].text: + if elements[0].text == 'EC01': + related_invoice.l10n_it_send_state = 'delivered_accepted' + elif elements[0].text == 'EC02': + related_invoice.l10n_it_send_state = 'delivered_refused' + + info = self._return_multi_line_xml(tree, + ['//Esito', + '//Descrizione', + '//IdentificativoSdI', + '//DataOraRicezione', + '//DataOraConsegna', + '//Note' + ]) + related_invoice.message_post( + body=(_("Outcome notice: %s<br/>%s") % (related_invoice.l10n_it_send_state, info)) + ) + if related_invoice.l10n_it_send_state == 'delivered_refused': + related_invoice.activity_schedule( + 'mail.mail_activity_todo', + user_id=related_invoice.invoice_user_id.id if related_invoice.invoice_user_id else self.env.user.id, + summary='Outcome notice: Refused') + + # elif receipt_type == 'MT': + # Metadata file + # This is the file sent by the ES to the addressee together with the invoice file, + # containing the main reference data of the file useful for processing, including + # the IdentificativoSDI. + # Useless for Odoo + + elif receipt_type == 'DT': + # Deadline passed notice + # This is the receipt sent by the ES to both the invoice sender and the invoice + # addressee to communicate the expiry of the maximum term for communication of + # acceptance/refusal. + related_invoice = self._search_edi_invoice(filename, 'delivered') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + related_invoice.l10n_it_send_state = 'delivered_expired' + info = self._return_multi_line_xml(tree, [ + '//Descrizione', + '//IdentificativoSdI', + '//Note']) + related_invoice.message_post( + body=(_("Expiration of the maximum term for communication of acceptance/refusal:\ + %s<br/>%s") % (filename, info)) + ) + + def _return_multi_line_xml(self, tree, element_tags): + output_str = "<ul>" + + for element_tag in element_tags: + elements = tree.xpath(element_tag) + if not elements: + continue + for element in elements: + if element.text: + text = " ".join(element.text.split()) + output_str += "<li>%s: %s</li>" % (element.tag, text) + return output_str + "</ul>" + + def _return_error_xml(self, tree): + output_str = "<ul>" + + elements = tree.xpath('//Errore') + if not elements: + return + for element in elements: + descrizione = " ".join(element[1].text.split()) + if descrizione: + output_str += "<li>Errore %s: %s</li>" % (element[0].text, descrizione) + return output_str + "</ul>" + +class IrMailServer(models.Model): + _name = "ir.mail_server" + _inherit = "ir.mail_server" + + def _get_test_email_addresses(self): + self.ensure_one() + + company = self.env["res.company"].search([("l10n_it_mail_pec_server_id", "=", self.id)], limit=1) + if not company: + # it's not a PEC server + return super()._get_test_email_addresses() + email_from = self.smtp_user + if not email_from: + raise UserError(_('Please configure Username for this Server PEC')) + email_to = company.l10n_it_address_recipient_fatturapa + if not email_to: + raise UserError(_('Please configure Government PEC-mail in company settings')) + return email_from, email_to + + def build_email(self, email_from, email_to, subject, body, email_cc=None, email_bcc=None, reply_to=False, + attachments=None, message_id=None, references=None, object_id=False, subtype='plain', headers=None, + body_alternative=None, subtype_alternative='plain'): + + if self.env.context.get('wo_bounce_return_path') and headers: + headers['Return-Path'] = email_from + return super(IrMailServer, self).build_email(email_from, email_to, subject, body, email_cc=email_cc, email_bcc=email_bcc, reply_to=reply_to, + attachments=attachments, message_id=message_id, references=references, object_id=object_id, subtype=subtype, headers=headers, + body_alternative=body_alternative, subtype_alternative=subtype_alternative) |
