diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2023-02-06 15:29:55 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2023-02-06 15:29:55 +0700 |
| commit | 7cfed1e19f2e340d966ed2068176d21a0e8e9834 (patch) | |
| tree | ec9077cb4f89d23378ef09f9da0adb7135548081 /auditlog/models/rule.py | |
| parent | 4b3b2d8b1a9a7a72fbe3d623e93dea3802ef0e56 (diff) | |
add audit log
Diffstat (limited to 'auditlog/models/rule.py')
| -rw-r--r-- | auditlog/models/rule.py | 738 |
1 files changed, 738 insertions, 0 deletions
diff --git a/auditlog/models/rule.py b/auditlog/models/rule.py new file mode 100644 index 0000000..1db2687 --- /dev/null +++ b/auditlog/models/rule.py @@ -0,0 +1,738 @@ +# Copyright 2015 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +import copy + +from odoo import _, api, fields, models, modules +from odoo.exceptions import UserError + +FIELDS_BLACKLIST = [ + "id", + "create_uid", + "create_date", + "write_uid", + "write_date", + "display_name", + "__last_update", +] +# Used for performance, to avoid a dictionary instanciation when we need an +# empty dict to simplify algorithms +EMPTY_DICT = {} + + +class DictDiffer(object): + """Calculate the difference between two dictionaries as: + (1) items added + (2) items removed + (3) keys same in both but changed values + (4) keys same in both and unchanged values + """ + + def __init__(self, current_dict, past_dict): + self.current_dict, self.past_dict = current_dict, past_dict + self.set_current = set(current_dict) + self.set_past = set(past_dict) + self.intersect = self.set_current.intersection(self.set_past) + + def added(self): + return self.set_current - self.intersect + + def removed(self): + return self.set_past - self.intersect + + def changed(self): + return {o for o in self.intersect if self.past_dict[o] != self.current_dict[o]} + + def unchanged(self): + return {o for o in self.intersect if self.past_dict[o] == self.current_dict[o]} + + +class AuditlogRule(models.Model): + _name = "auditlog.rule" + _description = "Auditlog - Rule" + + name = fields.Char(required=True, states={"subscribed": [("readonly", True)]}) + model_id = fields.Many2one( + "ir.model", + "Model", + help="Select model for which you want to generate log.", + states={"subscribed": [("readonly", True)]}, + ondelete="set null", + index=True, + ) + model_name = fields.Char(readonly=True) + model_model = fields.Char(string="Technical Model Name", readonly=True) + user_ids = fields.Many2many( + "res.users", + "audittail_rules_users", + "user_id", + "rule_id", + string="Users", + help="if User is not added then it will applicable for all users", + states={"subscribed": [("readonly", True)]}, + ) + log_read = fields.Boolean( + "Log Reads", + help=( + "Select this if you want to keep track of read/open on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_write = fields.Boolean( + "Log Writes", + default=True, + help=( + "Select this if you want to keep track of modification on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_unlink = fields.Boolean( + "Log Deletes", + default=True, + help=( + "Select this if you want to keep track of deletion on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_create = fields.Boolean( + "Log Creates", + default=True, + help=( + "Select this if you want to keep track of creation on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_type = fields.Selection( + [("full", "Full log"), ("fast", "Fast log")], + string="Type", + required=True, + default="full", + help=( + "Full log: make a diff between the data before and after " + "the operation (log more info like computed fields which were " + "updated, but it is slower)\n" + "Fast log: only log the changes made through the create and " + "write operations (less information, but it is faster)" + ), + states={"subscribed": [("readonly", True)]}, + ) + # log_action = fields.Boolean( + # "Log Action", + # help=("Select this if you want to keep track of actions on the " + # "model of this rule")) + # log_workflow = fields.Boolean( + # "Log Workflow", + # help=("Select this if you want to keep track of workflow on any " + # "record of the model of this rule")) + state = fields.Selection( + [("draft", "Draft"), ("subscribed", "Subscribed")], + required=True, + default="draft", + ) + action_id = fields.Many2one( + "ir.actions.act_window", + string="Action", + states={"subscribed": [("readonly", True)]}, + ) + capture_record = fields.Boolean( + "Capture Record", + help="Select this if you want to keep track of Unlink Record", + ) + users_to_exclude_ids = fields.Many2many( + "res.users", + string="Users to Exclude", + context={"active_test": False}, + states={"subscribed": [("readonly", True)]}, + ) + + fields_to_exclude_ids = fields.Many2many( + "ir.model.fields", + domain="[('model_id', '=', model_id)]", + string="Fields to Exclude", + states={"subscribed": [("readonly", True)]}, + ) + + _sql_constraints = [ + ( + "model_uniq", + "unique(model_id)", + ( + "There is already a rule defined on this model\n" + "You cannot define another: please edit the existing one." + ), + ) + ] + + def _register_hook(self): + """Get all rules and apply them to log method calls.""" + super(AuditlogRule, self)._register_hook() + if not hasattr(self.pool, "_auditlog_field_cache"): + self.pool._auditlog_field_cache = {} + if not hasattr(self.pool, "_auditlog_model_cache"): + self.pool._auditlog_model_cache = {} + if not self: + self = self.search([("state", "=", "subscribed")]) + return self._patch_methods() + + def _patch_methods(self): + """Patch ORM methods of models defined in rules to log their calls.""" + updated = False + model_cache = self.pool._auditlog_model_cache + for rule in self: + if rule.state != "subscribed": + continue + if not self.pool.get(rule.model_id.model or rule.model_model): + # ignore rules for models not loadable currently + continue + model_cache[rule.model_id.model] = rule.model_id.id + model_model = self.env[rule.model_id.model or rule.model_model] + # CRUD + # -> create + check_attr = "auditlog_ruled_create" + if rule.log_create and not hasattr(model_model, check_attr): + model_model._patch_method("create", rule._make_create()) + setattr(type(model_model), check_attr, True) + updated = True + # -> read + check_attr = "auditlog_ruled_read" + if rule.log_read and not hasattr(model_model, check_attr): + model_model._patch_method("read", rule._make_read()) + setattr(type(model_model), check_attr, True) + updated = True + # -> write + check_attr = "auditlog_ruled_write" + if rule.log_write and not hasattr(model_model, check_attr): + model_model._patch_method("write", rule._make_write()) + setattr(type(model_model), check_attr, True) + updated = True + # -> unlink + check_attr = "auditlog_ruled_unlink" + if rule.log_unlink and not hasattr(model_model, check_attr): + model_model._patch_method("unlink", rule._make_unlink()) + setattr(type(model_model), check_attr, True) + updated = True + return updated + + def _revert_methods(self): + """Restore original ORM methods of models defined in rules.""" + updated = False + for rule in self: + model_model = self.env[rule.model_id.model or rule.model_model] + for method in ["create", "read", "write", "unlink"]: + if getattr(rule, "log_%s" % method) and hasattr( + getattr(model_model, method), "origin" + ): + model_model._revert_method(method) + delattr(type(model_model), "auditlog_ruled_%s" % method) + updated = True + if updated: + modules.registry.Registry(self.env.cr.dbname).signal_changes() + + @api.model + def create(self, vals): + """Update the registry when a new rule is created.""" + if "model_id" not in vals or not vals["model_id"]: + raise UserError(_("No model defined to create line.")) + model = self.env["ir.model"].browse(vals["model_id"]) + vals.update({"model_name": model.name, "model_model": model.model}) + new_record = super().create(vals) + if new_record._register_hook(): + modules.registry.Registry(self.env.cr.dbname).signal_changes() + return new_record + + def write(self, vals): + """Update the registry when existing rules are updated.""" + if "model_id" in vals: + if not vals["model_id"]: + raise UserError(_("Field 'model_id' cannot be empty.")) + model = self.env["ir.model"].browse(vals["model_id"]) + vals.update({"model_name": model.name, "model_model": model.model}) + res = super().write(vals) + if self._register_hook(): + modules.registry.Registry(self.env.cr.dbname).signal_changes() + return res + + def unlink(self): + """Unsubscribe rules before removing them.""" + self.unsubscribe() + return super(AuditlogRule, self).unlink() + + @api.model + def get_auditlog_fields(self, model): + """ + Get the list of auditlog fields for a model + By default it is all stored fields only, but you can + override this. + """ + return list( + n + for n, f in model._fields.items() + if (not f.compute and not f.related) or f.store + ) + + def _make_create(self): + """Instanciate a create method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + @api.model_create_multi + @api.returns("self", lambda value: value.id) + def create_full(self, vals_list, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + new_records = create_full.origin(self, vals_list, **kwargs) + # Take a snapshot of record values from the cache instead of using + # 'read()'. It avoids issues with related/computed fields which + # stored in the database only at the end of the transaction, but + # their values exist in cache. + new_values = {} + fields_list = rule_model.get_auditlog_fields(self) + for new_record in new_records.sudo(): + new_values.setdefault(new_record.id, {}) + for fname, field in new_record._fields.items(): + if fname not in fields_list: + continue + new_values[new_record.id][fname] = field.convert_to_read( + new_record[fname], new_record + ) + if self.env.user in users_to_exclude: + return new_records + rule_model.sudo().create_logs( + self.env.uid, + self._name, + new_records.ids, + "create", + None, + new_values, + {"log_type": log_type}, + ) + return new_records + + @api.model_create_multi + @api.returns("self", lambda value: value.id) + def create_fast(self, vals_list, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + vals_list = rule_model._update_vals_list(vals_list) + vals_list2 = copy.deepcopy(vals_list) + new_records = create_fast.origin(self, vals_list, **kwargs) + new_values = {} + for vals, new_record in zip(vals_list2, new_records): + new_values.setdefault(new_record.id, vals) + if self.env.user in users_to_exclude: + return new_records + rule_model.sudo().create_logs( + self.env.uid, + self._name, + new_records.ids, + "create", + None, + new_values, + {"log_type": log_type}, + ) + return new_records + + return create_full if self.log_type == "full" else create_fast + + def _make_read(self): + """Instanciate a read method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + def read(self, fields=None, load="_classic_read", **kwargs): + result = read.origin(self, fields, load, **kwargs) + # Sometimes the result is not a list but a dictionary + # Also, we can not modify the current result as it will break calls + result2 = result + if not isinstance(result2, list): + result2 = [result] + read_values = {d["id"]: d for d in result2} + # Old API + + # If the call came from auditlog itself, skip logging: + # avoid logs on `read` produced by auditlog during internal + # processing: read data of relevant records, 'ir.model', + # 'ir.model.fields'... (no interest in logging such operations) + if self.env.context.get("auditlog_disabled"): + return result + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + if self.env.user in users_to_exclude: + return result + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "read", + read_values, + None, + {"log_type": log_type}, + ) + return result + + return read + + def _make_write(self): + """Instanciate a write method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + def write_full(self, vals, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + fields_list = rule_model.get_auditlog_fields(self) + old_values = { + d["id"]: d + for d in self.sudo() + .with_context(prefetch_fields=False) + .read(fields_list) + } + result = write_full.origin(self, vals, **kwargs) + new_values = { + d["id"]: d + for d in self.sudo() + .with_context(prefetch_fields=False) + .read(fields_list) + } + if self.env.user in users_to_exclude: + return result + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "write", + old_values, + new_values, + {"log_type": log_type}, + ) + return result + + def write_fast(self, vals, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + # Log the user input only, no matter if the `vals` is updated + # afterwards as it could not represent the real state + # of the data in the database + vals2 = dict(vals) + old_vals2 = dict.fromkeys(list(vals2.keys()), False) + old_values = {id_: old_vals2 for id_ in self.ids} + new_values = {id_: vals2 for id_ in self.ids} + result = write_fast.origin(self, vals, **kwargs) + if self.env.user in users_to_exclude: + return result + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "write", + old_values, + new_values, + {"log_type": log_type}, + ) + return result + + return write_full if self.log_type == "full" else write_fast + + def _make_unlink(self): + """Instanciate an unlink method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + def unlink_full(self, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + fields_list = rule_model.get_auditlog_fields(self) + old_values = { + d["id"]: d + for d in self.sudo() + .with_context(prefetch_fields=False) + .read(fields_list) + } + if self.env.user in users_to_exclude: + return unlink_full.origin(self, **kwargs) + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "unlink", + old_values, + None, + {"log_type": log_type}, + ) + return unlink_full.origin(self, **kwargs) + + def unlink_fast(self, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + if self.env.user in users_to_exclude: + return unlink_fast.origin(self, **kwargs) + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "unlink", + None, + None, + {"log_type": log_type}, + ) + return unlink_fast.origin(self, **kwargs) + + return unlink_full if self.log_type == "full" else unlink_fast + + def create_logs( + self, + uid, + res_model, + res_ids, + method, + old_values=None, + new_values=None, + additional_log_values=None, + ): + """Create logs. `old_values` and `new_values` are dictionaries, e.g: + {RES_ID: {'FIELD': VALUE, ...}} + """ + if old_values is None: + old_values = EMPTY_DICT + if new_values is None: + new_values = EMPTY_DICT + log_model = self.env["auditlog.log"] + http_request_model = self.env["auditlog.http.request"] + http_session_model = self.env["auditlog.http.session"] + model_model = self.env[res_model] + model_id = self.pool._auditlog_model_cache[res_model] + auditlog_rule = self.env["auditlog.rule"].search([("model_id", "=", model_id)]) + fields_to_exclude = auditlog_rule.fields_to_exclude_ids.mapped("name") + for res_id in res_ids: + name = model_model.browse(res_id).name_get() + res_name = name and name[0] and name[0][1] + vals = { + "name": res_name, + "model_id": model_id, + "res_id": res_id, + "method": method, + "user_id": uid, + "http_request_id": http_request_model.current_http_request(), + "http_session_id": http_session_model.current_http_session(), + } + vals.update(additional_log_values or {}) + log = log_model.create(vals) + diff = DictDiffer( + new_values.get(res_id, EMPTY_DICT), old_values.get(res_id, EMPTY_DICT) + ) + if method == "create": + self._create_log_line_on_create( + log, diff.added(), new_values, fields_to_exclude + ) + elif method == "read": + self._create_log_line_on_read( + log, + list(old_values.get(res_id, EMPTY_DICT).keys()), + old_values, + fields_to_exclude, + ) + elif method == "write": + self._create_log_line_on_write( + log, diff.changed(), old_values, new_values, fields_to_exclude + ) + elif method == "unlink" and auditlog_rule.capture_record: + self._create_log_line_on_read( + log, + list(old_values.get(res_id, EMPTY_DICT).keys()), + old_values, + fields_to_exclude, + ) + + def _get_field(self, model, field_name): + cache = self.pool._auditlog_field_cache + if field_name not in cache.get(model.model, {}): + cache.setdefault(model.model, {}) + # - we use 'search()' then 'read()' instead of the 'search_read()' + # to take advantage of the 'classic_write' loading + # - search the field in the current model and those it inherits + field_model = self.env["ir.model.fields"] + all_model_ids = [model.id] + all_model_ids.extend(model.inherited_model_ids.ids) + field = field_model.search( + [("model_id", "in", all_model_ids), ("name", "=", field_name)] + ) + # The field can be a dummy one, like 'in_group_X' on 'res.users' + # As such we can't log it (field_id is required to create a log) + if not field: + cache[model.model][field_name] = False + else: + field_data = field.read(load="_classic_write")[0] + cache[model.model][field_name] = field_data + return cache[model.model][field_name] + + def _create_log_line_on_read( + self, log, fields_list, read_values, fields_to_exclude + ): + """Log field filled on a 'read' operation.""" + log_line_model = self.env["auditlog.log.line"] + fields_to_exclude = fields_to_exclude + FIELDS_BLACKLIST + for field_name in fields_list: + if field_name in fields_to_exclude: + continue + field = self._get_field(log.model_id, field_name) + # not all fields have an ir.models.field entry (ie. related fields) + if field: + log_vals = self._prepare_log_line_vals_on_read(log, field, read_values) + log_line_model.create(log_vals) + + def _prepare_log_line_vals_on_read(self, log, field, read_values): + """Prepare the dictionary of values used to create a log line on a + 'read' operation. + """ + vals = { + "field_id": field["id"], + "log_id": log.id, + "old_value": read_values[log.res_id][field["name"]], + "old_value_text": read_values[log.res_id][field["name"]], + "new_value": False, + "new_value_text": False, + } + if field["relation"] and "2many" in field["ttype"]: + old_value_text = ( + self.env[field["relation"]].browse(vals["old_value"]).name_get() + ) + vals["old_value_text"] = old_value_text + return vals + + def _create_log_line_on_write( + self, log, fields_list, old_values, new_values, fields_to_exclude + ): + """Log field updated on a 'write' operation.""" + log_line_model = self.env["auditlog.log.line"] + fields_to_exclude = fields_to_exclude + FIELDS_BLACKLIST + for field_name in fields_list: + if field_name in fields_to_exclude: + continue + field = self._get_field(log.model_id, field_name) + # not all fields have an ir.models.field entry (ie. related fields) + if field: + log_vals = self._prepare_log_line_vals_on_write( + log, field, old_values, new_values + ) + log_line_model.create(log_vals) + + def _prepare_log_line_vals_on_write(self, log, field, old_values, new_values): + """Prepare the dictionary of values used to create a log line on a + 'write' operation. + """ + vals = { + "field_id": field["id"], + "log_id": log.id, + "old_value": old_values[log.res_id][field["name"]], + "old_value_text": old_values[log.res_id][field["name"]], + "new_value": new_values[log.res_id][field["name"]], + "new_value_text": new_values[log.res_id][field["name"]], + } + # for *2many fields, log the name_get + if log.log_type == "full" and field["relation"] and "2many" in field["ttype"]: + # Filter IDs to prevent a 'name_get()' call on deleted resources + existing_ids = self.env[field["relation"]]._search( + [("id", "in", vals["old_value"])] + ) + old_value_text = [] + if existing_ids: + existing_values = ( + self.env[field["relation"]].browse(existing_ids).name_get() + ) + old_value_text.extend(existing_values) + # Deleted resources will have a 'DELETED' text representation + deleted_ids = set(vals["old_value"]) - set(existing_ids) + for deleted_id in deleted_ids: + old_value_text.append((deleted_id, "DELETED")) + vals["old_value_text"] = old_value_text + new_value_text = ( + self.env[field["relation"]].browse(vals["new_value"]).name_get() + ) + vals["new_value_text"] = new_value_text + return vals + + def _create_log_line_on_create( + self, log, fields_list, new_values, fields_to_exclude + ): + """Log field filled on a 'create' operation.""" + log_line_model = self.env["auditlog.log.line"] + fields_to_exclude = fields_to_exclude + FIELDS_BLACKLIST + for field_name in fields_list: + if field_name in fields_to_exclude: + continue + field = self._get_field(log.model_id, field_name) + # not all fields have an ir.models.field entry (ie. related fields) + if field: + log_vals = self._prepare_log_line_vals_on_create(log, field, new_values) + log_line_model.create(log_vals) + + def _prepare_log_line_vals_on_create(self, log, field, new_values): + """Prepare the dictionary of values used to create a log line on a + 'create' operation. + """ + vals = { + "field_id": field["id"], + "log_id": log.id, + "old_value": False, + "old_value_text": False, + "new_value": new_values[log.res_id][field["name"]], + "new_value_text": new_values[log.res_id][field["name"]], + } + if log.log_type == "full" and field["relation"] and "2many" in field["ttype"]: + new_value_text = ( + self.env[field["relation"]].browse(vals["new_value"]).name_get() + ) + vals["new_value_text"] = new_value_text + return vals + + def subscribe(self): + """Subscribe Rule for auditing changes on model and apply shortcut + to view logs on that model. + """ + act_window_model = self.env["ir.actions.act_window"] + for rule in self: + # Create a shortcut to view logs + domain = "[('model_id', '=', %s), ('res_id', '=', active_id)]" % ( + rule.model_id.id + ) + vals = { + "name": _("View logs"), + "res_model": "auditlog.log", + "binding_model_id": rule.model_id.id, + "domain": domain, + } + act_window = act_window_model.sudo().create(vals) + rule.write({"state": "subscribed", "action_id": act_window.id}) + return True + + def unsubscribe(self): + """Unsubscribe Auditing Rule on model.""" + # Revert patched methods + self._revert_methods() + for rule in self: + # Remove the shortcut to view logs + act_window = rule.action_id + if act_window: + act_window.unlink() + self.write({"state": "draft"}) + return True + + @api.model + def _update_vals_list(self, vals_list): + # Odoo supports empty recordset assignment (while it doesn't handle + # non-empty recordset ¯\_(ツ)_/¯ ), it could be an Odoo issue, but in + # the meanwhile we have to handle this case to avoid errors when using + # ``deepcopy`` to log data. + for vals in vals_list: + for fieldname, fieldvalue in vals.items(): + if isinstance(fieldvalue, models.BaseModel) and not fieldvalue: + vals[fieldname] = False + return vals_list |
