diff --git a/deps/rabbit/Makefile b/deps/rabbit/Makefile index 96c1312eb8a1..ca42e2b28fdc 100644 --- a/deps/rabbit/Makefile +++ b/deps/rabbit/Makefile @@ -268,7 +268,7 @@ PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channe PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange PARALLEL_CT_SET_2_B = clustering_recovery crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2 PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy prevent_startup_if_node_was_reset -PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator +PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation stream_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 603f8540a85e..a076a7647e01 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2688,6 +2688,29 @@ end}. ]}. +%% +%% Stream membership reconciliation +%% + +{mapping, "stream.continuous_membership_reconciliation.enabled", "rabbit.stream_membership_reconciliation_enabled", [ + {datatype, {enum, [true, false]}}]}. + +{mapping, "stream.continuous_membership_reconciliation.auto_remove", "rabbit.stream_membership_reconciliation_auto_remove", [ + {datatype, {enum, [true, false]}}]}. + +{mapping, "stream.continuous_membership_reconciliation.interval", "rabbit.stream_membership_reconciliation_interval", [ + {datatype, integer}, {validators, ["non_negative_integer"]} +]}. + +{mapping, "stream.continuous_membership_reconciliation.trigger_interval", "rabbit.stream_membership_reconciliation_trigger_interval", [ + {datatype, integer}, {validators, ["non_negative_integer"]} +]}. + +{mapping, "stream.continuous_membership_reconciliation.target_group_size", "rabbit.stream_membership_reconciliation_target_group_size", [ + {datatype, integer}, {validators, ["non_negative_integer"]} +]}. + + %% %% Runtime parameters %% diff --git a/deps/rabbit/src/rabbit.erl b/deps/rabbit/src/rabbit.erl index c213e13abd00..595233f7bb57 100644 --- a/deps/rabbit/src/rabbit.erl +++ b/deps/rabbit/src/rabbit.erl @@ -183,6 +183,12 @@ [rabbit_quorum_queue_periodic_membership_reconciliation]}}, {requires, [database]}]}). +-rabbit_boot_step({rabbit_stream_periodic_membership_reconciliation, + [{description, "Stream membership reconciliation"}, + {mfa, {rabbit_sup, start_restartable_child, + [rabbit_stream_periodic_membership_reconciliation]}}, + {requires, [recovery]}]}). + -rabbit_boot_step({rabbit_epmd_monitor, [{description, "epmd monitor"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/deps/rabbit/src/rabbit_stream_event_subscriber.erl b/deps/rabbit/src/rabbit_stream_event_subscriber.erl new file mode 100644 index 000000000000..9d1f8e355314 --- /dev/null +++ b/deps/rabbit/src/rabbit_stream_event_subscriber.erl @@ -0,0 +1,54 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_stream_event_subscriber). + +-behaviour(gen_event). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2]). +-export([register/0, unregister/0]). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-rabbit_boot_step({rabbit_stream_event_subscriber, + [{description, "stream event subscriber"}, + {mfa, {?MODULE, register, []}}, + {cleanup, {?MODULE, unregister, []}}, + {requires, rabbit_event}, + {enables, recovery}]}). + +register() -> + gen_event:add_handler(rabbit_alarm, ?MODULE, []), + gen_event:add_handler(rabbit_event, ?MODULE, []). + +unregister() -> + gen_event:delete_handler(rabbit_alarm, ?MODULE, []), + gen_event:delete_handler(rabbit_event, ?MODULE, []). + +init([]) -> + {ok, []}. + +handle_call( _, State) -> + {ok, ok, State}. + +handle_event({node_up, Node}, State) -> + rabbit_stream_periodic_membership_reconciliation:on_node_up(Node), + {ok, State}; +handle_event({node_down, Node}, State) -> + rabbit_stream_periodic_membership_reconciliation:on_node_down(Node), + {ok, State}; +handle_event(#event{type = policy_set}, State) -> + rabbit_stream_periodic_membership_reconciliation:policy_set(), + {ok, State}; +handle_event(#event{type = operator_policy_set}, State) -> + rabbit_stream_periodic_membership_reconciliation:policy_set(), + {ok, State}; +handle_event(_, State) -> + {ok, State}. + +handle_info(_, State) -> + {ok, State}. diff --git a/deps/rabbit/src/rabbit_stream_periodic_membership_reconciliation.erl b/deps/rabbit/src/rabbit_stream_periodic_membership_reconciliation.erl new file mode 100644 index 000000000000..7cff0e9a2b11 --- /dev/null +++ b/deps/rabbit/src/rabbit_stream_periodic_membership_reconciliation.erl @@ -0,0 +1,264 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_stream_periodic_membership_reconciliation). + +-feature(maybe_expr, enable). + +-behaviour(gen_server). + +-export([on_node_up/1, on_node_down/1, queue_created/1, policy_set/0]). + +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-include_lib("rabbit_common/include/rabbit.hrl"). +-include_lib("kernel/include/logger.hrl"). + +-define(SERVER, ?MODULE). +-define(DEFAULT_INTERVAL, 60_000*60). +-define(DEFAULT_TRIGGER_INTERVAL, 10_000). +-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000). + +-define(EVAL_MSG, membership_reconciliation). + +-record(state, {timer_ref :: reference() | undefined, + interval :: non_neg_integer(), + trigger_interval :: non_neg_integer(), + target_group_size :: non_neg_integer() | undefined, + enabled :: boolean(), + auto_remove :: boolean()}). + +%%---------------------------------------------------------------------------- +%% Start +%%---------------------------------------------------------------------------- + +-spec start_link() -> rabbit_types:ok_pid_or_error(). +start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%---------------------------------------------------------------------------- +%% API +%%---------------------------------------------------------------------------- + +on_node_up(Node) -> + gen_server:cast(?SERVER, {membership_reconciliation_trigger, {node_up, Node}}). + +on_node_down(Node) -> + gen_server:cast(?SERVER, {membership_reconciliation_trigger, {node_down, Node}}). + +queue_created(Q) -> + gen_server:cast(?SERVER, {membership_reconciliation_trigger, {queue_created, Q}}). + +policy_set() -> + gen_server:cast(?SERVER, {membership_reconciliation_trigger, policy_set}). + +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +init([]) -> + Enabled = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_enabled, + false), + AutoRemove = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_auto_remove, + false), + Interval = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_interval, + ?DEFAULT_INTERVAL), + TriggerInterval = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_trigger_interval, + ?DEFAULT_TRIGGER_INTERVAL), + TargetGroupSize = rabbit_misc:get_env(rabbit, stream_membership_reconciliation_target_group_size, + undefined), + State = #state{interval = Interval, + trigger_interval = TriggerInterval, + target_group_size = TargetGroupSize, + enabled = Enabled, + auto_remove = AutoRemove}, + case Enabled of + true -> + Ref = erlang:send_after(Interval, self(), ?EVAL_MSG), + {ok, State#state{timer_ref = Ref}}; + false -> + {ok, State, hibernate} + end. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast({membership_reconciliation_trigger, _Reason}, #state{enabled = false} = State) -> + {noreply, State, hibernate}; +handle_cast({membership_reconciliation_trigger, Reason}, #state{timer_ref = OldRef, + trigger_interval = Time} = State) -> + ?LOG_DEBUG("Stream membership reconciliation scheduled: ~p", [Reason]), + _ = erlang:cancel_timer(OldRef), + Ref = erlang:send_after(Time, self(), ?EVAL_MSG), + {noreply, State#state{timer_ref = Ref}}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(?EVAL_MSG, #state{interval = Interval, + trigger_interval = TriggerInterval} = State) -> + Res = reconciliate_stream_membership(State), + NewTimeout = case Res of + noop -> + Interval; + _ -> + TriggerInterval + end, + Ref = erlang:send_after(NewTimeout, self(), ?EVAL_MSG), + {noreply, State#state{timer_ref = Ref}}; +handle_info(_Info, #state{enabled = false} = State) -> + {noreply, State, hibernate}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- + +reconciliate_stream_membership(State) -> + LocalStreams = rabbit_amqqueue:list_local_stream_queues(), + LocalLeaders = lists:filter(fun(Q) -> + #{leader_node := LeaderNode} = amqqueue:get_type_state(Q), + LeaderNode =:= node() + end, LocalStreams), + ExpectedNodes = rabbit_nodes:list_members(), + Running = rabbit_nodes:list_running(), + reconciliate_stream_members(ExpectedNodes, Running, LocalLeaders, State, noop). + +reconciliate_stream_members([], _Running, _, _State, Result) -> + %% if there are no expected nodes rabbit_nodes:list_running/0 encountered + %% an error during query and returned the empty list which is case we need + %% to handle + Result; +reconciliate_stream_members(_ExpectedNodes, _Running, [], _State, Result) -> + Result; +reconciliate_stream_members(ExpectedNodes, Running, [Q | LocalLeaders], + #state{target_group_size = TargetSize} = State, + OldResult) -> + Result = + maybe + #{name := _StreamId, nodes := MemberNodes, leader_node := LeaderNode} = amqqueue:get_type_state(Q), + %% Check if Leader is indeed this node + LeaderNode ?= node(), + %% And that this node is not in maintenance mode + true ?= not rabbit_maintenance:is_being_drained_local_read(node()), + DanglingNodes = MemberNodes -- ExpectedNodes, + case maybe_remove(DanglingNodes, State) of + false -> + maybe_add_member(Q, Running, MemberNodes, get_target_size(Q, TargetSize)); + true -> + remove_members(Q, DanglingNodes) + end + else + _ -> + noop + end, + reconciliate_stream_members(ExpectedNodes, Running, LocalLeaders, State, + update_result(OldResult, Result)). + +maybe_remove(_, #state{auto_remove = false}) -> + false; +maybe_remove([], #state{auto_remove = true}) -> + false; +maybe_remove(_Nodes, #state{auto_remove = true}) -> + true. + +maybe_add_member(Q, Running, MemberNodes, TargetSize) -> + %% Filter out any new nodes under maintenance + New = rabbit_maintenance:filter_out_drained_nodes_local_read(Running -- MemberNodes), + case should_add_node(MemberNodes, New, TargetSize) of + true -> + %% In the future, sort the list of new nodes based on load, + %% availability zones etc + Node = select_node(New), + QName = #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q), + case rabbit_stream_queue:add_replica(VHost, Name, Node) of + ok -> + ?LOG_DEBUG( + "Added node ~ts as a replica to ~ts as " + "the streams target group size(#~w) is not met and " + "there are enough new nodes(#~w) in the cluster", + [Node, rabbit_misc:rs(QName), TargetSize, length(New)]); + {error, Err} -> + ?LOG_WARNING( + "~ts: failed to add replica on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]) + end, + ok; + false -> + noop + end. + +should_add_node(MemberNodes, New, TargetSize) -> + CurrentSize = length(MemberNodes), + NumberOfNewNodes = length(New), + maybe + true ?= NumberOfNewNodes > 0, %% There are new nodes to grow to + true ?= CurrentSize < TargetSize, %% Target size not reached + true ?= rabbit_misc:is_even(CurrentSize) orelse NumberOfNewNodes > 1, %% Enough nodes to grow to odd member size + true ?= rabbit_nodes:is_running(lists:delete(node(), MemberNodes)) + end. + +get_target_size(Q, undefined) -> + get_target_size(Q); +get_target_size(Q, N) when N > 0 -> + max(N, get_target_size(Q)). + +get_target_size(Q) -> + PolicyValue = case rabbit_policy:get(<<"target-group-size">>, Q) of + undefined -> + 0; + PolicyN -> + PolicyN + end, + Arguments = amqqueue:get_arguments(Q), + case rabbit_misc:table_lookup(Arguments, <<"x-stream-target-group-size">>) of + undefined -> + PolicyValue; + ArgN -> + max(ArgN, PolicyValue) + end. + +remove_members(_Q, []) -> + ok; +remove_members(Q, [Node | Nodes]) -> + QName = #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q), + case rabbit_stream_queue:delete_replica(VHost, Name, Node) of + ok -> + QName = amqqueue:get_name(Q), + ?LOG_DEBUG("~ts: Successfully removed replica on node ~w", + [rabbit_misc:rs(QName), Node]), + ok; + {error, Err} -> + QName = amqqueue:get_name(Q), + ?LOG_DEBUG("~ts: failed to remove replica on node " + "~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]) + end, + remove_members(Q, Nodes). + + +%% Make sure any non-noop result is stored. +update_result(noop, Result) -> + Result; +update_result(Result, noop) -> + Result; +update_result(Result, Result) -> + Result. + +select_node([Node]) -> + Node; +select_node(Nodes) -> + lists:nth(rand:uniform(length(Nodes)), Nodes). diff --git a/deps/rabbit/test/stream_member_reconciliation_SUITE.erl b/deps/rabbit/test/stream_member_reconciliation_SUITE.erl new file mode 100644 index 000000000000..2718f587dd39 --- /dev/null +++ b/deps/rabbit/test/stream_member_reconciliation_SUITE.erl @@ -0,0 +1,287 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + + +-module(stream_member_reconciliation_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-compile([nowarn_export_all, export_all]). + +%% The reconciler has two modes of triggering itself +%% - timer based +%% - event based +%% The default config of this test has Interval very short - 5 second which is lower than +%% wait_until timeout. Meaning that even if all domain triggers (node_up/down, policy_set, etc) +%% are disconnected tests would be still green. +%% So to test triggers it is essential to set Interval high enough (the very default value of 60 minutes is perfect) +%% +%% TODO: test `policy_set` trigger + +all() -> + [ + {group, unclustered}, + {group, unclustered_triggers} + ]. + +groups() -> + [ + {unclustered, [], %% low interval, even if triggers do not work all tests should pass + [ + {stream_3, [], [auto_grow, auto_grow_drained_node, auto_shrink]} + ]}, + %% uses an interval longer than `wait_until` (30s by default) + {unclustered_triggers, [], + [ + %% see also `auto_grow_drained_node` + {stream_3, [], [auto_grow, auto_shrink]} + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, []). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(unclustered, Config0) -> + Config1 = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{stream_tick_interval, 1000}, + {stream_membership_reconciliation_enabled, true}, + {stream_membership_reconciliation_auto_remove, true}, + {stream_membership_reconciliation_interval, 5000}, + {stream_membership_reconciliation_trigger_interval, 2000}, + {stream_membership_reconciliation_target_group_size, 3}]}), + rabbit_ct_helpers:set_config(Config1, [{rmq_nodes_clustered, false}]); +init_per_group(unclustered_triggers, Config0) -> + Config1 = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{stream_tick_interval, 1000}, + {stream_membership_reconciliation_enabled, true}, + {stream_membership_reconciliation_auto_remove, true}, + {stream_membership_reconciliation_interval, 50000}, + {stream_membership_reconciliation_trigger_interval, 2000}, + {stream_membership_reconciliation_target_group_size, 3}]}), + %% shrink timeout is set here because without it, when a node stopped right after a queue was created, + %% the test will pass without any triggers because cluster change will likely happen before the trigger_interval, + %% scheduled in response to queue_created event. + %% See also a comment in `auto_shrink/1`. + rabbit_ct_helpers:set_config(Config1, [{rmq_nodes_clustered, false}, + {stream_membership_reconciliation_interval, 50000}, + {shrink_timeout, 2000}]); +init_per_group(Group, Config) -> + ClusterSize = 3, + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), + rabbit_ct_helpers:run_steps(Config1, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()). + +end_per_group(unclustered, Config) -> + Config; +end_per_group(unclustered_triggers, Config) -> + Config; +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{queue_name, Q}, + {alt_queue_name, <>}, + {alt_2_queue_name, <>} + ]), + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). + +end_per_testcase(Testcase, Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + amqp_channel:call(Ch, #'queue.delete'{queue = rabbit_data_coercion:to_binary(Testcase)}), + reset_nodes([Server2, Server0], Server1), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +auto_grow(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + %% There is only one node in the cluster at the moment + MemberSize = rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]), + ?assertEqual(1, MemberSize), + + add_server_to_cluster(Server0, Server1), + %% With 2 nodes in the cluster, we wont grow as it its an even number and not our target group size + %% new members should be available. We sleep a while so the periodic check + %% runs + timer:sleep(4000), + MemberSize1 = rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]), + ?assertEqual(1, MemberSize1), + + add_server_to_cluster(Server2, Server1), + %% With 3 nodes in the cluster, target size is met so eventually it should + %% be 3 members + wait_until(fun() -> + 3 =:= rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]) + end). + +auto_grow_drained_node(Config) -> + %% NOTE: with large Interval (larger than wait_until) test will fail. + %% the reason is that entering/exiting drain state does not emit events + %% and even if they did via gen_event, they going to be only local to that node. + %% so reconciliator has no choice but to wait full Interval + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + %% There is only one node in the cluster at the moment + MemberSize = rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]), + ?assertEqual(1, MemberSize), + + add_server_to_cluster(Server0, Server1), + %% mark Server0 as drained, which should mean the node is not a candidate + %% for stream membership + rabbit_ct_broker_helpers:mark_as_being_drained(Config, Server0), + rabbit_ct_helpers:await_condition( + fun () -> rabbit_ct_broker_helpers:is_being_drained_local_read(Config, Server0) end, + 10000), + add_server_to_cluster(Server2, Server1), + timer:sleep(5000), + %% We have 3 nodes, but one is drained, so it will not be considered. + MemberSize1 = rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]), + ?assertEqual(1, MemberSize1), + + rabbit_ct_broker_helpers:unmark_as_being_drained(Config, Server0), + rabbit_ct_helpers:await_condition( + fun () -> not rabbit_ct_broker_helpers:is_being_drained_local_read(Config, Server0) end, + 10000), + %% We have 3 nodes, none is being drained, so we should grow membership to 3 + wait_until(fun() -> + 3 =:= rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]) + end). + +auto_shrink(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + add_server_to_cluster(Server0, Server1), + add_server_to_cluster(Server2, Server1), + + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), + + wait_until(fun() -> + 3 =:= rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]) + end), + + %% Stream member reconciliation does not act immediately but rather after a scheduled delay. + %% So if this test wants to test that the reconciliator reacts to, say, node_down or a similar event, + %% it has to wait at least a trigger_interval ms to pass before removing node. Otherwise + %% the shrink effect would come from the previous trigger. + %% + %% When a `queue_created` trigger set up a timer to fire after a trigger_interval, the queue has 3 members + %% and stop_app executes much quicker than the trigger_interval. Therefore the number of members + %% will be updated even without a node_down event. + + timer:sleep(rabbit_ct_helpers:get_config(Config, shrink_timeout, 0)), + + ok = rabbit_control_helper:command(stop_app, Server2), + ok = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_db_cluster, forget_member, + [Server2, false]), + %% with one node 'forgotten', eventually the membership will shrink to 2 + wait_until(fun() -> + 2 =:= rabbit_ct_broker_helpers:rpc(Config, Server1, ?MODULE, get_member_size, [QQ]) + end). + +%% ------------------------------------------------------------------- +%% Helpers. +%% ------------------------------------------------------------------- + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env( + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [{core_metrics_gc_interval, 100}]}), + {ra, [{min_wal_roll_over_interval, 30000}]}). + +reset_nodes([], _Leader) -> + ok; +reset_nodes([Node| Nodes], Leader) -> + ok = rabbit_control_helper:command(stop_app, Node), + case rabbit_control_helper:command(forget_cluster_node, Leader, [atom_to_list(Node)]) of + ok -> ok; + {error, _, <<"Error:\n{:not_a_cluster_node, ~c\"The node selected is not in the cluster.\"}">>} -> ok + end, + ok = rabbit_control_helper:command(reset, Node), + ok = rabbit_control_helper:command(start_app, Node), + reset_nodes(Nodes, Leader). + +add_server_to_cluster(Server, Leader) -> + ok = rabbit_control_helper:command(stop_app, Server), + ok = rabbit_control_helper:command(join_cluster, Server, [atom_to_list(Leader)], []), + rabbit_control_helper:command(start_app, Server). + +declare(Ch, Q) -> + declare(Ch, Q, []). + +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + auto_delete = false, + arguments = Args}). + +wait_until(Condition) -> + wait_until(Condition, 180). + +wait_until(Condition, 0) -> + ?assertEqual(true, Condition()); +wait_until(Condition, N) -> + case Condition() of + true -> + ok; + _ -> + timer:sleep(500), + wait_until(Condition, N - 1) + end. + +get_member_size(QueueName) -> + {ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QueueName)), + #{name := StreamId} = amqqueue:get_type_state(Q), + {ok, Members} = rabbit_stream_coordinator:members(StreamId), + map_size(Members). + + +stream_id(_) -> + ok. + +delete_queues() -> + [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()].