Experimental backpressure for downstream
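Adds an acknowledgement-based flow-control loop between the shared downstream connection and its upstream handlers. The downstream now tracks, per upstream and in total, how many packets and bytes it has pushed to handlers without confirmation; once the totals exceed MAX_NON_ACK_COUNT (300 packets) or MAX_NON_ACK_BYTES (6MB), it stops re-arming the Telegram socket with {active, once}, so reads pause and kernel TCP flow control pushes back on the server.

Handlers confirm delivery through the new mtp_down_conn:ack/3 after relaying data to the client. Acks decrement the counters and re-activate the socket once the overflow clears; an upstream's outstanding counters are released when it closes. Every passive/active transition is recorded in the new down_backpressure metric. Backpressure is per downstream connection, so one slow client can pause every upstream multiplexed over it; the per-handler healthchecks remain the first line of defense, and their memory limit is tightened from 4MB to 3MB.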

parent 6bc92539
@@ -15,7 +15,8 @@
          upstream_new/3,
          upstream_closed/2,
          shutdown/1,
-         send/2]).
+         send/2,
+         ack/3]).
 
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -27,14 +28,20 @@
 -define(CONN_TIMEOUT, 10000).
 -define(SEND_TIMEOUT, 15000).
 -define(MAX_SOCK_BUF_SIZE, 1024 * 500). % Decrease if CPU is cheaper than RAM
+%% One slow client can slow down everyone on the same downstream, but it has
+%% its own healthchecks
+-define(MAX_NON_ACK_COUNT, 300).
+-define(MAX_NON_ACK_BYTES, 1024 * 1024 * 6). % 6MB
 
 -type handle() :: pid().
 -type upstream_opts() :: #{addr := mtp_config:netloc(), % IP/Port of TG client
                            ad_tag => binary()}.
 -type upstream() :: {
-        _ConnId :: mtp_rpc:conn_id(),
-        _Addr :: binary(),
-        _AdTag :: binary() | undefined
+        _UpsStatic :: {_ConnId :: mtp_rpc:conn_id(),
+                       _Addr :: binary(),
+                       _AdTag :: binary() | undefined},
+        _NonAckCount :: non_neg_integer(),
+        _NonAckBytes :: non_neg_integer()
        }.
 
 -type stage() :: init | handshake_1 | handshake_2 | tunnel.
@@ -45,6 +52,9 @@
         codec :: mtp_codec:codec() | undefined,
         upstreams = #{} :: #{mtp_handler:handle() => upstream()},
         upstreams_rev = #{} :: #{mtp_rpc:conn_id() => mtp_handler:handle()},
+        overflow_passive = false :: boolean(),
+        non_ack_count = 0 :: non_neg_integer(),
+        non_ack_bytes = 0 :: non_neg_integer(),
         pool :: pid(),
         dc_id :: mtp_config:dc_id(),
         netloc :: mtp_config:netloc() | undefined % telegram server ip:port
@@ -70,6 +80,9 @@ shutdown(Conn) ->
 send(Conn, Data) ->
     gen_server:call(Conn, {send, Data}, ?SEND_TIMEOUT * 2).
 
+-spec ack(handle(), pos_integer(), pos_integer()) -> ok.
+ack(Conn, Count, Size) ->
+    gen_server:cast(Conn, {ack, self(), Count, Size}).
+
 init([Pool, DcId]) ->
     self() ! do_connect,
@@ -80,6 +93,8 @@ handle_call({send, Data}, {Upstream, _}, State) ->
     {Res, State1} = handle_send(Data, Upstream, State),
     {reply, Res, State1}.
 
+handle_cast({ack, Upstream, Count, Size}, State) ->
+    {noreply, handle_ack(Upstream, Count, Size, State)};
 handle_cast({upstream_new, Upstream, Opts}, State) ->
     {noreply, handle_upstream_new(Upstream, Opts, State)};
 handle_cast({upstream_closed, Upstream}, State) ->
@@ -93,7 +108,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock} = S) ->
     mtp_metric:count_inc([?APP, received, bytes], byte_size(Data), #{labels => [downstream]}),
     mtp_metric:histogram_observe([?APP, tracker_packet_size, bytes], byte_size(Data), #{labels => [downstream]}),
     {ok, S1} = handle_downstream_data(Data, S),
-    ok = inet:setopts(Sock, [{active, once}]),
+    activate_if_no_overflow(S1),
     {noreply, S1};
 handle_info({tcp_closed, Sock}, #state{sock = Sock} = State) ->
     {stop, normal, State};
@@ -127,8 +142,8 @@ code_change(_OldVsn, State, _Extra) ->
 handle_send(Data, Upstream, #state{upstreams = Ups,
                                    addr_bin = ProxyAddr} = St) ->
     case Ups of
-        #{Upstream := UpstreamData} ->
-            Packet = mtp_rpc:encode_packet({data, Data}, {UpstreamData, ProxyAddr}),
+        #{Upstream := {UpstreamStatic, _, _}} ->
+            Packet = mtp_rpc:encode_packet({data, Data}, {UpstreamStatic, ProxyAddr}),
             down_send(Packet, St);
         _ ->
             lager:warning("Upstream=~p not found", [Upstream]),
@@ -141,7 +156,8 @@ handle_upstream_new(Upstream, Opts, #state{upstreams = Ups,
     ConnId = erlang:unique_integer(),
     {Ip, Port} = maps:get(addr, Opts),
     AdTag = maps:get(ad_tag, Opts, undefined),
-    Ups1 = Ups#{Upstream => {ConnId, iolist_to_binary(mtp_rpc:encode_ip_port(Ip, Port)), AdTag}},
+    UpsStatic = {ConnId, iolist_to_binary(mtp_rpc:encode_ip_port(Ip, Port)), AdTag},
+    Ups1 = Ups#{Upstream => {UpsStatic, 0, 0}},
     UpsRev1 = UpsRev#{ConnId => Upstream},
     lager:debug("New upstream=~p conn_id=~p", [Upstream, ConnId]),
     St#state{upstreams = Ups1,
@@ -152,12 +168,13 @@ handle_upstream_closed(Upstream, #state{upstreams = Ups,
                        upstreams_rev = UpsRev} = St) ->
     %% See "mtproto-proxy.c:remove_ext_connection"
     case maps:take(Upstream, Ups) of
-        {{ConnId, _, _}, Ups1} ->
+        {{{ConnId, _, _}, _, _}, Ups1} ->
+            St1 = non_ack_cleanup_upstream(Upstream, St),
             UpsRev1 = maps:remove(ConnId, UpsRev),
-            St1 = St#state{upstreams = Ups1,
-                           upstreams_rev = UpsRev1},
+            St2 = St1#state{upstreams = Ups1,
+                            upstreams_rev = UpsRev1},
             Packet = mtp_rpc:encode_packet(remote_closed, ConnId),
-            down_send(Packet, St1);
+            down_send(Packet, St2);
         error ->
             %% It happens when we get rpc_close_ext
             lager:info("Unknown upstream ~p", [Upstream]),
@@ -204,10 +221,10 @@ handle_rpc({close_ext, ConnId}, St) ->
                 upstreams_rev = UpsRev} = St1 = up_send({close_ext, self()}, ConnId, St),
     case maps:take(ConnId, UpsRev) of
         {Upstream, UpsRev1} ->
+            St2 = non_ack_cleanup_upstream(Upstream, St1),
             Ups1 = maps:remove(Upstream, Ups),
-            St2 = St1#state{upstreams = Ups1,
-                            upstreams_rev = UpsRev1},
-            St2;
+            St2#state{upstreams = Ups1,
+                      upstreams_rev = UpsRev1};
         error ->
             lager:warning("Unknown upstream ~p", [ConnId]),
             St1
@@ -231,7 +248,12 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
     case maps:find(ConnId, UpsRev) of
         {ok, Upstream} ->
             ok = mtp_handler:send(Upstream, Packet),
-            St;
+            case Packet of
+                {proxy_ans, _, Data} ->
+                    non_ack_bump(Upstream, iolist_size(Data), St);
+                _ ->
+                    St
+            end;
         error ->
             lager:warning("Unknown connection_id=~w", [ConnId]),
             %% WHY!!!?
@@ -240,6 +262,96 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
             St
     end.
 
+%%
+%% Backpressure
+%%
+
+%% Bump counters of non-acked packets
+non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt,
+                                    non_ack_bytes = Oct,
+                                    upstreams = Ups} = St) ->
+    {UpsStatic, UpsCnt, UpsOct} = maps:get(Upstream, Ups),
+    maybe_deactivate(
+      St#state{non_ack_count = Cnt + 1,
+               non_ack_bytes = Oct + Size,
+               upstreams = Ups#{Upstream := {UpsStatic,
+                                             UpsCnt + 1,
+                                             UpsOct + Size}}}).
+
+%% Do we have too many unconfirmed packets?
+is_overflow(#state{non_ack_count = Cnt, non_ack_bytes = Oct}) ->
+    (Cnt > ?MAX_NON_ACK_COUNT) orelse (Oct > ?MAX_NON_ACK_BYTES).
+
+%% If we are not in overflow and the socket is passive, activate it
+activate_if_no_overflow(#state{overflow_passive = false, sock = Sock}) ->
+    ok = inet:setopts(Sock, [{active, once}]),
+    true;
+activate_if_no_overflow(_) ->
+    false.
+
+%% Decrement counters; activate the socket only if overflow was just resolved
+handle_ack(Upstream, Count, Size, #state{non_ack_count = Cnt,
+                                         non_ack_bytes = Oct,
+                                         upstreams = Ups} = St) ->
+    case maps:get(Upstream, Ups, undefined) of
+        undefined ->
+            %% this upstream's counters should already have been handled
+            %% by non_ack_cleanup_upstream
+            St;
+        {UpsStatic, UpsCnt, UpsOct} ->
+            maybe_activate(
+              St#state{non_ack_count = Cnt - Count,
+                       non_ack_bytes = Oct - Size,
+                       upstreams = Ups#{Upstream := {UpsStatic,
                                                     UpsCnt - Count,
                                                     UpsOct - Size}}})
+    end.
+
+maybe_deactivate(#state{overflow_passive = false, dc_id = Dc} = St) ->
+    case is_overflow(St) of
+        true ->
+            %% Was not in overflow, now overflowed
+            mtp_metric:count_inc([?APP, down_backpressure, total], 1,
+                                 #{labels => [Dc, true]}),
+            St#state{overflow_passive = true};
+        false ->
+            %% Was not in overflow and still isn't
+            St
+    end;
+maybe_deactivate(St) ->
+    St.
+
+%% Activate the socket if we changed state from overflow back to ok
+maybe_activate(#state{overflow_passive = true, sock = Sock, dc_id = Dc} = St) ->
+    case is_overflow(St) of
+        true ->
+            %% Still in overflow
+            St;
+        false ->
+            %% Was in overflow, now resolved
+            ok = inet:setopts(Sock, [{active, once}]),
+            mtp_metric:count_inc([?APP, down_backpressure, total], 1,
+                                 #{labels => [Dc, false]}),
+            St#state{overflow_passive = false}
+    end;
+maybe_activate(#state{} = St) ->
+    St.
+
+%% Reset counters for an upstream that was terminated
+non_ack_cleanup_upstream(Upstream, #state{non_ack_count = Cnt,
+                                          non_ack_bytes = Oct,
+                                          upstreams = Ups} = St) ->
+    {_, UpsCnt, UpsOct} = maps:get(Upstream, Ups),
+    maybe_activate(
+      St#state{non_ack_count = Cnt - UpsCnt,
+               non_ack_bytes = Oct - UpsOct}).
+
+%%
+%% Connect / handshake
+%%
 connect(DcId, S) ->
     {ok, {Host, Port}} = mtp_config:get_netloc(DcId),
     case tcp_connect(Host, Port) of
...
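The mechanism above leans entirely on the standard {active, once} socket pattern: the socket delivers exactly one {tcp, Sock, Data} message and then stays passive until inet:setopts(Sock, [{active, once}]) re-arms it, so simply declining to re-arm while the counters are over the limit is enough to stop reading. A self-contained toy illustrating that pause/resume decision (a sketch only; the module name, threshold, and message shapes are illustrative, not project code):

```erlang
-module(backpressure_demo).
-export([loop/2]).

%% Rough stand-in for MAX_NON_ACK_BYTES; the real code also caps packet count.
-define(LIMIT_BYTES, 6 * 1024 * 1024).

%% Assumes Sock was opened in binary mode with {active, once}.
%% Pending counts bytes delivered to consumers but not yet acknowledged.
loop(Sock, Pending) ->
    receive
        {tcp, Sock, Data} ->
            Pending1 = Pending + byte_size(Data),
            %% Re-arm only while under the limit; otherwise stay passive and
            %% let the kernel TCP window push back on the sender.
            case Pending1 =< ?LIMIT_BYTES of
                true  -> ok = inet:setopts(Sock, [{active, once}]);
                false -> ok
            end,
            loop(Sock, Pending1);
        {ack, Size} ->
            Pending1 = Pending - Size,
            %% Resume reading only if this ack just resolved the overflow.
            case Pending > ?LIMIT_BYTES andalso Pending1 =< ?LIMIT_BYTES of
                true  -> ok = inet:setopts(Sock, [{active, once}]);
                false -> ok
            end,
            loop(Sock, Pending1);
        {tcp_closed, Sock} ->
            ok
    end.
```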
@@ -28,7 +28,7 @@
 -define(HEALTH_CHECK_INTERVAL, 5000).
 -define(HEALTH_CHECK_MAX_QLEN, 300).
 -define(HEALTH_CHECK_GC, 400 * 1024). %400kb
--define(HEALTH_CHECK_MAX_MEM, 4 * 1024 * 1024). %4mb
+-define(HEALTH_CHECK_MAX_MEM, 3 * 1024 * 1024). %3mb
 
 -define(APP, mtproto_proxy).
@@ -124,6 +124,7 @@ handle_call(_Request, _From, State) ->
 handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) ->
     %% telegram server -> proxy
     {ok, S1} = up_send(Data, S),
+    ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)),
     maybe_check_health(bump_timer(S1));
 handle_cast({close_ext, Down}, #state{down = Down, sock = USock, transport = UTrans} = S) ->
     lager:debug("asked to close connection by downstream"),
...
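For reference, the handler-side contract established by the hunk above: relay first, acknowledge after, so the non-ack window reflects bytes still in flight inside the proxy. A minimal sketch (the module name, Transport, and the state shape are hypothetical stand-ins; only mtp_down_conn:ack/3 is the API added by this commit):

```erlang
-module(handler_side_sketch).
-export([handle_cast/2]).

%% Mirrors the shape of the {proxy_ans, ...} clause above: write the data to
%% the client socket, then confirm one packet of iolist_size(Data) bytes so
%% the downstream can shrink its non-ack counters and maybe resume reading.
handle_cast({proxy_ans, Down, Data}, {Transport, Sock} = State) ->
    ok = Transport:send(Sock, Data),                    % proxy -> TG client
    ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)),
    {noreply, State}.
```

Because ack/3 is a gen_server cast it never blocks the handler, and since the downstream bumps its counters when it dispatches {proxy_ans, ...} while the handler can only ack data it has already received, an ack cannot arrive before its matching bump.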
@@ -143,6 +143,9 @@ active_metrics() ->
      #{labels => [dc_id, reason]}},
+    {count, [?APP, down_backpressure, total],
+     "Times downstream backpressure state was changed",
+     #{labels => [dc_id, enabled]}},
     {histogram, [?APP, upstream_send_duration, seconds],
      "Duration of tcp send calls to upstream",
      #{duration_unit => seconds,
...