Don't crash pool when it's empty and `get/3` is called

parent 633319fb
...@@ -63,8 +63,13 @@ start_link() -> ...@@ -63,8 +63,13 @@ start_link() ->
get_downstream_safe(DcId, Opts) -> get_downstream_safe(DcId, Opts) ->
case get_downstream_pool(DcId) of case get_downstream_pool(DcId) of
{ok, Pool} -> {ok, Pool} ->
Downstream = mtp_dc_pool:get(Pool, self(), Opts), case mtp_dc_pool:get(Pool, self(), Opts) of
{DcId, Pool, Downstream}; Downstream when is_pid(Downstream) ->
{DcId, Pool, Downstream};
{error, empty} ->
%% TODO: maybe sleep and retry?
error({pool_empty, DcId, Pool})
end;
not_found -> not_found ->
[{?IDS_KEY, L}] = ets:lookup(?TAB, ?IDS_KEY), [{?IDS_KEY, L}] = ets:lookup(?TAB, ?IDS_KEY),
NewDcId = random_choice(L), NewDcId = random_choice(L),
......
...@@ -72,7 +72,8 @@ dc_to_pool_name(DcId) -> ...@@ -72,7 +72,8 @@ dc_to_pool_name(DcId) ->
valid_dc_id(DcId) orelse error(invalid_dc_id, [DcId]), valid_dc_id(DcId) orelse error(invalid_dc_id, [DcId]),
binary_to_atom(<<"mtp_dc_pool_", (integer_to_binary(DcId))/binary>>, utf8). binary_to_atom(<<"mtp_dc_pool_", (integer_to_binary(DcId))/binary>>, utf8).
-spec get(pid(), upstream(), #{addr := mtp_config:netloc_v4v6(),
ad_tag => binary()}) -> downstream() | {error, atom()}.
get(Pool, Upstream, #{addr := _} = Opts) -> get(Pool, Upstream, #{addr := _} = Opts) ->
gen_server:call(Pool, {get, Upstream, Opts}). gen_server:call(Pool, {get, Upstream, Opts}).
...@@ -101,8 +102,12 @@ init(DcId) -> ...@@ -101,8 +102,12 @@ init(DcId) ->
{ok, State2}. {ok, State2}.
handle_call({get, Upstream, Opts}, _From, State) -> handle_call({get, Upstream, Opts}, _From, State) ->
{Downstream, State1} = handle_get(Upstream, Opts, State), case handle_get(Upstream, Opts, State) of
{reply, Downstream, State1}; {empty, State1} ->
{reply, {error, empty}, State1};
{Downstream, State1} ->
{reply, Downstream, State1}
end;
handle_call(add_connection, _From, State) -> handle_call(add_connection, _From, State) ->
State1 = connect(State), State1 = connect(State),
{reply, ok, State1}; {reply, ok, State1};
...@@ -151,14 +156,18 @@ handle_connected(Pid, #state{pending_downstreams = Pending, ...@@ -151,14 +156,18 @@ handle_connected(Pid, #state{pending_downstreams = Pending,
handle_get(Upstream, Opts, #state{downstreams = Ds, handle_get(Upstream, Opts, #state{downstreams = Ds,
upstreams = Us} = St) -> upstreams = Us} = St) ->
{Downstream, N, Ds1} = ds_get(Ds), case ds_get(Ds) of
MonRef = erlang:monitor(process, Upstream), {Downstream, N, Ds1} ->
Us1 = Us#{Upstream => {Downstream, MonRef}}, MonRef = erlang:monitor(process, Upstream),
ok = mtp_down_conn:upstream_new(Downstream, Upstream, Opts), Us1 = Us#{Upstream => {Downstream, MonRef}},
{Downstream, maybe_spawn_connection( ok = mtp_down_conn:upstream_new(Downstream, Upstream, Opts),
N, {Downstream, maybe_spawn_connection(
St#state{downstreams = Ds1, N,
upstreams = Us1})}. St#state{downstreams = Ds1,
upstreams = Us1})};
empty ->
{empty, maybe_restart_connection(St)}
end.
handle_return(Upstream, #state{downstreams = Ds, handle_return(Upstream, #state{downstreams = Ds,
upstreams = Us} = St) -> upstreams = Us} = St) ->
...@@ -295,11 +304,15 @@ ds_add_downstream(Conn, St) -> ...@@ -295,11 +304,15 @@ ds_add_downstream(Conn, St) ->
pid_psq:add(Conn, St). pid_psq:add(Conn, St).
%% Get least loaded downstream connection %% Get least loaded downstream connection
-spec ds_get(ds_store()) -> {downstream(), pos_integer(), ds_store()}. -spec ds_get(ds_store()) -> {downstream(), pos_integer(), ds_store()} | empty.
ds_get(St) -> ds_get(St) ->
%% TODO: should return real number of connections %% TODO: should return real number of connections
{ok, {{Conn, N}, St1}} = pid_psq:get_min_priority(St), case pid_psq:get_min_priority(St) of
{Conn, N, St1}. {ok, {{Conn, N}, St1}} ->
{Conn, N, St1};
undefined ->
empty
end.
%% Return connection back to storage %% Return connection back to storage
-spec ds_return(downstream(), ds_store()) -> ds_store(). -spec ds_return(downstream(), ds_store()) -> ds_store().
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment