Add test proxy client and some basic common-tests

parent ad2ab942
......@@ -8,5 +8,6 @@ script:
- ./rebar3 compile
- ./rebar3 xref
- ./rebar3 eunit
- ./rebar3 ct
- ./rebar3 dialyzer
- ./rebar3 proper
......@@ -39,6 +39,9 @@
},
{test,
[{deps,
[{proper, "1.3.0"}]}
[{proper, "1.3.0"}]},
{ct_opts, [{sys_config, ["./test/test-sys.config"]}]},
{relx,
[{sys_config, "./test/test-sys.config"}]}
]}]
}.
......@@ -49,6 +49,8 @@ client_create(Secret, Protocol, DcId) ->
DecKey :: binary(),
DecIv :: binary(),
CliCodec :: codec().
client_create(Seed, HexSecret, Protocol, DcId) when byte_size(HexSecret) == 32 ->
client_create(Seed, mtp_handler:unhex(HexSecret), Protocol, DcId);
client_create(Seed, Secret, Protocol, DcId) when byte_size(Seed) == 58,
byte_size(Secret) == 16,
DcId > -10,
......
-module(mtp_test_client).
-export([connect/5,
send/2,
recv_packet/2,
recv_all/2,
close/1]).
-export_type([client/0]).
-record(client,
{sock,
codec}).
-opaque client() :: #client{}.
-type tcp_error() :: inet:posix() | closed. % | timeout.
connect(Host, Port, Secret, DcId, Protocol) ->
Opts = [{packet, raw},
{mode, binary},
{active, false},
{send_timeout, 5000}],
{ok, Sock} = gen_tcp:connect(Host, Port, Opts, 1000),
{Header, _, _, CryptoLayer} = mtp_obfuscated:client_create(Secret, Protocol, DcId),
ok = gen_tcp:send(Sock, Header),
PacketLayer = Protocol:new(),
Codec = mtp_codec:new(mtp_obfuscated, CryptoLayer,
Protocol, PacketLayer),
#client{sock = Sock,
codec = Codec}.
send(Data, #client{sock = Sock, codec = Codec} = Client) ->
{Enc, Codec1} = mtp_codec:encode_packet(Data, Codec),
ok = gen_tcp:send(Sock, Enc),
Client#client{codec = Codec1}.
-spec recv_packet(client(), timeout()) -> {ok, iodata(), client()} | {error, tcp_error() | timeout}.
recv_packet(#client{codec = Codec} = Client, Timeout) ->
case mtp_codec:try_decode_packet(<<>>, Codec) of
{ok, Data, Codec1} ->
%% We already had some data in codec's buffers
{ok, Data, Client#client{codec = Codec1}};
{incomplete, Codec1} ->
recv_packet_inner(Client#client{codec = Codec1}, Timeout)
end.
recv_packet_inner(#client{sock = Sock, codec = Codec0} = Client, Timeout) ->
case gen_tcp:recv(Sock, 0, Timeout) of
{ok, Stream} ->
io:format("~p: ~p~n", [byte_size(Stream), Stream]),
case mtp_codec:try_decode_packet(Stream, Codec0) of
{ok, Data, Codec} ->
{ok, Data, Client#client{codec = Codec}};
{incomplete, Codec} ->
%% recurse
recv_packet_inner(Client#client{codec = Codec}, Timeout)
end;
Err ->
Err
end.
-spec recv_all(client(), timeout()) -> {ok, [iodata()], client()} | {error, tcp_error()}.
recv_all(#client{sock = Sock, codec = Codec0} = Client, Timeout) ->
case tcp_recv_all(Sock, Timeout) of
{ok, Stream} ->
io:format("~p: ~p~n", [byte_size(Stream), Stream]),
{ok, Packets, Codec} =
mtp_codec:fold_packets(
fun(Packet, Acc, Codec) ->
{[Packet | Acc], Codec}
end,
[], Stream, Codec0),
{ok, lists:reverse(Packets),
Client#client{codec = Codec}};
{error, timeout} ->
{ok, [], Client};
Err ->
Err
end.
tcp_recv_all(Sock, Timeout) ->
io:format("Sock: ~p; Timeout: ~p~n~n~n", [Sock, Timeout]),
case gen_tcp:recv(Sock, 0, Timeout) of
{ok, Stream} ->
tcp_recv_all_inner(Sock, Stream);
Err ->
Err
end.
tcp_recv_all_inner(Sock, Acc) ->
case gen_tcp:recv(Sock, 0, 0) of
{ok, Stream} ->
tcp_recv_all_inner(Sock, <<Acc/binary, Stream/binary>>);
{error, timeout} ->
{ok, Acc};
Other ->
Other
end.
close(#client{sock = Sock}) ->
ok = gen_tcp:close(Sock).
-module(mtp_test_metric).
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([notify/4]).
-export([get/2,
get/3,
get_tags/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {count = #{},
gauge = #{},
histogram = #{}}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
notify(Type, Name, Value, Extra) ->
try gen_server:call(?MODULE, {notify, Type, Name, Value, Extra})
catch _:Reason ->
{error, Reason}
end.
get(Type, Name) ->
get(Type, Name, #{}).
get(Type, Name, Extra) ->
gen_server:call(?MODULE, {get, Type, Name, Extra}).
get_tags(Type, Name, Tags) ->
get(Type, Name, #{labels => Tags}).
init([]) ->
{ok, #state{}}.
handle_call({notify, count, Name, Value, Extra}, _From, #state{count = C} = State) ->
K = {Name, Extra},
V1 =
case maps:find(K, C) of
{ok, V0} ->
V0 + Value;
error ->
Value
end,
{reply, ok, State#state{count = C#{K => V1}}};
handle_call({notify, gauge, Name, Value, Extra}, _From, #state{gauge = G} = State) ->
K = {Name, Extra},
{reply, ok, State#state{gauge = G#{K => Value}}};
handle_call({notify, histogram, Name, Value, Extra}, _From, #state{histogram = H} = State) ->
K = {Name, Extra},
V1 =
case maps:find(K, H) of
{ok, {Count, Total, Min, Max}} ->
{Count + 1,
Total + Value,
erlang:min(Min, Value),
erlang:max(Max, Value)};
error ->
{1,
Value,
Value,
Value}
end,
{reply, ok, State#state{histogram = H#{K => V1}}};
handle_call({get, Type, Name, Extra}, _From, State) ->
K = {Name, Extra},
Tab = case Type of
count -> State#state.count;
gauge -> State#state.gauge;
histogram -> State#state.histogram
end,
{reply, maps:get(K, Tab, not_found), State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% Basic tests with only one telegram DC
-module(single_dc_SUITE).
-export([all/0,
init_per_suite/1,
end_per_suite/1,
init_per_testcase/2,
end_per_testcase/2]).
-export([echo_secure_case/1,
echo_abridged_many_packets_case/1]).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
all() ->
%% All exported functions of arity 1 whose name ends with "_case"
Exports = ?MODULE:module_info(exports),
[F
|| {F, A} <- Exports,
A == 1,
case lists:reverse(atom_to_list(F)) of
"esac_" ++ _ -> true;
_ -> false
end].
init_per_suite(Cfg) ->
{ok, _} = application:ensure_all_started(inets),
{ok, _} = application:ensure_all_started(ranch),
Cfg.
end_per_suite(Cfg) ->
Cfg.
init_per_testcase(Name, Cfg) ->
?MODULE:Name({pre, Cfg}).
end_per_testcase(Name, Cfg) ->
?MODULE:Name({post, Cfg}).
echo_secure_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, ?LINE, Cfg);
echo_secure_case({post, Cfg}) ->
stop_single(Cfg);
echo_secure_case(Cfg) when is_list(Cfg) ->
DcId = ?config(dc_id, Cfg),
Port = ?config(mtp_port, Cfg),
Secret = ?config(mtp_secret, Cfg),
Cli = mtp_test_client:connect({127, 0, 0, 1}, Port, Secret, DcId, mtp_secure),
Data = crypto:strong_rand_bytes(64),
Cli1 = mtp_test_client:send(Data, Cli),
{ok, Packet, Cli2} = mtp_test_client:recv_packet(Cli1, 1000),
ok = mtp_test_client:close(Cli2),
?assertEqual(Data, Packet),
?assertEqual(1, mtp_test_metric:get_tags(
count, [mtproto_proxy,in_connection,total], [?FUNCTION_NAME])),
%% race-condition
%% ?assertEqual(1, mtp_test_metric:get_tags(
%% count, [mtproto_proxy,in_connection_closed,total], [?FUNCTION_NAME])),
?assertEqual({1, 64, 64, 64},
mtp_test_metric:get_tags(
histogram, [mtproto_proxy,tg_packet_size,bytes],
[upstream_to_downstream])),
?assertMatch({1, _, _, _}, % larger because of RPC headers
mtp_test_metric:get_tags(
histogram, [mtproto_proxy,tg_packet_size,bytes],
[downstream_to_upstream])).
echo_abridged_many_packets_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, ?LINE, Cfg);
echo_abridged_many_packets_case({post, Cfg}) ->
stop_single(Cfg);
echo_abridged_many_packets_case(Cfg) when is_list(Cfg) ->
DcId = ?config(dc_id, Cfg),
Port = ?config(mtp_port, Cfg),
Secret = ?config(mtp_secret, Cfg),
Cli0 = mtp_test_client:connect({127, 0, 0, 1}, Port, Secret, DcId, mtp_secure),
Packets =
[crypto:strong_rand_bytes(4 * rand:uniform(50))
|| _ <- lists:seq(1, 15)],
Cli2 = lists:foldl(fun mtp_test_client:send/2, Cli0, Packets),
timer:sleep(10), % TODO: some hook in proxy to find when sent
{ok, RecvPackets, Cli} = mtp_test_client:recv_all(Cli2, 1000),
ok = mtp_test_client:close(Cli),
?assertEqual(Packets, RecvPackets).
%% Helpers
setup_single(Name, Offset, Cfg) ->
{ok, Pid} = mtp_test_metric:start_link(),
PubKey = crypto:strong_rand_bytes(128),
DcId = 1,
DcConf = [{DcId, {127, 0, 0, 1}, 10000 + Offset}],
MtpPort = 10000 + Offset + 1,
Secret = mtp_handler:hex(crypto:strong_rand_bytes(16)),
Listeners = [#{name => Name,
port => MtpPort,
listen_ip => "127.0.0.1",
secret => Secret,
tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>}],
application:load(mtproto_proxy),
Cfg1 = set_env([{ports, Listeners}], Cfg),
{ok, DcCfg} = mtp_test_middle_server:start_dc(PubKey, DcConf, #{}),
application:load(mtproto_proxy),
{ok, _} = application:ensure_all_started(mtproto_proxy),
[{dc_id, DcId},
{mtp_port, MtpPort},
{mtp_secret, Secret},
{dc_conf, DcCfg},
{metric, Pid}| Cfg1].
stop_single(Cfg) ->
DcCfg = ?config(dc_conf, Cfg),
MetricPid = ?config(metric, Cfg),
ok = application:stop(mtproto_proxy),
{ok, _} = mtp_test_middle_server:stop_dc(DcCfg),
gen_server:stop(MetricPid),
Cfg.
set_env(Env, Cfg) ->
OldEnv =
[begin
%% OldV is undefined | {ok, V}
OldV = application:get_env(mtproto_proxy, K),
case V of
undefined -> application:unset_env(mtproto_proxy, K);
_ ->
application:set_env(mtproto_proxy, K, V)
end,
{K, OldV}
end || {K, V} <- Env],
[{mtp_env, OldEnv} | Cfg].
%% reset_env(Cfg) ->
%% OldEnv = ?config(mtp_env, Cfg),
%% [case V of
%% undefined ->
%% application:unset_env(mtproto_proxy, K);
%% {ok, Val} ->
%% application:set_env(mtproto_proxy, K, Val)
%% end || {K, V} <- OldEnv].
%% -*- mode: erlang -*-
[
{mtproto_proxy,
[
{ports, []},
{external_ip, "127.0.0.1"},
{listen_ip, "127.0.0.1"},
{num_acceptors, 2},
{init_dc_connections, 1},
{metric_backend, mtp_test_metric}
]},
%% Logging config
{lager,
[{log_root, "log"},
{crash_log, "crash.log"},
{handlers,
[
{lager_console_backend,
[{level, critical}]},
{lager_file_backend,
[{file, "application.log"},
{level, warning},
%% Do fsync only on critical messages
{sync_on, critical}
]}
]}]},
{sasl,
[{errlog_type, error}]}
].
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment