diff options
| author | stephanchrst <stephanchrst@gmail.com> | 2023-02-06 15:29:55 +0700 |
|---|---|---|
| committer | stephanchrst <stephanchrst@gmail.com> | 2023-02-06 15:29:55 +0700 |
| commit | 7cfed1e19f2e340d966ed2068176d21a0e8e9834 (patch) | |
| tree | ec9077cb4f89d23378ef09f9da0adb7135548081 /auditlog/tests | |
| parent | 4b3b2d8b1a9a7a72fbe3d623e93dea3802ef0e56 (diff) | |
add audit log
Diffstat (limited to 'auditlog/tests')
| -rw-r--r-- | auditlog/tests/__init__.py | 3 | ||||
| -rw-r--r-- | auditlog/tests/test_auditlog.py | 633 | ||||
| -rw-r--r-- | auditlog/tests/test_autovacuum.py | 44 |
3 files changed, 680 insertions, 0 deletions
diff --git a/auditlog/tests/__init__.py b/auditlog/tests/__init__.py new file mode 100644 index 0000000..33f5b47 --- /dev/null +++ b/auditlog/tests/__init__.py @@ -0,0 +1,3 @@ +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). +from . import test_auditlog +from . import test_autovacuum diff --git a/auditlog/tests/test_auditlog.py b/auditlog/tests/test_auditlog.py new file mode 100644 index 0000000..d6a98fe --- /dev/null +++ b/auditlog/tests/test_auditlog.py @@ -0,0 +1,633 @@ +# Copyright 2015 Therp BV <https://therp.nl> +# © 2018 Pieter Paulussen <pieter_paulussen@me.com> +# © 2021 Stefan Rijnhart <stefan@opener.amsterdam> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +from odoo.modules.migration import load_script +from odoo.tests.common import Form, SavepointCase, TransactionCase + +from odoo.addons.base.models.ir_model import MODULE_UNINSTALL_FLAG + + +class AuditlogCommon(object): + def test_LogCreation(self): + """First test, caching some data.""" + self.groups_rule.subscribe() + group = self.env["res.groups"].create({"name": "testgroup1"}) + self.assertEqual( + self.env["auditlog.log"].search_count( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "=", group.id), + ] + ), + 1, + ) + + def test_LogCreation2(self): + """Second test, using cached data of the first one.""" + + self.groups_rule.subscribe() + + auditlog_log = self.env["auditlog.log"] + testgroup2 = self.env["res.groups"].create({"name": "testgroup2"}) + self.assertTrue( + auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "=", testgroup2.id), + ] + ).ensure_one() + ) + + def test_LogCreation3(self): + """Third test, two groups, the latter being the parent of the former. + Then we remove it right after (with (2, X) tuple) to test the creation + of a 'write' log with a deleted resource (so with no text + representation). + """ + + self.groups_rule.subscribe() + auditlog_log = self.env["auditlog.log"] + testgroup3 = testgroup3 = self.env["res.groups"].create({"name": "testgroup3"}) + testgroup4 = self.env["res.groups"].create( + {"name": "testgroup4", "implied_ids": [(4, testgroup3.id)]} + ) + testgroup4.write({"implied_ids": [(2, testgroup3.id)]}) + self.assertTrue( + auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "=", testgroup3.id), + ] + ).ensure_one() + ) + self.assertTrue( + auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "=", testgroup4.id), + ] + ).ensure_one() + ) + self.assertTrue( + auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "write"), + ("res_id", "=", testgroup4.id), + ] + ).ensure_one() + ) + + def test_LogCreation4(self): + """Fourth test, create several records at once (with create multi + feature starting from Odoo 12) and check that the same number of logs + has been generated. + """ + + self.groups_rule.subscribe() + + auditlog_log = self.env["auditlog.log"] + groups_vals = [ + {"name": "testgroup1"}, + {"name": "testgroup3"}, + {"name": "testgroup2"}, + ] + groups = self.env["res.groups"].create(groups_vals) + # Ensure that the recordset returns is in the same order + # than list of vals + expected_names = ["testgroup1", "testgroup3", "testgroup2"] + self.assertEqual(groups.mapped("name"), expected_names) + + logs = auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "in", groups.ids), + ] + ) + self.assertEqual(len(logs), len(groups)) + + def test_LogCreation5(self): + """Fifth test, create a record and check that the same number of logs + has been generated. And then delete it, check that it has created log + with 0 fields updated. + """ + self.groups_rule.subscribe() + + auditlog_log = self.env["auditlog.log"] + testgroup5 = self.env["res.groups"].create({"name": "testgroup5"}) + self.assertTrue( + auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "=", testgroup5.id), + ] + ).ensure_one() + ) + testgroup5.unlink() + log_record = auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "unlink"), + ("res_id", "=", testgroup5.id), + ] + ).ensure_one() + self.assertTrue(log_record) + if not self.groups_rule.capture_record: + self.assertEqual(len(log_record.line_ids), 0) + + def test_LogCreation6(self): + """Six test, create a record and check that the same number of logs + has been generated. And then delete it, check that it has created log + with x fields updated as per rule + """ + self.groups_rule.subscribe() + + auditlog_log = self.env["auditlog.log"] + testgroup6 = self.env["res.groups"].create({"name": "testgroup6"}) + self.assertTrue( + auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "=", testgroup6.id), + ] + ).ensure_one() + ) + testgroup6.unlink() + log_record = auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "unlink"), + ("res_id", "=", testgroup6.id), + ] + ).ensure_one() + self.assertTrue(log_record) + if self.groups_rule.capture_record: + self.assertTrue(len(log_record.line_ids) > 0) + + def test_LogCreation7(self): + """Seventh test: multi-create with different M2O values. + + Check that creation goes as planned (no error coming from ``deepcopy``) + """ + self.groups_rule.subscribe() + + auditlog_log = self.env["auditlog.log"] + cat = self.env["ir.module.category"].create({"name": "Test Category"}) + groups_vals = [ + {"name": "testgroup1"}, + {"name": "testgroup3", "category_id": cat.browse()}, + {"name": "testgroup2", "category_id": False}, + {"name": "testgroup4", "category_id": cat.id}, + ] + groups = self.env["res.groups"].create(groups_vals) + + # Ensure ``category_id`` field has the correct values + expected_ids = [False, False, False, cat.id] + self.assertEqual([g.category_id.id for g in groups], expected_ids) + + # Ensure the correct number of logs have been created + logs = auditlog_log.search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "create"), + ("res_id", "in", groups.ids), + ] + ) + self.assertEqual(len(logs), len(groups)) + + def test_LogUpdate(self): + """Tests write results with different M2O values.""" + self.groups_rule.subscribe() + group = self.env["res.groups"].create({"name": "testgroup1"}) + cat = self.env["ir.module.category"].create({"name": "Test Category"}) + group.write( + { + "name": "Testgroup1", + "category_id": cat.browse(), + } + ) + log1 = self.env["auditlog.log"].search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "write"), + ("res_id", "=", group.id), + ] + ) + self.assertEqual(len(log1), 1) + group.write({"name": "Testgroup2", "category_id": cat.id}) + log2 = self.env["auditlog.log"].search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "write"), + ("res_id", "=", group.id), + ("id", "not in", log1.ids), + ] + ) + self.assertEqual(len(log2), 1) + group.write({"name": "Testgroup3", "category_id": False}) + log3 = self.env["auditlog.log"].search( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "write"), + ("res_id", "=", group.id), + ("id", "not in", (log1 + log2).ids), + ] + ) + self.assertEqual(len(log3), 1) + + def test_LogDelete(self): + """Tests unlink results""" + self.groups_rule.subscribe() + group = self.env["res.groups"].create({"name": "testgroup1"}) + group.unlink() + self.assertEqual( + self.env["auditlog.log"].search_count( + [ + ("model_id", "=", self.groups_model_id), + ("method", "=", "unlink"), + ("res_id", "=", group.id), + ] + ), + 1, + ) + + +class TestAuditlogFull(TransactionCase, AuditlogCommon): + def setUp(self): + super(TestAuditlogFull, self).setUp() + self.groups_model_id = self.env.ref("base.model_res_groups").id + self.groups_rule = self.env["auditlog.rule"].create( + { + "name": "testrule for groups", + "model_id": self.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + } + ) + + def tearDown(self): + self.groups_rule.unlink() + super(TestAuditlogFull, self).tearDown() + + +class TestAuditlogFast(TransactionCase, AuditlogCommon): + def setUp(self): + super(TestAuditlogFast, self).setUp() + self.groups_model_id = self.env.ref("base.model_res_groups").id + self.groups_rule = self.env["auditlog.rule"].create( + { + "name": "testrule for groups", + "model_id": self.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "fast", + } + ) + + def tearDown(self): + self.groups_rule.unlink() + super(TestAuditlogFast, self).tearDown() + + +class TestFieldRemoval(SavepointCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + + # Clear all existing logging lines + existing_audit_logs = cls.env["auditlog.log"].search([]) + existing_audit_logs.unlink() + + # Create a test model to remove + cls.test_model = cls.env["ir.model"].create( + {"name": "x_test_model", "model": "x_test.model", "state": "manual"} + ) + + # Create a test model field to remove + cls.test_field = cls.env["ir.model.fields"].create( + { + "name": "x_test_field", + "field_description": "x_Test Field", + "model_id": cls.test_model.id, + "ttype": "char", + "state": "manual", + } + ) + + # Setup auditlog rule + cls.auditlog_rule = cls.env["auditlog.rule"].create( + { + "name": "test.model", + "model_id": cls.test_model.id, + "log_type": "fast", + "log_read": False, + "log_create": True, + "log_write": True, + "log_unlink": False, + } + ) + + cls.auditlog_rule.subscribe() + # Trigger log creation + rec = cls.env["x_test.model"].create({"x_test_field": "test value"}) + rec.write({"x_test_field": "test value 2"}) + + cls.logs = cls.env["auditlog.log"].search( + [("res_id", "=", rec.id), ("model_id", "=", cls.test_model.id)] + ) + + def assert_values(self): + """Assert that the denormalized field and model info is present + on the auditlog records""" + self.logs.refresh() + self.assertEqual(self.logs[0].model_name, "x_test_model") + self.assertEqual(self.logs[0].model_model, "x_test.model") + + log_lines = self.logs.mapped("line_ids") + self.assertEqual(len(log_lines), 2) + self.assertEqual(log_lines[0].field_name, "x_test_field") + self.assertEqual(log_lines[0].field_description, "x_Test Field") + + self.auditlog_rule.refresh() + self.assertEqual(self.auditlog_rule.model_name, "x_test_model") + self.assertEqual(self.auditlog_rule.model_model, "x_test.model") + + def test_01_field_and_model_removal(self): + """Test field and model removal to check auditlog line persistence""" + self.assert_values() + + # Remove the field + self.test_field.with_context({MODULE_UNINSTALL_FLAG: True}).unlink() + self.assert_values() + # The field should not be linked + self.assertFalse(self.logs.mapped("line_ids.field_id")) + + # Remove the model + self.test_model.with_context({MODULE_UNINSTALL_FLAG: True}).unlink() + self.assert_values() + + # The model should not be linked + self.assertFalse(self.logs.mapped("model_id")) + # Assert rule values + self.assertFalse(self.auditlog_rule.model_id) + + def test_02_migration(self): + """Test the migration of the data model related to this feature""" + self.env.cr.execute("""DROP VIEW auditlog_log_line_view""") + # Drop the data model + self.env.cr.execute( + """ALTER TABLE auditlog_log + DROP COLUMN model_name, DROP COLUMN model_model""" + ) + self.env.cr.execute( + """ALTER TABLE auditlog_rule + DROP COLUMN model_name, DROP COLUMN model_model""" + ) + self.env.cr.execute( + """ALTER TABLE auditlog_log_line + DROP COLUMN field_name, DROP COLUMN field_description""" + ) + + # Recreate the data model + mod = load_script( + "auditlog/migrations/14.0.1.1.0/pre-migration.py", "pre-migration" + ) + mod.migrate(self.env.cr, "14.0.1.0.2") + + # Values are restored + self.assert_values() + + # The migration script is tolerant if the data model is already in place + mod.migrate(self.env.cr, "14.0.1.0.2") + + +class TestAuditlogFullCaptureRecord(TransactionCase, AuditlogCommon): + def setUp(self): + super(TestAuditlogFullCaptureRecord, self).setUp() + self.groups_model_id = self.env.ref("base.model_res_groups").id + self.groups_rule = self.env["auditlog.rule"].create( + { + "name": "testrule for groups with capture unlink record", + "model_id": self.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + "capture_record": True, + } + ) + + def tearDown(self): + self.groups_rule.unlink() + super(TestAuditlogFullCaptureRecord, self).tearDown() + + +class AuditLogRuleTestForUserFields(SavepointCase): + @classmethod + def setUpClass(cls): + super(AuditLogRuleTestForUserFields, cls).setUpClass() + # get Contact model id + cls.contact_model_id = ( + cls.env["ir.model"].search([("model", "=", "res.partner")]).id + ) + + # get phone field id + cls.fields_to_exclude_ids = ( + cls.env["ir.model.fields"] + .search([("model", "=", "res.partner"), ("name", "=", "phone")]) + .id + ) + + # get user id + cls.user = ( + cls.env["res.users"] + .with_context(no_reset_password=True, tracking_disable=True) + .create( + { + "name": "Test User", + "login": "testuser", + } + ) + ) + cls.user_2 = ( + cls.env["res.users"] + .with_context(no_reset_password=True, tracking_disable=True) + .create( + { + "name": "Test User2", + "login": "testuser2", + } + ) + ) + + cls.users_to_exclude_ids = cls.user.id + + # creating auditlog.rule + cls.auditlog_rule = ( + cls.env["auditlog.rule"] + .with_context(tracking_disable=True) + .create( + { + "name": "testrule 01", + "model_id": cls.contact_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + "capture_record": True, + } + ) + ) + + # Updating phone in fields_to_exclude_ids + cls.auditlog_rule.fields_to_exclude_ids = [[4, cls.fields_to_exclude_ids]] + + # Updating users_to_exclude_ids + cls.auditlog_rule.users_to_exclude_ids = [[4, cls.users_to_exclude_ids]] + + # Subscribe auditlog.rule + cls.auditlog_rule.subscribe() + + cls.auditlog_log = cls.env["auditlog.log"] + + # Creating new res.partner + cls.testpartner1 = ( + cls.env["res.partner"] + .with_context(tracking_disable=True) + .create( + { + "name": "testpartner1", + "phone": "123", + } + ) + ) + + # Creating new res.partner from excluded user + cls.testpartner2 = ( + cls.env["res.partner"] + .with_context(tracking_disable=True) + .with_user(cls.user.id) + .create( + { + "name": "testpartner2", + } + ) + ) + + def test_01_AuditlogFull_field_exclude_create_log(self): + # Checking log is created for testpartner1 + create_log_record = self.auditlog_log.search( + [ + ("model_id", "=", self.auditlog_rule.model_id.id), + ("method", "=", "create"), + ("res_id", "=", self.testpartner1.id), + ] + ).ensure_one() + self.assertTrue(create_log_record) + field_names = create_log_record.line_ids.mapped("field_name") + + # Checking log lines not created for phone + self.assertTrue("phone" not in field_names) + + # Removing created log record + create_log_record.unlink() + + def test_02_AuditlogFull_field_exclude_write_log(self): + # Checking fields_to_exclude_ids + self.testpartner1.with_context(tracking_disable=True).write( + { + "phone": "1234567890", + } + ) + # Checking log is created for testpartner1 + write_log_record = self.auditlog_log.search( + [ + ("model_id", "=", self.auditlog_rule.model_id.id), + ("method", "=", "write"), + ("res_id", "=", self.testpartner1.id), + ] + ).ensure_one() + self.assertTrue(write_log_record) + field_names = write_log_record.line_ids.mapped("field_name") + + # Checking log lines not created for phone + self.assertTrue("phone" not in field_names) + + def test_03_AuditlogFull_user_exclude_write_log(self): + # Update email in Form view with excluded user + partner_form = Form( + self.testpartner1.with_user(self.user.id).with_context( + tracking_disable=True + ) + ) + partner_form.email = "vendor@mail.com" + testpartner1 = partner_form.save() + + # Checking write log not created + with self.assertRaises(ValueError): + self.auditlog_log.search( + [ + ("model_id", "=", self.auditlog_rule.model_id.id), + ("method", "=", "write"), + ("res_id", "=", testpartner1.id), + ("user_id", "=", self.user.id), + ] + ).ensure_one() + + def test_04_AuditlogFull_user_exclude_create_log(self): + # Checking create log not created for testpartner2 + with self.assertRaises(ValueError): + self.auditlog_log.search( + [ + ("model_id", "=", self.auditlog_rule.model_id.id), + ("method", "=", "create"), + ("res_id", "=", self.testpartner2.id), + ] + ).ensure_one() + + def test_05_AuditlogFull_user_exclude_unlink_log(self): + # Removing testpartner2 from excluded user + self.testpartner2.with_user(self.user).unlink() + + # Checking delete log not created for testpartner2 + with self.assertRaises(ValueError): + self.auditlog_log.search( + [ + ("model_id", "=", self.auditlog_rule.model_id.id), + ("method", "=", "unlink"), + ("res_id", "=", self.testpartner2.id), + ] + ).ensure_one() + + def test_06_AuditlogFull_unlink_log(self): + # Removing testpartner1 with user_2 + self.testpartner1.with_user(self.user_2).unlink() + delete_log_record = self.auditlog_log.search( + [ + ("model_id", "=", self.auditlog_rule.model_id.id), + ("method", "=", "unlink"), + ("res_id", "=", self.testpartner1.id), + ("user_id", "=", self.user_2.id), + ] + ).ensure_one() + + # Checking log lines are created + self.assertTrue(delete_log_record) + + # Removing auditlog_rule + self.auditlog_rule.unlink() diff --git a/auditlog/tests/test_autovacuum.py b/auditlog/tests/test_autovacuum.py new file mode 100644 index 0000000..2b4de24 --- /dev/null +++ b/auditlog/tests/test_autovacuum.py @@ -0,0 +1,44 @@ +# Copyright 2016 ABF OSIELL <https://osiell.com> +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import time + +from odoo.tests.common import TransactionCase + + +class TestAuditlogAutovacuum(TransactionCase): + def setUp(self): + super(TestAuditlogAutovacuum, self).setUp() + self.groups_model_id = self.env.ref("base.model_res_groups").id + self.groups_rule = self.env["auditlog.rule"].create( + { + "name": "testrule for groups", + "model_id": self.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "state": "subscribed", + "log_type": "full", + } + ) + + def tearDown(self): + self.groups_rule.unlink() + super(TestAuditlogAutovacuum, self).tearDown() + + def test_autovacuum(self): + log_model = self.env["auditlog.log"] + autovacuum_model = self.env["auditlog.autovacuum"] + group = self.env["res.groups"].create({"name": "testgroup1"}) + nb_logs = log_model.search_count( + [("model_id", "=", self.groups_model_id), ("res_id", "=", group.id)] + ) + self.assertGreater(nb_logs, 0) + # Milliseconds are ignored by autovacuum, waiting 1s ensure that + # the logs generated will be processed by the vacuum + time.sleep(1) + autovacuum_model.autovacuum(days=0) + nb_logs = log_model.search_count( + [("model_id", "=", self.groups_model_id), ("res_id", "=", group.id)] + ) + self.assertEqual(nb_logs, 0) |
