diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2022-05-10 21:51:50 +0700 |
| commit | 3751379f1e9a4c215fb6eb898b4ccc67659b9ace (patch) | |
| tree | a44932296ef4a9b71d5f010906253d8c53727726 /addons/account/models/partner.py | |
| parent | 0a15094050bfde69a06d6eff798e9a8ddf2b8c21 (diff) | |
initial commit 2
Diffstat (limited to 'addons/account/models/partner.py')
| -rw-r--r-- | addons/account/models/partner.py | 510 |
1 files changed, 510 insertions, 0 deletions
diff --git a/addons/account/models/partner.py b/addons/account/models/partner.py new file mode 100644 index 00000000..7d0c08b2 --- /dev/null +++ b/addons/account/models/partner.py @@ -0,0 +1,510 @@ +# -*- coding: utf-8 -*- +# Part of Odoo. See LICENSE file for full copyright and licensing details. + +import time +import logging + +from psycopg2 import sql, DatabaseError + +from odoo import api, fields, models, _ +from odoo.tools import DEFAULT_SERVER_DATETIME_FORMAT +from odoo.exceptions import ValidationError +from odoo.addons.base.models.res_partner import WARNING_MESSAGE, WARNING_HELP + +_logger = logging.getLogger(__name__) + +class AccountFiscalPosition(models.Model): + _name = 'account.fiscal.position' + _description = 'Fiscal Position' + _order = 'sequence' + + sequence = fields.Integer() + name = fields.Char(string='Fiscal Position', required=True) + active = fields.Boolean(default=True, + help="By unchecking the active field, you may hide a fiscal position without deleting it.") + company_id = fields.Many2one( + comodel_name='res.company', + string='Company', required=True, readonly=True, + default=lambda self: self.env.company) + account_ids = fields.One2many('account.fiscal.position.account', 'position_id', string='Account Mapping', copy=True) + tax_ids = fields.One2many('account.fiscal.position.tax', 'position_id', string='Tax Mapping', copy=True) + note = fields.Text('Notes', translate=True, help="Legal mentions that have to be printed on the invoices.") + auto_apply = fields.Boolean(string='Detect Automatically', help="Apply automatically this fiscal position.") + vat_required = fields.Boolean(string='VAT required', help="Apply only if partner has a VAT number.") + country_id = fields.Many2one('res.country', string='Country', + help="Apply only if delivery country matches.") + country_group_id = fields.Many2one('res.country.group', string='Country Group', + help="Apply only if delivery country matches the group.") + state_ids = fields.Many2many('res.country.state', string='Federal States') + zip_from = fields.Char(string='Zip Range From') + zip_to = fields.Char(string='Zip Range To') + # To be used in hiding the 'Federal States' field('attrs' in view side) when selected 'Country' has 0 states. + states_count = fields.Integer(compute='_compute_states_count') + + def _compute_states_count(self): + for position in self: + position.states_count = len(position.country_id.state_ids) + + @api.constrains('zip_from', 'zip_to') + def _check_zip(self): + for position in self: + if position.zip_from and position.zip_to and position.zip_from > position.zip_to: + raise ValidationError(_('Invalid "Zip Range", please configure it properly.')) + + def map_tax(self, taxes, product=None, partner=None): + if not self: + return taxes + result = self.env['account.tax'] + for tax in taxes: + taxes_correspondance = self.tax_ids.filtered(lambda t: t.tax_src_id == tax._origin) + result |= taxes_correspondance.tax_dest_id if taxes_correspondance else tax + return result + + def map_account(self, account): + for pos in self.account_ids: + if pos.account_src_id == account: + return pos.account_dest_id + return account + + def map_accounts(self, accounts): + """ Receive a dictionary having accounts in values and try to replace those accounts accordingly to the fiscal position. + """ + ref_dict = {} + for line in self.account_ids: + ref_dict[line.account_src_id] = line.account_dest_id + for key, acc in accounts.items(): + if acc in ref_dict: + accounts[key] = ref_dict[acc] + return accounts + + @api.onchange('country_id') + def _onchange_country_id(self): + if self.country_id: + self.zip_from = self.zip_to = self.country_group_id = False + self.state_ids = [(5,)] + self.states_count = len(self.country_id.state_ids) + + @api.onchange('country_group_id') + def _onchange_country_group_id(self): + if self.country_group_id: + self.zip_from = self.zip_to = self.country_id = False + self.state_ids = [(5,)] + + @api.model + def _convert_zip_values(self, zip_from='', zip_to=''): + max_length = max(len(zip_from), len(zip_to)) + if zip_from.isdigit(): + zip_from = zip_from.rjust(max_length, '0') + if zip_to.isdigit(): + zip_to = zip_to.rjust(max_length, '0') + return zip_from, zip_to + + @api.model + def create(self, vals): + zip_from = vals.get('zip_from') + zip_to = vals.get('zip_to') + if zip_from and zip_to: + vals['zip_from'], vals['zip_to'] = self._convert_zip_values(zip_from, zip_to) + return super(AccountFiscalPosition, self).create(vals) + + def write(self, vals): + zip_from = vals.get('zip_from') + zip_to = vals.get('zip_to') + if zip_from or zip_to: + for rec in self: + vals['zip_from'], vals['zip_to'] = self._convert_zip_values(zip_from or rec.zip_from, zip_to or rec.zip_to) + return super(AccountFiscalPosition, self).write(vals) + + @api.model + def _get_fpos_by_region(self, country_id=False, state_id=False, zipcode=False, vat_required=False): + if not country_id: + return False + base_domain = [ + ('auto_apply', '=', True), + ('vat_required', '=', vat_required), + ('company_id', 'in', [self.env.company.id, False]), + ] + null_state_dom = state_domain = [('state_ids', '=', False)] + null_zip_dom = zip_domain = [('zip_from', '=', False), ('zip_to', '=', False)] + null_country_dom = [('country_id', '=', False), ('country_group_id', '=', False)] + + if zipcode: + zip_domain = [('zip_from', '<=', zipcode), ('zip_to', '>=', zipcode)] + + if state_id: + state_domain = [('state_ids', '=', state_id)] + + domain_country = base_domain + [('country_id', '=', country_id)] + domain_group = base_domain + [('country_group_id.country_ids', '=', country_id)] + + # Build domain to search records with exact matching criteria + fpos = self.search(domain_country + state_domain + zip_domain, limit=1) + # return records that fit the most the criteria, and fallback on less specific fiscal positions if any can be found + if not fpos and state_id: + fpos = self.search(domain_country + null_state_dom + zip_domain, limit=1) + if not fpos and zipcode: + fpos = self.search(domain_country + state_domain + null_zip_dom, limit=1) + if not fpos and state_id and zipcode: + fpos = self.search(domain_country + null_state_dom + null_zip_dom, limit=1) + + # fallback: country group with no state/zip range + if not fpos: + fpos = self.search(domain_group + null_state_dom + null_zip_dom, limit=1) + + if not fpos: + # Fallback on catchall (no country, no group) + fpos = self.search(base_domain + null_country_dom, limit=1) + return fpos + + @api.model + def get_fiscal_position(self, partner_id, delivery_id=None): + """ + :return: fiscal position found (recordset) + :rtype: :class:`account.fiscal.position` + """ + if not partner_id: + return self.env['account.fiscal.position'] + + # This can be easily overridden to apply more complex fiscal rules + PartnerObj = self.env['res.partner'] + partner = PartnerObj.browse(partner_id) + delivery = PartnerObj.browse(delivery_id) + + # If partner and delivery have the same vat prefix, use invoicing + if not delivery or (delivery.vat and partner.vat and delivery.vat[:2] == partner.vat[:2]): + delivery = partner + + # partner manually set fiscal position always win + if delivery.property_account_position_id or partner.property_account_position_id: + return delivery.property_account_position_id or partner.property_account_position_id + + # First search only matching VAT positions + vat_required = bool(partner.vat) + fp = self._get_fpos_by_region(delivery.country_id.id, delivery.state_id.id, delivery.zip, vat_required) + + # Then if VAT required found no match, try positions that do not require it + if not fp and vat_required: + fp = self._get_fpos_by_region(delivery.country_id.id, delivery.state_id.id, delivery.zip, False) + + return fp or self.env['account.fiscal.position'] + + +class AccountFiscalPositionTax(models.Model): + _name = 'account.fiscal.position.tax' + _description = 'Tax Mapping of Fiscal Position' + _rec_name = 'position_id' + _check_company_auto = True + + position_id = fields.Many2one('account.fiscal.position', string='Fiscal Position', + required=True, ondelete='cascade') + company_id = fields.Many2one('res.company', string='Company', related='position_id.company_id', store=True) + tax_src_id = fields.Many2one('account.tax', string='Tax on Product', required=True, check_company=True) + tax_dest_id = fields.Many2one('account.tax', string='Tax to Apply', check_company=True) + + _sql_constraints = [ + ('tax_src_dest_uniq', + 'unique (position_id,tax_src_id,tax_dest_id)', + 'A tax fiscal position could be defined only one time on same taxes.') + ] + + +class AccountFiscalPositionAccount(models.Model): + _name = 'account.fiscal.position.account' + _description = 'Accounts Mapping of Fiscal Position' + _rec_name = 'position_id' + _check_company_auto = True + + position_id = fields.Many2one('account.fiscal.position', string='Fiscal Position', + required=True, ondelete='cascade') + company_id = fields.Many2one('res.company', string='Company', related='position_id.company_id', store=True) + account_src_id = fields.Many2one('account.account', string='Account on Product', + check_company=True, required=True, + domain="[('deprecated', '=', False), ('company_id', '=', company_id)]") + account_dest_id = fields.Many2one('account.account', string='Account to Use Instead', + check_company=True, required=True, + domain="[('deprecated', '=', False), ('company_id', '=', company_id)]") + + _sql_constraints = [ + ('account_src_dest_uniq', + 'unique (position_id,account_src_id,account_dest_id)', + 'An account fiscal position could be defined only one time on same accounts.') + ] + + +class ResPartner(models.Model): + _name = 'res.partner' + _inherit = 'res.partner' + + @api.depends_context('company') + def _credit_debit_get(self): + tables, where_clause, where_params = self.env['account.move.line'].with_context(state='posted', company_id=self.env.company.id)._query_get() + where_params = [tuple(self.ids)] + where_params + if where_clause: + where_clause = 'AND ' + where_clause + self._cr.execute("""SELECT account_move_line.partner_id, act.type, SUM(account_move_line.amount_residual) + FROM """ + tables + """ + LEFT JOIN account_account a ON (account_move_line.account_id=a.id) + LEFT JOIN account_account_type act ON (a.user_type_id=act.id) + WHERE act.type IN ('receivable','payable') + AND account_move_line.partner_id IN %s + AND account_move_line.reconciled IS NOT TRUE + """ + where_clause + """ + GROUP BY account_move_line.partner_id, act.type + """, where_params) + treated = self.browse() + for pid, type, val in self._cr.fetchall(): + partner = self.browse(pid) + if type == 'receivable': + partner.credit = val + if partner not in treated: + partner.debit = False + treated |= partner + elif type == 'payable': + partner.debit = -val + if partner not in treated: + partner.credit = False + treated |= partner + remaining = (self - treated) + remaining.debit = False + remaining.credit = False + + def _asset_difference_search(self, account_type, operator, operand): + if operator not in ('<', '=', '>', '>=', '<='): + return [] + if type(operand) not in (float, int): + return [] + sign = 1 + if account_type == 'payable': + sign = -1 + res = self._cr.execute(''' + SELECT partner.id + FROM res_partner partner + LEFT JOIN account_move_line aml ON aml.partner_id = partner.id + JOIN account_move move ON move.id = aml.move_id + RIGHT JOIN account_account acc ON aml.account_id = acc.id + WHERE acc.internal_type = %s + AND NOT acc.deprecated AND acc.company_id = %s + AND move.state = 'posted' + GROUP BY partner.id + HAVING %s * COALESCE(SUM(aml.amount_residual), 0) ''' + operator + ''' %s''', (account_type, self.env.user.company_id.id, sign, operand)) + res = self._cr.fetchall() + if not res: + return [('id', '=', '0')] + return [('id', 'in', [r[0] for r in res])] + + @api.model + def _credit_search(self, operator, operand): + return self._asset_difference_search('receivable', operator, operand) + + @api.model + def _debit_search(self, operator, operand): + return self._asset_difference_search('payable', operator, operand) + + def _invoice_total(self): + self.total_invoiced = 0 + if not self.ids: + return True + + all_partners_and_children = {} + all_partner_ids = [] + for partner in self.filtered('id'): + # price_total is in the company currency + all_partners_and_children[partner] = self.with_context(active_test=False).search([('id', 'child_of', partner.id)]).ids + all_partner_ids += all_partners_and_children[partner] + + domain = [ + ('partner_id', 'in', all_partner_ids), + ('state', 'not in', ['draft', 'cancel']), + ('move_type', 'in', ('out_invoice', 'out_refund')), + ] + price_totals = self.env['account.invoice.report'].read_group(domain, ['price_subtotal'], ['partner_id']) + for partner, child_ids in all_partners_and_children.items(): + partner.total_invoiced = sum(price['price_subtotal'] for price in price_totals if price['partner_id'][0] in child_ids) + + def _compute_journal_item_count(self): + AccountMoveLine = self.env['account.move.line'] + for partner in self: + partner.journal_item_count = AccountMoveLine.search_count([('partner_id', '=', partner.id)]) + + def _compute_has_unreconciled_entries(self): + for partner in self: + # Avoid useless work if has_unreconciled_entries is not relevant for this partner + if not partner.active or not partner.is_company and partner.parent_id: + partner.has_unreconciled_entries = False + continue + self.env.cr.execute( + """ SELECT 1 FROM( + SELECT + p.last_time_entries_checked AS last_time_entries_checked, + MAX(l.write_date) AS max_date + FROM + account_move_line l + RIGHT JOIN account_account a ON (a.id = l.account_id) + RIGHT JOIN res_partner p ON (l.partner_id = p.id) + WHERE + p.id = %s + AND EXISTS ( + SELECT 1 + FROM account_move_line l + WHERE l.account_id = a.id + AND l.partner_id = p.id + AND l.amount_residual > 0 + ) + AND EXISTS ( + SELECT 1 + FROM account_move_line l + WHERE l.account_id = a.id + AND l.partner_id = p.id + AND l.amount_residual < 0 + ) + GROUP BY p.last_time_entries_checked + ) as s + WHERE (last_time_entries_checked IS NULL OR max_date > last_time_entries_checked) + """, (partner.id,)) + partner.has_unreconciled_entries = self.env.cr.rowcount == 1 + + def mark_as_reconciled(self): + self.env['account.partial.reconcile'].check_access_rights('write') + return self.sudo().write({'last_time_entries_checked': time.strftime(DEFAULT_SERVER_DATETIME_FORMAT)}) + + def _get_company_currency(self): + for partner in self: + if partner.company_id: + partner.currency_id = partner.sudo().company_id.currency_id + else: + partner.currency_id = self.env.company.currency_id + + credit = fields.Monetary(compute='_credit_debit_get', search=_credit_search, + string='Total Receivable', help="Total amount this customer owes you.") + debit = fields.Monetary(compute='_credit_debit_get', search=_debit_search, string='Total Payable', + help="Total amount you have to pay to this vendor.") + debit_limit = fields.Monetary('Payable Limit') + total_invoiced = fields.Monetary(compute='_invoice_total', string="Total Invoiced", + groups='account.group_account_invoice,account.group_account_readonly') + currency_id = fields.Many2one('res.currency', compute='_get_company_currency', readonly=True, + string="Currency", help='Utility field to express amount currency') + journal_item_count = fields.Integer(compute='_compute_journal_item_count', string="Journal Items") + property_account_payable_id = fields.Many2one('account.account', company_dependent=True, + string="Account Payable", + domain="[('internal_type', '=', 'payable'), ('deprecated', '=', False), ('company_id', '=', current_company_id)]", + help="This account will be used instead of the default one as the payable account for the current partner", + required=True) + property_account_receivable_id = fields.Many2one('account.account', company_dependent=True, + string="Account Receivable", + domain="[('internal_type', '=', 'receivable'), ('deprecated', '=', False), ('company_id', '=', current_company_id)]", + help="This account will be used instead of the default one as the receivable account for the current partner", + required=True) + property_account_position_id = fields.Many2one('account.fiscal.position', company_dependent=True, + string="Fiscal Position", + domain="[('company_id', '=', current_company_id)]", + help="The fiscal position determines the taxes/accounts used for this contact.") + property_payment_term_id = fields.Many2one('account.payment.term', company_dependent=True, + string='Customer Payment Terms', + domain="[('company_id', 'in', [current_company_id, False])]", + help="This payment term will be used instead of the default one for sales orders and customer invoices") + property_supplier_payment_term_id = fields.Many2one('account.payment.term', company_dependent=True, + string='Vendor Payment Terms', + domain="[('company_id', 'in', [current_company_id, False])]", + help="This payment term will be used instead of the default one for purchase orders and vendor bills") + ref_company_ids = fields.One2many('res.company', 'partner_id', + string='Companies that refers to partner') + has_unreconciled_entries = fields.Boolean(compute='_compute_has_unreconciled_entries', + help="The partner has at least one unreconciled debit and credit since last time the invoices & payments matching was performed.") + last_time_entries_checked = fields.Datetime( + string='Latest Invoices & Payments Matching Date', readonly=True, copy=False, + help='Last time the invoices & payments matching was performed for this partner. ' + 'It is set either if there\'s not at least an unreconciled debit and an unreconciled credit ' + 'or if you click the "Done" button.') + invoice_ids = fields.One2many('account.move', 'partner_id', string='Invoices', readonly=True, copy=False) + contract_ids = fields.One2many('account.analytic.account', 'partner_id', string='Partner Contracts', readonly=True) + bank_account_count = fields.Integer(compute='_compute_bank_count', string="Bank") + trust = fields.Selection([('good', 'Good Debtor'), ('normal', 'Normal Debtor'), ('bad', 'Bad Debtor')], string='Degree of trust you have in this debtor', default='normal', company_dependent=True) + invoice_warn = fields.Selection(WARNING_MESSAGE, 'Invoice', help=WARNING_HELP, default="no-message") + invoice_warn_msg = fields.Text('Message for Invoice') + # Computed fields to order the partners as suppliers/customers according to the + # amount of their generated incoming/outgoing account moves + supplier_rank = fields.Integer(default=0) + customer_rank = fields.Integer(default=0) + + def _get_name_search_order_by_fields(self): + res = super()._get_name_search_order_by_fields() + partner_search_mode = self.env.context.get('res_partner_search_mode') + if not partner_search_mode in ('customer', 'supplier'): + return res + order_by_field = 'COALESCE(res_partner.%s, 0) DESC,' + if partner_search_mode == 'customer': + field = 'customer_rank' + else: + field = 'supplier_rank' + + order_by_field = order_by_field % field + return '%s, %s' % (res, order_by_field % field) if res else order_by_field + + def _compute_bank_count(self): + bank_data = self.env['res.partner.bank'].read_group([('partner_id', 'in', self.ids)], ['partner_id'], ['partner_id']) + mapped_data = dict([(bank['partner_id'][0], bank['partner_id_count']) for bank in bank_data]) + for partner in self: + partner.bank_account_count = mapped_data.get(partner.id, 0) + + def _find_accounting_partner(self, partner): + ''' Find the partner for which the accounting entries will be created ''' + return partner.commercial_partner_id + + @api.model + def _commercial_fields(self): + return super(ResPartner, self)._commercial_fields() + \ + ['debit_limit', 'property_account_payable_id', 'property_account_receivable_id', 'property_account_position_id', + 'property_payment_term_id', 'property_supplier_payment_term_id', 'last_time_entries_checked'] + + def action_view_partner_invoices(self): + self.ensure_one() + action = self.env["ir.actions.actions"]._for_xml_id("account.action_move_out_invoice_type") + action['domain'] = [ + ('move_type', 'in', ('out_invoice', 'out_refund')), + ('partner_id', 'child_of', self.id), + ] + action['context'] = {'default_move_type':'out_invoice', 'move_type':'out_invoice', 'journal_type': 'sale', 'search_default_unpaid': 1} + return action + + def can_edit_vat(self): + ''' Can't edit `vat` if there is (non draft) issued invoices. ''' + can_edit_vat = super(ResPartner, self).can_edit_vat() + if not can_edit_vat: + return can_edit_vat + has_invoice = self.env['account.move'].search([ + ('move_type', 'in', ['out_invoice', 'out_refund']), + ('partner_id', 'child_of', self.commercial_partner_id.id), + ('state', '=', 'posted') + ], limit=1) + return can_edit_vat and not (bool(has_invoice)) + + @api.model_create_multi + def create(self, vals_list): + search_partner_mode = self.env.context.get('res_partner_search_mode') + is_customer = search_partner_mode == 'customer' + is_supplier = search_partner_mode == 'supplier' + if search_partner_mode: + for vals in vals_list: + if is_customer and 'customer_rank' not in vals: + vals['customer_rank'] = 1 + elif is_supplier and 'supplier_rank' not in vals: + vals['supplier_rank'] = 1 + return super().create(vals_list) + + def _increase_rank(self, field, n=1): + if self.ids and field in ['customer_rank', 'supplier_rank']: + try: + with self.env.cr.savepoint(flush=False): + query = sql.SQL(""" + SELECT {field} FROM res_partner WHERE ID IN %(partner_ids)s FOR UPDATE NOWAIT; + UPDATE res_partner SET {field} = {field} + %(n)s + WHERE id IN %(partner_ids)s + """).format(field=sql.Identifier(field)) + self.env.cr.execute(query, {'partner_ids': tuple(self.ids), 'n': n}) + for partner in self: + self.env.cache.remove(partner, partner._fields[field]) + except DatabaseError as e: + if e.pgcode == '55P03': + _logger.debug('Another transaction already locked partner rows. Cannot update partner ranks.') + else: + raise e |
