Make it possible to change logick of test server RPC handling

parent a3a21ecd
%% @doc Callback module for mtp_test_middle_server that supports some more tricky commands
-module(mtp_test_cmd_rpc).
-export([call/3,
packet_to_term/1]).
-export([init/1,
handle_rpc/2]).
call(M, F, Opts) ->
true = erlang:function_exported(M, F, 3),
term_to_packet({M, F, Opts}).
term_to_packet(Term) ->
RespBin = term_to_binary(Term),
RespSize = byte_size(RespBin),
PadSize = case (RespSize rem 16) of
0 -> 0;
Rem -> 16 - Rem
end,
Pad = binary:copy(<<0>>, PadSize),
<<RespSize:32/little-unsigned, RespBin/binary, Pad/binary>>.
packet_to_term(<<Size:32/little-unsigned, Term:Size/binary, _Pad/binary>>) ->
binary_to_term(Term).
init(_) ->
#{}.
handle_rpc({data, ConnId, Req}, St) ->
{M, F, Opts} = packet_to_term(Req),
case M:F(Opts, ConnId, St) of
{reply, Resp, St1} ->
{rpc, {proxy_ans, ConnId, term_to_packet(Resp)}, St1};
{return, What} ->
What
end;
handle_rpc({remote_closed, ConnId}, St) ->
is_integer(maps:get(ConnId, St, undefined))
orelse error({unexpected_closed, ConnId}),
{noreply, St#{ConnId := tombstone}}.
...@@ -16,19 +16,28 @@ ...@@ -16,19 +16,28 @@
-define(SECRET_PATH, "/getProxySecret"). -define(SECRET_PATH, "/getProxySecret").
-define(CONFIG_PATH, "/getProxyConfig"). -define(CONFIG_PATH, "/getProxyConfig").
-type dc_conf() :: [{DcId :: integer(),
Ip :: inet:ip4_address(),
Port :: inet:port_number()}].
start_dc() -> start_dc() ->
Secret = crypto:strong_rand_bytes(128), Secret = crypto:strong_rand_bytes(128),
DcConf = [{1, {127, 0, 0, 1}, 8888}], DcConf = [{1, {127, 0, 0, 1}, 8888}],
{ok, _Cfg} = start_dc(Secret, DcConf, #{}). {ok, _Cfg} = start_dc(Secret, DcConf, #{}).
-spec start_dc(binary(), dc_conf(), #{}) -> {ok, #{}}.
start_dc(Secret, DcConf, Acc) -> start_dc(Secret, DcConf, Acc) ->
Cfg = dc_list_to_config(DcConf), Cfg = dc_list_to_config(DcConf),
{ok, Acc1} = start_config_server({127, 0, 0, 1}, 3333, Secret, Cfg, Acc), {ok, Acc1} = start_config_server({127, 0, 0, 1}, 3333, Secret, Cfg, Acc),
RpcHandler = maps:get(rpc_handler, Acc, mtp_test_echo_rpc),
Ids = Ids =
[begin [begin
Id = {?MODULE, DcId}, Id = {?MODULE, DcId},
{ok, _Pid} = mtp_test_middle_server:start(Id, #{port => Port, ip => Ip, secret => Secret}), {ok, _Pid} = mtp_test_middle_server:start(
Id, #{port => Port,
ip => Ip,
secret => Secret,
rpc_handler => RpcHandler}),
Id Id
end || {DcId, Ip, Port} <- DcConf], end || {DcId, Ip, Port} <- DcConf],
{ok, Acc1#{srv_ids => Ids}}. {ok, Acc1#{srv_ids => Ids}}.
......
%% @doc simple callback module for mtp_test_middle_server that echoes received packets back
-module(mtp_test_echo_rpc).
-export([init/1,
handle_rpc/2]).
init(_) ->
#{}.
handle_rpc({data, ConnId, Data}, St) ->
Cnt = maps:get(ConnId, St, 0),
{rpc, {proxy_ans, ConnId, Data}, St#{ConnId => Cnt + 1}};
handle_rpc({remote_closed, ConnId}, St) ->
is_integer(maps:get(ConnId, St))
orelse error({unexpected_closed, ConnId}),
{noreply, St#{ConnId := tombstone}}.
...@@ -28,12 +28,14 @@ ...@@ -28,12 +28,14 @@
cli_ts, cli_ts,
sender_pid, sender_pid,
peer_pid, peer_pid,
srv_nonce}). srv_nonce,
rpc_handler}).
-record(t_state, -record(t_state,
{sock, {sock,
transport, transport,
codec, codec,
clients = #{} :: #{}}). rpc_handler,
rpc_handler_state}).
-define(RPC_NONCE, 170,135,203,122). -define(RPC_NONCE, 170,135,203,122).
-define(RPC_HANDSHAKE, 245,238,130,118). -define(RPC_HANDSHAKE, 245,238,130,118).
...@@ -42,7 +44,7 @@ ...@@ -42,7 +44,7 @@
%% -type state_name() :: wait_nonce | wait_handshake | on_tunnel. %% -type state_name() :: wait_nonce | wait_handshake | on_tunnel.
%% Api %% Api
start(Id, Opts) -> start(Id, #{port := _, secret := _} = Opts) ->
{ok, _} = application:ensure_all_started(ranch), {ok, _} = application:ensure_all_started(ranch),
ranch:start_listener( ranch:start_listener(
Id, ranch_tcp, Id, ranch_tcp,
...@@ -69,10 +71,12 @@ ranch_init({Ref, Transport, Opts}) -> ...@@ -69,10 +71,12 @@ ranch_init({Ref, Transport, Opts}) ->
init({Socket, Transport, Opts}) -> init({Socket, Transport, Opts}) ->
Codec = mtp_codec:new(mtp_noop_codec, mtp_noop_codec:new(), Codec = mtp_codec:new(mtp_noop_codec, mtp_noop_codec:new(),
mtp_full, mtp_full:new(-2, -2)), mtp_full, mtp_full:new(-2, -2)),
{ok, wait_nonce, #hs_state{sock = Socket, State = #hs_state{sock = Socket,
transport = Transport, transport = Transport,
secret = maps:get(secret, Opts), secret = maps:get(secret, Opts),
codec = Codec}}. codec = Codec,
rpc_handler = maps:get(rpc_handler, Opts, mtp_test_echo_rpc)},
{ok, wait_nonce, State}.
callback_mode() -> callback_mode() ->
state_functions. state_functions.
...@@ -131,12 +135,14 @@ wait_handshake(info, {tcp, _Sock, TcpData}, ...@@ -131,12 +135,14 @@ wait_handshake(info, {tcp, _Sock, TcpData},
Answer = mtp_rpc:encode_handshake({handshake, SenderPID, PeerPID}), Answer = mtp_rpc:encode_handshake({handshake, SenderPID, PeerPID}),
{ok, #hs_state{sock = Sock, {ok, #hs_state{sock = Sock,
transport = Transport, transport = Transport,
codec = Codec2}} = hs_send(Answer, S#hs_state{codec = Codec1}), codec = Codec2,
rpc_handler = Handler}} = hs_send(Answer, S#hs_state{codec = Codec1}),
{next_state, on_tunnel, {next_state, on_tunnel,
activate(#t_state{sock = Sock, activate(#t_state{sock = Sock,
transport = Transport, transport = Transport,
codec = Codec2, codec = Codec2,
clients = #{}})}; rpc_handler = Handler,
rpc_handler_state = Handler:init([])})};
wait_handshake(Type, Event, S) -> wait_handshake(Type, Event, S) ->
handle_event(Type, Event, ?FUNCTION_NAME, S). handle_event(Type, Event, ?FUNCTION_NAME, S).
...@@ -178,15 +184,18 @@ activate(#t_state{transport = Transport, sock = Sock} = S) -> ...@@ -178,15 +184,18 @@ activate(#t_state{transport = Transport, sock = Sock} = S) ->
ok = Transport:setopts(Sock, [{active, once}]), ok = Transport:setopts(Sock, [{active, once}]),
S. S.
handle_rpc({data, ConnId, Data}, #t_state{clients = Clients} = S) -> handle_rpc(RPC, #t_state{rpc_handler = Handler, rpc_handler_state = HSt} = S) ->
%% Echo data back case Handler:handle_rpc(RPC, HSt) of
%% TODO: interptet Data to power some test scenarios, eg, client might {rpc, Response, HSt1} ->
%% ask to close it's connection {ok, S1} = t_send(mtp_rpc:srv_encode_packet(Response),
{ok, S1} = t_send(mtp_rpc:srv_encode_packet({proxy_ans, ConnId, Data}), S), S#t_state{rpc_handler_state = HSt1}),
Cnt = maps:get(ConnId, Clients, 0), S1;
%% Increment can fail if there is a tombstone for this client {rpc_multi, Responses, HSt1} ->
S1#t_state{clients = Clients#{ConnId => Cnt + 1}}; lists:foldl(
handle_rpc({remote_closed, ConnId}, #t_state{clients = Clients} = S) -> fun(Response, S1) ->
is_integer(maps:get(ConnId, Clients)) {ok, S2} = t_send(mtp_rpc:srv_encode_packet(Response), S1),
orelse error({unexpected_closed, ConnId}), S2
S#t_state{clients = Clients#{ConnId := tombstone}}. end, S#t_state{rpc_handler_state = HSt1}, Responses);
{noreply, HSt1} ->
S#t_state{rpc_handler_state = HSt1}
end.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment