Fix rpc_close_ext

parent a182c3be
...@@ -145,6 +145,7 @@ handle_upstream_new(Upstream, Opts, #state{upstreams = Ups, ...@@ -145,6 +145,7 @@ handle_upstream_new(Upstream, Opts, #state{upstreams = Ups,
%% Upstream process is exited (or about to exit) %% Upstream process is exited (or about to exit)
handle_upstream_closed(Upstream, #state{upstreams = Ups, handle_upstream_closed(Upstream, #state{upstreams = Ups,
upstreams_rev = UpsRev} = St) -> upstreams_rev = UpsRev} = St) ->
%% See "mtproto-proxy.c:remove_ext_connection
case maps:take(Upstream, Ups) of case maps:take(Upstream, Ups) of
{{ConnId, _, _}, Ups1} -> {{ConnId, _, _}, Ups1} ->
UpsRev1 = maps:remove(ConnId, UpsRev), UpsRev1 = maps:remove(ConnId, UpsRev),
...@@ -153,7 +154,8 @@ handle_upstream_closed(Upstream, #state{upstreams = Ups, ...@@ -153,7 +154,8 @@ handle_upstream_closed(Upstream, #state{upstreams = Ups,
Packet = mtp_rpc:encode_packet(remote_closed, ConnId), Packet = mtp_rpc:encode_packet(remote_closed, ConnId),
down_send(Packet, St1); down_send(Packet, St1);
error -> error ->
lager:warning("Unknown upstream ~p", [Upstream]), %% It happens when we get rpc_close_ext
lager:info("Unknown upstream ~p", [Upstream]),
{ok, St} {ok, St}
end. end.
...@@ -190,10 +192,21 @@ handle_downstream_data(Bin, #state{stage = handshake_2, ...@@ -190,10 +192,21 @@ handle_downstream_data(Bin, #state{stage = handshake_2,
end. end.
-spec handle_rpc(mtp_rpc:packet(), #state{}) -> #state{}. -spec handle_rpc(mtp_rpc:packet(), #state{}) -> #state{}.
handle_rpc({proxy_ans, ConnId, Data}, S) -> handle_rpc({proxy_ans, ConnId, Data}, St) ->
up_send({proxy_ans, self(), Data}, ConnId, S); up_send({proxy_ans, self(), Data}, ConnId, St);
handle_rpc({close_ext, ConnId}, S) -> handle_rpc({close_ext, ConnId}, St) ->
up_send({close_ext, self()}, ConnId, S); #state{upstreams = Ups,
upstreams_rev = UpsRev} = St1 = up_send({close_ext, self()}, ConnId, St),
case maps:take(ConnId, UpsRev) of
{Upstream, UpsRev1} ->
Ups1 = maps:remove(Upstream, Ups),
St2 = St1#state{upstreams = Ups1,
upstreams_rev = UpsRev1},
St2;
error ->
lager:warning("Unknown upstream ~p", [ConnId]),
St1
end;
handle_rpc({simple_ack, ConnId, Confirm}, S) -> handle_rpc({simple_ack, ConnId, Confirm}, S) ->
up_send({simple_ack, self(), Confirm}, ConnId, S). up_send({simple_ack, self(), Confirm}, ConnId, S).
...@@ -210,10 +223,16 @@ down_send(Packet, #state{sock = Sock, codec = Codec} = St) -> ...@@ -210,10 +223,16 @@ down_send(Packet, #state{sock = Sock, codec = Codec} = St) ->
up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) -> up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
%% lager:debug("Down>Up: ~w", [Packet]), case maps:find(ConnId, UpsRev) of
Upstream = maps:get(ConnId, UpsRev), {ok, Upstream} ->
ok = mtp_handler:send(Upstream, Packet), ok = mtp_handler:send(Upstream, Packet),
St. St;
error ->
lager:warning("Unknown connection_id=~w; ups=~w", [ConnId, maps:keys(UpsRev)]),
ClosedPacket = mtp_rpc:encode_packet(remote_closed, ConnId),
{ok, St1} = down_send(ClosedPacket, St),
St1
end.
connect(DcId, S) -> connect(DcId, S) ->
{ok, {Host, Port}} = mtp_config:get_netloc(DcId), {ok, {Host, Port}} = mtp_config:get_netloc(DcId),
......
...@@ -294,10 +294,12 @@ handle_upstream_header(DcId, #state{acc = Acc, ad_tag = Tag, addr = Addr} = S) - ...@@ -294,10 +294,12 @@ handle_upstream_header(DcId, #state{acc = Acc, ad_tag = Tag, addr = Addr} = S) -
{RealDcId, _Pool, Downstream} = mtp_config:get_downstream_safe(DcId, Opts), {RealDcId, _Pool, Downstream} = mtp_config:get_downstream_safe(DcId, Opts),
handle_upstream_data( handle_upstream_data(
Acc, Acc,
S#state{down = Downstream, switch_timer(
dc_id = RealDcId, S#state{down = Downstream,
acc = <<>>, dc_id = RealDcId,
stage = tunnel}). acc = <<>>,
stage = tunnel},
hibernate)).
hex(Bin) -> hex(Bin) ->
<<begin <<begin
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment