From 7e2b5d1872c6b04904a9714ddbbae5b909ff8032 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 19 Jul 2024 15:55:52 +0200 Subject: [PATCH 01/11] Convert recurring tasks into Active Records So we can persist them in the DB and support dynamic tasks, more complex serialized arguments and enqueuing them on-demand from Mission Control more easily. --- .../models/solid_queue}/recurring_task.rb | 15 ++++----------- .../20240719134516_create_recurring_tasks.rb | 12 ++++++++++++ lib/solid_queue/configuration.rb | 2 +- lib/solid_queue/dispatcher/recurring_schedule.rb | 2 +- test/dummy/db/schema.rb | 12 +++++++++++- test/integration/instrumentation_test.rb | 2 +- .../solid_queue}/recurring_task_test.rb | 4 ++-- 7 files changed, 32 insertions(+), 17 deletions(-) rename {lib/solid_queue/dispatcher => app/models/solid_queue}/recurring_task.rb (84%) create mode 100644 db/migrate/20240719134516_create_recurring_tasks.rb rename test/{unit => models/solid_queue}/recurring_task_test.rb (96%) diff --git a/lib/solid_queue/dispatcher/recurring_task.rb b/app/models/solid_queue/recurring_task.rb similarity index 84% rename from lib/solid_queue/dispatcher/recurring_task.rb rename to app/models/solid_queue/recurring_task.rb index 7878e6c7..7f825831 100644 --- a/lib/solid_queue/dispatcher/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -1,26 +1,19 @@ require "fugit" module SolidQueue - class Dispatcher::RecurringTask + class RecurringTask < Record + serialize :arguments, coder: JSON + class << self def wrap(args) args.is_a?(self) ? args : from_configuration(args.first, **args.second) end def from_configuration(key, **options) - new(key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) + new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args] || []) end end - attr_reader :key, :schedule, :class_name, :arguments - - def initialize(key, class_name:, schedule:, arguments: nil) - @key = key - @class_name = class_name - @schedule = schedule - @arguments = Array(arguments) - end - def delay_from_now [ (next_time - Time.current).to_f, 0 ].max end diff --git a/db/migrate/20240719134516_create_recurring_tasks.rb b/db/migrate/20240719134516_create_recurring_tasks.rb new file mode 100644 index 00000000..17e4f8d1 --- /dev/null +++ b/db/migrate/20240719134516_create_recurring_tasks.rb @@ -0,0 +1,12 @@ +class CreateRecurringTasks < ActiveRecord::Migration[7.1] + def change + create_table :solid_queue_recurring_tasks do |t| + t.string :key, null: false, index: { unique: true } + t.string :schedule, null: false + t.string :class_name, null: false + t.text :arguments + + t.timestamps + end + end +end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index 00b3b0f6..6eb85473 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -75,7 +75,7 @@ def options_from_raw_config(key, defaults) def parse_recurring_tasks(tasks) Array(tasks).map do |id, options| - Dispatcher::RecurringTask.from_configuration(id, **options) + RecurringTask.from_configuration(id, **options) end.select(&:valid?) end diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/dispatcher/recurring_schedule.rb index d32ba3f3..2dd03f3f 100644 --- a/lib/solid_queue/dispatcher/recurring_schedule.rb +++ b/lib/solid_queue/dispatcher/recurring_schedule.rb @@ -7,7 +7,7 @@ class Dispatcher::RecurringSchedule attr_reader :configured_tasks, :scheduled_tasks def initialize(tasks) - @configured_tasks = Array(tasks).map { |task| Dispatcher::RecurringTask.wrap(task) } + @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) } @scheduled_tasks = Concurrent::Hash.new end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 52c648f3..8f0372db 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2024_02_18_110712) do +ActiveRecord::Schema[7.1].define(version: 2024_07_19_134516) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -101,6 +101,16 @@ t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true end + create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| + t.string "key", null: false + t.string "schedule", null: false + t.string "class_name", null: false + t.text "arguments" + t.datetime "created_at", null: false + t.datetime "updated_at", null: false + t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + end + create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 3060747b..0b5cc17f 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -289,7 +289,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 1 event = events.last - assert_event event, "enqueue_recurring_task", task: :example_task, active_job_id: SolidQueue::Job.last.active_job_id + assert_event event, "enqueue_recurring_task", task: "example_task", active_job_id: SolidQueue::Job.last.active_job_id assert event.last[:at].present? assert_nil event.last[:other_adapter] end diff --git a/test/unit/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb similarity index 96% rename from test/unit/recurring_task_test.rb rename to test/models/solid_queue/recurring_task_test.rb index 886848ba..27d5923c 100644 --- a/test/unit/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -1,6 +1,6 @@ require "test_helper" -class RecurringTaskTest < ActiveSupport::TestCase +class SolidQueue::RecurringTaskTest < ActiveSupport::TestCase class JobWithoutArguments < ApplicationJob def perform JobBuffer.add "job_without_arguments" @@ -130,6 +130,6 @@ def enqueue_and_assert_performed_with_result(task, result) end def recurring_task_with(class_name:, schedule: "every hour", args: nil) - SolidQueue::Dispatcher::RecurringTask.from_configuration("task-id", class: "RecurringTaskTest::#{class_name}", schedule: schedule, args: args) + SolidQueue::RecurringTask.from_configuration("task-id", class: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, args: args) end end From 49ae908688a2fd045f51200d34df0b57ae876a08 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 19 Jul 2024 22:27:39 +0200 Subject: [PATCH 02/11] Serialize recurring task's arguments using a custom coder That uses Active Job's arguments serializer/deserializer. --- app/models/solid_queue/recurring_task.rb | 4 ++-- app/models/solid_queue/recurring_task/arguments.rb | 13 +++++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) create mode 100644 app/models/solid_queue/recurring_task/arguments.rb diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 7f825831..e2baec73 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -2,7 +2,7 @@ module SolidQueue class RecurringTask < Record - serialize :arguments, coder: JSON + serialize :arguments, coder: Arguments, default: [] class << self def wrap(args) @@ -10,7 +10,7 @@ def wrap(args) end def from_configuration(key, **options) - new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args] || []) + new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) end end diff --git a/app/models/solid_queue/recurring_task/arguments.rb b/app/models/solid_queue/recurring_task/arguments.rb new file mode 100644 index 00000000..3c8b22fe --- /dev/null +++ b/app/models/solid_queue/recurring_task/arguments.rb @@ -0,0 +1,13 @@ +module SolidQueue + class RecurringTask::Arguments + class << self + def load(data) + data.nil? ? [] : ActiveJob::Arguments.deserialize(ActiveSupport::JSON.load(data)) + end + + def dump(data) + ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(data)) unless data.nil? + end + end + end +end From bf420f165e932ff0cb70638e06ccf999f24e72f4 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 19 Jul 2024 22:58:49 +0200 Subject: [PATCH 03/11] Use Active Model validations and validate class name as well --- app/models/solid_queue/recurring_task.rb | 23 +++++++++++++++---- .../models/solid_queue/recurring_task_test.rb | 12 +++++++++- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index e2baec73..bb535584 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -4,6 +4,9 @@ module SolidQueue class RecurringTask < Record serialize :arguments, coder: Arguments, default: [] + validate :supported_schedule + validate :existing_job_class + class << self def wrap(args) args.is_a?(self) ? args : from_configuration(args.first, **args.second) @@ -44,10 +47,6 @@ def enqueue(at:) end end - def valid? - parsed_schedule.instance_of?(Fugit::Cron) - end - def to_s "#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule.original} ]" end @@ -61,6 +60,19 @@ def to_h end private + def supported_schedule + unless parsed_schedule.instance_of?(Fugit::Cron) + errors.add :schedule, :unsupported, message: "is not a supported recurring schedule" + end + end + + def existing_job_class + unless job_class.present? + errors.add :class_name, :undefined, message: "doesn't correspond to an existing class" + end + end + + def using_solid_queue_adapter? job_class.queue_adapter_name.inquiry.solid_queue? end @@ -81,12 +93,13 @@ def arguments_with_kwargs end end + def parsed_schedule @parsed_schedule ||= Fugit.parse(schedule) end def job_class - @job_class ||= class_name.safe_constantize + @job_class ||= class_name&.safe_constantize end end end diff --git a/test/models/solid_queue/recurring_task_test.rb b/test/models/solid_queue/recurring_task_test.rb index 27d5923c..bad6ec30 100644 --- a/test/models/solid_queue/recurring_task_test.rb +++ b/test/models/solid_queue/recurring_task_test.rb @@ -114,6 +114,16 @@ def perform task = recurring_task_with(class_name: "JobWithoutArguments", schedule: "every second") assert task.valid? assert task.to_s.ends_with? "[ * * * * * * ]" + + # Empty schedule + assert_not SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::JobWithoutArguments").valid? + end + + test "undefined job class" do + assert_not recurring_task_with(class_name: "UnknownJob").valid? + + # Empty class name + assert_not SolidQueue::RecurringTask.new(key: "task-id", schedule: "every minute").valid? end private @@ -130,6 +140,6 @@ def enqueue_and_assert_performed_with_result(task, result) end def recurring_task_with(class_name:, schedule: "every hour", args: nil) - SolidQueue::RecurringTask.from_configuration("task-id", class: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, args: args) + SolidQueue::RecurringTask.new(key: "task-id", class_name: "SolidQueue::RecurringTaskTest::#{class_name}", schedule: schedule, arguments: args) end end From 546aaa6205dd2494303b1e95ee0525c1aaa0b91c Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Sat, 20 Jul 2024 20:34:20 +0200 Subject: [PATCH 04/11] Add `static` attribute to recurring tasks So we can differentiate dynamic ones to not delete them. --- db/migrate/20240719134516_create_recurring_tasks.rb | 2 ++ test/dummy/db/schema.rb | 2 ++ 2 files changed, 4 insertions(+) diff --git a/db/migrate/20240719134516_create_recurring_tasks.rb b/db/migrate/20240719134516_create_recurring_tasks.rb index 17e4f8d1..f52fcf58 100644 --- a/db/migrate/20240719134516_create_recurring_tasks.rb +++ b/db/migrate/20240719134516_create_recurring_tasks.rb @@ -6,6 +6,8 @@ def change t.string :class_name, null: false t.text :arguments + t.boolean :static, default: true, index: true + t.timestamps end end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 8f0372db..5c042c21 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -106,9 +106,11 @@ t.string "schedule", null: false t.string "class_name", null: false t.text "arguments" + t.boolean "static", default: true t.datetime "created_at", null: false t.datetime "updated_at", null: false t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true + t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static" end create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| From 62367397cf2e136c106b9c35432d2c94ec6fd76e Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 2 Aug 2024 13:17:39 +0200 Subject: [PATCH 05/11] Persist and reload recurring tasks before scheduling them And delete them when the schedule is unloaded. --- app/models/solid_queue/recurring_task.rb | 13 +++-- .../solid_queue/recurring_task/arguments.rb | 6 ++- lib/solid_queue/dispatcher.rb | 14 +++--- .../dispatcher/recurring_schedule.rb | 37 +++++++++----- test/integration/instrumentation_test.rb | 6 +-- test/integration/recurring_tasks_test.rb | 49 +++++++++++++++++++ test/unit/dispatcher_test.rb | 3 +- 7 files changed, 97 insertions(+), 31 deletions(-) diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index bb535584..60f11c20 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + require "fugit" module SolidQueue @@ -7,6 +9,8 @@ class RecurringTask < Record validate :supported_schedule validate :existing_job_class + scope :static, -> { where(static: true) } + class << self def wrap(args) args.is_a?(self) ? args : from_configuration(args.first, **args.second) @@ -51,12 +55,8 @@ def to_s "#{class_name}.perform_later(#{arguments.map(&:inspect).join(",")}) [ #{parsed_schedule.original} ]" end - def to_h - { - schedule: schedule, - class_name: class_name, - arguments: arguments - } + def attributes_for_upsert + attributes.without("id", "created_at", "updated_at") end private @@ -72,7 +72,6 @@ def existing_job_class end end - def using_solid_queue_adapter? job_class.queue_adapter_name.inquiry.solid_queue? end diff --git a/app/models/solid_queue/recurring_task/arguments.rb b/app/models/solid_queue/recurring_task/arguments.rb index 3c8b22fe..229f43df 100644 --- a/app/models/solid_queue/recurring_task/arguments.rb +++ b/app/models/solid_queue/recurring_task/arguments.rb @@ -1,3 +1,7 @@ +# frozen_string_literal: true + +require "active_job/arguments" + module SolidQueue class RecurringTask::Arguments class << self @@ -6,7 +10,7 @@ def load(data) end def dump(data) - ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(data)) unless data.nil? + ActiveSupport::JSON.dump(ActiveJob::Arguments.serialize(Array(data))) end end end diff --git a/lib/solid_queue/dispatcher.rb b/lib/solid_queue/dispatcher.rb index 00f80f08..41c505ba 100644 --- a/lib/solid_queue/dispatcher.rb +++ b/lib/solid_queue/dispatcher.rb @@ -4,8 +4,8 @@ module SolidQueue class Dispatcher < Processes::Poller attr_accessor :batch_size, :concurrency_maintenance, :recurring_schedule - after_boot :start_concurrency_maintenance, :load_recurring_schedule - before_shutdown :stop_concurrency_maintenance, :unload_recurring_schedule + after_boot :start_concurrency_maintenance, :schedule_recurring_tasks + before_shutdown :stop_concurrency_maintenance, :unschedule_recurring_tasks def initialize(**options) options = options.dup.with_defaults(SolidQueue::Configuration::DISPATCHER_DEFAULTS) @@ -19,7 +19,7 @@ def initialize(**options) end def metadata - super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.tasks.presence) + super.merge(batch_size: batch_size, concurrency_maintenance_interval: concurrency_maintenance&.interval, recurring_schedule: recurring_schedule.task_keys.presence) end private @@ -38,16 +38,16 @@ def start_concurrency_maintenance concurrency_maintenance&.start end - def load_recurring_schedule - recurring_schedule.load_tasks + def schedule_recurring_tasks + recurring_schedule.schedule_tasks end def stop_concurrency_maintenance concurrency_maintenance&.stop end - def unload_recurring_schedule - recurring_schedule.unload_tasks + def unschedule_recurring_tasks + recurring_schedule.unschedule_tasks end def all_work_completed? diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/dispatcher/recurring_schedule.rb index 2dd03f3f..5de9f808 100644 --- a/lib/solid_queue/dispatcher/recurring_schedule.rb +++ b/lib/solid_queue/dispatcher/recurring_schedule.rb @@ -7,7 +7,7 @@ class Dispatcher::RecurringSchedule attr_reader :configured_tasks, :scheduled_tasks def initialize(tasks) - @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) } + @configured_tasks = Array(tasks).map { |task| SolidQueue::RecurringTask.wrap(task) }.select(&:valid?) @scheduled_tasks = Concurrent::Hash.new end @@ -15,33 +15,48 @@ def empty? configured_tasks.empty? end - def load_tasks + def schedule_tasks + wrap_in_app_executor do + persist_tasks + reload_tasks + end + configured_tasks.each do |task| - load_task(task) + schedule_task(task) end end - def load_task(task) + def schedule_task(task) scheduled_tasks[task.key] = schedule(task) end - def unload_tasks + def unschedule_tasks scheduled_tasks.values.each(&:cancel) scheduled_tasks.clear - end - def tasks - configured_tasks.each_with_object({}) { |task, hsh| hsh[task.key] = task.to_h } + wrap_in_app_executor { delete_tasks } end - def inspect - configured_tasks.map(&:to_s).join(" | ") + def task_keys + configured_tasks.map(&:key) end private + def persist_tasks + SolidQueue::RecurringTask.upsert_all configured_tasks.map(&:attributes_for_upsert), record_timestamps: true + end + + def reload_tasks + @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys) + end + + def delete_tasks + SolidQueue::RecurringTask.static.delete_all + end + def schedule(task) scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at| - thread_schedule.load_task(thread_task) + thread_schedule.schedule_task(thread_task) wrap_in_app_executor do thread_task.enqueue(at: thread_task_run_at) diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 0b5cc17f..820a5c7f 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -306,7 +306,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 2 events.each do |event| - assert_event event, "enqueue_recurring_task", task: :example_task + assert_event event, "enqueue_recurring_task", task: "example_task" end active_job_ids = SolidQueue::Job.all.map(&:active_job_id) @@ -333,7 +333,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 1 event = events.last - assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked" + assert_event event, "enqueue_recurring_task", task: "example_task", enqueue_error: "ActiveRecord::Deadlocked: ActiveRecord::Deadlocked" assert event.last[:at].present? assert_nil event.last[:other_adapter] end @@ -354,7 +354,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert events.size >= 1 event = events.last - assert_event event, "enqueue_recurring_task", task: :example_task, enqueue_error: "All is broken" + assert_event event, "enqueue_recurring_task", task: "example_task", enqueue_error: "All is broken" assert event.last[:at].present? assert event.last[:other_adapter] ensure diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index 6bc3c262..35e4a791 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -16,6 +16,7 @@ class RecurringTasksTest < ActiveSupport::TestCase SolidQueue::Process.destroy_all SolidQueue::Job.destroy_all + SolidQueue::RecurringTask.delete_all JobResult.delete_all end @@ -40,8 +41,56 @@ class RecurringTasksTest < ActiveSupport::TestCase end end + test "persist and delete configured tasks" do + configured_task = { periodic_store_result: { class: "StoreResultJob", schedule: "every second" } } + + assert_recurring_tasks configured_task + terminate_process(@pid) + assert_recurring_tasks [] + + SolidQueue::RecurringTask.create!(key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ]) + + @pid = run_supervisor_as_fork + wait_for_registered_processes(4, timeout: 3.second) + + assert_recurring_tasks configured_task + + another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } } + dispatcher1 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: another_task).tap(&:start) + wait_for_registered_processes(5, timeout: 1.second) + + assert_recurring_tasks configured_task.merge(another_task) + + updated_task = { example_task: { class: "AddToBufferJob", schedule: "every minute" } } + dispatcher2 = SolidQueue::Dispatcher.new(concurrency_maintenance: false, recurring_tasks: updated_task).tap(&:start) + wait_for_registered_processes(6, timeout: 1.second) + + assert_recurring_tasks configured_task.merge(updated_task) + + terminate_process(@pid) + dispatcher1.stop + dispatcher2.stop + + assert_recurring_tasks [] + end + private def wait_for_jobs_to_be_enqueued(count, timeout: 1.second) wait_while_with_timeout(timeout) { SolidQueue::Job.count < count } end + + def assert_recurring_tasks(expected_tasks) + skip_active_record_query_cache do + assert_equal expected_tasks.count, SolidQueue::RecurringTask.count + + expected_tasks.each do |key, attrs| + task = SolidQueue::RecurringTask.find_by(key: key) + assert task.present? + + assert_equal(attrs[:schedule], task.schedule) if attrs[:schedule] + assert_equal(attrs[:class], task.class_name) if attrs[:class] + assert_equal(attrs[:args], task.arguments) if attrs[:args] + end + end + end end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index d3768a2c..9d7e4ee5 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -51,8 +51,7 @@ class DispatcherTest < ActiveSupport::TestCase assert_equal "Dispatcher", process.kind schedule_from_metadata = process.metadata["recurring_schedule"] - assert_equal 1, schedule_from_metadata.size - assert_equal({ "class_name" => "AddToBufferJob", "schedule" => "every hour", "arguments" => [ 42 ] }, schedule_from_metadata["example_task"]) + assert_equal [ "example_task" ], schedule_from_metadata ensure with_recurring_schedule.stop end From 04a1d93b3ebc8ff94f8906889c3bde2e50096beb Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Fri, 2 Aug 2024 15:51:34 +0200 Subject: [PATCH 06/11] Fix persisting tasks for PostgreSQL on concurrent INSERTs PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict during two concurrent INSERTs for the same value of an unique index. We need to explicitly indicate unique_by to ignore duplicate rows by this value when inserting --- app/models/solid_queue/recurring_task.rb | 11 +++++++++++ lib/solid_queue/dispatcher/recurring_schedule.rb | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/app/models/solid_queue/recurring_task.rb b/app/models/solid_queue/recurring_task.rb index 60f11c20..b302af70 100644 --- a/app/models/solid_queue/recurring_task.rb +++ b/app/models/solid_queue/recurring_task.rb @@ -19,6 +19,17 @@ def wrap(args) def from_configuration(key, **options) new(key: key, class_name: options[:class], schedule: options[:schedule], arguments: options[:args]) end + + def create_or_update_all(tasks) + if connection.supports_insert_conflict_target? + # PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict + # during two concurrent INSERTs for the same value of an unique index. We need to explicitly + # indicate unique_by to ignore duplicate rows by this value when inserting + upsert_all tasks.map(&:attributes_for_upsert), unique_by: :key + else + upsert_all tasks.map(&:attributes_for_upsert) + end + end end def delay_from_now diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/dispatcher/recurring_schedule.rb index 5de9f808..edf9469c 100644 --- a/lib/solid_queue/dispatcher/recurring_schedule.rb +++ b/lib/solid_queue/dispatcher/recurring_schedule.rb @@ -43,7 +43,7 @@ def task_keys private def persist_tasks - SolidQueue::RecurringTask.upsert_all configured_tasks.map(&:attributes_for_upsert), record_timestamps: true + SolidQueue::RecurringTask.create_or_update_all configured_tasks end def reload_tasks From 09eabe2c8f79c3e6e3769b6591042372aef4e776 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 5 Aug 2024 14:57:36 +0200 Subject: [PATCH 07/11] Allow specifying queue, priority, command and description for recurring tasks These will be used later on. --- db/migrate/20240719134516_create_recurring_tasks.rb | 8 +++++++- test/dummy/db/schema.rb | 6 +++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/db/migrate/20240719134516_create_recurring_tasks.rb b/db/migrate/20240719134516_create_recurring_tasks.rb index f52fcf58..6b2ff000 100644 --- a/db/migrate/20240719134516_create_recurring_tasks.rb +++ b/db/migrate/20240719134516_create_recurring_tasks.rb @@ -3,11 +3,17 @@ def change create_table :solid_queue_recurring_tasks do |t| t.string :key, null: false, index: { unique: true } t.string :schedule, null: false - t.string :class_name, null: false + t.string :command, limit: 2048 + t.string :class_name t.text :arguments + t.string :queue_name + t.integer :priority, default: 0 + t.boolean :static, default: true, index: true + t.text :description + t.timestamps end end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 5c042c21..8f06fa15 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -104,9 +104,13 @@ create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "key", null: false t.string "schedule", null: false - t.string "class_name", null: false + t.string "command", limit: 2048 + t.string "class_name" t.text "arguments" + t.string "queue_name" + t.integer "priority", default: 0 t.boolean "static", default: true + t.text "description" t.datetime "created_at", null: false t.datetime "updated_at", null: false t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true From d1676d5aed6864f4df0e0f36c0425efb8b4772ac Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 5 Aug 2024 14:59:07 +0200 Subject: [PATCH 08/11] Add upgrading instructions for the upcoming version --- UPGRADING.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/UPGRADING.md b/UPGRADING.md index dde0e8f4..84b760a1 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,3 +1,19 @@ +# Upgrading to version 0.5.x +This version includes a new migration to improve recurring tasks. To install it, just run: + +```bash +$ bin/rails solid_queue:install:migrations +``` + +Or, if you're using a different database for Solid Queue: + +```bash +$ bin/rails solid_queue:install:migrations DATABASE= +``` + +And then run the migrations. + + # Upgrading to version 0.4.x This version introduced an _async_ mode to run the supervisor and have all workers and dispatchers run as part of the same process as the supervisor, instead of separate, forked, processes. Together with this, we introduced some changes in how the supervisor is started. Prior this change, you could choose whether you wanted to run workers, dispatchers or both, by starting Solid Queue as `solid_queue:work` or `solid_queue:dispatch`. From version 0.4.0, the only option available is: @@ -26,7 +42,6 @@ the supervisor will run 1 dispatcher and no workers. # Upgrading to version 0.3.x - This version introduced support for [recurring (cron-style) jobs](https://github.com/rails/solid_queue/blob/main/README.md#recurring-tasks), and it needs a new DB migration for it. To install it, just run: ```bash From 845d36ee58bec5dbc06b943064c92a2bb97628a1 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 5 Aug 2024 16:12:29 +0200 Subject: [PATCH 09/11] Handle concurrent inserts in PostgreSQL when enqueuing recurring tasks Very similar to inserting recurring tasks on boot. PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict during two concurrent INSERTs for the same value of an unique index. We need to use `insert` instead of `create` here and indicate unique_by to ignore duplicate rows by this value when inserting --- app/models/solid_queue/recurring_execution.rb | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/app/models/solid_queue/recurring_execution.rb b/app/models/solid_queue/recurring_execution.rb index d801cd65..34160003 100644 --- a/app/models/solid_queue/recurring_execution.rb +++ b/app/models/solid_queue/recurring_execution.rb @@ -7,16 +7,29 @@ class AlreadyRecorded < StandardError; end scope :clearable, -> { where.missing(:job) } class << self + def create_or_insert!(**attributes) + if connection.supports_insert_conflict_target? + # PostgreSQL fails and aborts the current transaction when it hits a duplicate key conflict + # during two concurrent INSERTs for the same value of an unique index. We need to explicitly + # indicate unique_by to ignore duplicate rows by this value when inserting + unless insert(attributes, unique_by: [ :task_key, :run_at ]).any? + raise AlreadyRecorded + end + else + create!(**attributes) + end + rescue ActiveRecord::RecordNotUnique + raise AlreadyRecorded + end + def record(task_key, run_at, &block) transaction do block.call.tap do |active_job| if active_job - create!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at) + create_or_insert!(job_id: active_job.provider_job_id, task_key: task_key, run_at: run_at) end end end - rescue ActiveRecord::RecordNotUnique => e - raise AlreadyRecorded end def clear_in_batches(batch_size: 500) From c2ec696886c3269f09d0da88f7fa979f7ea53860 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Mon, 5 Aug 2024 22:17:12 +0200 Subject: [PATCH 10/11] Improve tests in preparation for bumping Rails dependency --- app/models/solid_queue/semaphore.rb | 5 +- test/integration/concurrency_controls_test.rb | 33 +++++----- .../forked_processes_lifecycle_test.rb | 62 +++++++++---------- test/integration/puma/plugin_testing.rb | 6 +- test/integration/recurring_tasks_test.rb | 11 +++- test/models/solid_queue/job_test.rb | 5 -- test/test_helper.rb | 16 ++++- test/unit/dispatcher_test.rb | 10 +-- test/unit/fork_supervisor_test.rb | 4 +- test/unit/hooks_test.rb | 1 + test/unit/worker_test.rb | 6 +- 11 files changed, 84 insertions(+), 75 deletions(-) diff --git a/app/models/solid_queue/semaphore.rb b/app/models/solid_queue/semaphore.rb index 80f53fd9..62eeb035 100644 --- a/app/models/solid_queue/semaphore.rb +++ b/app/models/solid_queue/semaphore.rb @@ -52,7 +52,6 @@ def signal end private - attr_accessor :job def attempt_creation @@ -63,7 +62,9 @@ def attempt_creation end end - def check_limit_or_decrement = limit == 1 ? false : attempt_decrement + def check_limit_or_decrement + limit == 1 ? false : attempt_decrement + end def attempt_decrement Semaphore.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0 diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index da8d5e38..fe1221cf 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -18,10 +18,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase teardown do terminate_process(@pid) if process_exists?(@pid) - - SolidQueue::Job.destroy_all - SolidQueue::Process.destroy_all - SolidQueue::Semaphore.delete_all end test "run several conflicting jobs over the same record sequentially" do @@ -33,8 +29,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SequentialUpdateResultJob.perform_later(@result, name: name) end - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs assert_stored_sequence @result, ("A".."K").to_a end @@ -51,7 +47,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end wait_for_jobs_to_finish_for(5.seconds) - assert_no_pending_jobs + assert_no_unfinished_jobs assert_stored_sequence @result, ("A".."K").to_a end @@ -78,8 +74,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end end - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs # C would have started in the beginning, seeing the status empty, and would finish after # all other jobs, so it'll do the last update with only itself @@ -96,7 +92,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SequentialUpdateResultJob.perform_later(@result, name: name) end - wait_for_jobs_to_finish_for(3.seconds) + wait_for_jobs_to_finish_for(5.seconds) assert_equal 3, SolidQueue::FailedExecution.count assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a @@ -106,8 +102,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? } # Lock the semaphore so we can enqueue jobs and leave them blocked @@ -128,8 +124,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase assert SolidQueue::Semaphore.signal(job) # And wait for the dispatcher to release the jobs - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs # We can't ensure the order between B and C, because it depends on which worker wins when # unblocking, as one will try to unblock B and another C @@ -139,8 +135,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "rely on dispatcher to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs job = SequentialUpdateResultJob.perform_later(@result, name: "A") - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? } # Lock the semaphore so we can enqueue jobs and leave them blocked @@ -160,8 +156,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago) # And wait for dispatcher to release the jobs - wait_for_jobs_to_finish_for(3.seconds) - assert_no_pending_jobs + wait_for_jobs_to_finish_for(5.seconds) + assert_no_unfinished_jobs # We can't ensure the order between B and C, because it depends on which worker wins when # unblocking, as one will try to unblock B and another C @@ -198,7 +194,6 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end private - def assert_stored_sequence(result, *sequences) expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } skip_active_record_query_cache do diff --git a/test/integration/forked_processes_lifecycle_test.rb b/test/integration/forked_processes_lifecycle_test.rb index 9165e2c3..3867e918 100644 --- a/test/integration/forked_processes_lifecycle_test.rb +++ b/test/integration/forked_processes_lifecycle_test.rb @@ -15,17 +15,13 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase teardown do terminate_process(@pid) if process_exists?(@pid) - - SolidQueue::Process.destroy_all - SolidQueue::Job.destroy_all - JobResult.delete_all end test "enqueue jobs in multiple queues" do 6.times { |i| enqueue_store_result_job("job_#{i}") } 6.times { |i| enqueue_store_result_job("job_#{i}", :default) } - wait_for_jobs_to_finish_for(0.5.seconds) + wait_for_jobs_to_finish_for(2.seconds) assert_equal 12, JobResult.count 6.times { |i| assert_completed_job_results("job_#{i}", :background) } @@ -63,17 +59,18 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase signal_process(@pid, :TERM, wait: 0.1.second) end - sleep(0.5.seconds) + sleep(1.second) assert_clean_termination end test "quit supervisor while there are jobs in-flight" do no_pause = enqueue_store_result_job("no pause") - pause = enqueue_store_result_job("pause", pause: 1.seconds) + pause = enqueue_store_result_job("pause", pause: 1.second) - signal_process(@pid, :QUIT, wait: 0.5.second) - wait_for_jobs_to_finish_for(2.5.seconds) + signal_process(@pid, :QUIT, wait: 0.4.second) + wait_for_jobs_to_finish_for(2.seconds, except: pause) + wait_while_with_timeout(2.seconds) { process_exists?(@pid) } assert_not process_exists?(@pid) assert_completed_job_results("no pause") @@ -90,8 +87,8 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase no_pause = enqueue_store_result_job("no pause") pause = enqueue_store_result_job("pause", pause: 0.2.seconds) - signal_process(@pid, :TERM, wait: 0.1.second) - wait_for_jobs_to_finish_for(0.5.seconds) + signal_process(@pid, :TERM, wait: 0.3.second) + wait_for_jobs_to_finish_for(3.seconds) assert_completed_job_results("no pause") assert_completed_job_results("pause") @@ -108,7 +105,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase pause = enqueue_store_result_job("pause", pause: 0.2.seconds) signal_process(@pid, :INT, wait: 0.1.second) - wait_for_jobs_to_finish_for(0.5.seconds) + wait_for_jobs_to_finish_for(2.second) assert_completed_job_results("no pause") assert_completed_job_results("pause") @@ -124,8 +121,9 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase no_pause = enqueue_store_result_job("no pause") pause = enqueue_store_result_job("pause", pause: SolidQueue.shutdown_timeout + 10.second) - signal_process(@pid, :TERM, wait: 0.1.second) - wait_for_jobs_to_finish_for(SolidQueue.shutdown_timeout + 0.1.second) + signal_process(@pid, :TERM, wait: 0.5.second) + + sleep(SolidQueue.shutdown_timeout + 0.5.second) assert_completed_job_results("no pause") assert_job_status(no_pause, :finished) @@ -152,12 +150,12 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase 2.times { enqueue_store_result_job("no error", :default, pause: 0.01) } error3 = enqueue_store_result_job("error", :default, exception: RuntimeError) - wait_for_jobs_to_finish_for(0.5.seconds) + wait_for_jobs_to_finish_for(2.second, except: [ error1, error2, error3 ]) assert_completed_job_results("no error", :background, 3) assert_completed_job_results("no error", :default, 4) - assert_failures 3 + wait_while_with_timeout(1.second) { SolidQueue::FailedExecution.count < 3 } [ error1, error2, error3 ].each do |job| assert_job_status(job, :failed) end @@ -177,7 +175,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase 2.times { enqueue_store_result_job("no exit", :background) } - wait_for_jobs_to_finish_for(5.seconds) + wait_for_jobs_to_finish_for(3.seconds, except: [ exit_job, pause_job ]) assert_completed_job_results("no exit", :default, 2) assert_completed_job_results("no exit", :background, 4) @@ -208,6 +206,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase assert_nil SolidQueue::Process.find_by(id: worker.id) # Jobs were completed + wait_for_jobs_to_finish_for(1.second) assert_completed_job_results("pause", :background) assert_completed_job_results("pause", :default) @@ -218,7 +217,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase # And they can process jobs just fine enqueue_store_result_job("no_pause") enqueue_store_result_job("no_pause", :default) - wait_for_jobs_to_finish_for(0.2.seconds) + wait_for_jobs_to_finish_for(1.second) assert_completed_job_results("no_pause", :background) assert_completed_job_results("no_pause", :default) @@ -228,19 +227,18 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase end test "kill worker individually" do - killed_pause = enqueue_store_result_job("killed_pause", pause: 1.seconds) + killed_pause = enqueue_store_result_job("killed_pause", pause: 1.second) enqueue_store_result_job("pause", :default, pause: 0.5.seconds) worker = find_processes_registered_as("Worker").detect { |process| process.metadata["queues"].include? "background" } - - signal_process(worker.pid, :KILL, wait: 0.3.second) + signal_process(worker.pid, :KILL, wait: 0.5.seconds) # Worker didn't have time to clean up or finish the work - sleep(0.7.second) + sleep(0.5.second) assert SolidQueue::Process.exists?(id: worker.id) # And there's a new worker that has been registered for the background queue - wait_for_registered_processes(4, timeout: 3.second) + wait_for_registered_processes(4, timeout: 5.second) # The job in the background queue was left claimed as the worker couldn't # finish orderly @@ -252,7 +250,7 @@ class ForkedProcessesLifecycleTest < ActiveSupport::TestCase # The two current workers can process jobs just fine enqueue_store_result_job("no_pause") enqueue_store_result_job("no_pause", :default) - wait_for_jobs_to_finish_for(0.5.seconds) + sleep(2.seconds) assert_completed_job_results("no_pause", :background) assert_completed_job_results("no_pause", :default) @@ -291,11 +289,15 @@ def enqueue_store_result_job(value, queue_name = :background, **options) end def assert_completed_job_results(value, queue_name = :background, count = 1) - assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count + skip_active_record_query_cache do + assert_equal count, JobResult.where(queue_name: queue_name, status: "completed", value: value).count + end end def assert_started_job_result(value, queue_name = :background, count = 1) - assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count + skip_active_record_query_cache do + assert_equal count, JobResult.where(queue_name: queue_name, status: "started", value: value).count + end end def assert_job_status(active_job, status) @@ -311,10 +313,8 @@ def assert_job_status(active_job, status) end def assert_no_claimed_jobs - assert SolidQueue::ClaimedExecution.none? - end - - def assert_failures(count) - assert_equal count, SolidQueue::FailedExecution.count + skip_active_record_query_cache do + assert SolidQueue::ClaimedExecution.none? + end end end diff --git a/test/integration/puma/plugin_testing.rb b/test/integration/puma/plugin_testing.rb index 2cbd84be..0cb82095 100644 --- a/test/integration/puma/plugin_testing.rb +++ b/test/integration/puma/plugin_testing.rb @@ -32,16 +32,14 @@ module PluginTesting teardown do terminate_process(@pid, signal: :INT) if process_exists?(@pid) - wait_for_registered_processes 0, timeout: 1.second - - JobResult.delete_all + wait_for_registered_processes 0, timeout: 2.seconds end end test "perform jobs inside puma's process" do StoreResultJob.perform_later(:puma_plugin) - wait_for_jobs_to_finish_for(1.second) + wait_for_jobs_to_finish_for(2.seconds) assert_equal 1, JobResult.where(queue_name: :background, status: "completed", value: :puma_plugin).count end diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index 35e4a791..f3100dba 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -21,8 +21,8 @@ class RecurringTasksTest < ActiveSupport::TestCase end test "enqueue and process periodic tasks" do - wait_for_jobs_to_be_enqueued(2, timeout: 2.seconds) - wait_for_jobs_to_finish_for(2.seconds) + wait_for_jobs_to_be_enqueued(2, timeout: 2.5.seconds) + wait_for_jobs_to_finish_for(2.5.seconds) terminate_process(@pid) @@ -33,7 +33,7 @@ class RecurringTasksTest < ActiveSupport::TestCase assert_equal "StoreResultJob", job.class_name end - assert_equal 2, JobResult.count + assert JobResult.count >= 2 JobResult.all.each do |result| assert_equal "custom_status", result.status assert_equal "42", result.value @@ -43,6 +43,8 @@ class RecurringTasksTest < ActiveSupport::TestCase test "persist and delete configured tasks" do configured_task = { periodic_store_result: { class: "StoreResultJob", schedule: "every second" } } + # Wait for concurrency schedule loading after process registration + sleep(0.5) assert_recurring_tasks configured_task terminate_process(@pid) @@ -53,6 +55,9 @@ class RecurringTasksTest < ActiveSupport::TestCase @pid = run_supervisor_as_fork wait_for_registered_processes(4, timeout: 3.second) + # Wait for concurrency schedule loading after process registration + sleep(0.5) + assert_recurring_tasks configured_task another_task = { example_task: { class: "AddToBufferJob", schedule: "every hour", args: [ 42 ] } } diff --git a/test/models/solid_queue/job_test.rb b/test/models/solid_queue/job_test.rb index 0d907a5d..a9b3cc59 100644 --- a/test/models/solid_queue/job_test.rb +++ b/test/models/solid_queue/job_test.rb @@ -3,11 +3,6 @@ class SolidQueue::JobTest < ActiveSupport::TestCase self.use_transactional_tests = false - teardown do - SolidQueue::Job.destroy_all - JobResult.delete_all - end - class NonOverlappingJob < ApplicationJob limits_concurrency key: ->(job_result, **) { job_result } diff --git a/test/test_helper.rb b/test/test_helper.rb index 47ef1a74..10fd50af 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -32,14 +32,24 @@ class ActiveSupport::TestCase if SolidQueue.supervisor_pidfile && File.exist?(SolidQueue.supervisor_pidfile) File.delete(SolidQueue.supervisor_pidfile) end + + unless self.class.use_transactional_tests + SolidQueue::Job.destroy_all + SolidQueue::Process.destroy_all + SolidQueue::Semaphore.delete_all + SolidQueue::RecurringTask.delete_all + JobResult.delete_all + end end private - def wait_for_jobs_to_finish_for(timeout = 1.second) - wait_while_with_timeout(timeout) { SolidQueue::Job.where(finished_at: nil).any? } + def wait_for_jobs_to_finish_for(timeout = 1.second, except: []) + wait_while_with_timeout(timeout) do + SolidQueue::Job.where.not(active_job_id: Array(except).map(&:job_id)).where(finished_at: nil).any? + end end - def assert_no_pending_jobs + def assert_no_unfinished_jobs skip_active_record_query_cache do assert SolidQueue::Job.where(finished_at: nil).none? end diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 9d7e4ee5..73f53dd0 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -12,8 +12,6 @@ class DispatcherTest < ActiveSupport::TestCase teardown do @dispatcher.stop - SolidQueue::Job.delete_all - SolidQueue::Process.delete_all end test "dispatcher is registered as process" do @@ -87,6 +85,10 @@ class DispatcherTest < ActiveSupport::TestCase sleep 0.2 end end + + @dispatcher.stop + wait_for_registered_processes(0, timeout: 1.second) + assert_no_registered_processes end test "run more than one instance of the dispatcher without recurring tasks" do @@ -100,13 +102,13 @@ class DispatcherTest < ActiveSupport::TestCase @dispatcher.start another_dispatcher.start - sleep 0.5 + sleep(0.7.seconds) assert_equal 0, SolidQueue::ScheduledExecution.count assert_equal 15, SolidQueue::ReadyExecution.count ensure - another_dispatcher.stop + another_dispatcher&.stop end test "run more than one instance of the dispatcher with recurring tasks" do diff --git a/test/unit/fork_supervisor_test.rb b/test/unit/fork_supervisor_test.rb index 49e4b46b..df0f9def 100644 --- a/test/unit/fork_supervisor_test.rb +++ b/test/unit/fork_supervisor_test.rb @@ -12,8 +12,6 @@ class ForkSupervisorTest < ActiveSupport::TestCase teardown do SolidQueue.supervisor_pidfile = @previous_pidfile File.delete(@pidfile) if File.exist?(@pidfile) - - SolidQueue::Process.destroy_all end test "start" do @@ -32,7 +30,7 @@ class ForkSupervisorTest < ActiveSupport::TestCase test "start with provided configuration" do config_as_hash = { workers: [], dispatchers: [ { batch_size: 100 } ] } pid = run_supervisor_as_fork(load_configuration_from: config_as_hash) - wait_for_registered_processes(2) # supervisor + dispatcher + wait_for_registered_processes(2, timeout: 2) # supervisor + dispatcher assert_registered_supervisor(pid) assert_registered_workers(count: 0) diff --git a/test/unit/hooks_test.rb b/test/unit/hooks_test.rb index f6db1718..4b7c2197 100644 --- a/test/unit/hooks_test.rb +++ b/test/unit/hooks_test.rb @@ -2,6 +2,7 @@ class HooksTest < ActiveSupport::TestCase test "solid_queue_record hook ran" do + SolidQueue::Record assert Rails.application.config.x.solid_queue_record_hook_ran end end diff --git a/test/unit/worker_test.rb b/test/unit/worker_test.rb index c7c1032e..63b04f7a 100644 --- a/test/unit/worker_test.rb +++ b/test/unit/worker_test.rb @@ -78,7 +78,7 @@ class WorkerTest < ActiveSupport::TestCase @worker.start - wait_for_jobs_to_finish_for(1.second) + wait_for_jobs_to_finish_for(2.second) @worker.wake_up assert_equal 5, JobResult.where(queue_name: :background, status: "completed", value: :paused).count @@ -116,6 +116,10 @@ class WorkerTest < ActiveSupport::TestCase sleep 0.2 end end + + @worker.stop + wait_for_registered_processes(0, timeout: 1.second) + assert_no_registered_processes end test "run inline" do From 953349c09e65569918f52ed07f665253345ec562 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 6 Aug 2024 18:18:38 +0200 Subject: [PATCH 11/11] Don't delete persisted recurring tasks when the dispatcher shuts down --- lib/solid_queue/dispatcher/recurring_schedule.rb | 6 ------ test/integration/recurring_tasks_test.rb | 6 ++---- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/lib/solid_queue/dispatcher/recurring_schedule.rb b/lib/solid_queue/dispatcher/recurring_schedule.rb index edf9469c..c476ae6e 100644 --- a/lib/solid_queue/dispatcher/recurring_schedule.rb +++ b/lib/solid_queue/dispatcher/recurring_schedule.rb @@ -33,8 +33,6 @@ def schedule_task(task) def unschedule_tasks scheduled_tasks.values.each(&:cancel) scheduled_tasks.clear - - wrap_in_app_executor { delete_tasks } end def task_keys @@ -50,10 +48,6 @@ def reload_tasks @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys) end - def delete_tasks - SolidQueue::RecurringTask.static.delete_all - end - def schedule(task) scheduled_task = Concurrent::ScheduledTask.new(task.delay_from_now, args: [ self, task, task.next_time ]) do |thread_schedule, thread_task, thread_task_run_at| thread_schedule.schedule_task(thread_task) diff --git a/test/integration/recurring_tasks_test.rb b/test/integration/recurring_tasks_test.rb index f3100dba..f18e7788 100644 --- a/test/integration/recurring_tasks_test.rb +++ b/test/integration/recurring_tasks_test.rb @@ -48,9 +48,9 @@ class RecurringTasksTest < ActiveSupport::TestCase assert_recurring_tasks configured_task terminate_process(@pid) - assert_recurring_tasks [] - SolidQueue::RecurringTask.create!(key: "periodic_store_result", class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ]) + task = SolidQueue::RecurringTask.find_by(key: "periodic_store_result") + task.update!(class_name: "StoreResultJob", schedule: "every minute", arguments: [ 42 ]) @pid = run_supervisor_as_fork wait_for_registered_processes(4, timeout: 3.second) @@ -75,8 +75,6 @@ class RecurringTasksTest < ActiveSupport::TestCase terminate_process(@pid) dispatcher1.stop dispatcher2.stop - - assert_recurring_tasks [] end private