Fix "maybe_close_down" race-condition

If downstream connection pool crashed and recovered while some
upstream was alive holding it's DC ID, it may return downstream
taken from the old incarnation of pool to new one because it's
registered under the same name.
Fix is to hold pool's PID instead of name
parent e9f999a1
......@@ -113,9 +113,8 @@ handle_cast({return, Upstream}, State) ->
handle_cast({connected, Pid}, State) ->
{noreply, handle_connected(Pid, State)}.
handle_info({'DOWN', MonitorRef, process, Pid, _Reason}, State) ->
%% TODO: monitor downstream connections as well
{noreply, handle_down(MonitorRef, Pid, State)}.
handle_info({'DOWN', MonitorRef, process, Pid, Reason}, State) ->
{noreply, handle_down(MonitorRef, Pid, Reason, State)}.
terminate(_Reason, #state{downstreams = Ds}) ->
ds_fold(
fun(Pid, _, _) ->
......@@ -158,7 +157,7 @@ handle_return(Upstream, #state{downstreams = Ds,
St#state{downstreams = Ds1,
upstreams = Us1}.
handle_down(MonRef, Pid, #state{downstreams = Ds,
handle_down(MonRef, Pid, Reason, #state{downstreams = Ds,
downstream_monitors = DsM,
upstreams = Us,
pending_downstreams = Pending} = St) ->
......@@ -173,13 +172,14 @@ handle_down(MonRef, Pid, #state{downstreams = Ds,
{Pid, DsM1} ->
Pending1 = lists:delete(Pid, Pending),
Ds1 = ds_remove(Pid, Ds),
lager:error("Downstream=~p is down", [Pid]),
lager:error("Downstream=~p is down. reason=~p",
[Pid, Reason]),
St#state{pending_downstreams = Pending1,
downstreams = Ds1,
downstream_monitors = DsM1};
_ ->
lager:error("Unexpected DOWN. ref=~p, pid=~p",
[MonRef, Pid]),
lager:error("Unexpected DOWN. ref=~p, pid=~p, reason=~p",
[MonRef, Pid, Reason]),
St
end
end.
......
......@@ -45,7 +45,7 @@
codec :: mtp_codec:codec() | undefined,
down :: mtp_down_conn:handle() | undefined,
dc_id :: integer(),
dc_id :: {DcId :: integer(), Pool :: pid()},
ad_tag :: binary(),
addr :: mtp_config:netloc(), % IP/Port of remote side
......@@ -191,8 +191,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
maybe_close_down(#state{down = undefined} = S) -> S;
maybe_close_down(#state{dc_id = DcId} = S) ->
{ok, Pool} = mtp_config:get_downstream_pool(DcId),
maybe_close_down(#state{dc_id = {_DcId, Pool}} = S) ->
mtp_dc_pool:return(Pool, self()),
S#state{down = undefined}.
......@@ -315,12 +314,12 @@ handle_unknown_upstream(#state{down = Down, sock = USock, transport = UTrans} =
handle_upstream_header(DcId, #state{acc = Acc, ad_tag = Tag, addr = Addr} = S) ->
Opts = #{ad_tag => Tag,
addr => Addr},
{RealDcId, _Pool, Downstream} = mtp_config:get_downstream_safe(DcId, Opts),
{RealDcId, Pool, Downstream} = mtp_config:get_downstream_safe(DcId, Opts),
handle_upstream_data(
Acc,
switch_timer(
S#state{down = Downstream,
dc_id = RealDcId,
dc_id = {RealDcId, Pool},
acc = <<>>,
stage = tunnel},
hibernate)).
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment