Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 41 additions & 19 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -905,16 +905,32 @@ ack(AckTags, ChPid, State) ->
State1#q{backing_queue_state = BQS1}
end).

requeue(AckTags, ChPid, State) ->
requeue(AckTags, DelFailed, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, false, State1) end).
fun (State1) -> requeue_and_run(AckTags, DelFailed, false, State1) end).

discard(AckTags, DelFailed, ChPid, State) ->
with_dlx(
State#q.dlx,
fun (X) -> subtract_acks(ChPid, AckTags, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, DelFailed, X, State1)
end) end,
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
disabled, length(AckTags)),
ack(AckTags, ChPid, State) end).

requeue_and_run(AckTags, ActiveConsumersChanged, State) ->
requeue_and_run(AckTags, true, ActiveConsumersChanged, State).

requeue_and_run(AckTags,
DelFailed,
ActiveConsumersChanged,
#q{backing_queue = BQ,
backing_queue_state = BQS0} = State0) ->
WasEmpty = BQ:is_empty(BQS0),
{_MsgIds, BQS} = BQ:requeue(AckTags, BQS0),
{_MsgIds, BQS} = BQ:requeue(AckTags, DelFailed, BQS0),
State1 = State0#q{backing_queue_state = BQS},
{_Dropped, State2} = maybe_drop_head(State1),
State3 = drop_expired_msgs(State2),
Expand Down Expand Up @@ -1079,11 +1095,11 @@ dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
end, expired, X, State).

dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
dead_letter_rejected_msgs(AckTags, DelFailed, X, State = #q{backing_queue = BQ}) ->
{ok, State1} =
dead_letter_msgs(
fun (DLFun, Acc, BQS) ->
{Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
{Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags, DelFailed),
{ok, Acc1, BQS1}
end, rejected, X, State),
State1.
Expand Down Expand Up @@ -1258,7 +1274,8 @@ prioritise_cast(Msg, _Len, State) ->
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{ack, _AckTags, _ChPid} -> 4; %% [1] %% @todo Remove when 'rabbitmq_4.3.0' FF is required.
{complete, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
{notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
_ -> 0
Expand Down Expand Up @@ -1527,23 +1544,28 @@ handle_cast({deliver,
State1 = State#q{senders = Senders1},
noreply(maybe_deliver_or_enqueue(Delivery, Delivered, State1));

%% Compat for RabbitMQ 4.2. @todo Remove when 'rabbitmq_4.3.0' FF is required.
handle_cast({ack, AckTags, ChPid}, State) ->
handle_cast({complete, AckTags, ChPid}, State);
handle_cast({reject, true, AckTags, ChPid}, State) ->
handle_cast({requeue, AckTags, ChPid}, State);
handle_cast({reject, false, AckTags, ChPid}, State) ->
handle_cast({discard, AckTags, ChPid}, State);

handle_cast({complete, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));

handle_cast({reject, true, AckTags, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
handle_cast({requeue, AckTags, ChPid}, State) ->
noreply(requeue(AckTags, false, ChPid, State));

handle_cast({reject, false, AckTags, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
fun (X) -> subtract_acks(ChPid, AckTags, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
fun () -> rabbit_global_counters:messages_dead_lettered(rejected, rabbit_classic_queue,
disabled, length(AckTags)),
ack(AckTags, ChPid, State) end));
handle_cast({discard, AckTags, ChPid}, State) ->
noreply(discard(AckTags, true, ChPid, State));

handle_cast({modify, AckTags, DelFailed, false, _Anns, ChPid}, State) ->
noreply(requeue(AckTags, DelFailed, ChPid, State));

handle_cast({modify, AckTags, DelFailed, true, _Anns, ChPid}, State) ->
noreply(discard(AckTags, DelFailed, ChPid, State));

handle_cast({delete_exclusive, ConnPid}, State) ->
log_delete_exclusive(ConnPid, State),
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@

%% Reinsert messages into the queue which have already been delivered
%% and were pending acknowledgement.
-callback requeue([ack()], state()) -> {msg_ids(), state()}.
-callback requeue([ack()], boolean(), state()) -> {msg_ids(), state()}.

%% Fold over messages by ack tag. The supplied function is called with
%% each message, its ack tag, and an accumulator.
-callback ackfold(msg_fun(A), A, state(), [ack()]) -> {A, state()}.
-callback ackfold(msg_fun(A), A, state(), [ack()], boolean()) -> {A, state()}.

%% How long is my queue?
-callback len(state()) -> non_neg_integer().
Expand Down
23 changes: 20 additions & 3 deletions deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -333,16 +333,31 @@ cancel(Q, Spec, State) ->
-spec settle(rabbit_amqqueue:name(), rabbit_queue_type:settle_op(),
rabbit_types:ctag(), [non_neg_integer()], state()) ->
{state(), rabbit_queue_type:actions()}.
settle(QName, {modify, _DelFailed, Undel, _Anns}, CTag, MsgIds, State) ->
settle(QName, Op, CTag, MsgIds, State) ->
case rabbit_feature_flags:is_enabled('rabbitmq_4.3.0') of
true -> settle_43(QName, Op, CTag, MsgIds, State);
false -> settle_compat(QName, Op, CTag, MsgIds, State)
end.

settle_43(_QName, {modify, DelFailed, Undel, Anns}, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
Arg = {modify, MsgIds, DelFailed, Undel, Anns, self()},
delegate:invoke_no_result(Pid, {gen_server2, cast, [Arg]}),
{State, []};
settle_43(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
Arg = {Op, MsgIds, self()},
delegate:invoke_no_result(Pid, {gen_server2, cast, [Arg]}),
{State, []}.

settle_compat(QName, {modify, _DelFailed, Undel, _Anns}, CTag, MsgIds, State) ->
%% translate modify into other op
Op = case Undel of
true ->
discard;
false ->
requeue
end,
settle(QName, Op, CTag, MsgIds, State);
settle(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
settle_compat(QName, Op, CTag, MsgIds, State);
settle_compat(_QName, Op, _CTag, MsgIds, State = #?STATE{pid = Pid}) ->
Arg = case Op of
complete ->
{ack, MsgIds, self()};
Expand Down Expand Up @@ -430,6 +445,8 @@ supports_stateful_delivery() -> true.
deliver(Qs0, Msg0, Options) ->
%% add guid to content here instead of in rabbit_basic:message/3,
%% as classic queues are the only ones that need it
%% @todo Do we need to regenerate it for every time it gets dead lettered?
%% We can likely do better and avoid rewriting to the shared message store.
Msg = mc:prepare(store, mc:set_annotation(id, rabbit_guid:gen(), Msg0)),
Mandatory = maps:get(mandatory, Options, false),
MsgSeqNo = maps:get(correlation, Options, undefined),
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,10 @@
depends_on => ['rabbitmq_4.1.0'],
callbacks => #{enable => {rabbit_khepri, enable_feature_flag}}
}}).

-rabbit_feature_flag(
{'rabbitmq_4.3.0',
#{desc => "Allows rolling upgrades to 4.3.x",
stability => stable,
depends_on => ['rabbitmq_4.2.0']
}}).
20 changes: 10 additions & 10 deletions deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, len/1, is_empty/1, depth/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/3,
ackfold/5, len/1, is_empty/1, depth/1,
update_rates/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2,
Expand Down Expand Up @@ -279,27 +279,27 @@ ack(AckTags, State = #state{bq = BQ}) ->
ack(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(ack(AckTags, BQS)).

requeue(AckTags, State = #state{bq = BQ}) ->
requeue(AckTags, DelFailed, State = #state{bq = BQ}) ->
fold_by_acktags2(fun (AckTagsN, BQSN) ->
BQ:requeue(AckTagsN, BQSN)
BQ:requeue(AckTagsN, DelFailed, BQSN)
end, AckTags, State);
requeue(AckTags, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(requeue(AckTags, BQS)).
requeue(AckTags, DelFailed, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(requeue(AckTags, DelFailed, BQS)).

%% Similar problem to fetchwhile/4
ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) ->
ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags, DelFailed) ->
AckTagsByPriority = partition_acktags(AckTags),
fold2(
fun (P, BQSN, AccN) ->
case maps:find(P, AckTagsByPriority) of
{ok, ATagsN} -> {AccN1, BQSN1} =
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN),
BQ:ackfold(MsgFun, AccN, BQSN, ATagsN, DelFailed),
{priority_on_acktags(P, AccN1), BQSN1};
error -> {AccN, BQSN}
end
end, Acc, State);
ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags) ->
?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags)).
ackfold(MsgFun, Acc, State = #passthrough{bq = BQ, bqs = BQS}, AckTags, DelFailed) ->
?passthrough2(ackfold(MsgFun, Acc, BQS, AckTags, DelFailed)).

len(#state{bq = BQ, bqss = BQSs}) ->
add0(fun (_P, BQSN) -> BQ:len(BQSN) end, BQSs);
Expand Down
Loading
Loading