Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solution: Introduce Clock Broadcaster to make Execution Broadcaster easier #358

Merged
merged 2 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ sudo: false
env:
- MIX_ENV=test
elixir:
- 1.5
- 1.6
- 1.7
otp_release:
- 19.3
- 20.0
script: mix coveralls.travis
cache:
Expand All @@ -19,20 +17,20 @@ jobs:
env:
- MIX_ENV=dev
script: mix format --check-formatted
elixir: 1.6
elixir: 1.7
- stage: credo
env:
- MIX_ENV=dev
script: mix credo --strict
elixir: 1.6
elixir: 1.7
- stage: dialyzer
env:
- MIX_ENV=dev
before_script: travis_wait mix dialyzer --plt
script: mix dialyzer --halt-exit-status
elixir: 1.6
elixir: 1.7
- stage: inch
env:
- MIX_ENV=docs
script: mix inch.report
elixir: 1.6
elixir: 1.7
38 changes: 16 additions & 22 deletions dialyzer.ignore-warnings
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Atom':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.BitString':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Float':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Function':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Integer':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.List':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Map':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.PID':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Port':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Reference':'__impl__'/1
Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Tuple':'__impl__'/1
Overloaded contract for 'Elixir.Quantum.DateLibrary':'to_utc!'/2 has overlapping domains; such contracts are currently unsupported and are simply ignored
lib/quantum/execution_broadcaster.ex:201: The inferred return type of handle_call/3 ({'reply',{'resume',{_,_}},#{'jobs':=_, 'last_execution_date':=_, _=>_}}) has nothing in common with {'noreply',[any()],_} | {'stop',_,_} | {'noreply',[any()],_,'hibernate'} | {'reply',_,[any()],_} | {'stop',_,_,_} | {'reply',_,[any()],_,'hibernate'}, which is the expected return type for the callback of the 'Elixir.GenStage' behaviour
lib/quantum/execution_broadcaster.ex:213: The inferred return type of handle_cast/2 ({'noreply',_}) has nothing in common with {'noreply',[any()],_} | {'stop',_,_} | {'noreply',[any()],_,'hibernate'}, which is the expected return type for the callback of the 'Elixir.GenStage' behaviour
lib/quantum/execution_broadcaster.ex:266: The pattern {'ok', Vdate@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:295: The pattern {'ok', Vdate@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:319: Function add_to_state/3 will never be called
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Atom':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.BitString':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Float':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Function':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Integer':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.List':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Map':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.PID':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Port':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Reference':'__impl__'/1
:0: Unknown function 'Elixir.Quantum.RunStrategy.NodeList.Tuple':'__impl__'/1
lib/quantum/date_library.ex:38: Overloaded contract for 'Elixir.Quantum.DateLibrary':'to_utc!'/2 has overlapping domains; such contracts are currently unsupported and are simply ignored
lib/quantum/execution_broadcaster.ex:262: The pattern {'ok', _date@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:291: The pattern {'ok', _date@1} can never match the type {'error',<<_:368>>}
lib/quantum/execution_broadcaster.ex:319: Function add_to_state/4 will never be called
lib/quantum/execution_broadcaster.ex:327: Function add_job_at_date/3 will never be called
lib/quantum/execution_broadcaster.ex:334: Function find_date_and_put_job/3 will never be called
lib/quantum/executor.ex:23: Overloaded contract for 'Elixir.Quantum.Executor':start_link/2 has overlapping domains; such contracts are currently unsupported and are simply ignored
lib/quantum/util.ex:22: Function 'gen_stage_v12?'/0 has no local return
lib/quantum/util.ex:31: The call 'Elixir.Version':'match?'(binary(),<<_:64>>) will never return since it differs in the 1st argument from the success typing arguments: (#{'__struct__':='Elixir.Version', 'build':='nil' | binary(), 'major':=binary() | non_neg_integer(), 'minor':='nil' | non_neg_integer(), 'patch':='nil' | non_neg_integer(), 'pre':=[binary() | non_neg_integer()]},binary() | #{'__struct__':='Elixir.Version.Requirement', 'compiled':=boolean(), 'matchspec':=[{atom() | tuple(),[any()],[any()]}] | ets:comp_match_spec(), 'source':=_})
lib/quantum/execution_broadcaster.ex:201: The inferred return type of handle_call/3 ({'reply',{'resume',{_,_}},#{'jobs':=_, 'last_execution_date':=_, _=>_}}) has nothing in common with {'noreply',[any()],_} | {'stop',_,_} | {'noreply',[any()],_,'hibernate'} | {'reply',_,[any()],_} | {'stop',_,_,_} | {'reply',_,[any()],_,'hibernate'}, which is the expected return type for the callback of 'Elixir.GenStage' behaviour
lib/quantum/execution_broadcaster.ex:213: The inferred return type of handle_cast/2 ({'noreply',_}) has nothing in common with {'noreply',[any()],_} | {'stop',_,_} | {'noreply',[any()],_,'hibernate'}, which is the expected return type for the callback of 'Elixir.GenStage' behaviour
lib/quantum/executor.ex:29: Overloaded contract for 'Elixir.Quantum.Executor':start_link/2 has overlapping domains; such contracts are currently unsupported and are simply ignored
19 changes: 17 additions & 2 deletions lib/quantum.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ defmodule Quantum do

# Default Job Broadcaster Name
global = Keyword.fetch!(config, :global)

job_broadcaster = cluster_worker_config(Module.concat(quantum, JobBroadcaster), global)

clock_broadcaster = cluster_worker_config(Module.concat(quantum, ClockBroadcaster), global)

execution_broadcaster =
cluster_worker_config(Module.concat(quantum, ExecutionBroadcaster), global)

Expand All @@ -45,20 +48,32 @@ defmodule Quantum do

config
|> Keyword.put_new(:quantum, quantum)
|> Keyword.put_new(:scheduler, quantum)
|> update_in([:schedule], &Normalizer.normalize_schedule/1)
|> Keyword.put_new(:clock_broadcaster, clock_broadcaster)
|> Keyword.put_new(:clock_broadcaster_opts, supervisor_opts(clock_broadcaster, global))
|> Keyword.put_new(:job_broadcaster, job_broadcaster)
|> Keyword.put_new(:job_broadcaster_opts, supervisor_opts(job_broadcaster, global))
|> Keyword.put_new(:execution_broadcaster, execution_broadcaster)
|> Keyword.put_new(
:execution_broadcaster_opts,
supervisor_opts(execution_broadcaster, global)
)
|> Keyword.put_new(:executor_supervisor, executor_supervisor)
|> Keyword.put_new(:task_registry, task_registry)
|> Keyword.put_new(:task_registry_opts, supervisor_opts(task_registry, global))
|> Keyword.put_new(:task_supervisor, task_supervisor)
|> Keyword.put_new(:cluster_task_supervisor_registry, cluster_task_supervisor_registry)
|> Keyword.put_new(:storage, Noop)
end

defp cluster_worker_config(module, false), do: [name: module, restart: :permanent]
defp cluster_worker_config(module, false), do: module

defp cluster_worker_config(module, true),
do: [name: {:via, :swarm, module}, restart: :temporary]
do: {:via, :swarm, module}

defp supervisor_opts(module, false), do: [name: module, restart: :permanent]
defp supervisor_opts(module, true), do: [name: module, restart: :temporary]

@doc """
Retrieves the comprehensive runtime configuration.
Expand Down
140 changes: 140 additions & 0 deletions lib/quantum/clock_broadcaster.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Quantum.ClockBroadcaster do
@moduledoc false

# Broadcasts the time to run jobs for

use GenStage

require Logger

alias Quantum.ClockBroadcaster.Event
alias Quantum.ClockBroadcaster.InitOpts
alias Quantum.ClockBroadcaster.State

@doc false
@spec start_link(opts :: Keyword.t()) :: GenServer.on_start()
def start_link(opts) do
GenStage.start_link(
__MODULE__,
struct!(InitOpts, Keyword.take(opts, InitOpts.fields())),
name: Keyword.fetch!(opts, :name)
)
end

@doc false
@spec init(opts :: InitOpts.t()) :: {:producer, State.t()}
def init(%InitOpts{debug_logging: debug_logging, start_time: start_time}) do
{:producer,
%State{
time: %{start_time | microsecond: {0, 0}},
debug_logging: debug_logging,
remaining_demand: 0,
timer: nil
}}
end

@doc false
def handle_demand(
demand,
%State{remaining_demand: remaining_demand, time: time, timer: nil} = state
)
when demand > 0 do
expected_event_count = demand + remaining_demand

now = NaiveDateTime.utc_now()

{events, new_time} =
Enum.reduce_while(
1..expected_event_count,
{[], time},
fn _, {list, time} = acc ->
new_time = NaiveDateTime.add(time, 1, :second)

case NaiveDateTime.compare(new_time, now) do
:lt ->
{:cont, {[%Event{time: new_time, catch_up: true} | list], new_time}}

_ ->
{:halt, acc}
end
end
)

new_remaining_demand = expected_event_count - Enum.count(events)

if remaining_demand > 0 and new_remaining_demand == 0 do
log_catched_up(state)
end

new_timer =
if new_remaining_demand > 0 do
schedule_next_event_timer(new_time, now)
end

{:noreply, events,
%{state | time: new_time, remaining_demand: new_remaining_demand, timer: new_timer}}
end

@doc false
def handle_demand(demand, %State{timer: timer} = state) do
Process.cancel_timer(timer)
handle_demand(demand, %{state | timer: nil})
end

@doc false
def handle_info(:ping, %State{remaining_demand: 0} = state) do
{:noreply, [], state}
end

@doc false
def handle_info(:ping, %State{time: time, remaining_demand: remaining_demand} = state)
when remaining_demand > 0 do
now = NaiveDateTime.utc_now()
new_time = NaiveDateTime.add(time, 1, :second)

case NaiveDateTime.compare(new_time, now) do
:lt ->
timer = schedule_next_event_timer(new_time, now)

{:noreply, [%Event{time: new_time, catch_up: false}],
%{state | time: new_time, timer: timer}}

_ ->
warn_event_too_early()

timer = schedule_next_event_timer(time, now)

{:noreply, [], %{state | timer: timer}}
end
end

defp schedule_next_event_timer(time, now) do
next_event_diff =
%{time | microsecond: {0, 0}}
|> NaiveDateTime.add(1, :second)
|> NaiveDateTime.diff(now, :millisecond)

next_event_diff =
if next_event_diff < 0 do
0
else
next_event_diff
end

Process.send_after(self(), :ping, next_event_diff)
end

defp log_catched_up(%State{debug_logging: false}), do: :ok

defp log_catched_up(%State{debug_logging: true}),
do:
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Clock Producer catched up with past times and is now running in normal time"
end)

defp warn_event_too_early,
do:
Logger.warn(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Clock Producer received a too early ping event, rescheduling"
end)
end
14 changes: 14 additions & 0 deletions lib/quantum/clock_broadcaster/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Quantum.ClockBroadcaster.Event do
@moduledoc false

# Clock Event

@type t :: %__MODULE__{
time: NaiveDateTime.t(),
catch_up: boolean()
}

@enforce_keys [:time, :catch_up]

defstruct @enforce_keys
end
16 changes: 16 additions & 0 deletions lib/quantum/clock_broadcaster/init_opts.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Quantum.ClockBroadcaster.InitOpts do
@moduledoc false

# Init Options

@type t :: %__MODULE__{
start_time: NaiveDateTime.t(),
debug_logging: boolean()
}

@enforce_keys [:start_time, :debug_logging]

defstruct @enforce_keys

def fields, do: @enforce_keys
end
17 changes: 17 additions & 0 deletions lib/quantum/clock_broadcaster/state.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Quantum.ClockBroadcaster.State do
@moduledoc false

# Internal State

@type t :: %__MODULE__{
debug_logging: boolean(),
time: NaiveDateTime.t(),
# catch_up: boolean(),
remaining_demand: non_neg_integer,
timer: reference | nil
}

@enforce_keys [:debug_logging, :time, :remaining_demand, :timer]

defstruct @enforce_keys
end
Loading