Detect downstream connection attempt failures more quickly

By also checking for the monitor 'DOWN' message while waiting for pending connections.
parent 79fb6015
...@@ -173,13 +173,13 @@ handle_down(MonRef, Pid, #state{downstreams = Ds, ...@@ -173,13 +173,13 @@ handle_down(MonRef, Pid, #state{downstreams = Ds,
{Pid, DsM1} -> {Pid, DsM1} ->
Pending1 = lists:delete(Pid, Pending), Pending1 = lists:delete(Pid, Pending),
Ds1 = ds_remove(Pid, Ds), Ds1 = ds_remove(Pid, Ds),
lager:warning("Downstream=~p is down", [Pid]), lager:error("Downstream=~p is down", [Pid]),
St#state{pending_downstreams = Pending1, St#state{pending_downstreams = Pending1,
downstreams = Ds1, downstreams = Ds1,
downstream_monitors = DsM1}; downstream_monitors = DsM1};
_ -> _ ->
lager:warning("Unexpected DOWN. ref=~p, pid=~p", lager:error("Unexpected DOWN. ref=~p, pid=~p",
[MonRef, Pid]), [MonRef, Pid]),
St St
end end
end. end.
...@@ -211,7 +211,6 @@ connect_many(ToSpawn, St) -> ...@@ -211,7 +211,6 @@ connect_many(ToSpawn, St) ->
connect(#state{pending_downstreams = Pending, connect(#state{pending_downstreams = Pending,
downstream_monitors = DsM, downstream_monitors = DsM,
dc_id = DcId} = St) -> dc_id = DcId} = St) ->
%% Should monitor connection PIDs as well!
Pid = do_connect(DcId), Pid = do_connect(DcId),
MonRef = erlang:monitor(process, Pid), MonRef = erlang:monitor(process, Pid),
St#state{pending_downstreams = [Pid | Pending], St#state{pending_downstreams = [Pid | Pending],
...@@ -223,12 +222,19 @@ do_connect(DcId) -> ...@@ -223,12 +222,19 @@ do_connect(DcId) ->
Pid. Pid.
%% Block until all async connections are acked %% Block until all async connections are acked
wait_pending(#state{pending_downstreams = Pending} = St) -> wait_pending(#state{pending_downstreams = Pending,
downstream_monitors = DsM} = St) ->
lists:foldl( lists:foldl(
fun(Pid, #state{pending_downstreams = [Pid | Remaining], fun(Pid, #state{pending_downstreams = [Pid | Remaining],
downstreams = Ds} = St1) -> downstreams = Ds} = St1) ->
receive receive
{'$gen_cast', {connected, Pid}} -> Pid {'$gen_cast', {connected, Pid}} -> Pid;
{'DOWN', MonitorRef, process, Pid, Reason} ->
%% maybe try to re-connect?
(maps:get(MonitorRef, DsM, undefined) == Pid)
orelse exit({unexpected_down,
MonitorRef, Pid, Reason}),
exit({connection_failed, Pid, Reason})
after 10000 -> after 10000 ->
exit({timeout, receive Smth -> Smth after 0 -> none end}) exit({timeout, receive Smth -> Smth after 0 -> none end})
end, end,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment