Add another replay attack protection: filter error replies from downstream

parent 6768b7ba
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
-define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb -define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb
-define(HEALTH_CHECK_INTERVAL, 5000). -define(HEALTH_CHECK_INTERVAL, 5000).
% telegram server responds with "l\xfe\xff\xff" if client packet MTProto is invalid
-define(SRV_ERROR, <<108, 254, 255, 255>>).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
...@@ -51,7 +53,8 @@ ...@@ -51,7 +53,8 @@
started_at :: pos_integer(), started_at :: pos_integer(),
timer_state = init :: init | hibernate | stop, timer_state = init :: init | hibernate | stop,
timer :: gen_timeout:tout(), timer :: gen_timeout:tout(),
last_queue_check :: integer()}). last_queue_check :: integer(),
srv_error_filter :: first | on | off}).
-type transport() :: module(). -type transport() :: module().
-type stage() :: init | tunnel. -type stage() :: init | tunnel.
...@@ -99,6 +102,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) -> ...@@ -99,6 +102,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
{TimeoutKey, TimeoutDefault} = state_timeout(init), {TimeoutKey, TimeoutDefault} = state_timeout(init),
Timer = gen_timeout:new( Timer = gen_timeout:new(
#{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}), #{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}),
Filter = application:get_env(?APP, replay_check_server_error_filter, off),
NowMs = erlang:system_time(millisecond), NowMs = erlang:system_time(millisecond),
State = #state{sock = Socket, State = #state{sock = Socket,
secret = unhex(Secret), secret = unhex(Secret),
...@@ -108,7 +112,8 @@ init({Socket, Transport, [Name, Secret, Tag]}) -> ...@@ -108,7 +112,8 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
addr = {Ip, Port}, addr = {Ip, Port},
started_at = NowMs, started_at = NowMs,
timer = Timer, timer = Timer,
last_queue_check = NowMs}, last_queue_check = NowMs,
srv_error_filter = Filter},
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{labels => [Name]}), mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{labels => [Name]}),
...@@ -120,11 +125,35 @@ handle_call(_Request, _From, State) -> ...@@ -120,11 +125,35 @@ handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
{reply, Reply, State}. {reply, Reply, State}.
handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) -> handle_cast({proxy_ans, Down, Data}, #state{down = Down, srv_error_filter = off} = S) ->
%% telegram server -> proxy %% telegram server -> proxy
%% srv_error_filter is 'off'
{ok, S1} = up_send(Data, S), {ok, S1} = up_send(Data, S),
ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)), ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)),
maybe_check_health(bump_timer(S1)); maybe_check_health(bump_timer(S1));
handle_cast({proxy_ans, Down, ?SRV_ERROR = Data}, #state{down = Down, srv_error_filter = Filter,
addr = {Ip, _}} = S) when Filter =/= off ->
%% telegram server -> proxy
%% Server replied with server error; it might be another kind of replay attack;
%% Don't send this packet to client so proxy won't be fingerprinted
ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)),
?log(warning, "~s: protocol_error srv_error_filtered", [inet:ntoa(Ip)]),
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [srv_error_filtered]}),
case Filter of
first -> S#state{srv_error_filter = off};
on -> S
end;
handle_cast({proxy_ans, Down, Data}, #state{down = Down, srv_error_filter = Filter} = S) when Filter =/= off ->
%% telegram server -> proxy
%% Normal data packet
%% srv_error_filter is 'on' or srv_error_filter is 'first' and it's 1st server packet
{ok, S1} = up_send(Data, S),
ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)),
S2 = case Filter of
first -> S1#state{srv_error_filter = off};
on -> S1
end,
maybe_check_health(bump_timer(S2));
handle_cast({close_ext, Down}, #state{down = Down, sock = USock, transport = UTrans} = S) -> handle_cast({close_ext, Down}, #state{down = Down, sock = USock, transport = UTrans} = S) ->
?log(debug, "asked to close connection by downstream"), ?log(debug, "asked to close connection by downstream"),
ok = UTrans:close(USock), ok = UTrans:close(USock),
...@@ -137,7 +166,7 @@ handle_cast(Other, State) -> ...@@ -137,7 +166,7 @@ handle_cast(Other, State) ->
{noreply, State}. {noreply, State}.
handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport,
listener = Listener} = S) -> listener = Listener, addr = {Ip, _}} = S) ->
%% client -> proxy %% client -> proxy
Size = byte_size(Data), Size = byte_size(Data),
mtp_metric:count_inc([?APP, received, upstream, bytes], Size, #{labels => [Listener]}), mtp_metric:count_inc([?APP, received, upstream, bytes], Size, #{labels => [Listener]}),
...@@ -152,7 +181,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, ...@@ -152,7 +181,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport,
{stop, normal, S} {stop, normal, S}
catch error:{protocol_error, Type, Extra} -> catch error:{protocol_error, Type, Extra} ->
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Type]}), mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Type]}),
?log(warning, "protocol_error ~p ~p", [Type, Extra]), ?log(warning, "~s: protocol_error ~p ~p", [inet:ntoa(Ip), Type, Extra]),
{stop, normal, maybe_close_down(S)} {stop, normal, maybe_close_down(S)}
end; end;
handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) -> handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) ->
...@@ -268,11 +297,11 @@ handle_upstream_data(Bin, #state{stage = init, stage_state = Buf} = S) -> ...@@ -268,11 +297,11 @@ handle_upstream_data(Bin, #state{stage = init, stage_state = Buf} = S) ->
maybe_check_replay(Packet) -> maybe_check_replay(Packet) ->
%% Check for session replay attack: attempt to connect with the same 1st 64byte packet %% Check for session replay attack: attempt to connect with the same 1st 64byte packet
case lists:member(mtp_session_storage, application:get_env(?APP, replay_checks_enabled, [])) of case application:get_env(?APP, replay_check_session_storage, off) of
true -> on ->
(new == mtp_session_storage:check_add(Packet)) orelse (new == mtp_session_storage:check_add(Packet)) orelse
error({protocol_error, replay_session_detected, Packet}); error({protocol_error, replay_session_detected, Packet});
false -> off ->
ok ok
end. end.
......
...@@ -63,7 +63,18 @@ ...@@ -63,7 +63,18 @@
%% List of enabled replay-attack checks. See %% List of enabled replay-attack checks. See
%% https://habr.com/ru/post/452144/ %% https://habr.com/ru/post/452144/
{replay_checks_enabled, [mtp_session_storage]}, %% session_storage - store last used 1st client packets in special
%% storage, drop connections with same 1st packet
%% Values: on/off
%% Default: off
{replay_check_session_storage, on},
%% server_error_filter - drop server error responses.
%% Values:
%% first - drop server error only if it's 1st server packet
%% on - drop all server error packets
%% off - don't drop server errors
%% Default: off
{replay_check_server_error_filter, first},
%% Options for `mtp_session_storage` replay attack check %% Options for `mtp_session_storage` replay attack check
%% Those settings are not precise! They are checked not in realtime, but %% Those settings are not precise! They are checked not in realtime, but
......
...@@ -13,7 +13,8 @@ ...@@ -13,7 +13,8 @@
downstream_size_backpressure_case/1, downstream_size_backpressure_case/1,
downstream_qlen_backpressure_case/1, downstream_qlen_backpressure_case/1,
config_change_case/1, config_change_case/1,
replay_attack_case/1 replay_attack_case/1,
replay_attack_server_error_case/1
]). ]).
-export([set_env/2, -export([set_env/2,
...@@ -346,6 +347,32 @@ replay_attack_case(Cfg) when is_list(Cfg) -> ...@@ -346,6 +347,32 @@ replay_attack_case(Cfg) when is_list(Cfg) ->
?assertEqual(1, ErrCount()), ?assertEqual(1, ErrCount()),
?assertEqual({error, closed}, mtp_test_client:recv_packet(Cli2, 1000)). ?assertEqual({error, closed}, mtp_test_client:recv_packet(Cli2, 1000)).
%% @doc test replay attack protection.
%% Server error responses are not proxied
replay_attack_server_error_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg);
replay_attack_server_error_case({post, Cfg}) ->
stop_single(Cfg);
replay_attack_server_error_case(Cfg) when is_list(Cfg) ->
DcId = ?config(dc_id, Cfg),
Host = ?config(mtp_host, Cfg),
Port = ?config(mtp_port, Cfg),
Secret = ?config(mtp_secret, Cfg),
ErrCount = fun() ->
mtp_test_metric:get_tags(
count, [?APP, protocol_error, total], [srv_error_filtered])
end,
?assertEqual(not_found, ErrCount()),
Cli1 = mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure),
%% Let TG server echo error packet back, but packet will be filtered
_Cli2 = mtp_test_client:send(<<108, 254, 255, 255>>, Cli1),
?assertEqual(
ok, mtp_test_metric:wait_for_value(
count, [?APP, protocol_error, total], [srv_error_filtered], 1, 5000),
{mtp_session_storage:status(),
sys:get_state(mtp_test_metric)}),
?assertEqual(1, ErrCount()).
%% TODO: send a lot, not read, and then close - assert connection IDs are cleaned up %% TODO: send a lot, not read, and then close - assert connection IDs are cleaned up
%% Helpers %% Helpers
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment