Add monitor dc_pool <-> down_conn

parent 15bff248
......@@ -35,9 +35,15 @@
-record(state,
{dc_id :: mtp_config:dc_id(),
%% This one might be really big:
upstreams = #{} :: #{upstream() => downstream()},
%% On-demand downstreams are started asynchronously;
pending_downstreams = [] :: [pid()],
downstreams :: ds_store()
%% Downstream storage that allows to choose the one with minimal
%% number of connections
%% Should be relatively small
downstreams :: ds_store(),
downstream_monitors = #{} :: #{reference() => downstream()}
}).
%%%===================================================================
......@@ -66,10 +72,11 @@ status(Pool) ->
%%%===================================================================
init(DcId) ->
InitConnections = application:get_env(mtproto_proxy, init_dc_connections, 4),
PendingConnections = [do_connect(DcId) || _ <- lists:seq(1, InitConnections)],
Connections = recv_pending(PendingConnections),
Downstreams = ds_new(Connections),
{ok, #state{dc_id = DcId, downstreams = Downstreams}}.
State = #state{dc_id = DcId,
downstreams = ds_new([])},
State1 = connect_many(InitConnections, State),
State2 = wait_pending(State1),
{ok, State2}.
handle_call({get, Upstream, Opts}, _From, State) ->
{Downstream, State1} = handle_get(Upstream, Opts, State),
......@@ -140,17 +147,30 @@ handle_return(Upstream, #state{downstreams = Ds,
St#state{downstreams = Ds1,
upstreams = Us1}.
handle_down(MonRef, MaybeUpstream, #state{downstreams = Ds,
upstreams = Us} = St) ->
case maps:take(MaybeUpstream, Us) of
handle_down(MonRef, Pid, #state{downstreams = Ds,
downstream_monitors = DsM,
upstreams = Us,
pending_downstreams = Pending} = St) ->
case maps:take(Pid, Us) of
{{Downstream, MonRef}, Us1} ->
ok = mtp_down_conn:upstream_closed(Downstream, MaybeUpstream),
ok = mtp_down_conn:upstream_closed(Downstream, Pid),
Ds1 = ds_return(Downstream, Ds),
St#state{downstreams = Ds1,
upstreams = Us1};
error ->
lager:warning("Unexpected DOWN. ref=~p, pid=~p", [MonRef, MaybeUpstream]),
St
case maps:take(MonRef, DsM) of
{Pid, DsM1} ->
Pending1 = lists:delete(Pid, Pending),
Ds1 = ds_remove(Pid, Ds),
lager:warning("Downstream=~p is down", [Pid]),
St#state{pending_downstreams = Pending1,
downstreams = Ds1,
downstream_monitors = DsM1};
_ ->
lager:warning("Unexpected DOWN. ref=~p, pid=~p",
[MonRef, Pid]),
St
end
end.
maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) ->
......@@ -167,18 +187,23 @@ maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) -
_ ->
0
end,
connect_many(ToSpawn, St).
connect_many(ToSpawn, St) ->
lists:foldl(
fun(_, S) ->
connect(S)
end, St, lists:seq(1, ToSpawn)).
%% Initiate new async connection
connect(#state{pending_downstreams = Pending,
downstream_monitors = DsM,
dc_id = DcId} = St) ->
%% Should monitor connection PIDs as well!
Pid = do_connect(DcId),
St#state{pending_downstreams = [Pid | Pending]}.
MonRef = erlang:monitor(process, Pid),
St#state{pending_downstreams = [Pid | Pending],
downstream_monitors = DsM#{MonRef => Pid}}.
%% Asynchronous connect
do_connect(DcId) ->
......@@ -186,12 +211,18 @@ do_connect(DcId) ->
Pid.
%% Block until all async connections are acked
recv_pending(Pids) ->
[receive
{'$gen_cast', {connected, Pid}} -> Pid
after 10000 ->
exit({timeout, receive Smth -> Smth after 0 -> none end})
end || Pid <- Pids].
wait_pending(#state{pending_downstreams = Pending} = St) ->
lists:foldl(
fun(Pid, #state{pending_downstreams = [Pid | Remaining],
downstreams = Ds} = St1) ->
receive
{'$gen_cast', {connected, Pid}} -> Pid
after 10000 ->
exit({timeout, receive Smth -> Smth after 0 -> none end})
end,
St1#state{pending_downstreams = Remaining,
downstreams = ds_add_downstream(Pid, Ds)}
end, St, Pending).
%% New downstream connection storage
-spec ds_new([downstream()]) -> ds_store().
......@@ -228,3 +259,7 @@ ds_get(St) ->
ds_return(Pid, St) ->
{ok, St1} = pid_psq:dec_priority(Pid, St),
St1.
-spec ds_remove(downstream(), ds_store()) -> ds_store().
ds_remove(Downstream, St) ->
pid_psq:delete(Downstream, St).
......@@ -129,9 +129,14 @@ code_change(_OldVsn, State, _Extra) ->
%% Send packet from upstream to downstream
handle_send(Data, Upstream, #state{upstreams = Ups,
addr_bin = ProxyAddr} = St) ->
UpstreamData = maps:get(Upstream, Ups),
Packet = mtp_rpc:encode_packet({data, Data}, {UpstreamData, ProxyAddr}),
down_send(Packet, St).
case Ups of
#{Upstream := UpstreamData} ->
Packet = mtp_rpc:encode_packet({data, Data}, {UpstreamData, ProxyAddr}),
down_send(Packet, St);
_ ->
lager:warning("Upstream=~p not found", [Upstream]),
{{error, unknown_upstream}, St}
end.
%% New upstream connected
handle_upstream_new(Upstream, Opts, #state{upstreams = Ups,
......@@ -141,6 +146,7 @@ handle_upstream_new(Upstream, Opts, #state{upstreams = Ups,
AdTag = maps:get(ad_tag, Opts, undefined),
Ups1 = Ups#{Upstream => {ConnId, iolist_to_binary(mtp_rpc:encode_ip_port(Ip, Port)), AdTag}},
UpsRev1 = UpsRev#{ConnId => Upstream},
lager:debug("New upstream=~p conn_id=~p", [Upstream, ConnId]),
St#state{upstreams = Ups1,
upstreams_rev = UpsRev1}.
......@@ -226,14 +232,15 @@ down_send(Packet, #state{sock = Sock, codec = Codec} = St) ->
up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
case maps:find(ConnId, UpsRev) of
{ok, Upstream} ->
ok = mtp_handler:send(Upstream, Packet),
St;
error ->
lager:warning("Unknown connection_id=~w", [ConnId]),
ClosedPacket = mtp_rpc:encode_packet(remote_closed, ConnId),
{ok, St1} = down_send(ClosedPacket, St),
St1
{ok, Upstream} ->
ok = mtp_handler:send(Upstream, Packet),
St;
error ->
lager:warning("Unknown connection_id=~w", [ConnId]),
%% WHY!!!?
%% ClosedPacket = mtp_rpc:encode_packet(remote_closed, ConnId),
%% {ok, St1} = down_send(ClosedPacket, St),
St
end.
connect(DcId, S) ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment