diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2023-02-06 15:29:55 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2023-02-06 15:29:55 +0700 |
| commit | 7cfed1e19f2e340d966ed2068176d21a0e8e9834 (patch) | |
| tree | ec9077cb4f89d23378ef09f9da0adb7135548081 /auditlog/models | |
| parent | 4b3b2d8b1a9a7a72fbe3d623e93dea3802ef0e56 (diff) | |
add audit log
Diffstat (limited to 'auditlog/models')
| -rw-r--r-- | auditlog/models/__init__.py | 8 | ||||
| -rw-r--r-- | auditlog/models/auditlog_log_line_view.py | 69 | ||||
| -rw-r--r-- | auditlog/models/autovacuum.py | 37 | ||||
| -rw-r--r-- | auditlog/models/http_request.py | 68 | ||||
| -rw-r--r-- | auditlog/models/http_session.py | 53 | ||||
| -rw-r--r-- | auditlog/models/log.py | 93 | ||||
| -rw-r--r-- | auditlog/models/rule.py | 738 |
7 files changed, 1066 insertions, 0 deletions
diff --git a/auditlog/models/__init__.py b/auditlog/models/__init__.py new file mode 100644 index 0000000..75e7754 --- /dev/null +++ b/auditlog/models/__init__.py @@ -0,0 +1,8 @@ +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from . import rule +from . import http_session +from . import http_request +from . import log +from . import auditlog_log_line_view +from . import autovacuum diff --git a/auditlog/models/auditlog_log_line_view.py b/auditlog/models/auditlog_log_line_view.py new file mode 100644 index 0000000..4703b95 --- /dev/null +++ b/auditlog/models/auditlog_log_line_view.py @@ -0,0 +1,69 @@ +from odoo import fields, models, tools + + +class AuditlogLogLineView(models.Model): + _name = "auditlog.log.line.view" + _inherit = "auditlog.log.line" + _description = "Auditlog - Log details (fields updated)" + _auto = False + _log_access = True + + name = fields.Char() + model_id = fields.Many2one("ir.model") + model_name = fields.Char() + model_model = fields.Char() + res_id = fields.Integer() + user_id = fields.Many2one("res.users") + method = fields.Char() + http_session_id = fields.Many2one( + "auditlog.http.session", string="Session", index=True + ) + http_request_id = fields.Many2one( + "auditlog.http.request", string="HTTP Request", index=True + ) + log_type = fields.Selection( + selection=lambda r: r.env["auditlog.rule"]._fields["log_type"].selection, + string="Type", + ) + + def _select_query(self): + return """ + alogl.id, + alogl.create_date, + alogl.create_uid, + alogl.write_uid, + alogl.write_date, + alogl.field_id, + alogl.log_id, + alogl.old_value, + alogl.new_value, + alogl.old_value_text, + alogl.new_value_text, + alogl.field_name, + alogl.field_description, + alog.name, + alog.model_id, + alog.model_name, + alog.model_model, + alog.res_id, + alog.user_id, + alog.method, + alog.http_session_id, + alog.http_request_id, + alog.log_type + """ + + def _from_query(self): + return """ + auditlog_log_line alogl + JOIN auditlog_log alog ON alog.id = alogl.log_id + """ + + def _query(self): + return "SELECT %s FROM %s" % (self._select_query(), self._from_query()) + + def init(self): + tools.drop_view_if_exists(self.env.cr, self._table) + self.env.cr.execute( + """CREATE or REPLACE VIEW %s as (%s)""" % (self._table, self._query()) + ) diff --git a/auditlog/models/autovacuum.py b/auditlog/models/autovacuum.py new file mode 100644 index 0000000..bf56fc5 --- /dev/null +++ b/auditlog/models/autovacuum.py @@ -0,0 +1,37 @@ +# Copyright 2016 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from datetime import datetime, timedelta + +from odoo import api, fields, models + +_logger = logging.getLogger(__name__) + + +class AuditlogAutovacuum(models.TransientModel): + _name = "auditlog.autovacuum" + _description = "Auditlog - Delete old logs" + + @api.model + def autovacuum(self, days, chunk_size=None): + """Delete all logs older than ``days``. This includes: + - CRUD logs (create, read, write, unlink) + - HTTP requests + - HTTP user sessions + + Called from a cron. + """ + days = (days > 0) and int(days) or 0 + deadline = datetime.now() - timedelta(days=days) + data_models = ("auditlog.log", "auditlog.http.request", "auditlog.http.session") + for data_model in data_models: + records = self.env[data_model].search( + [("create_date", "<=", fields.Datetime.to_string(deadline))], + limit=chunk_size, + order="create_date asc", + ) + nb_records = len(records) + with self.env.norecompute(): + records.unlink() + _logger.info("AUTOVACUUM - %s '%s' records deleted", nb_records, data_model) + return True diff --git a/auditlog/models/http_request.py b/auditlog/models/http_request.py new file mode 100644 index 0000000..c4c6512 --- /dev/null +++ b/auditlog/models/http_request.py @@ -0,0 +1,68 @@ +# Copyright 2015 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from psycopg2.extensions import AsIs + +from odoo import api, fields, models +from odoo.http import request + + +class AuditlogHTTPRequest(models.Model): + _name = "auditlog.http.request" + _description = "Auditlog - HTTP request log" + _order = "create_date DESC" + + display_name = fields.Char("Name", compute="_compute_display_name", store=True) + name = fields.Char("Path") + root_url = fields.Char("Root URL") + user_id = fields.Many2one("res.users", string="User") + http_session_id = fields.Many2one( + "auditlog.http.session", string="Session", index=True + ) + user_context = fields.Char("Context") + log_ids = fields.One2many("auditlog.log", "http_request_id", string="Logs") + + @api.depends("create_date", "name") + def _compute_display_name(self): + for httprequest in self: + create_date = fields.Datetime.from_string(httprequest.create_date) + tz_create_date = fields.Datetime.context_timestamp(httprequest, create_date) + httprequest.display_name = "{} ({})".format( + httprequest.name or "?", fields.Datetime.to_string(tz_create_date) + ) + + def name_get(self): + return [(request.id, request.display_name) for request in self] + + @api.model + def current_http_request(self): + """Create a log corresponding to the current HTTP request, and returns + its ID. This method can be called several times during the + HTTP query/response cycle, it will only log the request on the + first call. + If no HTTP request is available, returns `False`. + """ + if not request: + return False + http_session_model = self.env["auditlog.http.session"] + httprequest = request.httprequest + if httprequest: + if hasattr(httprequest, "auditlog_http_request_id"): + # Verify existence. Could have been rolled back after a + # concurrency error + self.env.cr.execute( + "SELECT id FROM %s WHERE id = %s", + (AsIs(self._table), httprequest.auditlog_http_request_id), + ) + if self.env.cr.fetchone(): + return httprequest.auditlog_http_request_id + vals = { + "name": httprequest.path, + "root_url": httprequest.url_root, + "user_id": request.uid, + "http_session_id": http_session_model.current_http_session(), + "user_context": request.context, + } + httprequest.auditlog_http_request_id = self.create(vals).id + return httprequest.auditlog_http_request_id + return False diff --git a/auditlog/models/http_session.py b/auditlog/models/http_session.py new file mode 100644 index 0000000..e64d291 --- /dev/null +++ b/auditlog/models/http_session.py @@ -0,0 +1,53 @@ +# Copyright 2015 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo import api, fields, models +from odoo.http import request + + +class AuditlogtHTTPSession(models.Model): + _name = "auditlog.http.session" + _description = "Auditlog - HTTP User session log" + _order = "create_date DESC" + + display_name = fields.Char("Name", compute="_compute_display_name", store=True) + name = fields.Char("Session ID", index=True) + user_id = fields.Many2one("res.users", string="User", index=True) + http_request_ids = fields.One2many( + "auditlog.http.request", "http_session_id", string="HTTP Requests" + ) + + @api.depends("create_date", "user_id") + def _compute_display_name(self): + for httpsession in self: + create_date = fields.Datetime.from_string(httpsession.create_date) + tz_create_date = fields.Datetime.context_timestamp(httpsession, create_date) + httpsession.display_name = "{} ({})".format( + httpsession.user_id and httpsession.user_id.name or "?", + fields.Datetime.to_string(tz_create_date), + ) + + def name_get(self): + return [(session.id, session.display_name) for session in self] + + @api.model + def current_http_session(self): + """Create a log corresponding to the current HTTP user session, and + returns its ID. This method can be called several times during the + HTTP query/response cycle, it will only log the user session on the + first call. + If no HTTP user session is available, returns `False`. + """ + if not request: + return False + httpsession = request.session + if httpsession: + existing_session = self.search( + [("name", "=", httpsession.sid), ("user_id", "=", request.uid)], limit=1 + ) + if existing_session: + return existing_session.id + vals = {"name": httpsession.sid, "user_id": request.uid} + httpsession.auditlog_http_session_id = self.create(vals).id + return httpsession.auditlog_http_session_id + return False diff --git a/auditlog/models/log.py b/auditlog/models/log.py new file mode 100644 index 0000000..6093c77 --- /dev/null +++ b/auditlog/models/log.py @@ -0,0 +1,93 @@ +# Copyright 2015 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from odoo import _, api, fields, models +from odoo.exceptions import UserError + + +class AuditlogLog(models.Model): + _name = "auditlog.log" + _description = "Auditlog - Log" + _order = "create_date desc" + + name = fields.Char("Resource Name", size=64) + model_id = fields.Many2one( + "ir.model", string="Model", index=True, ondelete="set null" + ) + model_name = fields.Char(readonly=True) + model_model = fields.Char(string="Technical Model Name", readonly=True) + res_id = fields.Integer("Resource ID") + user_id = fields.Many2one("res.users", string="User") + method = fields.Char(size=64) + line_ids = fields.One2many("auditlog.log.line", "log_id", string="Fields updated") + http_session_id = fields.Many2one( + "auditlog.http.session", string="Session", index=True + ) + http_request_id = fields.Many2one( + "auditlog.http.request", string="HTTP Request", index=True + ) + log_type = fields.Selection( + [("full", "Full log"), ("fast", "Fast log")], string="Type" + ) + + @api.model_create_multi + def create(self, vals_list): + """Insert model_name and model_model field values upon creation.""" + for vals in vals_list: + if not vals.get("model_id"): + raise UserError(_("No model defined to create log.")) + model = self.env["ir.model"].browse(vals["model_id"]) + vals.update({"model_name": model.name, "model_model": model.model}) + return super().create(vals_list) + + def write(self, vals): + """Update model_name and model_model field values to reflect model_id + changes.""" + if "model_id" in vals: + if not vals["model_id"]: + raise UserError(_("The field 'model_id' cannot be empty.")) + model = self.env["ir.model"].browse(vals["model_id"]) + vals.update({"model_name": model.name, "model_model": model.model}) + return super().write(vals) + + +class AuditlogLogLine(models.Model): + _name = "auditlog.log.line" + _description = "Auditlog - Log details (fields updated)" + + field_id = fields.Many2one( + "ir.model.fields", ondelete="set null", string="Field", index=True + ) + log_id = fields.Many2one( + "auditlog.log", string="Log", ondelete="cascade", index=True + ) + old_value = fields.Text() + new_value = fields.Text() + old_value_text = fields.Text("Old value Text") + new_value_text = fields.Text("New value Text") + field_name = fields.Char("Technical name", readonly=True) + field_description = fields.Char("Description", readonly=True) + + @api.model_create_multi + def create(self, vals_list): + """Ensure field_id is not empty on creation and store field_name and + field_description.""" + for vals in vals_list: + if not vals.get("field_id"): + raise UserError(_("No field defined to create line.")) + field = self.env["ir.model.fields"].browse(vals["field_id"]) + vals.update( + {"field_name": field.name, "field_description": field.field_description} + ) + return super().create(vals_list) + + def write(self, vals): + """Ensure field_id is set during write and update field_name and + field_description values.""" + if "field_id" in vals: + if not vals["field_id"]: + raise UserError(_("The field 'field_id' cannot be empty.")) + field = self.env["ir.model.fields"].browse(vals["field_id"]) + vals.update( + {"field_name": field.name, "field_description": field.field_description} + ) + return super().write(vals) diff --git a/auditlog/models/rule.py b/auditlog/models/rule.py new file mode 100644 index 0000000..1db2687 --- /dev/null +++ b/auditlog/models/rule.py @@ -0,0 +1,738 @@ +# Copyright 2015 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +import copy + +from odoo import _, api, fields, models, modules +from odoo.exceptions import UserError + +FIELDS_BLACKLIST = [ + "id", + "create_uid", + "create_date", + "write_uid", + "write_date", + "display_name", + "__last_update", +] +# Used for performance, to avoid a dictionary instanciation when we need an +# empty dict to simplify algorithms +EMPTY_DICT = {} + + +class DictDiffer(object): + """Calculate the difference between two dictionaries as: + (1) items added + (2) items removed + (3) keys same in both but changed values + (4) keys same in both and unchanged values + """ + + def __init__(self, current_dict, past_dict): + self.current_dict, self.past_dict = current_dict, past_dict + self.set_current = set(current_dict) + self.set_past = set(past_dict) + self.intersect = self.set_current.intersection(self.set_past) + + def added(self): + return self.set_current - self.intersect + + def removed(self): + return self.set_past - self.intersect + + def changed(self): + return {o for o in self.intersect if self.past_dict[o] != self.current_dict[o]} + + def unchanged(self): + return {o for o in self.intersect if self.past_dict[o] == self.current_dict[o]} + + +class AuditlogRule(models.Model): + _name = "auditlog.rule" + _description = "Auditlog - Rule" + + name = fields.Char(required=True, states={"subscribed": [("readonly", True)]}) + model_id = fields.Many2one( + "ir.model", + "Model", + help="Select model for which you want to generate log.", + states={"subscribed": [("readonly", True)]}, + ondelete="set null", + index=True, + ) + model_name = fields.Char(readonly=True) + model_model = fields.Char(string="Technical Model Name", readonly=True) + user_ids = fields.Many2many( + "res.users", + "audittail_rules_users", + "user_id", + "rule_id", + string="Users", + help="if User is not added then it will applicable for all users", + states={"subscribed": [("readonly", True)]}, + ) + log_read = fields.Boolean( + "Log Reads", + help=( + "Select this if you want to keep track of read/open on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_write = fields.Boolean( + "Log Writes", + default=True, + help=( + "Select this if you want to keep track of modification on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_unlink = fields.Boolean( + "Log Deletes", + default=True, + help=( + "Select this if you want to keep track of deletion on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_create = fields.Boolean( + "Log Creates", + default=True, + help=( + "Select this if you want to keep track of creation on any " + "record of the model of this rule" + ), + states={"subscribed": [("readonly", True)]}, + ) + log_type = fields.Selection( + [("full", "Full log"), ("fast", "Fast log")], + string="Type", + required=True, + default="full", + help=( + "Full log: make a diff between the data before and after " + "the operation (log more info like computed fields which were " + "updated, but it is slower)\n" + "Fast log: only log the changes made through the create and " + "write operations (less information, but it is faster)" + ), + states={"subscribed": [("readonly", True)]}, + ) + # log_action = fields.Boolean( + # "Log Action", + # help=("Select this if you want to keep track of actions on the " + # "model of this rule")) + # log_workflow = fields.Boolean( + # "Log Workflow", + # help=("Select this if you want to keep track of workflow on any " + # "record of the model of this rule")) + state = fields.Selection( + [("draft", "Draft"), ("subscribed", "Subscribed")], + required=True, + default="draft", + ) + action_id = fields.Many2one( + "ir.actions.act_window", + string="Action", + states={"subscribed": [("readonly", True)]}, + ) + capture_record = fields.Boolean( + "Capture Record", + help="Select this if you want to keep track of Unlink Record", + ) + users_to_exclude_ids = fields.Many2many( + "res.users", + string="Users to Exclude", + context={"active_test": False}, + states={"subscribed": [("readonly", True)]}, + ) + + fields_to_exclude_ids = fields.Many2many( + "ir.model.fields", + domain="[('model_id', '=', model_id)]", + string="Fields to Exclude", + states={"subscribed": [("readonly", True)]}, + ) + + _sql_constraints = [ + ( + "model_uniq", + "unique(model_id)", + ( + "There is already a rule defined on this model\n" + "You cannot define another: please edit the existing one." + ), + ) + ] + + def _register_hook(self): + """Get all rules and apply them to log method calls.""" + super(AuditlogRule, self)._register_hook() + if not hasattr(self.pool, "_auditlog_field_cache"): + self.pool._auditlog_field_cache = {} + if not hasattr(self.pool, "_auditlog_model_cache"): + self.pool._auditlog_model_cache = {} + if not self: + self = self.search([("state", "=", "subscribed")]) + return self._patch_methods() + + def _patch_methods(self): + """Patch ORM methods of models defined in rules to log their calls.""" + updated = False + model_cache = self.pool._auditlog_model_cache + for rule in self: + if rule.state != "subscribed": + continue + if not self.pool.get(rule.model_id.model or rule.model_model): + # ignore rules for models not loadable currently + continue + model_cache[rule.model_id.model] = rule.model_id.id + model_model = self.env[rule.model_id.model or rule.model_model] + # CRUD + # -> create + check_attr = "auditlog_ruled_create" + if rule.log_create and not hasattr(model_model, check_attr): + model_model._patch_method("create", rule._make_create()) + setattr(type(model_model), check_attr, True) + updated = True + # -> read + check_attr = "auditlog_ruled_read" + if rule.log_read and not hasattr(model_model, check_attr): + model_model._patch_method("read", rule._make_read()) + setattr(type(model_model), check_attr, True) + updated = True + # -> write + check_attr = "auditlog_ruled_write" + if rule.log_write and not hasattr(model_model, check_attr): + model_model._patch_method("write", rule._make_write()) + setattr(type(model_model), check_attr, True) + updated = True + # -> unlink + check_attr = "auditlog_ruled_unlink" + if rule.log_unlink and not hasattr(model_model, check_attr): + model_model._patch_method("unlink", rule._make_unlink()) + setattr(type(model_model), check_attr, True) + updated = True + return updated + + def _revert_methods(self): + """Restore original ORM methods of models defined in rules.""" + updated = False + for rule in self: + model_model = self.env[rule.model_id.model or rule.model_model] + for method in ["create", "read", "write", "unlink"]: + if getattr(rule, "log_%s" % method) and hasattr( + getattr(model_model, method), "origin" + ): + model_model._revert_method(method) + delattr(type(model_model), "auditlog_ruled_%s" % method) + updated = True + if updated: + modules.registry.Registry(self.env.cr.dbname).signal_changes() + + @api.model + def create(self, vals): + """Update the registry when a new rule is created.""" + if "model_id" not in vals or not vals["model_id"]: + raise UserError(_("No model defined to create line.")) + model = self.env["ir.model"].browse(vals["model_id"]) + vals.update({"model_name": model.name, "model_model": model.model}) + new_record = super().create(vals) + if new_record._register_hook(): + modules.registry.Registry(self.env.cr.dbname).signal_changes() + return new_record + + def write(self, vals): + """Update the registry when existing rules are updated.""" + if "model_id" in vals: + if not vals["model_id"]: + raise UserError(_("Field 'model_id' cannot be empty.")) + model = self.env["ir.model"].browse(vals["model_id"]) + vals.update({"model_name": model.name, "model_model": model.model}) + res = super().write(vals) + if self._register_hook(): + modules.registry.Registry(self.env.cr.dbname).signal_changes() + return res + + def unlink(self): + """Unsubscribe rules before removing them.""" + self.unsubscribe() + return super(AuditlogRule, self).unlink() + + @api.model + def get_auditlog_fields(self, model): + """ + Get the list of auditlog fields for a model + By default it is all stored fields only, but you can + override this. + """ + return list( + n + for n, f in model._fields.items() + if (not f.compute and not f.related) or f.store + ) + + def _make_create(self): + """Instanciate a create method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + @api.model_create_multi + @api.returns("self", lambda value: value.id) + def create_full(self, vals_list, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + new_records = create_full.origin(self, vals_list, **kwargs) + # Take a snapshot of record values from the cache instead of using + # 'read()'. It avoids issues with related/computed fields which + # stored in the database only at the end of the transaction, but + # their values exist in cache. + new_values = {} + fields_list = rule_model.get_auditlog_fields(self) + for new_record in new_records.sudo(): + new_values.setdefault(new_record.id, {}) + for fname, field in new_record._fields.items(): + if fname not in fields_list: + continue + new_values[new_record.id][fname] = field.convert_to_read( + new_record[fname], new_record + ) + if self.env.user in users_to_exclude: + return new_records + rule_model.sudo().create_logs( + self.env.uid, + self._name, + new_records.ids, + "create", + None, + new_values, + {"log_type": log_type}, + ) + return new_records + + @api.model_create_multi + @api.returns("self", lambda value: value.id) + def create_fast(self, vals_list, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + vals_list = rule_model._update_vals_list(vals_list) + vals_list2 = copy.deepcopy(vals_list) + new_records = create_fast.origin(self, vals_list, **kwargs) + new_values = {} + for vals, new_record in zip(vals_list2, new_records): + new_values.setdefault(new_record.id, vals) + if self.env.user in users_to_exclude: + return new_records + rule_model.sudo().create_logs( + self.env.uid, + self._name, + new_records.ids, + "create", + None, + new_values, + {"log_type": log_type}, + ) + return new_records + + return create_full if self.log_type == "full" else create_fast + + def _make_read(self): + """Instanciate a read method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + def read(self, fields=None, load="_classic_read", **kwargs): + result = read.origin(self, fields, load, **kwargs) + # Sometimes the result is not a list but a dictionary + # Also, we can not modify the current result as it will break calls + result2 = result + if not isinstance(result2, list): + result2 = [result] + read_values = {d["id"]: d for d in result2} + # Old API + + # If the call came from auditlog itself, skip logging: + # avoid logs on `read` produced by auditlog during internal + # processing: read data of relevant records, 'ir.model', + # 'ir.model.fields'... (no interest in logging such operations) + if self.env.context.get("auditlog_disabled"): + return result + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + if self.env.user in users_to_exclude: + return result + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "read", + read_values, + None, + {"log_type": log_type}, + ) + return result + + return read + + def _make_write(self): + """Instanciate a write method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + def write_full(self, vals, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + fields_list = rule_model.get_auditlog_fields(self) + old_values = { + d["id"]: d + for d in self.sudo() + .with_context(prefetch_fields=False) + .read(fields_list) + } + result = write_full.origin(self, vals, **kwargs) + new_values = { + d["id"]: d + for d in self.sudo() + .with_context(prefetch_fields=False) + .read(fields_list) + } + if self.env.user in users_to_exclude: + return result + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "write", + old_values, + new_values, + {"log_type": log_type}, + ) + return result + + def write_fast(self, vals, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + # Log the user input only, no matter if the `vals` is updated + # afterwards as it could not represent the real state + # of the data in the database + vals2 = dict(vals) + old_vals2 = dict.fromkeys(list(vals2.keys()), False) + old_values = {id_: old_vals2 for id_ in self.ids} + new_values = {id_: vals2 for id_ in self.ids} + result = write_fast.origin(self, vals, **kwargs) + if self.env.user in users_to_exclude: + return result + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "write", + old_values, + new_values, + {"log_type": log_type}, + ) + return result + + return write_full if self.log_type == "full" else write_fast + + def _make_unlink(self): + """Instanciate an unlink method that log its calls.""" + self.ensure_one() + log_type = self.log_type + users_to_exclude = self.mapped("users_to_exclude_ids") + + def unlink_full(self, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + fields_list = rule_model.get_auditlog_fields(self) + old_values = { + d["id"]: d + for d in self.sudo() + .with_context(prefetch_fields=False) + .read(fields_list) + } + if self.env.user in users_to_exclude: + return unlink_full.origin(self, **kwargs) + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "unlink", + old_values, + None, + {"log_type": log_type}, + ) + return unlink_full.origin(self, **kwargs) + + def unlink_fast(self, **kwargs): + self = self.with_context(auditlog_disabled=True) + rule_model = self.env["auditlog.rule"] + if self.env.user in users_to_exclude: + return unlink_fast.origin(self, **kwargs) + rule_model.sudo().create_logs( + self.env.uid, + self._name, + self.ids, + "unlink", + None, + None, + {"log_type": log_type}, + ) + return unlink_fast.origin(self, **kwargs) + + return unlink_full if self.log_type == "full" else unlink_fast + + def create_logs( + self, + uid, + res_model, + res_ids, + method, + old_values=None, + new_values=None, + additional_log_values=None, + ): + """Create logs. `old_values` and `new_values` are dictionaries, e.g: + {RES_ID: {'FIELD': VALUE, ...}} + """ + if old_values is None: + old_values = EMPTY_DICT + if new_values is None: + new_values = EMPTY_DICT + log_model = self.env["auditlog.log"] + http_request_model = self.env["auditlog.http.request"] + http_session_model = self.env["auditlog.http.session"] + model_model = self.env[res_model] + model_id = self.pool._auditlog_model_cache[res_model] + auditlog_rule = self.env["auditlog.rule"].search([("model_id", "=", model_id)]) + fields_to_exclude = auditlog_rule.fields_to_exclude_ids.mapped("name") + for res_id in res_ids: + name = model_model.browse(res_id).name_get() + res_name = name and name[0] and name[0][1] + vals = { + "name": res_name, + "model_id": model_id, + "res_id": res_id, + "method": method, + "user_id": uid, + "http_request_id": http_request_model.current_http_request(), + "http_session_id": http_session_model.current_http_session(), + } + vals.update(additional_log_values or {}) + log = log_model.create(vals) + diff = DictDiffer( + new_values.get(res_id, EMPTY_DICT), old_values.get(res_id, EMPTY_DICT) + ) + if method == "create": + self._create_log_line_on_create( + log, diff.added(), new_values, fields_to_exclude + ) + elif method == "read": + self._create_log_line_on_read( + log, + list(old_values.get(res_id, EMPTY_DICT).keys()), + old_values, + fields_to_exclude, + ) + elif method == "write": + self._create_log_line_on_write( + log, diff.changed(), old_values, new_values, fields_to_exclude + ) + elif method == "unlink" and auditlog_rule.capture_record: + self._create_log_line_on_read( + log, + list(old_values.get(res_id, EMPTY_DICT).keys()), + old_values, + fields_to_exclude, + ) + + def _get_field(self, model, field_name): + cache = self.pool._auditlog_field_cache + if field_name not in cache.get(model.model, {}): + cache.setdefault(model.model, {}) + # - we use 'search()' then 'read()' instead of the 'search_read()' + # to take advantage of the 'classic_write' loading + # - search the field in the current model and those it inherits + field_model = self.env["ir.model.fields"] + all_model_ids = [model.id] + all_model_ids.extend(model.inherited_model_ids.ids) + field = field_model.search( + [("model_id", "in", all_model_ids), ("name", "=", field_name)] + ) + # The field can be a dummy one, like 'in_group_X' on 'res.users' + # As such we can't log it (field_id is required to create a log) + if not field: + cache[model.model][field_name] = False + else: + field_data = field.read(load="_classic_write")[0] + cache[model.model][field_name] = field_data + return cache[model.model][field_name] + + def _create_log_line_on_read( + self, log, fields_list, read_values, fields_to_exclude + ): + """Log field filled on a 'read' operation.""" + log_line_model = self.env["auditlog.log.line"] + fields_to_exclude = fields_to_exclude + FIELDS_BLACKLIST + for field_name in fields_list: + if field_name in fields_to_exclude: + continue + field = self._get_field(log.model_id, field_name) + # not all fields have an ir.models.field entry (ie. related fields) + if field: + log_vals = self._prepare_log_line_vals_on_read(log, field, read_values) + log_line_model.create(log_vals) + + def _prepare_log_line_vals_on_read(self, log, field, read_values): + """Prepare the dictionary of values used to create a log line on a + 'read' operation. + """ + vals = { + "field_id": field["id"], + "log_id": log.id, + "old_value": read_values[log.res_id][field["name"]], + "old_value_text": read_values[log.res_id][field["name"]], + "new_value": False, + "new_value_text": False, + } + if field["relation"] and "2many" in field["ttype"]: + old_value_text = ( + self.env[field["relation"]].browse(vals["old_value"]).name_get() + ) + vals["old_value_text"] = old_value_text + return vals + + def _create_log_line_on_write( + self, log, fields_list, old_values, new_values, fields_to_exclude + ): + """Log field updated on a 'write' operation.""" + log_line_model = self.env["auditlog.log.line"] + fields_to_exclude = fields_to_exclude + FIELDS_BLACKLIST + for field_name in fields_list: + if field_name in fields_to_exclude: + continue + field = self._get_field(log.model_id, field_name) + # not all fields have an ir.models.field entry (ie. related fields) + if field: + log_vals = self._prepare_log_line_vals_on_write( + log, field, old_values, new_values + ) + log_line_model.create(log_vals) + + def _prepare_log_line_vals_on_write(self, log, field, old_values, new_values): + """Prepare the dictionary of values used to create a log line on a + 'write' operation. + """ + vals = { + "field_id": field["id"], + "log_id": log.id, + "old_value": old_values[log.res_id][field["name"]], + "old_value_text": old_values[log.res_id][field["name"]], + "new_value": new_values[log.res_id][field["name"]], + "new_value_text": new_values[log.res_id][field["name"]], + } + # for *2many fields, log the name_get + if log.log_type == "full" and field["relation"] and "2many" in field["ttype"]: + # Filter IDs to prevent a 'name_get()' call on deleted resources + existing_ids = self.env[field["relation"]]._search( + [("id", "in", vals["old_value"])] + ) + old_value_text = [] + if existing_ids: + existing_values = ( + self.env[field["relation"]].browse(existing_ids).name_get() + ) + old_value_text.extend(existing_values) + # Deleted resources will have a 'DELETED' text representation + deleted_ids = set(vals["old_value"]) - set(existing_ids) + for deleted_id in deleted_ids: + old_value_text.append((deleted_id, "DELETED")) + vals["old_value_text"] = old_value_text + new_value_text = ( + self.env[field["relation"]].browse(vals["new_value"]).name_get() + ) + vals["new_value_text"] = new_value_text + return vals + + def _create_log_line_on_create( + self, log, fields_list, new_values, fields_to_exclude + ): + """Log field filled on a 'create' operation.""" + log_line_model = self.env["auditlog.log.line"] + fields_to_exclude = fields_to_exclude + FIELDS_BLACKLIST + for field_name in fields_list: + if field_name in fields_to_exclude: + continue + field = self._get_field(log.model_id, field_name) + # not all fields have an ir.models.field entry (ie. related fields) + if field: + log_vals = self._prepare_log_line_vals_on_create(log, field, new_values) + log_line_model.create(log_vals) + + def _prepare_log_line_vals_on_create(self, log, field, new_values): + """Prepare the dictionary of values used to create a log line on a + 'create' operation. + """ + vals = { + "field_id": field["id"], + "log_id": log.id, + "old_value": False, + "old_value_text": False, + "new_value": new_values[log.res_id][field["name"]], + "new_value_text": new_values[log.res_id][field["name"]], + } + if log.log_type == "full" and field["relation"] and "2many" in field["ttype"]: + new_value_text = ( + self.env[field["relation"]].browse(vals["new_value"]).name_get() + ) + vals["new_value_text"] = new_value_text + return vals + + def subscribe(self): + """Subscribe Rule for auditing changes on model and apply shortcut + to view logs on that model. + """ + act_window_model = self.env["ir.actions.act_window"] + for rule in self: + # Create a shortcut to view logs + domain = "[('model_id', '=', %s), ('res_id', '=', active_id)]" % ( + rule.model_id.id + ) + vals = { + "name": _("View logs"), + "res_model": "auditlog.log", + "binding_model_id": rule.model_id.id, + "domain": domain, + } + act_window = act_window_model.sudo().create(vals) + rule.write({"state": "subscribed", "action_id": act_window.id}) + return True + + def unsubscribe(self): + """Unsubscribe Auditing Rule on model.""" + # Revert patched methods + self._revert_methods() + for rule in self: + # Remove the shortcut to view logs + act_window = rule.action_id + if act_window: + act_window.unlink() + self.write({"state": "draft"}) + return True + + @api.model + def _update_vals_list(self, vals_list): + # Odoo supports empty recordset assignment (while it doesn't handle + # non-empty recordset ¯\_(ツ)_/¯ ), it could be an Odoo issue, but in + # the meanwhile we have to handle this case to avoid errors when using + # ``deepcopy`` to log data. + for vals in vals_list: + for fieldname, fieldvalue in vals.items(): + if isinstance(fieldvalue, models.BaseModel) and not fieldvalue: + vals[fieldname] = False + return vals_list |
