Make it possible to export metrics by providing callback module

parent 1cfdf905
%%% @author sergey <me@seriyps.ru>
%%% @copyright (C) 2018, sergey
%%% @doc
%%% Interface for logging metrics (CODE WIPED)
%%% @end
%%% Created : 15 May 2018 by sergey <me@seriyps.ru>
-module(metric).
-export([count_inc/3,
gauge_set/3,
rt/2,
histogram_observe/3]).
count_inc(_Name, _Value, _Extra) ->
noop.
gauge_set(_Name, _Value, _Extra) ->
noop.
histogram_observe(_Name, _Value, _Extra) ->
noop.
rt(_Name, Fun) ->
Fun().
......@@ -31,7 +31,7 @@ try_decode_packet(<<Flag, Len:24/unsigned-little-integer, Rest/binary>> = Data,
(Len1 < ?MAX_PACKET_SIZE)
orelse
begin
metric:count_inc([?APP, protocol_error, total], 1, #{labels => [abriged_max_size]}),
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [abriged_max_size]}),
error({packet_too_large, Len1})
end,
try_decode_packet_len(Len1, Rest, Data, St);
......
......@@ -50,7 +50,6 @@
%% APIs
start_link(Ref, Socket, Transport, Opts) ->
metric:count_inc([?APP, in_connection, total], 1, #{}),
{ok, proc_lib:spawn_link(?MODULE, ranch_init, [{Ref, Socket, Transport, Opts}])}.
keys_str() ->
......@@ -72,14 +71,16 @@ ranch_init({Ref, Socket, Transport, _} = Opts) ->
]),
gen_server:enter_loop(?MODULE, [], State);
error ->
metric:count_inc([?APP, in_connection_closed, total], 1, #{}),
mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{}),
exit(normal)
end.
init({_Ref, Socket, Transport, [Secret, Tag]}) ->
init({_Ref, Socket, Transport, [Name, Secret, Tag]}) ->
mtp_metric:set_context_labels([Name]),
mtp_metric:count_inc([?APP, in_connection, total], 1, #{}),
case Transport:peername(Socket) of
{ok, {Ip, Port}} ->
lager:info("New connection ~s:~p", [inet:ntoa(Ip), Port]),
lager:info("~s: new connection ~s:~p", [Name, inet:ntoa(Ip), Port]),
{TimeoutKey, TimeoutDefault} = state_timeout(init),
Timer = gen_timeout:new(
#{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}),
......@@ -153,11 +154,11 @@ handle_info(timeout, #state{timer = Timer, timer_state = TState} = S) ->
case gen_timeout:is_expired(Timer) of
true when TState == stop;
TState == init ->
metric:count_inc([?APP, inactive_timeout, total], 1, #{}),
mtp_metric:count_inc([?APP, inactive_timeout, total], 1, #{}),
lager:info("inactive timeout in state ~p", [TState]),
{stop, normal, S};
true when TState == hibernate ->
metric:count_inc([?APP, inactive_hibernate, total], 1, #{}),
mtp_metric:count_inc([?APP, inactive_hibernate, total], 1, #{}),
{noreply, switch_timer(S, stop), hibernate};
false ->
Timer1 = gen_timeout:reset(Timer),
......@@ -168,7 +169,7 @@ handle_info(Other, S) ->
{noreply, S}.
terminate(_Reason, #state{}) ->
metric:count_inc([?APP, in_connection_closed, total], 1, #{}),
mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{}),
lager:debug("terminate ~p", [_Reason]),
ok.
......@@ -192,7 +193,7 @@ bump_timer(#state{timer = Timer, timer_state = TState} = S) ->
switch_timer(#state{timer_state = TState} = S, TState) ->
S;
switch_timer(#state{timer_state = FromState, timer = Timer} = S, ToState) ->
metric:count_inc([?APP, timer_switch, total], 1,
mtp_metric:count_inc([?APP, timer_switch, total], 1,
#{labels => [FromState, ToState]}),
{NewTimeKey, NewTimeDefault} = state_timeout(ToState),
Timer1 = gen_timeout:set_timeout(
......@@ -215,7 +216,7 @@ handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, sta
secret = Secret} = S) ->
case mtp_obfuscated:from_header(Header, Secret) of
{ok, DcId, PacketLayerMod, ObfuscatedCodec} ->
metric:count_inc([?APP, protocol_ok, total],
mtp_metric:count_inc([?APP, protocol_ok, total],
1, #{labels => [PacketLayerMod]}),
ObfuscatedLayer = mtp_layer:new(mtp_obfuscated, ObfuscatedCodec),
PacketLayer = mtp_layer:new(PacketLayerMod, PacketLayerMod:new()),
......@@ -227,7 +228,7 @@ handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, sta
up_acc = Rest,
stage_state = undefined});
{error, Reason} = Err ->
metric:count_inc([?APP, protocol_error, total],
mtp_metric:count_inc([?APP, protocol_error, total],
1, #{labels => [Reason]}),
Err
end;
......@@ -240,7 +241,7 @@ handle_upstream_data(Bin, #state{stage = tunnel,
{ok, S3, UpCodec1} =
mtp_layer:fold_packets(
fun(Decoded, S1) ->
metric:histogram_observe(
mtp_metric:histogram_observe(
[?APP, tg_packet_size, bytes],
byte_size(Decoded),
#{labels => [upstream_to_downstream]}),
......@@ -285,7 +286,7 @@ handle_downstream_data(Bin, #state{stage = tunnel,
{ok, S3, DownCodec1} =
mtp_layer:fold_packets(
fun(Decoded, S1) ->
metric:histogram_observe(
mtp_metric:histogram_observe(
[?APP, tg_packet_size, bytes],
byte_size(Decoded),
#{labels => [downstream_to_upstream]}),
......@@ -300,7 +301,7 @@ up_send(Packet, #state{stage = tunnel,
up_sock = Sock,
up_transport = Transport} = S) ->
{Encoded, UpCodec1} = mtp_layer:encode_packet(Packet, UpCodec),
metric:rt([?APP, upstream_send_duration, seconds],
mtp_metric:rt([?APP, upstream_send_duration, seconds],
fun() ->
ok = Transport:send(Sock, Encoded)
end),
......@@ -309,7 +310,7 @@ up_send(Packet, #state{stage = tunnel,
down_send(Packet, #state{down_sock = Sock,
down_codec = DownCodec} = S) ->
{Encoded, DownCodec1} = mtp_layer:encode_packet(Packet, DownCodec),
metric:rt([?APP, downstream_send_duration, seconds],
mtp_metric:rt([?APP, downstream_send_duration, seconds],
fun() ->
ok = gen_tcp:send(Sock, Encoded)
end),
......@@ -325,12 +326,12 @@ handle_upstream_header(DcId, S) ->
case connect(Addr, Port) of
{ok, Sock} ->
AddrStr = inet:ntoa(Addr),
metric:count_inc([?APP, out_connect_ok, total], 1,
mtp_metric:count_inc([?APP, out_connect_ok, total], 1,
#{labels => [AddrStr]}),
lager:info("Connected to ~s:~p", [AddrStr, Port]),
down_handshake1(S#state{down_sock = Sock});
{error, Reason} = Err ->
metric:count_inc([?APP, out_connect_error, total], 1, #{labels => [Reason]}),
mtp_metric:count_inc([?APP, out_connect_error, total], 1, #{labels => [Reason]}),
Err
end.
......@@ -344,7 +345,7 @@ connect(Host, Port) ->
{send_timeout, ?SEND_TIMEOUT},
%% {nodelay, true},
{keepalive, true}],
case metric:rt([?APP, downstream_connect_duration, seconds],
case mtp_metric:rt([?APP, downstream_connect_duration, seconds],
fun() ->
gen_tcp:connect(Host, Port, SockOpts, ?CONN_TIMEOUT)
end) of
......@@ -473,8 +474,8 @@ unhex(Chars) ->
track(Direction, Data) ->
Size = byte_size(Data),
metric:count_inc([?APP, tracker, bytes], Size, #{labels => [Direction]}),
metric:histogram_observe([?APP, tracker_packet_size, bytes], Size, #{labels => [Direction]}).
mtp_metric:count_inc([?APP, tracker, bytes], Size, #{labels => [Direction]}),
mtp_metric:histogram_observe([?APP, tracker_packet_size, bytes], Size, #{labels => [Direction]}).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
......
......@@ -41,7 +41,7 @@ try_decode_packet(<<Len:32/unsigned-little, _/binary>> = Data,
(Len1 < ?MAX_PACKET_SIZE)
orelse
begin
metric:count_inc([?APP, protocol_error, total], 1, #{labels => [intermediate_max_size]}),
mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [intermediate_max_size]}),
error({packet_too_large, Len1})
end,
try_decode_packet_len(Len1, Data, St);
......
%%% @author sergey <me@seriyps.ru>
%%% @copyright (C) 2018, sergey
%%% @doc
%%% Backend-agnostic interface for logging metrics.
%%% Made with prometheus.erl in mind, but might be used with smth else
%%% @end
%%% Created : 15 May 2018 by sergey <me@seriyps.ru>
-module(mtp_metric).
-export([count_inc/3,
gauge_set/3,
histogram_observe/3,
rt/2,
set_context_labels/1]).
-export([passive_metrics/0,
active_metrics/0]).
-define(APP, mtproto_proxy).
-define(PD_KEY, {?MODULE, context_labels}).
-type metric_type() :: gauge | count | histogram.
-type metric_name() :: [atom()].
-type metric_doc() :: string().
set_context_labels(Tags) when is_list(Tags) ->
erlang:put(?PD_KEY, Tags).
count_inc(Name, Value, Extra) ->
notify(count, Name, Value, Extra).
gauge_set(Name, Value, Extra) ->
notify(gauge, Name, Value, Extra).
histogram_observe(Name, Value, Extra) ->
notify(histogram, Name, Value, Extra).
rt(Name, Fun) ->
Start = erlang:monotonic_time(),
try
Fun()
after
notify(histogram, Name, erlang:monotonic_time() - Start, #{})
end.
notify(Type, Name, Value, Extra) ->
case application:get_env(?APP, metric_backend) of
{ok, Mod} ->
Extra1 = case erlang:get(?PD_KEY) of
undefined -> Extra;
ContextLabels ->
MsgLabels = maps:get(labels, Extra, []),
Extra#{labels => ContextLabels ++ MsgLabels}
end,
Mod:notify(Type, Name, Value, Extra1);
_ ->
false
end.
-spec passive_metrics() -> [{metric_type(), metric_name(), metric_doc(),
[{Labels, Value}]}]
when
Labels :: #{atom() => binary() | atom()},
Value :: integer() | float().
passive_metrics() ->
AppListeners = [Name || #{name := Name} <- application:get_env(?APP, ports, [])],
[{gauge, [?APP, connections, count],
"Count of ranch connections",
[{#{listener => H}, proplists:get_value(all_connections, P)}
|| {H, P} <- ranch:info(),
lists:member(H, AppListeners)]}].
-spec active_metrics() -> [{metric_type(), metric_name(), metric_doc(), Opts}]
when
Opts :: #{duration_units => atom(),
buckets => [number()],
labels => [atom()]}.
active_metrics() ->
[{count, [?APP, in_connection, total],
"MTP incoming connection",
#{labels => [listener]}},
{count, [?APP, in_connection_closed, total],
"MTP incoming connection closed",
#{labels => [listener]}},
{histogram, [?APP, session_lifetime, seconds],
"Time from in connection open to session process termination",
#{duration_unit => seconds,
buckets => [0.2, 0.5, 1, 5, 10, 30, 60, 150, 300, 600, 1200],
labels => [listener]
}},
{count, [?APP, inactive_timeout, total],
"Connection closed by timeout because of no activity",
#{labels => [listener]}},
{count, [?APP, inactive_hibernate, total],
"Connection goes to hibernate by timeout because of no activity",
#{labels => [listener]}},
{count, [?APP, timer_switch, total],
"Connection timeout mode switches",
#{labels => [listener, from, to]}},
{count, [?APP, tracker, bytes],
"Bytes transmitted according to tracker",
#{labels => [listener, direction]}},
{histogram, [?APP, tracker_packet_size, bytes],
"Proxied packet size",
#{labels => [listener, direction],
buckets => {exponential, 8, 4, 8}}},
{histogram, [?APP, tg_packet_size, bytes],
"Proxied telegram protocol packet size",
#{labels => [listener, direction],
buckets => {exponential, 8, 4, 8}}},
{count, [?APP, protocol_error, total],
"Proxy protocol errors",
#{labels => [listener, reason]}},
{count, [?APP, protocol_ok, total],
"Proxy upstream protocol type",
#{labels => [listener, protocol]}},
{count, [?APP, out_connect_ok, total],
"Proxy out connections",
#{labels => [listener, dc]}},
{count, [?APP, out_connect_error, total],
"Proxy out connect errors",
#{labels => [listener, reason]}},
{histogram, [?APP, upstream_send_duration, seconds],
"Duration of tcp send calls to upstream",
#{duration_unit => seconds,
%% buckets => ?MS_BUCKETS
labels => [listener]
}},
{histogram, [?APP, downstream_connect_duration, seconds],
"Duration of tcp connect to downstream",
#{duration_unit => seconds,
%% buckets => ?MS_BUCKETS
labels => [listener]
}},
{histogram, [?APP, downstream_send_duration, seconds],
"Duration of tcp send calls to downstream",
#{duration_unit => seconds,
%% buckets => ?MS_BUCKETS
labels => [listener]
}}
].
......@@ -56,6 +56,10 @@
%% connections to this proxy with "dd"-secrets. Connections by other
%% protocols will be immediately closed.
{allowed_protocols, [mtp_abridged, mtp_intermediate, mtp_secure]}
%% module with function `notify/4' exported.
%% See mtp_metric:notify/4 for details
%% {metric_backend, my_metric_backend}
]},
{modules, []},
......
......@@ -44,7 +44,7 @@ start_proxy(#{name := Name, port := Port, secret := Secret, tag := Tag} = P) ->
{port, Port},
{num_acceptors, NumAcceptors},
{max_connections, MaxConnections}],
mtp_handler, [Secret, Tag]),
mtp_handler, [Name, Secret, Tag]),
Url = io_lib:format(
"https://t.me/proxy?server=~s&port=~w&secret=~s",
[application:get_env(?APP, external_ip, ListenIpStr),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment