Terminate upstream if it have too long queue and uses too much memory

parent 88dddb5e
......@@ -24,6 +24,9 @@
-define(MAX_SOCK_BUF_SIZE, 1024 * 50). % Decrease if CPU is cheaper than RAM
-define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb
-define(QUEUE_CHECK_INTERVAL, 5000).
-define(QUEUE_CHECK_MAX_LEN, 50).
-define(QUEUE_CHECK_MAX_MEM, 5 * 1024 * 1024). %5mb
-define(APP, mtproto_proxy).
......@@ -46,7 +49,8 @@
addr :: mtp_config:netloc(), % IP/Port of remote side
started_at :: pos_integer(),
timer_state = init :: init | hibernate | stop,
timer :: gen_timeout:tout()}).
timer :: gen_timeout:tout(),
last_queue_check :: integer()}).
-type transport() :: module().
-type stage() :: init | tunnel.
......@@ -94,14 +98,16 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
{TimeoutKey, TimeoutDefault} = state_timeout(init),
Timer = gen_timeout:new(
#{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}),
NowMs = erlang:system_time(millisecond),
State = #state{sock = Socket,
secret = unhex(Secret),
listener = Name,
transport = Transport,
ad_tag = unhex(Tag),
addr = {Ip, Port},
started_at = erlang:system_time(millisecond),
timer = Timer},
started_at = NowMs,
timer = Timer,
last_queue_check = NowMs},
{ok, State};
{error, Reason} ->
mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{labels => [Name]}),
......@@ -117,7 +123,7 @@ handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) ->
%% telegram server -> proxy
case up_send(Data, S) of
{ok, S1} ->
{noreply, bump_timer(S1)};
check_queue_overflow(bump_timer(S1));
{error, Reason} ->
lager:error("Error sending tunnelled data to in socket: ~p", [Reason]),
{stop, normal, S}
......@@ -305,6 +311,48 @@ handle_upstream_header(DcId, #state{acc = Acc, ad_tag = Tag, addr = Addr} = S) -
stage = tunnel},
hibernate)).
%% @doc Terminate if message queue is too big
check_queue_overflow(#state{last_queue_check = LastCheck} = S) ->
NowMs = erlang:system_time(millisecond),
Delta = NowMs - LastCheck,
case Delta < ?QUEUE_CHECK_INTERVAL of
true ->
{noreply, S};
false ->
case do_check_queue_overflow() of
ok ->
{noreply, S#state{last_queue_check = NowMs}};
overflow ->
{stop, normal, S}
end
end.
do_check_queue_overflow() ->
[{_, QLen}, {_, Mem}, {_, Bin}] =
erlang:process_info(self(), [message_queue_len, memory, binary]),
%% BinSum = sum_binary(Bin),
%% lager:debug("Process size check: queue_len=~w, total_mem=~w, memory=~w, binary_sum=~w, binary=~w",
%% [QLen, (Mem + BinSum) / 1024, Mem, BinSum, Bin]),
case QLen > ?QUEUE_CHECK_MAX_LEN of
true ->
RefcBinSize = sum_binary(Bin),
TotalMem = Mem + RefcBinSize,
case TotalMem > ?QUEUE_CHECK_MAX_MEM of
true ->
lager:warning(
"Process too large queue_len=~w, memory=~w, binary_sum=~w, binary=~p",
[QLen, Mem, RefcBinSize, Bin]),
overflow;
false -> ok
end;
false ->
ok
end.
sum_binary(Bin) ->
trunc(lists:sum([Size / RefC || {_, Size, RefC} <- Bin])).
hex(Bin) ->
<<begin
if N < 10 ->
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment