From cc3d23fb462c1c506d5437aacfd9d64e0b327026 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Thu, 27 Nov 2025 11:13:00 +0100 Subject: [PATCH] rabbit_khepri: Fix topic binding deletion leak [Why] We use a Khepri projection to compute a graph for bindings that have a topic exchange as their source. This allows more efficient queries during routing. This graph is not stored in Khepri, only in the projection ETS table. When a binding is deleted, we need to clean up the graph. However, the pattern used to match the trie edges to delete was incorrect, leading to "orphaned" trie edges. The accumulation of these leftovers caused a memory leak. [How] The pattern was fixed to correctly match the appropriate trie edges. However, this fix alone is effective for new deployments of RabbitMQ only, when the projection function is registered for the first time. We also need to handle the update of already registered projections in existing clusters. To achieve that, first, we renamed the projection from `rabbit_khepri_topic_trie` to `rabbit_khepri_topic_trie_v2` to distinguish the bad one and the good one. Any updated RabbitMQ nodes in an existing cluster will use this new projection. Other existing out-of-date nodes will continue to use the old projection. Because both projections continue to exist, the cluster will still be affected by the memory leak. Then, each node will verify on startup if all other cluster members support the new projection. If that is the case, they will unregister the old projection. Therefore, once all nodes in a cluster are up-to-date and use the new projection, the old one will go away and the leaked memory will be reclaimed. This startup check could have been made simpler with a feature flag. We decided to go with a custom check in case a user would try to upgrade from a 4.1.x release that has the fix to a 4.2.x release that does not for instance. A feature flag would have prevented that upgrade path. Fixes #15024. --- deps/rabbit/src/rabbit_db_topic_exchange.erl | 2 +- deps/rabbit/src/rabbit_khepri.erl | 39 +++- .../test/rabbit_db_topic_exchange_SUITE.erl | 216 +++++++++++++++++- 3 files changed, 249 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl index 35e6cf59d56c..76c149371ab1 100644 --- a/deps/rabbit/src/rabbit_db_topic_exchange.erl +++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl @@ -26,7 +26,7 @@ -define(MNESIA_NODE_TABLE, rabbit_topic_trie_node). -define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge). -define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding). --define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie). +-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v2). -type match_result() :: [rabbit_types:binding_destination() | {rabbit_amqqueue:name(), rabbit_types:binding_key()}]. diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 6b7f0a6a898e..d79ee6bcbecc 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -175,6 +175,9 @@ get_feature_state/0, get_feature_state/1, handle_fallback/1]). +%% Called remotely to handle unregistration of old projections. +-export([supports_rabbit_khepri_topic_trie_v2/0]). + -ifdef(TEST). -export([register_projections/0, force_metadata_store/1, @@ -1541,7 +1544,7 @@ projection_fun_for_sets(MapFun) -> end. register_rabbit_topic_graph_projection() -> - Name = rabbit_khepri_topic_trie, + Name = rabbit_khepri_topic_trie_v2, %% This projection calls some external functions which are disallowed by %% Horus because they interact with global or random state. We explicitly %% allow them here for performance reasons. @@ -1612,8 +1615,38 @@ register_rabbit_topic_graph_projection() -> _Kind = ?KHEPRI_WILDCARD_STAR, _DstName = ?KHEPRI_WILDCARD_STAR, _RoutingKey = ?KHEPRI_WILDCARD_STAR), + _ = unregister_rabbit_topic_trie_v1_projection(), khepri:register_projection(?STORE_ID, PathPattern, Projection). +supports_rabbit_khepri_topic_trie_v2() -> + true. + +unregister_rabbit_topic_trie_v1_projection() -> + Nodes = rabbit_nodes:list_members(), + Rets = erpc:multicall( + Nodes, + ?MODULE, supports_rabbit_khepri_topic_trie_v2, []), + SupportedEverywhere = lists:all( + fun(Ret) -> + Ret =:= {ok, true} + end, Rets), + case SupportedEverywhere of + true -> + ?LOG_DEBUG( + "DB: unregister old `rabbit_khepri_topic_trie` Khepri " + "projection", + #{domain => ?RMQLOG_DOMAIN_DB}), + khepri:unregister_projections( + ?STORE_ID, [rabbit_khepri_topic_trie]); + false -> + ?LOG_DEBUG( + "DB: skipping unregistration of old " + "`rabbit_khepri_topic_trie` Khepri because some RabbitMQ " + "nodes still use it", + #{domain => ?RMQLOG_DOMAIN_DB}), + ok + end. + -spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when Table :: ets:tid(), Exchange :: rabbit_types:exchange_name(), @@ -1660,7 +1693,9 @@ follow_down_update(Table, Exchange, FromNodeId, [To | Rest], UpdateFn) -> case follow_down_update(Table, Exchange, ToNodeId, Rest, UpdateFn) of delete -> OutEdgePattern = #topic_trie_edge{trie_edge = - TrieEdge#trie_edge{word = '_'}, + TrieEdge#trie_edge{ + node_id = ToNodeId, + word = '_'}, node_id = '_'}, case ets:match(Table, OutEdgePattern, 1) of '$end_of_table' -> diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl index b59a1696fc75..f526924155ed 100644 --- a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl @@ -11,18 +11,22 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). + -compile([nowarn_export_all, export_all]). -define(VHOST, <<"/">>). all() -> [ - {group, mnesia_store} + {group, mnesia_store}, + {group, khepri_store} ]. groups() -> [ {mnesia_store, [], mnesia_tests()}, + {khepri_store, [], khepri_tests()}, {benchmarks, [], benchmarks()} ]. @@ -40,6 +44,11 @@ mnesia_tests() -> build_multiple_key_from_deletion_events ]. +khepri_tests() -> + [ + topic_trie_cleanup + ]. + benchmarks() -> [ match_benchmark @@ -53,15 +62,26 @@ end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). init_per_group(mnesia_store = Group, Config0) -> - Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]), + Config = rabbit_ct_helpers:set_config( + Config0, + [{metadata_store, mnesia}, + {rmq_nodes_count, 1}]), + init_per_group_common(Group, Config); +init_per_group(khepri_store = Group, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [{metadata_store, khepri}, + {rmq_nodes_count, 3}]), init_per_group_common(Group, Config); -init_per_group(Group, Config) -> +init_per_group(Group, Config0) -> + Config = rabbit_ct_helpers:set_config( + Config0, + [{rmq_nodes_count, 1}]), init_per_group_common(Group, Config). init_per_group_common(Group, Config) -> Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group}, - {rmq_nodes_count, 1} + {rmq_nodename_suffix, Group} ]), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ @@ -375,6 +395,192 @@ build_multiple_key_from_deletion_events1(Config) -> lists:sort([RK || {_, RK} <- rabbit_db_topic_exchange:trie_records_to_key(Records)])), passed. +%% --------------------------------------------------------------------------- +%% Khepri-specific Tests +%% --------------------------------------------------------------------------- + +% https://github.com/rabbitmq/rabbitmq-server/issues/15024 +topic_trie_cleanup(Config) -> + [_, OldNode, NewNode] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% this test has to be isolated to avoid flakes + VHost = <<"test-vhost-topic-trie">>, + ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_vhost, add, [VHost, <<"test-user">>]), + + %% Create an exchange in the vhost + ExchangeName = rabbit_misc:r(VHost, exchange, <<"test-topic-exchange">>), + {ok, _Exchange} = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_exchange, declare, + [ExchangeName, topic, _Durable = true, _AutoDelete = false, + _Internal = false, _Args = [], <<"test-user">>]), + + %% List of routing keys that exercise topic exchange functionality + RoutingKeys = [ + %% Exact patterns with common prefixes + <<"a.b.c">>, + <<"a.b.d">>, + <<"a.b.e">>, + <<"a.c.d">>, + <<"a.c.e">>, + <<"b.c.d">>, + %% Patterns with a single wildcard + <<"a.*.c">>, + <<"a.*.d">>, + <<"*.b.c">>, + <<"*.b.d">>, + <<"a.b.*">>, + <<"a.c.*">>, + <<"*.*">>, + <<"a.*">>, + <<"*.b">>, + <<"*">>, + %% Patterns with multiple wildcards + <<"a.#">>, + <<"a.b.#">>, + <<"a.c.#">>, + <<"#.c">>, + <<"#.b.c">>, + <<"#.b.d">>, + <<"#">>, + <<"#.#">>, + %% Mixed patterns + <<"a.*.#">>, + <<"*.b.#">>, + <<"*.#">>, + <<"#.*">>, + <<"#.*.#">>, + %% More complex patterns with common prefixes + <<"orders.created.#">>, + <<"orders.updated.#">>, + <<"orders.*.confirmed">>, + <<"orders.#">>, + <<"events.user.#">>, + <<"events.system.#">>, + <<"events.#">> + ], + + %% Shuffle the routing keys to test in random order + ShuffledRoutingKeys = [RK || {_, RK} <- lists:sort([{rand:uniform(), RK} || RK <- RoutingKeys])], + + %% Create bindings for all routing keys + Bindings = [begin + QueueName = rabbit_misc:r(VHost, queue, + list_to_binary("queue-" ++ integer_to_list(Idx))), + Ret = rabbit_ct_broker_helpers:rpc( + Config, OldNode, + rabbit_amqqueue, declare, [QueueName, true, false, [], self(), <<"test-user">>]), + case Ret of + {new, _Q} -> ok; + {existing, _Q} -> ok + end, + #binding{source = ExchangeName, + key = RoutingKey, + destination = QueueName, + args = []} + end || {Idx, RoutingKey} <- lists:enumerate(ShuffledRoutingKeys)], + + %% Add all bindings + [ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, add, [B, <<"test-user">>]) + || B <- Bindings], + + %% Log entries that were added to the ETS table + lists:foreach( + fun(Node) -> + VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2), + ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n", + [Node, length(Bindings), length(VHostEntriesAfterAdd)]) + end, Nodes), + + %% Shuffle bindings again for deletion in random order + ShuffledBindings = [B || {_, B} <- lists:sort([{rand:uniform(), B} || B <- Bindings])], + + %% Delete all bindings in random order + [ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, remove, [B, <<"test-user">>]) + || B <- ShuffledBindings], + + %% Verify that the projection ETS table doesn't contain any entries related + %% to this vhost + try + lists:foreach( + fun(Node) -> + %% We read and check the new projection table only. It is + %% declared by the new node and is available everywhere. The + %% old projection table might be there in case of + %% mixed-version testing. This part will be tested in the + %% second part of the testcase. + VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2), + ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]), + + %% Assert that no entries were found for this vhost after deletion + ?assertEqual([], VHostEntriesAfterDelete) + end, Nodes), + + %% If we reach this point, we know the new projection works as expected + %% and the leaked ETS entries are no more. + %% + %% Now, we want to test that after an upgrade, the old projection is + %% unregistered. + HasOldProjection = try + VHostEntriesInOldTable = read_topic_trie_table( + Config, OldNode, VHost, rabbit_khepri_topic_trie), + ct:pal("Old ETS table entries after delete: ~p~n", [length(VHostEntriesInOldTable)]), + ?assertNotEqual([], VHostEntriesInOldTable), + true + catch + error:{exception, badarg, _} -> + %% The old projection doesn't exist. The old + %% node, if we are in a mixed-version test, + %% also supports the new projection. There + %% is nothing more to test. + ct:pal("The old projection was not registered, nothing to test"), + false + end, + + case HasOldProjection of + true -> + %% The old projection is registered. Simulate an update by removing + %% node 1 (which is the old one in our mixed-version testing) from + %% the cluster, then restart node 2. On restart, it should + %% unregister the old projection. + %% + %% FIXME: The cluster is configured at the test group level. + %% Therefore, if we add more testcases to this group, following + %% testcases won't have the expected cluster. + ?assertEqual(ok, rabbit_ct_broker_helpers:stop_broker(Config, OldNode)), + ?assertEqual(ok, rabbit_ct_broker_helpers:forget_cluster_node(Config, NewNode, OldNode)), + + ct:pal("Restart new node (node 2)"), + ?assertEqual(ok, rabbit_ct_broker_helpers:restart_broker(Config, NewNode)), + + ct:pal("Wait for projections to be restored"), + ?awaitMatch( + Entries when is_list(Entries), + catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v2), + 60000), + + ct:pal("Check that the old projection is gone"), + ?assertError( + {exception, badarg, _}, + read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie)); + false -> + ok + end + after + %% Clean up the vhost + ok = rabbit_ct_broker_helpers:rpc(Config, NewNode, rabbit_vhost, delete, [VHost, <<"test-user">>]) + end, + + passed. + +read_topic_trie_table(Config, Node, VHost, Table) -> + Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]), + [Entry || #topic_trie_edge{trie_edge = TrieEdge} = Entry <- Entries, + case TrieEdge of + #trie_edge{exchange_name = #resource{virtual_host = V}} -> + V =:= VHost; + _ -> + false + end]. + %% --------------------------------------------------------------------------- %% Benchmarks %% ---------------------------------------------------------------------------