diff --git a/CHANGELOG.md b/CHANGELOG.md index 6deb7a63..8fe124df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,16 @@ All other sections are for end-users. ## [Unreleased] +## [v0.17.0] - 2022-07-13 + +### Added + +- `worker.sleep_msec_no_row` config value ([#229](https://github.com/SpringQL/SpringQL/pull/229)) + +### Fixed + +- Changed the default sleep length from 10ms to 100ms for when a source / a generic worker does not receive any row ([#229](https://github.com/SpringQL/SpringQL/pull/229)) + ## [v0.16.1] - 2022-07-13 ### Fixed diff --git a/springql-core/src/api/spring_config.rs b/springql-core/src/api/spring_config.rs index 6c777802..77b5da17 100644 --- a/springql-core/src/api/spring_config.rs +++ b/springql-core/src/api/spring_config.rs @@ -18,6 +18,10 @@ n_generic_worker_threads = 1 # Setting this to > 1 may improve throughput but lead to out-of-order stream processing. n_source_worker_threads = 1 +# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream. +# Small number will improve the initial row's E2E latency but increase the CPU usage. +sleep_msec_no_row = 100 + [memory] # How much memory is allowed to be used in SpringQL streaming runtime. upper_limit_bytes = 10_000_000 @@ -136,6 +140,7 @@ impl SpringConfig { pub struct SpringWorkerConfig { pub n_generic_worker_threads: u16, pub n_source_worker_threads: u16, + pub sleep_msec_no_row: u64, } /// Config related to memory management. diff --git a/springql-core/src/stream_engine/autonomous_executor/task_executor.rs b/springql-core/src/stream_engine/autonomous_executor/task_executor.rs index 0db955da..3563436c 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task_executor.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task_executor.rs @@ -50,14 +50,14 @@ impl TaskExecutor { repos: repos.clone(), _generic_worker_pool: GenericWorkerPool::new( - config.worker.n_generic_worker_threads, + &config.worker, locks.clone(), event_queues.clone(), coordinators.clone(), repos.clone(), ), _source_worker_pool: SourceWorkerPool::new( - config.worker.n_source_worker_threads, + &config.worker, locks, event_queues, coordinators, diff --git a/springql-core/src/stream_engine/autonomous_executor/task_executor/generic_worker_pool.rs b/springql-core/src/stream_engine/autonomous_executor/task_executor/generic_worker_pool.rs index 11edb953..498183ed 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task_executor/generic_worker_pool.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task_executor/generic_worker_pool.rs @@ -4,12 +4,15 @@ mod generic_worker; use std::{cell::RefCell, sync::Arc}; -use crate::stream_engine::autonomous_executor::{ - args::{Coordinators, EventQueues, Locks}, - repositories::Repositories, - task_executor::{ - generic_worker_pool::generic_worker::GenericWorker, - task_worker_thread_handler::{TaskWorkerId, TaskWorkerThreadArg}, +use crate::{ + api::SpringWorkerConfig, + stream_engine::autonomous_executor::{ + args::{Coordinators, EventQueues, Locks}, + repositories::Repositories, + task_executor::{ + generic_worker_pool::generic_worker::GenericWorker, + task_worker_thread_handler::{TaskWorkerId, TaskWorkerThreadArg}, + }, }, }; @@ -26,18 +29,19 @@ pub struct GenericWorkerPool { impl GenericWorkerPool { pub fn new( - n_worker_threads: u16, + config: &SpringWorkerConfig, locks: Locks, event_queues: EventQueues, coordinators: Coordinators, repos: Arc, ) -> Self { - let workers = (0..n_worker_threads) + let workers = (0..config.n_generic_worker_threads) .map(|id| { let arg = TaskWorkerThreadArg::new( TaskWorkerId::new(id as u16), locks.task_executor_lock.clone(), repos.clone(), + config.sleep_msec_no_row, ); GenericWorker::new( locks.main_job_lock.clone(), diff --git a/springql-core/src/stream_engine/autonomous_executor/task_executor/source_worker_pool.rs b/springql-core/src/stream_engine/autonomous_executor/task_executor/source_worker_pool.rs index ab7527a0..a5250104 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task_executor/source_worker_pool.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task_executor/source_worker_pool.rs @@ -4,12 +4,15 @@ mod source_worker; use std::{cell::RefCell, sync::Arc}; -use crate::stream_engine::autonomous_executor::{ - args::{Coordinators, EventQueues, Locks}, - repositories::Repositories, - task_executor::{ - source_worker_pool::source_worker::SourceWorker, - task_worker_thread_handler::{TaskWorkerId, TaskWorkerThreadArg}, +use crate::{ + api::SpringWorkerConfig, + stream_engine::autonomous_executor::{ + args::{Coordinators, EventQueues, Locks}, + repositories::Repositories, + task_executor::{ + source_worker_pool::source_worker::SourceWorker, + task_worker_thread_handler::{TaskWorkerId, TaskWorkerThreadArg}, + }, }, }; @@ -26,18 +29,19 @@ pub struct SourceWorkerPool { impl SourceWorkerPool { pub fn new( - n_worker_threads: u16, + config: &SpringWorkerConfig, locks: Locks, event_queues: EventQueues, coordinators: Coordinators, repos: Arc, ) -> Self { - let workers = (0..n_worker_threads) + let workers = (0..config.n_source_worker_threads) .map(|id| { let arg = TaskWorkerThreadArg::new( TaskWorkerId::new(id as u16), locks.task_executor_lock.clone(), repos.clone(), + config.sleep_msec_no_row, ); SourceWorker::new( locks.main_job_lock.clone(), diff --git a/springql-core/src/stream_engine/autonomous_executor/task_executor/task_worker_thread_handler.rs b/springql-core/src/stream_engine/autonomous_executor/task_executor/task_worker_thread_handler.rs index c2273be1..84eca855 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task_executor/task_worker_thread_handler.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task_executor/task_worker_thread_handler.rs @@ -16,7 +16,7 @@ use crate::stream_engine::autonomous_executor::{ AutonomousExecutor, }; -/// Sleep duration for when no (foreign / input) rows are available for the (source / generic) task. +/// Sleep duration for when no tasks are available for the (source / generic) worker. const TASK_WAIT_MSEC: u64 = 10; #[derive(Debug)] @@ -35,6 +35,7 @@ pub struct TaskWorkerThreadArg { pub worker_id: TaskWorkerId, task_executor_lock: Arc, repos: Arc, + sleep_msec_no_row: u64, } #[derive(Debug)] @@ -93,7 +94,7 @@ impl TaskWorkerThreadHandler { ); if processed_rows.is_empty() { // Wait for rows to process - thread::sleep(Duration::from_millis(TASK_WAIT_MSEC)); + thread::sleep(Duration::from_millis(thread_arg.sleep_msec_no_row)); } } else { // Wait for tasks to execute diff --git a/springql/tests/feat_worker_config.rs b/springql/tests/feat_worker_config.rs index 633ef89b..5e3840ee 100644 --- a/springql/tests/feat_worker_config.rs +++ b/springql/tests/feat_worker_config.rs @@ -11,9 +11,15 @@ use springql_foreign_service::{ }; use springql_test_logger::setup_test_logger; -fn t(worker_config: SpringWorkerConfig) { +fn t(n_generic_worker_threads: u16, n_source_worker_threads: u16) { setup_test_logger(); + let worker_config = SpringWorkerConfig { + n_generic_worker_threads, + n_source_worker_threads, + sleep_msec_no_row: 100, + }; + let json_oracle = json!({ "ts": "2021-11-04 23:02:52.123456789", "ticker": "ORCL", @@ -100,32 +106,20 @@ fn t(worker_config: SpringWorkerConfig) { #[test] fn test_feat_1generic_1source() { - t(SpringWorkerConfig { - n_generic_worker_threads: 1, - n_source_worker_threads: 1, - }) + t(1, 1) } #[test] fn test_feat_5generic_1source() { - t(SpringWorkerConfig { - n_generic_worker_threads: 5, - n_source_worker_threads: 1, - }) + t(5, 1) } #[test] fn test_feat_1generic_5source() { - t(SpringWorkerConfig { - n_generic_worker_threads: 1, - n_source_worker_threads: 5, - }) + t(1, 5) } #[test] fn test_feat_5generic_5source() { - t(SpringWorkerConfig { - n_generic_worker_threads: 5, - n_source_worker_threads: 5, - }) + t(5, 5) } diff --git a/springql/tests/test_support/mod.rs b/springql/tests/test_support/mod.rs index ebf90c5a..5cae06be 100644 --- a/springql/tests/test_support/mod.rs +++ b/springql/tests/test_support/mod.rs @@ -19,7 +19,7 @@ pub fn apply_ddls(ddls: &[String], config: SpringConfig) -> SpringPipeline { #[allow(dead_code)] pub fn drain_from_sink(sink: &ForeignSink) -> Vec { let mut received = Vec::new(); - while let Some(v) = sink.try_receive(Duration::from_secs(1)) { + while let Some(v) = sink.try_receive(Duration::from_secs(2)) { received.push(v); } received