diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/l10n_it_edi/models | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/l10n_it_edi/models')
| -rw-r--r-- | addons/l10n_it_edi/models/__init__.py | 9 | ||||
| -rw-r--r-- | addons/l10n_it_edi/models/account_edi_format.py | 606 | ||||
| -rw-r--r-- | addons/l10n_it_edi/models/account_invoice.py | 317 | ||||
| -rw-r--r-- | addons/l10n_it_edi/models/ddt.py | 18 | ||||
| -rw-r--r-- | addons/l10n_it_edi/models/ir_mail_server.py | 434 | ||||
| -rw-r--r-- | addons/l10n_it_edi/models/res_company.py | 110 | ||||
| -rw-r--r-- | addons/l10n_it_edi/models/res_partner.py | 49 |
7 files changed, 1543 insertions, 0 deletions
diff --git a/addons/l10n_it_edi/models/__init__.py b/addons/l10n_it_edi/models/__init__.py new file mode 100644 index 00000000..e2f3f697 --- /dev/null +++ b/addons/l10n_it_edi/models/__init__.py @@ -0,0 +1,9 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from . import res_partner +from . import res_company +from . import account_invoice +from . import account_edi_format +from . import ir_mail_server +from . import ddt diff --git a/addons/l10n_it_edi/models/account_edi_format.py b/addons/l10n_it_edi/models/account_edi_format.py new file mode 100644 index 00000000..24149cbd --- /dev/null +++ b/addons/l10n_it_edi/models/account_edi_format.py @@ -0,0 +1,606 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import api, models, fields, _ +from odoo.tests.common import Form +from odoo.exceptions import UserError +from odoo.addons.l10n_it_edi.tools.remove_signature import remove_signature +from odoo.osv.expression import OR, AND + +from lxml import etree +from datetime import datetime +import re +import logging + + +_logger = logging.getLogger(__name__) + +DEFAULT_FACTUR_ITALIAN_DATE_FORMAT = '%Y-%m-%d' + + +class AccountEdiFormat(models.Model): + _inherit = 'account.edi.format' + + # ------------------------------------------------------------------------- + # Helpers + # ------------------------------------------------------------------------- + + @api.model + def _l10n_it_edi_generate_electronic_invoice_filename(self, invoice): + '''Returns a name conform to the Fattura pa Specifications: + See ES documentation 2.2 + ''' + a = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" + n = invoice.id + progressive_number = "" + while n: + (n, m) = divmod(n, len(a)) + progressive_number = a[m] + progressive_number + + return '%(country_code)s%(codice)s_%(progressive_number)s.xml' % { + 'country_code': invoice.company_id.country_id.code, + 'codice': invoice.company_id.l10n_it_codice_fiscale.replace(' ', ''), + 'progressive_number': progressive_number.zfill(5), + } + + def _l10n_it_edi_check_invoice_configuration(self, invoice): + errors = [] + seller = invoice.company_id + buyer = invoice.commercial_partner_id + + # <1.1.1.1> + if not seller.country_id: + errors.append(_("%s must have a country", seller.display_name)) + + # <1.1.1.2> + if not seller.vat: + errors.append(_("%s must have a VAT number", seller.display_name)) + elif len(seller.vat) > 30: + errors.append(_("The maximum length for VAT number is 30. %s have a VAT number too long: %s.", seller.display_name, seller.vat)) + + # <1.2.1.2> + if not seller.l10n_it_codice_fiscale: + errors.append(_("%s must have a codice fiscale number", seller.display_name)) + + # <1.2.1.8> + if not seller.l10n_it_tax_system: + errors.append(_("The seller's company must have a tax system.")) + + # <1.2.2> + if not seller.street and not seller.street2: + errors.append(_("%s must have a street.", seller.display_name)) + if not seller.zip: + errors.append(_("%s must have a post code.", seller.display_name)) + elif len(seller.zip) != 5 and seller.country_id.code == 'IT': + errors.append(_("%s must have a post code of length 5.", seller.display_name)) + if not seller.city: + errors.append(_("%s must have a city.", seller.display_name)) + if not seller.country_id: + errors.append(_("%s must have a country.", seller.display_name)) + + if seller.l10n_it_has_tax_representative and not seller.l10n_it_tax_representative_partner_id.vat: + errors.append(_("Tax representative partner %s of %s must have a tax number.", seller.l10n_it_tax_representative_partner_id.display_name, seller.display_name)) + + # <1.4.1> + if not buyer.vat and not buyer.l10n_it_codice_fiscale and buyer.country_id.code == 'IT': + errors.append(_("The buyer, %s, or his company must have either a VAT number either a tax code (Codice Fiscale).", buyer.display_name)) + + # <1.4.2> + if not buyer.street and not buyer.street2: + errors.append(_("%s must have a street.", buyer.display_name)) + if not buyer.zip: + errors.append(_("%s must have a post code.", buyer.display_name)) + elif len(buyer.zip) != 5 and buyer.country_id.code == 'IT': + errors.append(_("%s must have a post code of length 5.", buyer.display_name)) + if not buyer.city: + errors.append(_("%s must have a city.", buyer.display_name)) + if not buyer.country_id: + errors.append(_("%s must have a country.", buyer.display_name)) + + # <2.2.1> + for invoice_line in invoice.invoice_line_ids: + if not invoice_line.display_type and len(invoice_line.tax_ids) != 1: + raise UserError(_("You must select one and only one tax by line.")) + + for tax_line in invoice.line_ids.filtered(lambda line: line.tax_line_id): + if not tax_line.tax_line_id.l10n_it_kind_exoneration and tax_line.tax_line_id.amount == 0: + errors.append(_("%s has an amount of 0.0, you must indicate the kind of exoneration.", tax_line.name)) + + if not invoice.partner_bank_id: + errors.append(_("The seller must have a bank account.")) + + return errors + + # ------------------------------------------------------------------------- + # Export + # ------------------------------------------------------------------------- + + def _is_embedding_to_invoice_pdf_needed(self): + # OVERRIDE + self.ensure_one() + return True if self.code == 'fattura_pa' else super()._is_embedding_to_invoice_pdf_needed() + + def _is_compatible_with_journal(self, journal): + # OVERRIDE + self.ensure_one() + if self.code != 'fattura_pa': + return super()._is_compatible_with_journal(journal) + return journal.type == 'sale' and journal.country_code == 'IT' + + def _l10n_it_edi_is_required_for_invoice(self, invoice): + """ Is the edi required for this invoice based on the method (here: PEC mail) + Deprecated: in future release PEC mail will be removed. + TO OVERRIDE + """ + return invoice.is_sale_document() and invoice.l10n_it_send_state not in ('sent', 'delivered', 'delivered_accepted') and invoice.country_code == 'IT' + + def _is_required_for_invoice(self, invoice): + # OVERRIDE + self.ensure_one() + if self.code != 'fattura_pa': + return super()._is_required_for_invoice(invoice) + + return self._l10n_it_edi_is_required_for_invoice(invoice) + + def _post_fattura_pa(self, invoices): + # TO OVERRIDE + invoice = invoices # no batching ensure that we only have one invoice + invoice.l10n_it_send_state = 'other' + invoice._check_before_xml_exporting() + if invoice.l10n_it_einvoice_id and invoice.l10n_it_send_state not in ['invalid', 'to_send']: + return {'error': _("You can't regenerate an E-Invoice when the first one is sent and there are no errors")} + if invoice.l10n_it_einvoice_id: + invoice.l10n_it_einvoice_id.unlink() + res = invoice.invoice_generate_xml() + if len(invoice.commercial_partner_id.l10n_it_pa_index or '') == 6: + invoice.message_post( + body=(_("Invoices for PA are not managed by Odoo, you can download the document and send it on your own.")) + ) + else: + invoice.l10n_it_send_state = 'to_send' + return {invoice: res} + + def _post_invoice_edi(self, invoices, test_mode=False): + # OVERRIDE + self.ensure_one() + edi_result = super()._post_invoice_edi(invoices) + if self.code != 'fattura_pa': + return edi_result + + return self._post_fattura_pa(invoices) + + # ------------------------------------------------------------------------- + # Import + # ------------------------------------------------------------------------- + + def _check_filename_is_fattura_pa(self, filename): + return re.search("([A-Z]{2}[A-Za-z0-9]{2,28}_[A-Za-z0-9]{0,5}.(xml.p7m|xml))", filename) + + def _is_fattura_pa(self, filename, tree): + return self.code == 'fattura_pa' and self._check_filename_is_fattura_pa(filename) + + def _create_invoice_from_xml_tree(self, filename, tree): + self.ensure_one() + if self._is_fattura_pa(filename, tree): + return self._import_fattura_pa(tree, self.env['account.move']) + return super()._create_invoice_from_xml_tree(filename, tree) + + def _update_invoice_from_xml_tree(self, filename, tree, invoice): + self.ensure_one() + if self._is_fattura_pa(filename, tree): + if len(tree.xpath('//FatturaElettronicaBody')) > 1: + invoice.message_post(body='The attachment contains multiple invoices, this invoice was not updated from it.', + message_type='comment', + subtype_xmlid='mail.mt_note', + author_id=self.env.ref('base.partner_root').id) + else: + return self._import_fattura_pa(tree, invoice) + return super()._update_invoice_from_xml_tree(filename, tree, invoice) + + def _decode_p7m_to_xml(self, filename, content): + decoded_content = remove_signature(content) + if not decoded_content: + return None + + try: + # Some malformed XML are accepted by FatturaPA, this expends compatibility + parser = etree.XMLParser(recover=True) + xml_tree = etree.fromstring(decoded_content, parser) + except Exception as e: + _logger.exception("Error when converting the xml content to etree: %s", e) + return None + if not len(xml_tree): + return None + + return xml_tree + + def _create_invoice_from_binary(self, filename, content, extension): + self.ensure_one() + if extension.lower() == '.xml.p7m': + decoded_content = self._decode_p7m_to_xml(filename, content) + if decoded_content is not None and self._is_fattura_pa(filename, decoded_content): + return self._import_fattura_pa(decoded_content, self.env['account.move']) + return super()._create_invoice_from_binary(filename, content, extension) + + def _update_invoice_from_binary(self, filename, content, extension, invoice): + self.ensure_one() + if extension.lower() == '.xml.p7m': + decoded_content = self._decode_p7m_to_xml(filename, content) + if decoded_content is not None and self._is_fattura_pa(filename, decoded_content): + return self._import_fattura_pa(decoded_content, invoice) + return super()._update_invoice_from_binary(filename, content, extension, invoice) + + def _import_fattura_pa(self, tree, invoice): + """ Decodes a fattura_pa invoice into an invoice. + + :param tree: the fattura_pa tree to decode. + :param invoice: the invoice to update or an empty recordset. + :returns: the invoice where the fattura_pa data was imported. + """ + invoices = self.env['account.move'] + first_run = True + + # possible to have multiple invoices in the case of an invoice batch, the batch itself is repeated for every invoice of the batch + for body_tree in tree.xpath('//FatturaElettronicaBody'): + if not first_run or not invoice: + # make sure all the iterations create a new invoice record (except the first which could have already created one) + invoice = self.env['account.move'] + first_run = False + + # Type must be present in the context to get the right behavior of the _default_journal method (account.move). + # journal_id must be present in the context to get the right behavior of the _default_account method (account.move.line). + elements = tree.xpath('//CessionarioCommittente//IdCodice') + company = elements and self.env['res.company'].search([('vat', 'ilike', elements[0].text)], limit=1) + if not company: + elements = tree.xpath('//CessionarioCommittente//CodiceFiscale') + company = elements and self.env['res.company'].search([('l10n_it_codice_fiscale', 'ilike', elements[0].text)], limit=1) + if not company: + # Only invoices with a correct VAT or Codice Fiscale can be imported + _logger.warning('No company found with VAT or Codice Fiscale like %r.', elements[0].text) + continue + + # Refund type. + # TD01 == invoice + # TD02 == advance/down payment on invoice + # TD03 == advance/down payment on fee + # TD04 == credit note + # TD05 == debit note + # TD06 == fee + # For unsupported document types, just assume in_invoice, and log that the type is unsupported + elements = tree.xpath('//DatiGeneraliDocumento/TipoDocumento') + move_type = 'in_invoice' + if elements and elements[0].text and elements[0].text == 'TD04': + move_type = 'in_refund' + elif elements and elements[0].text and elements[0].text != 'TD01': + _logger.info('Document type not managed: %s. Invoice type is set by default.', elements[0].text) + + # Setup the context for the Invoice Form + invoice_ctx = invoice.with_company(company) \ + .with_context(default_move_type=move_type, + account_predictive_bills_disable_prediction=True) + + # move could be a single record (editing) or be empty (new). + with Form(invoice_ctx) as invoice_form: + message_to_log = [] + + # Partner (first step to avoid warning 'Warning! You must first select a partner.'). <1.2> + elements = tree.xpath('//CedentePrestatore//IdCodice') + partner = elements and self.env['res.partner'].search(['&', ('vat', 'ilike', elements[0].text), '|', ('company_id', '=', company.id), ('company_id', '=', False)], limit=1) + if not partner: + elements = tree.xpath('//CedentePrestatore//CodiceFiscale') + if elements: + codice = elements[0].text + domains = [[('l10n_it_codice_fiscale', '=', codice)]] + if re.match(r'^[0-9]{11}$', codice): + domains.append([('l10n_it_codice_fiscale', '=', 'IT' + codice)]) + elif re.match(r'^IT[0-9]{11}$', codice): + domains.append([('l10n_it_codice_fiscale', '=', self.env['res.partner']._l10n_it_normalize_codice_fiscale(codice))]) + partner = elements and self.env['res.partner'].search( + AND([OR(domains), OR([[('company_id', '=', company.id)], [('company_id', '=', False)]])]), limit=1) + if not partner: + elements = tree.xpath('//DatiTrasmissione//Email') + partner = elements and self.env['res.partner'].search(['&', '|', ('email', '=', elements[0].text), ('l10n_it_pec_email', '=', elements[0].text), '|', ('company_id', '=', company.id), ('company_id', '=', False)], limit=1) + if partner: + invoice_form.partner_id = partner + else: + message_to_log.append("%s<br/>%s" % ( + _("Vendor not found, useful informations from XML file:"), + invoice._compose_info_message( + tree, './/CedentePrestatore'))) + + # Numbering attributed by the transmitter. <1.1.2> + elements = tree.xpath('//ProgressivoInvio') + if elements: + invoice_form.payment_reference = elements[0].text + + elements = body_tree.xpath('.//DatiGeneraliDocumento//Numero') + if elements: + invoice_form.ref = elements[0].text + + # Currency. <2.1.1.2> + elements = body_tree.xpath('.//DatiGeneraliDocumento/Divisa') + if elements: + currency_str = elements[0].text + currency = self.env.ref('base.%s' % currency_str.upper(), raise_if_not_found=False) + if currency != self.env.company.currency_id and currency.active: + invoice_form.currency_id = currency + + # Date. <2.1.1.3> + elements = body_tree.xpath('.//DatiGeneraliDocumento/Data') + if elements: + date_str = elements[0].text + date_obj = datetime.strptime(date_str, DEFAULT_FACTUR_ITALIAN_DATE_FORMAT) + invoice_form.invoice_date = date_obj + + # Dati Bollo. <2.1.1.6> + elements = body_tree.xpath('.//DatiGeneraliDocumento/DatiBollo/ImportoBollo') + if elements: + invoice_form.l10n_it_stamp_duty = float(elements[0].text) + + # List of all amount discount (will be add after all article to avoid to have a negative sum) + discount_list = [] + percentage_global_discount = 1.0 + + # Global discount. <2.1.1.8> + discount_elements = body_tree.xpath('.//DatiGeneraliDocumento/ScontoMaggiorazione') + total_discount_amount = 0.0 + if discount_elements: + for discount_element in discount_elements: + discount_line = discount_element.xpath('.//Tipo') + discount_sign = -1 + if discount_line and discount_line[0].text == 'SC': + discount_sign = 1 + discount_percentage = discount_element.xpath('.//Percentuale') + if discount_percentage and discount_percentage[0].text: + percentage_global_discount *= 1 - float(discount_percentage[0].text)/100 * discount_sign + + discount_amount_text = discount_element.xpath('.//Importo') + if discount_amount_text and discount_amount_text[0].text: + discount_amount = float(discount_amount_text[0].text) * discount_sign * -1 + discount = {} + discount["seq"] = 0 + + if discount_amount < 0: + discount["name"] = _('GLOBAL DISCOUNT') + else: + discount["name"] = _('GLOBAL EXTRA CHARGE') + discount["amount"] = discount_amount + discount["tax"] = [] + discount_list.append(discount) + + # Comment. <2.1.1.11> + elements = body_tree.xpath('.//DatiGeneraliDocumento//Causale') + for element in elements: + invoice_form.narration = '%s%s\n' % (invoice_form.narration or '', element.text) + + # Informations relative to the purchase order, the contract, the agreement, + # the reception phase or invoices previously transmitted + # <2.1.2> - <2.1.6> + for document_type in ['DatiOrdineAcquisto', 'DatiContratto', 'DatiConvenzione', 'DatiRicezione', 'DatiFattureCollegate']: + elements = body_tree.xpath('.//DatiGenerali/' + document_type) + if elements: + for element in elements: + message_to_log.append("%s %s<br/>%s" % (document_type, _("from XML file:"), + invoice._compose_info_message(element, '.'))) + + # Dati DDT. <2.1.8> + elements = body_tree.xpath('.//DatiGenerali/DatiDDT') + if elements: + message_to_log.append("%s<br/>%s" % ( + _("Transport informations from XML file:"), + invoice._compose_info_message(body_tree, './/DatiGenerali/DatiDDT'))) + + # Due date. <2.4.2.5> + elements = body_tree.xpath('.//DatiPagamento/DettaglioPagamento/DataScadenzaPagamento') + if elements: + date_str = elements[0].text + date_obj = datetime.strptime(date_str, DEFAULT_FACTUR_ITALIAN_DATE_FORMAT) + invoice_form.invoice_date_due = fields.Date.to_string(date_obj) + + # Total amount. <2.4.2.6> + elements = body_tree.xpath('.//ImportoPagamento') + amount_total_import = 0 + for element in elements: + amount_total_import += float(element.text) + if amount_total_import: + message_to_log.append(_("Total amount from the XML File: %s") % ( + amount_total_import)) + + # Bank account. <2.4.2.13> + if invoice_form.move_type not in ('out_invoice', 'in_refund'): + elements = body_tree.xpath('.//DatiPagamento/DettaglioPagamento/IBAN') + if elements: + if invoice_form.partner_id and invoice_form.partner_id.commercial_partner_id: + bank = self.env['res.partner.bank'].search([ + ('acc_number', '=', elements[0].text), + ('partner_id.id', '=', invoice_form.partner_id.commercial_partner_id.id) + ]) + else: + bank = self.env['res.partner.bank'].search([('acc_number', '=', elements[0].text)]) + if bank: + invoice_form.partner_bank_id = bank + else: + message_to_log.append("%s<br/>%s" % ( + _("Bank account not found, useful informations from XML file:"), + invoice._compose_multi_info_message( + body_tree, ['.//DatiPagamento//Beneficiario', + './/DatiPagamento//IstitutoFinanziario', + './/DatiPagamento//IBAN', + './/DatiPagamento//ABI', + './/DatiPagamento//CAB', + './/DatiPagamento//BIC', + './/DatiPagamento//ModalitaPagamento']))) + else: + elements = body_tree.xpath('.//DatiPagamento/DettaglioPagamento') + if elements: + message_to_log.append("%s<br/>%s" % ( + _("Bank account not found, useful informations from XML file:"), + invoice._compose_info_message(body_tree, './/DatiPagamento'))) + + # Invoice lines. <2.2.1> + elements = body_tree.xpath('.//DettaglioLinee') + if elements: + for element in elements: + with invoice_form.invoice_line_ids.new() as invoice_line_form: + + # Sequence. + line_elements = element.xpath('.//NumeroLinea') + if line_elements: + invoice_line_form.sequence = int(line_elements[0].text) * 2 + + # Product. + line_elements = element.xpath('.//Descrizione') + if line_elements: + invoice_line_form.name = " ".join(line_elements[0].text.split()) + + elements_code = element.xpath('.//CodiceArticolo') + if elements_code: + for element_code in elements_code: + type_code = element_code.xpath('.//CodiceTipo')[0] + code = element_code.xpath('.//CodiceValore')[0] + if type_code.text == 'EAN': + product = self.env['product.product'].search([('barcode', '=', code.text)]) + if product: + invoice_line_form.product_id = product + break + if partner: + product_supplier = self.env['product.supplierinfo'].search([('name', '=', partner.id), ('product_code', '=', code.text)]) + if product_supplier and product_supplier.product_id: + invoice_line_form.product_id = product_supplier.product_id + break + if not invoice_line_form.product_id: + for element_code in elements_code: + code = element_code.xpath('.//CodiceValore')[0] + product = self.env['product.product'].search([('default_code', '=', code.text)]) + if product: + invoice_line_form.product_id = product + break + + # Price Unit. + line_elements = element.xpath('.//PrezzoUnitario') + if line_elements: + invoice_line_form.price_unit = float(line_elements[0].text) + + # Quantity. + line_elements = element.xpath('.//Quantita') + if line_elements: + invoice_line_form.quantity = float(line_elements[0].text) + else: + invoice_line_form.quantity = 1 + + # Taxes + tax_element = element.xpath('.//AliquotaIVA') + natura_element = element.xpath('.//Natura') + invoice_line_form.tax_ids.clear() + if tax_element and tax_element[0].text: + percentage = float(tax_element[0].text) + if natura_element and natura_element[0].text: + l10n_it_kind_exoneration = natura_element[0].text + tax = self.env['account.tax'].search([ + ('company_id', '=', invoice_form.company_id.id), + ('amount_type', '=', 'percent'), + ('type_tax_use', '=', 'purchase'), + ('amount', '=', percentage), + ('l10n_it_kind_exoneration', '=', l10n_it_kind_exoneration), + ], limit=1) + else: + tax = self.env['account.tax'].search([ + ('company_id', '=', invoice_form.company_id.id), + ('amount_type', '=', 'percent'), + ('type_tax_use', '=', 'purchase'), + ('amount', '=', percentage), + ], limit=1) + l10n_it_kind_exoneration = '' + + if tax: + invoice_line_form.tax_ids.add(tax) + else: + if l10n_it_kind_exoneration: + message_to_log.append(_("Tax not found with percentage: %s and exoneration %s for the article: %s") % ( + percentage, + l10n_it_kind_exoneration, + invoice_line_form.name)) + else: + message_to_log.append(_("Tax not found with percentage: %s for the article: %s") % ( + percentage, + invoice_line_form.name)) + + # Discount in cascade mode. + # if 3 discounts : -10% -50€ -20% + # the result must be : (((price -10%)-50€) -20%) + # Generic form : (((price -P1%)-A1€) -P2%) + # It will be split in two parts: fix amount and pourcent amount + # example: (((((price - A1€) -P2%) -A3€) -A4€) -P5€) + # pourcent: 1-(1-P2)*(1-P5) + # fix amount: A1*(1-P2)*(1-P5)+A3*(1-P5)+A4*(1-P5) (we must take account of all + # percentage present after the fix amount) + line_elements = element.xpath('.//ScontoMaggiorazione') + total_discount_amount = 0.0 + total_discount_percentage = percentage_global_discount + if line_elements: + for line_element in line_elements: + discount_line = line_element.xpath('.//Tipo') + discount_sign = -1 + if discount_line and discount_line[0].text == 'SC': + discount_sign = 1 + discount_percentage = line_element.xpath('.//Percentuale') + if discount_percentage and discount_percentage[0].text: + pourcentage_actual = 1 - float(discount_percentage[0].text)/100 * discount_sign + total_discount_percentage *= pourcentage_actual + total_discount_amount *= pourcentage_actual + + discount_amount = line_element.xpath('.//Importo') + if discount_amount and discount_amount[0].text: + total_discount_amount += float(discount_amount[0].text) * discount_sign * -1 + + # Save amount discount. + if total_discount_amount != 0: + discount = {} + discount["seq"] = invoice_line_form.sequence + 1 + + if total_discount_amount < 0: + discount["name"] = _('DISCOUNT: %s', invoice_line_form.name) + else: + discount["name"] = _('EXTRA CHARGE: %s', invoice_line_form.name) + discount["amount"] = total_discount_amount + discount["tax"] = [] + for tax in invoice_line_form.tax_ids: + discount["tax"].append(tax) + discount_list.append(discount) + invoice_line_form.discount = (1 - total_discount_percentage) * 100 + + # Apply amount discount. + for discount in discount_list: + with invoice_form.invoice_line_ids.new() as invoice_line_form_discount: + invoice_line_form_discount.tax_ids.clear() + invoice_line_form_discount.sequence = discount["seq"] + invoice_line_form_discount.name = discount["name"] + invoice_line_form_discount.price_unit = discount["amount"] + + new_invoice = invoice_form.save() + new_invoice.l10n_it_send_state = "other" + + elements = body_tree.xpath('.//Allegati') + if elements: + for element in elements: + name_attachment = element.xpath('.//NomeAttachment')[0].text + attachment_64 = str.encode(element.xpath('.//Attachment')[0].text) + attachment_64 = self.env['ir.attachment'].create({ + 'name': name_attachment, + 'datas': attachment_64, + 'type': 'binary', + }) + + # default_res_id is had to context to avoid facturx to import his content + # no_new_invoice to prevent from looping on the message_post that would create a new invoice without it + new_invoice.with_context(no_new_invoice=True, default_res_id=new_invoice.id).message_post( + body=(_("Attachment from XML")), + attachment_ids=[attachment_64.id] + ) + + for message in message_to_log: + new_invoice.message_post(body=message) + + invoices += new_invoice + + return invoices diff --git a/addons/l10n_it_edi/models/account_invoice.py b/addons/l10n_it_edi/models/account_invoice.py new file mode 100644 index 00000000..df578e5d --- /dev/null +++ b/addons/l10n_it_edi/models/account_invoice.py @@ -0,0 +1,317 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import base64 +import zipfile +import io +import logging +import re + +from datetime import date, datetime +from lxml import etree + +from odoo import api, fields, models, _ +from odoo.tools import float_repr +from odoo.exceptions import UserError, ValidationError +from odoo.addons.base.models.ir_mail_server import MailDeliveryException +from odoo.tests.common import Form + + +_logger = logging.getLogger(__name__) + +DEFAULT_FACTUR_ITALIAN_DATE_FORMAT = '%Y-%m-%d' + + +class AccountMove(models.Model): + _inherit = 'account.move' + + l10n_it_send_state = fields.Selection([ + ('new', 'New'), + ('other', 'Other'), + ('to_send', 'Not yet send'), + ('sent', 'Sent, waiting for response'), + ('invalid', 'Sent, but invalid'), + ('delivered', 'This invoice is delivered'), + ('delivered_accepted', 'This invoice is delivered and accepted by destinatory'), + ('delivered_refused', 'This invoice is delivered and refused by destinatory'), + ('delivered_expired', 'This invoice is delivered and expired (expiry of the maximum term for communication of acceptance/refusal)'), + ('failed_delivery', 'Delivery impossible, ES certify that it has received the invoice and that the file \ + could not be delivered to the addressee') # ok we must do nothing + ], default='to_send', copy=False) + + l10n_it_stamp_duty = fields.Float(default=0, string="Dati Bollo", readonly=True, states={'draft': [('readonly', False)]}) + + l10n_it_ddt_id = fields.Many2one('l10n_it.ddt', string='DDT', readonly=True, states={'draft': [('readonly', False)]}, copy=False) + + l10n_it_einvoice_name = fields.Char(compute='_compute_l10n_it_einvoice') + + l10n_it_einvoice_id = fields.Many2one('ir.attachment', string="Electronic invoice", compute='_compute_l10n_it_einvoice') + + @api.depends('edi_document_ids', 'edi_document_ids.attachment_id') + def _compute_l10n_it_einvoice(self): + fattura_pa = self.env.ref('l10n_it_edi.edi_fatturaPA') + for invoice in self: + einvoice = invoice.edi_document_ids.filtered(lambda d: d.edi_format_id == fattura_pa) + invoice.l10n_it_einvoice_id = einvoice.attachment_id + invoice.l10n_it_einvoice_name = einvoice.attachment_id.name + + def _check_before_xml_exporting(self): + # DEPRECATED use AccountEdiFormat._l10n_it_edi_check_invoice_configuration instead + errors = self.env['account.edi.format']._l10n_it_edi_check_invoice_configuration(self) + if errors: + raise UserError(self.env['account.edi.format']._format_error_message(_("Invalid configuration:"), errors)) + + def invoice_generate_xml(self): + self.ensure_one() + report_name = self.env['account.edi.format']._l10n_it_edi_generate_electronic_invoice_filename(self) + + data = b"<?xml version='1.0' encoding='UTF-8'?>" + self._export_as_xml() + description = _('Italian invoice: %s', self.move_type) + attachment = self.env['ir.attachment'].create({ + 'name': report_name, + 'res_id': self.id, + 'res_model': self._name, + 'datas': base64.encodebytes(data), + 'description': description, + 'type': 'binary', + }) + + self.message_post( + body=(_("E-Invoice is generated on %s by %s") % (fields.Datetime.now(), self.env.user.display_name)) + ) + return {'attachment': attachment} + + def _prepare_fatturapa_export_values(self): + self.ensure_one() + + def format_date(dt): + # Format the date in the italian standard. + dt = dt or datetime.now() + return dt.strftime(DEFAULT_FACTUR_ITALIAN_DATE_FORMAT) + + def format_monetary(number, currency): + # Format the monetary values to avoid trailing decimals (e.g. 90.85000000000001). + return float_repr(number, min(2, currency.decimal_places)) + + def format_numbers(number): + #format number to str with between 2 and 8 decimals (event if it's .00) + number_splited = str(number).split('.') + if len(number_splited) == 1: + return "%.02f" % number + + cents = number_splited[1] + if len(cents) > 8: + return "%.08f" % number + return float_repr(number, max(2, len(cents))) + + def format_numbers_two(number): + #format number to str with 2 (event if it's .00) + return "%.02f" % number + + def discount_type(discount): + return 'SC' if discount > 0 else 'MG' + + def format_phone(number): + if not number: + return False + number = number.replace(' ', '').replace('/', '').replace('.', '') + if len(number) > 4 and len(number) < 13: + return number + return False + + def get_vat_number(vat): + return vat[2:].replace(' ', '') + + def get_vat_country(vat): + return vat[:2].upper() + + def in_eu(partner): + europe = self.env.ref('base.europe', raise_if_not_found=False) + country = partner.country_id + if not europe or not country or country in europe.country_ids: + return True + return False + + formato_trasmissione = "FPR12" + if len(self.commercial_partner_id.l10n_it_pa_index or '1') == 6: + formato_trasmissione = "FPA12" + + if self.move_type == 'out_invoice': + document_type = 'TD01' + elif self.move_type == 'out_refund': + document_type = 'TD04' + else: + document_type = 'TD0X' + + pdf = self.env.ref('account.account_invoices')._render_qweb_pdf(self.id)[0] + pdf = base64.b64encode(pdf) + pdf_name = re.sub(r'\W+', '', self.name) + '.pdf' + + # tax map for 0% taxes which have no tax_line_id + tax_map = dict() + for line in self.line_ids: + for tax in line.tax_ids: + if tax.amount == 0.0: + tax_map[tax] = tax_map.get(tax, 0.0) + line.price_subtotal + + # Create file content. + template_values = { + 'record': self, + 'format_date': format_date, + 'format_monetary': format_monetary, + 'format_numbers': format_numbers, + 'format_numbers_two': format_numbers_two, + 'format_phone': format_phone, + 'discount_type': discount_type, + 'get_vat_number': get_vat_number, + 'get_vat_country': get_vat_country, + 'in_eu': in_eu, + 'abs': abs, + 'formato_trasmissione': formato_trasmissione, + 'document_type': document_type, + 'pdf': pdf, + 'pdf_name': pdf_name, + 'tax_map': tax_map, + } + return template_values + + def _export_as_xml(self): + '''DEPRECATED : this will be moved to AccountEdiFormat in a future version. + Create the xml file content. + :return: The XML content as str. + ''' + template_values = self._prepare_fatturapa_export_values() + content = self.env.ref('l10n_it_edi.account_invoice_it_FatturaPA_export')._render(template_values) + return content + + def _post(self, soft=True): + # OVERRIDE + posted = super()._post(soft=soft) + + for move in posted.filtered(lambda m: m.l10n_it_send_state == 'to_send' and m.move_type == 'out_invoice' and m.company_id.country_id.code == 'IT'): + move.send_pec_mail() + + return posted + + def send_pec_mail(self): + self.ensure_one() + allowed_state = ['to_send', 'invalid'] + + if ( + not self.company_id.l10n_it_mail_pec_server_id + or not self.company_id.l10n_it_mail_pec_server_id.active + or not self.company_id.l10n_it_address_send_fatturapa + ): + self.message_post( + body=(_("Error when sending mail with E-Invoice: Your company must have a mail PEC server and must indicate the mail PEC that will send electronic invoice.")) + ) + self.l10n_it_send_state = 'invalid' + return + + if self.l10n_it_send_state not in allowed_state: + raise UserError(_("%s isn't in a right state. It must be in a 'Not yet send' or 'Invalid' state.") % (self.display_name)) + + message = self.env['mail.message'].create({ + 'subject': _('Sending file: %s') % (self.l10n_it_einvoice_name), + 'body': _('Sending file: %s to ES: %s') % (self.l10n_it_einvoice_name, self.env.company.l10n_it_address_recipient_fatturapa), + 'author_id': self.env.user.partner_id.id, + 'email_from': self.env.company.l10n_it_address_send_fatturapa, + 'reply_to': self.env.company.l10n_it_address_send_fatturapa, + 'mail_server_id': self.env.company.l10n_it_mail_pec_server_id.id, + 'attachment_ids': [(6, 0, self.l10n_it_einvoice_id.ids)], + }) + + mail_fattura = self.env['mail.mail'].sudo().with_context(wo_bounce_return_path=True).create({ + 'mail_message_id': message.id, + 'email_to': self.env.company.l10n_it_address_recipient_fatturapa, + }) + try: + mail_fattura.send(raise_exception=True) + self.message_post( + body=(_("Mail sent on %s by %s") % (fields.Datetime.now(), self.env.user.display_name)) + ) + self.l10n_it_send_state = 'sent' + except MailDeliveryException as error: + self.message_post( + body=(_("Error when sending mail with E-Invoice: %s") % (error.args[0])) + ) + self.l10n_it_send_state = 'invalid' + + def _compose_info_message(self, tree, element_tags): + output_str = "" + elements = tree.xpath(element_tags) + for element in elements: + output_str += "<ul>" + for line in element.iter(): + if line.text: + text = " ".join(line.text.split()) + if text: + output_str += "<li>%s: %s</li>" % (line.tag, text) + output_str += "</ul>" + return output_str + + def _compose_multi_info_message(self, tree, element_tags): + output_str = "<ul>" + + for element_tag in element_tags: + elements = tree.xpath(element_tag) + if not elements: + continue + for element in elements: + text = " ".join(element.text.split()) + if text: + output_str += "<li>%s: %s</li>" % (element.tag, text) + return output_str + "</ul>" + +class AccountTax(models.Model): + _name = "account.tax" + _inherit = "account.tax" + + l10n_it_vat_due_date = fields.Selection([ + ("I", "[I] IVA ad esigibilità immediata"), + ("D", "[D] IVA ad esigibilità differita"), + ("S", "[S] Scissione dei pagamenti")], default="I", string="VAT due date") + + l10n_it_has_exoneration = fields.Boolean(string="Has exoneration of tax (Italy)", help="Tax has a tax exoneration.") + l10n_it_kind_exoneration = fields.Selection(selection=[ + ("N1", "[N1] Escluse ex art. 15"), + ("N2", "[N2] Non soggette"), + ("N2.1", "[N2.1] Non soggette ad IVA ai sensi degli artt. Da 7 a 7-septies del DPR 633/72"), + ("N2.2", "[N2.2] Non soggette – altri casi"), + ("N3", "[N3] Non imponibili"), + ("N3.1", "[N3.1] Non imponibili – esportazioni"), + ("N3.2", "[N3.2] Non imponibili – cessioni intracomunitarie"), + ("N3.3", "[N3.3] Non imponibili – cessioni verso San Marino"), + ("N3.4", "[N3.4] Non imponibili – operazioni assimilate alle cessioni all’esportazione"), + ("N3.5", "[N3.5] Non imponibili – a seguito di dichiarazioni d’intento"), + ("N3.6", "[N3.6] Non imponibili – altre operazioni che non concorrono alla formazione del plafond"), + ("N4", "[N4] Esenti"), + ("N5", "[N5] Regime del margine / IVA non esposta in fattura"), + ("N6", "[N6] Inversione contabile (per le operazioni in reverse charge ovvero nei casi di autofatturazione per acquisti extra UE di servizi ovvero per importazioni di beni nei soli casi previsti)"), + ("N6.1", "[N6.1] Inversione contabile – cessione di rottami e altri materiali di recupero"), + ("N6.2", "[N6.2] Inversione contabile – cessione di oro e argento puro"), + ("N6.3", "[N6.3] Inversione contabile – subappalto nel settore edile"), + ("N6.4", "[N6.4] Inversione contabile – cessione di fabbricati"), + ("N6.5", "[N6.5] Inversione contabile – cessione di telefoni cellulari"), + ("N6.6", "[N6.6] Inversione contabile – cessione di prodotti elettronici"), + ("N6.7", "[N6.7] Inversione contabile – prestazioni comparto edile esettori connessi"), + ("N6.8", "[N6.8] Inversione contabile – operazioni settore energetico"), + ("N6.9", "[N6.9] Inversione contabile – altri casi"), + ("N7", "[N7] IVA assolta in altro stato UE (vendite a distanza ex art. 40 c. 3 e 4 e art. 41 c. 1 lett. b, DL 331/93; prestazione di servizi di telecomunicazioni, tele-radiodiffusione ed elettronici ex art. 7-sexies lett. f, g, art. 74-sexies DPR 633/72)")], + string="Exoneration", + help="Exoneration type", + default="N1") + l10n_it_law_reference = fields.Char(string="Law Reference", size=100) + + @api.constrains('l10n_it_has_exoneration', + 'l10n_it_kind_exoneration', + 'l10n_it_law_reference', + 'amount', + 'l10n_it_vat_due_date') + def _check_exoneration_with_no_tax(self): + for tax in self: + if tax.l10n_it_has_exoneration: + if not tax.l10n_it_kind_exoneration or not tax.l10n_it_law_reference or tax.amount != 0: + raise ValidationError(_("If the tax has exoneration, you must enter a kind of exoneration, a law reference and the amount of the tax must be 0.0.")) + if tax.l10n_it_kind_exoneration == 'N6' and tax.l10n_it_vat_due_date == 'S': + raise UserError(_("'Scissione dei pagamenti' is not compatible with exoneration of kind 'N6'")) diff --git a/addons/l10n_it_edi/models/ddt.py b/addons/l10n_it_edi/models/ddt.py new file mode 100644 index 00000000..1047e823 --- /dev/null +++ b/addons/l10n_it_edi/models/ddt.py @@ -0,0 +1,18 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import fields, models, api + +class L10nItDdt(models.Model): + _name = 'l10n_it.ddt' + _description = 'Transport Document' + + invoice_id = fields.One2many('account.move', 'l10n_it_ddt_id', string='Invoice Reference') + name = fields.Char(string="Numero DDT", size=20, help="Transport document number", required=True) + date = fields.Date(string="Data DDT", help="Transport document date", required=True) + + def name_get(self): + res = [] + for ddt in self: + res.append((ddt.id, ("%s (%s)") % (ddt.name, ddt.date))) + return res diff --git a/addons/l10n_it_edi/models/ir_mail_server.py b/addons/l10n_it_edi/models/ir_mail_server.py new file mode 100644 index 00000000..e74e6467 --- /dev/null +++ b/addons/l10n_it_edi/models/ir_mail_server.py @@ -0,0 +1,434 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import zipfile +import io +import re +import logging +import email +import email.policy +import dateutil +import pytz + +from lxml import etree +from datetime import datetime +from xmlrpc import client as xmlrpclib + +from odoo import api, fields, models, tools, _ +from odoo.exceptions import ValidationError, UserError +from odoo.addons.l10n_it_edi.tools.remove_signature import remove_signature + + +_logger = logging.getLogger(__name__) + +class FetchmailServer(models.Model): + _name = 'fetchmail.server' + _inherit = 'fetchmail.server' + + l10n_it_is_pec = fields.Boolean('PEC server', help="If PEC Server, only mail from '...@pec.fatturapa.it' will be processed.") + l10n_it_last_uid = fields.Integer(string='Last message UID', default=1) + + def _search_edi_invoice(self, att_name, send_state=False): + """ Search sent l10n_it_edi fatturaPA invoices """ + + conditions = [ + ('move_id', "!=", False), + ('edi_format_id.code', '=', 'fattura_pa'), + ('attachment_id.name', '=', att_name), + ] + if send_state: + conditions.append(('move_id.l10n_it_send_state', '=', send_state)) + + return self.env['account.edi.document'].search(conditions, limit=1).move_id + + @api.constrains('l10n_it_is_pec', 'server_type') + def _check_pec(self): + for record in self: + if record.l10n_it_is_pec and record.server_type != 'imap': + raise ValidationError(_("PEC mail server must be of type IMAP.")) + + def fetch_mail(self): + """ WARNING: meant for cron usage only - will commit() after each email! """ + + MailThread = self.env['mail.thread'] + for server in self.filtered(lambda s: s.l10n_it_is_pec): + _logger.info('start checking for new emails on %s PEC server %s', server.server_type, server.name) + + count, failed = 0, 0 + imap_server = None + try: + imap_server = server.connect() + imap_server.select() + + # Only download new emails + email_filter = ['(UID %s:*)' % (server.l10n_it_last_uid)] + + # The l10n_it_edi.fatturapa_bypass_incoming_address_filter prevents the sender address check on incoming email. + bypass_incoming_address_filter = self.env['ir.config_parameter'].get_param('l10n_it_edi.bypass_incoming_address_filter', False) + if not bypass_incoming_address_filter: + email_filter.append('(FROM "@pec.fatturapa.it")') + + data = imap_server.uid('search', None, *email_filter)[1] + + new_max_uid = server.l10n_it_last_uid + for uid in data[0].split(): + if int(uid) <= server.l10n_it_last_uid: + # We get always minimum 1 message. If no new message, we receive the newest already managed. + continue + + result, data = imap_server.uid('fetch', uid, '(RFC822)') + + if not data[0]: + continue + message = data[0][1] + + # To leave the mail in the state in which they were. + if "Seen" not in data[1].decode("utf-8"): + imap_server.uid('STORE', uid, '+FLAGS', '\\Seen') + else: + imap_server.uid('STORE', uid, '-FLAGS', '\\Seen') + + # See details in message_process() in mail_thread.py + if isinstance(message, xmlrpclib.Binary): + message = bytes(message.data) + if isinstance(message, str): + message = message.encode('utf-8') + msg_txt = email.message_from_bytes(message, policy=email.policy.SMTP) + + try: + self._attachment_invoice(msg_txt) + new_max_uid = max(new_max_uid, int(uid)) + except Exception: + _logger.info('Failed to process mail from %s server %s.', server.server_type, server.name, exc_info=True) + failed += 1 + self._cr.commit() + count += 1 + server.write({'l10n_it_last_uid': new_max_uid}) + _logger.info("Fetched %d email(s) on %s server %s; %d succeeded, %d failed.", count, server.server_type, server.name, (count - failed), failed) + except Exception: + _logger.info("General failure when trying to fetch mail from %s server %s.", server.server_type, server.name, exc_info=True) + finally: + if imap_server: + imap_server.close() + imap_server.logout() + server.write({'date': fields.Datetime.now()}) + return super(FetchmailServer, self.filtered(lambda s: not s.l10n_it_is_pec)).fetch_mail() + + def _attachment_invoice(self, msg_txt): + parsed_values = self.env['mail.thread']._message_parse_extract_payload(msg_txt) + body, attachments = parsed_values['body'], parsed_values['attachments'] + from_address = msg_txt.get('from') + for attachment in attachments: + split_attachment = attachment.fname.rpartition('.') + if len(split_attachment) < 3: + _logger.info('E-invoice filename not compliant: %s', attachment.fname) + continue + attachment_name = split_attachment[0] + attachment_ext = split_attachment[2] + split_underscore = attachment_name.rsplit('_', 2) + if len(split_underscore) < 2: + _logger.info('E-invoice filename not compliant: %s', attachment.fname) + continue + + if attachment_ext != 'zip': + if split_underscore[1] in ['RC', 'NS', 'MC', 'MT', 'EC', 'SE', 'NE', 'DT']: + # we have a receipt + self._message_receipt_invoice(split_underscore[1], attachment) + else: + att_filename = attachment.fname + match = re.search("([A-Z]{2}[A-Za-z0-9]{2,28}_[A-Za-z0-9]{0,5}.(xml.p7m|xml))", att_filename) + # If match, we have an invoice. + if match: + # If it's signed, the content has a bytes type and we just remove the signature's envelope + if match.groups()[1] == 'xml.p7m': + att_content_data = remove_signature(attachment.content) + # If the envelope cannot be removed, the remove_signature returns None, so we skip + if not att_content_data: + _logger.warning("E-invoice couldn't be read: %s", att_filename) + continue + att_filename = att_filename.replace('.xml.p7m', '.xml') + else: + # Otherwise, it should be an utf-8 encoded XML string + att_content_data = attachment.content.encode() + self._create_invoice_from_mail(att_content_data, att_filename, from_address) + else: + if split_underscore[1] == 'AT': + # Attestazione di avvenuta trasmissione della fattura con impossibilità di recapito + self._message_AT_invoice(attachment) + else: + _logger.info('New E-invoice in zip file: %s', attachment.fname) + self._create_invoice_from_mail_with_zip(attachment, from_address) + + def _create_invoice_from_mail(self, att_content_data, att_name, from_address): + """ Creates an invoice from the content of an email present in ir.attachments + + :param att_content_data: The 'utf-8' encoded bytes string representing the content of the attachment. + :param att_name: The attachment's file name. + :param from_address: The sender address of the email. + """ + + invoices = self.env['account.move'] + + # Check if we already imported the email as an attachment + existing = self.env['ir.attachment'].search([('name', '=', att_name), ('res_model', '=', 'account.move')]) + if existing: + _logger.info('E-invoice already exist: %s', att_name) + return invoices + + # Create the new attachment for the file + attachment = self.env['ir.attachment'].create({ + 'name': att_name, + 'raw': att_content_data, + 'res_model': 'account.move', + 'type': 'binary'}) + + # Decode the file. + try: + tree = etree.fromstring(att_content_data) + except Exception: + _logger.info('The xml file is badly formatted: %s', att_name) + return invoices + + invoices = self.env.ref('l10n_it_edi.edi_fatturaPA')._create_invoice_from_xml_tree(att_name, tree) + if not invoices: + _logger.info('E-invoice not found in file: %s', att_name) + return invoices + invoices.l10n_it_send_state = "new" + invoices.invoice_source_email = from_address + for invoice in invoices: + invoice.with_context(no_new_invoice=True, default_res_id=invoice.id) \ + .message_post(body=(_("Original E-invoice XML file")), attachment_ids=[attachment.id]) + + self._cr.commit() + + _logger.info('New E-invoices (%s), ids: %s', att_name, [x.id for x in invoices]) + return invoices + + def _create_invoice_from_mail_with_zip(self, attachment_zip, from_address): + with zipfile.ZipFile(io.BytesIO(attachment_zip.content)) as z: + for att_name in z.namelist(): + existing = self.env['ir.attachment'].search([('name', '=', att_name), ('res_model', '=', 'account.move')]) + if existing: + # invoice already exist + _logger.info('E-invoice in zip file (%s) already exist: %s', attachment_zip.fname, att_name) + continue + att_content = z.open(att_name).read() + + self._create_invoice_from_mail(att_content, att_name, from_address) + + def _message_AT_invoice(self, attachment_zip): + with zipfile.ZipFile(io.BytesIO(attachment_zip.content)) as z: + for attachment_name in z.namelist(): + split_name_attachment = attachment_name.rpartition('.') + if len(split_name_attachment) < 3: + continue + split_underscore = split_name_attachment[0].rsplit('_', 2) + if len(split_underscore) < 2: + continue + if split_underscore[1] == 'AT': + attachment = z.open(attachment_name).read() + _logger.info('New AT receipt for: %s', split_underscore[0]) + try: + tree = etree.fromstring(attachment) + except: + _logger.info('Error in decoding new receipt file: %s', attachment_name) + return + + elements = tree.xpath('//NomeFile') + if elements and elements[0].text: + filename = elements[0].text + else: + return + + related_invoice = self._search_edi_invoice(filename) + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', filename) + return + + related_invoice.l10n_it_send_state = 'failed_delivery' + info = self._return_multi_line_xml(tree, ['//IdentificativoSdI', '//DataOraRicezione', '//MessageId', '//PecMessageId', '//Note']) + related_invoice.message_post( + body=(_("ES certify that it has received the invoice and that the file \ + could not be delivered to the addressee. <br/>%s") % (info)) + ) + + def _message_receipt_invoice(self, receipt_type, attachment): + + try: + tree = etree.fromstring(attachment.content.encode()) + except: + _logger.info('Error in decoding new receipt file: %s', attachment.fname) + return {} + + elements = tree.xpath('//NomeFile') + if elements and elements[0].text: + filename = elements[0].text + else: + return {} + + if receipt_type == 'RC': + # Delivery receipt + # This is the receipt sent by the ES to the transmitting subject to communicate + # delivery of the file to the addressee + related_invoice = self._search_edi_invoice(filename, 'sent') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + related_invoice.l10n_it_send_state = 'delivered' + info = self._return_multi_line_xml(tree, ['//IdentificativoSdI', '//DataOraRicezione', '//DataOraConsegna', '//Note']) + related_invoice.message_post( + body=(_("E-Invoice is delivery to the destinatory:<br/>%s") % (info)) + ) + + elif receipt_type == 'NS': + # Rejection notice + # This is the receipt sent by the ES to the transmitting subject if one or more of + # the checks carried out by the ES on the file received do not have a successful result. + related_invoice = self._search_edi_invoice(filename, 'sent') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + related_invoice.l10n_it_send_state = 'invalid' + error = self._return_error_xml(tree) + related_invoice.message_post( + body=(_("Errors in the E-Invoice :<br/>%s") % (error)) + ) + related_invoice.activity_schedule( + 'mail.mail_activity_data_todo', + summary='Rejection notice', + user_id=related_invoice.invoice_user_id.id if related_invoice.invoice_user_id else self.env.user.id) + + elif receipt_type == 'MC': + # Failed delivery notice + # This is the receipt sent by the ES to the transmitting subject if the file is not + # delivered to the addressee. + related_invoice = self._search_edi_invoice(filename, 'sent') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + info = self._return_multi_line_xml(tree, [ + '//IdentificativoSdI', + '//DataOraRicezione', + '//Descrizione', + '//MessageId', + '//Note']) + related_invoice.message_post( + body=(_("The E-invoice is not delivered to the addressee. The Exchange System is\ + unable to deliver the file to the Public Administration. The Exchange System will\ + contact the PA to report the problem and request that they provide a solution. \ + During the following 15 days, the Exchange System will try to forward the FatturaPA\ + file to the Administration in question again. More information:<br/>%s") % (info)) + ) + + elif receipt_type == 'NE': + # Outcome notice + # This is the receipt sent by the ES to the invoice sender to communicate the result + # (acceptance or refusal of the invoice) of the checks carried out on the document by + # the addressee. + related_invoice = self._search_edi_invoice(filename, 'delivered') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + elements = tree.xpath('//Esito') + if elements and elements[0].text: + if elements[0].text == 'EC01': + related_invoice.l10n_it_send_state = 'delivered_accepted' + elif elements[0].text == 'EC02': + related_invoice.l10n_it_send_state = 'delivered_refused' + + info = self._return_multi_line_xml(tree, + ['//Esito', + '//Descrizione', + '//IdentificativoSdI', + '//DataOraRicezione', + '//DataOraConsegna', + '//Note' + ]) + related_invoice.message_post( + body=(_("Outcome notice: %s<br/>%s") % (related_invoice.l10n_it_send_state, info)) + ) + if related_invoice.l10n_it_send_state == 'delivered_refused': + related_invoice.activity_schedule( + 'mail.mail_activity_todo', + user_id=related_invoice.invoice_user_id.id if related_invoice.invoice_user_id else self.env.user.id, + summary='Outcome notice: Refused') + + # elif receipt_type == 'MT': + # Metadata file + # This is the file sent by the ES to the addressee together with the invoice file, + # containing the main reference data of the file useful for processing, including + # the IdentificativoSDI. + # Useless for Odoo + + elif receipt_type == 'DT': + # Deadline passed notice + # This is the receipt sent by the ES to both the invoice sender and the invoice + # addressee to communicate the expiry of the maximum term for communication of + # acceptance/refusal. + related_invoice = self._search_edi_invoice(filename, 'delivered') + if not related_invoice: + _logger.info('Error: invoice not found for receipt file: %s', attachment.fname) + return + related_invoice.l10n_it_send_state = 'delivered_expired' + info = self._return_multi_line_xml(tree, [ + '//Descrizione', + '//IdentificativoSdI', + '//Note']) + related_invoice.message_post( + body=(_("Expiration of the maximum term for communication of acceptance/refusal:\ + %s<br/>%s") % (filename, info)) + ) + + def _return_multi_line_xml(self, tree, element_tags): + output_str = "<ul>" + + for element_tag in element_tags: + elements = tree.xpath(element_tag) + if not elements: + continue + for element in elements: + if element.text: + text = " ".join(element.text.split()) + output_str += "<li>%s: %s</li>" % (element.tag, text) + return output_str + "</ul>" + + def _return_error_xml(self, tree): + output_str = "<ul>" + + elements = tree.xpath('//Errore') + if not elements: + return + for element in elements: + descrizione = " ".join(element[1].text.split()) + if descrizione: + output_str += "<li>Errore %s: %s</li>" % (element[0].text, descrizione) + return output_str + "</ul>" + +class IrMailServer(models.Model): + _name = "ir.mail_server" + _inherit = "ir.mail_server" + + def _get_test_email_addresses(self): + self.ensure_one() + + company = self.env["res.company"].search([("l10n_it_mail_pec_server_id", "=", self.id)], limit=1) + if not company: + # it's not a PEC server + return super()._get_test_email_addresses() + email_from = self.smtp_user + if not email_from: + raise UserError(_('Please configure Username for this Server PEC')) + email_to = company.l10n_it_address_recipient_fatturapa + if not email_to: + raise UserError(_('Please configure Government PEC-mail in company settings')) + return email_from, email_to + + def build_email(self, email_from, email_to, subject, body, email_cc=None, email_bcc=None, reply_to=False, + attachments=None, message_id=None, references=None, object_id=False, subtype='plain', headers=None, + body_alternative=None, subtype_alternative='plain'): + + if self.env.context.get('wo_bounce_return_path') and headers: + headers['Return-Path'] = email_from + return super(IrMailServer, self).build_email(email_from, email_to, subject, body, email_cc=email_cc, email_bcc=email_bcc, reply_to=reply_to, + attachments=attachments, message_id=message_id, references=references, object_id=object_id, subtype=subtype, headers=headers, + body_alternative=body_alternative, subtype_alternative=subtype_alternative) diff --git a/addons/l10n_it_edi/models/res_company.py b/addons/l10n_it_edi/models/res_company.py new file mode 100644 index 00000000..71c651f8 --- /dev/null +++ b/addons/l10n_it_edi/models/res_company.py @@ -0,0 +1,110 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import api, fields, models, _ +from odoo.exceptions import ValidationError + +TAX_SYSTEM = [ + ("RF01", "[RF01] Ordinario"), + ("RF02", "[RF02] Contribuenti minimi (art.1, c.96-117, L. 244/07)"), + ("RF04", "[RF04] Agricoltura e attività connesse e pesca (artt.34 e 34-bis, DPR 633/72)"), + ("RF05", "[RF05] Vendita sali e tabacchi (art.74, c.1, DPR. 633/72)"), + ("RF06", "[RF06] Commercio fiammiferi (art.74, c.1, DPR 633/72)"), + ("RF07", "[RF07] Editoria (art.74, c.1, DPR 633/72)"), + ("RF08", "[RF08] Gestione servizi telefonia pubblica (art.74, c.1, DPR 633/72)"), + ("RF09", "[RF09] Rivendita documenti di trasporto pubblico e di sosta (art.74, c.1, DPR 633/72)"), + ("RF10", "[RF10] Intrattenimenti, giochi e altre attività di cui alla tariffa allegata al DPR 640/72 (art.74, c.6, DPR 633/72)"), + ("RF11", "[RF11] Agenzie viaggi e turismo (art.74-ter, DPR 633/72)"), + ("RF12", "[RF12] Agriturismo (art.5, c.2, L. 413/91)"), + ("RF13", "[RF13] Vendite a domicilio (art.25-bis, c.6, DPR 600/73)"), + ("RF14", "[RF14] Rivendita beni usati, oggetti d’arte, d’antiquariato o da collezione (art.36, DL 41/95)"), + ("RF15", "[RF15] Agenzie di vendite all’asta di oggetti d’arte, antiquariato o da collezione (art.40-bis, DL 41/95)"), + ("RF16", "[RF16] IVA per cassa P.A. (art.6, c.5, DPR 633/72)"), + ("RF17", "[RF17] IVA per cassa (art. 32-bis, DL 83/2012)"), + ("RF18", "[RF18] Altro"), + ("RF19", "[RF19] Regime forfettario (art.1, c.54-89, L. 190/2014)"), +] + +class ResCompany(models.Model): + _name = 'res.company' + _inherit = 'res.company' + + l10n_it_codice_fiscale = fields.Char(string="Codice Fiscale", size=16, related='partner_id.l10n_it_codice_fiscale', + store=True, readonly=False, help="Fiscal code of your company") + l10n_it_tax_system = fields.Selection(selection=TAX_SYSTEM, string="Tax System", + help="Please select the Tax system to which you are subjected.") + + # PEC server + l10n_it_mail_pec_server_id = fields.Many2one('ir.mail_server', string="Server PEC", + help="Configure your PEC-mail server to send electronic invoices.") + l10n_it_address_recipient_fatturapa = fields.Char(string="Government PEC-mail", + help="Enter Government PEC-mail address. Ex: sdi01@pec.fatturapa.it") + l10n_it_address_send_fatturapa = fields.Char(string="Company PEC-mail", + help="Enter your company PEC-mail address. Ex: yourcompany@pec.mail.it") + + + # Economic and Administrative Index + l10n_it_has_eco_index = fields.Boolean(default=False, + help="The seller/provider is a company listed on the register of companies and as\ + such must also indicate the registration data on all documents (art. 2250, Italian\ + Civil Code)") + l10n_it_eco_index_office = fields.Many2one('res.country.state', domain="[('country_id','=','IT')]", + string="Province of the register-of-companies office") + l10n_it_eco_index_number = fields.Char(string="Number in register of companies", size=20, + help="This field must contain the number under which the\ + seller/provider is listed on the register of companies.") + l10n_it_eco_index_share_capital = fields.Float(default=0.0, string="Share capital actually paid up", + help="Mandatory if the seller/provider is a company with share\ + capital (SpA, SApA, Srl), this field must contain the amount\ + of share capital actually paid up as resulting from the last\ + financial statement") + l10n_it_eco_index_sole_shareholder = fields.Selection( + [ + ("NO", "Not a limited liability company"), + ("SU", "Socio unico"), + ("SM", "Più soci")], + string="Shareholder") + l10n_it_eco_index_liquidation_state = fields.Selection( + [ + ("LS", "The company is in a state of liquidation"), + ("LN", "The company is not in a state of liquidation")], + string="Liquidation state") + + + # Tax representative + l10n_it_has_tax_representative = fields.Boolean(default=False, + help="The seller/provider is a non-resident subject which\ + carries out transactions in Italy with relevance for VAT\ + purposes and which takes avail of a tax representative in\ + Italy") + l10n_it_tax_representative_partner_id = fields.Many2one('res.partner', string='Tax representative partner') + + @api.constrains('l10n_it_has_eco_index', + 'l10n_it_eco_index_office', + 'l10n_it_eco_index_number', + 'l10n_it_eco_index_share_capital', + 'l10n_it_eco_index_sole_shareholder', + 'l10n_it_eco_index_liquidation_state') + def _check_eco_admin_index(self): + for record in self: + if not record.l10n_it_has_eco_index: + continue + if not record.l10n_it_eco_index_office\ + or not record.l10n_it_eco_index_number\ + or not record.l10n_it_eco_index_share_capital\ + or not record.l10n_it_eco_index_sole_shareholder\ + or not record.l10n_it_eco_index_liquidation_state: + raise ValidationError(_("All fields about the Economic and Administrative Index must be completed.")) + + @api.constrains('l10n_it_has_tax_representative', + 'l10n_it_tax_representative_partner_id') + def _check_tax_representative(self): + for record in self: + if not record.l10n_it_has_tax_representative: + continue + if not record.l10n_it_tax_representative_partner_id: + raise ValidationError(_("You must select a tax representative.")) + if not record.l10n_it_tax_representative_partner_id.vat: + raise ValidationError(_("Your tax representative partner must have a tax number.")) + if not record.l10n_it_tax_representative_partner_id.country_id: + raise ValidationError(_("Your tax representative partner must have a country.")) diff --git a/addons/l10n_it_edi/models/res_partner.py b/addons/l10n_it_edi/models/res_partner.py new file mode 100644 index 00000000..3a1d8d71 --- /dev/null +++ b/addons/l10n_it_edi/models/res_partner.py @@ -0,0 +1,49 @@ +# -*- coding:utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +from odoo import api, fields, models, _ +from odoo.exceptions import UserError, ValidationError + +import re + + +class ResPartner(models.Model): + _name = 'res.partner' + _inherit = 'res.partner' + + l10n_it_pec_email = fields.Char(string="PEC e-mail") + l10n_it_codice_fiscale = fields.Char(string="Codice Fiscale", size=16) + l10n_it_pa_index = fields.Char(string="PA index", + size=7, + help="Must contain the 6-character (or 7) code, present in the PA\ + Index in the information relative to the electronic invoicing service,\ + associated with the office which, within the addressee administration, deals\ + with receiving (and processing) the invoice.") + + _sql_constraints = [ + ('l10n_it_codice_fiscale', + "CHECK(l10n_it_codice_fiscale IS NULL OR l10n_it_codice_fiscale = '' OR LENGTH(l10n_it_codice_fiscale) >= 11)", + "Codice fiscale must have between 11 and 16 characters."), + + ('l10n_it_pa_index', + "CHECK(l10n_it_pa_index IS NULL OR l10n_it_pa_index = '' OR LENGTH(l10n_it_pa_index) >= 6)", + "PA index must have between 6 and 7 characters."), + ] + + @api.model + def _l10n_it_normalize_codice_fiscale(self, codice): + if codice and re.match(r'^IT[0-9]{11}$', codice): + return codice[2:13] + return codice + + @api.onchange('vat') + def _l10n_it_onchange_vat(self): + if not self.l10n_it_codice_fiscale: + self.l10n_it_codice_fiscale = self._l10n_it_normalize_codice_fiscale(self.vat) + + @api.constrains('l10n_it_codice_fiscale') + def validate_codice_fiscale(self): + p = re.compile(r'^([A-Za-z]{6}[0-9]{2}[A-Za-z]{1}[0-9]{2}[A-Za-z]{1}[0-9]{3}[A-Za-z]{1}$)|([0-9]{11})|(IT[0-9]{11})$') + for record in self: + if record.l10n_it_codice_fiscale and not p.match(record.l10n_it_codice_fiscale): + raise UserError(_("Invalid Codice Fiscale '%s': should be like 'MRTMTT91D08F205J' for physical person and '12345678901' or 'IT12345678901' for businesses.", record.l10n_it_codice_fiscale)) |
