Respawn failed downstream connections. Fixes gh-25

parent 46800f06
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-define(BURST_MAX, 10). -define(BURST_MAX, 10).
-define(DEFAULT_INIT_CONNS, 4).
-type upstream() :: mtp_handler:handle(). -type upstream() :: mtp_handler:handle().
-type downstream() :: mtp_down_conn:handle(). -type downstream() :: mtp_down_conn:handle().
...@@ -92,7 +93,7 @@ status(Pool) -> ...@@ -92,7 +93,7 @@ status(Pool) ->
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
init(DcId) -> init(DcId) ->
InitConnections = application:get_env(mtproto_proxy, init_dc_connections, 4), InitConnections = application:get_env(?APP, init_dc_connections, ?DEFAULT_INIT_CONNS),
State = #state{dc_id = DcId, State = #state{dc_id = DcId,
downstreams = ds_new([])}, downstreams = ds_new([])},
State1 = connect_many(InitConnections, State), State1 = connect_many(InitConnections, State),
...@@ -184,15 +185,34 @@ handle_down(MonRef, Pid, Reason, #state{downstreams = Ds, ...@@ -184,15 +185,34 @@ handle_down(MonRef, Pid, Reason, #state{downstreams = Ds,
Pending1 = lists:delete(Pid, Pending), Pending1 = lists:delete(Pid, Pending),
Ds1 = ds_remove(Pid, Ds), Ds1 = ds_remove(Pid, Ds),
?log(error, "Downstream=~p is down. reason=~p", [Pid, Reason]), ?log(error, "Downstream=~p is down. reason=~p", [Pid, Reason]),
St#state{pending_downstreams = Pending1, maybe_restart_connection(
downstreams = Ds1, St#state{pending_downstreams = Pending1,
downstream_monitors = DsM1}; downstreams = Ds1,
downstream_monitors = DsM1});
_ -> _ ->
?log(error, "Unexpected DOWN. ref=~p, pid=~p, reason=~p", [MonRef, Pid, Reason]), ?log(error, "Unexpected DOWN. ref=~p, pid=~p, reason=~p", [MonRef, Pid, Reason]),
St St
end end
end. end.
maybe_restart_connection(#state{pending_downstreams = [],
downstream_monitors = DsM} = St) ->
MinConnections = application:get_env(?APP, init_dc_connections, ?DEFAULT_INIT_CONNS),
OpenConnections = map_size(DsM),
case OpenConnections < MinConnections of
true ->
%% We have less than minimum connections. Just spawn new one
connect(St);
false ->
%% We have more than minimum connections.
%% Don't spawn anything, because it will be done on-demand
St
end;
maybe_restart_connection(St) ->
%% We already have pending connections. Just wait for them to complete
St.
maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) -> maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) ->
%% if N > X and len(pending) < Y -> connect() %% if N > X and len(pending) < Y -> connect()
%% TODO: shrinking (by timer) %% TODO: shrinking (by timer)
......
...@@ -168,8 +168,8 @@ handle_info(do_connect, #state{dc_id = DcId} = State) -> ...@@ -168,8 +168,8 @@ handle_info(do_connect, #state{dc_id = DcId} = State) ->
{ok, St1} = connect(DcId, State), {ok, St1} = connect(DcId, State),
{noreply, St1} {noreply, St1}
catch ?WITH_STACKTRACE(Class, Reason, Stack) catch ?WITH_STACKTRACE(Class, Reason, Stack)
?log(error, "Down connect error: ~s", ?log(error, "Down connect to dc=~w error: ~s",
[lager:pr_stacktrace(Stack, {Class, Reason})]), %XXX lager-specific [DcId, lager:pr_stacktrace(Stack, {Class, Reason})]), %XXX lager-specific
erlang:send_after(300, self(), do_connect), erlang:send_after(300, self(), do_connect),
{noreply, State} {noreply, State}
end. end.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment