Minor cleanups

parent 120bf6a4
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
-record(state, -record(state,
{dc_id :: mtp_config:dc_id(), {dc_id :: mtp_config:dc_id(),
%% This one might be really big: %% This one might be really big:
upstreams = #{} :: #{upstream() => downstream()}, upstreams = #{} :: #{upstream() => {downstream(), Monitor :: reference()}},
%% On-demand downstreams are started asynchronously; %% On-demand downstreams are started asynchronously;
pending_downstreams = [] :: [pid()], pending_downstreams = [] :: [pid()],
%% Downstream storage that allows to choose the one with minimal %% Downstream storage that allows to choose the one with minimal
...@@ -142,7 +142,6 @@ handle_get(Upstream, Opts, #state{downstreams = Ds, ...@@ -142,7 +142,6 @@ handle_get(Upstream, Opts, #state{downstreams = Ds,
upstreams = Us} = St) -> upstreams = Us} = St) ->
{Downstream, N, Ds1} = ds_get(Ds), {Downstream, N, Ds1} = ds_get(Ds),
MonRef = erlang:monitor(process, Upstream), MonRef = erlang:monitor(process, Upstream),
%% if N > X and len(pending) < Y -> connect()
Us1 = Us#{Upstream => {Downstream, MonRef}}, Us1 = Us#{Upstream => {Downstream, MonRef}},
ok = mtp_down_conn:upstream_new(Downstream, Upstream, Opts), ok = mtp_down_conn:upstream_new(Downstream, Upstream, Opts),
{Downstream, maybe_spawn_connection( {Downstream, maybe_spawn_connection(
...@@ -186,6 +185,7 @@ handle_down(MonRef, Pid, #state{downstreams = Ds, ...@@ -186,6 +185,7 @@ handle_down(MonRef, Pid, #state{downstreams = Ds,
end. end.
maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) -> maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) ->
%% if N > X and len(pending) < Y -> connect()
%% TODO: shrinking (by timer) %% TODO: shrinking (by timer)
ToSpawn = ToSpawn =
case application:get_env(?APP, clients_per_dc_connection) of case application:get_env(?APP, clients_per_dc_connection) of
......
...@@ -41,13 +41,13 @@ ...@@ -41,13 +41,13 @@
-record(state, {stage = init :: stage(), -record(state, {stage = init :: stage(),
stage_state = [] :: any(), stage_state = [] :: any(),
sock :: gen_tcp:socket() | undefined, sock :: gen_tcp:socket() | undefined,
addr_bin :: binary(), % my external ip:port addr_bin :: binary() | undefined, % my external ip:port
codec :: mtp_layer:layer() | undefined, codec :: mtp_layer:layer() | undefined,
upstreams = #{} :: #{mtp_handler:handle() => upstream()}, upstreams = #{} :: #{mtp_handler:handle() => upstream()},
upstreams_rev = #{} :: #{mtp_rpc:conn_id() => mtp_handler:handle()}, upstreams_rev = #{} :: #{mtp_rpc:conn_id() => mtp_handler:handle()},
pool :: pid(), pool :: pid(),
dc_id :: mtp_config:dc_id(), dc_id :: mtp_config:dc_id(),
netloc :: mtp_config:netloc() % telegram server ip:port netloc :: mtp_config:netloc() | undefined % telegram server ip:port
}). }).
start_link(Pool, DcId) -> start_link(Pool, DcId) ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment