diff --git a/auditlog_clickhouse_write/README.rst b/auditlog_clickhouse_write/README.rst new file mode 100644 index 00000000000..99b11e1b252 --- /dev/null +++ b/auditlog_clickhouse_write/README.rst @@ -0,0 +1,172 @@ +============================= +Store Audit Log in Clickhouse +============================= + +.. + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! This file is generated by oca-gen-addon-readme !! + !! changes will be overwritten. !! + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + !! source digest: sha256:11aaa38bad24a890554c0d34d74d31e13b933facbba3fea31f4cbf22ae8fd842 + !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +.. |badge1| image:: https://img.shields.io/badge/maturity-Beta-yellow.png + :target: https://odoo-community.org/page/development-status + :alt: Beta +.. |badge2| image:: https://img.shields.io/badge/licence-AGPL--3-blue.png + :target: http://www.gnu.org/licenses/agpl-3.0-standalone.html + :alt: License: AGPL-3 +.. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fserver--tools-lightgray.png?logo=github + :target: https://github.com/OCA/server-tools/tree/18.0/auditlog_clickhouse_write + :alt: OCA/server-tools +.. |badge4| image:: https://img.shields.io/badge/weblate-Translate%20me-F47D42.png + :target: https://translation.odoo-community.org/projects/server-tools-18-0/server-tools-18-0-auditlog_clickhouse_write + :alt: Translate me on Weblate +.. |badge5| image:: https://img.shields.io/badge/runboat-Try%20me-875A7B.png + :target: https://runboat.odoo-community.org/builds?repo=OCA/server-tools&target_branch=18.0 + :alt: Try me on Runboat + +|badge1| |badge2| |badge3| |badge4| |badge5| + +This module implements buffered asynchronous transfers of audit logs +from PostgreSQL to ClickHouse. 
Storing audit data in a columnar database +that is write-only prevents database bloat, makes audit records +effectively immutable, and allows for scaling to very large volumes of +logs without slowing down normal transactions. Audit logs are written +asynchronously to reduce the load on business operations. + +**Table of contents** + +.. contents:: + :local: + +Use Cases / Context +=================== + +The auditlog module stores audit data in PostgreSQL. In production +systems with extensive audit rules, these tables grow without limits, +causing three issues: + +- Database bloat; +- Immutability gap: Members of group_auditlog_manager (implied by + base.group_system) have full CRUD access to audit tables, allowing + audit records to be altered or deleted via UI, ORM, or SQL; +- Performance overhead: Audit logging runs synchronously in the same + transaction and performs multiple ORM create() calls, adding latency + to audited operations. + +Configuration +============= + +This module requires: + +- A reachable ClickHouse server. +- Python dependency ``clickhouse-driver`` available in the Odoo + environment. +- A ClickHouse database created in advance (the module does **not** + create databases/users/grants). +- A ClickHouse user with at least: + + - ``INSERT`` and ``CREATE TABLE`` privileges on the target database. + +.. + + ClickHouse installation (Docker guide): + ``https://clickhouse.com/docs/install/docker`` + +Steps: + +- Make sure ``clickhouse-driver`` is available in your system. +- Install the module. 
+ +- Configure the connection parameters in Odoo: + + - **Settings > Technical > Auditlog > Clickhouse configuration** + - Fill in the following parameters: + ++---------------------------------------+ +| Field | ++=======================================+ +| Hostname or IP | ++---------------------------------------+ +| TCP port | ++---------------------------------------+ +| ClickHouse database name | ++---------------------------------------+ +| ClickHouse user | ++---------------------------------------+ +| ClickHouse Password | ++---------------------------------------+ +| Batch size (default = 1000) | ++---------------------------------------+ +| Channel (default root) | ++---------------------------------------+ + +- Click **Test connection**. +- Optionally, click **Create Auditlog Tables** to create the tables in + the target database. + +Usage +===== + +Once auditlog_clickhouse_write is installed and configured: + +- Users perform tracked operations (create, write, unlink, read, export) + on models with active auditlog.rule subscriptions. This behavior is + unchanged from the base auditlog module. +- Log data is serialized and stored in the local auditlog.log.buffer + table instantly. The standard auditlog tables are not populated. +- Every 5 minutes (default), the Cron job runs, pushes data to + ClickHouse, and cleans the local buffer. +- Data is permanently stored in ClickHouse and cannot be modified or + deleted via Odoo. + +Bug Tracker +=========== + +Bugs are tracked on `GitHub Issues <https://github.com/OCA/server-tools/issues>`_. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +`feedback <https://github.com/OCA/server-tools/issues/new?body=module:%20auditlog_clickhouse_write%0Aversion:%2018.0>`_. + +Do not contact contributors directly about support or help with technical issues. 
+ +Credits +======= + +Authors +------- + +* Cetmix + +Contributors +------------ + +- `Cetmix <https://cetmix.com>`__ + + - Ivan Sokolov + - George Smirnov + - Dmitry Meita + +Other credits +------------- + +The development of this module has been financially supported by: + +- Geschäftsstelle Sozialinfo + +Maintainers +----------- + +This module is maintained by the OCA. + +.. image:: https://odoo-community.org/logo.png + :alt: Odoo Community Association + :target: https://odoo-community.org + +OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use. + +This module is part of the `OCA/server-tools <https://github.com/OCA/server-tools/tree/18.0/auditlog_clickhouse_write>`_ project on GitHub. + +You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute. diff --git a/auditlog_clickhouse_write/__init__.py b/auditlog_clickhouse_write/__init__.py new file mode 100644 index 00000000000..0650744f6bc --- /dev/null +++ b/auditlog_clickhouse_write/__init__.py @@ -0,0 +1 @@ +from . 
import models diff --git a/auditlog_clickhouse_write/__manifest__.py b/auditlog_clickhouse_write/__manifest__.py new file mode 100644 index 00000000000..05e78aff82b --- /dev/null +++ b/auditlog_clickhouse_write/__manifest__.py @@ -0,0 +1,21 @@ +{ + "name": "Store Audit Log in Clickhouse", + "version": "18.0.1.0.0", + "summary": "Asynchronous audit log storage in ClickHouse", + "category": "Tools", + "license": "AGPL-3", + "author": "Odoo Community Association (OCA), Cetmix", + "website": "https://github.com/OCA/server-tools", + "depends": [ + "auditlog", + "queue_job", + ], + "external_dependencies": { + "python": ["clickhouse-driver"], + }, + "data": [ + "security/ir.model.access.csv", + "data/auditlog_clickhouse_queue.xml", + "views/auditlog_clickhouse_config_views.xml", + ], +} diff --git a/auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml b/auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml new file mode 100644 index 00000000000..6d5370020f7 --- /dev/null +++ b/auditlog_clickhouse_write/data/auditlog_clickhouse_queue.xml @@ -0,0 +1,27 @@ + + + Auditlog ClickHouse Write: enqueue buffer flush + + code + model._cron_flush_to_clickhouse() + 5 + minutes + True + + + + + Edit buffer flush schedule + ir.cron + + ir.actions.act_window + form + new + + diff --git a/auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot b/auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot new file mode 100644 index 00000000000..2079fb9ce46 --- /dev/null +++ b/auditlog_clickhouse_write/i18n/auditlog_clickhouse_write.pot @@ -0,0 +1,335 @@ +# Translation of Odoo Server. 
+# This file contains the translation of the following modules: +# * auditlog_clickhouse_write +# +msgid "" +msgstr "" +"Project-Id-Version: Odoo Server 18.0\n" +"Report-Msgid-Bugs-To: \n" +"POT-Creation-Date: 2026-03-05 18:40+0000\n" +"PO-Revision-Date: 2026-03-05 18:40+0000\n" +"Last-Translator: \n" +"Language-Team: \n" +"MIME-Version: 1.0\n" +"Content-Type: text/plain; charset=UTF-8\n" +"Content-Transfer-Encoding: \n" +"Plural-Forms: \n" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "" +"%s\n" +"\n" +"If you save this configuration as active, the currently active one will be deactivated:\n" +"- %s" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "" +"As soon as this connection to ClickHouse is activated, all log\n" +" entries from that moment will be stored in the configured\n" +" ClickHouse database. Only one connection can be active at a\n" +" time." +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "" +"As soon as this connection to ClickHouse is activated, all log entries from that moment will be stored in the configured ClickHouse database.\n" +"\n" +" Only one connection can be active at a time." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__attempt_count +msgid "Attempt Count" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model,name:auditlog_clickhouse_write.model_auditlog_rule +msgid "Auditlog - Rule" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model,name:auditlog_clickhouse_write.model_auditlog_log_buffer +msgid "Auditlog ClickHouse Buffer" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Auditlog ClickHouse Configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model,name:auditlog_clickhouse_write.model_auditlog_clickhouse_config +msgid "Auditlog ClickHouse Write Configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.server,name:auditlog_clickhouse_write.ir_cron_auditlog_clickhouse_write_enqueue_flush_ir_actions_server +msgid "Auditlog ClickHouse Write: enqueue buffer flush" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Auditlog tables were created (if they did not exist)." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_batch_size +msgid "Batch size" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_channel_id +msgid "Channel" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.act_window,name:auditlog_clickhouse_write.action_auditlog_clickhouse_config +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_list +msgid "ClickHouse Configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "ClickHouse activation" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.ui.menu,name:auditlog_clickhouse_write.menu_auditlog_clickhouse_config +msgid "ClickHouse configuration" +msgstr "" + +#. module: auditlog_clickhouse_write +#. 
odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "ClickHouse connection failed: %s" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__port +msgid "" +"ClickHouse native TCP port used by clickhouse-driver (default is 9000)." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__host +msgid "" +"ClickHouse server hostname or IP address. Must be reachable from the Odoo " +"server." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__user +msgid "" +"ClickHouse user name used for INSERT operations into auditlog tables. " +"Recommended: a dedicated user with INSERT-only privileges." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Configure action" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.act_window,name:auditlog_clickhouse_write.action_configure_auditlog_clickhouse_write_flush_cron +msgid "Configure flush action" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Connection" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Connection to ClickHouse is OK." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Create Auditlog Tables" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__create_uid +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__create_uid +msgid "Created by" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__create_date +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__create_date +msgid "Created on" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__database +msgid "Database name" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__display_name +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__display_name +msgid "Display Name" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.actions.act_window,name:auditlog_clickhouse_write.action_configure_auditlog_clickhouse_write_cron +msgid "Edit buffer flush schedule" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_log_buffer.py:0 +msgid "Error" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__error_message +msgid "Error Message" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Export Queue" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Failed to create ClickHouse tables: %s" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_log_buffer.py:0 +msgid "Flushed to ClickHouse but failed to delete buffer rows: %s" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__host +msgid "Hostname or IP" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__id +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__id +msgid "ID" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__is_active +msgid "" +"If checked audit logs will be buffered locally and exported to ClickHouse. " +"Only one configuration can be active at a time." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__is_active +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_list +msgid "Is Active" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__write_uid +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__write_uid +msgid "Last Updated by" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__write_date +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__write_date +msgid "Last Updated on" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "" +"Logs are buffered in PostgreSQL and periodically flushed to\n" +" ClickHouse by cron." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_batch_size +msgid "Maximum number of buffer rows processed per queue job run." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Notes" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__password +msgid "Password" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__password +msgid "Password for the ClickHouse user." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__payload_json +msgid "Payload Json" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_log_buffer.py:0 +msgid "Pending" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/clickhouse_client.py:0 +msgid "" +"Python package 'clickhouse-driver' is not available. Install it in the Odoo " +"environment to use ClickHouse storage." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_log_buffer__state +msgid "State" +msgstr "" + +#. module: auditlog_clickhouse_write +#. odoo-python +#: code:addons/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py:0 +msgid "Success" +msgstr "" + +#. 
module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__port +msgid "TCP Port" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__database +msgid "" +"Target ClickHouse database where auditlog tables exist (or will be created " +"by the setup button)." +msgstr "" + +#. module: auditlog_clickhouse_write +#: model_terms:ir.ui.view,arch_db:auditlog_clickhouse_write.view_auditlog_clickhouse_config_form +msgid "Test connection" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,field_description:auditlog_clickhouse_write.field_auditlog_clickhouse_config__user +msgid "User" +msgstr "" + +#. module: auditlog_clickhouse_write +#: model:ir.model.fields,help:auditlog_clickhouse_write.field_auditlog_clickhouse_config__queue_channel_id +msgid "queue_job channel used for export jobs." +msgstr "" diff --git a/auditlog_clickhouse_write/models/__init__.py b/auditlog_clickhouse_write/models/__init__.py new file mode 100644 index 00000000000..9791cf29bce --- /dev/null +++ b/auditlog_clickhouse_write/models/__init__.py @@ -0,0 +1,4 @@ +from . import auditlog_clickhouse_config +from . import clickhouse_client +from . import auditlog_log_buffer +from . 
import auditlog_rule diff --git a/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py b/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py new file mode 100644 index 00000000000..1057f96ef73 --- /dev/null +++ b/auditlog_clickhouse_write/models/auditlog_clickhouse_config.py @@ -0,0 +1,480 @@ +import logging +import re + +from odoo import api, fields, models +from odoo.exceptions import UserError, ValidationError + +from .clickhouse_client import get_clickhouse_client + +_logger = logging.getLogger(__name__) + + +class AuditlogClickhouseConfig(models.Model): + """ + ClickHouse connection configuration for auditlog_clickhouse_write. + + Business rules: + - Only one configuration can be active at a time. + - UI provides tools to test the connection and (optionally) create tables. + + Notes: + - As soon as a configuration becomes active, audit log entries will be stored + in the configured ClickHouse database from that moment. + """ + + _name = "auditlog.clickhouse.config" + _description = "Auditlog ClickHouse Write Configuration" + _rec_name = "display_name" + + DEFAULT_PORT = 9000 + DEFAULT_DB = "odoo_audit" + DB_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") + DEFAULT_USER = "odoo_audit_writer" + DEFAULT_QUEUE_BATCH_SIZE = 1000 + + is_active = fields.Boolean( + help=( + "If checked audit logs will be buffered locally and exported to ClickHouse." + " Only one configuration can be active at a time." + ), + ) + host = fields.Char( + string="Hostname or IP", + required=True, + help=( + "ClickHouse server hostname or IP address. " + "Must be reachable from the Odoo server." + ), + ) + port = fields.Integer( + string="TCP Port", + required=True, + default=DEFAULT_PORT, + help=( + "ClickHouse native TCP port used by clickhouse-driver " "(default is 9000)." 
+ ), + ) + database = fields.Char( + string="Database name", + required=True, + default=DEFAULT_DB, + help=( + "Target ClickHouse database where auditlog tables exist " + "(or will be created by the setup button)." + ), + ) + user = fields.Char( + required=True, + default=DEFAULT_USER, + help=( + "ClickHouse user name used for INSERT operations into auditlog tables. " + "Recommended: a dedicated user with INSERT-only privileges." + ), + ) + password = fields.Char( + help="Password for the ClickHouse user.", + ) + + queue_batch_size = fields.Integer( + string="Batch size", + default=DEFAULT_QUEUE_BATCH_SIZE, + required=True, + help="Maximum number of buffer rows processed per queue job run.", + ) + + def _default_queue_channel(self): + """Return default queue_job channel (root). + + :return: Root queue.job.channel record or empty recordset + :rtype: odoo.models.BaseModel + """ + Channel = self.env["queue.job.channel"].sudo() + channel = Channel.search([("complete_name", "=", "root")], limit=1) + return channel + + queue_channel_id = fields.Many2one( + comodel_name="queue.job.channel", + string="Channel", + required=True, + default=lambda self: self._default_queue_channel(), + ondelete="restrict", + help="queue_job channel used for export jobs.", + ) + + @api.depends("host", "port", "database", "user", "is_active") + def _compute_display_name(self): + """Compute human-readable configuration name. + + Format: + host:port/database (user) [active] + """ + for rec in self: + base = ( + f"{rec.host or ''}:{rec.port or ''}/" + f"{rec.database or ''} ({rec.user or ''})" + ) + rec.display_name = f"{base} [active]" if rec.is_active else base + + @api.model + def get_active_config(self): + """Return the currently active ClickHouse configuration. 
+ + :return: Active configuration record or None + :rtype: Optional[AuditlogClickhouseConfig] + """ + config = self.search([("is_active", "=", True)], limit=1) + _logger.debug( + "auditlog_clickhouse_write: get_active_config -> %s", + config.id if config else None, + ) + return config + + def _deactivate_other_configs(self): + """Deactivate all other active configurations. + + Ensures single-active configuration rule. + """ + other_configs = self.search( + [("is_active", "=", True), ("id", "not in", self.ids)] + ) + if other_configs: + _logger.info( + "auditlog_clickhouse_write: deactivating " + "other configs %s (activated=%s)", + other_configs.ids, + self.ids, + ) + other_configs.write({"is_active": False}) + + @api.onchange("is_active") + def _onchange_is_active(self): + """Display warning when activating configuration. + + Warns user that: + - Logs will start exporting immediately. + - Another active config will be deactivated. + + :return: Warning action dictionary or None + :rtype: Optional[Dict[str, Any]] + """ + for rec in self: + if not rec.is_active or (rec._origin and rec._origin.is_active): + continue + + disclaimer = rec.env._( + "As soon as this connection to ClickHouse is activated, all log entries" + " from that moment will be stored in the configured ClickHouse" + " database.\n\n Only one connection can be active at a time." 
+ ) + + domain = [("is_active", "=", True)] + if rec.id: + domain.append(("id", "!=", rec.id)) + + other = rec.env["auditlog.clickhouse.config"].sudo().search(domain, limit=1) + if other: + message = rec.env._( + "%s\n\nIf you save this configuration as active, " + "the currently active one will be deactivated:\n- %s" + ) % (disclaimer, other.display_name) + return { + "warning": { + "title": rec.env._("ClickHouse activation"), + "message": message, + } + } + + return { + "warning": { + "title": rec.env._("ClickHouse activation"), + "message": disclaimer, + } + } + + @api.model_create_multi + def create(self, vals_list): + """Create configuration records. + + Enforces single active configuration rule after creation. + + :param vals_list: List of record values + :type vals_list: List[Dict[str, Any]] + + :return: Created records + :rtype: AuditlogClickhouseWriteConfig + """ + records = super().create(vals_list) + active_records = records.filtered("is_active") + if active_records: + _logger.info( + "auditlog_clickhouse_write: created active config(s) %s", + active_records.ids, + ) + active_records._deactivate_other_configs() + else: + _logger.debug( + "auditlog_clickhouse_write: created config(s) %s", records.ids + ) + return records + + def write(self, vals): + """Update configuration record(s). + + If activation flag is enabled, ensures other configs are deactivated. 
+ + :param vals: Field values to update + :type vals: Dict[str, Any] + + :return: True if write succeeds + :rtype: bool + """ + turning_on = vals.get("is_active") is True + result = super().write(vals) + + if turning_on: + activated = self.filtered("is_active") + _logger.info( + "auditlog_clickhouse_write: activated config(s) %s (via write)", + activated.ids, + ) + activated._deactivate_other_configs() + else: + _logger.debug( + "auditlog_clickhouse_write: updated config(s) %s (vals=%s)", + self.ids, + sorted(vals.keys()), + ) + + return result + + @api.constrains("database") + def _check_database(self): + """ + Validate CH database identifier to prevent SQL injection/malformed queries. + + :raises ValidationError: If the DB name does not match the allowed pattern. + """ + for rec in self: + db = rec.database or "" + if not self.DB_NAME_RE.match(db): + raise ValidationError( + rec.env._( + "Invalid database name. Allowed characters: " + "letters, digits, underscore. " + "Must start with a letter or underscore." + ) + ) + + def action_test_connection(self): + """Test ClickHouse connectivity using simple SELECT query. 
+ + :raises UserError: If connection fails + + :return: Odoo notification action + :rtype: Dict[str, Any] + """ + self.ensure_one() + _logger.info( + "auditlog_clickhouse_write: testing connection " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + + client = self._get_client() + try: + client.execute("SELECT 1") + except Exception as exc: + _logger.exception( + "auditlog_clickhouse_write: connection test FAILED " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + raise UserError( + self.env._("ClickHouse connection failed: %s") % exc + ) from exc + + _logger.info( + "auditlog_clickhouse_write: connection test OK " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + + return self._notify( + title=self.env._("Success"), + message=self.env._("Connection to ClickHouse is OK."), + notif_type="success", + ) + + def action_create_auditlog_tables(self): + """Create required ClickHouse tables if not present. + + Executes predefined DDL statements. 
+ + :raises UserError: If DDL execution fails + + :return: Odoo notification action + :rtype: Dict[str, Any] + """ + self.ensure_one() + _logger.info( + "auditlog_clickhouse_write: creating tables (config=%s db=%s host=%s:%s)", + self.id, + self.database, + self.host, + self.port, + ) + + client = self._get_client() + try: + for statement in self._get_clickhouse_ddl(): + preview = " ".join(statement.strip().splitlines())[:120] + _logger.debug( + "auditlog_clickhouse_write: executing DDL (config=%s): %s...", + self.id, + preview, + ) + client.execute(statement) + except Exception as exc: + _logger.exception( + "auditlog_clickhouse_write: create tables FAILED " + "(config=%s db=%s host=%s:%s)", + self.id, + self.database, + self.host, + self.port, + ) + raise UserError( + self.env._("Failed to create ClickHouse tables: %s") % exc + ) from exc + + _logger.info( + "auditlog_clickhouse_write: create tables OK (config=%s db=%s)", + self.id, + self.database, + ) + + return self._notify( + title=self.env._("Success"), + message=self.env._("Auditlog tables were created (if they did not exist)."), + notif_type="success", + ) + + def _get_client(self): + """Build clickhouse-driver client from configuration. + + :return: Configured ClickHouse client instance + :rtype: clickhouse_driver.Client + """ + self.ensure_one() + _logger.debug( + "auditlog_clickhouse_write: building client " + "(config=%s host=%s port=%s db=%s user=%s)", + self.id, + self.host, + self.port, + self.database, + self.user, + ) + return get_clickhouse_client( + host=self.host, + port=self.port, + database=self.database, + user=self.user, + password=self.password, + ) + + def _get_clickhouse_ddl(self): + """Return ClickHouse DDL statements. 
+ + Includes: + - auditlog_log + - auditlog_log_line + + :return: List of DDL SQL statements + :rtype: List[str] + """ + self.ensure_one() + db_name = self.database + + return [ + f""" + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log + ( + id Int64, + name Nullable(String), + model_id Int32, + model_name Nullable(String), + model_model String, + res_id Nullable(Int64), + res_ids Nullable(String), + user_id Int32, + method String, + http_request_id Nullable(Int64), + http_session_id Nullable(Int64), + log_type Nullable(String), + create_date DateTime64(3, 'UTC'), + create_uid Int32, + write_date Nullable(DateTime64(3, 'UTC')), + write_uid Nullable(Int32) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, + f""" + CREATE TABLE IF NOT EXISTS {db_name}.auditlog_log_line + ( + id Int64, + log_id Int64, + field_id Int32, + field_name Nullable(String), + field_description Nullable(String), + old_value Nullable(String), + new_value Nullable(String), + old_value_text Nullable(String), + new_value_text Nullable(String), + create_date DateTime64(3, 'UTC'), + create_uid Int32, + write_date Nullable(DateTime64(3, 'UTC')), + write_uid Nullable(Int32) + ) + ENGINE = MergeTree + ORDER BY (create_date, id) + """, + ] + + @staticmethod + def _notify(*, title, message, notif_type="info"): + """Build standard Odoo display_notification action. 
+ + :param title: Notification title + :type title: str + :param message: Notification message + :type message: str + :param notif_type: Notification type, defaults to "info" + :type notif_type: str, optional + + :return: Odoo client action dictionary + :rtype: Dict[str, Any] + """ + return { + "type": "ir.actions.client", + "tag": "display_notification", + "params": { + "title": title, + "message": message, + "type": notif_type, + "sticky": False, + }, + } diff --git a/auditlog_clickhouse_write/models/auditlog_log_buffer.py b/auditlog_clickhouse_write/models/auditlog_log_buffer.py new file mode 100644 index 00000000000..f7429a352ea --- /dev/null +++ b/auditlog_clickhouse_write/models/auditlog_log_buffer.py @@ -0,0 +1,697 @@ +import json +import logging +from datetime import datetime, timezone +from typing import Any + +from dateutil import parser as dt_parser + +from odoo import api, fields, models +from odoo.tools import SQL + +from odoo.addons.queue_job.exception import RetryableJobError + +_logger = logging.getLogger(__name__) + +JsonMapping = dict[str, Any] +ChRow = tuple[Any, ...] + + +class AuditlogLogBuffer(models.Model): + """ + Buffered audit log payloads waiting to be flushed into ClickHouse. + + Each record stores a pre-built payload produced by the auditlog.rule override. + Export is asynchronous: + + - A cron enqueues a queue_job. + - The queue_job locks pending buffer rows (FOR UPDATE SKIP LOCKED), + converts payloads to ClickHouse tuples and inserts them in batches. + - Successfully flushed buffer rows are removed from PostgreSQL. + + Design notes: + - This model is an internal queue; no user-facing ACLs should be provided. + - queue_job provides retries/backoff when ClickHouse is slow/unavailable. 
+ """ + + _name = "auditlog.log.buffer" + _description = "Auditlog ClickHouse Buffer" + _order = "create_date asc, id asc" + + STATE_PENDING = "pending" + STATE_ERROR = "error" + EXISTING_ROWS_CHUNK_SIZE = 2000 + + # Column order MUST match CREATE TABLE schema and inserted tuples. + _CH_LOG_COLUMNS: tuple[str, ...] = ( + "id", + "name", + "model_id", + "model_name", + "model_model", + "res_id", + "res_ids", + "user_id", + "method", + "http_request_id", + "http_session_id", + "log_type", + "create_date", + "create_uid", + "write_date", + "write_uid", + ) + _CH_LINE_COLUMNS: tuple[str, ...] = ( + "id", + "log_id", + "field_id", + "field_name", + "field_description", + "old_value", + "new_value", + "old_value_text", + "new_value_text", + "create_date", + "create_uid", + "write_date", + "write_uid", + ) + + _INVALID_PAYLOAD_MESSAGE = ( + "Invalid payload structure (expected object with 'log' and 'lines')." + ) + + @api.model + def _selection_state(self): + """Return centralized state selection values. + + :return: List of (value, label) tuples + :rtype: List[Tuple[str, str]] + """ + return [ + (self.STATE_PENDING, self.env._("Pending")), + (self.STATE_ERROR, self.env._("Error")), + ] + + payload_json = fields.Json(required=True) + state = fields.Selection( + selection=lambda self: self._selection_state(), + default=lambda self: self.STATE_PENDING, + required=True, + index=True, + ) + attempt_count = fields.Integer(default=0, required=True) + error_message = fields.Text() + + @staticmethod + def _to_ch_nullable_string(value): + """Convert value to ClickHouse Nullable(String). 
+ + Rules: + - None/False -> None + - str -> unchanged + - list/dict/tuple -> JSON string + - other -> str(value) + + :param value: Any value + :type value: Any + :return: Nullable string value + :rtype: Optional[str] + """ + if value is None or value is False: + return None + if isinstance(value, str): + return value + if isinstance(value, (dict | list | tuple)): + return json.dumps(value, ensure_ascii=False, default=str) + return str(value) + + @staticmethod + def _to_ch_datetime_utc(value): + """Convert value to timezone-aware UTC datetime. + + Normalizes to UTC for ClickHouse DateTime64(3, 'UTC'). + + :param value: Incoming datetime or string + :type value: Any + :return: UTC-aware datetime or None + :rtype: Optional[datetime] + """ + if not value: + return None + + if isinstance(value, datetime): + parsed = value + else: + raw = str(value).strip().replace("Z", "+00:00") + try: + parsed = dt_parser.parse(raw) + except (ValueError, TypeError, OverflowError): + # Fallback: Odoo parser usually returns naive datetime. + parsed = fields.Datetime.from_string(value) + + if parsed.tzinfo is None: + return parsed.replace(tzinfo=timezone.utc) + return parsed.astimezone(timezone.utc) + + @staticmethod + def _ch_format_in_list(values): + """Format a Python iterable as a ClickHouse IN (...) list. + + Used to build a safe-ish `IN (...)` fragment for identifiers when + parametrization cannot be applied (e.g. identifiers in ClickHouse SQL). + + Rules: + - None values are removed. + - If all values are ints -> rendered without quotes. + - Otherwise values are rendered as single-quoted strings with minimal + escaping for backslashes and single quotes. + + :param values: Iterable of values (ints/strings) to include in IN list. + :type values: Iterable[Any] + :return: Comma-separated list content (without surrounding parentheses). 
+ :rtype: str + """ + cleaned = [v for v in values if v is not None] + if not cleaned: + return "" + # ints -> no quotes + if all(isinstance(v, int) for v in cleaned): + return ", ".join(str(v) for v in cleaned) + + # everything else -> single-quoted + def _q(v): + s = str(v).replace("\\", "\\\\").replace("'", "\\'") + return f"'{s}'" + + return ", ".join(_q(v) for v in cleaned) + + @staticmethod + def _chunk_values(values, chunk_size): + """Yield chunks from a sequence. + + :param list values: Source values to split into chunks. + :param int chunk_size: Maximum size of one chunk. + :yield: Chunk of source values. + :rtype: list + """ + for index in range(0, len(values), chunk_size): + yield values[index : index + chunk_size] + + def _filter_existing_rows(self, client, config, table_name, rows): + """Filter out rows already present in ClickHouse by id. + + This helper makes inserts idempotent on retry: if part of a multi-table + insert succeeds (e.g. auditlog_log) and another part fails (e.g. lines), + the next retry must not re-insert already stored rows. + + The existence check is executed in chunks to avoid oversized ClickHouse + queries caused by a very large ``IN (...)`` clause. + + :param client: ClickHouse client. + :type client: clickhouse_driver.Client + :param config: Active configuration (provides database name). + :type config: AuditlogClickhouseConfig + :param table_name: ClickHouse table name (without database prefix). + :type table_name: str + :param rows: Prepared ClickHouse rows. + :type rows: List[ChRow] + :return: Rows that are not yet present in ClickHouse. 
+ :rtype: List[ChRow] + """ + if not rows: + return rows + + ids = list(dict.fromkeys(row[0] for row in rows if row[0] is not None)) + if not ids: + return rows + + existing_ids = set() + for ids_chunk in self._chunk_values(ids, self.EXISTING_ROWS_CHUNK_SIZE): + in_list = self._ch_format_in_list(ids_chunk) + if not in_list: + continue + query = ( + f"SELECT id FROM {config.database}.{table_name} " + f"WHERE id IN ({in_list})" + ) + existing = client.execute(query) or [] + existing_ids.update(row[0] for row in existing) + if not existing_ids: + return rows + return [row for row in rows if row[0] not in existing_ids] + + def _set_error(self, message): + """Mark buffer records as error and increment attempt counter. + + :param message: Error message + :type message: str + """ + for rec in self: + rec.write( + { + "state": self.STATE_ERROR, + "attempt_count": rec.attempt_count + 1, + "error_message": message, + } + ) + + @api.model + def _lock_pending_buffers(self, batch_size): + """Fetch and lock pending buffers using FOR UPDATE SKIP LOCKED. + + Prevents concurrent workers from selecting the same rows. + + :param batch_size: Maximum number of rows to fetch + :type batch_size: int + :return: Locked buffer recordset + :rtype: AuditlogLogBuffer + """ + query = SQL( + """ + SELECT id + FROM %s + WHERE state = %s + ORDER BY id + FOR UPDATE SKIP LOCKED + LIMIT %s + """, + SQL.identifier(self._table), + self.STATE_PENDING, + batch_size, + ) + self.env.cr.execute(query) + ids = [row[0] for row in self.env.cr.fetchall()] + return self.browse(ids) + + @api.model + def _cron_flush_to_clickhouse(self, batch_size=None): + """Schedule queue_job to flush buffered rows. + + Does not perform ClickHouse INSERT directly. 
+ + :param batch_size: Optional override batch size + :type batch_size: Optional[int] + :return: True (cron compatibility) + :rtype: bool + """ + config = self.env["auditlog.clickhouse.config"].sudo().get_active_config() + if not config: + _logger.warning( + "auditlog_clickhouse_write: cron flush " "skipped (no active config)" + ) + return True + + effective_batch = int(batch_size or config.queue_batch_size or 0) or 1000 + + if not self.sudo().search([("state", "=", self.STATE_PENDING)], limit=1): + _logger.debug( + "auditlog_clickhouse_write: cron flush skipped (no pending buffers)" + ) + return True + + channel_name = ( + config.queue_channel_id.complete_name + if config.queue_channel_id + and getattr(config.queue_channel_id, "complete_name", None) + else "root" + ) + + _logger.info( + "auditlog_clickhouse_write: enqueue flush job " + "(config=%s channel=%s batch_size=%s)", + config.id, + channel_name, + effective_batch, + ) + + self.sudo().with_delay( + channel=channel_name, + description=f"auditlog_clickhouse_write: " + f"flush buffers (config={config.id})", + )._job_flush_to_clickhouse(config.id, effective_batch) + + return True + + @api.model + def _get_active_config_for_job(self, config_id): + """Return active config for job execution. + + :param config_id: Configuration record ID + :type config_id: int + :return: Active config or None + :rtype: Optional[AuditlogClickhouseConfig] + """ + config = self.env["auditlog.clickhouse.config"].sudo().browse(config_id) + if not config or not config.exists() or not config.is_active: + _logger.info( + "auditlog_clickhouse_write: job skipped " + "(config missing or not active) (config_id=%s)", + config_id, + ) + return None + return config + + @classmethod + def _payload_is_valid(cls, payload): + """Validate payload structure before processing. + + Ensures minimal required fields exist to avoid endless retry loops. 
+ + :param payload: Parsed JSON payload + :type payload: Any + :return: True if valid + :rtype: bool + """ + if not isinstance(payload, dict): + return False + + log_data = payload.get("log") + lines_data = payload.get("lines") + + if not isinstance(log_data, dict) or not isinstance(lines_data, list): + return False + + # Minimal required log fields (to avoid CH insert failures forever) + required = ( + "id", + "model_id", + "model_model", + "user_id", + "method", + "create_date", + "create_uid", + ) + for key in required: + if not log_data.get(key): + return False + + # Lines must be a list of dicts (if any line is broken -> whole payload invalid) + return all(isinstance(line, dict) for line in lines_data) + + def _collect_rows_from_buffers(self, buffers): + """Extract ClickHouse rows from buffer payloads. + + :param buffers: Buffer recordset + :type buffers: AuditlogLogBuffer + :return: (valid_buffers, invalid_buffers, log_rows, line_rows) + :rtype: Tuple[AuditlogLogBuffer, AuditlogLogBuffer, List[ChRow], List[ChRow]] + """ + log_rows: list[ChRow] = [] + line_rows: list[ChRow] = [] + invalid_buffers = self.browse() + + for rec in buffers: + payload = rec.payload_json + + if not self._payload_is_valid(payload): + invalid_buffers |= rec + continue + + log_data = payload["log"] + lines_data = payload["lines"] + + log_rows.append(self._build_ch_log_row(log_data)) + for line_data in lines_data: + line_rows.append(self._build_ch_line_row(line_data)) + + valid_buffers = buffers - invalid_buffers + return valid_buffers, invalid_buffers, log_rows, line_rows + + def _mark_invalid_buffers(self, invalid_buffers, config): + """Mark invalid payload buffers as error. 
+ + :param invalid_buffers: Buffers to mark + :type invalid_buffers: AuditlogLogBuffer + :param config: Active configuration + :type config: AuditlogClickhouseConfig + """ + if not invalid_buffers: + return + invalid_buffers._set_error(self.env._(self._INVALID_PAYLOAD_MESSAGE)) + _logger.warning( + "auditlog_clickhouse_write: invalid payloads=%s (marked error) (config=%s)", + len(invalid_buffers), + config.id, + ) + + def _insert_rows_to_clickhouse( + self, client, config, log_rows, line_rows, valid_buffers + ): + """Insert prepared rows into ClickHouse. + + Raises RetryableJobError on failure. + + :param client: ClickHouse client + :type client: clickhouse_driver.Client + :param config: Active configuration + :type config: AuditlogClickhouseConfig + :param log_rows: Rows for auditlog_log + :type log_rows: List[ChRow] + :param line_rows: Rows for auditlog_log_line + :type line_rows: List[ChRow] + :param valid_buffers: Successfully processed buffers + :type valid_buffers: AuditlogLogBuffer + """ + log_rows_to_insert = log_rows + line_rows_to_insert = line_rows + if log_rows: + log_rows_to_insert = self._filter_existing_rows( + client, config, "auditlog_log", log_rows + ) + if line_rows: + line_rows_to_insert = self._filter_existing_rows( + client, config, "auditlog_log_line", line_rows + ) + + if not log_rows_to_insert and not line_rows_to_insert: + _logger.warning( + "auditlog_clickhouse_write: nothing to insert " + "(config_id=%s buffers=%s)", + config.id, + len(valid_buffers), + ) + return + + try: + if log_rows_to_insert: + client.execute( + f"INSERT INTO {config.database}.auditlog_log (" + f"{', '.join(self._CH_LOG_COLUMNS)}) VALUES", + log_rows_to_insert, + ) + if line_rows_to_insert: + client.execute( + f"INSERT INTO {config.database}.auditlog_log_line (" + f"{', '.join(self._CH_LINE_COLUMNS)}) VALUES", + line_rows_to_insert, + ) + except Exception as exc: + _logger.exception( + "auditlog_clickhouse_write: INSERT failed (will retry) " + "(config=%s buffers=%s 
logs=%s lines=%s)", + config.id, + len(valid_buffers), + len(log_rows_to_insert), + len(line_rows_to_insert), + ) + raise RetryableJobError( + f"ClickHouse insert failed: {exc}", + seconds=60, + ) from exc + + def _delete_flushed_buffers(self, valid_buffers, config): + """Delete flushed buffer rows from PostgreSQL. + + If deletion fails, mark them as error. + + :param valid_buffers: Buffers to delete + :type valid_buffers: AuditlogLogBuffer + :param config: Active configuration + :type config: AuditlogClickhouseConfig + """ + if not valid_buffers: + _logger.warning( + "auditlog_clickhouse_write: no flushed " + "buffers to delete (config_id=%s)", + config.id, + ) + return + + try: + valid_buffers.unlink() + except Exception as exc: + _logger.exception( + "auditlog_clickhouse_write: failed to delete flushed buffers " + "(config=%s buffers=%s)", + config.id, + len(valid_buffers), + ) + valid_buffers._set_error( + self.env._("Flushed to ClickHouse but failed to delete buffer rows: %s") + % exc + ) + else: + _logger.info( + "auditlog_clickhouse_write: job flushed batch " + "(config=%s flushed_buffers=%s)", + config.id, + len(valid_buffers), + ) + + def _enqueue_next_flush_job_if_needed(self, config, batch_size): + """Schedule next flush job if pending buffers remain. 
+ + :param config: Active configuration + :type config: AuditlogClickhouseConfig + :param batch_size: Batch size + :type batch_size: int + """ + if not self.sudo().search([("state", "=", self.STATE_PENDING)], limit=1): + return + + channel_name = ( + config.queue_channel_id.complete_name + if config.queue_channel_id + and getattr(config.queue_channel_id, "complete_name", None) + else "root" + ) + _logger.debug( + "auditlog_clickhouse_write: more pending buffers detected, " + "enqueue next job (config=%s channel=%s batch_size=%s)", + config.id, + channel_name, + batch_size, + ) + # NOTE: pass config.id (not recordset) to keep queue_job args JSON-serializable + # and re-check config existence/is_active at execution time. + self.sudo().with_delay( + channel=channel_name, + description=f"auditlog_clickhouse_write: " + f"flush buffers (config={config.id})", + )._job_flush_to_clickhouse(config.id, int(batch_size)) + + @api.model + def _job_flush_to_clickhouse(self, config_id, batch_size): + """Queue job: flush one batch of buffers into ClickHouse. + + Steps: + - Lock pending buffers + - Validate payloads + - Build CH rows + - INSERT into ClickHouse (retryable) + - Delete flushed buffers + - Mark invalid buffers + - Enqueue next batch if needed + + :param config_id: Active config ID + :type config_id: int + :param batch_size: Batch size + :type batch_size: int + """ + config = self._get_active_config_for_job(config_id) + if not config: + return + + pending_buffers = self.sudo()._lock_pending_buffers(int(batch_size)) + if not pending_buffers: + _logger.debug( + "auditlog_clickhouse_write: job no-op (no pending buffers) (config=%s)", + config.id, + ) + return + + valid_buffers, invalid_buffers, log_rows, line_rows = ( + self._collect_rows_from_buffers(pending_buffers) + ) + + # Nothing valid: just mark invalids and exit successfully. 
+ if not valid_buffers: + self._mark_invalid_buffers(invalid_buffers, config) + return + + client = config._get_client() + self._insert_rows_to_clickhouse( + client=client, + config=config, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + # Delete flushed buffers; if deletion fails, + # mark them as error to avoid re-inserts. + self._delete_flushed_buffers(valid_buffers, config) + + # Mark invalid ones only after successful CH insert + # (so RetryableJobError doesn't rollback the marking) + self._mark_invalid_buffers(invalid_buffers, config) + + # Continue draining queue + self._enqueue_next_flush_job_if_needed(config, int(batch_size)) + + @classmethod + def _build_ch_log_row(cls, log_data): + """Convert payload['log'] dict to ClickHouse tuple. + + Order must match _CH_LOG_COLUMNS. + + :param log_data: Log dictionary + :type log_data: Dict[str, Any] + :return: ClickHouse row tuple + :rtype: ChRow + """ + return ( + int(log_data.get("id") or 0), + cls._to_ch_nullable_string(log_data.get("name")), + int(log_data.get("model_id") or 0), + cls._to_ch_nullable_string(log_data.get("model_name")), + (log_data.get("model_model") or "unknown"), + int(log_data.get("res_id") or 0) + if log_data.get("res_id") is not None + else None, + cls._to_ch_nullable_string(log_data.get("res_ids")), + int(log_data.get("user_id") or 0), + (log_data.get("method") or "unknown"), + int(log_data.get("http_request_id") or 0) + if log_data.get("http_request_id") is not None + else None, + int(log_data.get("http_session_id") or 0) + if log_data.get("http_session_id") is not None + else None, + cls._to_ch_nullable_string(log_data.get("log_type")), + cls._to_ch_datetime_utc(log_data.get("create_date")), + int(log_data.get("create_uid") or 0), + cls._to_ch_datetime_utc(log_data.get("write_date")), + int(log_data.get("write_uid") or 0) + if log_data.get("write_uid") is not None + else None, + ) + + @classmethod + def _build_ch_line_row(cls, line_data): + """Convert 
payload['lines'][] dict to ClickHouse tuple. + + Order must match _CH_LINE_COLUMNS. + + :param line_data: Line dictionary + :type line_data: Dict[str, Any] + :return: ClickHouse row tuple + :rtype: ChRow + """ + return ( + int(line_data.get("id") or 0), + int(line_data.get("log_id") or 0), + int(line_data.get("field_id") or 0), + cls._to_ch_nullable_string(line_data.get("field_name")), + cls._to_ch_nullable_string(line_data.get("field_description")), + cls._to_ch_nullable_string(line_data.get("old_value")), + cls._to_ch_nullable_string(line_data.get("new_value")), + cls._to_ch_nullable_string(line_data.get("old_value_text")), + cls._to_ch_nullable_string(line_data.get("new_value_text")), + cls._to_ch_datetime_utc(line_data.get("create_date")), + int(line_data.get("create_uid") or 0), + cls._to_ch_datetime_utc(line_data.get("write_date")), + int(line_data.get("write_uid") or 0) + if line_data.get("write_uid") is not None + else None, + ) diff --git a/auditlog_clickhouse_write/models/auditlog_rule.py b/auditlog_clickhouse_write/models/auditlog_rule.py new file mode 100644 index 00000000000..803045143a4 --- /dev/null +++ b/auditlog_clickhouse_write/models/auditlog_rule.py @@ -0,0 +1,615 @@ +import logging +import time +from collections.abc import Mapping +from datetime import date, datetime, timezone +from decimal import Decimal +from typing import Any, TypedDict + +from odoo import models + +from odoo.addons.auditlog.models.rule import EMPTY_DICT, FIELDS_BLACKLIST, DictDiffer + +_logger = logging.getLogger(__name__) + + +class _PayloadLog(TypedDict, total=False): + """Structured payload for auditlog_log row before buffering.""" + + id: str + name: str | None + model_id: int + model_name: str | None + model_model: str + res_id: int | None + res_ids: str | None + user_id: int + method: str + http_request_id: int | None + http_session_id: int | None + log_type: str | None + create_date: str + create_uid: int + + +class _PayloadLine(TypedDict, total=False): + 
"""Structured payload for auditlog_log_line row before buffering.""" + + id: str + log_id: str + field_id: int + field_name: str | None + field_description: str | None + old_value: Any | None + new_value: Any | None + old_value_text: Any | None + new_value_text: Any | None + create_date: str + create_uid: int + + +class _Payload(TypedDict): + """Full payload stored in auditlog.log.buffer.""" + + log: _PayloadLog + lines: list[_PayloadLine] + + +def _json_sanitize(obj): + """Convert value to JSON-serializable structure. + + Handles: + - datetime/date -> ISO string + - Decimal -> float + - bytes -> UTF-8 string + - recordsets -> list of ids + - mappings/sequences -> recursively sanitized + - unknown types -> string representation + + :param obj: Arbitrary value + :type obj: Any + :return: JSON-safe value + :rtype: Any + """ + if obj is None or isinstance(obj, (str | int | float | bool)): + return obj + + if isinstance(obj, (datetime | date)): + return obj.isoformat() + + if isinstance(obj, Decimal): + return float(obj) + + if isinstance(obj, bytes): + return obj.decode("utf-8", errors="replace") + + if isinstance(obj, models.BaseModel): + return list(obj.ids) + + if isinstance(obj, Mapping): + return {str(k): _json_sanitize(v) for k, v in obj.items()} + + if isinstance(obj, (list | tuple | set)): + return [_json_sanitize(v) for v in obj] + + return str(obj) + + +class AuditlogRule(models.Model): + _inherit = "auditlog.rule" + + def _next_ids(self, seq_name: str, count: int) -> list[int]: + if count <= 0: + return [] + self.env.cr.execute( + "SELECT nextval(%s::regclass) FROM generate_series(1, %s)", + (seq_name, count), + ) + return [row[0] for row in self.env.cr.fetchall()] + + def _get_rule_settings(self, model_id): + """Return rule settings for given model. + + Computes: + - Union of excluded field names + - Whether record capture is enabled + + Result is cached on registry pool to avoid repeated DB lookups. 
+ + :param model_id: ir.model record ID + :type model_id: int + :return: (excluded_fields_set, capture_record_flag) + :rtype: Tuple[Set[str], bool] + """ + if not hasattr(self.pool, "_auditlog_clickhouse_write_rule_cache"): + self.pool._auditlog_clickhouse_write_rule_cache = {} + cache = self.pool._auditlog_clickhouse_write_rule_cache + + rules = self.sudo().filtered(lambda r: r.model_id.id == model_id) + if not rules: + domain = [("model_id", "=", model_id)] + if "state" in self._fields: + domain.append(("state", "=", "subscribed")) + rules = self.sudo().search(domain) + + stamp = tuple( + ( + rule.id, + bool(rule.capture_record), + tuple(sorted(rule.fields_to_exclude_ids.ids)), + rule.write_date or rule.create_date, + ) + for rule in rules.sorted("id") + ) + key = (model_id, stamp) + if key in cache: + return cache[key] + + excluded: set[str] = set(FIELDS_BLACKLIST) + capture_record = False + + if len(rules) > 1: + _logger.warning( + "auditlog_clickhouse_write: multiple rules found for model_id=%s " + "(rules=%s); using union of excluded fields and any(capture_record).", + model_id, + rules.ids, + ) + for rule in rules: + excluded |= set(rule.fields_to_exclude_ids.mapped("name")) + capture_record = capture_record or bool(rule.capture_record) + + cache[key] = (excluded, capture_record) + return cache[key] + + def _get_audit_model_id(self, res_model): + """Resolve ir.model ID for given model name. + + Prefers auditlog in-memory cache, falls back to ir.model lookup. + + :param res_model: Technical model name (e.g. "res.partner") + :type res_model: str + :return: ir.model record ID + :rtype: int + """ + model_id = getattr(self.pool, "_auditlog_model_cache", {}).get(res_model) + if model_id: + return int(model_id) + return int(self.env["ir.model"].sudo()._get(res_model).id) + + def _dump_payload_json(self, payload): + """Sanitize payload for storage in fields.Json column. 
+ + :param payload: Raw payload dictionary + :type payload: Dict[str, Any] + :return: JSON-serializable payload + :rtype: Dict[str, Any] + """ + return _json_sanitize(payload) + + def _get_http_ids(self): + """Return current auditlog HTTP identifiers (same as base auditlog). + + :return: (http_request_id, http_session_id) + :rtype: Tuple[Optional[int], Optional[int]] + """ + http_request_id = ( + self.env["auditlog.http.request"].current_http_request() or None + ) + http_session_id = ( + self.env["auditlog.http.session"].current_http_session() or None + ) + return http_request_id, http_session_id + + def _build_base_log( + self, + *, + uid, + method, + model_id, + now_iso, + log_type, + ): + """Build base log mapping shared across payloads. + + :param uid: User ID performing operation + :type uid: int + :param method: ORM method name + :type method: str + :param model_id: ir.model record ID + :type model_id: int + :param now_iso: UTC ISO timestamp with milliseconds + :type now_iso: str + :param log_type: Audit log type (from rule/additional values) + :type log_type: Any + :return: Base log dict + :rtype: Dict[str, Any] + """ + model_rec = self.env["ir.model"].sudo().browse(model_id) + http_request_id, http_session_id = self._get_http_ids() + + return { + "model_id": int(model_id), + "model_name": model_rec.name, + "model_model": model_rec.model, + "user_id": int(uid), + "method": method, + "http_request_id": http_request_id, + "http_session_id": http_session_id, + "log_type": log_type, + "create_date": now_iso, + "create_uid": int(uid), + "write_date": None, + "write_uid": None, + } + + def _get_buffer_model(self): + """Return auditlog.log.buffer model used to store payloads. 
+ + :return: auditlog.log.buffer model recordset (sudo + tracking_disable) + :rtype: odoo.models.BaseModel + """ + return ( + self.env["auditlog.log.buffer"].sudo().with_context(tracking_disable=True) + ) + + def _buffer_export_data_payload( + self, + *, + buffer_model, + base_log, + res_model, + res_ids, + started, + ): + """Store export_data payload (no lines) into the buffer. + + :param buffer_model: auditlog.log.buffer model recordset + :type buffer_model: odoo.models.BaseModel + :param base_log: Base log mapping + :type base_log: Dict[str, Any] + :param res_model: Model technical name + :type res_model: str + :param res_ids: Record IDs affected + :type res_ids: Sequence[int] + :param started: monotonic() timestamp when create_logs started + :type started: float + """ + log_id = int(self._next_ids("auditlog_log_id_seq", 1)[0]) + payload: _Payload = { + "log": { + "id": log_id, + "name": res_model, + "res_id": None, + "res_ids": str(list(res_ids)), + **base_log, + }, + "lines": [], + } + buffer_model.create([{"payload_json": self._dump_payload_json(payload)}]) + _logger.debug( + "auditlog_clickhouse_write: create_logs end export_data (elapsed=%.3fs)", + time.monotonic() - started, + ) + + def _select_line_builder_and_sources( + self, + *, + method, + include_lines_on_unlink, + old_values, + new_values, + ): + """Select line builder callback and value sources based on method. 
+ + :param method: ORM method name + :type method: str + :param include_lines_on_unlink: Whether unlink should include lines + :type include_lines_on_unlink: bool + :param old_values: Values before change + :type old_values: Mapping[int, Mapping[str, Any]] + :param new_values: Values after change + :type new_values: Mapping[int, Mapping[str, Any]] + :return: (line_builder, values_src) + :rtype: Tuple[Optional[Callable], Tuple] + """ + if method == "create": + return self._prepare_log_line_vals_on_create, (new_values,) + if method == "read": + return self._prepare_log_line_vals_on_read, (old_values,) + if method == "write": + return self._prepare_log_line_vals_on_write, (old_values, new_values) + if include_lines_on_unlink: + return self._prepare_log_line_vals_on_read, (old_values,) + return None, () + + @staticmethod + def _fields_list_for_method( + *, + method, + include_lines_on_unlink, + diff: DictDiffer, + old_values, + res_id, + ): + """Return list of fields to build lines for given method. + + :param method: ORM method name + :type method: str + :param include_lines_on_unlink: Whether unlink should include lines + :type include_lines_on_unlink: bool + :param diff: DictDiffer for old/new values + :type diff: DictDiffer + :param old_values: Old values mapping + :type old_values: Mapping[int, Mapping[str, Any]] + :param res_id: Current record ID + :type res_id: int + :return: Iterable of field names + :rtype: Any + """ + if method == "create": + return diff.added() + if method == "read" or include_lines_on_unlink: + return old_values.get(res_id, EMPTY_DICT).keys() + if method == "write": + return diff.changed() + return () + + def _build_payloads_for_records( + self, + *, + uid, + res_model, + res_ids, + method, + model_id, + model_rs, + log_type, + now_iso, + base_log, + fields_to_exclude_set, + old_values, + new_values, + line_builder, + values_src, + include_lines_on_unlink, + ): + """Build (log, lines) payloads for each record. 
+ + :return: (payloads, total_lines) + :rtype: Tuple[List[Tuple[_PayloadLog, List[_PayloadLine]]], int] + """ + log_ids = self._next_ids("auditlog_log_id_seq", len(res_ids)) + payloads: list[tuple[_PayloadLog, list[_PayloadLine]]] = [] + total_lines = 0 + + for idx, res_id in enumerate(res_ids): + log_id = int(log_ids[idx]) + record = model_rs.browse(res_id) + + log: _PayloadLog = { + "id": log_id, + "name": record.display_name, + "res_id": int(res_id), + "res_ids": None, + **base_log, + } + + diff = DictDiffer( + dict(new_values.get(res_id, EMPTY_DICT)), + dict(old_values.get(res_id, EMPTY_DICT)), + ) + + fields_list = self._fields_list_for_method( + method=method, + include_lines_on_unlink=include_lines_on_unlink, + diff=diff, + old_values=old_values, + res_id=res_id, + ) + + lines: list[_PayloadLine] = [] + if line_builder: + one_source = method in ("create", "read") or include_lines_on_unlink + log_ctx = {"res_id": res_id, "model_id": model_id, "log_type": log_type} + + for field_name in fields_list: + if field_name in fields_to_exclude_set: + continue + field = self._get_field(model_id, field_name) + if not field: + continue + + if one_source: + vals = line_builder(log_ctx, field, values_src[0]) + else: + vals = line_builder( + log_ctx, field, values_src[0], values_src[1] + ) + + lines.append( + { + "id": 0, + "log_id": log_id, + "field_id": int(field["id"]), + "field_name": field.get("name"), + "field_description": field.get("field_description"), + "old_value": vals.get("old_value"), + "new_value": vals.get("new_value"), + "old_value_text": vals.get("old_value_text"), + "new_value_text": vals.get("new_value_text"), + "create_date": now_iso, + "create_uid": int(uid), + "write_date": None, + "write_uid": None, + } + ) + + payloads.append((log, lines)) + total_lines += len(lines) + + return payloads, total_lines + + def _build_buffer_vals_from_payloads( + self, + *, + payloads, + total_lines, + method, + ): + """Assign line IDs and build buffer create vals. 
+ + :param payloads: List of (log, lines) payloads + :type payloads: List[Tuple[_PayloadLog, List[_PayloadLine]]] + :param total_lines: Total number of line items across all payloads + :type total_lines: int + :param method: ORM method name + :type method: str + :return: Values list for auditlog.log.buffer.create() + :rtype: List[Dict[str, Any]] + """ + line_ids: list[int] = self._next_ids("auditlog_log_line_id_seq", total_lines) + pos = 0 + + buffer_vals_list: list[dict[str, Any]] = [] + for log, lines in payloads: + for line in lines: + line["id"] = int(line_ids[pos]) + pos += 1 + + if method == "unlink" or lines: + buffer_vals_list.append( + { + "payload_json": self._dump_payload_json( + {"log": log, "lines": lines} + ) + } + ) + + return buffer_vals_list + + def create_logs( + self, + uid, + res_model, + res_ids, + method, + old_values=None, + new_values=None, + additional_log_values=None, + ): + """Override auditlog log creation to buffer payloads for ClickHouse. + + Behavior: + - If no active ClickHouse config -> fallback to parent implementation. + - Otherwise: + - Build structured payload (log + lines) + - Sanitize to JSON + - Store in auditlog.log.buffer + - Export to ClickHouse is performed asynchronously via queue_job. 
+ + Supported methods: + - create, read, write unlink export_data + + :param uid: User ID performing operation + :type uid: int + :param res_model: Model technical name + :type res_model: str + :param res_ids: Record IDs affected + :type res_ids: Sequence[int] + :param method: ORM method name + :type method: str + :param old_values: Values before change + :type old_values: Optional[Mapping[int, Mapping[str, Any]]] + :param new_values: Values after change + :type new_values: Optional[Mapping[int, Mapping[str, Any]]] + :param additional_log_values: Extra audit metadata + :type additional_log_values: Optional[Mapping[str, Any]] + """ + config = self.env["auditlog.clickhouse.config"].sudo().get_active_config() + if not config: + return super().create_logs( + uid, + res_model, + res_ids, + method, + old_values=old_values, + new_values=new_values, + additional_log_values=additional_log_values, + ) + + started = time.monotonic() + old_values = old_values or EMPTY_DICT + new_values = new_values or EMPTY_DICT + additional_log_values = dict(additional_log_values or {}) + log_type = additional_log_values.get("log_type") + + model_id = self._get_audit_model_id(res_model) + model_rs = self.env[res_model] + fields_to_exclude_set, capture_record = self._get_rule_settings(model_id) + + now_iso = datetime.now(timezone.utc).isoformat(timespec="milliseconds") + base_log = self._build_base_log( + uid=int(uid), + method=method, + model_id=int(model_id), + now_iso=now_iso, + log_type=log_type, + ) + buffer_model = self._get_buffer_model() + + # export_data is special (no lines) + if method == "export_data": + self._buffer_export_data_payload( + buffer_model=buffer_model, + base_log=base_log, + res_model=res_model, + res_ids=res_ids, + started=started, + ) + return + + include_lines_on_unlink = method == "unlink" and capture_record + line_builder, values_src = self._select_line_builder_and_sources( + method=method, + include_lines_on_unlink=include_lines_on_unlink, + 
old_values=old_values, + new_values=new_values, + ) + + payloads, total_lines = self._build_payloads_for_records( + uid=int(uid), + res_model=res_model, + res_ids=res_ids, + method=method, + model_id=int(model_id), + model_rs=model_rs, + log_type=log_type, + now_iso=now_iso, + base_log=base_log, + fields_to_exclude_set=fields_to_exclude_set, + old_values=old_values, + new_values=new_values, + line_builder=line_builder, + values_src=values_src, + include_lines_on_unlink=include_lines_on_unlink, + ) + + buffer_vals_list = self._build_buffer_vals_from_payloads( + payloads=payloads, + total_lines=total_lines, + method=method, + ) + + if buffer_vals_list: + buffer_model.create(buffer_vals_list) + + _logger.debug( + "auditlog_clickhouse_write: create_logs end (model=%s method=%s res_ids=%s " + "payloads=%s lines=%s elapsed=%.3fs)", + res_model, + method, + len(res_ids), + len(buffer_vals_list), + total_lines, + time.monotonic() - started, + ) diff --git a/auditlog_clickhouse_write/models/clickhouse_client.py b/auditlog_clickhouse_write/models/clickhouse_client.py new file mode 100644 index 00000000000..cf02786dc15 --- /dev/null +++ b/auditlog_clickhouse_write/models/clickhouse_client.py @@ -0,0 +1,62 @@ +from odoo import _ +from odoo.exceptions import UserError + +try: + from clickhouse_driver import Client as ClickHouseClient +except ImportError: + ClickHouseClient = None + + +def _require_driver() -> None: + """Ensure clickhouse-driver is available in the current environment. + + :raises UserError: If clickhouse-driver is not installed. + """ + if ClickHouseClient is None: + raise UserError( + _( + "Python package 'clickhouse-driver' is not available. " + "Install it in the Odoo environment to use ClickHouse storage." + ) + ) + + +def get_clickhouse_client( + *, + host, + port, + database, + user, + password=None, + settings=None, +) -> "ClickHouseClient": + """Create and return a clickhouse-driver Client instance. + + Uses native TCP protocol. 
+ + :param host: ClickHouse hostname or IP + :type host: str + :param port: Native TCP port + :type port: int + :param database: Default database + :type database: str + :param user: ClickHouse username + :type user: str + :param password: ClickHouse password, optional + :type password: Optional[str] + :param settings: Optional clickhouse-driver settings mapping + :type settings: Optional[Mapping[str, Any]] + :return: Configured ClickHouse client + :rtype: clickhouse_driver.Client + :raises UserError: If clickhouse-driver is not installed + """ + _require_driver() + # `settings` is passed as-is to clickhouse-driver, keep it optional and immutable. + return ClickHouseClient( + host=host, + port=port, + database=database, + user=user, + password=password or "", + settings=dict(settings or {}), + ) diff --git a/auditlog_clickhouse_write/pyproject.toml b/auditlog_clickhouse_write/pyproject.toml new file mode 100644 index 00000000000..4231d0cccb3 --- /dev/null +++ b/auditlog_clickhouse_write/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/auditlog_clickhouse_write/readme/CONFIGURE.md b/auditlog_clickhouse_write/readme/CONFIGURE.md new file mode 100644 index 00000000000..42f9462aef7 --- /dev/null +++ b/auditlog_clickhouse_write/readme/CONFIGURE.md @@ -0,0 +1,31 @@ +This module requires: + +- A reachable ClickHouse server. +- Python dependency `clickhouse-driver` available in the Odoo environment. +- A ClickHouse database created in advance (the module does **not** create databases/users/grants). +- A ClickHouse user with at least: + - `INSERT` and `CREATE TABLE` privileges on the target database. + +> ClickHouse installation (Docker guide): +> `https://clickhouse.com/docs/install/docker` + +Steps: + +- Make sure `clickhouse-driver` is available in your system. +- Install the module. 
+- Configure the connection parameters in Odoo: + - **Settings > Technical > Auditlog > Clickhouse configuration** + - Fill in the following parameters: + +| Field | +|:-----| +| Hostname or IP | +| TCP port | +| ClickHouse database name | +| ClickHouse user | +| ClickHouse Password | +| queue_job_batch_size (default = 1000) | +| channel_id (default root) | + +- Click **Test connection**. +- Optionally, click **Create Auditlog Tables** to create the tables in the target database. diff --git a/auditlog_clickhouse_write/readme/CONTEXT.md b/auditlog_clickhouse_write/readme/CONTEXT.md new file mode 100644 index 00000000000..af8999013bb --- /dev/null +++ b/auditlog_clickhouse_write/readme/CONTEXT.md @@ -0,0 +1,5 @@ +The auditlog module stores audit data in PostgreSQL. In production systems with extensive audit rules, these tables grow without limits, causing three issues: + +- Database bloat; +- Immutability gap: Members of group_auditlog_manager (implied by base.group_system) have full CRUD access to audit tables, allowing audit records to be altered or deleted via UI, ORM, or SQL; +- Performance overhead: Audit logging runs synchronously in the same transaction and performs multiple ORM create() calls, adding latency to audited operations. 
diff --git a/auditlog_clickhouse_write/readme/CONTRIBUTORS.md b/auditlog_clickhouse_write/readme/CONTRIBUTORS.md
new file mode 100644
index 00000000000..f2ec264300f
--- /dev/null
+++ b/auditlog_clickhouse_write/readme/CONTRIBUTORS.md
@@ -0,0 +1,4 @@
+- [Cetmix](https://cetmix.com/)
+  - Ivan Sokolov
+  - George Smirnov
+  - Dmitry Meita
diff --git a/auditlog_clickhouse_write/readme/CREDITS.md b/auditlog_clickhouse_write/readme/CREDITS.md
new file mode 100644
index 00000000000..ce94d167cfd
--- /dev/null
+++ b/auditlog_clickhouse_write/readme/CREDITS.md
@@ -0,0 +1,3 @@
+The development of this module has been financially supported by:
+
+- Geschäftsstelle Sozialinfo
diff --git a/auditlog_clickhouse_write/readme/DESCRIPTION.md b/auditlog_clickhouse_write/readme/DESCRIPTION.md
new file mode 100644
index 00000000000..74bf7db4fd2
--- /dev/null
+++ b/auditlog_clickhouse_write/readme/DESCRIPTION.md
@@ -0,0 +1,4 @@
+This module implements buffered asynchronous transfer of audit logs from PostgreSQL to ClickHouse.
+Storing audit data in a columnar database that is write-only prevents database bloat, makes audit records effectively
+immutable, and allows for scaling to very large volumes of logs without slowing down normal transactions.
+Audit logs are written asynchronously to reduce the load on business operations.
diff --git a/auditlog_clickhouse_write/readme/USAGE.md b/auditlog_clickhouse_write/readme/USAGE.md
new file mode 100644
index 00000000000..588ba32ef32
--- /dev/null
+++ b/auditlog_clickhouse_write/readme/USAGE.md
@@ -0,0 +1,7 @@
+Once auditlog_clickhouse_write is installed and configured:
+
+- Users perform tracked operations (create, write, unlink, read, export) on models with active auditlog.rule subscriptions.
+  This behavior is unchanged from the base auditlog module.
+- Log data is serialized and stored in the local auditlog.log.buffer table instantly. The standard auditlog tables are not populated.
+- Every 5 minutes (default), the Cron job runs, pushes data to ClickHouse, and cleans the local buffer. +- Data is permanently stored in ClickHouse and cannot be modified or deleted via Odoo. diff --git a/auditlog_clickhouse_write/security/ir.model.access.csv b/auditlog_clickhouse_write/security/ir.model.access.csv new file mode 100644 index 00000000000..22a4f6ee8dd --- /dev/null +++ b/auditlog_clickhouse_write/security/ir.model.access.csv @@ -0,0 +1,3 @@ +id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink +access_auditlog_clickhouse_config_manager,auditlog.clickhouse.config manager,model_auditlog_clickhouse_config,auditlog.group_auditlog_manager,1,1,1,1 +access_auditlog_log_buffer_system_read,auditlog.log.buffer system read,model_auditlog_log_buffer,base.group_system,1,0,0,0 diff --git a/auditlog_clickhouse_write/static/description/index.html b/auditlog_clickhouse_write/static/description/index.html new file mode 100644 index 00000000000..a8696ef1ca0 --- /dev/null +++ b/auditlog_clickhouse_write/static/description/index.html @@ -0,0 +1,533 @@ + + + + + +Store Audit Log in Clickhouse + + + +
+

Store Audit Log in Clickhouse

+ + +

Beta License: AGPL-3 OCA/server-tools Translate me on Weblate Try me on Runboat

+

This module implements buffered asynchronous transfer of audit logs
+from PostgreSQL to ClickHouse. Storing audit data in a columnar database
+that is write-only prevents database bloat, makes audit records
+effectively immutable, and allows for scaling to very large volumes of
+logs without slowing down normal transactions. Audit logs are written
+asynchronously to reduce the load on business operations.

+

Table of contents

+ +
+

Use Cases / Context

+

The auditlog module stores audit data in PostgreSQL. In production +systems with extensive audit rules, these tables grow without limits, +causing three issues:

+
    +
  • Database bloat;
  • +
  • Immutability gap: Members of group_auditlog_manager (implied by +base.group_system) have full CRUD access to audit tables, allowing +audit records to be altered or deleted via UI, ORM, or SQL;
  • +
  • Performance overhead: Audit logging runs synchronously in the same +transaction and performs multiple ORM create() calls, adding latency +to audited operations.
  • +
+
+
+

Configuration

+

This module requires:

+
    +
  • A reachable ClickHouse server.
  • +
  • Python dependency clickhouse-driver available in the Odoo +environment.
  • +
  • A ClickHouse database created in advance (the module does not +create databases/users/grants).
  • +
  • A ClickHouse user with at least:
      +
    • INSERT and CREATE TABLE privileges on the target database.
    • +
    +
  • +
+ +
+ClickHouse installation (Docker guide): +https://clickhouse.com/docs/install/docker
+

Steps:

+
    +
  • Make sure clickhouse-driver is available in your system.
  • +
  • Install the module.
  • +
  • Configure the connection parameters in Odoo:
      +
    • Settings > Technical > Auditlog > Clickhouse configuration
    • +
    • Fill in the following parameters:
    • +
    +
  • +
+ +++ + + + + + + + + + + + + + + + + + + + + +
Field
Hostname or IP
TCP port
ClickHouse database name
ClickHouse user
ClickHouse Password
queue_job_batch_size (default = 1000)
channel_id (default root)
+
    +
  • Click Test connection.
  • +
  • Optionally, click Create Auditlog Tables to create the tables in +the target database.
  • +
+
+
+

Usage

+

Once auditlog_clickhouse_write is installed and configured:

+
    +
  • Users perform tracked operations (create, write, unlink, read, export) +on models with active auditlog.rule subscriptions. This behavior is +unchanged from the base auditlog module.
  • +
  • Log data is serialized and stored in the local auditlog.log.buffer +table instantly. The standard auditlog tables are not populated.
  • +
  • Every 5 minutes (default), the Cron job runs, pushes data to +ClickHouse, and cleans the local buffer.
  • +
  • Data is permanently stored in ClickHouse and cannot be modified or +deleted via Odoo.
  • +
+
+
+

Bug Tracker

+

Bugs are tracked on GitHub Issues. +In case of trouble, please check there if your issue has already been reported. +If you spotted it first, help us to smash it by providing a detailed and welcomed +feedback.

+

Do not contact contributors directly about support or help with technical issues.

+
+
+

Credits

+
+

Authors

+
    +
  • Cetmix
  • +
+
+
+

Contributors

+
    +
  • Cetmix
      +
    • Ivan Sokolov
    • +
    • George Smirnov
    • +
    • Dmitry Meita
    • +
    +
  • +
+
+
+

Other credits

+

The development of this module has been financially supported by:

+
    +
  • Geschäftsstelle Sozialinfo
  • +
+
+
+

Maintainers

+

This module is maintained by the OCA.

+ +Odoo Community Association + +

OCA, or the Odoo Community Association, is a nonprofit organization whose +mission is to support the collaborative development of Odoo features and +promote its widespread use.

+

This module is part of the OCA/server-tools project on GitHub.

+

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

+
+
+
+ + diff --git a/auditlog_clickhouse_write/tests/__init__.py b/auditlog_clickhouse_write/tests/__init__.py new file mode 100644 index 00000000000..4b4eb8888e9 --- /dev/null +++ b/auditlog_clickhouse_write/tests/__init__.py @@ -0,0 +1,2 @@ +from . import test_auditlog_clickhouse_write +from . import test_clickhouse_config diff --git a/auditlog_clickhouse_write/tests/common.py b/auditlog_clickhouse_write/tests/common.py new file mode 100644 index 00000000000..532bd3becf8 --- /dev/null +++ b/auditlog_clickhouse_write/tests/common.py @@ -0,0 +1,129 @@ +import contextlib +import re +from unittest.mock import patch + +from odoo.addons.auditlog.tests.common import AuditLogRuleCommon + + +class DummyClickHouseClient: + """Tiny fake clickhouse client collecting execute() calls.""" + + def __init__( + self, *, raise_on_insert: bool = False, raise_on_line_insert_once: bool = False + ): + self.raise_on_insert = raise_on_insert + self.raise_on_line_insert_once = raise_on_line_insert_once + self._line_failed_once = False + + self.calls = [] # list[(query, params)] + self.log_ids = set() + + def _parse_in_ids(self, query): + m = re.search(r"\bIN\s*\(([^)]*)\)", query, flags=re.IGNORECASE) + if not m: + return set() + raw = m.group(1).strip() + if not raw: + return set() + ids = set() + for part in raw.split(","): + p = part.strip().strip("'") + if not p: + continue + # tests use ints + try: + ids.add(int(p)) + except ValueError: + ids.add(p) + return ids + + def execute(self, query, params=None): + self.calls.append((query, params)) + q = (query or "").strip() + q_up = q.upper() + + if q_up.startswith("SELECT 1"): + return [(1,)] + + if q_up.startswith("SELECT ID FROM") and "AUDITLOG_LOG" in q_up: + wanted = self._parse_in_ids(q) + existing = sorted(self.log_ids.intersection(wanted)) + return [(x,) for x in existing] + + if self.raise_on_insert and "INSERT INTO" in q_up: + raise Exception("Simulated ClickHouse insert error") + + if "INSERT INTO" in q_up and "AUDITLOG_LOG_LINE" in 
q_up: + if self.raise_on_line_insert_once and not self._line_failed_once: + self._line_failed_once = True + raise Exception("Simulated ClickHouse line insert error") + return [] + + if ( + "INSERT INTO" in q_up + and "AUDITLOG_LOG" in q_up + and "AUDITLOG_LOG_LINE" not in q_up + ): + # collect inserted ids (1st tuple element) + if params: + for row in params: + self.log_ids.add(row[0]) + return [] + + return [] + + +class AuditLogClickhouseCommon(AuditLogRuleCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls._cleanup_clickhouse_test_data() + + @classmethod + def tearDownClass(cls): + try: + cls._cleanup_clickhouse_test_data() + finally: + super().tearDownClass() + + @classmethod + def _cleanup_clickhouse_test_data(cls): + """Ensure clean state for configs and buffer across suites.""" + cls.env["auditlog.clickhouse.config"].sudo().search([]).write( + {"is_active": False} + ) + cls.env["auditlog.log.buffer"].sudo().search([]).unlink() + + @classmethod + def create_config(cls, **vals): + """Create ClickHouse config with minimal defaults for tests.""" + defaults = { + "host": "localhost", + "port": 9000, + "database": "db", + "user": "user", + "password": "pass", + "is_active": False, + } + defaults.update(vals) + return ( + cls.env["auditlog.clickhouse.config"] + .with_context(tracking_disable=True) + .create(defaults) + ) + + @contextlib.contextmanager + def _patched_clickhouse_client(self, *, raise_on_insert: bool = False): + """Patch ClickHouse client getter so tests don't require real ClickHouse.""" + dummy = DummyClickHouseClient(raise_on_insert=raise_on_insert) + target = ( + "odoo.addons.auditlog_clickhouse_write.models." 
+ "auditlog_clickhouse_config.get_clickhouse_client" + ) + with patch(target, autospec=True, return_value=dummy): + yield dummy + + def _parse_payloads(self): + """Return list of decoded payload dicts from buffer (oldest first).""" + buf = self.env["auditlog.log.buffer"].sudo().search([], order="id asc") + return [rec.payload_json for rec in buf] diff --git a/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py b/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py new file mode 100644 index 00000000000..a1a140274a3 --- /dev/null +++ b/auditlog_clickhouse_write/tests/test_auditlog_clickhouse_write.py @@ -0,0 +1,348 @@ +from odoo.tests import tagged +from odoo.tools import mute_logger + +from odoo.addons.queue_job.exception import RetryableJobError + +from .common import AuditLogClickhouseCommon, DummyClickHouseClient + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseBuffer(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.groups_model_id = cls.env.ref("base.model_res_groups").id + + # Rule for groups: full logging + cls.groups_rule = cls.create_rule( + { + "name": "testrule groups clickhouse", + "model_id": cls.groups_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_export_data": True, + "log_type": "full", + "capture_record": False, + } + ) + + # Active config to enable buffering + cls.config = cls.create_config(is_active=True) + + def setUp(self): + super().setUp() + self.groups_rule.subscribe() + + def test_01_create_writes_to_buffer_not_auditlog_tables(self): + buf = self.env["auditlog.log.buffer"].sudo() + log_model = self.env["auditlog.log"] + + start_buf = buf.search_count([]) + start_logs = log_model.search_count([("model_id", "=", self.groups_model_id)]) + + group = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "ch_test_group_1"}) + ) + + self.assertEqual( + 
log_model.search_count([("model_id", "=", self.groups_model_id)]) + - start_logs, + 0, + "auditlog.log must NOT be written by auditlog_clickhouse_write", + ) + self.assertEqual(buf.search_count([]) - start_buf, 1) + + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "create") + self.assertEqual(payload["log"]["model_id"], self.groups_model_id) + self.assertEqual(payload["log"]["res_id"], group.id) + + def test_02_write_creates_lines(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + group = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "CH Group"}) + ) + group.with_context(tracking_disable=True).write({"name": "CH Group v2"}) + + self.assertGreater(buf.search_count([]), start_buf) + + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "write") + self.assertEqual(payload["log"]["model_model"], "res.groups") + + field_names = {line.get("field_name") for line in payload["lines"]} + self.assertIn("name", field_names) + + def test_03_export_data_creates_single_payload_no_lines(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + self.env["res.groups"].search([]).export_data(["name"]) + + self.assertEqual(buf.search_count([]) - start_buf, 1) + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "export_data") + self.assertEqual(payload["lines"], []) + + def test_04_unlink_is_logged(self): + buf = self.env["auditlog.log.buffer"].sudo() + start_buf = buf.search_count([]) + + g = ( + self.env["res.groups"] + .with_context(tracking_disable=True) + .create({"name": "ch_test_group_unlink"}) + ) + g.unlink() + + self.assertGreater(buf.search_count([]), start_buf) + payload = buf.search([], order="id desc", limit=1).payload_json + self.assertEqual(payload["log"]["method"], "unlink") + 
self.assertIsInstance(payload["lines"], list) + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseQueueJobs(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.partner_model_id = cls.env.ref("base.model_res_partner").id + cls.rule = cls.create_rule( + { + "name": "testrule partner clickhouse queue", + "model_id": cls.partner_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + } + ) + cls.config = cls.create_config(is_active=True) + + def setUp(self): + super().setUp() + self.rule.subscribe() + + def test_01_cron_enqueues_job_and_does_not_flush_inline(self): + """ + Cron must only enqueue queue.job (no direct ClickHouse INSERTs here). + """ + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Cron Enqueue Test"}) + ) + partner.with_context(tracking_disable=True).write( + {"name": "Cron Enqueue Test v2"} + ) + + self.assertGreater(buf.search_count([]), 0) + + start_jobs = job_model.search_count([]) + res = buf._cron_flush_to_clickhouse() # uses config.queue_batch_size + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, + 1, + "Cron must enqueue exactly one job", + ) + + job = job_model.search([], order="id desc", limit=1) + self.assertEqual(job.model_name, "auditlog.log.buffer") + self.assertEqual(job.method_name, "_job_flush_to_clickhouse") + self.assertEqual(job.args[0], self.config.id) + self.assertEqual(job.args[1], self.config.queue_batch_size) + + expected_channel = ( + self.config.queue_channel_id.complete_name + if self.config.queue_channel_id + else "root" + ) + self.assertEqual(job.channel, expected_channel) + + def test_02_cron_skips_when_no_pending_buffers(self): + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + # 
Ensure no pending buffers + buf.search([]).unlink() + + start_jobs = job_model.search_count([]) + res = buf._cron_flush_to_clickhouse() + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, 0, "No pending buffers -> no job" + ) + + def test_03_cron_skips_without_active_config(self): + self.env["auditlog.clickhouse.config"].search([]).write({"is_active": False}) + + buf = self.env["auditlog.log.buffer"].sudo() + job_model = self.env["queue.job"].sudo() + + start_jobs = job_model.search_count([]) + rec = buf.create( + {"payload_json": {"log": {}, "lines": []}, "state": buf.STATE_PENDING} + ) + + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + res = buf._cron_flush_to_clickhouse() + + self.assertTrue(res) + self.assertEqual( + job_model.search_count([]) - start_jobs, 0, "No active config -> no job" + ) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_PENDING) + self.assertFalse(rec.error_message) + + def test_04_job_flush_success_deletes_buffers_and_calls_insert(self): + buf = self.env["auditlog.log.buffer"].sudo() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Job Flush OK"}) + ) + partner.with_context(tracking_disable=True).write({"name": "Job Flush OK v2"}) + + self.assertGreater(buf.search_count([]), 0) + + with self._patched_clickhouse_client() as dummy: + buf._job_flush_to_clickhouse(self.config.id, self.config.queue_batch_size) + + self.assertEqual( + buf.search_count([]), + 0, + "Buffers must be removed after successful job flush", + ) + + insert_calls = [ + q for (q, _params) in dummy.calls if "INSERT INTO" in (q or "").upper() + ] + self.assertTrue(insert_calls, "Job must insert into ClickHouse") + + def test_05_job_invalid_payload_marks_error_and_keeps_row(self): + buf = self.env["auditlog.log.buffer"].sudo() + + # Invalid structure for payload_json (Json field accepts string, + # but our code expects 
mapping with log/lines) + rec = buf.create( + {"payload_json": "NOT A JSON OBJECT", "state": buf.STATE_PENDING} + ) + + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + buf._job_flush_to_clickhouse(self.config.id, batch_size=10) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_ERROR) + self.assertTrue(rec.error_message) + self.assertGreaterEqual(rec.attempt_count, 1) + + def test_06_retry_after_partial_insert_does_not_duplicate_log_rows(self): + buf = self.env["auditlog.log.buffer"].sudo() + buf.search([]).unlink() + + partner = ( + self.env["res.partner"] + .with_context(tracking_disable=True) + .create({"name": "Partial insert"}) + ) + partner.with_context(tracking_disable=True).write({"name": "Partial insert v2"}) + + pending = buf.search([("state", "=", buf.STATE_PENDING)], order="id asc") + valid_buffers, invalid_buffers, log_rows, line_rows = ( + buf._collect_rows_from_buffers(pending) + ) + + self.assertTrue(log_rows) + self.assertTrue(line_rows) + + dummy = DummyClickHouseClient(raise_on_line_insert_once=True) + + # 1st try: expected failure -> mute ERROR traceback + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + with self.assertRaises(RetryableJobError): + buf._insert_rows_to_clickhouse( + client=dummy, + config=self.config, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + # 2nd try: should pass + buf._insert_rows_to_clickhouse( + client=dummy, + config=self.config, + log_rows=log_rows, + line_rows=line_rows, + valid_buffers=valid_buffers, + ) + + log_inserts = [ + q + for (q, _p) in dummy.calls + if "INSERT INTO" in (q or "").upper() + and "AUDITLOG_LOG" in (q or "").upper() + and "AUDITLOG_LOG_LINE" not in (q or "").upper() + ] + self.assertEqual( + len(log_inserts), 1, "Log rows must not be inserted twice on retry" + ) + + +@tagged("-at_install", "post_install") +class 
TestAuditlogClickhouseRuleCache(AuditLogClickhouseCommon): + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.partner_model_id = cls.env.ref("base.model_res_partner").id + cls.rule = cls.create_rule( + { + "name": "testrule cache invalidation", + "model_id": cls.partner_model_id, + "log_read": True, + "log_create": True, + "log_write": True, + "log_unlink": True, + "log_type": "full", + "capture_record": False, + } + ) + + def test_01_cache_updates_when_rule_changes_but_id_same(self): + # isolate cache for this test + if hasattr(self.env.registry, "_auditlog_clickhouse_write_rule_cache"): + self.env.registry._auditlog_clickhouse_write_rule_cache = {} + + excluded_1, capture_1 = self.rule._get_rule_settings(self.partner_model_id) + self.assertFalse(capture_1) + + # change rule with same id + self.rule.write({"capture_record": True}) + self.rule.invalidate_recordset() + + excluded_2, capture_2 = self.rule._get_rule_settings(self.partner_model_id) + self.assertTrue(capture_2) diff --git a/auditlog_clickhouse_write/tests/test_clickhouse_config.py b/auditlog_clickhouse_write/tests/test_clickhouse_config.py new file mode 100644 index 00000000000..d6083b8c1b9 --- /dev/null +++ b/auditlog_clickhouse_write/tests/test_clickhouse_config.py @@ -0,0 +1,67 @@ +from odoo.tests import tagged +from odoo.tools import mute_logger + +from .common import AuditLogClickhouseCommon + + +@tagged("-at_install", "post_install") +class TestAuditlogClickhouseConfig(AuditLogClickhouseCommon): + def test_01_single_active_on_create(self): + cfg1 = self.create_config(is_active=True, host="h1") + cfg2 = self.create_config(is_active=True, host="h2") + + cfg1.invalidate_recordset() + cfg2.invalidate_recordset() + + active = self.env["auditlog.clickhouse.config"].search( + [("is_active", "=", True)] + ) + self.assertEqual(len(active), 1) + self.assertTrue(cfg2.is_active) + self.assertFalse(cfg1.is_active) + + def test_02_single_active_on_write(self): + cfg1 = 
self.create_config(is_active=False, host="h1") + cfg2 = self.create_config(is_active=True, host="h2") + + cfg1.write({"is_active": True}) + cfg1.invalidate_recordset() + cfg2.invalidate_recordset() + + active = self.env["auditlog.clickhouse.config"].search( + [("is_active", "=", True)] + ) + self.assertEqual(len(active), 1) + self.assertTrue(cfg1.is_active) + self.assertFalse(cfg2.is_active) + + def test_03_test_connection_uses_client(self): + cfg = self.create_config(is_active=True) + + # Without patch, get_clickhouse_client may + # raise if clickhouse-driver isn't installed + with self._patched_clickhouse_client() as dummy: + action = cfg.action_test_connection() + + self.assertTrue(action) + self.assertTrue(any("SELECT 1" in (q or "") for (q, params) in dummy.calls)) + + def test_04_cron_skips_without_active_config(self): + self.env["auditlog.clickhouse.config"].search([]).write({"is_active": False}) + + buf = self.env["auditlog.log.buffer"].sudo() + rec = buf.create({"payload_json": "NOT A JSON", "state": buf.STATE_PENDING}) + + with self._patched_clickhouse_client() as dummy: + with mute_logger( + "odoo.addons.auditlog_clickhouse_write.models.auditlog_log_buffer" + ): + res = buf._cron_flush_to_clickhouse(batch_size=10) + + self.assertTrue(res) + + rec.invalidate_recordset() + self.assertEqual(rec.state, buf.STATE_PENDING) + self.assertFalse(rec.error_message) + + self.assertFalse(dummy.calls) diff --git a/auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml b/auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml new file mode 100644 index 00000000000..9c311160a22 --- /dev/null +++ b/auditlog_clickhouse_write/views/auditlog_clickhouse_config_views.xml @@ -0,0 +1,115 @@ + + + Configure flush action + ir.cron + + ir.actions.act_window + form + new + + + + auditlog.clickhouse.config.form + auditlog.clickhouse.config + +
+
+
+ + + + + + + + + + + + + + + + + + + +
+
+
+ + +
+ Audit log records are buffered in PostgreSQL and periodically flushed to
+ ClickHouse by the scheduled cron job.
+
+
+
+
+
+ + + auditlog.clickhouse.config.list + auditlog.clickhouse.config + + + + + + + + + + + + + + ClickHouse Configuration + auditlog.clickhouse.config + list,form + + + +
diff --git a/requirements.txt b/requirements.txt index 5d1fefa6f23..d3e18b80a6d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ # generated from manifests external_dependencies +clickhouse-driver cryptography dataclasses odoo_test_helper