summaryrefslogtreecommitdiff
path: root/addons/account_edi/models/account_edi_format.py
diff options
context:
space:
mode:
Diffstat (limited to 'addons/account_edi/models/account_edi_format.py')
-rw-r--r--addons/account_edi/models/account_edi_format.py552
1 files changed, 552 insertions, 0 deletions
diff --git a/addons/account_edi/models/account_edi_format.py b/addons/account_edi/models/account_edi_format.py
new file mode 100644
index 00000000..1dce2c58
--- /dev/null
+++ b/addons/account_edi/models/account_edi_format.py
@@ -0,0 +1,552 @@
+# -*- coding: utf-8 -*-
+# Part of Odoo. See LICENSE file for full copyright and licensing details.
+
+from odoo import models, fields, api
+from odoo.tools.pdf import OdooPdfFileReader, OdooPdfFileWriter
+from odoo.osv import expression
+from odoo.tools import html_escape
+
+from lxml import etree
+import base64
+import io
+import logging
+import pathlib
+
+_logger = logging.getLogger(__name__)
+
+
+class AccountEdiFormat(models.Model):
+ _name = 'account.edi.format'
+ _description = 'EDI format'
+
+ name = fields.Char()
+ code = fields.Char(required=True)
+
+ _sql_constraints = [
+ ('unique_code', 'unique (code)', 'This code already exists')
+ ]
+
+
+ ####################################################
+ # Low-level methods
+ ####################################################
+
+ @api.model_create_multi
+ def create(self, vals_list):
+ edi_formats = super().create(vals_list)
+
+ # activate by default on journal
+ journals = self.env['account.journal'].search([])
+ for journal in journals:
+ for edi_format in edi_formats:
+ if edi_format._is_compatible_with_journal(journal):
+ journal.edi_format_ids += edi_format
+
+ # activate cron
+ if any(edi_format._needs_web_services() for edi_format in edi_formats):
+ self.env.ref('account_edi.ir_cron_edi_network').active = True
+
+ return edi_formats
+
+ ####################################################
+ # Export method to override based on EDI Format
+ ####################################################
+
+ def _is_required_for_invoice(self, invoice):
+ """ Indicate if this EDI must be generated for the invoice passed as parameter.
+
+ :param invoice: An account.move having the invoice type.
+ :returns: True if the EDI must be generated, False otherwise.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return True
+
+ def _is_required_for_payment(self, payment):
+ """ Indicate if this EDI must be generated for the payment passed as parameter.
+
+ :param payment: An account.move linked to either an account.payment, either an account.bank.statement.line.
+ :returns: True if the EDI must be generated, False otherwise.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return False
+
+ def _needs_web_services(self):
+ """ Indicate if the EDI must be generated asynchronously through to some web services.
+
+ :return: True if such a web service is available, False otherwise.
+ """
+ self.ensure_one()
+ return False
+
+ def _is_compatible_with_journal(self, journal):
+ """ Indicate if the EDI format should appear on the journal passed as parameter to be selected by the user.
+ If True, this EDI format will be selected by default on the journal.
+
+ :param journal: The journal.
+ :returns: True if this format can be enabled by default on the journal, False otherwise.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return journal.type == 'sale'
+
+ def _is_embedding_to_invoice_pdf_needed(self):
+ """ Indicate if the EDI must be embedded inside the PDF report.
+
+ :returns: True if the documents need to be embedded, False otherwise.
+ """
+ # TO OVERRIDE
+ return False
+
+ def _get_embedding_to_invoice_pdf_values(self, invoice):
+ """ Get the values to embed to pdf.
+
+ :returns: A dictionary {'name': name, 'datas': datas} or False if there are no values to embed.
+ * name: The name of the file.
+ * datas: The bytes ot the file.
+ """
+ self.ensure_one()
+ attachment = invoice._get_edi_attachment(self)
+ if not attachment or not self._is_embedding_to_invoice_pdf_needed():
+ return False
+ datas = base64.b64decode(attachment.with_context(bin_size=False).datas)
+ return {'name': attachment.name, 'datas': datas}
+
+ def _support_batching(self, move=None, state=None, company=None):
+ """ Indicate if we can send multiple documents in the same time to the web services.
+ If True, the _post_%s_edi methods will get multiple documents in the same time.
+ Otherwise, these methods will be called with only one record at a time.
+
+ :returns: True if batching is supported, False otherwise.
+ """
+ # TO OVERRIDE
+ return False
+
+ def _get_batch_key(self, move, state):
+ """ Returns a tuple that will be used as key to partitionnate the invoices/payments when creating batches
+ with multiple invoices/payments.
+ The type of move (invoice or payment), its company_id, its edi state and the edi_format are used by default, if
+ no further partition is needed for this format, this method should return ().
+
+ :returns: The key to be used when partitionning the batches.
+ """
+ move.ensure_one()
+ return ()
+
+ def _check_move_configuration(self, move):
+ """ Checks the move and relevant records for potential error (missing data, etc).
+
+ :param invoice: The move to check.
+ :returns: A list of error messages.
+ """
+ # TO OVERRIDE
+ return []
+
+ def _post_invoice_edi(self, invoices, test_mode=False):
+ """ Create the file content representing the invoice (and calls web services if necessary).
+
+ :param invoices: A list of invoices to post.
+ :param test_mode: A flag indicating the EDI should only simulate the EDI without sending data.
+ :returns: A dictionary with the invoice as key and as value, another dictionary:
+ * attachment: The attachment representing the invoice in this edi_format if the edi was successfully posted.
+ * error: An error if the edi was not successfully posted.
+ * blocking_level: (optional, requires account_edi_extended) How bad is the error (how should the edi flow be blocked ?)
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return {}
+
+ def _cancel_invoice_edi(self, invoices, test_mode=False):
+ """Calls the web services to cancel the invoice of this document.
+
+ :param invoices: A list of invoices to cancel.
+ :param test_mode: A flag indicating the EDI should only simulate the EDI without sending data.
+ :returns: A dictionary with the invoice as key and as value, another dictionary:
+ * success: True if the invoice was successfully cancelled.
+ * error: An error if the edi was not successfully cancelled.
+ * blocking_level: (optional, requires account_edi_extended) How bad is the error (how should the edi flow be blocked ?)
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return {invoice: {'success': True} for invoice in invoices} # By default, cancel succeeds doing nothing.
+
+ def _post_payment_edi(self, payments, test_mode=False):
+ """ Create the file content representing the payment (and calls web services if necessary).
+
+ :param payments: The payments to post.
+ :param test_mode: A flag indicating the EDI should only simulate the EDI without sending data.
+ :returns: A dictionary with the payment as key and as value, another dictionary:
+ * attachment: The attachment representing the payment in this edi_format if the edi was successfully posted.
+ * error: An error if the edi was not successfully posted.
+ * blocking_level: (optional, requires account_edi_extended) How bad is the error (how should the edi flow be blocked ?)
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return {}
+
+ def _cancel_payment_edi(self, payments, test_mode=False):
+ """Calls the web services to cancel the payment of this document.
+
+ :param payments: A list of payments to cancel.
+ :param test_mode: A flag indicating the EDI should only simulate the EDI without sending data.
+ :returns: A dictionary with the payment as key and as value, another dictionary:
+ * success: True if the payment was successfully cancelled.
+ * error: An error if the edi was not successfully cancelled.
+ * blocking_level: (optional, requires account_edi_extended) How bad is the error (how should the edi flow be blocked ?)
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return {payment: {'success': True} for payment in payments} # By default, cancel succeeds doing nothing.
+
+ ####################################################
+ # Import methods to override based on EDI Format
+ ####################################################
+
+ def _create_invoice_from_xml_tree(self, filename, tree):
+ """ Create a new invoice with the data inside the xml.
+
+ :param filename: The name of the xml.
+ :param tree: The tree of the xml to import.
+ :returns: The created invoice.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return self.env['account.move']
+
+ def _update_invoice_from_xml_tree(self, filename, tree, invoice):
+ """ Update an existing invoice with the data inside the xml.
+
+ :param filename: The name of the xml.
+ :param tree: The tree of the xml to import.
+ :param invoice: The invoice to update.
+ :returns: The updated invoice.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return self.env['account.move']
+
+ def _create_invoice_from_pdf_reader(self, filename, reader):
+ """ Create a new invoice with the data inside a pdf.
+
+ :param filename: The name of the pdf.
+ :param reader: The OdooPdfFileReader of the pdf to import.
+ :returns: The created invoice.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+
+ return self.env['account.move']
+
+ def _update_invoice_from_pdf_reader(self, filename, reader, invoice):
+ """ Update an existing invoice with the data inside the pdf.
+
+ :param filename: The name of the pdf.
+ :param reader: The OdooPdfFileReader of the pdf to import.
+ :param invoice: The invoice to update.
+ :returns: The updated invoice.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return self.env['account.move']
+
+ def _create_invoice_from_binary(self, filename, content, extension):
+ """ Create a new invoice with the data inside a binary file.
+
+ :param filename: The name of the file.
+ :param content: The content of the binary file.
+ :param extension: The extensions as a string.
+ :returns: The created invoice.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return self.env['account.move']
+
+ def _update_invoice_from_binary(self, filename, content, extension, invoice):
+ """ Update an existing invoice with the data inside a binary file.
+
+ :param filename: The name of the file.
+ :param content: The content of the binary file.
+ :param extension: The extensions as a string.
+ :param invoice: The invoice to update.
+ :returns: The updated invoice.
+ """
+ # TO OVERRIDE
+ self.ensure_one()
+ return self.env['account.move']
+
+ ####################################################
+ # Export Internal methods (not meant to be overridden)
+ ####################################################
+
+ def _embed_edis_to_pdf(self, pdf_content, invoice):
+ """ Create the EDI document of the invoice and embed it in the pdf_content.
+
+ :param pdf_content: the bytes representing the pdf to add the EDIs to.
+ :param invoice: the invoice to generate the EDI from.
+ :returns: the same pdf_content with the EDI of the invoice embed in it.
+ """
+ attachments = []
+ for edi_format in self.filtered(lambda edi_format: edi_format._is_embedding_to_invoice_pdf_needed()):
+ attach = edi_format._get_embedding_to_invoice_pdf_values(invoice)
+ if attach:
+ attachments.append(attach)
+
+ if attachments:
+ # Add the attachments to the pdf file
+ reader_buffer = io.BytesIO(pdf_content)
+ reader = OdooPdfFileReader(reader_buffer, strict=False)
+ writer = OdooPdfFileWriter()
+ writer.cloneReaderDocumentRoot(reader)
+ for vals in attachments:
+ writer.addAttachment(vals['name'], vals['datas'])
+ buffer = io.BytesIO()
+ writer.write(buffer)
+ pdf_content = buffer.getvalue()
+ reader_buffer.close()
+ buffer.close()
+ return pdf_content
+
+ ####################################################
+ # Import Internal methods (not meant to be overridden)
+ ####################################################
+
+ def _decode_xml(self, filename, content):
+ """Decodes an xml into a list of one dictionary representing an attachment.
+
+ :param filename: The name of the xml.
+ :param content: The bytes representing the xml.
+ :returns: A list with a dictionary.
+ * filename: The name of the attachment.
+ * content: The content of the attachment.
+ * type: The type of the attachment.
+ * xml_tree: The tree of the xml if type is xml.
+ """
+ to_process = []
+ try:
+ xml_tree = etree.fromstring(content)
+ except Exception as e:
+ _logger.exception("Error when converting the xml content to etree: %s" % e)
+ return to_process
+ if len(xml_tree):
+ to_process.append({
+ 'filename': filename,
+ 'content': content,
+ 'type': 'xml',
+ 'xml_tree': xml_tree,
+ })
+ return to_process
+
+ def _decode_pdf(self, filename, content):
+ """Decodes a pdf and unwrap sub-attachment into a list of dictionary each representing an attachment.
+
+ :param filename: The name of the pdf.
+ :param content: The bytes representing the pdf.
+ :returns: A list of dictionary for each attachment.
+ * filename: The name of the attachment.
+ * content: The content of the attachment.
+ * type: The type of the attachment.
+ * xml_tree: The tree of the xml if type is xml.
+ * pdf_reader: The pdf_reader if type is pdf.
+ """
+ to_process = []
+ try:
+ buffer = io.BytesIO(content)
+ pdf_reader = OdooPdfFileReader(buffer, strict=False)
+ except Exception as e:
+ # Malformed pdf
+ _logger.exception("Error when reading the pdf: %s" % e)
+ return to_process
+
+ # Process embedded files.
+ try:
+ for xml_name, content in pdf_reader.getAttachments():
+ to_process.extend(self._decode_xml(xml_name, content))
+ except NotImplementedError as e:
+ _logger.warning("Unable to access the attachments of %s. Tried to decrypt it, but %s." % (filename, e))
+
+ # Process the pdf itself.
+ to_process.append({
+ 'filename': filename,
+ 'content': content,
+ 'type': 'pdf',
+ 'pdf_reader': pdf_reader,
+ })
+
+ return to_process
+
+ def _decode_binary(self, filename, content):
+ """Decodes any file into a list of one dictionary representing an attachment.
+ This is a fallback for all files that are not decoded by other methods.
+
+ :param filename: The name of the file.
+ :param content: The bytes representing the file.
+ :returns: A list with a dictionary.
+ * filename: The name of the attachment.
+ * content: The content of the attachment.
+ * type: The type of the attachment.
+ """
+ return [{
+ 'filename': filename,
+ 'extension': ''.join(pathlib.Path(filename).suffixes),
+ 'content': content,
+ 'type': 'binary',
+ }]
+
+ def _decode_attachment(self, attachment):
+ """Decodes an ir.attachment and unwrap sub-attachment into a list of dictionary each representing an attachment.
+
+ :param attachment: An ir.attachment record.
+ :returns: A list of dictionary for each attachment.
+ * filename: The name of the attachment.
+ * content: The content of the attachment.
+ * type: The type of the attachment.
+ * xml_tree: The tree of the xml if type is xml.
+ * pdf_reader: The pdf_reader if type is pdf.
+ """
+ content = base64.b64decode(attachment.with_context(bin_size=False).datas)
+ to_process = []
+
+ if 'pdf' in attachment.mimetype:
+ to_process.extend(self._decode_pdf(attachment.name, content))
+ elif 'xml' in attachment.mimetype:
+ to_process.extend(self._decode_xml(attachment.name, content))
+ else:
+ to_process.extend(self._decode_binary(attachment.name, content))
+
+ return to_process
+
+ def _create_invoice_from_attachment(self, attachment):
+ """Decodes an ir.attachment to create an invoice.
+
+ :param attachment: An ir.attachment record.
+ :returns: The invoice where to import data.
+ """
+ for file_data in self._decode_attachment(attachment):
+ for edi_format in self:
+ res = False
+ try:
+ if file_data['type'] == 'xml':
+ res = edi_format._create_invoice_from_xml_tree(file_data['filename'], file_data['xml_tree'])
+ elif file_data['type'] == 'pdf':
+ res = edi_format._create_invoice_from_pdf_reader(file_data['filename'], file_data['pdf_reader'])
+ file_data['pdf_reader'].stream.close()
+ else:
+ res = edi_format._create_invoice_from_binary(file_data['filename'], file_data['content'], file_data['extension'])
+ except Exception as e:
+ _logger.exception("Error importing attachment \"%s\" as invoice with format \"%s\"", file_data['filename'], edi_format.name, str(e))
+ if res:
+ if 'extract_state' in res:
+ # Bypass the OCR to prevent overwriting data when an EDI was succesfully imported.
+ # TODO : remove when we integrate the OCR to the EDI flow.
+ res.write({'extract_state': 'done'})
+ return res
+ return self.env['account.move']
+
+ def _update_invoice_from_attachment(self, attachment, invoice):
+ """Decodes an ir.attachment to update an invoice.
+
+ :param attachment: An ir.attachment record.
+ :returns: The invoice where to import data.
+ """
+ for file_data in self._decode_attachment(attachment):
+ for edi_format in self:
+ res = False
+ try:
+ if file_data['type'] == 'xml':
+ res = edi_format._update_invoice_from_xml_tree(file_data['filename'], file_data['xml_tree'], invoice)
+ elif file_data['type'] == 'pdf':
+ res = edi_format._update_invoice_from_pdf_reader(file_data['filename'], file_data['pdf_reader'], invoice)
+ file_data['pdf_reader'].stream.close()
+ else: # file_data['type'] == 'binary'
+ res = edi_format._update_invoice_from_binary(file_data['filename'], file_data['content'], file_data['extension'], invoice)
+ except Exception as e:
+ _logger.exception("Error importing attachment \"%s\" as invoice with format \"%s\"", file_data['filename'], edi_format.name, str(e))
+ if res:
+ if 'extract_state' in res:
+ # Bypass the OCR to prevent overwriting data when an EDI was succesfully imported.
+ # TODO : remove when we integrate the OCR to the EDI flow.
+ res.write({'extract_state': 'done'})
+ return res
+ return self.env['account.move']
+
+ ####################################################
+ # Import helpers
+ ####################################################
+
+ def _find_value(self, xpath, xml_element, namespaces=None):
+ element = xml_element.xpath(xpath, namespaces=namespaces)
+ return element[0].text if element else None
+
+ def _retrieve_partner(self, name=None, phone=None, mail=None, vat=None):
+ '''Search all partners and find one that matches one of the parameters.
+
+ :param name: The name of the partner.
+ :param phone: The phone or mobile of the partner.
+ :param mail: The mail of the partner.
+ :param vat: The vat number of the partner.
+ :returns: A partner or an empty recordset if not found.
+ '''
+ domains = []
+ for value, domain in (
+ (name, [('name', 'ilike', name)]),
+ (phone, expression.OR([[('phone', '=', phone)], [('mobile', '=', phone)]])),
+ (mail, [('email', '=', mail)]),
+ (vat, [('vat', 'like', vat)]),
+ ):
+ if value is not None:
+ domains.append(domain)
+
+ domain = expression.OR(domains)
+ return self.env['res.partner'].search(domain, limit=1)
+
+ def _retrieve_product(self, name=None, default_code=None, barcode=None):
+ '''Search all products and find one that matches one of the parameters.
+
+ :param name: The name of the product.
+ :param default_code: The default_code of the product.
+ :param barcode: The barcode of the product.
+ :returns: A product or an empty recordset if not found.
+ '''
+ domains = []
+ for value, domain in (
+ (name, ('name', 'ilike', name)),
+ (default_code, ('default_code', '=', default_code)),
+ (barcode, ('barcode', '=', barcode)),
+ ):
+ if value is not None:
+ domains.append([domain])
+
+ domain = expression.OR(domains)
+ return self.env['product.product'].search(domain, limit=1)
+
+ def _retrieve_tax(self, amount, type_tax_use):
+ '''Search all taxes and find one that matches all of the parameters.
+
+ :param amount: The amount of the tax.
+ :param type_tax_use: The type of the tax.
+ :returns: A tax or an empty recordset if not found.
+ '''
+ domains = [
+ [('amount', '=', float(amount))],
+ [('type_tax_use', '=', type_tax_use)]
+ ]
+
+ return self.env['account.tax'].search(expression.AND(domains), order='sequence ASC', limit=1)
+
+ def _retrieve_currency(self, code):
+ '''Search all currencies and find one that matches the code.
+
+ :param code: The code of the currency.
+ :returns: A currency or an empty recordset if not found.
+ '''
+ return self.env['res.currency'].search([('name', '=', code.upper())], limit=1)
+
+ ####################################################
+ # Other helpers
+ ####################################################
+
+ @api.model
+ def _format_error_message(self, error_title, errors):
+ bullet_list_msg = ''.join('<li>%s</li>' % html_escape(msg) for msg in errors)
+ return '%s<ul>%s</ul>' % (error_title, bullet_list_msg)