Skip to content

Commit

Permalink
Add test and extract fun to separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
define-null committed Apr 23, 2019
1 parent 3fc1fe2 commit 3f3a0fa
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 70 deletions.
82 changes: 15 additions & 67 deletions src/elvis_core.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

-export([start/0]).

%% for internal use only
-export([do_rock/2]).

-define(APP_NAME, "elvis").

-type source_filename() :: nonempty_string().
Expand Down Expand Up @@ -86,75 +89,20 @@ do_parallel_rock(Config0) ->
Config = elvis_config:resolve_files(Config0),
Files = elvis_config:files(Config),

Results = do_parallel_rock0(Config, Files, Parallel),
{ok, Results} =
elvis_task:chunk_fold({?MODULE, do_rock},
fun(Elem, Acc) ->
elvis_result:print_results(Elem),
{ok, [Elem | Acc]}
end,
[], [Config], Files, Parallel),
elvis_result_status(Results).

do_parallel_rock0(Config, Files, N) ->
do_parallel_rock1(Config, Files, N, N, [], []).

do_parallel_rock1(_Config, [], _MaxW, _RemainW, AccR, AccG) ->
gather_all_results(AccR, AccG);
do_parallel_rock1(Config, FilesList, MaxW, 0, AccR, AccG) ->
{AccR1, AccG1, N} = gather_results(AccR, AccG),
do_parallel_rock1(Config, FilesList, MaxW, erlang:min(N, MaxW), AccR1, AccG1);
do_parallel_rock1(Config, FilesList, MaxW, RemainW, AccR, AccG) ->
{WorkToBeDone, FilesRemain} =
try lists:split(RemainW, FilesList) of
Res -> Res
catch error:badarg -> {FilesList, []}
end,

Gather = [do_rock_worker(Config, File) || File <- WorkToBeDone],
do_parallel_rock1(Config, FilesRemain, MaxW, 0, AccR, Gather ++ AccG).

