Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_db_topic_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
-define(MNESIA_NODE_TABLE, rabbit_topic_trie_node).
-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie).
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v2).

-type match_result() :: [rabbit_types:binding_destination() |
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
Expand Down
39 changes: 37 additions & 2 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@
get_feature_state/0, get_feature_state/1,
handle_fallback/1]).

%% Called remotely to handle unregistration of old projections.
-export([supports_rabbit_khepri_topic_trie_v2/0]).

-ifdef(TEST).
-export([register_projections/0,
force_metadata_store/1,
Expand Down Expand Up @@ -1541,7 +1544,7 @@ projection_fun_for_sets(MapFun) ->
end.

register_rabbit_topic_graph_projection() ->
Name = rabbit_khepri_topic_trie,
Name = rabbit_khepri_topic_trie_v2,
%% This projection calls some external functions which are disallowed by
%% Horus because they interact with global or random state. We explicitly
%% allow them here for performance reasons.
Expand Down Expand Up @@ -1612,8 +1615,38 @@ register_rabbit_topic_graph_projection() ->
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
_ = unregister_rabbit_topic_trie_v1_projection(),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

supports_rabbit_khepri_topic_trie_v2() ->
true.

unregister_rabbit_topic_trie_v1_projection() ->
Nodes = rabbit_nodes:list_members(),
Rets = erpc:multicall(
Nodes,
?MODULE, supports_rabbit_khepri_topic_trie_v2, []),
SupportedEverywhere = lists:all(
fun(Ret) ->
Ret =:= {ok, true}
end, Rets),
case SupportedEverywhere of
true ->
?LOG_DEBUG(
"DB: unregister old `rabbit_khepri_topic_trie` Khepri "
"projection",
#{domain => ?RMQLOG_DOMAIN_DB}),
khepri:unregister_projections(
?STORE_ID, [rabbit_khepri_topic_trie]);
false ->
?LOG_DEBUG(
"DB: skipping unregistration of old "
"`rabbit_khepri_topic_trie` Khepri because some RabbitMQ "
"nodes still use it",
#{domain => ?RMQLOG_DOMAIN_DB}),
ok
end.

-spec follow_down_update(Table, Exchange, Words, UpdateFn) -> Ret when
Table :: ets:tid(),
Exchange :: rabbit_types:exchange_name(),
Expand Down Expand Up @@ -1660,7 +1693,9 @@ follow_down_update(Table, Exchange, FromNodeId, [To | Rest], UpdateFn) ->
case follow_down_update(Table, Exchange, ToNodeId, Rest, UpdateFn) of
delete ->
OutEdgePattern = #topic_trie_edge{trie_edge =
TrieEdge#trie_edge{word = '_'},
TrieEdge#trie_edge{
node_id = ToNodeId,
word = '_'},
node_id = '_'},
case ets:match(Table, OutEdgePattern, 1) of
'$end_of_table' ->
Expand Down
216 changes: 211 additions & 5 deletions deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").

-compile([nowarn_export_all, export_all]).

-define(VHOST, <<"/">>).

all() ->
[
{group, mnesia_store}
{group, mnesia_store},
{group, khepri_store}
].

groups() ->
[
{mnesia_store, [], mnesia_tests()},
{khepri_store, [], khepri_tests()},
{benchmarks, [], benchmarks()}
].

Expand All @@ -40,6 +44,11 @@ mnesia_tests() ->
build_multiple_key_from_deletion_events
].

khepri_tests() ->
[
topic_trie_cleanup
].

benchmarks() ->
[
match_benchmark
Expand All @@ -53,15 +62,26 @@ end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(mnesia_store = Group, Config0) ->
Config = rabbit_ct_helpers:set_config(Config0, [{metadata_store, mnesia}]),
Config = rabbit_ct_helpers:set_config(
Config0,
[{metadata_store, mnesia},
{rmq_nodes_count, 1}]),
init_per_group_common(Group, Config);
init_per_group(khepri_store = Group, Config0) ->
Config = rabbit_ct_helpers:set_config(
Config0,
[{metadata_store, khepri},
{rmq_nodes_count, 3}]),
init_per_group_common(Group, Config);
init_per_group(Group, Config) ->
init_per_group(Group, Config0) ->
Config = rabbit_ct_helpers:set_config(
Config0,
[{rmq_nodes_count, 1}]),
init_per_group_common(Group, Config).

init_per_group_common(Group, Config) ->
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, Group},
{rmq_nodes_count, 1}
{rmq_nodename_suffix, Group}
]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
Expand Down Expand Up @@ -375,6 +395,192 @@ build_multiple_key_from_deletion_events1(Config) ->
lists:sort([RK || {_, RK} <- rabbit_db_topic_exchange:trie_records_to_key(Records)])),
passed.

%% ---------------------------------------------------------------------------
%% Khepri-specific Tests
%% ---------------------------------------------------------------------------

% https://github.com/rabbitmq/rabbitmq-server/issues/15024
topic_trie_cleanup(Config) ->
[_, OldNode, NewNode] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

%% this test has to be isolated to avoid flakes
VHost = <<"test-vhost-topic-trie">>,
ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_vhost, add, [VHost, <<"test-user">>]),

%% Create an exchange in the vhost
ExchangeName = rabbit_misc:r(VHost, exchange, <<"test-topic-exchange">>),
{ok, _Exchange} = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_exchange, declare,
[ExchangeName, topic, _Durable = true, _AutoDelete = false,
_Internal = false, _Args = [], <<"test-user">>]),

