Use "hut" for logger abstraction

parent c8d8edfb
% -*- mode: erlang -*- % -*- mode: erlang -*-
{erl_opts, [debug_info, {erl_opts, [debug_info,
{d, 'HUT_LAGER'},
{parse_transform, lager_transform}]}. {parse_transform, lager_transform}]}.
{deps, [{ranch, "1.7.0"}, {deps, [{ranch, "1.7.0"},
{hut, "1.3.0"},
{lager, "3.6.3"}, {lager, "3.6.3"},
{psq, {git, "https://github.com/eryx67/psq.git", {branch, "master"}}} {psq, {git, "https://github.com/eryx67/psq.git", {branch, "master"}}}
]}. ]}.
......
{"1.1.0", {"1.1.0",
[{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, [{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1},
{<<"hut">>,{pkg,<<"hut">>,<<"1.3.0">>},0},
{<<"lager">>,{pkg,<<"lager">>,<<"3.6.3">>},0}, {<<"lager">>,{pkg,<<"lager">>,<<"3.6.3">>},0},
{<<"psq">>, {<<"psq">>,
{git,"https://github.com/eryx67/psq.git", {git,"https://github.com/eryx67/psq.git",
...@@ -9,6 +10,7 @@ ...@@ -9,6 +10,7 @@
[ [
{pkg_hash,[ {pkg_hash,[
{<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>},
{<<"hut">>, <<"71F2F054E657C03F959CF1ACC43F436EA87580696528CA2A55C8AFB1B06C85E7">>},
{<<"lager">>, <<"FE78951D174616273F87F0DBC3374D1430B1952E5EFC4E1C995592D30A207294">>}, {<<"lager">>, <<"FE78951D174616273F87F0DBC3374D1430B1952E5EFC4E1C995592D30A207294">>},
{<<"ranch">>, <<"9583F47160CA62AF7F8D5DB11454068EAA32B56EEADF984D4F46E61A076DF5F2">>}]} {<<"ranch">>, <<"9583F47160CA62AF7F8D5DB11454068EAA32B56EEADF984D4F46E61A076DF5F2">>}]}
]. ].
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
-type dc_id() :: integer(). -type dc_id() :: integer().
-type netloc() :: {inet:ip4_address(), inet:port_number()}. -type netloc() :: {inet:ip4_address(), inet:port_number()}.
-include_lib("hut/include/hut.hrl").
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(IPS_KEY(DcId), {id, DcId}). -define(IPS_KEY(DcId), {id, DcId}).
-define(IDS_KEY, dc_ids). -define(IDS_KEY, dc_ids).
...@@ -142,7 +144,7 @@ handle_call(_Request, _From, State) -> ...@@ -142,7 +144,7 @@ handle_call(_Request, _From, State) ->
handle_cast(update, #state{timer = Timer} = State) -> handle_cast(update, #state{timer = Timer} = State) ->
update(State, soft), update(State, soft),
lager:info("Config updated"), ?log(info, "Config updated"),
Timer1 = gen_timeout:bump( Timer1 = gen_timeout:bump(
gen_timeout:reset(Timer)), gen_timeout:reset(Timer)),
{noreply, State#state{timer = Timer1}}. {noreply, State#state{timer = Timer1}}.
...@@ -151,7 +153,7 @@ handle_info(timeout, #state{timer = Timer} =State) -> ...@@ -151,7 +153,7 @@ handle_info(timeout, #state{timer = Timer} =State) ->
case gen_timeout:is_expired(Timer) of case gen_timeout:is_expired(Timer) of
true -> true ->
update(State, soft), update(State, soft),
lager:info("Config updated"), ?log(info, "Config updated"),
Timer1 = gen_timeout:bump( Timer1 = gen_timeout:bump(
gen_timeout:reset(Timer)), gen_timeout:reset(Timer)),
{noreply, State#state{timer = Timer1}}; {noreply, State#state{timer = Timer1}};
...@@ -174,9 +176,8 @@ update(#state{tab = Tab}, force) -> ...@@ -174,9 +176,8 @@ update(#state{tab = Tab}, force) ->
update(State, _) -> update(State, _) ->
try update(State, force) try update(State, force)
catch ?WITH_STACKTRACE(Class, Reason, Stack) catch ?WITH_STACKTRACE(Class, Reason, Stack)
lager:error( ?log(error, "Err updating proxy settings: ~s",
"Err updating proxy settings: ~s", [lager:pr_stacktrace(Stack, {Class, Reason})]) %XXX lager-specific
[lager:pr_stacktrace(Stack, {Class, Reason})])
end. end.
update_key(Tab) -> update_key(Tab) ->
...@@ -247,8 +248,8 @@ update_ip([Url | Fallbacks]) -> ...@@ -247,8 +248,8 @@ update_ip([Url | Fallbacks]) ->
{ok, _} = inet:parse_ipv4strict_address(IpStr), %assert {ok, _} = inet:parse_ipv4strict_address(IpStr), %assert
application:set_env(?APP, external_ip, IpStr) application:set_env(?APP, external_ip, IpStr)
catch ?WITH_STACKTRACE(Class, Reason, Stack) catch ?WITH_STACKTRACE(Class, Reason, Stack)
lager:error("Failed to update IP with ~s service: ~s", ?log(error, "Failed to update IP with ~s service: ~s",
[Url, lager:pr_stacktrace(Stack, {Class, Reason})]), [Url, lager:pr_stacktrace(Stack, {Class, Reason})]), %XXX - lager-specific
update_ip(Fallbacks) update_ip(Fallbacks)
end; end;
update_ip([]) -> update_ip([]) ->
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export_type([status/0]). -export_type([status/0]).
-include_lib("hut/include/hut.hrl").
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-define(BURST_MAX, 10). -define(BURST_MAX, 10).
...@@ -181,14 +183,12 @@ handle_down(MonRef, Pid, Reason, #state{downstreams = Ds, ...@@ -181,14 +183,12 @@ handle_down(MonRef, Pid, Reason, #state{downstreams = Ds,
{Pid, DsM1} -> {Pid, DsM1} ->
Pending1 = lists:delete(Pid, Pending), Pending1 = lists:delete(Pid, Pending),
Ds1 = ds_remove(Pid, Ds), Ds1 = ds_remove(Pid, Ds),
lager:error("Downstream=~p is down. reason=~p", ?log(error, "Downstream=~p is down. reason=~p", [Pid, Reason]),
[Pid, Reason]),
St#state{pending_downstreams = Pending1, St#state{pending_downstreams = Pending1,
downstreams = Ds1, downstreams = Ds1,
downstream_monitors = DsM1}; downstream_monitors = DsM1};
_ -> _ ->
lager:error("Unexpected DOWN. ref=~p, pid=~p, reason=~p", ?log(error, "Unexpected DOWN. ref=~p, pid=~p, reason=~p", [MonRef, Pid, Reason]),
[MonRef, Pid, Reason]),
St St
end end
end. end.
...@@ -289,7 +289,7 @@ ds_return(Pid, St) -> ...@@ -289,7 +289,7 @@ ds_return(Pid, St) ->
{ok, St1} -> {ok, St1} ->
St1; St1;
undefined -> undefined ->
lager:warning("Attempt to release unknown connection ~p", [Pid]), ?log(warning, "Attempt to release unknown connection ~p", [Pid]),
St St
end. end.
......
...@@ -27,6 +27,8 @@ ...@@ -27,6 +27,8 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export_type([handle/0, upstream_opts/0]). -export_type([handle/0, upstream_opts/0]).
-include_lib("hut/include/hut.hrl").
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-define(CONN_TIMEOUT, 10000). -define(CONN_TIMEOUT, 10000).
...@@ -115,7 +117,7 @@ handle_call({set_config, Name, Value}, _From, State) -> ...@@ -115,7 +117,7 @@ handle_call({set_config, Name, Value}, _From, State) ->
ok = inet:setopts(State#state.sock, [{buffer, Value}]), ok = inet:setopts(State#state.sock, [{buffer, Value}]),
{ok, OldSize}; {ok, OldSize};
_ -> _ ->
lager:warning("set_config ~p=~p ignored", [Name, Value]), ?log(warning, "set_config ~p=~p ignored", [Name, Value]),
ignored ignored
end, end,
{reply, Result, State}. {reply, Result, State}.
...@@ -145,16 +147,16 @@ handle_info(do_connect, #state{dc_id = DcId} = State) -> ...@@ -145,16 +147,16 @@ handle_info(do_connect, #state{dc_id = DcId} = State) ->
{ok, St1} = connect(DcId, State), {ok, St1} = connect(DcId, State),
{noreply, St1} {noreply, St1}
catch ?WITH_STACKTRACE(Class, Reason, Stack) catch ?WITH_STACKTRACE(Class, Reason, Stack)
lager:error("Down connect error: ~s", ?log(error, "Down connect error: ~s",
[lager:pr_stacktrace(Stack, {Class, Reason})]), [lager:pr_stacktrace(Stack, {Class, Reason})]), %XXX lager-specific
erlang:send_after(300, self(), do_connect), erlang:send_after(300, self(), do_connect),
{noreply, State} {noreply, State}
end. end.
terminate(_Reason, #state{upstreams = Ups}) -> terminate(_Reason, #state{upstreams = Ups}) ->
%% Should I do this or dc_pool? Maybe only when reason is 'normal'? %% Should I do this or dc_pool? Maybe only when reason is 'normal'?
lager:warning("Downstream terminates with reason ~p; len(upstreams)=~p", ?log(warning, "Downstream terminates with reason ~p; len(upstreams)=~p",
[_Reason, map_size(Ups)]), [_Reason, map_size(Ups)]),
Self = self(), Self = self(),
lists:foreach( lists:foreach(
fun(Upstream) -> fun(Upstream) ->
...@@ -172,7 +174,7 @@ handle_send(Data, Upstream, #state{upstreams = Ups, ...@@ -172,7 +174,7 @@ handle_send(Data, Upstream, #state{upstreams = Ups,
Packet = mtp_rpc:encode_packet({data, Data}, {UpstreamStatic, ProxyAddr}), Packet = mtp_rpc:encode_packet({data, Data}, {UpstreamStatic, ProxyAddr}),
down_send(Packet, St); down_send(Packet, St);
_ -> _ ->
lager:warning("Upstream=~p not found", [Upstream]), ?log(warning, "Upstream=~p not found", [Upstream]),
{{error, unknown_upstream}, St} {{error, unknown_upstream}, St}
end. end.
...@@ -185,7 +187,7 @@ handle_upstream_new(Upstream, Opts, #state{upstreams = Ups, ...@@ -185,7 +187,7 @@ handle_upstream_new(Upstream, Opts, #state{upstreams = Ups,
UpsStatic = {ConnId, iolist_to_binary(mtp_rpc:encode_ip_port(Ip, Port)), AdTag}, UpsStatic = {ConnId, iolist_to_binary(mtp_rpc:encode_ip_port(Ip, Port)), AdTag},
Ups1 = Ups#{Upstream => {UpsStatic, 0, 0}}, Ups1 = Ups#{Upstream => {UpsStatic, 0, 0}},
UpsRev1 = UpsRev#{ConnId => Upstream}, UpsRev1 = UpsRev#{ConnId => Upstream},
lager:debug("New upstream=~p conn_id=~p", [Upstream, ConnId]), ?log(debug, "New upstream=~p conn_id=~p", [Upstream, ConnId]),
St#state{upstreams = Ups1, St#state{upstreams = Ups1,
upstreams_rev = UpsRev1}. upstreams_rev = UpsRev1}.
...@@ -203,7 +205,7 @@ handle_upstream_closed(Upstream, #state{upstreams = Ups, ...@@ -203,7 +205,7 @@ handle_upstream_closed(Upstream, #state{upstreams = Ups,
down_send(Packet, St2); down_send(Packet, St2);
error -> error ->
%% It happens when we get rpc_close_ext %% It happens when we get rpc_close_ext
lager:info("Unknown upstream ~p", [Upstream]), ?log(info, "Unknown upstream ~p", [Upstream]),
{ok, St} {ok, St}
end. end.
...@@ -253,7 +255,7 @@ handle_rpc({close_ext, ConnId}, St) -> ...@@ -253,7 +255,7 @@ handle_rpc({close_ext, ConnId}, St) ->
St2#state{upstreams = Ups1, St2#state{upstreams = Ups1,
upstreams_rev = UpsRev1}; upstreams_rev = UpsRev1};
error -> error ->
lager:warning("Unknown upstream ~p", [ConnId]), ?log(warning, "Unknown upstream ~p", [ConnId]),
St1 St1
end; end;
handle_rpc({simple_ack, ConnId, Confirm}, S) -> handle_rpc({simple_ack, ConnId, Confirm}, S) ->
...@@ -261,7 +263,7 @@ handle_rpc({simple_ack, ConnId, Confirm}, S) -> ...@@ -261,7 +263,7 @@ handle_rpc({simple_ack, ConnId, Confirm}, S) ->
-spec down_send(iodata(), #state{}) -> {ok, #state{}}. -spec down_send(iodata(), #state{}) -> {ok, #state{}}.
down_send(Packet, #state{sock = Sock, codec = Codec, dc_id = DcId} = St) -> down_send(Packet, #state{sock = Sock, codec = Codec, dc_id = DcId} = St) ->
%% lager:debug("Up>Down: ~w", [Packet]), %% ?log(debug, "Up>Down: ~w", [Packet]),
{Encoded, Codec1} = mtp_codec:encode_packet(Packet, Codec), {Encoded, Codec1} = mtp_codec:encode_packet(Packet, Codec),
mtp_metric:rt( mtp_metric:rt(
[?APP, downstream_send_duration, seconds], [?APP, downstream_send_duration, seconds],
...@@ -285,7 +287,7 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) -> ...@@ -285,7 +287,7 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
St St
end; end;
error -> error ->
lager:warning("Unknown connection_id=~w", [ConnId]), ?log(warning, "Unknown connection_id=~w", [ConnId]),
%% WHY!!!? %% WHY!!!?
%% ClosedPacket = mtp_rpc:encode_packet(remote_closed, ConnId), %% ClosedPacket = mtp_rpc:encode_packet(remote_closed, ConnId),
%% {ok, St1} = down_send(ClosedPacket, St), %% {ok, St1} = down_send(ClosedPacket, St),
...@@ -393,7 +395,7 @@ connect(DcId, S) -> ...@@ -393,7 +395,7 @@ connect(DcId, S) ->
mtp_metric:count_inc([?APP, out_connect_ok, total], 1, mtp_metric:count_inc([?APP, out_connect_ok, total], 1,
#{labels => [DcId]}), #{labels => [DcId]}),
AddrStr = inet:ntoa(Host), AddrStr = inet:ntoa(Host),
lager:info("~s:~p: TCP connected", [AddrStr, Port]), ?log(info, "~s:~p: TCP connected", [AddrStr, Port]),
down_handshake1(S#state{sock = Sock, down_handshake1(S#state{sock = Sock,
netloc = {Host, Port}}); netloc = {Host, Port}});
{error, Reason} = Err -> {error, Reason} = Err ->
...@@ -489,7 +491,7 @@ down_handshake3(Pkt, #state{stage_state = PrevSenderPid, pool = Pool, ...@@ -489,7 +491,7 @@ down_handshake3(Pkt, #state{stage_state = PrevSenderPid, pool = Pool,
{handshake, _SenderPid, PeerPid} = mtp_rpc:decode_handshake(Pkt), {handshake, _SenderPid, PeerPid} = mtp_rpc:decode_handshake(Pkt),
(PeerPid == PrevSenderPid) orelse error({wrong_sender_pid, PeerPid}), (PeerPid == PrevSenderPid) orelse error({wrong_sender_pid, PeerPid}),
ok = mtp_dc_pool:ack_connected(Pool, self()), ok = mtp_dc_pool:ack_connected(Pool, self()),
lager:info("~s:~w: handshake complete", [inet:ntoa(Addr), Port]), ?log(info, "~s:~w: handshake complete", [inet:ntoa(Addr), Port]),
{ok, S#state{stage = tunnel, {ok, S#state{stage = tunnel,
stage_state = undefined}}. stage_state = undefined}}.
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
-type handle() :: pid(). -type handle() :: pid().
-include_lib("hut/include/hut.hrl").
-define(MAX_SOCK_BUF_SIZE, 1024 * 50). % Decrease if CPU is cheaper than RAM -define(MAX_SOCK_BUF_SIZE, 1024 * 50). % Decrease if CPU is cheaper than RAM
-define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb -define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb
...@@ -93,7 +95,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) -> ...@@ -93,7 +95,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
mtp_metric:count_inc([?APP, in_connection, total], 1, #{labels => [Name]}), mtp_metric:count_inc([?APP, in_connection, total], 1, #{labels => [Name]}),
case Transport:peername(Socket) of case Transport:peername(Socket) of
{ok, {Ip, Port}} -> {ok, {Ip, Port}} ->
lager:info("~s: new connection ~s:~p", [Name, inet:ntoa(Ip), Port]), ?log(info, "~s: new connection ~s:~p", [Name, inet:ntoa(Ip), Port]),
{TimeoutKey, TimeoutDefault} = state_timeout(init), {TimeoutKey, TimeoutDefault} = state_timeout(init),
Timer = gen_timeout:new( Timer = gen_timeout:new(
#{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}), #{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}),
...@@ -110,7 +112,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) -> ...@@ -110,7 +112,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
{ok, State}; {ok, State};
{error, Reason} -> {error, Reason} ->
mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{labels => [Name]}), mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{labels => [Name]}),
lager:info("Can't read peername: ~p", [Reason]), ?log(info, "Can't read peername: ~p", [Reason]),
{stop, error} {stop, error}
end. end.
...@@ -124,14 +126,14 @@ handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) -> ...@@ -124,14 +126,14 @@ handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) ->
ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)), ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)),
maybe_check_health(bump_timer(S1)); maybe_check_health(bump_timer(S1));
handle_cast({close_ext, Down}, #state{down = Down, sock = USock, transport = UTrans} = S) -> handle_cast({close_ext, Down}, #state{down = Down, sock = USock, transport = UTrans} = S) ->
lager:debug("asked to close connection by downstream"), ?log(debug, "asked to close connection by downstream"),
ok = UTrans:close(USock), ok = UTrans:close(USock),
{stop, normal, S#state{down = undefined}}; {stop, normal, S#state{down = undefined}};
handle_cast({simple_ack, Down, Confirm}, #state{down = Down} = S) -> handle_cast({simple_ack, Down, Confirm}, #state{down = Down} = S) ->
lager:info("Simple ack: ~p, ~p", [Down, Confirm]), ?log(info, "Simple ack: ~p, ~p", [Down, Confirm]),
{noreply, S}; {noreply, S};
handle_cast(Other, State) -> handle_cast(Other, State) ->
lager:warning("Unexpected msg ~p", [Other]), ?log(warning, "Unexpected msg ~p", [Other]),
{noreply, State}. {noreply, State}.
handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport,
...@@ -146,18 +148,18 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, ...@@ -146,18 +148,18 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport,
%% Consider checking health here as well %% Consider checking health here as well
{noreply, bump_timer(S1)}; {noreply, bump_timer(S1)};
{error, Reason} -> {error, Reason} ->
lager:info("handle_data error ~p", [Reason]), ?log(info, "handle_data error ~p", [Reason]),
{stop, normal, S} {stop, normal, S}
catch error:{protocol_error, Type, Extra} -> catch error:{protocol_error, Type, Extra} ->
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Type]}), mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Type]}),
lager:warning("protocol_error ~p ~p", [Type, Extra]), ?log(warning, "protocol_error ~p ~p", [Type, Extra]),
{stop, normal, maybe_close_down(S)} {stop, normal, maybe_close_down(S)}
end; end;
handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) -> handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) ->
lager:debug("upstream sock closed"), ?log(debug, "upstream sock closed"),
{stop, normal, maybe_close_down(S)}; {stop, normal, maybe_close_down(S)};
handle_info({tcp_error, Sock, Reason}, #state{sock = Sock} = S) -> handle_info({tcp_error, Sock, Reason}, #state{sock = Sock} = S) ->
lager:warning("upstream sock error: ~p", [Reason]), ?log(warning, "upstream sock error: ~p", [Reason]),
{stop, normal, maybe_close_down(S)}; {stop, normal, maybe_close_down(S)};
handle_info(timeout, #state{timer = Timer, timer_state = TState, listener = Listener} = S) -> handle_info(timeout, #state{timer = Timer, timer_state = TState, listener = Listener} = S) ->
...@@ -165,7 +167,7 @@ handle_info(timeout, #state{timer = Timer, timer_state = TState, listener = List ...@@ -165,7 +167,7 @@ handle_info(timeout, #state{timer = Timer, timer_state = TState, listener = List
true when TState == stop; true when TState == stop;
TState == init -> TState == init ->
mtp_metric:count_inc([?APP, inactive_timeout, total], 1, #{labels => [Listener]}), mtp_metric:count_inc([?APP, inactive_timeout, total], 1, #{labels => [Listener]}),
lager:info("inactive timeout in state ~p", [TState]), ?log(info, "inactive timeout in state ~p", [TState]),
{stop, normal, S}; {stop, normal, S};
true when TState == hibernate -> true when TState == hibernate ->
mtp_metric:count_inc([?APP, inactive_hibernate, total], 1, #{labels => [Listener]}), mtp_metric:count_inc([?APP, inactive_hibernate, total], 1, #{labels => [Listener]}),
...@@ -175,7 +177,7 @@ handle_info(timeout, #state{timer = Timer, timer_state = TState, listener = List ...@@ -175,7 +177,7 @@ handle_info(timeout, #state{timer = Timer, timer_state = TState, listener = List
{noreply, S#state{timer = Timer1}} {noreply, S#state{timer = Timer1}}
end; end;
handle_info(Other, S) -> handle_info(Other, S) ->
lager:warning("Unexpected msg ~p", [Other]), ?log(warning, "Unexpected msg ~p", [Other]),
{noreply, S}. {noreply, S}.
terminate(_Reason, #state{started_at = Started, listener = Listener} = S) -> terminate(_Reason, #state{started_at = Started, listener = Listener} = S) ->
...@@ -185,7 +187,7 @@ terminate(_Reason, #state{started_at = Started, listener = Listener} = S) -> ...@@ -185,7 +187,7 @@ terminate(_Reason, #state{started_at = Started, listener = Listener} = S) ->
mtp_metric:histogram_observe( mtp_metric:histogram_observe(
[?APP, session_lifetime, seconds], [?APP, session_lifetime, seconds],
erlang:convert_time_unit(Lifetime, millisecond, native), #{labels => [Listener]}), erlang:convert_time_unit(Lifetime, millisecond, native), #{labels => [Listener]}),
lager:info("terminate ~p", [_Reason]), ?log(info, "terminate ~p", [_Reason]),
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
...@@ -280,7 +282,7 @@ up_send(Packet, #state{stage = tunnel, ...@@ -280,7 +282,7 @@ up_send(Packet, #state{stage = tunnel,
sock = Sock, sock = Sock,
transport = Transport, transport = Transport,
listener = Listener} = S) -> listener = Listener} = S) ->
%% lager:debug(">Up: ~p", [Packet]), %% ?log(debug, ">Up: ~p", [Packet]),
{Encoded, UpCodec1} = mtp_codec:encode_packet(Packet, UpCodec), {Encoded, UpCodec1} = mtp_codec:encode_packet(Packet, UpCodec),
mtp_metric:rt([?APP, upstream_send_duration, seconds], mtp_metric:rt([?APP, upstream_send_duration, seconds],
fun() -> fun() ->
...@@ -295,14 +297,14 @@ up_send(Packet, #state{stage = tunnel, ...@@ -295,14 +297,14 @@ up_send(Packet, #state{stage = tunnel,
mtp_metric:count_inc( mtp_metric:count_inc(
[?APP, upstream_send_error, total], 1, [?APP, upstream_send_error, total], 1,
#{labels => [Listener, Reason]}), #{labels => [Listener, Reason]}),
lager:warning("Upstream send error: ~p", [Reason]), ?log(warning, "Upstream send error: ~p", [Reason]),
throw({stop, normal, S}) throw({stop, normal, S})
end end
end, #{labels => [Listener]}), end, #{labels => [Listener]}),
{ok, S#state{codec = UpCodec1}}. {ok, S#state{codec = UpCodec1}}.
down_send(Packet, #state{down = Down} = S) -> down_send(Packet, #state{down = Down} = S) ->
%% lager:debug(">Down: ~p", [Packet]), %% ?log(debug, ">Down: ~p", [Packet]),
case mtp_down_conn:send(Down, Packet) of case mtp_down_conn:send(Down, Packet) of
ok -> ok ->
{ok, S}; {ok, S};
...@@ -316,7 +318,7 @@ handle_unknown_upstream(#state{down = Down, sock = USock, transport = UTrans} = ...@@ -316,7 +318,7 @@ handle_unknown_upstream(#state{down = Down, sock = USock, transport = UTrans} =
ok = UTrans:close(USock), ok = UTrans:close(USock),
receive receive
{'$gen_cast', {close_ext, Down}} -> {'$gen_cast', {close_ext, Down}} ->
lager:debug("asked to close connection by downstream"), ?log(debug, "asked to close connection by downstream"),
throw({stop, normal, S#state{down = undefined}}) throw({stop, normal, S#state{down = undefined}})
after 0 -> after 0 ->
throw({stop, got_unknown_upstream, S}) throw({stop, got_unknown_upstream, S})
...@@ -370,7 +372,7 @@ check_health() -> ...@@ -370,7 +372,7 @@ check_health() ->
do_check_health([{qlen, Limit} | _], #{message_queue_len := QLen} = Health) when QLen > Limit -> do_check_health([{qlen, Limit} | _], #{message_queue_len := QLen} = Health) when QLen > Limit ->
mtp_metric:count_inc([?APP, healthcheck, total], 1, mtp_metric:count_inc([?APP, healthcheck, total], 1,
#{labels => [message_queue_len]}), #{labels => [message_queue_len]}),
lager:warning("Upstream too large queue_len=~w, health=~p", [QLen, Health]), ?log(warning, "Upstream too large queue_len=~w, health=~p", [QLen, Health]),
overflow; overflow;
do_check_health([{gc, Limit} | Other], #{total_mem := TotalMem}) when TotalMem > Limit -> do_check_health([{gc, Limit} | Other], #{total_mem := TotalMem}) when TotalMem > Limit ->
%% Maybe it doesn't makes sense to do GC if queue len is more than, eg, 50? %% Maybe it doesn't makes sense to do GC if queue len is more than, eg, 50?
...@@ -383,8 +385,7 @@ do_check_health([{total_mem, Limit} | _Other], #{total_mem := TotalMem} = Health ...@@ -383,8 +385,7 @@ do_check_health([{total_mem, Limit} | _Other], #{total_mem := TotalMem} = Health
TotalMem > Limit -> TotalMem > Limit ->
mtp_metric:count_inc([?APP, healthcheck, total], 1, mtp_metric:count_inc([?APP, healthcheck, total], 1,
#{labels => [total_memory]}), #{labels => [total_memory]}),
lager:warning("Process too large total_mem=~p, health=~p", ?log(warning, "Process too large total_mem=~p, health=~p", [TotalMem / 1024, Health]),
[TotalMem / 1024, Health]),
overflow; overflow;
do_check_health([_Ok | Other], Health) -> do_check_health([_Ok | Other], Health) ->
do_check_health(Other, Health); do_check_health(Other, Health);
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
terminate/2, code_change/3]). terminate/2, code_change/3]).
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include_lib("hut/include/hut.hrl").
-define(DATA_TAB, ?MODULE). -define(DATA_TAB, ?MODULE).
-define(HISTOGRAM_TAB, mtp_session_storage_histogram). -define(HISTOGRAM_TAB, mtp_session_storage_histogram).
...@@ -108,7 +109,7 @@ handle_info(timeout, #state{data_tab = DataTab, histogram_tab = HistTab, clean_t ...@@ -108,7 +109,7 @@ handle_info(timeout, #state{data_tab = DataTab, histogram_tab = HistTab, clean_t
#{max_age_minutes => 360}), #{max_age_minutes => 360}),
Cleans = clean_storage(DataTab, HistTab, Opts), Cleans = clean_storage(DataTab, HistTab, Opts),
Remaining = ets:info(DataTab, size), Remaining = ets:info(DataTab, size),
lager:info("storage cleaned: ~p; remaining: ~p", [Cleans, Remaining]), ?log(info, "storage cleaned: ~p; remaining: ~p", [Cleans, Remaining]),
gen_timeout:bump(gen_timeout:reset(Timer0)); gen_timeout:bump(gen_timeout:reset(Timer0));
false -> false ->
gen_timeout:reset(Timer0) gen_timeout:reset(Timer0)
......
...@@ -10,8 +10,11 @@ ...@@ -10,8 +10,11 @@
%% Application callbacks %% Application callbacks
-export([start/2, prep_stop/1, stop/1, config_change/3]). -export([start/2, prep_stop/1, stop/1, config_change/3]).
-export([mtp_listeners/0, running_ports/0, start_proxy/1]). -export([mtp_listeners/0, running_ports/0, start_proxy/1]).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-include_lib("hut/include/hut.hrl").
-type proxy_port() :: #{name := any(), -type proxy_port() :: #{name := any(),
port := inet:port_number(), port := inet:port_number(),
secret := binary(), secret := binary(),
...@@ -134,15 +137,15 @@ config_changed(Action, ports, Ports) when Action == new; Action == changed -> ...@@ -134,15 +137,15 @@ config_changed(Action, ports, Ports) when Action == new; Action == changed ->
ok; ok;
config_changed(Action, K, V) -> config_changed(Action, K, V) ->
%% Most of the other config options are applied automatically without extra work %% Most of the other config options are applied automatically without extra work
lager:info("Config ~p ~p to ~p ignored", [K, Action, V]), ?log(info, "Config ~p ~p to ~p ignored", [K, Action, V]),
ok. ok.
-ifdef(TEST). -ifdef(TEST).
report(Fmt, Args) -> report(Fmt, Args) ->
lager:debug(Fmt, Args). ?log(debug, Fmt, Args).
-else. -else.
report(Fmt, Args) -> report(Fmt, Args) ->
io:format(Fmt, Args), io:format(Fmt, Args),
lager:info(Fmt, Args). ?log(info, Fmt, Args).
-endif. -endif.
...@@ -171,14 +171,14 @@ handle_event(info, {tcp_closed, _Sock}, _EventName, _S) -> ...@@ -171,14 +171,14 @@ handle_event(info, {tcp_closed, _Sock}, _EventName, _S) ->
hs_send(Packet, #hs_state{transport = Transport, sock = Sock, hs_send(Packet, #hs_state{transport = Transport, sock = Sock,
codec = Codec} = St) -> codec = Codec} = St) ->
%% lager:debug("Up>Down: ~w", [Packet]), %% ?log(debug, "Up>Down: ~w", [Packet]),
{Encoded, Codec1} = mtp_codec:encode_packet(Packet, Codec), {Encoded, Codec1} = mtp_codec:encode_packet(Packet, Codec),
ok = Transport:send(Sock, Encoded), ok = Transport:send(Sock, Encoded),
{ok, St#hs_state{codec = Codec1}}. {ok, St#hs_state{codec = Codec1}}.
t_send(Packet, #t_state{transport = Transport, sock = Sock, t_send(Packet, #t_state{transport = Transport, sock = Sock,
codec = Codec} = St) -> codec = Codec} = St) ->
%% lager:debug("Up>Down: ~w", [Packet]), %% ?log(debug, "Up>Down: ~w", [Packet]),
{Encoded, Codec1} = mtp_codec:encode_packet(Packet, Codec), {Encoded, Codec1} = mtp_codec:encode_packet(Packet, Codec),
ok = Transport:send(Sock, Encoded), ok = Transport:send(Sock, Encoded),
{ok, St#t_state{codec = Codec1}}. {ok, St#t_state{codec = Codec1}}.
......
...@@ -231,11 +231,19 @@ ask_for_close(Id) -> ...@@ -231,11 +231,19 @@ ask_for_close(Id) ->
gen_rpc_close([], _ConnId, St) -> gen_rpc_close([], _ConnId, St) ->
{close, St}. {close, St}.
-ifdef(OTP_RELEASE).
disable_log() ->
logger:set_primary_config(level, critical).
-else.
disable_log() ->
ok.
-endif.
%% Setup / teardown %% Setup / teardown
setup(DcCfg0) -> setup(DcCfg0) ->
application:ensure_all_started(lager), application:ensure_all_started(lager),
lager:set_loglevel(lager_console_backend, critical), lager:set_loglevel(lager_console_backend, critical), %XXX lager-specific
disable_log(),
{ok, Pid} = mtp_test_metric:start_link(), {ok, Pid} = mtp_test_metric:start_link(),
PubKey = crypto:strong_rand_bytes(128), PubKey = crypto:strong_rand_bytes(128),
DcId = ?DC_ID, DcId = ?DC_ID,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment