Skip to content

Commit

Permalink
Define def @impl
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen committed Feb 26, 2020
1 parent 4ec3b61 commit 392c9ed
Show file tree
Hide file tree
Showing 13 changed files with 50 additions and 7 deletions.
3 changes: 3 additions & 0 deletions lib/quantum/clock_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule Quantum.ClockBroadcaster do
end

@doc false
@impl GenStage
@spec init(opts :: InitOpts.t()) :: {:producer, State.t()}
def init(%InitOpts{debug_logging: debug_logging, start_time: start_time}) do
{:producer,
Expand All @@ -32,6 +33,7 @@ defmodule Quantum.ClockBroadcaster do
end

@doc false
@impl GenStage
def handle_demand(
demand,
%State{remaining_demand: remaining_demand, time: time, timer: nil} = state
Expand Down Expand Up @@ -80,6 +82,7 @@ defmodule Quantum.ClockBroadcaster do
end

@doc false
@impl GenStage
def handle_info(:ping, %State{remaining_demand: 0} = state) do
{:noreply, [], state}
end
Expand Down
2 changes: 2 additions & 0 deletions lib/quantum/execution_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ defmodule Quantum.ExecutionBroadcaster do
end

@doc false
@impl GenStage
def init(%InitOpts{
job_broadcaster_reference: job_broadcaster,
clock_broadcaster_reference: clock_broadcaster,
Expand All @@ -71,6 +72,7 @@ defmodule Quantum.ExecutionBroadcaster do
}, subscribe_to: [job_broadcaster, clock_broadcaster]}
end

@impl GenStage
def handle_events(events, _, state) do
{events, state} =
Enum.reduce(events, {[], state}, fn event, {list, state} ->
Expand Down
1 change: 1 addition & 0 deletions lib/quantum/executor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Quantum.ExecutorSupervisor do
end

@doc false
@impl ConsumerSupervisor
def init(
%InitOpts{
node_selector_broadcaster_reference: node_selector_broadcaster
Expand Down
4 changes: 4 additions & 0 deletions lib/quantum/job_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ defmodule Quantum.JobBroadcaster do
end

@doc false
@impl GenStage
def init(%InitOpts{
jobs: jobs,
storage: storage,
Expand Down Expand Up @@ -61,13 +62,15 @@ defmodule Quantum.JobBroadcaster do
end

@doc false
@impl GenStage
def handle_demand(demand, %State{buffer: buffer} = state) do
{to_send, remaining} = Enum.split(buffer, demand)

{:noreply, to_send, %{state | buffer: remaining}}
end

@doc false
@impl GenStage
def handle_cast(
{:add, %Job{state: :active, name: job_name} = job},
%State{jobs: jobs, storage: storage, scheduler: scheduler, debug_logging: debug_logging} =
Expand Down Expand Up @@ -178,6 +181,7 @@ defmodule Quantum.JobBroadcaster do
end

@doc false
@impl GenStage
def handle_call(:jobs, _, %State{jobs: jobs} = state),
do: {:reply, Map.to_list(jobs), [], state}

Expand Down
2 changes: 2 additions & 0 deletions lib/quantum/node_selector_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defmodule Quantum.NodeSelectorBroadcaster do
end

@doc false
@impl GenStage
def init(%InitOpts{
execution_broadcaster_reference: execution_broadcaster,
task_supervisor_reference: task_supervisor_reference
Expand All @@ -44,6 +45,7 @@ defmodule Quantum.NodeSelectorBroadcaster do
}, subscribe_to: [execution_broadcaster]}
end

@impl GenStage
def handle_events(events, _, %{task_supervisor_reference: task_supervisor_reference} = state) do
{:noreply,
Enum.flat_map(events, fn %ExecuteEvent{job: job} ->
Expand Down
1 change: 1 addition & 0 deletions lib/quantum/run_strategy/all.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ defmodule Quantum.RunStrategy.All do

alias Quantum.Job

@impl Quantum.RunStrategy
@spec normalize_config!([Node.t()] | :cluster) :: t
def normalize_config!(nodes) when is_list(nodes) do
%__MODULE__{nodes: Enum.map(nodes, &normalize_node/1)}
Expand Down
1 change: 1 addition & 0 deletions lib/quantum/run_strategy/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule Quantum.RunStrategy.Local do

alias Quantum.Job

@impl Quantum.RunStrategy
@spec normalize_config!(any) :: t
def normalize_config!(_), do: %__MODULE__{}

Expand Down
1 change: 1 addition & 0 deletions lib/quantum/run_strategy/random.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ defmodule Quantum.RunStrategy.Random do

alias Quantum.Job

@impl Quantum.RunStrategy
@spec normalize_config!([Node.t()] | :cluster) :: t
def normalize_config!(nodes) when is_list(nodes) do
%__MODULE__{nodes: Enum.map(nodes, &normalize_node/1)}
Expand Down
15 changes: 13 additions & 2 deletions lib/quantum/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ defmodule Quantum.Scheduler do
@opaque t :: module

defmacro __using__(opts) do
quote bind_quoted: [opts: opts, moduledoc: @moduledoc] do
quote bind_quoted: [behaviour: __MODULE__, opts: opts, moduledoc: @moduledoc] do
@otp_app Keyword.fetch!(opts, :otp_app)
@moduledoc moduledoc
|> String.replace(~r/MyApp\.Scheduler/, Enum.join(Module.split(__MODULE__), "."))
|> String.replace(~r/:my_app/, ":" <> Atom.to_string(@otp_app))

@behaviour Quantum.Scheduler
@behaviour behaviour

@impl behaviour
def config(custom \\ []) do
Quantum.scheduler_config(__MODULE__, @otp_app, custom)
end
Expand All @@ -75,14 +76,17 @@ defmodule Quantum.Scheduler do

defp __timeout__, do: Keyword.fetch!(config(), :timeout)

@impl behaviour
def start_link(opts \\ [name: __MODULE__]) do
Quantum.Supervisor.start_link(__MODULE__, @otp_app, opts)
end

@impl behaviour
def stop(server \\ __MODULE__, timeout \\ 5000) do
Supervisor.stop(server, :normal, timeout)
end

@impl behaviour
def add_job(server \\ __job_broadcaster__(), job)

def add_job(server, %Job{name: name} = job) do
Expand All @@ -99,32 +103,39 @@ defmodule Quantum.Scheduler do
add_job(server, job)
end

@impl behaviour
def new_job(config \\ config()), do: Job.new(config)

@impl behaviour
def deactivate_job(server \\ __job_broadcaster__(), name)
when is_atom(name) or is_reference(name) do
GenStage.cast(server, {:change_state, name, :inactive})
end

@impl behaviour
def activate_job(server \\ __job_broadcaster__(), name)
when is_atom(name) or is_reference(name) do
GenStage.cast(server, {:change_state, name, :active})
end

@impl behaviour
def find_job(server \\ __job_broadcaster__(), name)
when is_atom(name) or is_reference(name) do
GenStage.call(server, {:find_job, name}, __timeout__())
end

@impl behaviour
def delete_job(server \\ __job_broadcaster__(), name)
when is_atom(name) or is_reference(name) do
GenStage.cast(server, {:delete, name})
end

@impl behaviour
def delete_all_jobs(server \\ __job_broadcaster__()) do
GenStage.cast(server, :delete_all)
end

@impl behaviour
def jobs(server \\ __job_broadcaster__()) do
GenStage.call(server, :jobs, __timeout__())
end
Expand Down
13 changes: 13 additions & 0 deletions lib/quantum/storage/noop.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,24 @@ defmodule Quantum.Storage.Noop do

@behaviour Quantum.Storage.Adapter

@impl Quantum.Storage.Adapter
def jobs(_scheduler_module), do: :not_applicable

@impl Quantum.Storage.Adapter
def add_job(_scheduler_module, _job), do: :ok

@impl Quantum.Storage.Adapter
def delete_job(_scheduler_module, _job_name), do: :ok

@impl Quantum.Storage.Adapter
def update_job_state(_scheduler_module, _job_name, _state), do: :ok

@impl Quantum.Storage.Adapter
def last_execution_date(_scheduler_module), do: :unknown

@impl Quantum.Storage.Adapter
def update_last_execution_date(_scheduler_module, _last_execution_date), do: :ok

@impl Quantum.Storage.Adapter
def purge(_scheduler_module), do: :ok
end
3 changes: 1 addition & 2 deletions lib/quantum/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ defmodule Quantum.Supervisor do
Supervisor.start_link(__MODULE__, {quantum, otp_app, opts}, name)
end

## Callbacks

@impl Supervisor
def init({quantum, otp_app, opts}) do
opts = Quantum.runtime_config(quantum, otp_app, opts)
opts = quantum_init(quantum, opts)
Expand Down
5 changes: 5 additions & 0 deletions lib/quantum/task_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,13 @@ defmodule Quantum.TaskRegistry do
end

@doc false
@impl GenServer
def init(%InitOpts{}) do
{:ok, %State{running_tasks: %{}}}
end

@doc false
@impl GenServer
def handle_call({:running, task, node}, _caller, %State{running_tasks: running_tasks} = state) do
if Enum.member?(Map.get(running_tasks, task, []), node) do
{:reply, :already_running, state}
Expand All @@ -87,6 +89,7 @@ defmodule Quantum.TaskRegistry do
end

@doc false
@impl GenServer
def handle_call({:is_running?, task}, _caller, %State{running_tasks: running_tasks} = state) do
case running_tasks do
%{^task => [_ | _]} ->
Expand All @@ -101,6 +104,7 @@ defmodule Quantum.TaskRegistry do
end

@doc false
@impl GenServer
def handle_call(:any_running?, _caller, %State{running_tasks: running_tasks} = state) do
if Enum.empty?(running_tasks) do
{:reply, false, state}
Expand All @@ -110,6 +114,7 @@ defmodule Quantum.TaskRegistry do
end

@doc false
@impl GenServer
def handle_cast({:finished, task, node}, %State{running_tasks: running_tasks} = state) do
running_tasks =
running_tasks
Expand Down
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ defmodule Quantum.Mixfile do
{:tzdata, "~> 1.0", only: [:dev, :test]},
{:earmark, "~> 1.0", only: [:dev, :docs], runtime: false},
{:ex_doc, "~> 0.19", only: [:dev, :docs], runtime: false},
{:excoveralls, "~> 0.5", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.0-rc", only: [:dev, :test], runtime: false},
{:credo, "~> 1.0", only: [:dev, :test], runtime: false}
{:excoveralls, "~> 0.5", only: [:test], runtime: false},
{:dialyxir, "~> 1.0-rc", only: [:dev], runtime: false},
{:credo, "~> 1.0", only: [:dev], runtime: false}
]
end
end

0 comments on commit 392c9ed

Please sign in to comment.