Skip to content

Commit

Permalink
pool can now be started via environment variables.
Browse files Browse the repository at this point in the history
  • Loading branch information
David Weldon committed Apr 12, 2011
1 parent a3a982a commit af09b1b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 17 deletions.
10 changes: 10 additions & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,13 @@ the complete documentation by running `make doc`.
{ok,pong}
4> riakpool:count().
1

Starting the Pool
-----------------
Prior to any calls to `riakpool:execute/1`, the pool must be started. This can
be accomplished in one of two ways:

1. Before the server is started, set the riakpool application environment
variables `riakpool_host` and `riakpool_port`.
2. After the server is started, call `riakpool:start_pool/0` or
`riakpool:start_pool/2` (see previous section).
58 changes: 42 additions & 16 deletions src/riakpool.erl
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
%% @author David Weldon
%% @doc riakpool implements a pool of riak protocol buffer clients. After the
%% server is started, the pool must be initialized with a call to
%% {@link start_pool/0} or {@link start_pool/2}. In order to use a connection, a
%% call to {@link execute/1} must be made. This will check out a connection from
%% the pool, use it, and check it back in. This ensures that a given connection
%% can only be in use by one external process at a time. If no existing
%% connections are found, a new one will be established. Note this means that
%% the pool will always be the size of the last peak need. The number of
%% connections can be checked with {@link count/0}.
%% @doc riakpool implements a pool of riak protocol buffer clients. In order to
%% use a connection, a call to {@link execute/1} must be made. This will check
%% out a connection from the pool, use it, and check it back in. This ensures
%% that a given connection can only be in use by one external process at a time.
%% If no existing connections are found, a new one will be established. Note
%% this means the pool will always be the size of the last peak need. The number
%% of connections can be checked with {@link count/0}.
%%
%% Prior to any calls to {@link execute/1}, the pool must be started. This can
%% be accomplished in one of two ways:
%%
%% 1. Before the server is started, set the riakpool application environment
%% variables `riakpool_host' and `riakpool_port'.
%%
%% 2. After the server is started, call {@link start_pool/0} or
%% {@link start_pool/2}

-module(riakpool).
-behaviour(gen_server).
Expand Down Expand Up @@ -42,6 +49,10 @@ count() ->
%% `Fun(Pid)'. Returns `{ok, Value}' if the call was successful, and `error'
%% otherwise. If no connection could be found, a new connection will be
%% established.
%% ```
%% > riakpool:execute(fun(C) -> riakc_pb_socket:ping(C) end).
%% {ok,pong}
%% '''
execute(Fun) ->
case gen_server:call(?MODULE, check_out) of
{ok, Pid} ->
Expand Down Expand Up @@ -71,16 +82,19 @@ start_pool(Host, Port) when is_integer(Port) ->
stop() -> gen_server:cast(?MODULE, stop).

%% @hidden
init([]) -> {ok, undefined}.
init([]) ->
case [application:get_env(P) || P <- [riakpool_host, riakpool_port]] of
[{ok, Host}, {ok, Port}] when is_integer(Port) ->
self() ! {start_pool, Host, Port};
_ -> ok
end,
{ok, undefined}.

%% @hidden
handle_call({start_pool, Host, Port}, _From, undefined) ->
case new_connection(Host, Port) of
{ok, Pid} ->
Pids = queue:in(Pid, queue:new()),
State = #state{host=Host, port=Port, pids=Pids},
{reply, ok, State};
error -> {reply, {error, connection_error}, undefined}
case new_state(Host, Port) of
State=#state{} -> {reply, ok, State};
undefined -> {reply, {error, connection_error}, undefined}
end;
handle_call({start_pool, _Host, _Port}, _From, State=#state{}) ->
{reply, {error, pool_already_started}, State};
Expand All @@ -100,6 +114,8 @@ handle_cast(stop, State) -> {stop, normal, State};
handle_cast(_Msg, State) -> {noreply, State}.

%% @hidden
handle_info({start_pool, Host, Port}, undefined) ->
{noreply, new_state(Host, Port)};
handle_info(_Info, State) -> {noreply, State}.

%% @hidden
Expand All @@ -117,6 +133,16 @@ terminate(_Reason, #state{pids=Pids}) ->
%% @hidden
code_change(_OldVsn, State, _Extra) -> {ok, State}.

%% @spec new_state(host(), integer()) -> state() | undefined
%% @doc Returns a state with a single pid if a connection could be established,
%% otherwise returns undefined.
new_state(Host, Port) ->
case new_connection(Host, Port) of
{ok, Pid} ->
#state{host=Host, port=Port, pids=queue:in(Pid, queue:new())};
error -> undefined
end.

%% @spec new_connection(host(), integer()) -> {ok, Pid} | error
%% @doc Returns {ok, Pid} if a new connection was established and added to the
%% supervisor, otherwise returns error.
Expand Down
4 changes: 3 additions & 1 deletion src/riakpool_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ stop(_State) ->
-include_lib("eunit/include/eunit.hrl").

app_test() ->
application:set_env(riakpool, riakpool_host, "localhost"),
application:set_env(riakpool, riakpool_port, 8087),
application:start(riakpool),
riakpool:start_pool(),
Fun = fun(C) -> riakc_pb_socket:ping(C) end,
?assertEqual({ok, pong}, riakpool:execute(Fun)),
?assertEqual(1, riakpool:count()),
application:stop(riakpool).

-endif.

0 comments on commit af09b1b

Please sign in to comment.