Add some statefull property-based integration tests

parent 66f07465
......@@ -26,9 +26,6 @@
-define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb
-define(HEALTH_CHECK_INTERVAL, 5000).
-define(HEALTH_CHECK_MAX_QLEN, 300).
-define(HEALTH_CHECK_GC, 400 * 1024). %400kb
-define(HEALTH_CHECK_MAX_MEM, 3 * 1024 * 1024). %3mb
-define(APP, mtproto_proxy).
......
......@@ -16,9 +16,9 @@
%%====================================================================
start(_StartType, _StartArgs) ->
Res = {ok, _} = mtproto_proxy_sup:start_link(),
io:format("+++++++++++++++++++++++++++++++++++++++~n"
"Erlang MTProto proxy by @seriyps https://github.com/seriyps/mtproto_proxy~n"
"Sponsored by and powers @socksy_bot~n"),
report("+++++++++++++++++++++++++++++++++++++++~n"
"Erlang MTProto proxy by @seriyps https://github.com/seriyps/mtproto_proxy~n"
"Sponsored by and powers @socksy_bot~n", []),
[start_proxy(Where) || Where <- application:get_env(?APP, ports, [])],
Res.
......@@ -52,9 +52,13 @@ start_proxy(#{name := Name, port := Port, secret := Secret, tag := Tag} = P) ->
"https://t.me/proxy?server=~s&port=~w&secret=~s",
[application:get_env(?APP, external_ip, ListenIpStr),
Port, Secret]),
io:format("Proxy started on ~s:~p with secret: ~s, tag: ~s~nUrl: ~s~n",
[ListenIpStr, Port, Secret, Tag, Url]),
report("Proxy started on ~s:~p with secret: ~s, tag: ~s~nUrl: ~s~n",
[ListenIpStr, Port, Secret, Tag, Url]),
Res.
stop_proxy(#{name := Name}) ->
ranch:stop_listener(Name).
report(Fmt, Args) ->
io:format(Fmt, Args),
lager:info(Fmt, Args).
......@@ -51,6 +51,4 @@ dc_id() ->
codec() ->
Protocols = [mtp_abridged, mtp_intermediate, mtp_secure],
proper_types:oneof(
[proper_types:exactly(P)
|| P <- Protocols]).
proper_types:oneof(Protocols).
......@@ -48,7 +48,7 @@ recv_packet(#client{codec = Codec} = Client, Timeout) ->
recv_packet_inner(#client{sock = Sock, codec = Codec0} = Client, Timeout) ->
case gen_tcp:recv(Sock, 0, Timeout) of
{ok, Stream} ->
io:format("~p: ~p~n", [byte_size(Stream), Stream]),
%% io:format("~p: ~p~n", [byte_size(Stream), Stream]),
case mtp_codec:try_decode_packet(Stream, Codec0) of
{ok, Data, Codec} ->
{ok, Data, Client#client{codec = Codec}};
......
......@@ -30,11 +30,15 @@ handle_rpc({data, ConnId, Req}, St) ->
case M:F(Opts, ConnId, St) of
{reply, Resp, St1} ->
{rpc, {proxy_ans, ConnId, term_to_packet(Resp)}, St1};
{close, St1} ->
{rpc, {close_ext, ConnId}, tombstone(ConnId, St1)};
{return, What} ->
What
end;
handle_rpc({remote_closed, ConnId}, St) ->
is_integer(maps:get(ConnId, St, undefined))
orelse error({unexpected_closed, ConnId}),
{noreply, St#{ConnId := tombstone}}.
{noreply, tombstone(ConnId, St)}.
tombstone(ConnId, St) ->
({ok, tombstone} =/= maps:find(ConnId, St))
orelse error({already_closed, ConnId}),
St#{ConnId => tombstone}.
......@@ -8,6 +8,7 @@
stop_dc/1,
start_config_server/5,
stop_config_server/1]).
-export([middle_connections/1]).
-export([dc_list_to_config/1]).
-export([do/1]).
......@@ -47,6 +48,10 @@ stop_dc(#{srv_ids := Ids} = Acc) ->
ok = lists:foreach(fun mtp_test_middle_server:stop/1, Ids),
{ok, maps:without([srv_ids], Acc1)}.
middle_connections(#{srv_ids := Ids}) ->
lists:flatten([ranch:procs(Id, connections)
|| Id <- Ids]).
%%
%% Inets HTTPD to use as a mock for https://core.telegram.org
%%
......
......@@ -4,7 +4,8 @@
-behaviour(gen_statem).
-export([start/2,
stop/1]).
stop/1,
get_rpc_handler_state/1]).
-export([start_link/4,
ranch_init/1]).
-export([init/1,
......@@ -57,6 +58,9 @@ start(Id, #{port := _, secret := _} = Opts) ->
stop(Id) ->
ranch:stop_listener(Id).
get_rpc_handler_state(Pid) ->
gen_statem:call(Pid, get_rpc_handler_state).
%% Callbacks
start_link(Ref, _, Transport, Opts) ->
......@@ -155,6 +159,8 @@ on_tunnel(info, {tcp, _Sock, TcpData}, #t_state{codec = Codec0} = S) ->
{S2, S2#t_state.codec}
end, S, TcpData, Codec0),
{keep_state, activate(S2#t_state{codec = Codec1})};
on_tunnel({call, From}, get_rpc_handler_state, #t_state{rpc_handler_state = HSt}) ->
{keep_state_and_data, [{reply, From, HSt}]};
on_tunnel(Type, Event, S) ->
handle_event(Type, Event, ?FUNCTION_NAME, S).
......
%% @doc Statefull property-based tests
-module(prop_mtp_statefull).
-export([prop_check_pooling/0,
prop_check_pooling/1,
initial_state/0,
command/1,
precondition/2,
postcondition/3,
next_state/3]).
-export([connect/2,
echo_packet/2,
ask_for_close/1,
close/1]).
-export([gen_rpc_echo/3,
gen_rpc_close/3]).
-include_lib("proper/include/proper.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-record(st, {
ever_opened = 0,
open = [],
closed = [],
ask_for_close = [],
n_packets = #{}
}).
-define(PORT, 10800).
-define(SECRET, <<"d0d6e111bada5511fcce9584deadbeef">>).
-define(HOST, {127, 0, 0, 1}).
-define(DC_ID, 1).
-define(APP, mtproto_proxy).
prop_check_pooling(doc) ->
"Check that connections and packets are 'accounted' correctly".
prop_check_pooling() ->
?FORALL(Cmds, commands(?MODULE), aggregate(command_names(Cmds), run_cmds(Cmds))).
initial_state() ->
#st{}.
command(#st{open = [], ever_opened = EO}) ->
{call, ?MODULE, connect, [EO, mtp_prop_gen:codec()]};
command(#st{open = L, ever_opened = EO}) ->
proper_types:frequency(
[
{1, {call, ?MODULE, connect, [EO, mtp_prop_gen:codec()]}},
{5, {call, ?MODULE, echo_packet, [proper_types:oneof(L), proper_types:binary()]}},
{2, {call, ?MODULE, close, [proper_types:oneof(L)]}},
{2, {call, ?MODULE, ask_for_close, [proper_types:oneof(L)]}}
]).
precondition(#st{open = L}, {call, ?MODULE, close, _}) ->
length(L) > 0;
precondition(#st{open = L}, {call, ?MODULE, echo_packet, _}) ->
length(L) > 0;
precondition(#st{open = L}, {call, ?MODULE, ask_for_close, _}) ->
length(L) > 0;
precondition(_St, {call, _Mod, _Fun, _Args}) ->
true.
%% Given the state `State' *prior* to the call `{call, Mod, Fun, Args}',
%% determine whether the result `Res' (coming from the actual system)
%% makes sense.
postcondition(_State, {call, ?MODULE, connect, _Args}, _Res) ->
true;
postcondition(_State, {call, ?MODULE, close, _Args}, _Res) ->
true;
postcondition(_State, {call, ?MODULE, ask_for_close, _Args}, _Res) ->
true;
postcondition(_State, {call, ?MODULE, echo_packet, [_Conn, SendBin]}, RecvBin) ->
?assertEqual(SendBin, RecvBin),
true;
postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) ->
false.
%% Assuming the postcondition for a call was true, update the model
%% accordingly for the test to proceed.
next_state(#st{open = L, ever_opened = EO} = St, _Res,
{call, ?MODULE, connect, [ConnId, _Proto]}) ->
St#st{open = [ConnId | L],
ever_opened = EO + 1};
next_state(#st{open = L, closed = Cl} = St, _Res, {call, ?MODULE, close, [ConnId]}) ->
St#st{open = lists:delete(ConnId, L),
closed = [ConnId | Cl]};
next_state(#st{open = L, closed = Cl, ask_for_close = NA} = St, _Res,
{call, ?MODULE, ask_for_close, [ConnId]}) ->
St#st{open = lists:delete(ConnId, L),
closed = [ConnId | Cl],
ask_for_close = [ConnId | NA]};
next_state(#st{n_packets = N} = St, _Res, {call, ?MODULE, echo_packet, [ConnId, _]}) ->
NForConn = maps:get(ConnId, N, 0),
St#st{n_packets = N#{ConnId => NForConn + 1}};
next_state(State, _Res, {call, ?MODULE, _, _}) ->
State.
run_cmds(Cmds) ->
Cfg = setup(#{rpc_handler => mtp_test_cmd_rpc}),
{History, State, Result} = run_commands(?MODULE, Cmds),
%% Validate final states of proxy and "middle server"
timer:sleep(100),
ServerState = collect_server_state(Cfg),
Metrics = collect_metrics(Cfg),
ShimDump = shim_dump(),
stop(Cfg),
?WHENFAIL(io:format("History: ~p\n"
"State: ~w\n"
"ServerState: ~p\n"
"Result: ~p\n",
[History, State, ServerState, Result]),
proper:conjunction(
[{state_ok, check_state(State, ServerState, Metrics, ShimDump)},
{result_ok, Result =:= ok}])).
%% Post-run checks. Assert that model's final state matches proxy and middle-server state
collect_server_state(Cfg) ->
DcCfg = ?config(dc_conf, Cfg),
Pids = mtp_test_datacenter:middle_connections(DcCfg),
States = [mtp_test_middle_server:get_rpc_handler_state(Pid) || Pid <- Pids],
%% io:format("~p~n", [States]),
%% Can use just maps:merge/2 because connection IDs in different states will not overlap
lists:foldl(fun maps:merge/2, #{}, States).
collect_metrics(_Cfg) ->
GetTags = fun(Type, Name, Tags) ->
case mtp_test_metric:get_tags(Type, Name, Tags) of
not_found when Type == histogram -> {0, 0, 0, 0};
not_found -> 0;
Val -> Val
end
end,
#{in_connections => GetTags(count, [?APP, in_connection, total], [?MODULE]),
closed_connections => GetTags(count, [?APP, in_connection_closed, total], [?MODULE]),
tg_in_packet_size => GetTags(
histogram, [?APP, tg_packet_size, bytes], [upstream_to_downstream]),
tg_out_packet_size => GetTags(
histogram, [?APP, tg_packet_size, bytes], [downstream_to_upstream])
}.
check_state(#st{closed = ModClosed, n_packets = ModPackets, ask_for_close = ModAskClose,
open = ModClients, ever_opened = ModOpened} = _St,
SrvState, Metrics, ShimDump) ->
%% io:format("~n~w~n~p~n~p~n~p~n", [St, SrvState, Metrics, ShimDump]),
%% Assert shim is correct
?assertEqual(length(ModClients), map_size(ShimDump)),
%% Total number of packets
ModTotalPackets = maps:fold(fun(_K, N, Acc) -> Acc + N end, 0, ModPackets),
SrvTotalPackets = maps:fold(fun({n_packets, _}, N, Acc) -> Acc + N;
(_, _, Acc) -> Acc
end, 0, SrvState),
?assertEqual(ModTotalPackets, SrvTotalPackets),
%% Number of connections that ever sent data RPC
SrvConnsWithPackets = maps:fold(fun({n_packets, _}, _, Acc) -> Acc + 1;
(_, _, Acc) -> Acc
end, 0, SrvState),
?assertEqual(map_size(ModPackets), SrvConnsWithPackets),
%% Number of sent data RPC per-connection
ModSentPerConn = maps:values(ModPackets),
SrvSentPerConn = maps:fold(fun({n_packets, _}, N, Acc) -> [N | Acc];
(_, _, Acc) -> Acc
end, [], SrvState),
?assertEqual(lists:sort(ModSentPerConn), lists:sort(SrvSentPerConn)),
%% Number of telegram packets send from client to server
ModTgPackets = length(ModAskClose) + ModTotalPackets,
?assertMatch({ModTgPackets, _, _, _}, maps:get(tg_in_packet_size, Metrics)),
%% Number of connections that were ever open
%% Can be only asserted by metrics
?assertEqual(ModOpened, maps:get(in_connections, Metrics)),
%% Number of connections that were closed
SrvClosed = maps:fold(fun(_, tombstone, Acc) -> Acc + 1;
(_, _, Acc) -> Acc
end, 0, SrvState),
?assertEqual(length(ModClosed), SrvClosed),
?assertEqual(length(ModClosed), maps:get(closed_connections, Metrics)),
%% Number of still open connections
%% On middleproxy side, connection only started to be tracked if it sent any data.
%% So, if we opened a connection and haven't sent anything, middle will not know about it
MAlive = length(ordsets:intersection(
ordsets:from_list(ModClients),
ordsets:from_list(maps:keys(ModPackets)))),
SrvAlive = maps:fold(fun(Id, Num, Acc) when is_integer(Id), is_integer(Num) -> Acc + 1;
(_, _, Acc) -> Acc
end, 0, SrvState),
?assertEqual(MAlive, SrvAlive),
true.
%% Connect to proxy
connect(Id, Protocol) ->
Conn = mtp_test_client:connect(?HOST, ?PORT, ?SECRET, ?DC_ID, Protocol),
shim_add(Id, Conn),
ok.
%% Send and receive back some binary data
echo_packet(Id, RandBin) ->
Cli0 = shim_pop(Id),
Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_echo, RandBin),
Cli1 = mtp_test_client:send(Req, Cli0),
{ok, Res, Cli2} = mtp_test_client:recv_packet(Cli1, 1000),
shim_add(Id, Cli2),
mtp_test_cmd_rpc:packet_to_term(Res).
gen_rpc_echo(RandBin, ConnId, St) ->
Key = {n_packets, ConnId},
NPackets = maps:get(Key, St, 0),
{reply, RandBin, St#{ConnId => 1,
Key => NPackets + 1}}.
%% Close from client-side
close(Id) ->
Conn = shim_pop(Id),
mtp_test_client:close(Conn).
%% Close from telegram-server side
ask_for_close(Id) ->
Cli0 = shim_pop(Id),
Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_close, []),
Cli1 = mtp_test_client:send(Req, Cli0),
{error, closed} = mtp_test_client:recv_packet(Cli1, 1000),
ok.
gen_rpc_close([], _ConnId, St) ->
{close, St}.
%% Setup / teardown
setup(DcCfg0) ->
application:ensure_all_started(lager),
lager:set_loglevel(lager_console_backend, critical),
{ok, Pid} = mtp_test_metric:start_link(),
PubKey = crypto:strong_rand_bytes(128),
DcId = ?DC_ID,
Ip = ?HOST,
DcConf = [{DcId, Ip, ?PORT + 5}],
Secret = ?SECRET,
Listeners = [#{name => ?MODULE,
port => ?PORT,
listen_ip => inet:ntoa(Ip),
secret => Secret,
tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>}],
application:load(mtproto_proxy),
Cfg1 = single_dc_SUITE:set_env([{ports, Listeners},
{metric_backend, mtp_test_metric}], []),
{ok, DcCfg} = mtp_test_datacenter:start_dc(PubKey, DcConf, DcCfg0),
application:load(mtproto_proxy),
{ok, _} = application:ensure_all_started(mtproto_proxy),
shim_start(),
[{dc_conf, DcCfg}, {metric, Pid} | Cfg1].
stop(Cfg) ->
DcCfg = ?config(dc_conf, Cfg),
MetricPid = ?config(metric, Cfg),
ok = application:stop(mtproto_proxy),
{ok, _} = mtp_test_datacenter:stop_dc(DcCfg),
single_dc_SUITE:reset_env(Cfg),
gen_server:stop(MetricPid),
shim_stop(),
Cfg.
%% Proces - wrapper holding client connections and states
shim_add(Id, Conn) ->
?MODULE ! {add, Id, Conn}.
shim_pop(Id) ->
?MODULE ! {pop, self(), Id},
receive {conn, Conn} ->
Conn
end.
shim_dump() ->
?MODULE ! {dump, self()},
receive {dump, Conns} ->
Conns
end.
shim_start() ->
Pid = proc_lib:spawn_link(fun loop/0),
register(?MODULE, Pid).
shim_stop() ->
Pid = whereis(?MODULE),
unregister(?MODULE),
exit(Pid, normal).
loop() ->
loop(#{}).
loop(Acc) ->
receive
{dump, From} ->
From ! {dump, Acc},
loop(Acc);
{add, Id, Conn} ->
false = maps:is_key(Id, Acc),
loop(Acc#{Id => Conn});
{pop, From, Id} ->
{Conn, Acc1} = maps:take(Id, Acc),
From ! {conn, Conn},
loop(Acc1)
end.
......@@ -13,6 +13,9 @@
downstream_qlen_backpressure_case/1
]).
-export([set_env/2,
reset_env/1]).
-export([gen_rpc_replies/3]).
-include_lib("common_test/include/ct.hrl").
......@@ -49,7 +52,7 @@ end_per_testcase(Name, Cfg) ->
%% @doc Send single packet and receive it back
echo_secure_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, ?LINE, #{}, Cfg);
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg);
echo_secure_case({post, Cfg}) ->
stop_single(Cfg);
echo_secure_case(Cfg) when is_list(Cfg) ->
......@@ -78,7 +81,7 @@ echo_secure_case(Cfg) when is_list(Cfg) ->
%% @doc Send many packets and receive them back
echo_abridged_many_packets_case({pre, Cfg}) ->
setup_single(?FUNCTION_NAME, ?LINE, #{}, Cfg);
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg);
echo_abridged_many_packets_case({post, Cfg}) ->
stop_single(Cfg);
echo_abridged_many_packets_case(Cfg) when is_list(Cfg) ->
......@@ -111,11 +114,11 @@ echo_abridged_many_packets_case(Cfg) when is_list(Cfg) ->
%% @doc test downstream backpressure when size of non-acknowledged packets grows above threshold
downstream_size_backpressure_case({pre, Cfg}) ->
Cfg1 = setup_single(?FUNCTION_NAME, ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg),
Cfg1 = setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg),
%% Disable upstream healthchecks
application:set_env(?APP, upstream_healthchecks, []),
Cfg1;
set_env([{upstream_healthchecks, []}], Cfg1);
downstream_size_backpressure_case({post, Cfg}) ->
reset_env(Cfg),
stop_single(Cfg);
downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
DcId = ?config(dc_id, Cfg),
......@@ -171,13 +174,13 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) ->
downstream_qlen_backpressure_case({pre, Cfg}) ->
application:load(mtproto_proxy),
%% Reducing downstream socket buffer size. Otherwise we can get queue overflow from just single
%% socket data packet
application:set_env(mtproto_proxy, downstream_socket_buffer_size, 1024),
Cfg1 = setup_single(?FUNCTION_NAME, ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg),
%% socket data packet;
%% Disable upstream healthchecks
application:set_env(?APP, upstream_healthchecks, []),
Cfg1;
Cfg1 = set_env([{downstream_socket_buffer_size, 1024},
{upstream_healthchecks, []}], Cfg),
setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg1);
downstream_qlen_backpressure_case({post, Cfg}) ->
reset_env(Cfg),
stop_single(Cfg);
downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) ->
DcId = ?config(dc_id, Cfg),
......@@ -222,13 +225,12 @@ gen_rpc_replies(#{packet := Packet, n := N}, ConnId, St) ->
%% Helpers
setup_single(Name, Offset, DcCfg0, Cfg) ->
setup_single(Name, MtpPort, DcCfg0, Cfg) ->
{ok, Pid} = mtp_test_metric:start_link(),
PubKey = crypto:strong_rand_bytes(128),
DcId = 1,
Ip = {127, 0, 0, 1},
DcConf = [{DcId, Ip, 10000 + Offset}],
MtpPort = 10000 + Offset + 1,
DcConf = [{DcId, Ip, MtpPort + 10}],
Secret = mtp_handler:hex(crypto:strong_rand_bytes(16)),
Listeners = [#{name => Name,
port => MtpPort,
......@@ -270,11 +272,11 @@ set_env(Env, Cfg) ->
end || {K, V} <- Env],
[{mtp_env, OldEnv} | Cfg].
%% reset_env(Cfg) ->
%% OldEnv = ?config(mtp_env, Cfg),
%% [case V of
%% undefined ->
%% application:unset_env(mtproto_proxy, K);
%% {ok, Val} ->
%% application:set_env(mtproto_proxy, K, Val)
%% end || {K, V} <- OldEnv].
reset_env(Cfg) ->
OldEnv = ?config(mtp_env, Cfg),
[case V of
undefined ->
application:unset_env(mtproto_proxy, K);
{ok, Val} ->
application:set_env(mtproto_proxy, K, Val)
end || {K, V} <- OldEnv].
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment