Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channe
PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
PARALLEL_CT_SET_2_B = clustering_recovery crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2
PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_file_handle_cache unit_log_management unit_operator_policy prevent_startup_if_node_was_reset
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation stream_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator

PARALLEL_CT_SET_3_A = definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue
PARALLEL_CT_SET_3_B = cluster_upgrade list_consumers_sanity_check list_queues_online_and_offline logging lqueue maintenance_mode rabbit_fifo_q
Expand Down
23 changes: 23 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2688,6 +2688,29 @@ end}.
]}.


%%
%% Stream membership reconciliation
%%

%% Boolean switch, stored under the `rabbit` application environment key
%% `stream_membership_reconciliation_enabled`.
{mapping, "stream.continuous_membership_reconciliation.enabled", "rabbit.stream_membership_reconciliation_enabled", [
{datatype, {enum, [true, false]}}]}.

%% Boolean switch, stored under the `rabbit` application environment key
%% `stream_membership_reconciliation_auto_remove`.
{mapping, "stream.continuous_membership_reconciliation.auto_remove", "rabbit.stream_membership_reconciliation_auto_remove", [
{datatype, {enum, [true, false]}}]}.

%% Integer checked by the "non_negative_integer" validator, stored under
%% `stream_membership_reconciliation_interval`.
%% NOTE(review): presumably a duration in milliseconds - confirm against the
%% process that consumes this setting.
{mapping, "stream.continuous_membership_reconciliation.interval", "rabbit.stream_membership_reconciliation_interval", [
{datatype, integer}, {validators, ["non_negative_integer"]}
]}.

%% Integer checked by the "non_negative_integer" validator, stored under
%% `stream_membership_reconciliation_trigger_interval`.
%% NOTE(review): presumably a duration in milliseconds - confirm against the
%% process that consumes this setting.
{mapping, "stream.continuous_membership_reconciliation.trigger_interval", "rabbit.stream_membership_reconciliation_trigger_interval", [
{datatype, integer}, {validators, ["non_negative_integer"]}
]}.

%% Integer checked by the "non_negative_integer" validator, stored under
%% `stream_membership_reconciliation_target_group_size`.
%% NOTE(review): the validator name suggests 0 is accepted - confirm that the
%% consumer of this setting handles 0.
{mapping, "stream.continuous_membership_reconciliation.target_group_size", "rabbit.stream_membership_reconciliation_target_group_size", [
{datatype, integer}, {validators, ["non_negative_integer"]}
]}.


%%
%% Runtime parameters
%%
Expand Down
6 changes: 6 additions & 0 deletions deps/rabbit/src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@
[rabbit_quorum_queue_periodic_membership_reconciliation]}},
{requires, [database]}]}).

%% Boot step for the stream membership reconciliation process: it calls
%% rabbit_sup:start_restartable_child/1 with the
%% rabbit_stream_periodic_membership_reconciliation module and declares a
%% dependency on the `recovery` step.
-rabbit_boot_step({rabbit_stream_periodic_membership_reconciliation,
[{description, "Stream membership reconciliation"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_stream_periodic_membership_reconciliation]}},
{requires, [recovery]}]}).

-rabbit_boot_step({rabbit_epmd_monitor,
[{description, "epmd monitor"},
{mfa, {rabbit_sup, start_restartable_child,
Expand Down
54 changes: 54 additions & 0 deletions deps/rabbit/src/rabbit_stream_event_subscriber.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_stream_event_subscriber).

-behaviour(gen_event).

-export([init/1, handle_event/2, handle_call/2, handle_info/2]).
-export([register/0, unregister/0]).

-include_lib("rabbit_common/include/rabbit.hrl").

%% Boot step for this event subscriber: register/0 is the step's `mfa` and
%% unregister/0 its `cleanup`. The step requires `rabbit_event` and enables
%% `recovery`.
-rabbit_boot_step({rabbit_stream_event_subscriber,
[{description, "stream event subscriber"},
{mfa, {?MODULE, register, []}},
{cleanup, {?MODULE, unregister, []}},
{requires, rabbit_event},
{enables, recovery}]}).

%% Installs this module as a gen_event handler on the `rabbit_alarm` manager
%% and then on the `rabbit_event` manager. The outcome of the first
%% installation is discarded; the result of the second one is returned.
register() ->
    _AlarmResult = gen_event:add_handler(rabbit_alarm, ?MODULE, []),
    gen_event:add_handler(rabbit_event, ?MODULE, []).

%% Removes this module's handler from the `rabbit_alarm` manager and then
%% from the `rabbit_event` manager. The outcome of the first removal is
%% discarded; the result of the second one is returned.
unregister() ->
    _AlarmResult = gen_event:delete_handler(rabbit_alarm, ?MODULE, []),
    gen_event:delete_handler(rabbit_event, ?MODULE, []).

%% gen_event callback. The handler is stateless, so an empty list serves as
%% its state.
init([]) ->
    InitialState = [],
    {ok, InitialState}.

%% gen_event callback. No calls are supported: every request is answered
%% with `ok` and the state is left untouched.
handle_call(_Request, State) ->
    Reply = ok,
    {ok, Reply, State}.

%% gen_event callback. Relays the events of interest to
%% rabbit_stream_periodic_membership_reconciliation and always keeps the
%% handler installed with an unchanged state.
handle_event(Event, State) ->
    _ = forward_to_reconciliation(Event),
    {ok, State}.

%% Maps an incoming event onto the matching reconciliation API call:
%%   {node_up, Node}   -> on_node_up/1
%%   {node_down, Node} -> on_node_down/1
%%   #event{} of type policy_set or operator_policy_set -> policy_set/0
%% Every other event is ignored.
forward_to_reconciliation({node_up, Node}) ->
    rabbit_stream_periodic_membership_reconciliation:on_node_up(Node);
forward_to_reconciliation({node_down, Node}) ->
    rabbit_stream_periodic_membership_reconciliation:on_node_down(Node);
forward_to_reconciliation(#event{type = Type})
  when Type =:= policy_set; Type =:= operator_policy_set ->
    rabbit_stream_periodic_membership_reconciliation:policy_set();
forward_to_reconciliation(_Ignored) ->
    ok.

%% gen_event callback. Unexpected messages are dropped and the state is
%% returned unchanged.
handle_info(_Message, State) ->
    Unchanged = State,
    {ok, Unchanged}.
264 changes: 264 additions & 0 deletions deps/rabbit/src/rabbit_stream_periodic_membership_reconciliation.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(rabbit_stream_periodic_membership_reconciliation).

-feature(maybe_expr, enable).

-behaviour(gen_server).

-export([on_node_up/1, on_node_down/1, queue_created/1, policy_set/0]).

-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("kernel/include/logger.hrl").

%% Name under which the gen_server is locally registered (the module name).
-define(SERVER, ?MODULE).
%% Default delay between two periodic evaluations when nothing was changed:
%% 60_000 * 60 = 3_600_000, used as the time argument of erlang:send_after/3
%% (milliseconds, i.e. one hour).
-define(DEFAULT_INTERVAL, 60_000*60).
%% Default delay before an evaluation that follows a trigger or a
%% reconciliation that changed something (milliseconds, i.e. ten seconds).
-define(DEFAULT_TRIGGER_INTERVAL, 10_000).
%% NOTE(review): not referenced in the visible part of this module - confirm
%% whether it is still needed.
-define(QUEUE_COUNT_START_RANDOM_SELECTION, 1_000).

%% Message the server sends to itself (via timers) to start an evaluation.
-define(EVAL_MSG, membership_reconciliation).

%% Server state.
%%   timer_ref         - reference of the most recently armed evaluation
%%                       timer; left `undefined` by init/1 when the feature
%%                       is disabled.
%%   interval          - delay used to re-arm the timer after an evaluation
%%                       that returned `noop`.
%%   trigger_interval  - delay used after a trigger, or after an evaluation
%%                       that returned anything other than `noop`.
%%   target_group_size - configured target size, or `undefined` when unset.
%%   enabled           - whether triggers are acted upon and the periodic
%%                       timer is armed at start-up.
%%   auto_remove       - whether members on nodes outside the expected node
%%                       list are removed.
-record(state, {timer_ref :: reference() | undefined,
interval :: non_neg_integer(),
trigger_interval :: non_neg_integer(),
target_group_size :: non_neg_integer() | undefined,
enabled :: boolean(),
auto_remove :: boolean()}).

%%----------------------------------------------------------------------------
%% Start
%%----------------------------------------------------------------------------

%% Starts the reconciliation server, linked to the caller and registered
%% locally under ?SERVER.
-spec start_link() -> rabbit_types:ok_pid_or_error().
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).

%%----------------------------------------------------------------------------
%% API
%%----------------------------------------------------------------------------

%% Each function below asynchronously asks the reconciliation server to
%% schedule an evaluation. They differ only in the reason attached to the
%% request.

%% A node joined / came back.
on_node_up(Node) ->
    request_reconciliation({node_up, Node}).

%% A node went away.
on_node_down(Node) ->
    request_reconciliation({node_down, Node}).

%% A queue was created.
queue_created(Q) ->
    request_reconciliation({queue_created, Q}).

%% A policy or operator policy was set.
policy_set() ->
    request_reconciliation(policy_set).

%% Casts a `membership_reconciliation_trigger` message carrying Reason to the
%% locally registered server.
request_reconciliation(Reason) ->
    gen_server:cast(?SERVER, {membership_reconciliation_trigger, Reason}).

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------

%% gen_server callback. Reads the reconciliation settings from the `rabbit`
%% application environment, falling back to the module defaults. When the
%% feature is enabled the first evaluation is scheduled after `interval`;
%% otherwise the process hibernates straight away.
init([]) ->
    ReadSetting = fun(Key, Default) ->
                          rabbit_misc:get_env(rabbit, Key, Default)
                  end,
    IsEnabled = ReadSetting(stream_membership_reconciliation_enabled, false),
    RemoveDangling = ReadSetting(stream_membership_reconciliation_auto_remove,
                                 false),
    Period = ReadSetting(stream_membership_reconciliation_interval,
                         ?DEFAULT_INTERVAL),
    TriggerDelay = ReadSetting(stream_membership_reconciliation_trigger_interval,
                               ?DEFAULT_TRIGGER_INTERVAL),
    GroupSize = ReadSetting(stream_membership_reconciliation_target_group_size,
                            undefined),
    Initial = #state{enabled = IsEnabled,
                     auto_remove = RemoveDangling,
                     interval = Period,
                     trigger_interval = TriggerDelay,
                     target_group_size = GroupSize},
    case IsEnabled of
        false ->
            {ok, Initial, hibernate};
        true ->
            Timer = erlang:send_after(Period, self(), ?EVAL_MSG),
            {ok, Initial#state{timer_ref = Timer}}
    end.

%% gen_server callback. The server offers no synchronous API: every call is
%% answered with `ok` and leaves the state unchanged.
handle_call(_Call, _Caller, State) ->
    Reply = ok,
    {reply, Reply, State}.

%% gen_server callback.
%%
%% A reconciliation trigger is ignored (and the process hibernates again)
%% while the feature is disabled. When enabled, the currently armed timer is
%% cancelled and a new one is armed to fire after `trigger_interval`, so
%% bursts of triggers collapse into a single evaluation. Any other cast is
%% ignored.
handle_cast({membership_reconciliation_trigger, _Why}, State)
  when State#state.enabled =:= false ->
    {noreply, State, hibernate};
handle_cast({membership_reconciliation_trigger, Why}, State)
  when is_record(State, state) ->
    PreviousTimer = State#state.timer_ref,
    Delay = State#state.trigger_interval,
    ?LOG_DEBUG("Stream membership reconciliation scheduled: ~p", [Why]),
    _ = erlang:cancel_timer(PreviousTimer),
    NextTimer = erlang:send_after(Delay, self(), ?EVAL_MSG),
    {noreply, State#state{timer_ref = NextTimer}};
handle_cast(_Other, State) ->
    {noreply, State}.

%% gen_server callback.
%%
%% On ?EVAL_MSG a reconciliation pass is run and the next evaluation is
%% scheduled: after `interval` when the pass changed nothing (`noop`), after
%% `trigger_interval` otherwise so that follow-up work happens soon.
%%
%% The timer recorded in the state is cancelled before a new one is armed.
%% Without this, an ?EVAL_MSG that was already in the mailbox when a trigger
%% re-armed the timer would leave two timers pending, and since every
%% evaluation arms a successor the server would run two evaluation chains
%% from then on.
%%
%% Any other message is ignored; a disabled server goes back to hibernation.
handle_info(?EVAL_MSG, #state{timer_ref = PendingRef,
                              interval = Interval,
                              trigger_interval = TriggerInterval} = State) ->
    ok = cancel_pending_timer(PendingRef),
    Res = reconciliate_stream_membership(State),
    NewTimeout = case Res of
                     noop ->
                         Interval;
                     _ ->
                         TriggerInterval
                 end,
    Ref = erlang:send_after(NewTimeout, self(), ?EVAL_MSG),
    {noreply, State#state{timer_ref = Ref}};
handle_info(_Info, #state{enabled = false} = State) ->
    {noreply, State, hibernate};
handle_info(_Info, State) ->
    {noreply, State}.

%% Cancels the timer behind Ref, if there is one. Cancelling a timer that
%% has already fired is harmless.
cancel_pending_timer(undefined) ->
    ok;
cancel_pending_timer(Ref) when is_reference(Ref) ->
    _ = erlang:cancel_timer(Ref),
    ok.

%% gen_server callback. Nothing needs to be cleaned up on shutdown.
terminate(_Why, _FinalState) ->
    ok.

%% gen_server callback. The state needs no conversion between code versions.
code_change(_FromVersion, State, _UpgradeData) ->
    Unchanged = State,
    {ok, Unchanged}.

%%----------------------------------------------------------------------------
%% Internal functions
%%----------------------------------------------------------------------------

%% Runs one reconciliation pass over the local stream queues whose type
%% state names this node as leader. Returns `noop` when no stream was acted
%% upon, otherwise the accumulated non-noop result.
reconciliate_stream_membership(State) ->
    IsLedHere = fun(Stream) ->
                        #{leader_node := Leader} = amqqueue:get_type_state(Stream),
                        Leader =:= node()
                end,
    Candidates = rabbit_amqqueue:list_local_stream_queues(),
    LedHere = [Stream || Stream <- Candidates, IsLedHere(Stream)],
    Members = rabbit_nodes:list_members(),
    RunningNodes = rabbit_nodes:list_running(),
    reconciliate_stream_members(Members, RunningNodes, LedHere, State, noop).

%% Walks the given streams one by one, reconciling the membership of each,
%% and folds the individual outcomes into a single result via
%% update_result/2.
%%
%% A stream is only acted upon when its type state still names this node as
%% leader and this node is not being drained for maintenance. In that case
%% either the members on nodes outside ExpectedNodes are removed (when
%% maybe_remove/2 allows it) or, failing that, an additional member may be
%% added.
reconciliate_stream_members([], _Running, _Streams, _State, Acc) ->
    %% An empty list of expected nodes is treated as "nothing can be decided":
    %% presumably the membership query behind ExpectedNodes failed and
    %% returned the empty list. Nothing is touched in that case.
    Acc;
reconciliate_stream_members(_ExpectedNodes, _Running, [], _State, Acc) ->
    Acc;
reconciliate_stream_members(ExpectedNodes, Running, [Stream | Remaining],
                            #state{target_group_size = ConfiguredSize} = State,
                            Acc) ->
    #{name := _StreamId,
      nodes := Members,
      leader_node := Leader} = amqqueue:get_type_state(Stream),
    %% Act only as the current leader and never while under maintenance.
    Eligible = Leader =:= node() andalso
        not rabbit_maintenance:is_being_drained_local_read(node()),
    Outcome =
        case Eligible of
            false ->
                noop;
            true ->
                Dangling = Members -- ExpectedNodes,
                case maybe_remove(Dangling, State) of
                    true ->
                        remove_members(Stream, Dangling);
                    false ->
                        maybe_add_member(Stream, Running, Members,
                                         get_target_size(Stream, ConfiguredSize))
                end
        end,
    reconciliate_stream_members(ExpectedNodes, Running, Remaining, State,
                                update_result(Acc, Outcome)).

%% Decides whether dangling members should be removed: only when automatic
%% removal is switched on and there is at least one dangling node.
maybe_remove(_Dangling, #state{auto_remove = false}) ->
    false;
maybe_remove(Dangling, #state{auto_remove = true}) ->
    Dangling =/= [].

%% Adds one replica of stream Q on a running, non-drained node that is not a
%% member yet, provided should_add_node/3 agrees.
%%
%% Returns `noop` when nothing was attempted and `ok` when an addition was
%% attempted, whether or not it succeeded. A failure is logged only.
maybe_add_member(Q, Running, MemberNodes, TargetSize) ->
    %% Nodes under maintenance are never candidates.
    Candidates = rabbit_maintenance:filter_out_drained_nodes_local_read(
                   Running -- MemberNodes),
    case should_add_node(MemberNodes, Candidates, TargetSize) of
        false ->
            noop;
        true ->
            %% Selection could later take load, availability zones etc.
            %% into account.
            Chosen = select_node(Candidates),
            #resource{name = Name, virtual_host = VHost} = QName =
                amqqueue:get_name(Q),
            case rabbit_stream_queue:add_replica(VHost, Name, Chosen) of
                ok ->
                    ?LOG_DEBUG(
                       "Added node ~ts as a replica to ~ts as "
                       "the streams target group size(#~w) is not met and "
                       "there are enough new nodes(#~w) in the cluster",
                       [Chosen, rabbit_misc:rs(QName), TargetSize,
                        length(Candidates)]);
                {error, Reason} ->
                    ?LOG_WARNING(
                       "~ts: failed to add replica on node ~w, error: ~w",
                       [rabbit_misc:rs(QName), Chosen, Reason])
            end,
            ok
    end.

%% Tells whether a new member should be added. All of the following must
%% hold, checked in this order and stopping at the first failure:
%%   1. there is at least one candidate node;
%%   2. the current member count is below the target size;
%%   3. the member count is even (so one addition makes it odd), or there
%%      is more than one candidate;
%%   4. the running check on the other members passes.
%% NOTE(review): rabbit_nodes:is_running/1 is handed a list of nodes here -
%% confirm that it accepts a list rather than a single node.
should_add_node(MemberNodes, New, TargetSize) ->
    MemberCount = length(MemberNodes),
    CandidateCount = length(New),
    CandidateCount > 0
        andalso MemberCount < TargetSize
        andalso (rabbit_misc:is_even(MemberCount) orelse CandidateCount > 1)
        andalso rabbit_nodes:is_running(lists:delete(node(), MemberNodes)).

%% Effective target group size for Q given the globally configured size.
%%
%% With no configured size (`undefined`) the per-queue size from
%% get_target_size/1 is used; otherwise the larger of the two wins.
%%
%% A configured size of 0 is accepted as well: the configuration validator
%% is "non_negative_integer", and max(0, PerQueue) simply yields the
%% per-queue size. With the former `N > 0` guard a configured 0 raised
%% `function_clause`.
get_target_size(Q, undefined) ->
    get_target_size(Q);
get_target_size(Q, N) when N >= 0 ->
    max(N, get_target_size(Q)).

%% Per-queue target group size: the larger of the "target-group-size" policy
%% value and the "x-stream-target-group-size" queue argument, with 0 standing
%% in for an unset policy.
%%
%% rabbit_misc:table_lookup/2 returns `{Type, Value}` (or `undefined`), so
%% the value has to be unwrapped before comparing. Comparing the raw tuple
%% with an integer always yields the tuple, because tuples sort after
%% numbers in the Erlang term order, which made the target size effectively
%% unbounded. A missing or non-integer argument falls back to the policy
%% value.
get_target_size(Q) ->
    PolicyValue = case rabbit_policy:get(<<"target-group-size">>, Q) of
                      undefined ->
                          0;
                      PolicyN ->
                          PolicyN
                  end,
    Arguments = amqqueue:get_arguments(Q),
    case rabbit_misc:table_lookup(Arguments, <<"x-stream-target-group-size">>) of
        {_Type, ArgN} when is_integer(ArgN) ->
            max(ArgN, PolicyValue);
        _ ->
            PolicyValue
    end.

%% Removes the replicas of stream Q hosted on each of the given nodes, one
%% after the other. A failed removal is logged and does not stop the
%% remaining removals. Always returns `ok`.
%%
%% Failures are logged at warning level, like failed additions in
%% maybe_add_member/4, so they are visible at the same log level.
remove_members(_Q, []) ->
    ok;
remove_members(Q, Nodes) ->
    %% The queue name does not change between nodes: resolve it once.
    QName = #resource{name = Name, virtual_host = VHost} = amqqueue:get_name(Q),
    lists:foreach(
      fun(Node) ->
              case rabbit_stream_queue:delete_replica(VHost, Name, Node) of
                  ok ->
                      ?LOG_DEBUG("~ts: Successfully removed replica on node ~w",
                                 [rabbit_misc:rs(QName), Node]);
                  {error, Err} ->
                      ?LOG_WARNING("~ts: failed to remove replica on node "
                                   "~w, error: ~w",
                                   [rabbit_misc:rs(QName), Node, Err])
              end
      end, Nodes).


%% Combines the result accumulated so far with the latest one so that a
%% non-noop result is never lost: `noop` gives way to the other value, and
%% two identical results collapse into one.
update_result(noop, Latest) ->
    Latest;
update_result(Previous, noop) ->
    Previous;
update_result(Previous, Latest) when Previous =:= Latest ->
    Previous.

%% Picks the node to add a replica on: the only candidate when there is just
%% one, otherwise a uniformly random element of the list.
select_node([Only]) ->
    Only;
select_node(Candidates) ->
    Position = rand:uniform(length(Candidates)),
    lists:nth(Position, Candidates).
Loading
Loading