Refactor self-healthcheck

* Trigger GC with lower threshold
* Unconditionally die if queue len is too large
parent 4f30749d
......@@ -24,9 +24,11 @@
-define(MAX_SOCK_BUF_SIZE, 1024 * 50). % Decrease if CPU is cheaper than RAM
-define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb
-define(QUEUE_CHECK_INTERVAL, 5000).
-define(QUEUE_CHECK_MAX_LEN, 50).
-define(QUEUE_CHECK_MAX_MEM, 5 * 1024 * 1024). %5mb
-define(HEALTH_CHECK_INTERVAL, 5000).
-define(HEALTH_CHECK_MAX_QLEN, 300).
-define(HEALTH_CHECK_GC, 400 * 1024). %400kb
-define(HEALTH_CHECK_MAX_MEM, 4 * 1024 * 1024). %4mb
-define(APP, mtproto_proxy).
......@@ -123,7 +125,7 @@ handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) ->
%% telegram server -> proxy
case up_send(Data, S) of
{ok, S1} ->
check_queue_overflow(bump_timer(S1));
maybe_check_health(bump_timer(S1));
{error, Reason} ->
lager:error("Error sending tunnelled data to in socket: ~p", [Reason]),
{stop, normal, S}
......@@ -148,6 +150,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock,
case handle_upstream_data(Data, S) of
{ok, S1} ->
ok = Transport:setopts(Sock, [{active, once}]),
%% Consider checking health here as well
{noreply, bump_timer(S1)};
{error, Reason} ->
lager:info("handle_data error ~p", [Reason]),
......@@ -313,14 +316,14 @@ handle_upstream_header(DcId, #state{acc = Acc, ad_tag = Tag, addr = Addr} = S) -
%% @doc Terminate if message queue is too big
check_queue_overflow(#state{last_queue_check = LastCheck} = S) ->
maybe_check_health(#state{last_queue_check = LastCheck} = S) ->
NowMs = erlang:system_time(millisecond),
Delta = NowMs - LastCheck,
case Delta < ?QUEUE_CHECK_INTERVAL of
case Delta < ?HEALTH_CHECK_INTERVAL of
true ->
{noreply, S};
false ->
case do_check_queue_overflow(true) of
case check_health() of
ok ->
{noreply, S#state{last_queue_check = NowMs}};
overflow ->
......@@ -328,33 +331,53 @@ check_queue_overflow(#state{last_queue_check = LastCheck} = S) ->
end
end.
do_check_queue_overflow(Gc) ->
[{_, QLen}, {_, Mem}, {_, Bin}] =
erlang:process_info(self(), [message_queue_len, memory, binary]),
%% BinSum = sum_binary(Bin),
%% lager:debug("Process size check: queue_len=~w, total_mem=~w, memory=~w, binary_sum=~w, binary=~w",
%% [QLen, (Mem + BinSum) / 1024, Mem, BinSum, Bin]),
case QLen > ?QUEUE_CHECK_MAX_LEN of
true ->
RefcBinSize = sum_binary(Bin),
TotalMem = Mem + RefcBinSize,
case TotalMem > ?QUEUE_CHECK_MAX_MEM of
true when Gc->
%% @doc Run the self-healthcheck steps against a fresh health snapshot.
%% The steps are evaluated in this order:
%%   qlen      - message queue longer than ?HEALTH_CHECK_MAX_QLEN (300): stop
%%   gc        - total memory above ?HEALTH_CHECK_GC (400kb): force a GC,
%%               take a new snapshot and continue with the next step
%%   total_mem - total memory above ?HEALTH_CHECK_MAX_MEM (4mb): stop
%% Returns `ok' when no step asks to stop, `overflow' otherwise.
check_health() ->
    Steps = [qlen, gc, total_mem],
    Snapshot = calc_health(),
    do_check_health(Steps, Snapshot).
do_check_health([qlen | _], #{message_queue_len := QLen} = Health) when
QLen > ?HEALTH_CHECK_MAX_QLEN ->
mtp_metric:count_inc([?APP, healthcheck, total], 1,
#{labels => [message_queue_len]}),
lager:warning("Upstream too large queue_len=~w, health=~p", [QLen, Health]),
overflow;
do_check_health([gc | Other], #{total_mem := TotalMem}) when
TotalMem > ?HEALTH_CHECK_GC ->
%% Maybe it doesn't make sense to do GC if queue len is more than, e.g., 50?
%% In this case almost all memory will be in the msg queue
mtp_metric:count_inc([?APP, healthcheck, total], 1,
#{labels => [force_gc]}),
erlang:garbage_collect(self()),
do_check_queue_overflow(false);
true ->
lager:warning(
"Process too large queue_len=~w, memory=~w, binary_sum=~w, binary=~p",
[QLen, Mem, RefcBinSize, Bin]),
do_check_health(Other, calc_health());
do_check_health([total_mem | _Other], #{total_mem := TotalMem} = Health) when
TotalMem > ?HEALTH_CHECK_MAX_MEM ->
mtp_metric:count_inc([?APP, healthcheck, total], 1,
#{labels => [total_memory]}),
lager:warning("Process too large total_mem=~p, health=~p",
[TotalMem / 1024, Health]),
overflow;
false -> ok
end;
false ->
ok
end.
do_check_health([_Ok | Other], Health) ->
do_check_health(Other, Health);
do_check_health([], _) ->
ok.
sum_binary(Bin) ->
trunc(lists:sum([Size / RefC || {_, Size, RefC} <- Bin])).
%% @doc Take a snapshot of the calling process' health indicators.
%% Reads `message_queue_len', `memory' and `binary' via
%% erlang:process_info/2 and returns a map with:
%%   message_queue_len - number of messages waiting in the mailbox
%%   memory            - the `memory' item as reported by process_info
%%   refc_bin_size     - result of sum_binary/1 over the `binary' item
%%   refc_bin_count    - number of entries in the `binary' item
%%   total_mem         - memory + refc_bin_size
calc_health() ->
    Info = erlang:process_info(self(), [message_queue_len, memory, binary]),
    %% process_info/2 returns the items in the order they were requested
    [{_, QueueLen}, {_, ProcMem}, {_, Binaries}] = Info,
    BinBytes = sum_binary(Binaries),
    #{message_queue_len => QueueLen,
      memory => ProcMem,
      refc_bin_size => BinBytes,
      refc_bin_count => length(Binaries),
      total_mem => ProcMem + BinBytes}.
%% @doc Estimate how many bytes of binaries are attributable to this process.
%% Each entry is a 3-tuple whose second element is the binary size and whose
%% third element is its reference count; the size is divided by the reference
%% count so a shared binary is only counted proportionally. The sum is
%% truncated to an integer. An empty list yields 0.
sum_binary(BinInfo) ->
    %% The fun head is strict on purpose: an entry of unexpected shape crashes
    %% instead of being silently skipped.
    Shares = lists:map(fun({_, Size, RefC}) -> Size / RefC end, BinInfo),
    trunc(lists:sum(Shares)).
hex(Bin) ->
<<begin
......
......@@ -111,6 +111,9 @@ active_metrics() ->
{count, [?APP, timer_switch, total],
"Connection timeout mode switches",
#{labels => [listener, from, to]}},
{count, [?APP, healthcheck, total],
"Upstream self-healthcheck triggered some action",
#{labels => [action]}},
{count, [?APP, received, bytes],
"Bytes transmitted from upstream/downstream socket",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment