from odoo import fields, models, api, _ from odoo.exceptions import AccessError, UserError, ValidationError from datetime import datetime, timedelta from pytz import timezone class PurchasePricelist(models.Model): _name = 'purchase.pricelist' _rec_name = 'product_id' _inherit = ['mail.thread', 'mail.activity.mixin'] name = fields.Char(string='Name', compute="_compute_name") product_id = fields.Many2one('product.product', string="Product", required=True) product_categ_ids = fields.Many2many('product.public.category', related="product_id.public_categ_ids") vendor_id = fields.Many2one('res.partner', string="Vendor", required=True) product_price = fields.Float(string='Human Price', required=True) system_price = fields.Float(string='System Price', readonly=True) human_last_update = fields.Datetime(string='Human Update') system_last_update = fields.Datetime(string='System Update') count_trx_po = fields.Integer(string='Count Trx Product') count_trx_po_vendor = fields.Integer(string='Count Trx Vendor') taxes_product_id = fields.Many2one('account.tax', string='Human Tax', domain=[('type_tax_use', '=', 'purchase')]) taxes_system_id = fields.Many2one('account.tax', string='System Tax') include_price = fields.Float(string='Final Price', readonly=True) brand_id = fields.Many2one('x_manufactures', string='Brand') count_brand_vendor = fields.Integer(string='Count Brand Vendor') is_winner = fields.Boolean(string='Winner', default=False, help='Pemenang yang direkomendasikan oleh Merchandise') def sync_pricelist_tier(self): for rec in self: promotion_product = self.env['promotion.product'].search([('product_id', '=', rec.product_id.id)]) for promotion in promotion_product: promotion.program_line_id.get_price_tier(promotion.product_id, promotion.qty) @api.depends('product_id', 'vendor_id') def _compute_name(self): self.name = self.vendor_id.name + ', ' + self.product_id.name @api.constrains('product_price','system_price','vendor_id','product_id','taxes_system_id','taxes_product_id') def _contrains_product_price(self): current_time = fields.Datetime.now(timezone('Asia/Jakarta')).strftime('%Y-%m-%d %H:%M:%S') update_by = self._context.get('update_by') if update_by == 'system': self.system_last_update = current_time else: self.human_last_update = current_time @api.constrains('system_last_update','system_price','product_price','human_last_update','taxes_system_id','taxes_product_id') def _constrains_include_price(self): price, taxes = self._get_valid_price() # When have tax is excluded or empty tax, then multiply by 1.11 if (taxes and not taxes.price_include) or not taxes: price *= 1.11 self.include_price = price def _get_valid_price(self): purchase_price = self price = 0 taxes = None human_last_update = purchase_price.human_last_update or datetime.min system_last_update = purchase_price.system_last_update or datetime.min if system_last_update > human_last_update: price = purchase_price.system_price taxes = purchase_price.taxes_system_id else: price = purchase_price.product_price taxes = purchase_price.taxes_product_id return price, taxes @api.constrains('vendor_id', 'product_id') def _check_duplicate_purchase_pricelist(self): for price in self: if not price.product_id or not price.vendor_id: continue domain = [ ('product_id', '=', price.product_id.id), ('vendor_id', '=', price.vendor_id.id) ] domain.append(('id', '!=', price.id)) existing_purchase = self.search(domain, limit=1) massage="Ada duplikat product dan vendor, berikut data yang anda duplikat : \n" + str(existing_purchase.product_id.name) + " - " + str(existing_purchase.vendor_id.name) + " - " + str(existing_purchase.product_price) if existing_purchase: raise UserError(massage) def sync_pricelist_item_promo(self, product): pricelist_product = self.env['product.pricelist.item'].search([('product_id', '=', product.id), ('pricelist_id', '=', 17022)]) for pricelist in pricelist_product: if pricelist.fixed_price == 0: flashsale = self.env['product.pricelist.item'].search([('product_id', '=', product.id), ('pricelist_id.is_flash_sale', '=', True)]) if flashsale: flashsale.fixed_price = 0 return def action_calculate_pricelist(self): MAX_PRICELIST = 10 active_ids = self.env.context.get('active_ids', []) if len(active_ids) > MAX_PRICELIST: raise ValidationError(_('You can only calculate up to %s pricelists at a time.' % MAX_PRICELIST)) records = self.env['purchase.pricelist'].browse(active_ids) price_group = self.env['price.group'].collect_price_group() for rec in records: if rec.include_price == 0: rec.sync_pricelist_item_promo(rec.product_id) product_group = rec.product_id.product_tmpl_id.x_manufacture.pricing_group or None price_incl = rec.include_price if not product_group: self.env.cr.commit() raise ValidationError(_('Produk %s tidak memiliki "Pricing Group"' % rec.product_id.display_name)) markup_percentage = price_group['markup'][product_group] if markup_percentage == 0: continue product_domain = [('product_id', '=', rec.product_id.id)] markup_pricelist = price_group['markup'].pricelist_id # base_price = price_incl + (price_incl * markup_percentage / 100) base_price = round(price_incl + (price_incl * markup_percentage / 100), 12) base_prod_pricelist = self.env['product.pricelist.item'].search(product_domain + [('pricelist_id', '=', markup_pricelist.id)], limit=1) base_prod_pricelist.fixed_price = base_price tier_percentages = [price_group[f'tier_{i}'][product_group] for i in range(1, 6)] for i, tier_percentage in enumerate(tier_percentages): tier_pricelist = price_group[f'tier_{i + 1}'].pricelist_id tier_price = round(price_incl + (price_incl * tier_percentage / 100), 12) tier_perc = round((base_price - tier_price) / base_price * 100, 12) tier_prod_pricelist = self.env['product.pricelist.item'].search(product_domain + [('pricelist_id', '=', tier_pricelist.id)], limit=1) tier_prod_pricelist.price_discount = tier_perc rec.sync_pricelist_tier() rec.product_id.product_tmpl_id._create_solr_queue('_sync_price_to_solr') def _collect_old_values(self, vals): return { record.id: { field_name: record[field_name] for field_name in vals.keys() if field_name in record._fields } for record in self } def _log_field_changes(self, vals, old_values): exclude_fields = ['solr_flag', 'desc_update_solr', 'last_update_solr', 'is_edited'] for record in self: changes = [] for field_name in vals: if field_name not in record._fields or field_name in exclude_fields: continue field = record._fields[field_name] field_label = field.string or field_name old_value = old_values.get(record.id, {}).get(field_name) new_value = record[field_name] def stringify(val, field, record): if val in [None, False]: return 'None' # Handle Selection if isinstance(field, fields.Selection): selection = field.selection if callable(selection): selection = selection(record) return dict(selection).get(val, str(val)) # Handle Boolean if isinstance(field, fields.Boolean): return 'Yes' if val else 'No' # Handle Many2one if isinstance(field, fields.Many2one): if isinstance(val, int): rec = record.env[field.comodel_name].browse(val) return rec.display_name if rec.exists() else str(val) elif isinstance(val, models.BaseModel): return val.display_name return str(val) # Handle Many2many if isinstance(field, fields.Many2many): records = val if isinstance(val, models.BaseModel) else record[field.name] if not records: return 'None' for attr in ['name', 'x_name', 'display_name']: if hasattr(records[0], attr): return ", ".join(records.mapped(attr)) return ", ".join(str(r.id) for r in records) # Handle One2many if isinstance(field, fields.One2many): records = val if isinstance(val, models.BaseModel) else record[field.name] if not records: return 'None' return f"{field.comodel_name}({', '.join(str(r.id) for r in records)})" # Default case (Char, Float, Integer, etc) return str(val) old_val_str = stringify(old_value, field, record) new_val_str = stringify(new_value, field, record) if old_val_str != new_val_str: changes.append(f"