diff --git a/README.md b/README.md index a031d68a..4cb4e434 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,8 @@ Connect to MQTT Broker: ```erlang --type mqttc_opt() :: {host, inet:ip_address() | string()} +-type mqttc_opt() :: {hosts, [{inet:ip_address(), inet:port_number()}]}, + {host, inet:ip_address() | string()} | {port, inet:port_number()} | {client_id, binary()} | {clean_sess, boolean()} @@ -123,6 +124,7 @@ Connect to MQTT Broker: Option | Value | Default | Description | Example -------|-------|---------|-------------|--------- +hosts | list(tuple()) | [] | List of hosts to connect to | [{"127.0.0.1", 8883}, {"127.0.0.2", 8883}] host | inet:ip_address() or string() | "locahost" | Broker Address | "locahost" port | inet:port_number() | 1883 | Broker Port | client_id | binary() | random clientId | MQTT ClientId | <<"slimpleClientId">> diff --git a/src/emqttc.erl b/src/emqttc.erl index c9473f13..e36b7b0c 100644 --- a/src/emqttc.erl +++ b/src/emqttc.erl @@ -43,6 +43,9 @@ %% Lookup topics -export([topics/1]). +%% Connection status +-export([is_connected/1]). + %% Publish, Subscribe API -export([publish/3, publish/4, sync_publish/4, @@ -70,7 +73,8 @@ -endif. --type mqttc_opt() :: {host, inet:ip_address() | string()} +-type mqttc_opt() :: {hosts, list()} + | {host, inet:ip_address() | string()} | {port, inet:port_number()} | {client_id, binary()} | {clean_sess, boolean()} @@ -93,6 +97,7 @@ -record(state, {parent :: pid(), name :: atom(), + hosts = [] :: list(), host = "localhost" :: inet:ip_address() | string(), port = 1883 :: inet:port_number(), socket :: inet:socket(), @@ -351,6 +356,14 @@ ping(Client) -> disconnect(Client) -> gen_fsm:send_event(Client, disconnect). +%%------------------------------------------------------------------------------ +%% @doc is the emqtt client connected to a broker +%% @end +%%------------------------------------------------------------------------------ +-spec is_connected(Client) -> false | {true, {Host :: string(), Port :: integer()}} when Client :: pid() | atom(). +is_connected(Client) -> + gen_fsm:sync_send_all_state_event(Client, is_connected). + %%%============================================================================= %%% gen_fsm callbacks %%%============================================================================= @@ -390,6 +403,7 @@ init([Name, Parent, MqttOpts, TcpOpts]) -> State = init(MqttOpts1, #state{name = Name, parent = Parent, + hosts = [], host = "127.0.0.1", port = 1883, proto_state = ProtoState, @@ -401,10 +415,13 @@ init([Name, Parent, MqttOpts, TcpOpts]) -> tcp_opts = TcpOpts, ssl_opts = []}), - {ok, connecting, State, 0}. + State1 = init_hosts(State), + {ok, connecting, State1, 0}. init([], State) -> State; +init([{hosts, Hosts} | Opts], State) -> + init(Opts, State#state{hosts = Hosts}); init([{host, Host} | Opts], State) -> init(Opts, State#state{host = Host}); init([{port, Port} | Opts], State) -> @@ -437,6 +454,10 @@ init_reconnector(false) -> init_reconnector(Params) when is_integer(Params) orelse is_tuple(Params) -> emqttc_reconnector:new(Params). +init_hosts(State = #state{hosts = [], host = Host, port = Port}) -> + State#state{hosts = [{Host, Port}]}; +init_hosts(State) -> + State. %%------------------------------------------------------------------------------ %% @private %% @doc Event Handler for state that connecting to MQTT broker. @@ -786,6 +807,12 @@ handle_sync_event(topics, _From, StateName, State = #state{pubsub_map = PubsubMa TopicTable = [{Topic, Qos} || {Topic, {Qos, _Subs}} <- maps:to_list(PubsubMap)], {reply, TopicTable, StateName, State}; +handle_sync_event(is_connected, _From, StateName = connected, State = #state{host = Host, port = Port}) -> + {reply, {true, {Host, Port}}, StateName, State}; + +handle_sync_event(is_connected, _From, StateName, State) -> + {reply, false, StateName, State}; + handle_sync_event(_Event, _From, StateName, State) -> Reply = ok, {reply, Reply, StateName, State}. @@ -916,7 +943,22 @@ code_change(_OldVsn, StateName, State, _Extra) -> next_state(StateName, State) -> {next_state, StateName, State, hibernate}. -connect(State = #state{name = Name, +connect(State = #state{hosts = Hosts}) -> + connect(Hosts, none, State). + +connect([{Host,Port} | Rest], _, State) -> + case try_connect(State#state{host=Host, port=Port}) of + {ok, NewState} -> + {next_state, waiting_for_connack, NewState}; + {error, Reason} -> + connect(Rest, Reason, State) + + end; + +connect([], LastReason, State) -> + try_reconnect(LastReason, State). + +try_connect(State = #state{name = Name, host = Host, port = Port, socket = undefined, @@ -936,14 +978,14 @@ connect(State = #state{name = Name, KeepAlive = emqttc_keepalive:new({Socket, send_oct}, KeepAliveTime, {keepalive, timeout}), TRef = gen_fsm:start_timer(ConnAckTimeout*1000, connack), Logger:info("[Client ~s] connected with ~s:~p", [Name, Host, Port]), - {next_state, waiting_for_connack, State#state{socket = Socket, - receiver = Receiver, - keepalive = KeepAlive, - connack_tref = TRef, - proto_state = ProtoState1}}; + {ok, State#state{socket = Socket, + receiver = Receiver, + keepalive = KeepAlive, + connack_tref = TRef, + proto_state = ProtoState1}}; {error, Reason} -> - Logger:info("[Client ~s] connection failure: ~p", [Name, Reason]), - try_reconnect(Reason, State) + Logger:info("[Client ~s] failed to connect with ~s:~p: ~p", [Name, Host, Port, Reason]), + {error, Reason} end. try_reconnect(Reason, State = #state{reconnector = undefined}) ->