From e0a53dd99788148b14debdb4c6e88f2f708e1a77 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Wed, 9 Nov 2016 17:32:45 +0100 Subject: [PATCH 1/3] added gproc_dist:multicall() --- src/gproc_dist.erl | 77 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 72 insertions(+), 5 deletions(-) diff --git a/src/gproc_dist.erl b/src/gproc_dist.erl index 76255bc..bba12bf 100644 --- a/src/gproc_dist.erl +++ b/src/gproc_dist.erl @@ -44,6 +44,7 @@ -export([leader_call/1, leader_cast/1, sync/0, + multicall/3, get_leader/0]). %%% internal exports @@ -69,7 +70,8 @@ -record(state, { always_broadcast = false, is_leader, - sync_requests = []}). + sync_requests = [], + calls = []}). -include("gproc_trace.hrl"). %% ========================================================== @@ -270,6 +272,14 @@ get_leader() -> GenLeader = gen_leader, GenLeader:call(?MODULE, get_leader). +multicall(M, F, A) -> + case leader_call({multicall, M, F, A}) of + {ok, Result} -> + Result; + {error, Error} -> + error(Error) + end. + %% ========================================================== %% Server-side @@ -281,10 +291,25 @@ handle_call(get_leader, _, S, E) -> handle_call(_, _, S, _) -> {reply, badarg, S}. -handle_info({'DOWN', _MRef, process, Pid, _}, S) -> - ets:delete(?TAB, {Pid, g}), - leader_cast({pid_is_DOWN, Pid}), - {ok, S}; +handle_info({'DOWN', MRef, process, Pid, Msg}, #state{calls = Calls} = S) -> + case lists:keyfind(Pid, 1, Calls) of + {Pid, MRef, server, From} -> + Reply = case Msg of + {mcall, Result} -> + {ok, multicall_result(Result)}; + Error -> + {error, Error} + end, + gen_leader:reply(From, {leader,reply,Reply}), + {ok, S#state{calls = lists:keydelete(Pid, 1, Calls)}}; + {Pid, MRef, client, Server} -> + Server ! {rcall_result, self(), Msg}, + {ok, S#state{calls = lists:keydelete(Pid, 1, Calls)}}; + _ -> + ets:delete(?TAB, {Pid, g}), + leader_cast({pid_is_DOWN, Pid}), + {ok, S} + end; handle_info({gproc_unreg, Objs}, S) -> {ok, [{delete, Objs}], S}; handle_info(_, S) -> @@ -364,6 +389,19 @@ handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) -> GenLeader:broadcast({from_leader, {sync, From}}, Alive, E), {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}} end; +handle_leader_call({multicall, M, F, A}, From, #state{calls = Calls} = S, E) -> + OtherNodes = gen_leader:alive(E) -- [node()], + {Pid, MRef} = spawn_monitor( + fun() -> + exit({mcall, multicall_server(M, F, A, OtherNodes)}) + end), + if OtherNodes =/= [] -> + gen_leader:broadcast({from_leader, {multicall, M, F, A, Pid}}, + OtherNodes, E); + true -> + ok + end, + {noreply, S#state{calls = [{Pid, MRef, server, From}|Calls]}}; handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E) when Reg==reg; Reg==reg_other -> case gproc_lib:insert_reg(K, Value, Pid, g) of @@ -819,6 +857,11 @@ terminate(_Reason, _S) -> from_leader({sync, Ref}, S, _E) -> gen_leader:leader_cast(?MODULE, {sync_reply, node(), Ref}), {ok, S}; +from_leader({multicall, M, F, A, Pid}, #state{calls = Calls} = S, _E) -> + {Pid1, MRef} = spawn_monitor(fun() -> + exit({mcall, apply(M, F, A)}) + end), + {ok, S#state{calls = [{Pid1, MRef, client, Pid}|Calls]}}; from_leader(Ops, S, _E) -> lists:foreach( fun({delete, Globals}) -> @@ -1070,3 +1113,27 @@ add_follow_to_waiters(Waiters, {T,_,_} = K, Pid, Ref, S) -> regged_new(reg ) -> true; regged_new(ensure) -> new. + +multicall_server(M, F, A, Nodes) -> + MyRes = try {mcall, apply(M, F, A)} + catch + C:E -> + {E, erlang:get_stacktrace()} + end, + [{node(), MyRes}|await_nodes(Nodes)]. + +await_nodes([H|T]) -> + receive + {rcall_result, Pid, Res} when node(Pid) =:= H -> + [{H, Res}|await_nodes(T)] + end; +await_nodes([]) -> + []. + +multicall_result(Res) -> + lists:foldr( + fun({_, {mcall, Good}}, {G, B}) -> + {[Good|G], B}; + ({N, E}, {G, B}) -> + {G, [{N,E}|B]} + end, {[], []}, Res). From 535236ba76053a2def08c23806dd43e048d2f427 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Wed, 9 Nov 2016 21:45:10 +0100 Subject: [PATCH 2/3] added edoc and test for multicall --- src/gproc_dist.erl | 16 +++++++++++++++- test/gproc_dist_tests.erl | 18 +++++++++++++++++- 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/gproc_dist.erl b/src/gproc_dist.erl index bba12bf..cdd512d 100644 --- a/src/gproc_dist.erl +++ b/src/gproc_dist.erl @@ -272,6 +272,20 @@ get_leader() -> GenLeader = gen_leader, GenLeader:call(?MODULE, get_leader). +%% @spec multicall(Module::atom(), Func::atom(), Args::list()) -> +%% {[Result], [{node(), Error}]} +%% +%% @doc Perform a multicall RPC on all live gproc nodes +%% +%% This function works like {@link rpc:multicall/3}, except the calls are +%% routed via the gproc leader and its connected nodes - the same route as +%% for the data replication. This means that a multicall following a global +%% registration is guaranteed to follow the update on each gproc node. +%% +%% The return value will be of the form `{GoodResults, BadNodes}', where +%% `BadNodes' is a list of `{Node, Error}' for each node where the call +%% fails. +%% @end multicall(M, F, A) -> case leader_call({multicall, M, F, A}) of {ok, Result} -> @@ -1117,7 +1131,7 @@ regged_new(ensure) -> new. multicall_server(M, F, A, Nodes) -> MyRes = try {mcall, apply(M, F, A)} catch - C:E -> + _:E -> {E, erlang:get_stacktrace()} end, [{node(), MyRes}|await_nodes(Nodes)]. diff --git a/test/gproc_dist_tests.erl b/test/gproc_dist_tests.erl index 4bc7e54..547bb28 100644 --- a/test/gproc_dist_tests.erl +++ b/test/gproc_dist_tests.erl @@ -86,7 +86,8 @@ basic_tests(Ns) -> ?f(t_monitor(Ns)), ?f(t_standby_monitor(Ns)), ?f(t_follow_monitor(Ns)), - ?f(t_subscribe(Ns)) + ?f(t_subscribe(Ns)), + ?f(t_multicall(Ns)) ]. dist_setup() -> @@ -500,6 +501,21 @@ t_subscribe([A,B|_] = Ns) -> ?assertEqual({gproc_monitor,Na,undefined}, got_msg(Pb, gproc_monitor)), ok. +t_multicall(Ns) -> + t_multicall_(Ns, 10). + +t_multicall_([A|_] = Ns, I) when I > 0 -> + Na = ?T_NAME, + Pa = t_spawn_reg(A, Na), + Expected = {[Pa || _ <- Ns], []}, + ?assertMatch( + Expected, rpc:call(A, gproc_dist, multicall, + [gproc, where, [Na]])), + ok = t_call(Pa, die), + t_multicall_(Ns, I-1); +t_multicall_(_, _) -> + ok. + %% got_msg(Pb, Tag) -> %% t_call(Pb, %% {apply_fun, From ed33f71e4c03a3f4c90a359f4d6e74529c4bec71 Mon Sep 17 00:00:00 2001 From: Ulf Wiger Date: Thu, 10 Nov 2016 09:45:11 +0100 Subject: [PATCH 3/3] improved multicall test --- test/gproc_dist_tests.erl | 36 ++++++++++++++++++++++++++---------- test/gproc_test_lib.erl | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 10 deletions(-) diff --git a/test/gproc_dist_tests.erl b/test/gproc_dist_tests.erl index 547bb28..c9ca0f2 100644 --- a/test/gproc_dist_tests.erl +++ b/test/gproc_dist_tests.erl @@ -502,20 +502,36 @@ t_subscribe([A,B|_] = Ns) -> ok. t_multicall(Ns) -> - t_multicall_(Ns, 10). + t_multicall(Ns, 3). -t_multicall_([A|_] = Ns, I) when I > 0 -> - Na = ?T_NAME, - Pa = t_spawn_reg(A, Na), - Expected = {[Pa || _ <- Ns], []}, +t_multicall(Ns, I) when I > 0 -> ?assertMatch( - Expected, rpc:call(A, gproc_dist, multicall, - [gproc, where, [Na]])), - ok = t_call(Pa, die), - t_multicall_(Ns, I-1); -t_multicall_(_, _) -> + true, lists:all(fun t_mcall_/1, Ns)), + t_multicall(Ns, I-1); +t_multicall(_, _) -> ok. +t_mcall_(N) -> + Na = ?T_NAME, + Pa = gproc_test_lib:t_spawn_reg_mcall(N, Na), + ?assertMatch(ok, t_call(Pa, die)), + true. + +%% t_multicall(Ns) -> +%% t_multicall_(Ns, 10). + +%% t_multicall_([A|_] = Ns, I) when I > 0 -> +%% Na = ?T_NAME, +%% Pa = t_spawn_reg(A, Na), +%% Expected = {[Pa || _ <- Ns], []}, +%% ?assertMatch( +%% Expected, +%% t_call(Pa, {apply, gproc_dist, multicall, [gproc, where, [Na]]})), +%% ok = t_call(Pa, die), +%% t_multicall_(Ns, I-1); +%% t_multicall_(_, _) -> +%% ok. + %% got_msg(Pb, Tag) -> %% t_call(Pb, %% {apply_fun, diff --git a/test/gproc_test_lib.erl b/test/gproc_test_lib.erl index 51e8c21..c495ced 100644 --- a/test/gproc_test_lib.erl +++ b/test/gproc_test_lib.erl @@ -3,6 +3,7 @@ -export([t_spawn/1, t_spawn/2, t_spawn_reg/2, t_spawn_reg/3, t_spawn_reg/4, t_spawn_reg_shared/3, + t_spawn_reg_mcall/2, t_spawn_mreg/2, t_call/2, t_loop/0, t_loop/1, @@ -80,6 +81,43 @@ t_spawn_reg_shared(Node, Name, Value) -> erlang:error({timeout, t_spawn_reg_shared, [Node,Name,Value]}) end. +t_spawn_reg_mcall(Node, {_,g,_} = Name) -> + try t_spawn_reg_mcall_(Node, Name) + catch + error:E -> + error({E, erlang:get_stacktrace()}) + end. + +t_spawn_reg_mcall_(Node, Name) -> + Me = self(), + P = spawn( + Node, fun() -> + t_spawn_reg_mcall_p(Name, Me) + end), + MRef = erlang:monitor(process, P), + receive + {P, ok} -> + P; + {'DOWN', MRef, _, _, Reason} -> + error(Reason) + after 1000 -> + erlang:error({timeout, t_spawn_reg_mcall, [Node, Name]}) + end. + +t_spawn_reg_mcall_p(Name, Parent) -> + try begin + true = gproc:reg(Name), + {GoodRes, []} = gproc_dist:multicall(gproc, where, [Name]), + true = lists:all(fun(X) -> X =:= self() end, GoodRes), + Parent ! {self(), ok} + end + catch + error:E -> + error({E, erlang:get_stacktrace()}) + end, + t_loop(). + + default_value({c,_,_}) -> 0; default_value(_) -> undefined.