Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@
%% membership changes
add_member/2,
add_member/3,
pipeline_add_member/4,
remove_member/2,
remove_member/3,
pipeline_remove_member/4,
leave_and_terminate/3,
leave_and_terminate/4,
leave_and_delete_server/3,
Expand Down Expand Up @@ -613,6 +615,38 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).

%% @doc Asynchronously add a ra server id to a ra cluster's membership
%% configuration.
%%
%% This is the same operation as {@link add_member/2} but the membership
%% change is pipelined in the same way as {@link pipeline_command/4}.
%%
%% @param ServerRef the ra server or servers to send the command to
%% @param ServerId the ra server id of the server to remove
%% @param Correlation a correlation identifier to be included to receive an
%% async notification after the command is applied to the state machine. If the
%% Correlation is set to `no_correlation' then no notifications will be sent.
%% @param Priority command priority. `low' priority commands will be held back
%% and appended to the Raft log in batches. NB: A `normal' priority command sent
%% from the same process can overtake a low priority command that was
%% sent before. There is no high priority.
%% Only use priority level of `low' with commands that
%% do not rely on total execution ordering.
%% @see add_member/2
%% @end
-spec pipeline_add_member(ServerRef :: ra_server_id() | [ra_server_id()],
ServerId :: ra_server_id(),
Correlation :: ra_server:command_correlation() |
no_correlation,
Priority :: ra_server:command_priority()) -> ok.
pipeline_add_member(ServerRef, ServerId, no_correlation, Priority) ->
Cmd = {'$ra_join', ServerId, noreply},
ra_server_proc:cast_command(ServerRef, Priority, Cmd);
pipeline_add_member(ServerRef, ServerId, Correlation, Priority) ->
Cmd = {'$ra_join', ServerId, {notify, Correlation, self()}},
ra_server_proc:cast_command(ServerRef, Priority, Cmd).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
%% command to the log.
Expand Down Expand Up @@ -647,6 +681,37 @@ remove_member(ServerRef, ServerId, Timeout) ->
{'$ra_leave', ServerId, after_log_append},
Timeout).

%% @doc Asynchronously remove a ra server id from a ra cluster's membership
%% configuration.
%%
%% This is the same operation as {@link remove_member/2} but the membership
%% change is pipelined in the same way as {@link pipeline_command/4}.
%%
%% @param ServerRef the ra server or servers to send the command to
%% @param ServerId the ra server id of the server to remove
%% @param Correlation a correlation identifier to be included to receive an
%% async notification after the command is applied to the state machine. If the
%% Correlation is set to `no_correlation' then no notifications will be sent.
%% @param Priority command priority. `low' priority commands will be held back
%% and appended to the Raft log in batches. NB: A `normal' priority command sent
%% from the same process can overtake a low priority command that was
%% sent before. There is no high priority.
%% Only use priority level of `low' with commands that
%% do not rely on total execution ordering.
%% @see remove_member/2
%% @end
-spec pipeline_remove_member(ServerRef :: ra_server_id() | [ra_server_id()],
ServerId :: ra_server_id(),
Correlation :: ra_server:command_correlation() |
no_correlation,
Priority :: ra_server:command_priority()) -> ok.
pipeline_remove_member(ServerRef, ServerId, no_correlation, Priority) ->
Cmd = {'$ra_leave', ServerId, noreply},
ra_server_proc:cast_command(ServerRef, Priority, Cmd);
pipeline_remove_member(ServerRef, ServerId, Correlation, Priority) ->
Cmd = {'$ra_leave', ServerId, {notify, Correlation, self()}},
ra_server_proc:cast_command(ServerRef, Priority, Cmd).

%% @doc Makes the server enter a pre-vote state and attempt to become the leader.
%% It is necessary to call this function when starting a new cluster as a
%% brand new Ra server (node) will not automatically enter the pre-vote state.
Expand Down
2 changes: 2 additions & 0 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3518,6 +3518,8 @@ append_error_reply(Cmd, Reason, Effects0) ->
case Cmd of
{_, #{from := From}, _, _} ->
[{reply, From, {error, Reason}} | Effects0];
{_, _, _, {notify, Corr, Pid}} ->
[{notify, #{Pid => [{Corr, {error, Reason}}]}} | Effects0];
_ ->
Effects0
end.
Expand Down
26 changes: 26 additions & 0 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ all_tests() ->
snapshot_installation,
snapshot_installation_with_call_crash,
add_member,
pipeline_membership_changes,
queue_example,
ramp_up_and_ramp_down,
start_and_join_then_leave_and_terminate,
Expand Down Expand Up @@ -877,6 +878,31 @@ add_member(Config) ->
{ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end),
terminate_cluster([C | Cluster]).

pipeline_membership_changes(Config) ->
Name = ?config(test_name, Config),
[A, B, C] = Cluster0 = start_local_cluster(3, Name, add_machine()),
{ok, _, Leader0} = ra:process_command(A, 9),
Corr1 = make_ref(),
ok = ra:pipeline_remove_member(Leader0, C, Corr1, normal),
[{Corr1, ok}] = gather_applied([], 0),
stop_server(C),
{ok, Members, Leader} = ra:members(A),
?assertEqual(lists:sort(Members), lists:sort([A, B])),
%% Process a command to ensure that the cluster change command has
%% been committed - this prevents spurious failures of
%% `cluster_change_not_permitted` from the next leave command:
{ok, _, Leader} = ra:process_command(Leader, 4),
Corr2 = make_ref(),
ok = ra:pipeline_remove_member(Leader, C, Corr2, normal),
[{Corr2, {error, not_member}}] = gather_applied([], 0),
ok = ra:start_server(default, Name, C, add_machine(), [A, B]),
Corr3 = make_ref(),
ok = ra:pipeline_add_member(Leader, C, Corr3, normal),
[{Corr3, ok}] = gather_applied([], 0),
{ok, Members1, _Leader} = ra:members(Leader),
?assertEqual(lists:sort(Members1), lists:sort(Cluster0)),
terminate_cluster(Cluster0).

server_catches_up(Config) ->
N1 = nth_server_name(Config, 1),
N2 = nth_server_name(Config, 2),
Expand Down