Separate metrics for different backpressure types; test tweaks

parent efafd37c
...@@ -293,8 +293,12 @@ non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt, ...@@ -293,8 +293,12 @@ non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt,
UpsOct + Size}}}). UpsOct + Size}}}).
%% Do we have too many unconfirmed packets? %% Do we have too many unconfirmed packets?
is_overflow(#state{non_ack_count = Cnt, non_ack_bytes = Oct}) -> is_overflow(#state{non_ack_count = Cnt}) when Cnt > ?MAX_NON_ACK_COUNT ->
(Cnt > ?MAX_NON_ACK_COUNT) orelse (Oct > ?MAX_NON_ACK_BYTES). count;
is_overflow(#state{non_ack_bytes = Oct}) when Oct > ?MAX_NON_ACK_BYTES ->
bytes;
is_overflow(_) ->
false.
%% If we are not overflown and socket is passive, activate it %% If we are not overflown and socket is passive, activate it
activate_if_no_overflow(#state{overflow_passive = false, sock = Sock}) -> activate_if_no_overflow(#state{overflow_passive = false, sock = Sock}) ->
...@@ -323,14 +327,14 @@ handle_ack(Upstream, Count, Size, #state{non_ack_count = Cnt, ...@@ -323,14 +327,14 @@ handle_ack(Upstream, Count, Size, #state{non_ack_count = Cnt,
maybe_deactivate(#state{overflow_passive = false, dc_id = Dc} = St) -> maybe_deactivate(#state{overflow_passive = false, dc_id = Dc} = St) ->
case is_overflow(St) of case is_overflow(St) of
true ->
%% Was not overflow, now overflowed
mtp_metric:count_inc([?APP, down_backpressure, total], 1,
#{labels => [Dc, true]}),
St#state{overflow_passive = true};
false -> false ->
%% Was not overflow and still not %% Was not overflow and still not
St St;
Type ->
%% Was not overflow, now overflowed
mtp_metric:count_inc([?APP, down_backpressure, total], 1,
#{labels => [Dc, Type]}),
St#state{overflow_passive = true}
end; end;
maybe_deactivate(St) -> maybe_deactivate(St) ->
St. St.
...@@ -338,15 +342,15 @@ maybe_deactivate(St) -> ...@@ -338,15 +342,15 @@ maybe_deactivate(St) ->
%% Activate socket if we changed state from overflow to ok %% Activate socket if we changed state from overflow to ok
maybe_activate(#state{overflow_passive = true, sock = Sock, dc_id = Dc} = St) -> maybe_activate(#state{overflow_passive = true, sock = Sock, dc_id = Dc} = St) ->
case is_overflow(St) of case is_overflow(St) of
true ->
%% Still overflow
St;
false -> false ->
%% Was overflow, but now resolved %% Was overflow, but now resolved
ok = inet:setopts(Sock, [{active, once}]), ok = inet:setopts(Sock, [{active, once}]),
mtp_metric:count_inc([?APP, down_backpressure, total], 1, mtp_metric:count_inc([?APP, down_backpressure, total], 1,
#{labels => [Dc, false]}), #{labels => [Dc, off]}),
St#state{overflow_passive = false} St#state{overflow_passive = false};
_ ->
%% Still overflow
St
end; end;
maybe_activate(#state{} = St) -> maybe_activate(#state{} = St) ->
St. St.
......
...@@ -155,7 +155,7 @@ active_metrics() -> ...@@ -155,7 +155,7 @@ active_metrics() ->
{count, [?APP, down_backpressure, total], {count, [?APP, down_backpressure, total],
"Times downstream backpressure state was changed", "Times downstream backpressure state was changed",
#{labels => [dc_id, enabled]}}, #{labels => [dc_id, state]}},
{histogram, [?APP, upstream_send_duration, seconds], {histogram, [?APP, upstream_send_duration, seconds],
"Duration of tcp send calls to upstream", "Duration of tcp send calls to upstream",
#{duration_unit => seconds, #{duration_unit => seconds,
......
...@@ -126,7 +126,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -126,7 +126,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
%% Backpressure by size limit is defined in mtp_down_conn.erl:?MAX_NON_ACK_BYTES %% Backpressure by size limit is defined in mtp_down_conn.erl:?MAX_NON_ACK_BYTES
BPressureThreshold = ?MB(6), BPressureThreshold = ?MB(6),
PacketSize = ?KB(400), PacketSize = ?KB(400),
NPackets = 2 * BPressureThreshold div PacketSize, NPackets = 4 * BPressureThreshold div PacketSize,
Packet = crypto:strong_rand_bytes(PacketSize), Packet = crypto:strong_rand_bytes(PacketSize),
Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_replies, Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_replies,
#{packet => Packet, n => NPackets}), #{packet => Packet, n => NPackets}),
...@@ -135,7 +135,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -135,7 +135,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
%% Wait for backpressure-in %% Wait for backpressure-in
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for_value( ok, mtp_test_metric:wait_for_value(
count, [?APP, down_backpressure, total], [DcId, true], 1, 5000)), count, [?APP, down_backpressure, total], [DcId, bytes], 1, 5000)),
%% Upstream healthcheck should be disabled, otherwise it can interfere %% Upstream healthcheck should be disabled, otherwise it can interfere
?assertEqual(not_found, ?assertEqual(not_found,
mtp_test_metric:get_tags( mtp_test_metric:get_tags(
...@@ -143,7 +143,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -143,7 +143,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
%% No backpressure-out, because we don't read any data %% No backpressure-out, because we don't read any data
?assertEqual(not_found, ?assertEqual(not_found,
mtp_test_metric:get_tags( mtp_test_metric:get_tags(
count, [?APP, down_backpressure, total], [DcId, false])), count, [?APP, down_backpressure, total], [DcId, off])),
%% Amount of bytes received by proxy will be bigger than amount sent to upstreams %% Amount of bytes received by proxy will be bigger than amount sent to upstreams
TgToProxy = TgToProxy =
mtp_test_metric:get_tags( mtp_test_metric:get_tags(
...@@ -156,7 +156,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -156,7 +156,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
{ok, _RecvPackets, Cli2} = mtp_test_client:recv_all(Cli1, 1000), {ok, _RecvPackets, Cli2} = mtp_test_client:recv_all(Cli1, 1000),
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for( ok, mtp_test_metric:wait_for(
count, [?APP, down_backpressure, total], [DcId, true], count, [?APP, down_backpressure, total], [DcId, bytes],
fun(V) -> is_integer(V) and (V > 0) end, 5000)), fun(V) -> is_integer(V) and (V > 0) end, 5000)),
ok = mtp_test_client:close(Cli2), ok = mtp_test_client:close(Cli2),
%% ct:pal("t->p ~p; p->c ~p; diff ~p", %% ct:pal("t->p ~p; p->c ~p; diff ~p",
...@@ -198,7 +198,7 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -198,7 +198,7 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) ->
%% Wait for backpressure-in %% Wait for backpressure-in
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for_value( ok, mtp_test_metric:wait_for_value(
count, [?APP, down_backpressure, total], [DcId, true], 1, 5000)), count, [?APP, down_backpressure, total], [DcId, count], 1, 5000)),
%% Close connection to release backpressure %% Close connection to release backpressure
ok = mtp_test_client:close(Cli1), ok = mtp_test_client:close(Cli1),
?assertEqual( ?assertEqual(
...@@ -206,7 +206,7 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) -> ...@@ -206,7 +206,7 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) ->
count, [?APP, in_connection_closed, total], [?FUNCTION_NAME], 1, 5000)), count, [?APP, in_connection_closed, total], [?FUNCTION_NAME], 1, 5000)),
?assertEqual( ?assertEqual(
ok, mtp_test_metric:wait_for( ok, mtp_test_metric:wait_for(
count, [?APP, down_backpressure, total], [DcId, true], count, [?APP, down_backpressure, total], [DcId, off],
fun(V) -> is_integer(V) and (V > 0) end, 5000)), fun(V) -> is_integer(V) and (V > 0) end, 5000)),
%% [{_, Pid, _, _}] = supervisor:which_children(mtp_down_conn_sup), %% [{_, Pid, _, _}] = supervisor:which_children(mtp_down_conn_sup),
%% ct:pal("Down conn state: ~p", [sys:get_state(Pid)]), %% ct:pal("Down conn state: ~p", [sys:get_state(Pid)]),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment