Add "status" for dc pools; add initial burst handling for pool growth

parent a8803a7b
...@@ -14,6 +14,8 @@ Features ...@@ -14,6 +14,8 @@ Features
* Automatic configuration reload (no need for restarts once per day) * Automatic configuration reload (no need for restarts once per day)
* Most of the configuration options can be updated without service restart * Most of the configuration options can be updated without service restart
* Very high performance - can handle tens of thousands connections! Scales to all CPU cores. * Very high performance - can handle tens of thousands connections! Scales to all CPU cores.
* Supports multiplexing (Many connections Client -> Proxy are wrapped to small amount of
connections Proxy -> Telegram Server)
* Small codebase compared to official one * Small codebase compared to official one
* A lots of metrics could be exported (optional) * A lots of metrics could be exported (optional)
......
...@@ -19,7 +19,8 @@ ...@@ -19,7 +19,8 @@
get_downstream_pool/1, get_downstream_pool/1,
get_netloc/1, get_netloc/1,
get_netloc_safe/1, get_netloc_safe/1,
get_secret/0]). get_secret/0,
status/0]).
-export([register_name/2, -export([register_name/2,
unregister_name/1, unregister_name/1,
whereis_name/1, whereis_name/1,
...@@ -131,6 +132,15 @@ get_secret() -> ...@@ -131,6 +132,15 @@ get_secret() ->
[{_, Key}] = ets:lookup(?TAB, key), [{_, Key}] = ets:lookup(?TAB, key),
Key. Key.
status() ->
[{?IDS_KEY, L}] = ets:lookup(?TAB, ?IDS_KEY),
lists:map(
fun(DcId) ->
DcPoolStatus = mtp_dc_pool:status(whereis_name(DcId)),
DcPoolStatus#{dc_id => DcId}
end, L).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
get/3, get/3,
return/2, return/2,
add_connection/1, add_connection/1,
ack_connected/2]). ack_connected/2,
status/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
...@@ -26,6 +27,7 @@ ...@@ -26,6 +27,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-define(BURST_MAX, 10).
-type upstream() :: mtp_handler:handle(). -type upstream() :: mtp_handler:handle().
-type downstream() :: mtp_down_conn:handle(). -type downstream() :: mtp_down_conn:handle().
...@@ -56,6 +58,9 @@ add_connection(Pool) -> ...@@ -56,6 +58,9 @@ add_connection(Pool) ->
ack_connected(Pool, Downstream) -> ack_connected(Pool, Downstream) ->
gen_server:cast(Pool, {connected, Downstream}). gen_server:cast(Pool, {connected, Downstream}).
status(Pool) ->
gen_server:call(Pool, status).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
...@@ -71,7 +76,18 @@ handle_call({get, Upstream, Opts}, _From, State) -> ...@@ -71,7 +76,18 @@ handle_call({get, Upstream, Opts}, _From, State) ->
{reply, Downstream, State1}; {reply, Downstream, State1};
handle_call(add_connection, _From, State) -> handle_call(add_connection, _From, State) ->
State1 = connect(State), State1 = connect(State),
{reply, ok, State1}. {reply, ok, State1};
handle_call(status, _From, #state{downstreams = Ds,
upstreams = Us} = State) ->
{NDowns, NUps, Min, Max} =
ds_fold(
fun(_Pid, N, {NDowns, NUps, Min, Max}) ->
{NDowns + 1, NUps + N, min(Min, N), max(Max, N)}
end, {0, 0, map_size(Us), 0}, Ds),
{reply, #{n_downstreams => NDowns,
n_upstreams => NUps,
min => Min,
max => Max}, State}.
handle_cast({return, Upstream}, State) -> handle_cast({return, Upstream}, State) ->
{noreply, handle_return(Upstream, State)}; {noreply, handle_return(Upstream, State)};
...@@ -82,10 +98,10 @@ handle_info({'DOWN', MonitorRef, process, Pid, _Reason}, State) -> ...@@ -82,10 +98,10 @@ handle_info({'DOWN', MonitorRef, process, Pid, _Reason}, State) ->
%% TODO: monitor downstream connections as well %% TODO: monitor downstream connections as well
{noreply, handle_down(MonitorRef, Pid, State)}. {noreply, handle_down(MonitorRef, Pid, State)}.
terminate(_Reason, #state{downstreams = Ds}) -> terminate(_Reason, #state{downstreams = Ds}) ->
ds_foreach( ds_fold(
fun(Pid) -> fun(Pid, _, _) ->
mtp_down_conn:shutdown(Pid) mtp_down_conn:shutdown(Pid)
end, Ds), end, ok, Ds),
%% upstreams will be killed by connection itself %% upstreams will be killed by connection itself
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
...@@ -139,17 +155,23 @@ handle_down(MonRef, MaybeUpstream, #state{downstreams = Ds, ...@@ -139,17 +155,23 @@ handle_down(MonRef, MaybeUpstream, #state{downstreams = Ds,
maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) -> maybe_spawn_connection(CurrentMin, #state{pending_downstreams = Pending} = St) ->
%% TODO: shrinking (by timer) %% TODO: shrinking (by timer)
case application:get_env(?APP, clients_per_dc_connection) of ToSpawn =
{ok, N} when CurrentMin > N, case application:get_env(?APP, clients_per_dc_connection) of
Pending == [] -> {ok, N} when CurrentMin > N,
ToSpawn = 2, Pending == [] ->
lists:foldl( 2;
fun(_, S) -> {ok, N} when CurrentMin > (N * 1.5),
connect(S) length(Pending) < ?BURST_MAX ->
end, St, lists:seq(1, ToSpawn)); %% To survive initial bursts
_ -> ?BURST_MAX - length(Pending);
St _ ->
end. 0
end,
lists:foldl(
fun(_, S) ->
connect(S)
end, St, lists:seq(1, ToSpawn)).
%% Initiate new async connection %% Initiate new async connection
connect(#state{pending_downstreams = Pending, connect(#state{pending_downstreams = Pending,
...@@ -181,12 +203,13 @@ ds_new(Connections) -> ...@@ -181,12 +203,13 @@ ds_new(Connections) ->
pid_psq:add(Conn, Psq1) pid_psq:add(Conn, Psq1)
end, Psq, Connections). end, Psq, Connections).
-spec ds_foreach(fun( (downstream()) -> any() ), ds_store()) -> ok. -spec ds_fold(fun( (downstream(), integer(), Acc) -> Acc ), Acc, ds_store()) -> Acc when
ds_foreach(Fun, St) -> Acc :: any().
ds_fold(Fun, Acc0, St) ->
psq:fold( psq:fold(
fun(_, _N, Pid, _) -> fun(_, N, Pid, Acc) ->
Fun(Pid) Fun(Pid, N, Acc)
end, ok, St). end, Acc0, St).
%% Add new downstream to storage %% Add new downstream to storage
-spec ds_add_downstream(downstream(), ds_store()) -> ds_store(). -spec ds_add_downstream(downstream(), ds_store()) -> ds_store().
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
%%% @doc %%% @doc
%%% Supervisor for mtp_down_conn processes %%% Supervisor for mtp_down_conn processes
%%% @end %%% @end
%%% TODO: maybe have one supervisor per-DC
%%% Created : 14 Oct 2018 by Sergey <me@seriyps.ru> %%% Created : 14 Oct 2018 by Sergey <me@seriyps.ru>
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(mtp_down_conn_sup). -module(mtp_down_conn_sup).
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment