Add deadline for downstream connect handshake

There were situations, when downstream handshake stuck and stayed in
mtp_dc_pool `pending` field forever.
With this change if downstream can't finish handshake in 8s, it will crash
parent d1bc6d90
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-define(CONN_TIMEOUT, 10000). -define(CONN_TIMEOUT, 10000).
-define(SEND_TIMEOUT, 15000). -define(SEND_TIMEOUT, 15000).
-define(HANDSHAKE_TIMEOUT, 8000).
-define(MAX_SOCK_BUF_SIZE, 1024 * 500). % Decrease if CPU is cheaper than RAM -define(MAX_SOCK_BUF_SIZE, 1024 * 500). % Decrease if CPU is cheaper than RAM
-define(MAX_CODEC_BUFFERS, 5 * 1024 * 1024). -define(MAX_CODEC_BUFFERS, 5 * 1024 * 1024).
...@@ -172,8 +173,18 @@ handle_info(do_connect, #state{dc_id = DcId} = State) -> ...@@ -172,8 +173,18 @@ handle_info(do_connect, #state{dc_id = DcId} = State) ->
[DcId, lager:pr_stacktrace(Stack, {Class, Reason})]), %XXX lager-specific [DcId, lager:pr_stacktrace(Stack, {Class, Reason})]), %XXX lager-specific
erlang:send_after(300, self(), do_connect), erlang:send_after(300, self(), do_connect),
{noreply, State} {noreply, State}
end;
handle_info(handshake_timeout, #state{stage = Stage, dc_id = DcId} = St) ->
case Stage of
tunnel ->
%% race-condition between deadline timer and actual handshake completion
%% (so, handshake completed exactly at deadline time)
{noreply, St};
_ ->
{stop, {downstream_handshake_timeout, DcId, Stage}, St}
end. end.
terminate(_Reason, #state{upstreams = Ups}) -> terminate(_Reason, #state{upstreams = Ups}) ->
%% Should I do this or dc_pool? Maybe only when reason is 'normal'? %% Should I do this or dc_pool? Maybe only when reason is 'normal'?
?log(warning, "Downstream terminates with reason ~p; len(upstreams)=~p", ?log(warning, "Downstream terminates with reason ~p; len(upstreams)=~p",
...@@ -482,15 +493,16 @@ down_handshake1(S) -> ...@@ -482,15 +493,16 @@ down_handshake1(S) ->
Nonce = crypto:strong_rand_bytes(16), Nonce = crypto:strong_rand_bytes(16),
Schema = 1, %AES Schema = 1, %AES
Msg = mtp_rpc:encode_nonce({nonce, KeySelector, Schema, CryptoTs, Nonce}), Msg = mtp_rpc:encode_nonce({nonce, KeySelector, Schema, CryptoTs, Nonce}),
Deadline = erlang:send_after(?HANDSHAKE_TIMEOUT, self(), handshake_timeout),
S1 = S#state{stage = handshake_1, S1 = S#state{stage = handshake_1,
%% Use fake encryption codec %% Use fake encryption codec
codec = mtp_codec:new(mtp_noop_codec, mtp_noop_codec:new(), codec = mtp_codec:new(mtp_noop_codec, mtp_noop_codec:new(),
mtp_full, mtp_full:new(-2, -2), mtp_full, mtp_full:new(-2, -2),
false, undefined, ?MAX_CODEC_BUFFERS), false, undefined, ?MAX_CODEC_BUFFERS),
stage_state = {KeySelector, Nonce, CryptoTs, Key}}, stage_state = {Deadline, KeySelector, Nonce, CryptoTs, Key}},
down_send(Msg, S1). down_send(Msg, S1).
down_handshake2(Pkt, #state{stage_state = {MyKeySelector, CliNonce, MyTs, Key}, down_handshake2(Pkt, #state{stage_state = {Deadline, MyKeySelector, CliNonce, MyTs, Key},
codec = Codec1, codec = Codec1,
sock = Sock} = S) -> sock = Sock} = S) ->
{nonce, KeySelector, Schema, _CryptoTs, SrvNonce} = mtp_rpc:decode_nonce(Pkt), {nonce, KeySelector, Schema, _CryptoTs, SrvNonce} = mtp_rpc:decode_nonce(Pkt),
...@@ -513,7 +525,7 @@ down_handshake2(Pkt, #state{stage_state = {MyKeySelector, CliNonce, MyTs, Key}, ...@@ -513,7 +525,7 @@ down_handshake2(Pkt, #state{stage_state = {MyKeySelector, CliNonce, MyTs, Key},
S#state{codec = Codec, S#state{codec = Codec,
stage = handshake_2, stage = handshake_2,
addr_bin = iolist_to_binary(mtp_rpc:encode_ip_port(MyIp, MyPort)), addr_bin = iolist_to_binary(mtp_rpc:encode_ip_port(MyIp, MyPort)),
stage_state = SenderPID}). stage_state = {Deadline, SenderPID}}).
get_middle_key(#{srv_n := Nonce, clt_n := MyNonce, clt_ts := MyTs, srv_ip := SrvIpBinBig, srv_port := SrvPort, get_middle_key(#{srv_n := Nonce, clt_n := MyNonce, clt_ts := MyTs, srv_ip := SrvIpBinBig, srv_port := SrvPort,
clt_ip := CltIpBinBig, clt_port := CltPort, secret := Secret, purpose := Purpose} = _Args) -> clt_ip := CltIpBinBig, clt_port := CltPort, secret := Secret, purpose := Purpose} = _Args) ->
...@@ -540,12 +552,13 @@ get_middle_key(#{srv_n := Nonce, clt_n := MyNonce, clt_ts := MyTs, srv_ip := Srv ...@@ -540,12 +552,13 @@ get_middle_key(#{srv_n := Nonce, clt_n := MyNonce, clt_ts := MyTs, srv_ip := Srv
{Key, IV}. {Key, IV}.
down_handshake3(Pkt, #state{stage_state = PrevSenderPid, pool = Pool, down_handshake3(Pkt, #state{stage_state = {Deadline, PrevSenderPid}, pool = Pool, dc_id = DcId,
netloc = {Addr, Port}} = S) -> netloc = {Addr, Port}} = S) ->
erlang:cancel_timer(Deadline),
{handshake, _SenderPid, PeerPid} = mtp_rpc:decode_handshake(Pkt), {handshake, _SenderPid, PeerPid} = mtp_rpc:decode_handshake(Pkt),
(PeerPid == PrevSenderPid) orelse error({wrong_sender_pid, PeerPid}), (PeerPid == PrevSenderPid) orelse error({wrong_sender_pid, PeerPid}),
ok = mtp_dc_pool:ack_connected(Pool, self()), ok = mtp_dc_pool:ack_connected(Pool, self()),
?log(info, "~s:~w: handshake complete", [inet:ntoa(Addr), Port]), ?log(info, "~s:~w: dc=~w handshake complete", [inet:ntoa(Addr), Port, DcId]),
{ok, S#state{stage = tunnel, {ok, S#state{stage = tunnel,
stage_state = undefined}}. stage_state = undefined}}.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment