Skip to content

Commit

Permalink
Queue and consumer supervision tree for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewDryga committed Jan 25, 2017
1 parent c42ff38 commit 0315fc0
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 3 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ Supervisor gets restarted, but it **won't receive receive any jobs** resulting i

This option is not safe by-default, because process that doesn't trap exits will not call this callback when supervisor is sending exit signal to it (due to supervisor restart).

2. Linking RabbitMQ client lib channel/connection processes to a workers.

May be a bad solution because all jobs will be re-scheduled whenever a single job fails, resulting in a many duplicate-processed jobs.

2. Store tags in a separate process which monitors supervisor and it's workers.

3. Keep storing tags and job payload within GenStage state, but wrap any unsafe code in a [`Task`](https://hexdocs.pm/elixir/Task.html). [[1]](https://github.com/elixir-lang/gen_stage/issues/131#issuecomment-265758380)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule GenTask.Mixfile do
[app: :gen_task,
description: "Generic Task behavior that helps to encapsulate worker errors and recover from them in " <>
"classic GenStage's.",
package: package,
package: package(),
version: @version,
elixir: "~> 1.3",
elixirc_paths: elixirc_paths(Mix.env),
Expand Down
17 changes: 17 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
%{"bunt": {:hex, :bunt, "0.1.6", "5d95a6882f73f3b9969fdfd1953798046664e6f77ec4e486e6fafc7caad97c6f", [:mix], []},
"certifi": {:hex, :certifi, "0.7.0", "861a57f3808f7eb0c2d1802afeaae0fa5de813b0df0979153cbafcd853ababaf", [:rebar3], []},
"confex": {:hex, :confex, "1.4.3", "3339202ef24fbdab0257dbce0cb0664eb44dd1e1040c4d8d71f14ee1e3a19e21", [:mix], []},
"credo": {:hex, :credo, "0.5.3", "0c405b36e7651245a8ed63c09e2d52c2e2b89b6d02b1570c4d611e0fcbecf4a2", [:mix], [{:bunt, "~> 0.1.6", [hex: :bunt, optional: false]}]},
"distillery": {:hex, :distillery, "0.10.1", "14fccade4b8ab849b99e21c4bdfaa1092dbacdce8afd33f5c369c6e114385b0e", [:mix], []},
"dogma": {:hex, :dogma, "0.1.13", "7b6c6ad2b3ee6501eda3bd39e197dd5198be8d520d1c175c7f713803683cf27a", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, optional: false]}]},
"earmark": {:hex, :earmark, "1.0.3", "89bdbaf2aca8bbb5c97d8b3b55c5dd0cff517ecc78d417e87f1d0982e514557b", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.14.5", "c0433c8117e948404d93ca69411dd575ec6be39b47802e81ca8d91017a0cf83c", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]},
"excoveralls": {:hex, :excoveralls, "0.6.1", "9e946b6db84dba592f47632157ecd135a46384b98a430fd16007dc910c70348b", [:mix], [{:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}]},
"exjsx": {:hex, :exjsx, "3.2.1", "1bc5bf1e4fd249104178f0885030bcd75a4526f4d2a1e976f4b428d347614f0f", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]},
"hackney": {:hex, :hackney, "1.6.5", "8c025ee397ac94a184b0743c73b33b96465e85f90a02e210e86df6cbafaa5065", [:rebar3], [{:certifi, "0.7.0", [hex: :certifi, optional: false]}, {:idna, "1.2.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]},
"idna": {:hex, :idna, "1.2.0", "ac62ee99da068f43c50dc69acf700e03a62a348360126260e87f2b54eced86b2", [:rebar3], []},
"jsx": {:hex, :jsx, "2.8.1", "1453b4eb3615acb3e2cd0a105d27e6761e2ed2e501ac0b390f5bbec497669846", [:mix, :rebar3], []},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [:make, :rebar], []}}
16 changes: 16 additions & 0 deletions test/support/test_app_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule TestAppSupervisor do
use Supervisor

def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def init(_) do
children = [
supervisor(TestWorkerSupervisor, []),
supervisor(TestConsumer, [])
]

supervise(children, strategy: :one_for_one)
end
end
12 changes: 12 additions & 0 deletions test/support/test_consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule TestConsumer do
use GenServer

def start_link do
GenServer.start_link(__MODULE__, [], name: TestConsumer)
end

def handle_info({:job, job}, state) do
{:ok, _pid} = TestWorkerSupervisor.start_worker(job)
{:noreply, state}
end
end
119 changes: 119 additions & 0 deletions test/support/test_queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
defmodule TestQueue do
use GenServer
require Logger

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(opts) do
jobs = 1..100
|> Enum.map(&generate_job/1)

{:ok, %{opts: opts, workers: [], jobs: jobs}}
end

# Client

def attach_observer(observer_pid) do
GenServer.call(__MODULE__, {:attach_observer, observer_pid})
end

def subscribe(worker_pid) do
GenServer.cast(__MODULE__, {:subscribe, worker_pid})
end

def ack(tag) do
GenServer.cast(__MODULE__, {:ack, tag})
end

def nack(tag) do
GenServer.cast(__MODULE__, {:nack, tag})
end

# Server

def handle_call({:attach_observer, pid}, _from, state) do
{:reply, :ok, %{state | opts: state.opts ++ [observer_pid: pid]}}
end

def handle_cast({:subscribe, pid}, %{workers: workers} = state) do
state = %{state | workers: workers ++ [%{pid: pid, active_jobs: 0}]}

{:noreply, dispatch_jobs(state)}
end

def handle_cast({:ack, tag}, %{jobs: jobs, workers: workers} = state) do
job_index = Enum.find_index(jobs, fn %{tag: job_tag} -> job_tag == tag end)
job = Enum.at(jobs, job_index)

worker_index = Enum.find_index(workers, fn %{pid: worker_pid} -> worker_pid == job.worker end)
worker = Enum.at(workers, worker_index)

jobs = List.replace_at(jobs, job_index, %{job | state: :acked, worker: nil})
workers = List.replace_at(workers, worker_index, %{worker | active_jobs: worker.active_jobs - 1})

{:noreply, dispatch_jobs(%{state | workers: workers, jobs: jobs})}
end

def handle_cast({:nack, tag}, %{jobs: jobs, workers: workers} = state) do
job_index = Enum.find_index(jobs, fn %{tag: job_tag} -> job_tag == tag end)
job = Enum.at(jobs, job_index)

worker_index = Enum.find_index(workers, fn %{pid: worker_pid} -> worker_pid == job.worker end)
worker = Enum.at(workers, worker_index)

jobs = List.replace_at(jobs, job_index, %{job | state: :nacked, worker: nil})
workers = List.replace_at(workers, worker_index, %{worker | active_jobs: worker.active_jobs - 1})

{:noreply, dispatch_jobs(%{state | workers: workers, jobs: jobs})}
end

# Helpers

defp generate_job(tag) do
%{payload: :rand.uniform(100_000), tag: tag, worker: nil, state: :undelivered}
end

defp dispatch_jobs(%{opts: opts, workers: workers, jobs: jobs}) do
{jobs, workers, undelivered_count} = jobs
|> Enum.reduce({[], workers, 0}, fn
%{state: :undelivered} = job, {jobs, workers, undelivered_count} ->
{job, workers} = dispatch_job(job, workers, opts)
{jobs ++ [job], workers, undelivered_count + 1}

job, {jobs, workers, undelivered_count} ->
{jobs ++ [job], workers, undelivered_count}
end)

if(opts[:observer_pid]) do
send(opts[:observer_pid], {:undelivered_jobs, undelivered_count})
end

%{opts: opts, workers: workers, jobs: jobs}
end

defp dispatch_job(job, workers, opts) do
index = Enum.find_index(workers, fn %{active_jobs: active_jobs} -> active_jobs < opts[:prefetch_count] end)

case index do
nil ->
{job, workers}

index ->
worker = Enum.at(workers, index)
{job, worker} = send_job(job, worker)
workers = List.replace_at(workers, index, worker)
{job, workers}
end
end

defp send_job(%{payload: payload, tag: tag} = job, %{pid: pid, active_jobs: active_jobs} = worker) do
send(pid, {:job, %{payload: payload, tag: tag}})

{
%{job | worker: pid, state: :unacked},
%{worker | active_jobs: active_jobs + 1}
}
end
end
45 changes: 45 additions & 0 deletions test/support/test_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
defmodule TestWorker do
use GenServer
require Logger

def start_link(%{} = job) do
GenServer.start_link(__MODULE__, job)
end

def init(state) do
{:ok, state, 100}
end

def handle_info(:timeout, %{payload: payload, tag: tag}) do
{:ok, res} = Task.async(fn ->
payload
|> process(tag)
end)
|> Task.yield() # TODO: add timeout

res
|> send_ack(tag)

{:stop, :normal, []}
end

# # Simulated errors
# defp process(_payload, tag) when rem(tag, 2) == 0 do
# throw "Error!"
# end

# Success jobs
defp process(_payload, tag) do
Logger.info("Processed job ##{tag}")
:timer.sleep(100)
:ok
end

defp send_ack(:ok, tag) do
TestQueue.ack(tag)
end

defp send_ack(:error, tag) do
TestQueue.nack(tag)
end
end
21 changes: 21 additions & 0 deletions test/support/test_worker_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule TestWorkerSupervisor do
use Supervisor

def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def start_worker(job) do
{:ok, pid} = Supervisor.start_child(__MODULE__, [job])
# :sys.trace(pid, true)
{:ok, pid}
end

def init(_) do
children = [
worker(TestWorker, [], restart: :transient)
]

supervise(children, strategy: :simple_one_for_one)
end
end
14 changes: 12 additions & 2 deletions test/unit/gen_task_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@ defmodule GenTaskTest do
use ExUnit.Case
doctest GenTask

test "the truth" do
assert 1 + 1 == 2
setup_all do
{:ok, pid} = TestQueue.start_link([prefetch_count: 5])
{:ok, _supervisor_pid} = TestAppSupervisor.start_link()
# :sys.trace(pid, true)

{:ok, %{pid: pid}}
end

test "process jobs" do
TestQueue.attach_observer(self())
TestQueue.subscribe(TestConsumer)
assert_receive {:undelivered_jobs, 0}, 5_000
end
end

0 comments on commit 0315fc0

Please sign in to comment.