Handle "max packet size" errors; fix related bug in abridged codec

parent 90feb07c
...@@ -30,12 +30,6 @@ new() -> ...@@ -30,12 +30,6 @@ new() ->
try_decode_packet(<<Flag, Len:24/unsigned-little-integer, Rest/binary>> = Data, try_decode_packet(<<Flag, Len:24/unsigned-little-integer, Rest/binary>> = Data,
#st{buffer = <<>>} = St) when Flag == 127; Flag == 255 -> #st{buffer = <<>>} = St) when Flag == 127; Flag == 255 ->
Len1 = Len * 4, Len1 = Len * 4,
(Len1 < ?MAX_PACKET_SIZE)
orelse
begin
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [abriged_max_size]}),
error({packet_too_large, Len1})
end,
try_decode_packet_len(Len1, Rest, Data, St); try_decode_packet_len(Len1, Rest, Data, St);
try_decode_packet(<<Len, Rest/binary>> = Data, try_decode_packet(<<Len, Rest/binary>> = Data,
#st{buffer = <<>>} = St) when Len >= 128 -> #st{buffer = <<>>} = St) when Len >= 128 ->
...@@ -51,6 +45,8 @@ try_decode_packet(Bin, #st{buffer = <<>>} = St) -> ...@@ -51,6 +45,8 @@ try_decode_packet(Bin, #st{buffer = <<>>} = St) ->
{incomplete, St#st{buffer = Bin}}. {incomplete, St#st{buffer = Bin}}.
try_decode_packet_len(Len, LenStripped, Data, St) -> try_decode_packet_len(Len, LenStripped, Data, St) ->
(Len < ?MAX_PACKET_SIZE)
orelse error({protocol_error, abridged_max_size, Len}),
case LenStripped of case LenStripped of
<<Packet:Len/binary, Rest/binary>> -> <<Packet:Len/binary, Rest/binary>> ->
{ok, Packet, St#st{buffer = Rest}}; {ok, Packet, St#st{buffer = Rest}};
......
...@@ -140,7 +140,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, ...@@ -140,7 +140,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport,
Size = byte_size(Data), Size = byte_size(Data),
mtp_metric:count_inc([?APP, received, upstream, bytes], Size, #{labels => [Listener]}), mtp_metric:count_inc([?APP, received, upstream, bytes], Size, #{labels => [Listener]}),
mtp_metric:histogram_observe([?APP, tracker_packet_size, bytes], Size, #{labels => [upstream]}), mtp_metric:histogram_observe([?APP, tracker_packet_size, bytes], Size, #{labels => [upstream]}),
case handle_upstream_data(Data, S) of try handle_upstream_data(Data, S) of
{ok, S1} -> {ok, S1} ->
ok = Transport:setopts(Sock, [{active, once}]), ok = Transport:setopts(Sock, [{active, once}]),
%% Consider checking health here as well %% Consider checking health here as well
...@@ -148,6 +148,10 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, ...@@ -148,6 +148,10 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport,
{error, Reason} -> {error, Reason} ->
lager:info("handle_data error ~p", [Reason]), lager:info("handle_data error ~p", [Reason]),
{stop, normal, S} {stop, normal, S}
catch error:{protocol_error, Type, Extra} ->
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Type]}),
lager:warning("protocol_error ~p ~p", [Type, Extra]),
{stop, normal, maybe_close_down(S)}
end; end;
handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) -> handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) ->
lager:debug("upstream sock closed"), lager:debug("upstream sock closed"),
...@@ -251,8 +255,7 @@ handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, sta ...@@ -251,8 +255,7 @@ handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, sta
acc = Rest, acc = Rest,
stage_state = undefined}); stage_state = undefined});
{error, Reason} = Err -> {error, Reason} = Err ->
mtp_metric:count_inc([?APP, protocol_error, total], mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Reason]}),
1, #{labels => [Reason]}),
Err Err
end; end;
handle_upstream_data(Bin, #state{stage = init, stage_state = <<>>} = S) -> handle_upstream_data(Bin, #state{stage = init, stage_state = <<>>} = S) ->
......
...@@ -43,8 +43,7 @@ try_decode_packet(<<Len:32/unsigned-little, _/binary>> = Data, ...@@ -43,8 +43,7 @@ try_decode_packet(<<Len:32/unsigned-little, _/binary>> = Data,
(Len1 < ?MAX_PACKET_SIZE) (Len1 < ?MAX_PACKET_SIZE)
orelse orelse
begin begin
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [intermediate_max_size]}), error({protocol_error, intermediate_max_size, Len1})
error({packet_too_large, Len1})
end, end,
try_decode_packet_len(Len1, Data, St); try_decode_packet_len(Len1, Data, St);
try_decode_packet(Bin, #int_st{buffer = Buf} = St) when byte_size(Buf) > 0 -> try_decode_packet(Bin, #int_st{buffer = Buf} = St) when byte_size(Buf) > 0 ->
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
-export([echo_secure_case/1, -export([echo_secure_case/1,
echo_abridged_many_packets_case/1, echo_abridged_many_packets_case/1,
packet_too_large_case/1,
downstream_size_backpressure_case/1, downstream_size_backpressure_case/1,
downstream_qlen_backpressure_case/1 downstream_qlen_backpressure_case/1
]). ]).
...@@ -112,6 +113,37 @@ echo_abridged_many_packets_case(Cfg) when is_list(Cfg) -> ...@@ -112,6 +113,37 @@ echo_abridged_many_packets_case(Cfg) when is_list(Cfg) ->
[upstream_to_downstream])). [upstream_to_downstream])).
%% @doc test that client trying to send too big packets will be force-disconnected
packet_too_large_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg);
packet_too_large_case({post, Cfg}) ->
stop_single(Cfg);
packet_too_large_case(Cfg) when is_list(Cfg) ->
DcId = ?config(dc_id, Cfg),
Host = ?config(mtp_host, Cfg),
Port = ?config(mtp_port, Cfg),
Secret = ?config(mtp_secret, Cfg),
ErrCount = fun(Tag) ->
mtp_test_metric:get_tags(count, [?APP, protocol_error, total], [Tag])
end,
OkPacket = binary:copy(<<0>>, 64),
BigPacket = binary:copy(<<0>>, 1024 * 1024 + 1024),
Protocols = [
{mtp_intermediate, intermediate_max_size},
{mtp_abridged, abridged_max_size}
],
lists:foreach(
fun({Protocol, Metric}) ->
?assertEqual(not_found, ErrCount(Metric), Protocol),
Cli0 = mtp_test_client:connect(Host, Port, Secret, DcId, Protocol),
Cli1 = mtp_test_client:send(OkPacket, Cli0),
{ok, OkPacket, Cli2} = mtp_test_client:recv_packet(Cli1, 5000),
Cli3 = mtp_test_client:send(BigPacket, Cli2),
?assertEqual({error, closed}, mtp_test_client:recv_packet(Cli3, 5000), Protocol),
?assertEqual(1, ErrCount(Metric), Protocol)
end, Protocols).
%% @doc test downstream backpressure when size of non-acknowledged packets grows above threshold %% @doc test downstream backpressure when size of non-acknowledged packets grows above threshold
downstream_size_backpressure_case({pre, Cfg}) -> downstream_size_backpressure_case({pre, Cfg}) ->
Cfg1 = setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg), Cfg1 = setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg),
......
...@@ -13,14 +13,14 @@ ...@@ -13,14 +13,14 @@
%% Logging config %% Logging config
{lager, {lager,
[{log_root, "log"}, [{log_root, "log"},
{crash_log, "crash.log"}, {crash_log, "test-crash.log"},
{handlers, {handlers,
[ [
{lager_console_backend, {lager_console_backend,
[{level, critical}]}, [{level, critical}]},
{lager_file_backend, {lager_file_backend,
[{file, "application.log"}, [{file, "test-application.log"},
{level, info}, {level, info},
%% Do fsync only on critical messages %% Do fsync only on critical messages
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment