Fix inconsistency in mtp_codec:fold_packets

parent d1398b5b
...@@ -92,7 +92,7 @@ encode_packet(Bin, #codec{packet_mod = PacketMod, ...@@ -92,7 +92,7 @@ encode_packet(Bin, #codec{packet_mod = PacketMod,
{Enc2, S#codec{crypto_state = CryptoSt1, packet_state = PacketSt1}}. {Enc2, S#codec{crypto_state = CryptoSt1, packet_state = PacketSt1}}.
-spec fold_packets(fun( (binary(), FoldSt) -> FoldSt ), -spec fold_packets(fun( (binary(), FoldSt, codec()) -> FoldSt ),
FoldSt, binary(), codec()) -> FoldSt, binary(), codec()) ->
{ok, FoldSt, codec()} {ok, FoldSt, codec()}
when when
...@@ -100,8 +100,8 @@ encode_packet(Bin, #codec{packet_mod = PacketMod, ...@@ -100,8 +100,8 @@ encode_packet(Bin, #codec{packet_mod = PacketMod,
fold_packets(Fun, FoldSt, Data, Codec) -> fold_packets(Fun, FoldSt, Data, Codec) ->
case try_decode_packet(Data, Codec) of case try_decode_packet(Data, Codec) of
{ok, Decoded, Codec1} -> {ok, Decoded, Codec1} ->
FoldSt1 = Fun(Decoded, FoldSt), {FoldSt1, Codec2} = Fun(Decoded, FoldSt, Codec1),
fold_packets(Fun, FoldSt1, <<>>, Codec1); fold_packets(Fun, FoldSt1, <<>>, Codec2);
{incomplete, Codec1} -> {incomplete, Codec1} ->
{ok, FoldSt, Codec1} {ok, FoldSt, Codec1}
end. end.
...@@ -189,12 +189,13 @@ handle_downstream_data(Bin, #state{stage = tunnel, ...@@ -189,12 +189,13 @@ handle_downstream_data(Bin, #state{stage = tunnel,
codec = DownCodec} = S) -> codec = DownCodec} = S) ->
{ok, S3, DownCodec1} = {ok, S3, DownCodec1} =
mtp_codec:fold_packets( mtp_codec:fold_packets(
fun(Decoded, S1) -> fun(Decoded, S1, Codec1) ->
mtp_metric:histogram_observe( mtp_metric:histogram_observe(
[?APP, tg_packet_size, bytes], [?APP, tg_packet_size, bytes],
byte_size(Decoded), byte_size(Decoded),
#{labels => [downstream_to_upstream]}), #{labels => [downstream_to_upstream]}),
handle_rpc(mtp_rpc:decode_packet(Decoded), S1) S2 = handle_rpc(mtp_rpc:decode_packet(Decoded), S1#state{codec = Codec1}),
{S2, S2#state.codec}
end, S, Bin, DownCodec), end, S, Bin, DownCodec),
{ok, S3#state{codec = DownCodec1}}; {ok, S3#state{codec = DownCodec1}};
handle_downstream_data(Bin, #state{stage = handshake_1, handle_downstream_data(Bin, #state{stage = handshake_1,
......
...@@ -230,13 +230,13 @@ handle_upstream_data(Bin, #state{stage = tunnel, ...@@ -230,13 +230,13 @@ handle_upstream_data(Bin, #state{stage = tunnel,
codec = UpCodec} = S) -> codec = UpCodec} = S) ->
{ok, S3, UpCodec1} = {ok, S3, UpCodec1} =
mtp_codec:fold_packets( mtp_codec:fold_packets(
fun(Decoded, S1) -> fun(Decoded, S1, Codec1) ->
mtp_metric:histogram_observe( mtp_metric:histogram_observe(
[?APP, tg_packet_size, bytes], [?APP, tg_packet_size, bytes],
byte_size(Decoded), byte_size(Decoded),
#{labels => [upstream_to_downstream]}), #{labels => [upstream_to_downstream]}),
{ok, S2} = down_send(Decoded, S1), {ok, S2} = down_send(Decoded, S1#state{codec = Codec1}),
S2 {S2, S2#state.codec}
end, S, Bin, UpCodec), end, S, Bin, UpCodec),
{ok, S3#state{codec = UpCodec1}}; {ok, S3#state{codec = UpCodec1}};
handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, stage_state = <<>>, handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, stage_state = <<>>,
......
%% Fake telegram server %% Fake telegram server
%% Secret = crypto:strong_rand_bytes(128). -module(mtp_test_middle_server).
%% DcConf = [{1, {127, 0, 0, 1}, 8888}, {2, {127, 0, 0, 1}, 8889}].
%% Cfg = mtp_test_midle_server:dc_list_to_config(DcConf).
%% mtp_test_midle_server:start_config_server({127, 0, 0, 1}, 3333, Secret, Cfg).
%% mtp_test_midle_server:start(dc1, #{port => 8888, ip => {127, 0, 0, 1}, secret => Secret}).
%% mtp_test_midle_server:start(dc2, #{port => 8889, ip => {127, 0, 0, 1}, secret => Secret}).
%% application:ensure_all_started(mtproto_proxy).
-module(mtp_test_midle_server).
-behaviour(ranch_protocol). -behaviour(ranch_protocol).
-behaviour(gen_statem). -behaviour(gen_statem).
...@@ -57,15 +50,15 @@ ...@@ -57,15 +50,15 @@
-define(SECRET_PATH, "/getProxySecret"). -define(SECRET_PATH, "/getProxySecret").
-define(CONFIG_PATH, "/getProxyConfig"). -define(CONFIG_PATH, "/getProxyConfig").
-type state_name() :: wait_nonce | wait_handshake | on_tunnel. %% -type state_name() :: wait_nonce | wait_handshake | on_tunnel.
start_dc() -> start_dc() ->
Secret = crypto:strong_rand_bytes(128), Secret = crypto:strong_rand_bytes(128),
DcConf = [{1, {127, 0, 0, 1}, 8888}], DcConf = [{1, {127, 0, 0, 1}, 8888}],
{ok, _Cfg} = mtp_test_midle_server:start_dc(Secret, DcConf, #{}). {ok, _Cfg} = start_dc(Secret, DcConf, #{}).
start_dc(Secret, DcConf, Acc) -> start_dc(Secret, DcConf, Acc) ->
Cfg = mtp_test_midle_server:dc_list_to_config(DcConf), Cfg = mtp_test_middle_server:dc_list_to_config(DcConf),
{ok, Acc1} = start_config_server({127, 0, 0, 1}, 3333, Secret, Cfg, Acc), {ok, Acc1} = start_config_server({127, 0, 0, 1}, 3333, Secret, Cfg, Acc),
Ids = Ids =
[begin [begin
...@@ -76,7 +69,7 @@ start_dc(Secret, DcConf, Acc) -> ...@@ -76,7 +69,7 @@ start_dc(Secret, DcConf, Acc) ->
{ok, Acc1#{srv_ids => Ids}}. {ok, Acc1#{srv_ids => Ids}}.
stop_dc(#{srv_ids := Ids} = Acc) -> stop_dc(#{srv_ids := Ids} = Acc) ->
Acc1 = stop_config_server(Acc), {ok, Acc1} = stop_config_server(Acc),
ok = lists:foreach(fun stop/1, Ids), ok = lists:foreach(fun stop/1, Ids),
{ok, maps:without([srv_ids], Acc1)}. {ok, maps:without([srv_ids], Acc1)}.
...@@ -86,14 +79,13 @@ stop_dc(#{srv_ids := Ids} = Acc) -> ...@@ -86,14 +79,13 @@ stop_dc(#{srv_ids := Ids} = Acc) ->
%% Api %% Api
start_config_server(Ip, Port, Secret, DcConfig, Acc) -> start_config_server(Ip, Port, Secret, DcConfig, Acc) ->
application:load(mtproto_proxy),
Netloc = lists:flatten(io_lib:format("http://~s:~w", [inet:ntoa(Ip), Port])), Netloc = lists:flatten(io_lib:format("http://~s:~w", [inet:ntoa(Ip), Port])),
Env = [{proxy_secret_url, Env = [{proxy_secret_url,
Netloc ++ ?SECRET_PATH}, Netloc ++ ?SECRET_PATH},
{proxy_config_url, {proxy_config_url,
Netloc ++ ?CONFIG_PATH}, Netloc ++ ?CONFIG_PATH},
{external_ip, "127.0.0.1"}, {external_ip, "127.0.0.1"},
{init_dc_connections, 1},
{num_acceptors, 4},
{ip_lookup_services, undefined}], {ip_lookup_services, undefined}],
OldEnv = OldEnv =
[begin [begin
...@@ -245,15 +237,21 @@ wait_handshake(info, {tcp, _Sock, TcpData}, ...@@ -245,15 +237,21 @@ wait_handshake(info, {tcp, _Sock, TcpData},
activate(#t_state{sock = Sock, activate(#t_state{sock = Sock,
transport = Transport, transport = Transport,
codec = Codec2, codec = Codec2,
clients = #{}})}. clients = #{}})};
wait_handshake(Type, Event, S) ->
handle_event(Type, Event, ?FUNCTION_NAME, S).
on_tunnel(info, {tcp, _Sock, TcpData}, #t_state{codec = Codec0} = S) -> on_tunnel(info, {tcp, _Sock, TcpData}, #t_state{codec = Codec0} = S) ->
{ok, S2, Codec1} = {ok, S2, Codec1} =
mtp_codec:fold_packets( mtp_codec:fold_packets(
fun(Packet, S1) -> fun(Packet, S1, Codec1) ->
handle_rpc(mtp_rpc:srv_decode_packet(Packet), S1) S2 = handle_rpc(mtp_rpc:srv_decode_packet(Packet), S1#t_state{codec = Codec1}),
{S2, S2#t_state.codec}
end, S, TcpData, Codec0), end, S, TcpData, Codec0),
{keep_state, activate(S2#t_state{codec = Codec1})}. {keep_state, activate(S2#t_state{codec = Codec1})};
on_tunnel(Type, Event, S) ->
handle_event(Type, Event, ?FUNCTION_NAME, S).
handle_event(info, {tcp_closed, _Sock}, _EventName, _S) -> handle_event(info, {tcp_closed, _Sock}, _EventName, _S) ->
{stop, normal}. {stop, normal}.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment