Make downstream backpressure tunable

parent e2308a6e
...@@ -34,10 +34,6 @@ ...@@ -34,10 +34,6 @@
-define(CONN_TIMEOUT, 10000). -define(CONN_TIMEOUT, 10000).
-define(SEND_TIMEOUT, 15000). -define(SEND_TIMEOUT, 15000).
-define(MAX_SOCK_BUF_SIZE, 1024 * 500). % Decrease if CPU is cheaper than RAM -define(MAX_SOCK_BUF_SIZE, 1024 * 500). % Decrease if CPU is cheaper than RAM
%% One slow client can slowdown everyone on the same downstream, but it have
%% it's own healthchecks
-define(MAX_NON_ACK_COUNT, 300).
-define(MAX_NON_ACK_BYTES, 1024 * 1024 * 6). % 6MB
-ifndef(OTP_RELEASE). % pre-OTP21 -ifndef(OTP_RELEASE). % pre-OTP21
-define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ). -define(WITH_STACKTRACE(T, R, S), T:R -> S = erlang:get_stacktrace(), ).
...@@ -67,6 +63,15 @@ ...@@ -67,6 +63,15 @@
overflow_passive = false :: boolean(), overflow_passive = false :: boolean(),
non_ack_count = 0 :: non_neg_integer(), non_ack_count = 0 :: non_neg_integer(),
non_ack_bytes = 0 :: non_neg_integer(), non_ack_bytes = 0 :: non_neg_integer(),
backpressure_conf :: {
%%ovarall max num non acked packets
non_neg_integer(),
%%ovarall max non acked bytes
non_neg_integer(),
%%max non acked packets per-upstream
non_neg_integer() | float() | undefined,
%%max non-acked bytes per-upstream
non_neg_integer() | undefined},
pool :: pid(), pool :: pid(),
dc_id :: mtp_config:dc_id(), dc_id :: mtp_config:dc_id(),
netloc :: mtp_config:netloc() | undefined % telegram server ip:port netloc :: mtp_config:netloc() | undefined % telegram server ip:port
...@@ -102,25 +107,40 @@ set_config(Conn, Option, Value) -> ...@@ -102,25 +107,40 @@ set_config(Conn, Option, Value) ->
init([Pool, DcId]) -> init([Pool, DcId]) ->
self() ! do_connect, self() ! do_connect,
{ok, #state{pool = Pool, BpOpts = application:get_env(?APP, downstream_backpressure, #{}),
{ok, UpsPerDown} = application:get_env(?APP, clients_per_dc_connection),
BackpressureConf = build_backpressure_conf(UpsPerDown, BpOpts),
{ok, #state{backpressure_conf = BackpressureConf,
pool = Pool,
dc_id = DcId}}. dc_id = DcId}}.
handle_call({send, Data}, {Upstream, _}, State) -> handle_call({send, Data}, {Upstream, _}, State) ->
{Res, State1} = handle_send(Data, Upstream, State), {Res, State1} = handle_send(Data, Upstream, State),
{reply, Res, State1}; {reply, Res, State1};
handle_call({set_config, Name, Value}, _From, State) -> handle_call({set_config, Name, Value}, _From, State) ->
Result = {Response, State1} =
case Name of case Name of
downstream_socket_buffer_size when is_integer(Value), downstream_socket_buffer_size when is_integer(Value),
Value >= 512 -> Value >= 512 ->
{ok, [{buffer, OldSize}]} = inet:getopts(State#state.sock, [buffer]), {ok, [{buffer, OldSize}]} = inet:getopts(State#state.sock, [buffer]),
ok = inet:setopts(State#state.sock, [{buffer, Value}]), ok = inet:setopts(State#state.sock, [{buffer, Value}]),
{ok, OldSize}; {{ok, OldSize}, State};
downstream_backpressure when is_map(Value) ->
{ok, UpsPerDown} = application:get_env(?APP, clients_per_dc_connection),
try build_backpressure_conf(UpsPerDown, Value) of
BpConfig ->
{{ok, State#state.backpressure_conf},
State#state{backpressure_conf = BpConfig}}
catch Type:Reason ->
?log(error, "~p: not updating downstream_backpressure: ~p",
[Type, Reason]),
{ignored, State}
end;
_ -> _ ->
?log(warning, "set_config ~p=~p ignored", [Name, Value]), ?log(warning, "set_config ~p=~p ignored", [Name, Value]),
ignored {ignored, State}
end, end,
{reply, Result, State}. {reply, Response, State1}.
handle_cast({ack, Upstream, Count, Size}, State) -> handle_cast({ack, Upstream, Count, Size}, State) ->
{noreply, handle_ack(Upstream, Count, Size, State)}; {noreply, handle_ack(Upstream, Count, Size, State)};
...@@ -299,6 +319,26 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) -> ...@@ -299,6 +319,26 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
%% Backpressure %% Backpressure
%% %%
build_backpressure_conf(UpstreamsPerDownstream, BpConf) ->
BytesTotal = maps:get(bytes_total, BpConf, UpstreamsPerDownstream * 30 * 1024),
PacketsTotal = maps:get(packets_total, BpConf, UpstreamsPerDownstream * 2),
BytesPerUpstream = maps:get(bytes_per_upstream, BpConf, undefined),
PacketsPerUpstream = maps:get(packets_per_upstream, BpConf, undefined),
(is_integer(BytesTotal)
andalso (BytesTotal > 1024)
andalso is_integer(PacketsTotal)
andalso (PacketsTotal > 10))
orelse error({invalid_downstream_backpressure, BpConf}),
((undefined == BytesPerUpstream)
orelse (is_integer(BytesPerUpstream)
andalso BytesPerUpstream >= 1024))
orelse error({invalid_bytes_per_upstream, BytesPerUpstream}),
((undefined == PacketsPerUpstream)
orelse (is_number(PacketsPerUpstream)
andalso PacketsPerUpstream >= 1))
orelse error({invalid_bytes_per_upstream, PacketsPerUpstream}),
{PacketsTotal, BytesTotal, PacketsPerUpstream, BytesPerUpstream}.
%% Bumb counters of non-acked packets %% Bumb counters of non-acked packets
non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt, non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt,
non_ack_bytes = Oct, non_ack_bytes = Oct,
...@@ -312,10 +352,24 @@ non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt, ...@@ -312,10 +352,24 @@ non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt,
UpsOct + Size}}}). UpsOct + Size}}}).
%% Do we have too much unconfirmed packets? %% Do we have too much unconfirmed packets?
is_overflow(#state{non_ack_count = Cnt}) when Cnt > ?MAX_NON_ACK_COUNT -> is_overflow(#state{non_ack_count = Cnt,
count; backpressure_conf = {MaxCount, _, _, _}}) when Cnt > MaxCount ->
is_overflow(#state{non_ack_bytes = Oct}) when Oct > ?MAX_NON_ACK_BYTES -> count_total;
bytes; is_overflow(#state{non_ack_bytes = Oct,
backpressure_conf = {_, MaxOct, _, _}}) when Oct > MaxOct ->
bytes_total;
is_overflow(#state{non_ack_count = Cnt,
upstreams = Ups,
backpressure_conf = {_, _, MaxPerConCnt, _}}) when
is_number(MaxPerConCnt),
Cnt > (map_size(Ups) * MaxPerConCnt) ->
count_per_upstream;
is_overflow(#state{non_ack_bytes = Oct,
upstreams = Ups,
backpressure_conf = {_, _, _, MaxPerConOct}}) when
is_integer(MaxPerConOct),
Oct > (map_size(Ups) * MaxPerConOct) ->
bytes_per_upstream;
is_overflow(_) -> is_overflow(_) ->
false. false.
......
...@@ -103,6 +103,26 @@ ...@@ -103,6 +103,26 @@
%% {gc, 409600}, % if connection memory >X - do garbage collection %% {gc, 409600}, % if connection memory >X - do garbage collection
%% {total_mem, 3145728} % if connection memory >X - close connection %% {total_mem, 3145728} % if connection memory >X - close connection
%% ]}, %% ]},
%% Downstream backpressure tuning
%% Values are configured per downstream connection, so, for example, if
%% `clients_per_dc_connection' is 300 and current number of connections
%% is 60,000, then there will be 200 downstream connections, each will
%% keep reading data from it's socket unless there is 10mb of data or
%% 600 packets not yet delivered by it's upstreams to clients
%% `*_per_upstream' options are the same, but will be multiplied by the
%% number of upstreams currently connected to this downstream
%% {downstream_backpressure,
%% #{%% 10mb; if not specified, it's 30kb * clients_per_dc_connection
%% bytes_total => 10485760,
%% %% if not specified, it's 2 * clients_per_dc_connection
%% packets_total => 600,
%% %% integer >= 1024
%% %% if not specified this check is skipped
%% bytes_per_upstream => 51200, %50kb
%% %% float >= 1
%% %% if not specified this check is skipped
%% packets_per_upstream => 3}}
]}, ]},
{modules, []}, {modules, []},
......
...@@ -119,8 +119,12 @@ config_changed(Action, max_connections, N) when Action == new; Action == changed ...@@ -119,8 +119,12 @@ config_changed(Action, max_connections, N) when Action == new; Action == changed
end, mtp_listeners()); end, mtp_listeners());
config_changed(Action, downstream_socket_buffer_size, N) when Action == new; Action == changed -> config_changed(Action, downstream_socket_buffer_size, N) when Action == new; Action == changed ->
[{ok, _} = mtp_down_conn:set_config(Pid, downstream_socket_buffer_size, N) [{ok, _} = mtp_down_conn:set_config(Pid, downstream_socket_buffer_size, N)
|| {_, Pid, worker, [mtp_down_conn]} || Pid <- downstream_connections()],
<- supervisor:which_children(mtp_down_conn_sup)], ok;
config_changed(Action, downstream_backpressure, BpOpts) when Action == new; Action == changed ->
is_map(BpOpts) orelse error(invalid_downstream_backpressure),
[{ok, _} = mtp_down_conn:set_config(Pid, downstream_backpressure, BpOpts)
|| Pid <- downstream_connections()],
ok; ok;
%% Since upstream connections are mostly short-lived, live-update doesn't make much difference %% Since upstream connections are mostly short-lived, live-update doesn't make much difference
%% config_changed(Action, upstream_socket_buffer_size, N) when Action == new; Action == changed -> %% config_changed(Action, upstream_socket_buffer_size, N) when Action == new; Action == changed ->
...@@ -140,6 +144,9 @@ config_changed(Action, K, V) -> ...@@ -140,6 +144,9 @@ config_changed(Action, K, V) ->
?log(info, "Config ~p ~p to ~p ignored", [K, Action, V]), ?log(info, "Config ~p ~p to ~p ignored", [K, Action, V]),
ok. ok.
downstream_connections() ->
[Pid || {_, Pid, worker, [mtp_down_conn]} <- supervisor:which_children(mtp_down_conn_sup)].
-ifdef(TEST). -ifdef(TEST).
report(Fmt, Args) -> report(Fmt, Args) ->
......
...@@ -150,7 +150,10 @@ packet_too_large_case(Cfg) when is_list(Cfg) -> ...@@ -150,7 +150,10 @@ packet_too_large_case(Cfg) when is_list(Cfg) ->
downstream_size_backpressure_case({pre, Cfg}) -> downstream_size_backpressure_case({pre, Cfg}) ->
Cfg1 = setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg), Cfg1 = setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg),
%% Disable upstream healthchecks %% Disable upstream healthchecks
set_env([{upstream_healthchecks, []}], Cfg1); set_env([{upstream_healthchecks, []},
{downstream_backpressure,
#{bytes_total => 6 * 1024 * 1024,
packets_total => 1000}}], Cfg1);
downstream_size_backpressure_case({post, Cfg}) -> downstream_size_backpressure_case({post, Cfg}) ->
stop_single(Cfg), stop_single(Cfg),
reset_env(Cfg); reset_env(Cfg);
...@@ -172,7 +175,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -172,7 +175,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
%% Wait for backpressure-in %% Wait for backpressure-in
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for_value( ok, mtp_test_metric:wait_for_value(
count, [?APP, down_backpressure, total], [DcId, bytes], 1, 5000)), count, [?APP, down_backpressure, total], [DcId, bytes_total], 1, 5000)),
%% Upstream healthcheck should be disabled, otherwise it can interfere %% Upstream healthcheck should be disabled, otherwise it can interfere
?assertEqual(not_found, ?assertEqual(not_found,
mtp_test_metric:get_tags( mtp_test_metric:get_tags(
...@@ -193,7 +196,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -193,7 +196,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
{ok, _RecvPackets, Cli2} = mtp_test_client:recv_all(Cli1, 1000), {ok, _RecvPackets, Cli2} = mtp_test_client:recv_all(Cli1, 1000),
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for( ok, mtp_test_metric:wait_for(
count, [?APP, down_backpressure, total], [DcId, bytes], count, [?APP, down_backpressure, total], [DcId, bytes_total],
fun(V) -> is_integer(V) and (V > 0) end, 5000)), fun(V) -> is_integer(V) and (V > 0) end, 5000)),
ok = mtp_test_client:close(Cli2), ok = mtp_test_client:close(Cli2),
%% ct:pal("t->p ~p; p->c ~p; diff ~p", %% ct:pal("t->p ~p; p->c ~p; diff ~p",
...@@ -211,7 +214,10 @@ downstream_qlen_backpressure_case({pre, Cfg}) -> ...@@ -211,7 +214,10 @@ downstream_qlen_backpressure_case({pre, Cfg}) ->
%% socket data packet; %% socket data packet;
%% Disable upstream healthchecks %% Disable upstream healthchecks
Cfg1 = set_env([{downstream_socket_buffer_size, 1024}, Cfg1 = set_env([{downstream_socket_buffer_size, 1024},
{upstream_healthchecks, []}], Cfg), {upstream_healthchecks, []},
{downstream_backpressure,
#{bytes_total => 50 * 1024 * 1024,
packets_total => 300}}], Cfg),
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg1); setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg1);
downstream_qlen_backpressure_case({post, Cfg}) -> downstream_qlen_backpressure_case({post, Cfg}) ->
stop_single(Cfg), stop_single(Cfg),
...@@ -235,7 +241,8 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -235,7 +241,8 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) ->
%% Wait for backpressure-in %% Wait for backpressure-in
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for_value( ok, mtp_test_metric:wait_for_value(
count, [?APP, down_backpressure, total], [DcId, count], 1, 5000)), count, [?APP, down_backpressure, total], [DcId, count_total], 1, 5000),
sys:get_state(mtp_test_metric)),
%% Close connection to release backpressure %% Close connection to release backpressure
ok = mtp_test_client:close(Cli1), ok = mtp_test_client:close(Cli1),
?assertEqual( ?assertEqual(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment