Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions deps/rabbit/src/amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
% exclusive_owner
get_exclusive_owner/1,
get_leader_node/1,
get_nodes/1,
% name (#resource)
get_name/1,
set_name/2,
Expand Down Expand Up @@ -425,15 +424,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader;
get_leader_node(#amqqueue{pid = none}) -> none;
get_leader_node(#amqqueue{pid = Pid}) -> node(Pid).

%% Returns the value stored under the `nodes' key of the queue's type state.
%% When the type state carries no such key (or is not a map at all), falls
%% back to a single-element list holding the result of get_leader_node/1.
-spec get_nodes(amqqueue_v2()) -> [node(),...].

get_nodes(Queue) ->
    TypeState = amqqueue:get_type_state(Queue),
    case TypeState of
        #{nodes := Members} -> Members;
        _NoNodesKey -> [get_leader_node(Queue)]
    end.

% operator_policy

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqp_management.erl
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) ->
{Leader :: node() | none, Replicas :: [node(),...]}.
queue_topology(Q) ->
Leader = amqqueue:get_leader_node(Q),
Replicas = amqqueue:get_nodes(Q),
Replicas = rabbit_queue_type:get_nodes(Q),
{Leader, Replicas}.

decode_exchange({map, KVList}) ->
Expand Down
7 changes: 1 addition & 6 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2035,12 +2035,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable)
).

get_quorum_nodes(Q) ->
case amqqueue:get_type_state(Q) of
#{nodes := Nodes} ->
Nodes;
_ ->
[]
end.
rabbit_queue_type:get_nodes(Q).

-spec prepend_extra_bcc(Qs) ->
Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}].
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,10 @@
depends_on => ['rabbitmq_4.1.0'],
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
}}).

%% Feature flag `track_qq_members_uids'.
%% When enabled, rabbit_quorum_queue stores the `nodes' entry of a quorum
%% queue's type state as a map of node => Ra UId instead of a plain list of
%% nodes. Declared stable, with no dependencies on other feature flags.
-rabbit_feature_flag(
{'track_qq_members_uids',
#{desc => "Track queue members UIDs in the metadata store",
stability => stable,
depends_on => []
}}).
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_queue_location.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) ->
Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]),
Queues = GetQueues(),
Counters = lists:foldl(fun(Q, Acc) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes = rabbit_queue_type:get_nodes(Q),
lists:foldl(fun(N, A)
when is_map_key(N, A) ->
maps:update_with(N, fun(C) -> C+1 end, A);
Expand Down
10 changes: 10 additions & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
format/2,
remove/2,
info/2,
get_nodes/1,
state_info/1,
format_status/1,
info_down/2,
Expand Down Expand Up @@ -422,6 +423,15 @@ info(Q, Items) ->
Mod = amqqueue:get_type(Q),
Mod:info(Q, Items).

%% Returns the value of the `members' info item for queue `Q', obtained
%% through info/2 (which dispatches to the queue type module's info/2).
%%
%% When the queue type reports no `members' item (info/2 returns `[]'),
%% an empty list is returned. The result is therefore typed as a possibly
%% empty list rather than a non-empty one.
-spec get_nodes(amqqueue:amqqueue_v2()) -> [node()].
get_nodes(Q) ->
    case info(Q, [members]) of
        [{members, Nodes}] ->
            Nodes;
        [] ->
            %% no membership information available for this queue type
            []
    end.

fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) ->
maps:fold(Fun, Acc, Ctxs).

Expand Down
134 changes: 102 additions & 32 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,17 @@ start_cluster(Q) ->
{LeaderNode, FollowerNodes} =
rabbit_queue_location:select_leader_and_followers(Q, QuorumSize),
LeaderId = {RaName, LeaderNode},
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
|| Node <- [LeaderNode | FollowerNodes]]),
NewQ0 = amqqueue:set_pid(Q, LeaderId),
NewQ1 = amqqueue:set_type_state(NewQ0,
#{nodes => [LeaderNode | FollowerNodes]}),
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
false ->
amqqueue:set_type_state(NewQ0,
#{nodes => [LeaderNode | FollowerNodes]});
true ->
amqqueue:set_type_state(NewQ0,
#{nodes => UIDs})
end,

Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
rabbit_fifo, version, [],
Expand Down Expand Up @@ -717,7 +725,7 @@ repair_amqqueue_nodes(Q0) ->
{Name, _} = amqqueue:get_pid(Q0),
Members = ra_leaderboard:lookup_members(Name),
RaNodes = [N || {_, N} <- Members],
#{nodes := Nodes} = amqqueue:get_type_state(Q0),
Nodes = get_nodes(Q0),
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
true ->
%% up to date
Expand All @@ -726,7 +734,16 @@ repair_amqqueue_nodes(Q0) ->
%% update amqqueue record
Fun = fun (Q) ->
TS0 = amqqueue:get_type_state(Q),
TS = TS0#{nodes => RaNodes},
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
false ->
TS0#{nodes => RaNodes};
true ->
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
[?RA_SYSTEM, Name],
?RPC_TIMEOUT)}
|| N <- RaNodes]),
TS0#{nodes => RaUids}
end,
amqqueue:set_type_state(Q, TS)
end,
_ = rabbit_amqqueue:update(QName, Fun),
Expand Down Expand Up @@ -785,11 +802,28 @@ maybe_apply_policies(Q, #{config := CurrentConfig}) ->
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
recover(_Vhost, Queues) ->
lists:foldl(
fun (Q0, {R0, F0}) ->
{Name, _} = amqqueue:get_pid(Q0),
fun (Q, {R0, F0}) ->
{Name, _} = amqqueue:get_pid(Q),
ServerId = {Name, node()},
QName = amqqueue:get_name(Q0),
MutConf = make_mutable_config(Q0),
QName = amqqueue:get_name(Q),
MutConf = make_mutable_config(Q),
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
#{nodes := Nodes} = amqqueue:get_type_state(Q),
case Nodes of
List when is_list(List) ->
%% Queue is not aware of node to uid mapping, do nothing
ok;
#{node() := RaUId} ->
%% Queue is aware and uid for current node is correct, do
%% nothing
ok;
#{node() := _NewRaUId} ->
%% Queue is aware but it does not match the one returned by
%% ra_directory
rabbit_log:info("Quorum queue ~ts: detected node uuid change, "
"deleting old data directory", [rabbit_misc:rs(QName)]),
maybe_delete_data_dir(RaUId)
end,
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
ok ->
% queue was restarted, good
Expand All @@ -802,7 +836,7 @@ recover(_Vhost, Queues) ->
[rabbit_misc:rs(QName), Err1]),
% queue was never started on this node
% so needs to be started from scratch.
case start_server(make_ra_conf(Q0, ServerId)) of
case start_server(make_ra_conf(Q, ServerId)) of
ok -> ok;
Err2 ->
?LOG_WARNING("recover: quorum queue ~w could not"
Expand All @@ -824,8 +858,7 @@ recover(_Vhost, Queues) ->
%% present in the rabbit_queue table and not just in
%% rabbit_durable_queue
%% So many code paths are dependent on this.
ok = rabbit_db_queue:set_dirty(Q0),
Q = Q0,
ok = rabbit_db_queue:set_dirty(Q),
case Res of
ok ->
{[Q | R0], F0};
Expand Down Expand Up @@ -1208,12 +1241,17 @@ cleanup_data_dir() ->
maybe_delete_data_dir(UId) ->
_ = ra_directory:unregister_name(?RA_SYSTEM, UId),
Dir = ra_env:server_data_dir(?RA_SYSTEM, UId),
{ok, Config} = ra_log:read_config(Dir),
case maps:get(machine, Config) of
{module, rabbit_fifo, _} ->
ra_lib:recursive_delete(Dir);
_ ->
ok
case filelib:is_dir(Dir) of
false ->
ok;
true ->
{ok, Config} = ra_log:read_config(Dir),
case maps:get(machine, Config) of
{module, rabbit_fifo, _} ->
ra_lib:recursive_delete(Dir);
_ ->
ok
end
end.

policy_changed(Q) ->
Expand Down Expand Up @@ -1378,16 +1416,29 @@ add_member(Q, Node, Membership) ->
do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT).


do_add_member(Q, Node, Membership, Timeout)
when ?is_amqqueue(Q) andalso
?amqqueue_is_quorum(Q) andalso
do_add_member(Q0, Node, Membership, Timeout)
when ?is_amqqueue(Q0) andalso
?amqqueue_is_quorum(Q0) andalso
is_atom(Node) ->
{RaName, _} = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
{RaName, _} = amqqueue:get_pid(Q0),
QName = amqqueue:get_name(Q0),
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
Members = members(Q),

Members = members(Q0),
QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0),
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
QTypeState = case Nodes of
L when is_list(L) ->
%% Queue is not aware of node to uid mapping, just add the new node
QTypeState0#{nodes => lists:usort([Node | Nodes])};
#{Node := _} ->
%% Queue is aware and uid for targeted node exists, do nothing
QTypeState0;
_ ->
%% Queue is aware but the targeted node has no UId yet, record the newly generated one
QTypeState0#{nodes => Nodes#{Node => NewRaUId}}
end,
Q = amqqueue:set_type_state(Q0, QTypeState),
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
case ra:start_server(?RA_SYSTEM, Conf) of
Expand All @@ -1403,8 +1454,12 @@ do_add_member(Q, Node, Membership, Timeout)
{ok, {RaIndex, RaTerm}, Leader} ->
Fun = fun(Q1) ->
Q2 = update_type_state(
Q1, fun(#{nodes := Nodes} = Ts) ->
Ts#{nodes => lists:usort([Node | Nodes])}
Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) ->
Ts#{nodes => lists:usort([Node | NodesList])};
(#{nodes := #{Node := _}} = Ts) ->
Ts;
(#{nodes := NodesMap} = Ts) when is_map(NodesMap) ->
Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)}
end),
amqqueue:set_pid(Q2, Leader)
end,
Expand Down Expand Up @@ -1477,8 +1532,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
Fun = fun(Q1) ->
update_type_state(
Q1,
fun(#{nodes := Nodes} = Ts) ->
Ts#{nodes => lists:delete(Node, Nodes)}
fun(#{nodes := Nodes} = Ts) when is_list(Nodes) ->
Ts#{nodes => lists:delete(Node, Nodes)};
(#{nodes := Nodes} = Ts) when is_map(Nodes) ->
Ts#{nodes => maps:remove(Node, Nodes)}
end)
end,
_ = rabbit_amqqueue:update(QName, Fun),
Expand Down Expand Up @@ -1999,7 +2056,15 @@ make_ra_conf(Q, ServerId, TickTimeout,
#resource{name = QNameBin} = QName,
RaMachine = ra_machine(Q),
[{ClusterName, _} | _] = Members = members(Q),
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
{_, Node} = ServerId,
UId = case amqqueue:get_type_state(Q) of
#{nodes := #{Node := Id}} ->
Id;
_ ->
%% Queue was declared on an older version of RabbitMQ
%% or does not have the node to uid mappings
ra:new_uid(ra_lib:to_binary(ClusterName))
end,
FName = rabbit_misc:rs(QName),
Formatter = {?MODULE, format_ra_event, [QName]},
LogCfg = #{uid => UId,
Expand Down Expand Up @@ -2031,7 +2096,12 @@ make_mutable_config(Q) ->

get_nodes(Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes.
case Nodes of
List when is_list(List) ->
List;
Map when is_map(Map) ->
maps:keys(Map)
end.

get_connected_nodes(Q) when ?is_amqqueue(Q) ->
ErlangNodes = [node() | nodes()],
Expand Down Expand Up @@ -2138,7 +2208,7 @@ force_checkpoint_on_queue(QName) ->
{ok, Q} when ?amqqueue_is_quorum(Q) ->
{RaName, _} = amqqueue:get_pid(Q),
?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]),
Nodes = amqqueue:get_nodes(Q),
Nodes = rabbit_queue_type:get_nodes(Q),
_ = [ra:cast_aux_command({RaName, Node}, force_checkpoint)
|| Node <- Nodes],
ok;
Expand Down Expand Up @@ -2333,7 +2403,7 @@ transfer_leadership(_CandidateNodes) ->
%% wait for leader elections before processing next chunk of queues
[begin
{RaName, LeaderNode} = amqqueue:get_pid(Q),
MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)),
MemberNodes = lists:delete(LeaderNode, rabbit_queue_type:get_nodes(Q)),
%% we don't do any explicit error handling here as it is more
%% important to make progress
_ = lists:any(fun (N) ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ stop() ->

new_stream(Q, LeaderNode)
when ?is_amqqueue(Q) andalso is_atom(LeaderNode) ->
#{name := StreamId,
nodes := Nodes} = amqqueue:get_type_state(Q),
#{name := StreamId} = amqqueue:get_type_state(Q),
Nodes = rabbit_queue_type:get_nodes(Q),
%% assertion leader is in nodes configuration
true = lists:member(LeaderNode, Nodes),
process_command({new_stream, StreamId,
Expand Down
Loading
Loading