do_rock_worker(Config, #{path := Path} = File) ->
Parent = self(),
Key = spawn_monitor(fun() -> do_rock(Parent, Config, File) end),
{Key, Path}.

-spec do_rock(pid(), elvis_config:config(), elvis_result:file()) -> no_return().
do_rock(Parent, Config, File) ->
try
LoadedFile = load_file_data(Config, File),
apply_rules(Config, LoadedFile)
of
Results ->
exit({Parent, {ok, Results}})
catch T:E ->
exit({Parent, {error, {T,E}}})
end.

gather_all_results(AccR, Remain) ->
{AccR1, _, _} = gather_results0(AccR, Remain, 0, infinity),
AccR1.

gather_results(AccR, AccG) ->
{Key, Res0} = gather(infinity),
gather_results0([Res0 | AccR], lists:keydelete(Key, 1, AccG), 1, 0).

gather_results0(AccR, [], N, _Timeout) ->
{AccR, [], N};
gather_results0(AccR, AccG, N, Timeout) ->
case gather(Timeout) of
timeout -> {AccR, AccG, N};
{Key, Res0} ->
gather_results0([Res0 | AccR], lists:keydelete(Key, 1, AccG), N + 1, Timeout)
end.

gather(Timeout) ->
Self = self(),
receive
{'DOWN', MonRef, process, Pid, {Self, Res}} ->
case Res of
{ok, Res0} ->
elvis_result:print_results(Res0),
{{Pid, MonRef}, Res0};
{error, {T,E}} ->
erlang:T(E)
end
after Timeout ->
timeout
end.
-spec do_rock(elvis_result:file(), elvis_config:config()) -> {ok, elvis_result:file()}.
do_rock(File, Config) ->
LoadedFile = load_file_data(Config, File),
Results = apply_rules(Config, LoadedFile),
{ok, Results}.

%% @private
-spec load_file_data(map() | [map()], elvis_file:file()) -> elvis_file:file().
Expand Down
115 changes: 115 additions & 0 deletions src/elvis_task.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
-module(elvis_task).

-export([chunk_fold/6]).

%% @doc chunk_fold evaluates apply(Module, Function, [Elem|ExtrArgs]) for
%% every element Elem in JobItemList in parallel with max concurrcy factor
%% equal to Concurrency. On succesfull evaluation FunAcc function is called
%% with the result of succesfull execution as a first parametr and accumulator
%% as a second parametr.
-spec chunk_fold(FunWork :: {Module :: module(), Function :: atom()},
FunAcc :: fun((NewElem :: term(), Acc :: term()) ->
Acc :: term()),
InitialAcc :: term(),
ExtraArgs :: list(),
JonItemList :: list(),
Concurrency :: non_neg_integer()) ->
{ok, FinalAcc :: term()} | {error, term()}.
chunk_fold({M,F} = FunWork, FunAcc, InitialAcc, ExtraArgs, List, ChunkSize)
when is_atom(M), is_atom(F),
is_function(FunAcc, 2),
is_list(ExtraArgs), is_list(List),
is_integer(ChunkSize) andalso ChunkSize > 0
->
try
Term = do_in_parallel(FunWork, FunAcc, ExtraArgs, List,
_MaxW = ChunkSize, _RemainW = ChunkSize,
InitialAcc, []),
{ok, Term}
catch throw:{T,E} ->
{error, {T, E}}
end.

do_in_parallel(_FunWork, FunAcc, _ExtraArgs, [], _MaxW, _RemainW, AccR, AccG) ->
gather_all_results(FunAcc, AccR, AccG);
do_in_parallel(FunWork, FunAcc, ExtraArgs, List, MaxW, 0, AccR, AccG) ->
{AccR1, AccG1, N} = gather_results(FunAcc, AccR, AccG),
do_in_parallel(FunWork, FunAcc, ExtraArgs, List,
MaxW, erlang:min(N, MaxW), AccR1, AccG1);
do_in_parallel(FunWork, FunAcc, ExtraArgs, List, MaxW, RemainW, AccR, AccG) ->
{WorkToBeDone, WorkRemain} =
try lists:split(RemainW, List) of
Res -> Res
catch error:badarg -> {List, []}
end,

WrkRefs = [start_worker(FunWork, ExtraArgs, WorkPiece)
|| WorkPiece <- WorkToBeDone],
do_in_parallel(FunWork, FunAcc, ExtraArgs, WorkRemain, MaxW, 0,
AccR, WrkRefs ++ AccG).

start_worker(FunWork, ExtraArgs, Arg) ->
Parent = self(),
Key = spawn_monitor(fun() -> do_work(Parent, FunWork, ExtraArgs, Arg) end),
Key.

do_work(Parent, {M,F}, ExtraArgs, Arg) ->
try erlang:apply(M, F, [Arg | ExtraArgs]) of
{ok, Results} ->
exit({Parent, {ok, Results}});
Unexpected ->
Error = {error, {badreturn, Unexpected}},
exit({Parent, {error, Error}})
catch T:E ->
exit({Parent, {error, {T,E}}})
end.

gather_all_results(AccF, AccR, Remain) ->
{AccR1, _, _} = gather_results0(AccF, AccR, Remain, 0, infinity),
AccR1.

gather_results(AccF, AccR, AccG) ->
{AccG1, Res} = gather(infinity, AccG),
AccR1 = accumulate(AccF, AccR, Res, AccG1),
gather_results0(AccF, AccR1, AccG1, 1, 0).

gather_results0(_AccF, AccR, [], N, _Timeout) ->
{AccR, [], N};
gather_results0(AccF, AccR, AccG, N, Timeout) ->
case gather(Timeout, AccG) of
timeout -> {AccR, AccG, N};
{AccG1, Res} ->
AccR1 = accumulate(AccF, AccR, Res, AccG1),
gather_results0(AccF, AccR1, AccG1, N + 1, Timeout)
end.

accumulate(AccF, AccR, Res, AccG) ->
try
{ok, AccR1} = AccF(Res, AccR),
AccR1
catch T:E ->
cleanup(AccG),
throw({T,E})
end.

cleanup(AccG) ->
[ begin
erlang:demonitor(MRef, [flush]),
erlang:exit(Pid, kill)
end || {Pid, MRef} <- AccG ].

gather(Timeout, AccG) ->
Self = self(),
receive
{'DOWN', _MonRef, process, Pid, {Self, Res}} ->
AccG1 = lists:keydelete(Pid, 1, AccG),
case Res of
{ok, Res0} ->
{AccG1, Res0};
{error, {T,E}} ->
cleanup(AccG1),
throw({T,E})
end
after Timeout ->
timeout
end.
31 changes: 28 additions & 3 deletions test/elvis_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
-export([
all/0,
init_per_suite/1,
end_per_suite/1
end_per_suite/1,
chunk_fold_task/2
]).

-export([
Expand Down Expand Up @@ -31,7 +32,8 @@
find_file_and_check_src/1,
find_file_with_ignore/1,
invalid_file/1,
to_string/1
to_string/1,
chunk_fold/1
]).

-define(EXCLUDED_FUNS,
Expand All @@ -40,7 +42,8 @@
all,
test,
init_per_suite,
end_per_suite
end_per_suite,
chunk_fold_task
]).

-type config() :: [{atom(), term()}].
Expand Down Expand Up @@ -422,6 +425,28 @@ to_string(_Config) ->
"hello" = elvis_utils:to_str(<<"hello">>),
"atom" = elvis_utils:to_str(atom).

-spec chunk_fold(config()) -> any().
chunk_fold(_Config) ->
Multiplier = 10,
List = lists:seq(1,10),
{ok, Value} = elvis_task:chunk_fold({?MODULE, chunk_fold_task},
fun(Elem, Acc) ->
{ok, Acc + Elem}
end, 0, [Multiplier], lists:seq(1,10), 10),
Value = lists:sum(
lists:map(fun(E) -> E * Multiplier end, List)),

{error, {error, undef}} =
elvis_task:chunk_fold({?MODULE, chunk_fold_task_do_not_exist},
fun(Elem, Acc) ->
{ok, Acc + Elem}
end, 0, [Multiplier], lists:seq(1,10), 10).


-spec chunk_fold_task(integer(), integer()) -> {ok, integer()}.
chunk_fold_task(Elem, Miltiplier) ->
{ok, Elem * Miltiplier}.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Private
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
Expand Down

0 comments on commit 3f3a0fa

Please sign in to comment.