Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
M
mtproto_proxy
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
mtproto_proxy
Commits
f67f88f8
Unverified
Commit
f67f88f8
authored
Oct 18, 2018
by
Сергей Прохоров
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Metrics fixed
parent
91b89061
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
78 additions
and
72 deletions
+78
-72
mtp_down_conn.erl
src/mtp_down_conn.erl
+18
-13
mtp_handler.erl
src/mtp_handler.erl
+24
-25
mtp_metric.erl
src/mtp_metric.erl
+36
-34
No files found.
src/mtp_down_conn.erl
View file @
f67f88f8
...
...
@@ -90,6 +90,8 @@ handle_cast(shutdown, State) ->
handle_info
({
tcp
,
Sock
,
Data
},
#state
{
sock
=
Sock
}
=
S
)
->
mtp_metric
:
count_inc
([
?
APP
,
received
,
bytes
],
byte_size
(
Data
),
#
{
labels
=>
[
downstream
]}),
mtp_metric
:
histogram_observe
([
?
APP
,
tracker_packet_size
,
bytes
],
byte_size
(
Data
),
#
{
labels
=>
[
downstream
]}),
case
handle_downstream_data
(
Data
,
S
)
of
{
ok
,
S1
}
->
ok
=
inet
:
setopts
(
Sock
,
[{
active
,
once
}]),
...
...
@@ -219,14 +221,14 @@ handle_rpc({simple_ack, ConnId, Confirm}, S) ->
up_send
({
simple_ack
,
self
(),
Confirm
},
ConnId
,
S
).
-
spec
down_send
(
iodata
(),
#state
{})
->
{
ok
,
#state
{}}.
down_send
(
Packet
,
#state
{
sock
=
Sock
,
codec
=
Codec
}
=
St
)
->
down_send
(
Packet
,
#state
{
sock
=
Sock
,
codec
=
Codec
,
dc_id
=
DcId
}
=
St
)
->
%% lager:debug("Up>Down: ~w", [Packet]),
{
Encoded
,
Codec1
}
=
mtp_layer
:
encode_packet
(
Packet
,
Codec
),
mtp_metric
:
rt
(
[
?
APP
,
downstream_send_duration
,
seconds
],
fun
()
->
ok
=
gen_tcp
:
send
(
Sock
,
Encoded
)
end
),
end
,
#
{
labels
=>
[
DcId
]}
),
{
ok
,
St
#state
{
codec
=
Codec1
}}.
...
...
@@ -245,13 +247,19 @@ up_send(Packet, ConnId, #state{upstreams_rev = UpsRev} = St) ->
connect
(
DcId
,
S
)
->
{
ok
,
{
Host
,
Port
}}
=
mtp_config
:
get_netloc
(
DcId
),
{
ok
,
Sock
}
=
tcp_connect
(
Host
,
Port
),
mtp_metric
:
count_inc
([
?
APP
,
out_connect_ok
,
total
],
1
,
#
{
labels
=>
[
DcId
]}),
AddrStr
=
inet
:
ntoa
(
Host
),
lager
:
info
(
"
~s
:
~p
: TCP connected"
,
[
AddrStr
,
Port
]),
down_handshake1
(
S
#state
{
sock
=
Sock
,
netloc
=
{
Host
,
Port
}}).
case
tcp_connect
(
Host
,
Port
)
of
{
ok
,
Sock
}
->
mtp_metric
:
count_inc
([
?
APP
,
out_connect_ok
,
total
],
1
,
#
{
labels
=>
[
DcId
]}),
AddrStr
=
inet
:
ntoa
(
Host
),
lager
:
info
(
"
~s
:
~p
: TCP connected"
,
[
AddrStr
,
Port
]),
down_handshake1
(
S
#state
{
sock
=
Sock
,
netloc
=
{
Host
,
Port
}});
{
error
,
Reason
}
=
Err
->
mtp_metric
:
count_inc
([
?
APP
,
out_connect_error
,
total
],
1
,
#
{
labels
=>
[
DcId
,
Reason
]}),
{
Err
,
S
}
end
.
tcp_connect
(
Host
,
Port
)
->
SockOpts
=
[{
active
,
once
},
...
...
@@ -260,10 +268,7 @@ tcp_connect(Host, Port) ->
{
send_timeout
,
?
SEND_TIMEOUT
},
%% {nodelay, true},
{
keepalive
,
true
}],
case
mtp_metric
:
rt
([
?
APP
,
downstream_connect_duration
,
seconds
],
fun
()
->
gen_tcp
:
connect
(
Host
,
Port
,
SockOpts
,
?
CONN_TIMEOUT
)
end
)
of
case
gen_tcp
:
connect
(
Host
,
Port
,
SockOpts
,
?
CONN_TIMEOUT
)
of
{
ok
,
Sock
}
->
ok
=
inet
:
setopts
(
Sock
,
[
%% {recbuf, ?MAX_SOCK_BUF_SIZE},
%% {sndbuf, ?MAX_SOCK_BUF_SIZE},
...
...
src/mtp_handler.erl
View file @
f67f88f8
...
...
@@ -33,6 +33,7 @@
acc
=
<<>>
::
any
(),
secret
::
binary
(),
listener
::
atom
(),
sock
::
gen_tcp
:
socket
(),
transport
::
transport
(),
...
...
@@ -82,13 +83,11 @@ ranch_init({Ref, Transport, Opts}) ->
]),
gen_server
:
enter_loop
(
?
MODULE
,
[],
State
);
error
->
mtp_metric
:
count_inc
([
?
APP
,
in_connection_closed
,
total
],
1
,
#
{}),
exit
(
normal
)
end
.
init
({
Socket
,
Transport
,
[
Name
,
Secret
,
Tag
]})
->
mtp_metric
:
set_context_labels
([
Name
]),
mtp_metric
:
count_inc
([
?
APP
,
in_connection
,
total
],
1
,
#
{}),
mtp_metric
:
count_inc
([
?
APP
,
in_connection
,
total
],
1
,
#
{
labels
=>
[
Name
]}),
case
Transport
:
peername
(
Socket
)
of
{
ok
,
{
Ip
,
Port
}}
->
lager
:
info
(
"
~s
: new connection
~s
:
~p
"
,
[
Name
,
inet
:
ntoa
(
Ip
),
Port
]),
...
...
@@ -97,6 +96,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
#
{
timeout
=>
{
env
,
?
APP
,
TimeoutKey
,
TimeoutDefault
}}),
State
=
#state
{
sock
=
Socket
,
secret
=
unhex
(
Secret
),
listener
=
Name
,
transport
=
Transport
,
ad_tag
=
unhex
(
Tag
),
addr
=
{
Ip
,
Port
},
...
...
@@ -104,6 +104,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) ->
timer
=
Timer
},
{
ok
,
State
};
{
error
,
Reason
}
->
mtp_metric
:
count_inc
([
?
APP
,
in_connection_closed
,
total
],
1
,
#
{
labels
=>
[
Name
]}),
lager
:
info
(
"Can't read peername:
~p
"
,
[
Reason
]),
error
end
.
...
...
@@ -112,7 +113,7 @@ handle_call(_Request, _From, State) ->
Reply
=
ok
,
{
reply
,
Reply
,
State
}.
handle_cast
({
proxy_ans
,
Down
,
Data
},
#state
{
down
=
Down
}
=
S
)
->
handle_cast
({
proxy_ans
,
Down
,
Data
},
#state
{
down
=
Down
,
listener
=
Listener
}
=
S
)
->
%% telegram server -> proxy
case
up_send
(
Data
,
S
)
of
{
ok
,
S1
}
->
...
...
@@ -135,7 +136,9 @@ handle_cast(Other, State) ->
handle_info
({
tcp
,
Sock
,
Data
},
#state
{
sock
=
Sock
,
transport
=
Transport
}
=
S
)
->
%% client -> proxy
track
(
rx
,
Data
),
Size
=
byte_size
(
Data
),
mtp_metric
:
count_inc
([
?
APP
,
received
,
bytes
],
Size
,
#
{
labels
=>
[
upstream
]}),
mtp_metric
:
histogram_observe
([
?
APP
,
tracker_packet_size
,
bytes
],
Size
,
#
{
labels
=>
[
upstream
]}),
case
handle_upstream_data
(
Data
,
S
)
of
{
ok
,
S1
}
->
ok
=
Transport
:
setopts
(
Sock
,
[{
active
,
once
}]),
...
...
@@ -151,15 +154,15 @@ handle_info({tcp_error, Sock, Reason}, #state{sock = Sock} = S) ->
lager
:
info
(
"upstream sock error:
~p
"
,
[
Reason
]),
{
stop
,
Reason
,
maybe_close_down
(
S
)};
handle_info
(
timeout
,
#state
{
timer
=
Timer
,
timer_state
=
TState
}
=
S
)
->
handle_info
(
timeout
,
#state
{
timer
=
Timer
,
timer_state
=
TState
,
listener
=
Listener
}
=
S
)
->
case
gen_timeout
:
is_expired
(
Timer
)
of
true
when
TState
==
stop
;
TState
==
init
->
mtp_metric
:
count_inc
([
?
APP
,
inactive_timeout
,
total
],
1
,
#
{}),
mtp_metric
:
count_inc
([
?
APP
,
inactive_timeout
,
total
],
1
,
#
{
labels
=>
[
Listener
]
}),
lager
:
info
(
"inactive timeout in state
~p
"
,
[
TState
]),
{
stop
,
normal
,
S
};
true
when
TState
==
hibernate
->
mtp_metric
:
count_inc
([
?
APP
,
inactive_hibernate
,
total
],
1
,
#
{}),
mtp_metric
:
count_inc
([
?
APP
,
inactive_hibernate
,
total
],
1
,
#
{
labels
=>
[
Listener
]
}),
{
noreply
,
switch_timer
(
S
,
stop
),
hibernate
};
false
->
Timer1
=
gen_timeout
:
reset
(
Timer
),
...
...
@@ -169,13 +172,13 @@ handle_info(Other, S) ->
lager
:
warning
(
"Unexpected msg
~p
"
,
[
Other
]),
{
noreply
,
S
}.
terminate
(_
Reason
,
#state
{
started_at
=
Started
}
=
S
)
->
terminate
(_
Reason
,
#state
{
started_at
=
Started
,
listener
=
Listener
}
=
S
)
->
maybe_close_down
(
S
),
mtp_metric
:
count_inc
([
?
APP
,
in_connection_closed
,
total
],
1
,
#
{}),
mtp_metric
:
count_inc
([
?
APP
,
in_connection_closed
,
total
],
1
,
#
{
labels
=>
[
Listener
]
}),
Lifetime
=
erlang
:
system_time
(
millisecond
)
-
Started
,
mtp_metric
:
histogram_observe
(
[
?
APP
,
session_lifetime
,
seconds
],
erlang
:
convert_time_unit
(
Lifetime
,
millisecond
,
native
),
#
{}),
erlang
:
convert_time_unit
(
Lifetime
,
millisecond
,
native
),
#
{
labels
=>
[
Listener
]
}),
lager
:
info
(
"terminate
~p
"
,
[_
Reason
]),
ok
.
...
...
@@ -199,9 +202,9 @@ bump_timer(#state{timer = Timer, timer_state = TState} = S) ->
switch_timer
(
#state
{
timer_state
=
TState
}
=
S
,
TState
)
->
S
;
switch_timer
(
#state
{
timer_state
=
FromState
,
timer
=
Timer
}
=
S
,
ToState
)
->
switch_timer
(
#state
{
timer_state
=
FromState
,
timer
=
Timer
,
listener
=
Listener
}
=
S
,
ToState
)
->
mtp_metric
:
count_inc
([
?
APP
,
timer_switch
,
total
],
1
,
#
{
labels
=>
[
FromState
,
ToState
]}),
#
{
labels
=>
[
Listener
,
FromState
,
ToState
]}),
{
NewTimeKey
,
NewTimeDefault
}
=
state_timeout
(
ToState
),
Timer1
=
gen_timeout
:
set_timeout
(
{
env
,
?
APP
,
NewTimeKey
,
NewTimeDefault
},
Timer
),
...
...
@@ -220,7 +223,8 @@ state_timeout(stop) ->
%% Handle telegram client -> proxy stream
handle_upstream_data
(
Bin
,
#state
{
stage
=
tunnel
,
codec
=
UpCodec
}
=
S
)
->
codec
=
UpCodec
,
listener
=
Listener
}
=
S
)
->
{
ok
,
S3
,
UpCodec1
}
=
mtp_layer
:
fold_packets
(
fun
(
Decoded
,
S1
)
->
...
...
@@ -233,11 +237,11 @@ handle_upstream_data(Bin, #state{stage = tunnel,
end
,
S
,
Bin
,
UpCodec
),
{
ok
,
S3
#state
{
codec
=
UpCodec1
}};
handle_upstream_data
(
<<
Header
:
64
/
binary
,
Rest
/
binary
>>
,
#state
{
stage
=
init
,
stage_state
=
<<>>
,
secret
=
Secret
}
=
S
)
->
secret
=
Secret
,
listener
=
Listener
}
=
S
)
->
case
mtp_obfuscated
:
from_header
(
Header
,
Secret
)
of
{
ok
,
DcId
,
PacketLayerMod
,
ObfuscatedCodec
}
->
mtp_metric
:
count_inc
([
?
APP
,
protocol_ok
,
total
],
1
,
#
{
labels
=>
[
PacketLayerMod
]}),
1
,
#
{
labels
=>
[
Listener
,
PacketLayerMod
]}),
ObfuscatedLayer
=
mtp_layer
:
new
(
mtp_obfuscated
,
ObfuscatedCodec
),
PacketLayer
=
mtp_layer
:
new
(
PacketLayerMod
,
PacketLayerMod
:
new
()),
UpCodec
=
mtp_layer
:
new
(
mtp_wrap
,
mtp_wrap
:
new
(
PacketLayer
,
...
...
@@ -249,7 +253,7 @@ handle_upstream_data(<<Header:64/binary, Rest/binary>>, #state{stage = init, sta
stage_state
=
undefined
});
{
error
,
Reason
}
=
Err
->
mtp_metric
:
count_inc
([
?
APP
,
protocol_error
,
total
],
1
,
#
{
labels
=>
[
Reason
]}),
1
,
#
{
labels
=>
[
Reason
]}),
Err
end
;
handle_upstream_data
(
Bin
,
#state
{
stage
=
init
,
stage_state
=
<<>>
}
=
S
)
->
...
...
@@ -261,7 +265,8 @@ handle_upstream_data(Bin, #state{stage = init, stage_state = Buf} = S) ->
up_send
(
Packet
,
#state
{
stage
=
tunnel
,
codec
=
UpCodec
,
sock
=
Sock
,
transport
=
Transport
}
=
S
)
->
transport
=
Transport
,
listener
=
Listener
}
=
S
)
->
%% lager:debug(">Up: ~p", [Packet]),
{
Encoded
,
UpCodec1
}
=
mtp_layer
:
encode_packet
(
Packet
,
UpCodec
),
mtp_metric
:
rt
([
?
APP
,
upstream_send_duration
,
seconds
],
...
...
@@ -272,7 +277,7 @@ up_send(Packet, #state{stage = tunnel,
is_atom
(
Reason
)
andalso
mtp_metric
:
count_inc
(
[
?
APP
,
upstream_send_error
,
total
],
1
,
#
{
labels
=>
[
Reason
]}),
#
{
labels
=>
[
Listener
,
Reason
]}),
lager
:
warning
(
"Upstream send error:
~p
"
,
[
Reason
]),
throw
({
stop
,
normal
,
S
})
end
...
...
@@ -315,9 +320,3 @@ unhex(Chars) ->
(
C
)
when
C
>
$W
->
C
-
$W
end
,
<<
<<
(
UnHChar
(
C
)):
4
>>
||
<<
C
>>
<=
Chars
>>
.
track
(
Direction
,
Data
)
->
Size
=
byte_size
(
Data
),
mtp_metric
:
count_inc
([
?
APP
,
tracker
,
bytes
],
Size
,
#
{
labels
=>
[
Direction
]}),
mtp_metric
:
histogram_observe
([
?
APP
,
tracker_packet_size
,
bytes
],
Size
,
#
{
labels
=>
[
Direction
]}).
src/mtp_metric.erl
View file @
f67f88f8
...
...
@@ -11,8 +11,7 @@
-
export
([
count_inc
/
3
,
gauge_set
/
3
,
histogram_observe
/
3
,
rt
/
2
,
set_context_labels
/
1
]).
rt
/
2
,
rt
/
3
]).
-
export
([
passive_metrics
/
0
,
active_metrics
/
0
]).
...
...
@@ -24,10 +23,6 @@
-
type
metric_name
()
::
[
atom
()].
-
type
metric_doc
()
::
string
().
set_context_labels
(
Tags
)
when
is_list
(
Tags
)
->
erlang
:
put
(
?
PD_KEY
,
Tags
).
count_inc
(
Name
,
Value
,
Extra
)
->
notify
(
count
,
Name
,
Value
,
Extra
).
...
...
@@ -38,24 +33,21 @@ histogram_observe(Name, Value, Extra) ->
notify
(
histogram
,
Name
,
Value
,
Extra
).
rt
(
Name
,
Fun
)
->
rt
(
Name
,
Fun
,
#
{}).
rt
(
Name
,
Fun
,
Extra
)
->
Start
=
erlang
:
monotonic_time
(),
try
Fun
()
after
notify
(
histogram
,
Name
,
erlang
:
monotonic_time
()
-
Start
,
#
{}
)
notify
(
histogram
,
Name
,
erlang
:
monotonic_time
()
-
Start
,
Extra
)
end
.
notify
(
Type
,
Name
,
Value
,
Extra
)
->
case
application
:
get_env
(
?
APP
,
metric_backend
)
of
{
ok
,
Mod
}
->
Extra1
=
case
erlang
:
get
(
?
PD_KEY
)
of
undefined
->
Extra
;
ContextLabels
->
MsgLabels
=
maps
:
get
(
labels
,
Extra
,
[]),
Extra
#
{
labels
=>
ContextLabels
++
MsgLabels
}
end
,
Mod
:
notify
(
Type
,
Name
,
Value
,
Extra1
);
Mod
:
notify
(
Type
,
Name
,
Value
,
Extra
);
_
->
false
end
.
...
...
@@ -66,11 +58,30 @@ notify(Type, Name, Value, Extra) ->
Labels
::
#
{
atom
()
=>
binary
()
|
atom
()},
Value
::
integer
()
|
float
().
passive_metrics
()
->
DownStatus
=
mtp_config
:
status
(),
[{
gauge
,
[
?
APP
,
dc_num_downstreams
],
"Count of connections to downstream"
,
[{
#
{
dc
=>
DcId
},
NDowns
}
||
#
{
n_downstreams
:
=
NDowns
,
dc_id
:
=
DcId
}
<-
DownStatus
]},
{
gauge
,
[
?
APP
,
dc_num_upstreams
],
"Count of upstreams connected to DC"
,
[{
#
{
dc
=>
DcId
},
NUps
}
||
#
{
n_upstreams
:
=
NUps
,
dc_id
:
=
DcId
}
<-
DownStatus
]},
{
gauge
,
[
?
APP
,
dc_upstreams_per_downstream
],
"Count of upstreams connected to DC"
,
lists
:
flatmap
(
fun
(
#
{
min
:
=
Min
,
max
:
=
Max
,
dc_id
:
=
DcId
})
->
[{
#
{
dc
=>
DcId
,
meter
=>
min
},
Min
},
{
#
{
dc
=>
DcId
,
meter
=>
max
},
Max
}]
end
,
DownStatus
)}
|
[{
gauge
,
[
?
APP
,
connections
,
count
],
"Count of ranch connections"
,
[{
#
{
listener
=>
H
},
proplists
:
get_value
(
all_connections
,
P
)}
||
{
H
,
P
}
<-
ranch
:
info
(),
proplists
:
get_value
(
protocol
,
P
)
==
mtp_handler
]}].
proplists
:
get_value
(
protocol
,
P
)
==
mtp_handler
]}]
]
.
-
spec
active_metrics
()
->
[{
metric_type
(),
metric_name
(),
metric_doc
(),
Opts
}]
when
...
...
@@ -101,32 +112,32 @@ active_metrics() ->
"Connection timeout mode switches"
,
#
{
labels
=>
[
listener
,
from
,
to
]}},
{
count
,
[
?
APP
,
tracker
,
bytes
],
"Bytes transmitted
according to tracker
"
,
#
{
labels
=>
[
listener
,
direction
]}},
{
count
,
[
?
APP
,
received
,
bytes
],
"Bytes transmitted
from upstream/downstream socket
"
,
#
{
labels
=>
[
direction
]}},
{
histogram
,
[
?
APP
,
tracker_packet_size
,
bytes
],
"
Proxi
ed packet size"
,
#
{
labels
=>
[
listener
,
direction
],
"
Receiv
ed packet size"
,
#
{
labels
=>
[
direction
],
buckets
=>
{
exponential
,
8
,
4
,
8
}}},
{
histogram
,
[
?
APP
,
tg_packet_size
,
bytes
],
"Proxied telegram protocol packet size"
,
#
{
labels
=>
[
listener
,
direction
],
#
{
labels
=>
[
direction
],
buckets
=>
{
exponential
,
8
,
4
,
8
}}},
{
count
,
[
?
APP
,
protocol_error
,
total
],
"Proxy protocol errors"
,
#
{
labels
=>
[
listener
,
reason
]}},
#
{
labels
=>
[
reason
]}},
{
count
,
[
?
APP
,
protocol_ok
,
total
],
"Proxy upstream protocol type"
,
#
{
labels
=>
[
listener
,
protocol
]}},
{
count
,
[
?
APP
,
out_connect_ok
,
total
],
"Proxy out connections"
,
#
{
labels
=>
[
listener
,
dc_id
]}},
#
{
labels
=>
[
dc_id
]}},
{
count
,
[
?
APP
,
out_connect_error
,
total
],
"Proxy out connect errors"
,
#
{
labels
=>
[
listener
,
reason
]}},
#
{
labels
=>
[
dc_id
,
reason
]}},
{
histogram
,
[
?
APP
,
upstream_send_duration
,
seconds
],
...
...
@@ -135,22 +146,13 @@ active_metrics() ->
%% buckets => ?MS_BUCKETS
labels
=>
[
listener
]
}},
{
histogram
,
[
?
APP
,
downstream_connect_duration
,
seconds
],
"Duration of tcp connect to downstream"
,
#
{
duration_unit
=>
seconds
,
%% buckets => ?MS_BUCKETS
labels
=>
[
listener
]
}},
{
histogram
,
[
?
APP
,
downstream_send_duration
,
seconds
],
"Duration of tcp send calls to downstream"
,
#
{
duration_unit
=>
seconds
,
%% buckets => ?MS_BUCKETS
labels
=>
[
listener
]
labels
=>
[
dc
]
}},
{
count
,
[
?
APP
,
upstream_send_error
,
total
],
"Count of tcp send errors to upstream"
,
#
{
labels
=>
[
listener
,
reason
]}},
{
count
,
[
?
APP
,
downstream_send_error
,
total
],
"Count of tcp send errors to downstream"
,
#
{
labels
=>
[
listener
,
reason
]}}
].
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment