Skip to content

Commit

Permalink
Improve tests in preparation for bumping Rails dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
rosa committed Aug 6, 2024
1 parent 845d36e commit d9890bc
Showing 11 changed files with 78 additions and 74 deletions.
5 changes: 3 additions & 2 deletions app/models/solid_queue/semaphore.rb
Original file line number Diff line number Diff line change
@@ -52,7 +52,6 @@ def signal
end

private

attr_accessor :job

def attempt_creation
@@ -63,7 +62,9 @@ def attempt_creation
end
end

def check_limit_or_decrement = limit == 1 ? false : attempt_decrement
def check_limit_or_decrement
limit == 1 ? false : attempt_decrement
end

def attempt_decrement
Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
33 changes: 14 additions & 19 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
@@ -18,10 +18,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase

teardown do
terminate_process(@pid) if process_exists?(@pid)

SolidQueue::Job.destroy_all
SolidQueue::Process.destroy_all
SolidQueue::Semaphore.delete_all
end

test "run several conflicting jobs over the same record sequentially" do
@@ -33,8 +29,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
SequentialUpdateResultJob.perform_later(@result, name: name)
end

wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

assert_stored_sequence @result, ("A".."K").to_a
end
@@ -51,7 +47,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end

wait_for_jobs_to_finish_for(5.seconds)
assert_no_pending_jobs
assert_no_unfinished_jobs

assert_stored_sequence @result, ("A".."K").to_a
end
@@ -78,8 +74,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end
end

wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# C would have started in the beginning, seeing the status empty, and would finish after
# all other jobs, so it'll do the last update with only itself
@@ -96,7 +92,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
SequentialUpdateResultJob.perform_later(@result, name: name)
end

wait_for_jobs_to_finish_for(3.seconds)
wait_for_jobs_to_finish_for(5.seconds)
assert_equal 3, SolidQueue::FailedExecution.count

assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
@@ -106,8 +102,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")

wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
# Lock the semaphore so we can enqueue jobs and leave them blocked
@@ -128,8 +124,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
assert SolidQueue::Semaphore.signal(job)

# And wait for the dispatcher to release the jobs
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
@@ -139,8 +135,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
test "rely on dispatcher to unblock blocked executions with an expired semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
# Lock the semaphore so we can enqueue jobs and leave them blocked
@@ -160,8 +156,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)

# And wait for dispatcher to release the jobs
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
@@ -198,7 +194,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end

private

def assert_stored_sequence(result, *sequences)
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
skip_active_record_query_cache do
60 changes: 30 additions & 30 deletions test/integration/forked_processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
@@ -15,17 +15,13 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

teardown do
terminate_process(@pid) if process_exists?(@pid)

SolidQueue::Process.destroy_all
SolidQueue::Job.destroy_all
JobResult.delete_all
end

test "enqueue jobs in multiple queues" do
6.times { |i| enqueue_store_result_job("job_#{i}") }
6.times { |i| enqueue_store_result_job("job_#{i}", :default) }

wait_for_jobs_to_finish_for(0.5.seconds)
wait_for_jobs_to_finish_for(2.seconds)

assert_equal 12, JobResult.count
6.times { |i| assert_completed_job_results("job_#{i}", :background) }
@@ -63,17 +59,18 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
signal_process(@pid, :TERM, wait: 0.1.second)
end

sleep(0.5.seconds)
sleep(1.second)
assert_clean_termination
end

test "quit supervisor while there are jobs in-flight" do
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: 1.seconds)
pause = enqueue_store_result_job("pause", pause: 1.second)

signal_process(@pid, :QUIT, wait: 0.5.second)
wait_for_jobs_to_finish_for(2.5.seconds)
signal_process(@pid, :QUIT, wait: 0.4.second)
wait_for_jobs_to_finish_for(2.seconds, except: pause)

wait_while_with_timeout(2.seconds) { process_exists?(@pid) }
assert_not process_exists?(@pid)

assert_completed_job_results("no pause")
@@ -91,7 +88,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)

signal_process(@pid, :TERM, wait: 0.1.second)
wait_for_jobs_to_finish_for(0.5.seconds)
wait_for_jobs_to_finish_for(2.seconds)

assert_completed_job_results("no pause")
assert_completed_job_results("pause")
@@ -108,7 +105,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
pause = enqueue_store_result_job("pause", pause: 0.2.seconds)

signal_process(@pid, :INT, wait: 0.1.second)
wait_for_jobs_to_finish_for(0.5.seconds)
wait_for_jobs_to_finish_for(2.second)

assert_completed_job_results("no pause")
assert_completed_job_results("pause")
@@ -124,8 +121,9 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
no_pause = enqueue_store_result_job("no pause")
pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second)

signal_process(@pid, :TERM, wait: 0.1.second)
wait_for_jobs_to_finish_for(SolidQueue.shutdown_timeout + 0.1.second)
signal_process(@pid, :TERM, wait: 0.5.second)

sleep(SolidQueue.shutdown_timeout + 0.5.second)

assert_completed_job_results("no pause")
assert_job_status(no_pause, :finished)
@@ -152,12 +150,12 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
2.times { enqueue_store_result_job("no error", :default, pause: 0.01) }
error3 = enqueue_store_result_job("error", :default, exception: RuntimeError)

wait_for_jobs_to_finish_for(0.5.seconds)
wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ])

assert_completed_job_results("no error", :background, 3)
assert_completed_job_results("no error", :default, 4)

assert_failures 3
wait_while_with_timeout(1.second) { SolidQueue::FailedExecution.count < 3 }
[ error1, error2, error3 ].each do |job|
assert_job_status(job, :failed)
end
@@ -177,7 +175,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase

2.times { enqueue_store_result_job("no exit", :background) }

wait_for_jobs_to_finish_for(5.seconds)
wait_for_jobs_to_finish_for(3.seconds, except: [ exit_job, pause_job ])

assert_completed_job_results("no exit", :default, 2)
assert_completed_job_results("no exit", :background, 4)
@@ -208,6 +206,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
assert_nil SolidQueue::Process.find_by(id: worker.id)

# Jobs were completed
wait_for_jobs_to_finish_for(1.second)
assert_completed_job_results("pause", :background)
assert_completed_job_results("pause", :default)

@@ -218,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
# And they can process jobs just fine
enqueue_store_result_job("no_pause")
enqueue_store_result_job("no_pause", :default)
wait_for_jobs_to_finish_for(0.2.seconds)
wait_for_jobs_to_finish_for(1.second)

assert_completed_job_results("no_pause", :background)
assert_completed_job_results("no_pause", :default)
@@ -228,19 +227,18 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
end

test "kill worker individually" do
killed_pause = enqueue_store_result_job("killed_pause", pause: 1.seconds)
killed_pause = enqueue_store_result_job("killed_pause", pause: 1.second)
enqueue_store_result_job("pause", :default, pause: 0.5.seconds)

worker = find_processes_registered_as("Worker").detect { |process| process.metadata["queues"].include? "background" }

signal_process(worker.pid, :KILL, wait: 0.3.second)
signal_process(worker.pid, :KILL, wait: 0.5.seconds)

# Worker didn't have time to clean up or finish the work
sleep(0.7.second)
sleep(0.5.second)
assert SolidQueue::Process.exists?(id: worker.id)

# And there's a new worker that has been registered for the background queue
wait_for_registered_processes(4, timeout: 3.second)
wait_for_registered_processes(4, timeout: 5.second)

# The job in the background queue was left claimed as the worker couldn't
# finish orderly
@@ -252,7 +250,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase
# The two current workers can process jobs just fine
enqueue_store_result_job("no_pause")
enqueue_store_result_job("no_pause", :default)
wait_for_jobs_to_finish_for(0.5.seconds)
sleep(2.seconds)

assert_completed_job_results("no_pause", :background)
assert_completed_job_results("no_pause", :default)
@@ -291,11 +289,15 @@ def enqueue_store_result_job(value, queue_name = :background, **options)
end

def assert_completed_job_results(value, queue_name = :background, count = 1)
assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count
skip_active_record_query_cache do
assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count
end
end

def assert_started_job_result(value, queue_name = :background, count = 1)
assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count
skip_active_record_query_cache do
assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count
end
end

def assert_job_status(active_job, status)
@@ -311,10 +313,8 @@ def assert_job_status(active_job, status)
end

def assert_no_claimed_jobs
assert SolidQueue::ClaimedExecution.none?
end

def assert_failures(count)
assert_equal count, SolidQueue::FailedExecution.count
skip_active_record_query_cache do
assert SolidQueue::ClaimedExecution.none?
end
end
end
6 changes: 2 additions & 4 deletions test/integration/puma/plugin_testing.rb
Original file line number Diff line number Diff line change
@@ -32,16 +32,14 @@ module PluginTesting
teardown do
terminate_process(@pid, signal: :INT) if process_exists?(@pid)

wait_for_registered_processes 0, timeout: 1.second

JobResult.delete_all
wait_for_registered_processes 0, timeout: 2.seconds
end
end

test "perform jobs inside puma's process" do
StoreResultJob.perform_later(:puma_plugin)

wait_for_jobs_to_finish_for(1.second)
wait_for_jobs_to_finish_for(2.seconds)
assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count
end

6 changes: 3 additions & 3 deletions test/integration/recurring_tasks_test.rb
Original file line number Diff line number Diff line change
@@ -21,8 +21,8 @@ class RecurringTasksTest < ActiveSupport::TestCase
end

test "enqueue and process periodic tasks" do
wait_for_jobs_to_be_enqueued(2, timeout: 2.seconds)
wait_for_jobs_to_finish_for(2.seconds)
wait_for_jobs_to_be_enqueued(2, timeout: 2.5.seconds)
wait_for_jobs_to_finish_for(2.5.seconds)

terminate_process(@pid)

@@ -33,7 +33,7 @@ class RecurringTasksTest < ActiveSupport::TestCase
assert_equal "StoreResultJob", job.class_name
end

assert_equal 2, JobResult.count
assert JobResult.count >= 2
JobResult.all.each do |result|
assert_equal "custom_status", result.status
assert_equal "42", result.value
5 changes: 0 additions & 5 deletions test/models/solid_queue/job_test.rb
Original file line number Diff line number Diff line change
@@ -3,11 +3,6 @@
class SolidQueue::JobTest < ActiveSupport::TestCase
self.use_transactional_tests = false

teardown do
SolidQueue::Job.destroy_all
JobResult.delete_all
end

class NonOverlappingJob < ApplicationJob
limits_concurrency key: ->(job_result, **) { job_result }

16 changes: 13 additions & 3 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
@@ -32,14 +32,24 @@ class ActiveSupport::TestCase
if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile)
File.delete(SolidQueue.supervisor_pidfile)
end

unless self.class.use_transactional_tests
SolidQueue::Job.destroy_all
SolidQueue::Process.destroy_all
SolidQueue::Semaphore.delete_all
SolidQueue::RecurringTask.delete_all
JobResult.delete_all
end
end

private
def wait_for_jobs_to_finish_for(timeout = 1.second)
wait_while_with_timeout(timeout) { SolidQueue::Job.where(finished_at: nil).any? }
def wait_for_jobs_to_finish_for(timeout = 1.second, except: [])
wait_while_with_timeout(timeout) do
SolidQueue::Job.where.not(active_job_id: Array(except).map(&:job_id)).where(finished_at: nil).any?
end
end

def assert_no_pending_jobs
def assert_no_unfinished_jobs
skip_active_record_query_cache do
assert SolidQueue::Job.where(finished_at: nil).none?
end
Loading

0 comments on commit d9890bc

Please sign in to comment.