diff --git a/deps/rabbit/src/amqqueue.erl b/deps/rabbit/src/amqqueue.erl index 3c958d90fea0..9386c104754d 100644 --- a/deps/rabbit/src/amqqueue.erl +++ b/deps/rabbit/src/amqqueue.erl @@ -30,7 +30,6 @@ % exclusive_owner get_exclusive_owner/1, get_leader_node/1, - get_nodes/1, % name (#resource) get_name/1, set_name/2, @@ -425,15 +424,6 @@ get_leader_node(#amqqueue{pid = {_, Leader}}) -> Leader; get_leader_node(#amqqueue{pid = none}) -> none; get_leader_node(#amqqueue{pid = Pid}) -> node(Pid). --spec get_nodes(amqqueue_v2()) -> [node(),...]. - -get_nodes(Q) -> - case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [get_leader_node(Q)] - end. % operator_policy diff --git a/deps/rabbit/src/rabbit_amqp_management.erl b/deps/rabbit/src/rabbit_amqp_management.erl index eb0178fe5352..a21cc5e4e350 100644 --- a/deps/rabbit/src/rabbit_amqp_management.erl +++ b/deps/rabbit/src/rabbit_amqp_management.erl @@ -471,7 +471,7 @@ encode_queue(Q, NumMsgs, NumConsumers) -> {Leader :: node() | none, Replicas :: [node(),...]}. queue_topology(Q) -> Leader = amqqueue:get_leader_node(Q), - Replicas = amqqueue:get_nodes(Q), + Replicas = rabbit_queue_type:get_nodes(Q), {Leader, Replicas}. decode_exchange({map, KVList}) -> diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 36f6b63966df..ee877f8ad154 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -2035,12 +2035,7 @@ pseudo_queue(#resource{kind = queue} = QueueName, Pid, Durable) ). get_quorum_nodes(Q) -> - case amqqueue:get_type_state(Q) of - #{nodes := Nodes} -> - Nodes; - _ -> - [] - end. + rabbit_queue_type:get_nodes(Q). -spec prepend_extra_bcc(Qs) -> Qs when Qs :: [amqqueue:target() | {amqqueue:target(), route_infos()}]. diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index 7f27f9f5544e..c724f570e930 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -219,3 +219,10 @@ depends_on => ['rabbitmq_4.1.0'], callbacks => #{enable => {rabbit_khepri, enable_feature_flag}} }}). + +-rabbit_feature_flag( + {'track_qq_members_uids', + #{desc => "Track queue members UIDs in the metadata store", + stability => stable, + depends_on => [] + }}). diff --git a/deps/rabbit/src/rabbit_queue_location.erl b/deps/rabbit/src/rabbit_queue_location.erl index 0f204f97347e..f6a5c55e4dd6 100644 --- a/deps/rabbit/src/rabbit_queue_location.erl +++ b/deps/rabbit/src/rabbit_queue_location.erl @@ -143,7 +143,7 @@ select_members(Size, _, AllNodes, RunningNodes, _, _, GetQueues) -> Counters0 = maps:from_list([{N, 0} || N <- lists:delete(?MODULE:node(), AllNodes)]), Queues = GetQueues(), Counters = lists:foldl(fun(Q, Acc) -> - #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes = rabbit_queue_type:get_nodes(Q), lists:foldl(fun(N, A) when is_map_key(N, A) -> maps:update_with(N, fun(C) -> C+1 end, A); diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index 2a115c54552d..4101453cf660 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -38,6 +38,7 @@ format/2, remove/2, info/2, + get_nodes/1, state_info/1, format_status/1, info_down/2, @@ -422,6 +423,15 @@ info(Q, Items) -> Mod = amqqueue:get_type(Q), Mod:info(Q, Items). +-spec get_nodes(amqqueue:amqqueue_v2()) -> [node(),...]. +get_nodes(Q) -> + case info(Q, [members]) of + [{members, Nodes}] -> + Nodes; + [] -> + [] + end. + fold_state(Fun, Acc, #?STATE{ctxs = Ctxs}) -> maps:fold(Fun, Acc, Ctxs). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 601144d9076f..01fc4f3441b5 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -270,9 +270,17 @@ start_cluster(Q) -> {LeaderNode, FollowerNodes} = rabbit_queue_location:select_leader_and_followers(Q, QuorumSize), LeaderId = {RaName, LeaderNode}, + UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))} + || Node <- [LeaderNode | FollowerNodes]]), NewQ0 = amqqueue:set_pid(Q, LeaderId), - NewQ1 = amqqueue:set_type_state(NewQ0, - #{nodes => [LeaderNode | FollowerNodes]}), + NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of + false -> + amqqueue:set_type_state(NewQ0, + #{nodes => [LeaderNode | FollowerNodes]}); + true -> + amqqueue:set_type_state(NewQ0, + #{nodes => UIDs}) + end, Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes, rabbit_fifo, version, [], @@ -717,7 +725,7 @@ repair_amqqueue_nodes(Q0) -> {Name, _} = amqqueue:get_pid(Q0), Members = ra_leaderboard:lookup_members(Name), RaNodes = [N || {_, N} <- Members], - #{nodes := Nodes} = amqqueue:get_type_state(Q0), + Nodes = get_nodes(Q0), case lists:sort(RaNodes) =:= lists:sort(Nodes) of true -> %% up to date @@ -726,7 +734,16 @@ repair_amqqueue_nodes(Q0) -> %% update amqqueue record Fun = fun (Q) -> TS0 = amqqueue:get_type_state(Q), - TS = TS0#{nodes => RaNodes}, + TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of + false -> + TS0#{nodes => RaNodes}; + true -> + RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of, + [?RA_SYSTEM, Name], + ?RPC_TIMEOUT)} + || N <- RaNodes]), + TS0#{nodes => RaUids} + end, amqqueue:set_type_state(Q, TS) end, _ = rabbit_amqqueue:update(QName, Fun), @@ -785,11 +802,28 @@ maybe_apply_policies(Q, #{config := CurrentConfig}) -> {[amqqueue:amqqueue()], [amqqueue:amqqueue()]}. recover(_Vhost, Queues) -> lists:foldl( - fun (Q0, {R0, F0}) -> - {Name, _} = amqqueue:get_pid(Q0), + fun (Q, {R0, F0}) -> + {Name, _} = amqqueue:get_pid(Q), ServerId = {Name, node()}, - QName = amqqueue:get_name(Q0), - MutConf = make_mutable_config(Q0), + QName = amqqueue:get_name(Q), + MutConf = make_mutable_config(Q), + RaUId = ra_directory:uid_of(?RA_SYSTEM, Name), + #{nodes := Nodes} = amqqueue:get_type_state(Q), + case Nodes of + List when is_list(List) -> + %% Queue is not aware of node to uid mapping, do nothing + ok; + #{node() := RaUId} -> + %% Queue is aware and uid for current node is correct, do + %% nothing + ok; + #{node() := _NewRaUId} -> + %% Queue is aware but it does not match the one returned by + %% ra_directory + rabbit_log:info("Quorum queue ~ts: detected node uuid change, " + "deleting old data directory", [rabbit_misc:rs(QName)]), + maybe_delete_data_dir(RaUId) + end, Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of ok -> % queue was restarted, good @@ -802,7 +836,7 @@ recover(_Vhost, Queues) -> [rabbit_misc:rs(QName), Err1]), % queue was never started on this node % so needs to be started from scratch. - case start_server(make_ra_conf(Q0, ServerId)) of + case start_server(make_ra_conf(Q, ServerId)) of ok -> ok; Err2 -> ?LOG_WARNING("recover: quorum queue ~w could not" @@ -824,8 +858,7 @@ recover(_Vhost, Queues) -> %% present in the rabbit_queue table and not just in %% rabbit_durable_queue %% So many code paths are dependent on this. - ok = rabbit_db_queue:set_dirty(Q0), - Q = Q0, + ok = rabbit_db_queue:set_dirty(Q), case Res of ok -> {[Q | R0], F0}; @@ -1208,12 +1241,17 @@ cleanup_data_dir() -> maybe_delete_data_dir(UId) -> _ = ra_directory:unregister_name(?RA_SYSTEM, UId), Dir = ra_env:server_data_dir(?RA_SYSTEM, UId), - {ok, Config} = ra_log:read_config(Dir), - case maps:get(machine, Config) of - {module, rabbit_fifo, _} -> - ra_lib:recursive_delete(Dir); - _ -> - ok + case filelib:is_dir(Dir) of + false -> + ok; + true -> + {ok, Config} = ra_log:read_config(Dir), + case maps:get(machine, Config) of + {module, rabbit_fifo, _} -> + ra_lib:recursive_delete(Dir); + _ -> + ok + end end. policy_changed(Q) -> @@ -1378,16 +1416,29 @@ add_member(Q, Node, Membership) -> do_add_member(Q, Node, Membership, ?MEMBER_CHANGE_TIMEOUT). -do_add_member(Q, Node, Membership, Timeout) - when ?is_amqqueue(Q) andalso - ?amqqueue_is_quorum(Q) andalso +do_add_member(Q0, Node, Membership, Timeout) + when ?is_amqqueue(Q0) andalso + ?amqqueue_is_quorum(Q0) andalso is_atom(Node) -> - {RaName, _} = amqqueue:get_pid(Q), - QName = amqqueue:get_name(Q), + {RaName, _} = amqqueue:get_pid(Q0), + QName = amqqueue:get_name(Q0), %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, - Members = members(Q), - + Members = members(Q0), + QTypeState0 = #{nodes := Nodes} = amqqueue:get_type_state(Q0), + NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)), + QTypeState = case Nodes of + L when is_list(L) -> + %% Queue is not aware of node to uid mapping, just add the new node + QTypeState0#{nodes => lists:usort([Node | Nodes])}; + #{Node := _} -> + %% Queue is aware and uid for targeted node exists, do nothing + QTypeState0; + _ -> + %% Queue is aware but current node has no UId, regen uid + QTypeState0#{nodes => Nodes#{Node => NewRaUId}} + end, + Q = amqqueue:set_type_state(Q0, QTypeState), MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity), Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion), case ra:start_server(?RA_SYSTEM, Conf) of @@ -1403,8 +1454,12 @@ do_add_member(Q, Node, Membership, Timeout) {ok, {RaIndex, RaTerm}, Leader} -> Fun = fun(Q1) -> Q2 = update_type_state( - Q1, fun(#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:usort([Node | Nodes])} + Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) -> + Ts#{nodes => lists:usort([Node | NodesList])}; + (#{nodes := #{Node := _}} = Ts) -> + Ts; + (#{nodes := NodesMap} = Ts) when is_map(NodesMap) -> + Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)} end), amqqueue:set_pid(Q2, Leader) end, @@ -1477,8 +1532,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> Fun = fun(Q1) -> update_type_state( Q1, - fun(#{nodes := Nodes} = Ts) -> - Ts#{nodes => lists:delete(Node, Nodes)} + fun(#{nodes := Nodes} = Ts) when is_list(Nodes) -> + Ts#{nodes => lists:delete(Node, Nodes)}; + (#{nodes := Nodes} = Ts) when is_map(Nodes) -> + Ts#{nodes => maps:remove(Node, Nodes)} end) end, _ = rabbit_amqqueue:update(QName, Fun), @@ -1999,7 +2056,15 @@ make_ra_conf(Q, ServerId, TickTimeout, #resource{name = QNameBin} = QName, RaMachine = ra_machine(Q), [{ClusterName, _} | _] = Members = members(Q), - UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + {_, Node} = ServerId, + UId = case amqqueue:get_type_state(Q) of + #{nodes := #{Node := Id}} -> + Id; + _ -> + %% Queue was declared on an older version of RabbitMQ + %% or does not have the node to uid mappings + ra:new_uid(ra_lib:to_binary(ClusterName)) + end, FName = rabbit_misc:rs(QName), Formatter = {?MODULE, format_ra_event, [QName]}, LogCfg = #{uid => UId, @@ -2031,7 +2096,12 @@ make_mutable_config(Q) -> get_nodes(Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), - Nodes. + case Nodes of + List when is_list(List) -> + List; + Map when is_map(Map) -> + maps:keys(Map) + end. get_connected_nodes(Q) when ?is_amqqueue(Q) -> ErlangNodes = [node() | nodes()], @@ -2138,7 +2208,7 @@ force_checkpoint_on_queue(QName) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), ?LOG_DEBUG("Sending command to force ~ts to take a checkpoint", [QNameFmt]), - Nodes = amqqueue:get_nodes(Q), + Nodes = rabbit_queue_type:get_nodes(Q), _ = [ra:cast_aux_command({RaName, Node}, force_checkpoint) || Node <- Nodes], ok; @@ -2333,7 +2403,7 @@ transfer_leadership(_CandidateNodes) -> %% wait for leader elections before processing next chunk of queues [begin {RaName, LeaderNode} = amqqueue:get_pid(Q), - MemberNodes = lists:delete(LeaderNode, amqqueue:get_nodes(Q)), + MemberNodes = lists:delete(LeaderNode, rabbit_queue_type:get_nodes(Q)), %% we don't do any explicit error handling here as it is more %% important to make progress _ = lists:any(fun (N) -> diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index cb01bfadf6d0..3d7f768033ce 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -153,8 +153,8 @@ stop() -> new_stream(Q, LeaderNode) when ?is_amqqueue(Q) andalso is_atom(LeaderNode) -> - #{name := StreamId, - nodes := Nodes} = amqqueue:get_type_state(Q), + #{name := StreamId} = amqqueue:get_type_state(Q), + Nodes = rabbit_queue_type:get_nodes(Q), %% assertion leader is in nodes configuration true = lists:member(LeaderNode, Nodes), process_command({new_stream, StreamId, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 1a3fed31227a..ad49d59f2a59 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -105,7 +105,9 @@ groups() -> force_checkpoint, policy_repair, gh_12635, - replica_states + replica_states, + restart_after_queue_reincarnation, + no_messages_after_queue_reincarnation ] ++ all_tests()}, {cluster_size_5, [], [start_queue, @@ -2795,15 +2797,21 @@ add_member_wrong_type(Config) -> [<<"/">>, SQ, Server, voter, 5000])). add_member_already_a_member(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server, Server2 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + R1 = rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}]), %% idempotent by design ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server, voter, 5000])). + [<<"/">>, QQ, Server, voter, 5000])), + ?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])), + ?assertEqual(ok, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server2, voter, 5000])), + ?assertEqual(R1, rpc:call(Server, rabbit_amqqueue, lookup, [{resource, <<"/">>, queue, QQ}])). add_member_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -2847,10 +2855,14 @@ add_member_2(Config) -> {<<"x-quorum-initial-group-size">>, long, 1}])), ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, [<<"/">>, QQ, Server0, 5000])), - Info = rpc:call(Server0, rabbit_quorum_queue, infos, - [rabbit_misc:r(<<"/">>, queue, QQ)]), + #{online := Onlines} = ?awaitMatch(#{online := [_One, _Two]}, + maps:from_list(rpc:call(Server0, + rabbit_quorum_queue, + infos, + [rabbit_misc:r(<<"/">>, queue, QQ)])), + 3000), Servers = lists:sort([Server0, Server1]), - ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). + ?assertEqual(Servers, lists:sort(Onlines)). delete_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -4920,6 +4932,155 @@ replica_states(Config) -> end end, Result2). +% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/discussions/13131 +restart_after_queue_reincarnation(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "queue reincarnation protection can't work on mixed mode"}; + false -> + restart_after_queue_reincarnation_(Config) + end. + +restart_after_queue_reincarnation_(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QName = <<"QQ">>, + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + VHost = amqqueue:get_vhost(Q), + + MessagesPublished = 1000, + publish_many(Ch, QName, MessagesPublished), + + %% Trigger a snapshot by purging the queue. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_queue_type, purge, [Q]), + + %% Stop S3 + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3), + ?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)), + + %% Delete and re-declare queue with the same name. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + % Now S3 should have the old queue state, and S1 and S2 a new one. + St1 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + Status0 = [{proplists:get_value(<<"Node Name">>, S), S} || S <- St1], + S3_Status1 = proplists:get_value(S3, Status0), + Others_Status1 = [V || {_K, V} <- proplists:delete(S3, Status0)], + + S3_LastLogIndex = proplists:get_value(<<"Last Log Index">>, S3_Status1), + S3_LastWritten = proplists:get_value(<<"Last Written">>, S3_Status1), + S3_LastApplied = proplists:get_value(<<"Last Applied">>, S3_Status1), + S3_CommitIndex = proplists:get_value(<<"Commit Index">>, S3_Status1), + S3_Term = proplists:get_value(<<"Term">>, S3_Status1), + + ?assertEqual(noproc, proplists:get_value(<<"Raft State">>, S3_Status1)), + ?assertEqual(unknown, proplists:get_value(<<"Membership">>, S3_Status1)), + [begin + ?assert(S3_LastLogIndex > proplists:get_value(<<"Last Log Index">>, O)), + ?assert(S3_LastWritten > proplists:get_value(<<"Last Written">>, O)), + ?assert(S3_LastApplied > proplists:get_value(<<"Last Applied">>, O)), + ?assert(S3_CommitIndex > proplists:get_value(<<"Commit Index">>, O)), + ?assertEqual(S3_Term, proplists:get_value(<<"Term">>, O)) + end || O <- Others_Status1], + + %% Bumping term in online nodes + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]), + + %% Restart S3 + ?assertEqual(ok, rabbit_control_helper:command(start_app, S3)), + + ?awaitMatch(true, begin + %% Now all three nodes should have the new state. + % They are either leader or follower. + Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + lists:all( + fun(NodeStatus) -> + NodeRaftState = proplists:get_value(<<"Raft State">>, NodeStatus), + lists:member(NodeRaftState, [leader, follower]) + end, Status2) + end, ?DEFAULT_AWAIT), + ?awaitMatch(true, begin + Status2 = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, status, [VHost, QName]), + + % Remove "Node Name" and "Raft State" from the status. + Status3 = [NE1, NE2, NE3]= [ + begin + R = proplists:delete(<<"Node Name">>, NodeEntry), + proplists:delete(<<"Raft State">>, R) + end || NodeEntry <- Status2], + % Check all other properties have same value on all nodes. + ct:pal("Status3: ~tp", [Status3]), + lists:all(fun({A, B}) -> A == B end, [ {V, proplists:get_value(K, NE2)} || {K, V} <- NE1]) andalso + lists:all(fun({A, B}) -> A == B end, [ {V, proplists:get_value(K, NE3)} || {K, V} <- NE1]) + end, ?DEFAULT_AWAIT). + +% Testcase motivated by : https://github.com/rabbitmq/rabbitmq-server/issues/12366 +no_messages_after_queue_reincarnation(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "queue reincarnation protection can't work on mixed mode"}; + false -> + no_messages_after_queue_reincarnation_(Config) + end. + +no_messages_after_queue_reincarnation_(Config) -> + [S1, S2, S3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, S1), + QName = <<"QQ">>, + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + [Q] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, list, []), + + publish(Ch, QName, <<"msg1">>), + publish(Ch, QName, <<"msg2">>), + + %% Stop S3 + rabbit_ct_broker_helpers:mark_as_being_drained(Config, S3), + ?assertEqual(ok, rabbit_control_helper:command(stop_app, S3)), + + qos(Ch, 1, false), + subscribe(Ch, QName, false, <<"tag0">>, [], 500), + DeliveryTag = receive + {#'basic.deliver'{delivery_tag = DT}, #amqp_msg{}} -> + receive + {#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} -> + ct:fail("did not expect the second one") + after 500 -> + DT + end + after 500 -> + ct:fail("Expected some delivery, but got none") + end, + + %% Delete and re-declare queue with the same name. + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, delete, [Q,false,false,<<"dummy_user">>]), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Bumping term in online nodes + rabbit_ct_broker_helpers:rpc(Config, 1, rabbit_quorum_queue, transfer_leadership, [Q, S2]), + + %% Restart S3 + ?assertEqual(ok, rabbit_control_helper:command(start_app, S3)), + + ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + %% No message should be delivered after reincarnation + receive + {#'basic.deliver'{consumer_tag = <<"tag0">>}, #amqp_msg{}} -> + ct:fail("Expected no deliveries, but got one") + after 500 -> + ok + end. + %%---------------------------------------------------------------------------- same_elements(L1, L2) @@ -4989,7 +5150,10 @@ consume_empty(Ch, Queue, NoAck) -> subscribe(Ch, Queue, NoAck) -> subscribe(Ch, Queue, NoAck, <<"ctag">>, []). + subscribe(Ch, Queue, NoAck, Tag, Args) -> + subscribe(Ch, Queue, NoAck, Tag, Args, ?TIMEOUT). +subscribe(Ch, Queue, NoAck, Tag, Args, Timeout) -> amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, no_ack = NoAck, arguments = Args, @@ -4998,7 +5162,7 @@ subscribe(Ch, Queue, NoAck, Tag, Args) -> receive #'basic.consume_ok'{consumer_tag = Tag} -> ok - after ?TIMEOUT -> + after Timeout -> flush(100), exit(subscribe_timeout) end. diff --git a/deps/rabbitmq_ct_helpers/src/queue_utils.erl b/deps/rabbitmq_ct_helpers/src/queue_utils.erl index d2c69792fde0..78f1991a8aa2 100644 --- a/deps/rabbitmq_ct_helpers/src/queue_utils.erl +++ b/deps/rabbitmq_ct_helpers/src/queue_utils.erl @@ -208,7 +208,7 @@ assert_number_of_replicas(Config, Server, VHost, QQ, Count) -> begin {ok, Q} = rabbit_ct_broker_helpers:rpc( Config, Server, rabbit_amqqueue, lookup, [QQ, VHost]), - #{nodes := Nodes} = amqqueue:get_type_state(Q), + Nodes = rabbit_queue_type:get_nodes(Q), length(Nodes) end, 30000). diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 7f6ed70d56dc..25dabd69421b 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -451,8 +451,7 @@ emit_queue_info(Prefix, VHostsFilter, Callback) -> true -> Acc; false -> Type = amqqueue:get_type(Q), - TypeState = amqqueue:get_type_state(Q), - Members = maps:get(nodes, TypeState, []), + Members = rabbit_queue_type:get_nodes(Q), case membership(amqqueue:get_pid(Q), Members) of not_a_member -> Acc;