Add special helpers for live config_change

parent f1070dfe
......@@ -20,7 +20,8 @@
get_netloc/1,
get_netloc_safe/1,
get_secret/0,
status/0]).
status/0,
update/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
......@@ -103,16 +104,21 @@ get_secret() ->
[{_, Key}] = ets:lookup(?TAB, key),
Key.
-spec status() -> [mtp_dc_pool:status()].
status() ->
[{?IDS_KEY, L}] = ets:lookup(?TAB, ?IDS_KEY),
lists:map(
fun(DcId) ->
{ok, Pid} = get_downstream_pool(DcId),
DcPoolStatus = mtp_dc_pool:status(Pid),
DcPoolStatus#{dc_id => DcId}
mtp_dc_pool:status(Pid)
end, L).
-spec update() -> ok.
update() ->
gen_server:cast(?MODULE, update).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
......@@ -133,8 +139,14 @@ init([]) ->
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_cast(update, #state{timer = Timer} = State) ->
update(State, soft),
lager:info("Config updated"),
Timer1 = gen_timeout:bump(
gen_timeout:reset(Timer)),
{noreply, State#state{timer = Timer1}}.
handle_info(timeout, #state{timer = Timer} =State) ->
case gen_timeout:is_expired(Timer) of
true ->
......
......@@ -26,6 +26,7 @@
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export_type([status/0]).
-define(SERVER, ?MODULE).
-define(APP, mtproto_proxy).
......@@ -34,6 +35,11 @@
-type upstream() :: mtp_handler:handle().
-type downstream() :: mtp_down_conn:handle().
-type ds_store() :: psq:psq().
-type status() :: #{n_downstreams := non_neg_integer(),
n_upstreams := non_neg_integer(),
min := non_neg_integer(),
max := non_neg_integer(),
dc_id := mtp_config:dc_id()}.
-record(state,
{dc_id :: mtp_config:dc_id(),
......@@ -76,6 +82,7 @@ add_connection(Pool) ->
ack_connected(Pool, Downstream) ->
gen_server:cast(Pool, {connected, Downstream}).
-spec status(pid()) -> status().
status(Pool) ->
gen_server:call(Pool, status).
......@@ -97,7 +104,8 @@ handle_call(add_connection, _From, State) ->
State1 = connect(State),
{reply, ok, State1};
handle_call(status, _From, #state{downstreams = Ds,
upstreams = Us} = State) ->
upstreams = Us,
dc_id = DcId} = State) ->
{NDowns, NUps, Min, Max} =
ds_fold(
fun(_Pid, N, {NDowns, NUps, Min, Max}) ->
......@@ -106,7 +114,8 @@ handle_call(status, _From, #state{downstreams = Ds,
{reply, #{n_downstreams => NDowns,
n_upstreams => NUps,
min => Min,
max => Max}, State}.
max => Max,
dc_id => DcId}, State}.
handle_cast({return, Upstream}, State) ->
{noreply, handle_return(Upstream, State)};
......
......@@ -16,7 +16,8 @@
upstream_closed/2,
shutdown/1,
send/2,
ack/3]).
ack/3,
set_config/3]).
-ifdef(TEST).
-export([get_middle_key/1]).
-endif.
......@@ -93,6 +94,10 @@ send(Conn, Data) ->
ack(Conn, Count, Size) ->
gen_server:cast(Conn, {ack, self(), Count, Size}).
-spec set_config(handle(), atom(), any()) -> {ok, OldValue :: any()} | ignored.
set_config(Conn, Option, Value) ->
gen_server:call(Conn, {set_config, Option, Value}).
init([Pool, DcId]) ->
self() ! do_connect,
{ok, #state{pool = Pool,
......@@ -100,7 +105,20 @@ init([Pool, DcId]) ->
handle_call({send, Data}, {Upstream, _}, State) ->
{Res, State1} = handle_send(Data, Upstream, State),
{reply, Res, State1}.
{reply, Res, State1};
handle_call({set_config, Name, Value}, _From, State) ->
Result =
case Name of
downstream_socket_buffer_size when is_integer(Value),
Value >= 512 ->
{ok, [{buffer, OldSize}]} = inet:getopts(State#state.sock, [buffer]),
ok = inet:setopts(State#state.sock, [{buffer, Value}]),
{ok, OldSize};
_ ->
lager:warning("set_config ~p=~p ignored", [Name, Value]),
ignored
end,
{reply, Result, State}.
handle_cast({ack, Upstream, Count, Size}, State) ->
{noreply, handle_ack(Upstream, Count, Size, State)};
......@@ -112,7 +130,6 @@ handle_cast({upstream_closed, Upstream}, State) ->
handle_cast(shutdown, State) ->
{stop, shutdown, State}.
handle_info({tcp, Sock, Data}, #state{sock = Sock, dc_id = DcId} = S) ->
mtp_metric:count_inc([?APP, received, downstream, bytes], byte_size(Data), #{labels => [DcId]}),
mtp_metric:histogram_observe([?APP, tracker_packet_size, bytes], byte_size(Data), #{labels => [downstream]}),
......
......@@ -8,9 +8,16 @@
-behaviour(application).
%% Application callbacks
-export([start/2, prep_stop/1, stop/1, start_proxy/1]).
-export([start/2, prep_stop/1, stop/1, config_change/3]).
-export([mtp_listeners/0, running_ports/0, start_proxy/1]).
-define(APP, mtproto_proxy).
-type proxy_port() :: #{name := any(),
port := inet:port_number(),
secret := binary(),
tag := binary(),
listen_ip => inet:ip4_addr()}.
%%====================================================================
%% API
%%====================================================================
......@@ -22,17 +29,54 @@ start(_StartType, _StartArgs) ->
[start_proxy(Where) || Where <- application:get_env(?APP, ports, [])],
Res.
%%--------------------------------------------------------------------
prep_stop(State) ->
[stop_proxy(Where) || Where <- application:get_env(?APP, ports, [])],
State.
stop(_State) ->
ok.
config_change(Changed, New, Removed) ->
%% app's env is already updated when this callback is called
ok = lists:foreach(fun(K) -> config_changed(removed, K, []) end, Removed),
ok = lists:foreach(fun({K, V}) -> config_changed(changed, K, V) end, Changed),
ok = lists:foreach(fun({K, V}) -> config_changed(new, K, V) end, New).
%%--------------------------------------------------------------------
%% @doc List of ranch listeners running mtproto_proxy
-spec mtp_listeners() -> [tuple()].
mtp_listeners() ->
lists:filter(
fun({_Name, Opts}) ->
proplists:get_value(protocol, Opts) == mtp_handler
end,
ranch:info()).
%% @doc Currently running listeners in a form of proxy_port()
-spec running_ports() -> [proxy_port()].
running_ports() ->
lists:map(
fun({Name, Opts}) ->
#{protocol_options := ProtoOpts,
ip := Ip,
port := Port} = maps:from_list(Opts),
[Name, Secret, AdTag] = ProtoOpts,
#{name => Name,
listen_ip => inet:ntoa(Ip),
port => Port,
secret => Secret,
tag => AdTag}
end, mtp_listeners()).
%%====================================================================
%% Internal functions
%%====================================================================
-spec start_proxy(proxy_port()) -> {ok, pid()}.
start_proxy(#{name := Name, port := Port, secret := Secret, tag := Tag} = P) ->
ListenIpStr = maps:get(
listen_ip, P,
......@@ -59,6 +103,41 @@ start_proxy(#{name := Name, port := Port, secret := Secret, tag := Tag} = P) ->
stop_proxy(#{name := Name}) ->
ranch:stop_listener(Name).
config_changed(_, ip_lookup_services, _) ->
mtp_config:update();
config_changed(_, proxy_secret_url, _) ->
mtp_config:update();
config_changed(_, proxy_config_url, _) ->
mtp_config:update();
config_changed(Action, max_connections, N) when Action == new; Action == changed ->
(is_integer(N) and (N >= 0)) orelse error({"max_connections should be non_neg_integer", N}),
lists:foreach(fun({Name, _}) ->
ranch:set_max_connections(Name, N)
end, mtp_listeners());
config_changed(Action, downstream_socket_buffer_size, N) when Action == new; Action == changed ->
[{ok, _} = mtp_down_conn:set_config(Pid, downstream_socket_buffer_size, N)
|| {_, Pid, worker, [mtp_down_conn]}
<- supervisor:which_children(mtp_down_conn_sup)],
ok;
%% Since upstream connections are mostly short-lived, live-update doesn't make much difference
%% config_changed(Action, upstream_socket_buffer_size, N) when Action == new; Action == changed ->
config_changed(Action, ports, Ports) when Action == new; Action == changed ->
%% TODO: update secret or ad_tag without disconnect
RanchPorts = ordsets:from_list(running_ports()),
DefaultListenIp = #{listen_ip => application:get_env(?APP, listen_ip, "0.0.0.0")},
NewPorts = ordsets:from_list([maps:merge(DefaultListenIp, Port)
|| Port <- Ports]),
ToStop = ordsets:subtract(RanchPorts, NewPorts),
ToStart = ordsets:subtract(NewPorts, RanchPorts),
lists:foreach(fun stop_proxy/1, ToStop),
[{ok, _} = mtproto_proxy_app:start_proxy(Conf) || Conf <- ToStart],
ok;
config_changed(Action, K, V) ->
%% Most of the other config options are applied automatically without extra work
lager:info("Config ~p ~p to ~p ignored", [K, Action, V]),
ok.
-ifdef(TEST).
report(Fmt, Args) ->
lager:debug(Fmt, Args).
......
......@@ -11,7 +11,8 @@
echo_abridged_many_packets_case/1,
packet_too_large_case/1,
downstream_size_backpressure_case/1,
downstream_qlen_backpressure_case/1
downstream_qlen_backpressure_case/1,
config_change_case/1
]).
-export([set_env/2,
......@@ -248,11 +249,65 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) ->
%% ct:pal("Metric: ~p", [sys:get_state(mtp_test_metric)]),
ok.
gen_rpc_replies(#{packet := Packet, n := N}, ConnId, St) ->
Rpcs = [{proxy_ans, ConnId, Packet} || _ <- lists:seq(1, N)],
{return, {rpc_multi, Rpcs, St#{ConnId => 1}}}.
%% @doc test mtproto_proxy_app:config_change/3
config_change_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg);
config_change_case({post, Cfg}) ->
stop_single(Cfg);
config_change_case(Cfg) when is_list(Cfg) ->
%% test "max_connections"
MaxConnsBefore = [{Listener, proplists:get_value(max_connections, Opts)}
|| {Listener, Opts} <- mtproto_proxy_app:mtp_listeners()],
NewMaxConns = 10,
ok = mtproto_proxy_app:config_change([{max_connections, NewMaxConns}], [], []),
MaxConnsAfter = [{Listener, proplists:get_value(max_connections, Opts)}
|| {Listener, Opts} <- mtproto_proxy_app:mtp_listeners()],
?assertNotEqual(MaxConnsBefore, MaxConnsAfter),
?assert(lists:all(fun({_Listener, MaxConns}) ->
MaxConns == NewMaxConns
end, MaxConnsAfter),
MaxConnsAfter),
%% test downstream_socket_buffer_size
GetBufferSizes =
fun() ->
lists:map(
fun({_, Pid, worker, [mtp_down_conn]}) ->
%% This is hacky and may brake in future erlang releases
{links, Links} = process_info(Pid, links),
[Port] = [L || L <- Links, is_port(L)],
{ok, [{buffer, BufSize}]} = inet:getopts(Port, [buffer]),
{Pid, BufSize}
end, supervisor:which_children(mtp_down_conn_sup))
end,
BufSizesBefore = GetBufferSizes(),
NewBufSize = 512,
ok = mtproto_proxy_app:config_change([{downstream_socket_buffer_size, NewBufSize}], [], []),
BufSizesAfter = GetBufferSizes(),
?assertNotEqual(BufSizesBefore, BufSizesAfter),
?assert(lists:all(fun({_Conn, BufSize}) ->
BufSize == NewBufSize
end, BufSizesAfter),
BufSizesAfter),
%% test ports
PortsBefore = mtproto_proxy_app:running_ports(),
?assertMatch([#{name := _,
listen_ip := _,
port := _,
secret := _,
tag := _}], PortsBefore),
ok = mtproto_proxy_app:config_change([{ports, []}], [], []),
?assertEqual([], mtproto_proxy_app:running_ports()),
ok = mtproto_proxy_app:config_change([{ports, PortsBefore}], [], []),
?assertEqual(PortsBefore, mtproto_proxy_app:running_ports()),
ok.
%% TODO: send a lot, not read, and then close - assert connection IDs are cleaned up
%% Helpers
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment