# -*- coding:utf-8 -*- # Part of Odoo. See LICENSE file for full copyright and licensing details. from odoo import api, models, fields, _ from odoo.tests.common import Form from odoo.exceptions import UserError from odoo.addons.l10n_it_edi.tools.remove_signature import remove_signature from odoo.osv.expression import OR, AND from lxml import etree from datetime import datetime import re import logging _logger = logging.getLogger(__name__) DEFAULT_FACTUR_ITALIAN_DATE_FORMAT = '%Y-%m-%d' class AccountEdiFormat(models.Model): _inherit = 'account.edi.format' # ------------------------------------------------------------------------- # Helpers # ------------------------------------------------------------------------- @api.model def _l10n_it_edi_generate_electronic_invoice_filename(self, invoice): '''Returns a name conform to the Fattura pa Specifications: See ES documentation 2.2 ''' a = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" n = invoice.id progressive_number = "" while n: (n, m) = divmod(n, len(a)) progressive_number = a[m] + progressive_number return '%(country_code)s%(codice)s_%(progressive_number)s.xml' % { 'country_code': invoice.company_id.country_id.code, 'codice': invoice.company_id.l10n_it_codice_fiscale.replace(' ', ''), 'progressive_number': progressive_number.zfill(5), } def _l10n_it_edi_check_invoice_configuration(self, invoice): errors = [] seller = invoice.company_id buyer = invoice.commercial_partner_id # <1.1.1.1> if not seller.country_id: errors.append(_("%s must have a country", seller.display_name)) # <1.1.1.2> if not seller.vat: errors.append(_("%s must have a VAT number", seller.display_name)) elif len(seller.vat) > 30: errors.append(_("The maximum length for VAT number is 30. %s have a VAT number too long: %s.", seller.display_name, seller.vat)) # <1.2.1.2> if not seller.l10n_it_codice_fiscale: errors.append(_("%s must have a codice fiscale number", seller.display_name)) # <1.2.1.8> if not seller.l10n_it_tax_system: errors.append(_("The seller's company must have a tax system.")) # <1.2.2> if not seller.street and not seller.street2: errors.append(_("%s must have a street.", seller.display_name)) if not seller.zip: errors.append(_("%s must have a post code.", seller.display_name)) elif len(seller.zip) != 5 and seller.country_id.code == 'IT': errors.append(_("%s must have a post code of length 5.", seller.display_name)) if not seller.city: errors.append(_("%s must have a city.", seller.display_name)) if not seller.country_id: errors.append(_("%s must have a country.", seller.display_name)) if seller.l10n_it_has_tax_representative and not seller.l10n_it_tax_representative_partner_id.vat: errors.append(_("Tax representative partner %s of %s must have a tax number.", seller.l10n_it_tax_representative_partner_id.display_name, seller.display_name)) # <1.4.1> if not buyer.vat and not buyer.l10n_it_codice_fiscale and buyer.country_id.code == 'IT': errors.append(_("The buyer, %s, or his company must have either a VAT number either a tax code (Codice Fiscale).", buyer.display_name)) # <1.4.2> if not buyer.street and not buyer.street2: errors.append(_("%s must have a street.", buyer.display_name)) if not buyer.zip: errors.append(_("%s must have a post code.", buyer.display_name)) elif len(buyer.zip) != 5 and buyer.country_id.code == 'IT': errors.append(_("%s must have a post code of length 5.", buyer.display_name)) if not buyer.city: errors.append(_("%s must have a city.", buyer.display_name)) if not buyer.country_id: errors.append(_("%s must have a country.", buyer.display_name)) # <2.2.1> for invoice_line in invoice.invoice_line_ids: if not invoice_line.display_type and len(invoice_line.tax_ids) != 1: raise UserError(_("You must select one and only one tax by line.")) for tax_line in invoice.line_ids.filtered(lambda line: line.tax_line_id): if not tax_line.tax_line_id.l10n_it_kind_exoneration and tax_line.tax_line_id.amount == 0: errors.append(_("%s has an amount of 0.0, you must indicate the kind of exoneration.", tax_line.name)) if not invoice.partner_bank_id: errors.append(_("The seller must have a bank account.")) return errors # ------------------------------------------------------------------------- # Export # ------------------------------------------------------------------------- def _is_embedding_to_invoice_pdf_needed(self): # OVERRIDE self.ensure_one() return True if self.code == 'fattura_pa' else super()._is_embedding_to_invoice_pdf_needed() def _is_compatible_with_journal(self, journal): # OVERRIDE self.ensure_one() if self.code != 'fattura_pa': return super()._is_compatible_with_journal(journal) return journal.type == 'sale' and journal.country_code == 'IT' def _l10n_it_edi_is_required_for_invoice(self, invoice): """ Is the edi required for this invoice based on the method (here: PEC mail) Deprecated: in future release PEC mail will be removed. TO OVERRIDE """ return invoice.is_sale_document() and invoice.l10n_it_send_state not in ('sent', 'delivered', 'delivered_accepted') and invoice.country_code == 'IT' def _is_required_for_invoice(self, invoice): # OVERRIDE self.ensure_one() if self.code != 'fattura_pa': return super()._is_required_for_invoice(invoice) return self._l10n_it_edi_is_required_for_invoice(invoice) def _post_fattura_pa(self, invoices): # TO OVERRIDE invoice = invoices # no batching ensure that we only have one invoice invoice.l10n_it_send_state = 'other' invoice._check_before_xml_exporting() if invoice.l10n_it_einvoice_id and invoice.l10n_it_send_state not in ['invalid', 'to_send']: return {'error': _("You can't regenerate an E-Invoice when the first one is sent and there are no errors")} if invoice.l10n_it_einvoice_id: invoice.l10n_it_einvoice_id.unlink() res = invoice.invoice_generate_xml() if len(invoice.commercial_partner_id.l10n_it_pa_index or '') == 6: invoice.message_post( body=(_("Invoices for PA are not managed by Odoo, you can download the document and send it on your own.")) ) else: invoice.l10n_it_send_state = 'to_send' return {invoice: res} def _post_invoice_edi(self, invoices, test_mode=False): # OVERRIDE self.ensure_one() edi_result = super()._post_invoice_edi(invoices) if self.code != 'fattura_pa': return edi_result return self._post_fattura_pa(invoices) # ------------------------------------------------------------------------- # Import # ------------------------------------------------------------------------- def _check_filename_is_fattura_pa(self, filename): return re.search("([A-Z]{2}[A-Za-z0-9]{2,28}_[A-Za-z0-9]{0,5}.(xml.p7m|xml))", filename) def _is_fattura_pa(self, filename, tree): return self.code == 'fattura_pa' and self._check_filename_is_fattura_pa(filename) def _create_invoice_from_xml_tree(self, filename, tree): self.ensure_one() if self._is_fattura_pa(filename, tree): return self._import_fattura_pa(tree, self.env['account.move']) return super()._create_invoice_from_xml_tree(filename, tree) def _update_invoice_from_xml_tree(self, filename, tree, invoice): self.ensure_one() if self._is_fattura_pa(filename, tree): if len(tree.xpath('//FatturaElettronicaBody')) > 1: invoice.message_post(body='The attachment contains multiple invoices, this invoice was not updated from it.', message_type='comment', subtype_xmlid='mail.mt_note', author_id=self.env.ref('base.partner_root').id) else: return self._import_fattura_pa(tree, invoice) return super()._update_invoice_from_xml_tree(filename, tree, invoice) def _decode_p7m_to_xml(self, filename, content): decoded_content = remove_signature(content) if not decoded_content: return None try: # Some malformed XML are accepted by FatturaPA, this expends compatibility parser = etree.XMLParser(recover=True) xml_tree = etree.fromstring(decoded_content, parser) except Exception as e: _logger.exception("Error when converting the xml content to etree: %s", e) return None if not len(xml_tree): return None return xml_tree def _create_invoice_from_binary(self, filename, content, extension): self.ensure_one() if extension.lower() == '.xml.p7m': decoded_content = self._decode_p7m_to_xml(filename, content) if decoded_content is not None and self._is_fattura_pa(filename, decoded_content): return self._import_fattura_pa(decoded_content, self.env['account.move']) return super()._create_invoice_from_binary(filename, content, extension) def _update_invoice_from_binary(self, filename, content, extension, invoice): self.ensure_one() if extension.lower() == '.xml.p7m': decoded_content = self._decode_p7m_to_xml(filename, content) if decoded_content is not None and self._is_fattura_pa(filename, decoded_content): return self._import_fattura_pa(decoded_content, invoice) return super()._update_invoice_from_binary(filename, content, extension, invoice) def _import_fattura_pa(self, tree, invoice): """ Decodes a fattura_pa invoice into an invoice. :param tree: the fattura_pa tree to decode. :param invoice: the invoice to update or an empty recordset. :returns: the invoice where the fattura_pa data was imported. """ invoices = self.env['account.move'] first_run = True # possible to have multiple invoices in the case of an invoice batch, the batch itself is repeated for every invoice of the batch for body_tree in tree.xpath('//FatturaElettronicaBody'): if not first_run or not invoice: # make sure all the iterations create a new invoice record (except the first which could have already created one) invoice = self.env['account.move'] first_run = False # Type must be present in the context to get the right behavior of the _default_journal method (account.move). # journal_id must be present in the context to get the right behavior of the _default_account method (account.move.line). elements = tree.xpath('//CessionarioCommittente//IdCodice') company = elements and self.env['res.company'].search([('vat', 'ilike', elements[0].text)], limit=1) if not company: elements = tree.xpath('//CessionarioCommittente//CodiceFiscale') company = elements and self.env['res.company'].search([('l10n_it_codice_fiscale', 'ilike', elements[0].text)], limit=1) if not company: # Only invoices with a correct VAT or Codice Fiscale can be imported _logger.warning('No company found with VAT or Codice Fiscale like %r.', elements[0].text) continue # Refund type. # TD01 == invoice # TD02 == advance/down payment on invoice # TD03 == advance/down payment on fee # TD04 == credit note # TD05 == debit note # TD06 == fee # For unsupported document types, just assume in_invoice, and log that the type is unsupported elements = tree.xpath('//DatiGeneraliDocumento/TipoDocumento') move_type = 'in_invoice' if elements and elements[0].text and elements[0].text == 'TD04': move_type = 'in_refund' elif elements and elements[0].text and elements[0].text != 'TD01': _logger.info('Document type not managed: %s. Invoice type is set by default.', elements[0].text) # Setup the context for the Invoice Form invoice_ctx = invoice.with_company(company) \ .with_context(default_move_type=move_type, account_predictive_bills_disable_prediction=True) # move could be a single record (editing) or be empty (new). with Form(invoice_ctx) as invoice_form: message_to_log = [] # Partner (first step to avoid warning 'Warning! You must first select a partner.'). <1.2> elements = tree.xpath('//CedentePrestatore//IdCodice') partner = elements and self.env['res.partner'].search(['&', ('vat', 'ilike', elements[0].text), '|', ('company_id', '=', company.id), ('company_id', '=', False)], limit=1) if not partner: elements = tree.xpath('//CedentePrestatore//CodiceFiscale') if elements: codice = elements[0].text domains = [[('l10n_it_codice_fiscale', '=', codice)]] if re.match(r'^[0-9]{11}$', codice): domains.append([('l10n_it_codice_fiscale', '=', 'IT' + codice)]) elif re.match(r'^IT[0-9]{11}$', codice): domains.append([('l10n_it_codice_fiscale', '=', self.env['res.partner']._l10n_it_normalize_codice_fiscale(codice))]) partner = elements and self.env['res.partner'].search( AND([OR(domains), OR([[('company_id', '=', company.id)], [('company_id', '=', False)]])]), limit=1) if not partner: elements = tree.xpath('//DatiTrasmissione//Email') partner = elements and self.env['res.partner'].search(['&', '|', ('email', '=', elements[0].text), ('l10n_it_pec_email', '=', elements[0].text), '|', ('company_id', '=', company.id), ('company_id', '=', False)], limit=1) if partner: invoice_form.partner_id = partner else: message_to_log.append("%s
%s" % ( _("Vendor not found, useful informations from XML file:"), invoice._compose_info_message( tree, './/CedentePrestatore'))) # Numbering attributed by the transmitter. <1.1.2> elements = tree.xpath('//ProgressivoInvio') if elements: invoice_form.payment_reference = elements[0].text elements = body_tree.xpath('.//DatiGeneraliDocumento//Numero') if elements: invoice_form.ref = elements[0].text # Currency. <2.1.1.2> elements = body_tree.xpath('.//DatiGeneraliDocumento/Divisa') if elements: currency_str = elements[0].text currency = self.env.ref('base.%s' % currency_str.upper(), raise_if_not_found=False) if currency != self.env.company.currency_id and currency.active: invoice_form.currency_id = currency # Date. <2.1.1.3> elements = body_tree.xpath('.//DatiGeneraliDocumento/Data') if elements: date_str = elements[0].text date_obj = datetime.strptime(date_str, DEFAULT_FACTUR_ITALIAN_DATE_FORMAT) invoice_form.invoice_date = date_obj # Dati Bollo. <2.1.1.6> elements = body_tree.xpath('.//DatiGeneraliDocumento/DatiBollo/ImportoBollo') if elements: invoice_form.l10n_it_stamp_duty = float(elements[0].text) # List of all amount discount (will be add after all article to avoid to have a negative sum) discount_list = [] percentage_global_discount = 1.0 # Global discount. <2.1.1.8> discount_elements = body_tree.xpath('.//DatiGeneraliDocumento/ScontoMaggiorazione') total_discount_amount = 0.0 if discount_elements: for discount_element in discount_elements: discount_line = discount_element.xpath('.//Tipo') discount_sign = -1 if discount_line and discount_line[0].text == 'SC': discount_sign = 1 discount_percentage = discount_element.xpath('.//Percentuale') if discount_percentage and discount_percentage[0].text: percentage_global_discount *= 1 - float(discount_percentage[0].text)/100 * discount_sign discount_amount_text = discount_element.xpath('.//Importo') if discount_amount_text and discount_amount_text[0].text: discount_amount = float(discount_amount_text[0].text) * discount_sign * -1 discount = {} discount["seq"] = 0 if discount_amount < 0: discount["name"] = _('GLOBAL DISCOUNT') else: discount["name"] = _('GLOBAL EXTRA CHARGE') discount["amount"] = discount_amount discount["tax"] = [] discount_list.append(discount) # Comment. <2.1.1.11> elements = body_tree.xpath('.//DatiGeneraliDocumento//Causale') for element in elements: invoice_form.narration = '%s%s\n' % (invoice_form.narration or '', element.text) # Informations relative to the purchase order, the contract, the agreement, # the reception phase or invoices previously transmitted # <2.1.2> - <2.1.6> for document_type in ['DatiOrdineAcquisto', 'DatiContratto', 'DatiConvenzione', 'DatiRicezione', 'DatiFattureCollegate']: elements = body_tree.xpath('.//DatiGenerali/' + document_type) if elements: for element in elements: message_to_log.append("%s %s
%s" % (document_type, _("from XML file:"), invoice._compose_info_message(element, '.'))) # Dati DDT. <2.1.8> elements = body_tree.xpath('.//DatiGenerali/DatiDDT') if elements: message_to_log.append("%s
%s" % ( _("Transport informations from XML file:"), invoice._compose_info_message(body_tree, './/DatiGenerali/DatiDDT'))) # Due date. <2.4.2.5> elements = body_tree.xpath('.//DatiPagamento/DettaglioPagamento/DataScadenzaPagamento') if elements: date_str = elements[0].text date_obj = datetime.strptime(date_str, DEFAULT_FACTUR_ITALIAN_DATE_FORMAT) invoice_form.invoice_date_due = fields.Date.to_string(date_obj) # Total amount. <2.4.2.6> elements = body_tree.xpath('.//ImportoPagamento') amount_total_import = 0 for element in elements: amount_total_import += float(element.text) if amount_total_import: message_to_log.append(_("Total amount from the XML File: %s") % ( amount_total_import)) # Bank account. <2.4.2.13> if invoice_form.move_type not in ('out_invoice', 'in_refund'): elements = body_tree.xpath('.//DatiPagamento/DettaglioPagamento/IBAN') if elements: if invoice_form.partner_id and invoice_form.partner_id.commercial_partner_id: bank = self.env['res.partner.bank'].search([ ('acc_number', '=', elements[0].text), ('partner_id.id', '=', invoice_form.partner_id.commercial_partner_id.id) ]) else: bank = self.env['res.partner.bank'].search([('acc_number', '=', elements[0].text)]) if bank: invoice_form.partner_bank_id = bank else: message_to_log.append("%s
%s" % ( _("Bank account not found, useful informations from XML file:"), invoice._compose_multi_info_message( body_tree, ['.//DatiPagamento//Beneficiario', './/DatiPagamento//IstitutoFinanziario', './/DatiPagamento//IBAN', './/DatiPagamento//ABI', './/DatiPagamento//CAB', './/DatiPagamento//BIC', './/DatiPagamento//ModalitaPagamento']))) else: elements = body_tree.xpath('.//DatiPagamento/DettaglioPagamento') if elements: message_to_log.append("%s
%s" % ( _("Bank account not found, useful informations from XML file:"), invoice._compose_info_message(body_tree, './/DatiPagamento'))) # Invoice lines. <2.2.1> elements = body_tree.xpath('.//DettaglioLinee') if elements: for element in elements: with invoice_form.invoice_line_ids.new() as invoice_line_form: # Sequence. line_elements = element.xpath('.//NumeroLinea') if line_elements: invoice_line_form.sequence = int(line_elements[0].text) * 2 # Product. line_elements = element.xpath('.//Descrizione') if line_elements: invoice_line_form.name = " ".join(line_elements[0].text.split()) elements_code = element.xpath('.//CodiceArticolo') if elements_code: for element_code in elements_code: type_code = element_code.xpath('.//CodiceTipo')[0] code = element_code.xpath('.//CodiceValore')[0] if type_code.text == 'EAN': product = self.env['product.product'].search([('barcode', '=', code.text)]) if product: invoice_line_form.product_id = product break if partner: product_supplier = self.env['product.supplierinfo'].search([('name', '=', partner.id), ('product_code', '=', code.text)]) if product_supplier and product_supplier.product_id: invoice_line_form.product_id = product_supplier.product_id break if not invoice_line_form.product_id: for element_code in elements_code: code = element_code.xpath('.//CodiceValore')[0] product = self.env['product.product'].search([('default_code', '=', code.text)]) if product: invoice_line_form.product_id = product break # Price Unit. line_elements = element.xpath('.//PrezzoUnitario') if line_elements: invoice_line_form.price_unit = float(line_elements[0].text) # Quantity. line_elements = element.xpath('.//Quantita') if line_elements: invoice_line_form.quantity = float(line_elements[0].text) else: invoice_line_form.quantity = 1 # Taxes tax_element = element.xpath('.//AliquotaIVA') natura_element = element.xpath('.//Natura') invoice_line_form.tax_ids.clear() if tax_element and tax_element[0].text: percentage = float(tax_element[0].text) if natura_element and natura_element[0].text: l10n_it_kind_exoneration = natura_element[0].text tax = self.env['account.tax'].search([ ('company_id', '=', invoice_form.company_id.id), ('amount_type', '=', 'percent'), ('type_tax_use', '=', 'purchase'), ('amount', '=', percentage), ('l10n_it_kind_exoneration', '=', l10n_it_kind_exoneration), ], limit=1) else: tax = self.env['account.tax'].search([ ('company_id', '=', invoice_form.company_id.id), ('amount_type', '=', 'percent'), ('type_tax_use', '=', 'purchase'), ('amount', '=', percentage), ], limit=1) l10n_it_kind_exoneration = '' if tax: invoice_line_form.tax_ids.add(tax) else: if l10n_it_kind_exoneration: message_to_log.append(_("Tax not found with percentage: %s and exoneration %s for the article: %s") % ( percentage, l10n_it_kind_exoneration, invoice_line_form.name)) else: message_to_log.append(_("Tax not found with percentage: %s for the article: %s") % ( percentage, invoice_line_form.name)) # Discount in cascade mode. # if 3 discounts : -10% -50€ -20% # the result must be : (((price -10%)-50€) -20%) # Generic form : (((price -P1%)-A1€) -P2%) # It will be split in two parts: fix amount and pourcent amount # example: (((((price - A1€) -P2%) -A3€) -A4€) -P5€) # pourcent: 1-(1-P2)*(1-P5) # fix amount: A1*(1-P2)*(1-P5)+A3*(1-P5)+A4*(1-P5) (we must take account of all # percentage present after the fix amount) line_elements = element.xpath('.//ScontoMaggiorazione') total_discount_amount = 0.0 total_discount_percentage = percentage_global_discount if line_elements: for line_element in line_elements: discount_line = line_element.xpath('.//Tipo') discount_sign = -1 if discount_line and discount_line[0].text == 'SC': discount_sign = 1 discount_percentage = line_element.xpath('.//Percentuale') if discount_percentage and discount_percentage[0].text: pourcentage_actual = 1 - float(discount_percentage[0].text)/100 * discount_sign total_discount_percentage *= pourcentage_actual total_discount_amount *= pourcentage_actual discount_amount = line_element.xpath('.//Importo') if discount_amount and discount_amount[0].text: total_discount_amount += float(discount_amount[0].text) * discount_sign * -1 # Save amount discount. if total_discount_amount != 0: discount = {} discount["seq"] = invoice_line_form.sequence + 1 if total_discount_amount < 0: discount["name"] = _('DISCOUNT: %s', invoice_line_form.name) else: discount["name"] = _('EXTRA CHARGE: %s', invoice_line_form.name) discount["amount"] = total_discount_amount discount["tax"] = [] for tax in invoice_line_form.tax_ids: discount["tax"].append(tax) discount_list.append(discount) invoice_line_form.discount = (1 - total_discount_percentage) * 100 # Apply amount discount. for discount in discount_list: with invoice_form.invoice_line_ids.new() as invoice_line_form_discount: invoice_line_form_discount.tax_ids.clear() invoice_line_form_discount.sequence = discount["seq"] invoice_line_form_discount.name = discount["name"] invoice_line_form_discount.price_unit = discount["amount"] new_invoice = invoice_form.save() new_invoice.l10n_it_send_state = "other" elements = body_tree.xpath('.//Allegati') if elements: for element in elements: name_attachment = element.xpath('.//NomeAttachment')[0].text attachment_64 = str.encode(element.xpath('.//Attachment')[0].text) attachment_64 = self.env['ir.attachment'].create({ 'name': name_attachment, 'datas': attachment_64, 'type': 'binary', }) # default_res_id is had to context to avoid facturx to import his content # no_new_invoice to prevent from looping on the message_post that would create a new invoice without it new_invoice.with_context(no_new_invoice=True, default_res_id=new_invoice.id).message_post( body=(_("Attachment from XML")), attachment_ids=[attachment_64.id] ) for message in message_to_log: new_invoice.message_post(body=message) invoices += new_invoice return invoices