diff --git a/UPGRADING.md b/UPGRADING.md index dde0e8f4..84b760a1 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,3 +1,19 @@ +# Upgrading to version 0.5.x +This version includes a new migration to improve recurring tasks. To install it, just run: + +```bash +$ bin/rails solid_queue:install:migrations +``` + +Or, if you're using a different database for Solid Queue: + +```bash +$ bin/rails solid_queue:install:migrations DATABASE= +``` + +And then run the migrations. + + # Upgrading to version 0.4.x This version introduced an _async_ mode to run the supervisor and have all workers and dispatchers run as part of the same process as the supervisor, instead of separate, forked, processes. Together with this, we introduced some changes in how the supervisor is started. Prior this change, you could choose whether you wanted to run workers, dispatchers or both, by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is: @@ -26,7 +42,6 @@ the supervisor will run 1 dispatcher and no workers. # Upgrading to version 0.3.x - This version introduced support for [recurring (cron-style) jobs](https://github.com/rails/solid_queue/blob/main/README.md#recurring-tasks), and it needs a new DB migration for it. To install it, just run: ```bash diff --git a/app/models/solid_queue/recurring_execution.rb b/app/models/solid_queue/recurring_execution.rb index d801cd65..34160003 100644 --- a/app/models/solid_queue/recurring_execution.rb +++ b/app/models/solid_queue/recurring_execution.rb @@ -7,16 +7,29 @@ class AlreadyRecorded < StandardError; end scope :clearable, -> { where.missing(:job) } class << self + def create_or_insert!(**attributes) + if connection.supports_insert_conflict_target? + # PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict + # during two concurrent INSERTs for the same value of an unique index. We need to explicitly + # indicate unique_by to ignore duplicate rows by this value when inserting + unless insert(attributes, unique_by: [ :task_key, :run_at ]).any? + raise AlreadyRecorded + end + else + create!(**attributes) + end + rescue ActiveRecord::RecordNotUnique + raise AlreadyRecorded + end + def record(task_key, run_at, &block) transaction do block.call.tap do |active_job| if active_job - create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at) + create_or_insert!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at) end end end - rescue ActiveRecord::RecordNotUnique => e - raise AlreadyRecorded end def clear_in_batches(batch_size: 500) diff --git a/lib/solid_queue/dispatcher/recurring_task.rb b/app/models/solid_queue/recurring_task.rb similarity index 57% rename from lib/solid_queue/dispatcher/recurring_task.rb rename to app/models/solid_queue/recurring_task.rb index 7878e6c7..b302af70 100644 --- a/lib/solid_queue/dispatcher/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -1,24 +1,35 @@ +# frozen_string_literal: true + require "fugit" module SolidQueue - class Dispatcher::RecurringTask + class RecurringTask < Record + serialize :arguments, coder: Arguments, default: [] + + validate :supported_schedule + validate :existing_job_class + + scope :static, -> { where(static: true) } + class << self def wrap(args) args.is_a?(self) ? args : from_configuration(args.first, **args.second) end def from_configuration(key, **options) - new(key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) + new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) end - end - attr_reader :key, :schedule, :class_name, :arguments - - def initialize(key, class_name:, schedule:, arguments: nil) - @key = key - @class_name = class_name - @schedule = schedule - @arguments = Array(arguments) + def create_or_update_all(tasks) + if connection.supports_insert_conflict_target? + # PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict + # during two concurrent INSERTs for the same value of an unique index. We need to explicitly + # indicate unique_by to ignore duplicate rows by this value when inserting + upsert_all tasks.map(&:attributes_for_upsert), unique_by: :key + else + upsert_all tasks.map(&:attributes_for_upsert) + end + end end def delay_from_now @@ -51,23 +62,27 @@ def enqueue(at:) end end - def valid? - parsed_schedule.instance_of?(Fugit::Cron) - end - def to_s "#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule.original} ]" end - def to_h - { - schedule: schedule, - class_name: class_name, - arguments: arguments - } + def attributes_for_upsert + attributes.without("id", "created_at", "updated_at") end private + def supported_schedule + unless parsed_schedule.instance_of?(Fugit::Cron) + errors.add :schedule, :unsupported, message: "is not a supported recurring schedule" + end + end + + def existing_job_class + unless job_class.present? + errors.add :class_name, :undefined, message: "doesn't correspond to an existing class" + end + end + def using_solid_queue_adapter? job_class.queue_adapter_name.inquiry.solid_queue? end @@ -88,12 +103,13 @@ def arguments_with_kwargs end end + def parsed_schedule @parsed_schedule ||= Fugit.parse(schedule) end def job_class - @job_class ||= class_name.safe_constantize + @job_class ||= class_name&.safe_constantize end end end diff --git a/app/models/solid_queue/recurring_task/arguments.rb b/app/models/solid_queue/recurring_task/arguments.rb new file mode 100644 index 00000000..229f43df --- /dev/null +++ b/app/models/solid_queue/recurring_task/arguments.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +require "active_job/arguments" + +module SolidQueue + class RecurringTask::Arguments + class << self + def load(data) + data.nil? ? [] : ActiveJob::Arguments.deserialize(ActiveSupport::JSON.load(data)) + end + + def dump(data) + ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(Array(data))) + end + end + end +end diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 80f53fd9..62eeb035 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -52,7 +52,6 @@ def signal end private - attr_accessor :job def attempt_creation @@ -63,7 +62,9 @@ def attempt_creation end end - def check_limit_or_decrement = limit == 1 ? false : attempt_decrement + def check_limit_or_decrement + limit == 1 ? false : attempt_decrement + end def attempt_decrement Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 diff --git a/db/migrate/20240719134516_create_recurring_tasks.rb b/db/migrate/20240719134516_create_recurring_tasks.rb new file mode 100644 index 00000000..6b2ff000 --- /dev/null +++ b/db/migrate/20240719134516_create_recurring_tasks.rb @@ -0,0 +1,20 @@ +class CreateRecurringTasks < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_recurring_tasks do |t| + t.string :key, null: false, index: { unique: true } + t.string :schedule, null: false + t.string :command, limit: 2048 + t.string :class_name + t.text :arguments + + t.string :queue_name + t.integer :priority, default: 0 + + t.boolean :static, default: true, index: true + + t.text :description + + t.timestamps + end + end +end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 00b3b0f6..6eb85473 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -75,7 +75,7 @@ def options_from_raw_config(key, defaults) def parse_recurring_tasks(tasks) Array(tasks).map do |id, options| - Dispatcher::RecurringTask.from_configuration(id, **options) + RecurringTask.from_configuration(id, **options) end.select(&:valid?) end diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 00f80f08..41c505ba 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -4,8 +4,8 @@ module SolidQueue class Dispatcher < Processes::Poller attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule - after_boot :start_concurrency_maintenance, :load_recurring_schedule - before_shutdown :stop_concurrency_maintenance, :unload_recurring_schedule + after_boot :start_concurrency_maintenance, :schedule_recurring_tasks + before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) @@ -19,7 +19,7 @@ def initialize(**options) end def metadata - super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence) + super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.task_keys.presence) end private @@ -38,16 +38,16 @@ def start_concurrency_maintenance concurrency_maintenance&.start end - def load_recurring_schedule - recurring_schedule.load_tasks + def schedule_recurring_tasks + recurring_schedule.schedule_tasks end def stop_concurrency_maintenance concurrency_maintenance&.stop end - def unload_recurring_schedule - recurring_schedule.unload_tasks + def unschedule_recurring_tasks + recurring_schedule.unschedule_tasks end def all_work_completed? diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/dispatcher/recurring_schedule.rb index d32ba3f3..c476ae6e 100644 --- a/lib/solid_queue/dispatcher/recurring_schedule.rb +++ b/lib/solid_queue/dispatcher/recurring_schedule.rb @@ -7,7 +7,7 @@ class Dispatcher::RecurringSchedule attr_reader :configured_tasks, :scheduled_tasks def initialize(tasks) - @configured_tasks = Array(tasks).map { |task| Dispatcher::RecurringTask.wrap(task) } + @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) @scheduled_tasks = Concurrent::Hash.new end @@ -15,33 +15,42 @@ def empty? configured_tasks.empty? end - def load_tasks + def schedule_tasks + wrap_in_app_executor do + persist_tasks + reload_tasks + end + configured_tasks.each do |task| - load_task(task) + schedule_task(task) end end - def load_task(task) + def schedule_task(task) scheduled_tasks[task.key] = schedule(task) end - def unload_tasks + def unschedule_tasks scheduled_tasks.values.each(&:cancel) scheduled_tasks.clear end - def tasks - configured_tasks.each_with_object({}) { |task, hsh| hsh[task.key] = task.to_h } - end - - def inspect - configured_tasks.map(&:to_s).join(" | ") + def task_keys + configured_tasks.map(&:key) end private + def persist_tasks + SolidQueue::RecurringTask.create_or_update_all configured_tasks + end + + def reload_tasks + @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys) + end + def schedule(task) scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at| - thread_schedule.load_task(thread_task) + thread_schedule.schedule_task(thread_task) wrap_in_app_executor do thread_task.enqueue(at: thread_task_run_at) diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 52c648f3..8f06fa15 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_02_18_110712) do +ActiveRecord::Schema[7.1].define(version: 2024_07_19_134516) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -101,6 +101,22 @@ t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true end + create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "command", limit: 2048 + t.string "class_name" + t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 + t.boolean "static", default: true + t.text "description" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" + end + create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index da8d5e38..fe1221cf 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -18,10 +18,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase teardown do terminate_process(@pid) if process_exists?(@pid) - - SolidQueue::Job.destroy_all - SolidQueue::Process.destroy_all - SolidQueue::Semaphore.delete_all end test "run several conflicting jobs over the same record sequentially" do @@ -33,8 +29,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SequentialUpdateResultJob.perform_later(@result, name: name) end - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs assert_stored_sequence @result, ("A".."K").to_a end @@ -51,7 +47,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end wait_for_jobs_to_finish_for(5.seconds) - assert_no_pending_jobs + assert_no_unfinished_jobs assert_stored_sequence @result, ("A".."K").to_a end @@ -78,8 +74,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs # C would have started in the beginning, seeing the status empty, and would finish after # all other jobs, so it'll do the last update with only itself @@ -96,7 +92,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SequentialUpdateResultJob.perform_later(@result, name: name) end - wait_for_jobs_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(5.seconds) assert_equal 3, SolidQueue::FailedExecution.count assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a @@ -106,8 +102,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? } # Lock the semaphore so we can enqueue jobs and leave them blocked @@ -128,8 +124,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert SolidQueue::Semaphore.signal(job) # And wait for the dispatcher to release the jobs - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs # We can't ensure the order between B and C, because it depends on which worker wins when # unblocking, as one will try to unblock B and another C @@ -139,8 +135,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "rely on dispatcher to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? } # Lock the semaphore so we can enqueue jobs and leave them blocked @@ -160,8 +156,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) # And wait for dispatcher to release the jobs - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs # We can't ensure the order between B and C, because it depends on which worker wins when # unblocking, as one will try to unblock B and another C @@ -198,7 +194,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end private - def assert_stored_sequence(result, *sequences) expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } skip_active_record_query_cache do diff --git a/test/integration/forked_processes_lifecycle_test.rb b/test/integration/forked_processes_lifecycle_test.rb index 9165e2c3..3867e918 100644 --- a/test/integration/forked_processes_lifecycle_test.rb +++ b/test/integration/forked_processes_lifecycle_test.rb @@ -15,17 +15,13 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase teardown do terminate_process(@pid) if process_exists?(@pid) - - SolidQueue::Process.destroy_all - SolidQueue::Job.destroy_all - JobResult.delete_all end test "enqueue jobs in multiple queues" do 6.times { |i| enqueue_store_result_job("job_#{i}") } 6.times { |i| enqueue_store_result_job("job_#{i}", :default) } - wait_for_jobs_to_finish_for(0.5.seconds) + wait_for_jobs_to_finish_for(2.seconds) assert_equal 12, JobResult.count 6.times { |i| assert_completed_job_results("job_#{i}", :background) } @@ -63,17 +59,18 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase signal_process(@pid, :TERM, wait: 0.1.second) end - sleep(0.5.seconds) + sleep(1.second) assert_clean_termination end test "quit supervisor while there are jobs in-flight" do no_pause = enqueue_store_result_job("no pause") - pause = enqueue_store_result_job("pause", pause: 1.seconds) + pause = enqueue_store_result_job("pause", pause: 1.second) - signal_process(@pid, :QUIT, wait: 0.5.second) - wait_for_jobs_to_finish_for(2.5.seconds) + signal_process(@pid, :QUIT, wait: 0.4.second) + wait_for_jobs_to_finish_for(2.seconds, except: pause) + wait_while_with_timeout(2.seconds) { process_exists?(@pid) } assert_not process_exists?(@pid) assert_completed_job_results("no pause") @@ -90,8 +87,8 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase no_pause = enqueue_store_result_job("no pause") pause = enqueue_store_result_job("pause", pause: 0.2.seconds) - signal_process(@pid, :TERM, wait: 0.1.second) - wait_for_jobs_to_finish_for(0.5.seconds) + signal_process(@pid, :TERM, wait: 0.3.second) + wait_for_jobs_to_finish_for(3.seconds) assert_completed_job_results("no pause") assert_completed_job_results("pause") @@ -108,7 +105,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase pause = enqueue_store_result_job("pause", pause: 0.2.seconds) signal_process(@pid, :INT, wait: 0.1.second) - wait_for_jobs_to_finish_for(0.5.seconds) + wait_for_jobs_to_finish_for(2.second) assert_completed_job_results("no pause") assert_completed_job_results("pause") @@ -124,8 +121,9 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase no_pause = enqueue_store_result_job("no pause") pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second) - signal_process(@pid, :TERM, wait: 0.1.second) - wait_for_jobs_to_finish_for(SolidQueue.shutdown_timeout + 0.1.second) + signal_process(@pid, :TERM, wait: 0.5.second) + + sleep(SolidQueue.shutdown_timeout + 0.5.second) assert_completed_job_results("no pause") assert_job_status(no_pause, :finished) @@ -152,12 +150,12 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase 2.times { enqueue_store_result_job("no error", :default, pause: 0.01) } error3 = enqueue_store_result_job("error", :default, exception: RuntimeError) - wait_for_jobs_to_finish_for(0.5.seconds) + wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ]) assert_completed_job_results("no error", :background, 3) assert_completed_job_results("no error", :default, 4) - assert_failures 3 + wait_while_with_timeout(1.second) { SolidQueue::FailedExecution.count < 3 } [ error1, error2, error3 ].each do |job| assert_job_status(job, :failed) end @@ -177,7 +175,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase 2.times { enqueue_store_result_job("no exit", :background) } - wait_for_jobs_to_finish_for(5.seconds) + wait_for_jobs_to_finish_for(3.seconds, except: [ exit_job, pause_job ]) assert_completed_job_results("no exit", :default, 2) assert_completed_job_results("no exit", :background, 4) @@ -208,6 +206,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase assert_nil SolidQueue::Process.find_by(id: worker.id) # Jobs were completed + wait_for_jobs_to_finish_for(1.second) assert_completed_job_results("pause", :background) assert_completed_job_results("pause", :default) @@ -218,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase # And they can process jobs just fine enqueue_store_result_job("no_pause") enqueue_store_result_job("no_pause", :default) - wait_for_jobs_to_finish_for(0.2.seconds) + wait_for_jobs_to_finish_for(1.second) assert_completed_job_results("no_pause", :background) assert_completed_job_results("no_pause", :default) @@ -228,19 +227,18 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase end test "kill worker individually" do - killed_pause = enqueue_store_result_job("killed_pause", pause: 1.seconds) + killed_pause = enqueue_store_result_job("killed_pause", pause: 1.second) enqueue_store_result_job("pause", :default, pause: 0.5.seconds) worker = find_processes_registered_as("Worker").detect { |process| process.metadata["queues"].include? "background" } - - signal_process(worker.pid, :KILL, wait: 0.3.second) + signal_process(worker.pid, :KILL, wait: 0.5.seconds) # Worker didn't have time to clean up or finish the work - sleep(0.7.second) + sleep(0.5.second) assert SolidQueue::Process.exists?(id: worker.id) # And there's a new worker that has been registered for the background queue - wait_for_registered_processes(4, timeout: 3.second) + wait_for_registered_processes(4, timeout: 5.second) # The job in the background queue was left claimed as the worker couldn't # finish orderly @@ -252,7 +250,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase # The two current workers can process jobs just fine enqueue_store_result_job("no_pause") enqueue_store_result_job("no_pause", :default) - wait_for_jobs_to_finish_for(0.5.seconds) + sleep(2.seconds) assert_completed_job_results("no_pause", :background) assert_completed_job_results("no_pause", :default) @@ -291,11 +289,15 @@ def enqueue_store_result_job(value, queue_name = :background, **options) end def assert_completed_job_results(value, queue_name = :background, count = 1) - assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count + skip_active_record_query_cache do + assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count + end end def assert_started_job_result(value, queue_name = :background, count = 1) - assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count + skip_active_record_query_cache do + assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count + end end def assert_job_status(active_job, status) @@ -311,10 +313,8 @@ def assert_job_status(active_job, status) end def assert_no_claimed_jobs - assert SolidQueue::ClaimedExecution.none? - end - - def assert_failures(count) - assert_equal count, SolidQueue::FailedExecution.count + skip_active_record_query_cache do + assert SolidQueue::ClaimedExecution.none? + end end end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 3060747b..820a5c7f 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -289,7 +289,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 1 event = events.last - assert_event event, "enqueue_recurring_task", task: :example_task, active_job_id: SolidQueue::Job.last.active_job_id + assert_event event, "enqueue_recurring_task", task: "example_task", active_job_id: SolidQueue::Job.last.active_job_id assert event.last[:at].present? assert_nil event.last[:other_adapter] end @@ -306,7 +306,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 2 events.each do |event| - assert_event event, "enqueue_recurring_task", task: :example_task + assert_event event, "enqueue_recurring_task", task: "example_task" end active_job_ids = SolidQueue::Job.all.map(&:active_job_id) @@ -333,7 +333,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 1 event = events.last - assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked" + assert_event event, "enqueue_recurring_task", task: "example_task", enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked" assert event.last[:at].present? assert_nil event.last[:other_adapter] end @@ -354,7 +354,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 1 event = events.last - assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "All is broken" + assert_event event, "enqueue_recurring_task", task: "example_task", enqueue_error: "All is broken" assert event.last[:at].present? assert event.last[:other_adapter] ensure diff --git a/test/integration/puma/plugin_testing.rb b/test/integration/puma/plugin_testing.rb index 2cbd84be..0cb82095 100644 --- a/test/integration/puma/plugin_testing.rb +++ b/test/integration/puma/plugin_testing.rb @@ -32,16 +32,14 @@ module PluginTesting teardown do terminate_process(@pid, signal: :INT) if process_exists?(@pid) - wait_for_registered_processes 0, timeout: 1.second - - JobResult.delete_all + wait_for_registered_processes 0, timeout: 2.seconds end end test "perform jobs inside puma's process" do StoreResultJob.perform_later(:puma_plugin) - wait_for_jobs_to_finish_for(1.second) + wait_for_jobs_to_finish_for(2.seconds) assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count end diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index 6bc3c262..f18e7788 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -16,12 +16,13 @@ class RecurringTasksTest < ActiveSupport::TestCase SolidQueue::Process.destroy_all SolidQueue::Job.destroy_all + SolidQueue::RecurringTask.delete_all JobResult.delete_all end test "enqueue and process periodic tasks" do - wait_for_jobs_to_be_enqueued(2, timeout: 2.seconds) - wait_for_jobs_to_finish_for(2.seconds) + wait_for_jobs_to_be_enqueued(2, timeout: 2.5.seconds) + wait_for_jobs_to_finish_for(2.5.seconds) terminate_process(@pid) @@ -32,7 +33,7 @@ class RecurringTasksTest < ActiveSupport::TestCase assert_equal "StoreResultJob", job.class_name end - assert_equal 2, JobResult.count + assert JobResult.count >= 2 JobResult.all.each do |result| assert_equal "custom_status", result.status assert_equal "42", result.value @@ -40,8 +41,59 @@ class RecurringTasksTest < ActiveSupport::TestCase end end + test "persist and delete configured tasks" do + configured_task = { periodic_store_result: { class: "StoreResultJob", schedule: "every second" } } + # Wait for concurrency schedule loading after process registration + sleep(0.5) + + assert_recurring_tasks configured_task + terminate_process(@pid) + + task = SolidQueue::RecurringTask.find_by(key: "periodic_store_result") + task.update!(class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ]) + + @pid = run_supervisor_as_fork + wait_for_registered_processes(4, timeout: 3.second) + + # Wait for concurrency schedule loading after process registration + sleep(0.5) + + assert_recurring_tasks configured_task + + another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } } + dispatcher1 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: another_task).tap(&:start) + wait_for_registered_processes(5, timeout: 1.second) + + assert_recurring_tasks configured_task.merge(another_task) + + updated_task = { example_task: { class: "AddToBufferJob", schedule: "every minute" } } + dispatcher2 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: updated_task).tap(&:start) + wait_for_registered_processes(6, timeout: 1.second) + + assert_recurring_tasks configured_task.merge(updated_task) + + terminate_process(@pid) + dispatcher1.stop + dispatcher2.stop + end + private def wait_for_jobs_to_be_enqueued(count, timeout: 1.second) wait_while_with_timeout(timeout) { SolidQueue::Job.count < count } end + + def assert_recurring_tasks(expected_tasks) + skip_active_record_query_cache do + assert_equal expected_tasks.count, SolidQueue::RecurringTask.count + + expected_tasks.each do |key, attrs| + task = SolidQueue::RecurringTask.find_by(key: key) + assert task.present? + + assert_equal(attrs[:schedule], task.schedule) if attrs[:schedule] + assert_equal(attrs[:class], task.class_name) if attrs[:class] + assert_equal(attrs[:args], task.arguments) if attrs[:args] + end + end + end end diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 0d907a5d..a9b3cc59 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -3,11 +3,6 @@ class SolidQueue::JobTest < ActiveSupport::TestCase self.use_transactional_tests = false - teardown do - SolidQueue::Job.destroy_all - JobResult.delete_all - end - class NonOverlappingJob < ApplicationJob limits_concurrency key: ->(job_result, **) { job_result } diff --git a/test/unit/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb similarity index 88% rename from test/unit/recurring_task_test.rb rename to test/models/solid_queue/recurring_task_test.rb index 886848ba..bad6ec30 100644 --- a/test/unit/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -1,6 +1,6 @@ require "test_helper" -class RecurringTaskTest < ActiveSupport::TestCase +class SolidQueue::RecurringTaskTest < ActiveSupport::TestCase class JobWithoutArguments < ApplicationJob def perform JobBuffer.add "job_without_arguments" @@ -114,6 +114,16 @@ def perform task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every second") assert task.valid? assert task.to_s.ends_with? "[ * * * * * * ]" + + # Empty schedule + assert_not SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::JobWithoutArguments").valid? + end + + test "undefined job class" do + assert_not recurring_task_with(class_name: "UnknownJob").valid? + + # Empty class name + assert_not SolidQueue::RecurringTask.new(key: "task-id", schedule: "every minute").valid? end private @@ -130,6 +140,6 @@ def enqueue_and_assert_performed_with_result(task, result) end def recurring_task_with(class_name:, schedule: "every hour", args: nil) - SolidQueue::Dispatcher::RecurringTask.from_configuration("task-id", class: "RecurringTaskTest::#{class_name}", schedule: schedule, args: args) + SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, arguments: args) end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 47ef1a74..10fd50af 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -32,14 +32,24 @@ class ActiveSupport::TestCase if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile) File.delete(SolidQueue.supervisor_pidfile) end + + unless self.class.use_transactional_tests + SolidQueue::Job.destroy_all + SolidQueue::Process.destroy_all + SolidQueue::Semaphore.delete_all + SolidQueue::RecurringTask.delete_all + JobResult.delete_all + end end private - def wait_for_jobs_to_finish_for(timeout = 1.second) - wait_while_with_timeout(timeout) { SolidQueue::Job.where(finished_at: nil).any? } + def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) + wait_while_with_timeout(timeout) do + SolidQueue::Job.where.not(active_job_id: Array(except).map(&:job_id)).where(finished_at: nil).any? + end end - def assert_no_pending_jobs + def assert_no_unfinished_jobs skip_active_record_query_cache do assert SolidQueue::Job.where(finished_at: nil).none? end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index d3768a2c..73f53dd0 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -12,8 +12,6 @@ class DispatcherTest < ActiveSupport::TestCase teardown do @dispatcher.stop - SolidQueue::Job.delete_all - SolidQueue::Process.delete_all end test "dispatcher is registered as process" do @@ -51,8 +49,7 @@ class DispatcherTest < ActiveSupport::TestCase assert_equal "Dispatcher", process.kind schedule_from_metadata = process.metadata["recurring_schedule"] - assert_equal 1, schedule_from_metadata.size - assert_equal({ "class_name" => "AddToBufferJob", "schedule" => "every hour", "arguments" => [ 42 ] }, schedule_from_metadata["example_task"]) + assert_equal [ "example_task" ], schedule_from_metadata ensure with_recurring_schedule.stop end @@ -88,6 +85,10 @@ class DispatcherTest < ActiveSupport::TestCase sleep 0.2 end end + + @dispatcher.stop + wait_for_registered_processes(0, timeout: 1.second) + assert_no_registered_processes end test "run more than one instance of the dispatcher without recurring tasks" do @@ -101,13 +102,13 @@ class DispatcherTest < ActiveSupport::TestCase @dispatcher.start another_dispatcher.start - sleep 0.5 + sleep(0.7.seconds) assert_equal 0, SolidQueue::ScheduledExecution.count assert_equal 15, SolidQueue::ReadyExecution.count ensure - another_dispatcher.stop + another_dispatcher&.stop end test "run more than one instance of the dispatcher with recurring tasks" do diff --git a/test/unit/fork_supervisor_test.rb b/test/unit/fork_supervisor_test.rb index 49e4b46b..df0f9def 100644 --- a/test/unit/fork_supervisor_test.rb +++ b/test/unit/fork_supervisor_test.rb @@ -12,8 +12,6 @@ class ForkSupervisorTest < ActiveSupport::TestCase teardown do SolidQueue.supervisor_pidfile = @previous_pidfile File.delete(@pidfile) if File.exist?(@pidfile) - - SolidQueue::Process.destroy_all end test "start" do @@ -32,7 +30,7 @@ class ForkSupervisorTest < ActiveSupport::TestCase test "start with provided configuration" do config_as_hash = { workers: [], dispatchers: [ { batch_size: 100 } ] } pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) - wait_for_registered_processes(2) # supervisor + dispatcher + wait_for_registered_processes(2, timeout: 2) # supervisor + dispatcher assert_registered_supervisor(pid) assert_registered_workers(count: 0) diff --git a/test/unit/hooks_test.rb b/test/unit/hooks_test.rb index f6db1718..4b7c2197 100644 --- a/test/unit/hooks_test.rb +++ b/test/unit/hooks_test.rb @@ -2,6 +2,7 @@ class HooksTest < ActiveSupport::TestCase test "solid_queue_record hook ran" do + SolidQueue::Record assert Rails.application.config.x.solid_queue_record_hook_ran end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index c7c1032e..63b04f7a 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -78,7 +78,7 @@ class WorkerTest < ActiveSupport::TestCase @worker.start - wait_for_jobs_to_finish_for(1.second) + wait_for_jobs_to_finish_for(2.second) @worker.wake_up assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :paused).count @@ -116,6 +116,10 @@ class WorkerTest < ActiveSupport::TestCase sleep 0.2 end end + + @worker.stop + wait_for_registered_processes(0, timeout: 1.second) + assert_no_registered_processes end test "run inline" do