%% List of routing keys that exercise topic exchange functionality
RoutingKeys = [
%% Exact patterns with common prefixes
<<"a.b.c">>,
<<"a.b.d">>,
<<"a.b.e">>,
<<"a.c.d">>,
<<"a.c.e">>,
<<"b.c.d">>,
%% Patterns with a single wildcard
<<"a.*.c">>,
<<"a.*.d">>,
<<"*.b.c">>,
<<"*.b.d">>,
<<"a.b.*">>,
<<"a.c.*">>,
<<"*.*">>,
<<"a.*">>,
<<"*.b">>,
<<"*">>,
%% Patterns with multiple wildcards
<<"a.#">>,
<<"a.b.#">>,
<<"a.c.#">>,
<<"#.c">>,
<<"#.b.c">>,
<<"#.b.d">>,
<<"#">>,
<<"#.#">>,
%% Mixed patterns
<<"a.*.#">>,
<<"*.b.#">>,
<<"*.#">>,
<<"#.*">>,
<<"#.*.#">>,
%% More complex patterns with common prefixes
<<"orders.created.#">>,
<<"orders.updated.#">>,
<<"orders.*.confirmed">>,
<<"orders.#">>,
<<"events.user.#">>,
<<"events.system.#">>,
<<"events.#">>
],

%% Shuffle the routing keys to test in random order
ShuffledRoutingKeys = [RK || {_, RK} <- lists:sort([{rand:uniform(), RK} || RK <- RoutingKeys])],

%% Create bindings for all routing keys
Bindings = [begin
QueueName = rabbit_misc:r(VHost, queue,
list_to_binary("queue-" ++ integer_to_list(Idx))),
Ret = rabbit_ct_broker_helpers:rpc(
Config, OldNode,
rabbit_amqqueue, declare, [QueueName, true, false, [], self(), <<"test-user">>]),
case Ret of
{new, _Q} -> ok;
{existing, _Q} -> ok
end,
#binding{source = ExchangeName,
key = RoutingKey,
destination = QueueName,
args = []}
end || {Idx, RoutingKey} <- lists:enumerate(ShuffledRoutingKeys)],

%% Add all bindings
[ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, add, [B, <<"test-user">>])
|| B <- Bindings],

%% Log entries that were added to the ETS table
lists:foreach(
fun(Node) ->
VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n",
[Node, length(Bindings), length(VHostEntriesAfterAdd)])
end, Nodes),

%% Shuffle bindings again for deletion in random order
ShuffledBindings = [B || {_, B} <- lists:sort([{rand:uniform(), B} || B <- Bindings])],

%% Delete all bindings in random order
[ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, remove, [B, <<"test-user">>])
|| B <- ShuffledBindings],

%% Verify that the projection ETS table doesn't contain any entries related
%% to this vhost
try
lists:foreach(
fun(Node) ->
%% We read and check the new projection table only. It is
%% declared by the new node and is available everywhere. The
%% old projection table might be there in case of
%% mixed-version testing. This part will be tested in the
%% second part of the testcase.
VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]),

%% Assert that no entries were found for this vhost after deletion
?assertEqual([], VHostEntriesAfterDelete)
end, Nodes),

%% If we reach this point, we know the new projection works as expected
%% and the leaked ETS entries are no more.
%%
%% Now, we want to test that after an upgrade, the old projection is
%% unregistered.
HasOldProjection = try
VHostEntriesInOldTable = read_topic_trie_table(
Config, OldNode, VHost, rabbit_khepri_topic_trie),
ct:pal("Old ETS table entries after delete: ~p~n", [length(VHostEntriesInOldTable)]),
?assertNotEqual([], VHostEntriesInOldTable),
true
catch
error:{exception, badarg, _} ->
%% The old projection doesn't exist. The old
%% node, if we are in a mixed-version test,
%% also supports the new projection. There
%% is nothing more to test.
ct:pal("The old projection was not registered, nothing to test"),
false
end,

case HasOldProjection of
true ->
%% The old projection is registered. Simulate an update by removing
%% node 1 (which is the old one in our mixed-version testing) from
%% the cluster, then restart node 2. On restart, it should
%% unregister the old projection.
%%
%% FIXME: The cluster is configured at the test group level.
%% Therefore, if we add more testcases to this group, following
%% testcases won't have the expected cluster.
?assertEqual(ok, rabbit_ct_broker_helpers:stop_broker(Config, OldNode)),
?assertEqual(ok, rabbit_ct_broker_helpers:forget_cluster_node(Config, NewNode, OldNode)),

ct:pal("Restart new node (node 2)"),
?assertEqual(ok, rabbit_ct_broker_helpers:restart_broker(Config, NewNode)),

ct:pal("Wait for projections to be restored"),
?awaitMatch(
Entries when is_list(Entries),
catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v2),
60000),

ct:pal("Check that the old projection is gone"),
?assertError(
{exception, badarg, _},
read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie));
false ->
ok
end
after
%% Clean up the vhost
ok = rabbit_ct_broker_helpers:rpc(Config, NewNode, rabbit_vhost, delete, [VHost, <<"test-user">>])
end,

passed.

read_topic_trie_table(Config, Node, VHost, Table) ->
Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]),
[Entry || #topic_trie_edge{trie_edge = TrieEdge} = Entry <- Entries,
case TrieEdge of
#trie_edge{exchange_name = #resource{virtual_host = V}} ->
V =:= VHost;
_ ->
false
end].

%% ---------------------------------------------------------------------------
%% Benchmarks
%% ---------------------------------------------------------------------------
Expand Down
Loading