Add periodic config updates

parent 6a9d0377
%%%-------------------------------------------------------------------
%%% @author Sergey <me@seriyps.ru>
%%% @copyright (C) 2018, Sergey
%%% @doc
%%% Worker that updates datacenter config and proxy secret from
%%% https://core.telegram.org/getProxySecret
%%% and
%%% https://core.telegram.org/getProxyConfig
%%% @end
%%% Created : 10 Jun 2018 by Sergey <me@seriyps.ru>
%%%-------------------------------------------------------------------
-module(mtp_config).
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([get_downstream/1,
get_downstream_safe/1,
get_secret/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(TAB, ?MODULE).
-define(SECRET_URL, "https://core.telegram.org/getProxySecret").
-define(CONFIG_URL, "https://core.telegram.org/getProxyConfig").
-define(APP, mtproto_proxy).
-record(state, {tab :: ets:tid(),
timer :: gen_timeout:tout()}).
%%%===================================================================
%%% API
%%%===================================================================
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec get_downstream(integer()) -> {ok, {inet:ip4_address(), inet:port_number()}}.
get_downstream_safe(DcId) ->
case get_downstream(DcId) of
{ok, Addr} -> Addr;
not_found ->
[{_, {Min, Max}}] = ets:lookup(?TAB, id_range),
%% Get random DC; it might return 0 and recurse aggain
get_downstream_safe(crypto:rand_uniform(Min, Max + 1))
end.
get_downstream(DcId) ->
case ets:lookup(?TAB, {id, DcId}) of
[] ->
not_found;
[{_, Ip, Port}] ->
{ok, {Ip, Port}};
L ->
Size = length(L),
Idx = crypto:rand_uniform(1, Size + 1),
{_, Ip, Port} = lists:nth(Idx, L),
{ok, {Ip, Port}}
end.
-spec get_secret() -> binary().
get_secret() ->
[{_, Key}] = ets:lookup(?TAB, key),
Key.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
Timer = gen_timeout:new(
#{timeout => {env, ?APP, conf_refresh_interval, 3600},
unit => second}),
Tab = ets:new(?TAB, [bag,
protected,
named_table,
{read_concurrency, true}]),
{ok, update(#state{tab = Tab,
timer = Timer}, force)}.
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(timeout, #state{timer = Timer} =State) ->
case gen_timeout:is_expired(Timer) of
true ->
update(State, soft),
lager:info("Config updated"),
Timer1 = gen_timeout:bump(
gen_timeout:reset(Timer)),
{noreply, State#state{timer = Timer1}};
false ->
{noreply, State#state{timer = gen_timeout:reset(Timer)}}
end;
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
update(#state{tab = Tab}, force) ->
update_key(Tab),
update_config(Tab);
update(State, _) ->
try update(State, force)
catch Class:Reason ->
lager:error(
"Err updating proxy settings: ~s",
[lager:pr_stacktrace(erlang:get_stacktrace(), {Class, Reason})])
end.
update_key(Tab) ->
{ok, {{_, 200, _}, _, Body}} = httpc:request(?SECRET_URL),
true = ets:insert(Tab, {key, list_to_binary(Body)}).
update_config(Tab) ->
{ok, {{_, 200, _}, _, Body}} = httpc:request(?CONFIG_URL),
Downstreams = parse_config(Body),
Range = get_range(Downstreams),
update_downstreams(Downstreams, Tab),
update_range(Range, Tab).
parse_config(Body) ->
Lines = string:lexemes(Body, "\n"),
ProxyLines = lists:filter(
fun("proxy_for " ++ _) -> true;
(_) -> false
end, Lines),
[parse_downstream(Line) || Line <- ProxyLines].
parse_downstream(Line) ->
["proxy_for",
DcId,
IpPort] = string:lexemes(Line, " "),
[Ip, PortWithTrailer] = string:split(IpPort, ":", trailing),
Port = list_to_integer(string:trim(PortWithTrailer, trailing, ";")),
{ok, IpAddr} = inet:parse_ipv4strict_address(Ip),
{list_to_integer(DcId),
IpAddr,
Port}.
get_range(Downstreams) ->
IDsList = [Id || {Id, _, _} <- Downstreams],
{lists:min(IDsList),
lists:max(IDsList)}.
update_downstreams(Downstreams, Tab) ->
[true = ets:insert(Tab, {{id, Id}, Ip, Port})
|| {Id, Ip, Port} <- Downstreams].
update_range(Range, Tab) ->
true = ets:insert(Tab, {id_range, Range}).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
parse_test() ->
Config = ("# force_probability 1 10
proxy_for 1 149.154.175.50:8888;
proxy_for -1 149.154.175.50:8888;
proxy_for 2 149.154.162.39:80;
proxy_for 2 149.154.162.33:80;"),
Expect = [{1, {149, 154, 175, 50}, 8888},
{-1, {149, 154, 175, 50}, 8888},
{2, {149, 154, 162, 39}, 80},
{2, {149, 154, 162, 33},80}],
?assertEqual(Expect, parse_config(Config)).
-endif.
...@@ -22,25 +22,6 @@ ...@@ -22,25 +22,6 @@
-define(MAX_SOCK_BUF_SIZE, 1024 * 300). % Decrease if CPU is cheaper than RAM -define(MAX_SOCK_BUF_SIZE, 1024 * 300). % Decrease if CPU is cheaper than RAM
-define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb -define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb
%% TODO: download from https://core.telegram.org/getProxyConfig
-define(TG_MIDDLE_PROXIES_V4,
{
{{149, 154, 175, 50}, 8888},
{{149, 154, 162, 38}, 80},
{{149, 154, 175, 100}, 8888},
{{91, 108, 4, 136}, 8888},
{{91, 108, 56, 181}, 8888}
}).
%% TODO: download from https://core.telegram.org/getProxySecret
-define(PROXY_SECRET,
<<196,249,250,202,150,120,230,187,72,173,108,126,44,229,192,210,68,48,100,
93,85,74,221,235,85,65,158,3,77,166,39,33,208,70,234,171,110,82,171,20,
169,90,68,62,207,179,70,62,121,160,90,102,97,42,223,156,174,218,139,233,
168,13,166,152,111,176,166,255,56,122,248,77,136,239,58,100,19,113,62,92,
51,119,246,225,163,212,125,153,245,224,197,110,236,232,240,92,84,196,144,
176,121,227,27,239,130,255,14,232,242,176,163,39,86,210,73,197,242,18,105,
129,108,183,6,27,38,93,178,18>>).
-define(APP, mtproto_proxy). -define(APP, mtproto_proxy).
-record(state, -record(state,
...@@ -335,14 +316,7 @@ down_send(Packet, #state{down_sock = Sock, ...@@ -335,14 +316,7 @@ down_send(Packet, #state{down_sock = Sock,
handle_upstream_header(DcId, S) -> handle_upstream_header(DcId, S) ->
{Addr, Port} = {Addr, Port} = mtp_config:get_downstream_safe(DcId),
try element(DcId, ?TG_MIDDLE_PROXIES_V4)
catch error:badarg ->
OtherDcId = (DcId rem tuple_size(?TG_MIDDLE_PROXIES_V4)) + 1,
lager:warning("Wrong DC id: ~p; will use ~p",
[DcId, OtherDcId]),
element(OtherDcId, ?TG_MIDDLE_PROXIES_V4)
end,
case connect(Addr, Port) of case connect(Addr, Port) of
{ok, Sock} -> {ok, Sock} ->
...@@ -385,7 +359,7 @@ connect(Host, Port) -> ...@@ -385,7 +359,7 @@ connect(Host, Port) ->
down_handshake1(S) -> down_handshake1(S) ->
RpcNonce = ?RPC_NONCE, RpcNonce = ?RPC_NONCE,
<<KeySelector:4/binary, _/binary>> = ?PROXY_SECRET, <<KeySelector:4/binary, _/binary>> = Key = mtp_config:get_secret(),
CryptoTs = os:system_time(seconds), CryptoTs = os:system_time(seconds),
Nonce = crypto:strong_rand_bytes(16), Nonce = crypto:strong_rand_bytes(16),
Msg = <<RpcNonce/binary, Msg = <<RpcNonce/binary,
...@@ -396,11 +370,11 @@ down_handshake1(S) -> ...@@ -396,11 +370,11 @@ down_handshake1(S) ->
Full = mtp_full:new(-2, -2), Full = mtp_full:new(-2, -2),
S1 = S#state{down_codec = mtp_layer:new(mtp_full, Full), S1 = S#state{down_codec = mtp_layer:new(mtp_full, Full),
stage = down_handshake_1, stage = down_handshake_1,
stage_state = {KeySelector, Nonce, CryptoTs}}, stage_state = {KeySelector, Nonce, CryptoTs, Key}},
down_send(Msg, S1). down_send(Msg, S1).
down_handshake2(<<Type:4/binary, KeySelector:4/binary, Schema:32/little, _CryptoTs:4/binary, down_handshake2(<<Type:4/binary, KeySelector:4/binary, Schema:32/little, _CryptoTs:4/binary,
SrvNonce:16/binary>>, #state{stage_state = {MyKeySelector, CliNonce, MyTs}, SrvNonce:16/binary>>, #state{stage_state = {MyKeySelector, CliNonce, MyTs, Key},
down_sock = Sock, down_sock = Sock,
down_codec = DownCodec} = S) -> down_codec = DownCodec} = S) ->
(Type == ?RPC_NONCE) orelse error({wrong_rpc_type, Type}), (Type == ?RPC_NONCE) orelse error({wrong_rpc_type, Type}),
...@@ -412,7 +386,7 @@ down_handshake2(<<Type:4/binary, KeySelector:4/binary, Schema:32/little, _Crypto ...@@ -412,7 +386,7 @@ down_handshake2(<<Type:4/binary, KeySelector:4/binary, Schema:32/little, _Crypto
MyIpBin = mtp_obfuscated:bin_rev(mtp_rpc:inet_pton(MyIp)), MyIpBin = mtp_obfuscated:bin_rev(mtp_rpc:inet_pton(MyIp)),
Args = #{srv_n => SrvNonce, clt_n => CliNonce, clt_ts => MyTs, Args = #{srv_n => SrvNonce, clt_n => CliNonce, clt_ts => MyTs,
srv_ip => DownIpBin, srv_port => DownPort, srv_ip => DownIpBin, srv_port => DownPort,
clt_ip => MyIpBin, clt_port => MyPort, secret => ?PROXY_SECRET}, clt_ip => MyIpBin, clt_port => MyPort, secret => Key},
{EncKey, EncIv} = get_middle_key(Args#{purpose => <<"CLIENT">>}), {EncKey, EncIv} = get_middle_key(Args#{purpose => <<"CLIENT">>}),
{DecKey, DecIv} = get_middle_key(Args#{purpose => <<"SERVER">>}), {DecKey, DecIv} = get_middle_key(Args#{purpose => <<"SERVER">>}),
CryptoCodec = mtp_layer:new(mtp_aes_cbc, mtp_aes_cbc:new(EncKey, EncIv, DecKey, DecIv, 16)), CryptoCodec = mtp_layer:new(mtp_aes_cbc, mtp_aes_cbc:new(EncKey, EncIv, DecKey, DecIv, 16)),
...@@ -500,6 +474,14 @@ track(Direction, Data) -> ...@@ -500,6 +474,14 @@ track(Direction, Data) ->
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(PROXY_SECRET,
<<196,249,250,202,150,120,230,187,72,173,108,126,44,229,192,210,68,48,100,
93,85,74,221,235,85,65,158,3,77,166,39,33,208,70,234,171,110,82,171,20,
169,90,68,62,207,179,70,62,121,160,90,102,97,42,223,156,174,218,139,233,
168,13,166,152,111,176,166,255,56,122,248,77,136,239,58,100,19,113,62,92,
51,119,246,225,163,212,125,153,245,224,197,110,236,232,240,92,84,196,144,
176,121,227,27,239,130,255,14,232,242,176,163,39,86,210,73,197,242,18,105,
129,108,183,6,27,38,93,178,18>>).
middle_key_test() -> middle_key_test() ->
Args = #{srv_port => 80, Args = #{srv_port => 80,
......
...@@ -94,7 +94,7 @@ init_up_decrypt(Bin, Secret) -> ...@@ -94,7 +94,7 @@ init_up_decrypt(Bin, Secret) ->
{KeyHash, IV}. {KeyHash, IV}.
get_dc(<<_:60/binary, DcId:16/signed-little-integer, _/binary>>) -> get_dc(<<_:60/binary, DcId:16/signed-little-integer, _/binary>>) ->
abs(DcId). DcId.
new(EncKey, EncIV, DecKey, DecIV) -> new(EncKey, EncIV, DecKey, DecIV) ->
......
...@@ -7,6 +7,8 @@ ...@@ -7,6 +7,8 @@
[lager, [lager,
ranch, ranch,
crypto, crypto,
ssl,
inets,
kernel, kernel,
stdlib stdlib
]}, ]},
......
...@@ -28,7 +28,8 @@ start_link() -> ...@@ -28,7 +28,8 @@ start_link() ->
%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules} %% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
init([]) -> init([]) ->
Childs = [ Childs = [#{id => mtp_config,
start => {mtp_config, start_link, []}}
], ],
{ok, {#{strategy => rest_for_one, {ok, {#{strategy => rest_for_one,
intensity => 50, intensity => 50,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment