From 69854c07c557352148b180d7ae659ca746ee220a Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 16 Jun 2023 16:45:06 +0800 Subject: [PATCH] fix: wait for compaction task to finish (#1783) --- src/datanode/src/instance.rs | 2 +- src/storage/src/compaction/scheduler.rs | 13 ++++++------ src/storage/src/region/tests/compact.rs | 27 ++++++++++++++++++++----- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 68ea2c828c81..aa00d6423b2f 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -354,7 +354,7 @@ impl Instance { fn create_compaction_scheduler(opts: &DatanodeOptions) -> CompactionSchedulerRef { let picker = SimplePicker::default(); let config = SchedulerConfig::from(opts); - let handler = CompactionHandler::new(picker); + let handler = CompactionHandler { picker }; let scheduler = LocalScheduler::new(config, handler); Arc::new(scheduler) } diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs index 286fce8d3c5f..35fc6cff964f 100644 --- a/src/storage/src/compaction/scheduler.rs +++ b/src/storage/src/compaction/scheduler.rs @@ -81,12 +81,8 @@ impl CompactionRequestImpl { pub struct CompactionHandler

{ pub picker: P, -} - -impl

CompactionHandler

{ - pub fn new(picker: P) -> Self { - Self { picker } - } + #[cfg(test)] + pub pending_tasks: Arc>>>, } #[async_trait::async_trait] @@ -111,7 +107,7 @@ where debug!("Compaction task, region: {:?}, task: {:?}", region_id, task); // TODO(hl): we need to keep a track of task handle here to allow task cancellation. - common_runtime::spawn_bg(async move { + let _handle = common_runtime::spawn_bg(async move { if let Err(e) = task.run().await { // TODO(hl): maybe resubmit compaction task on failure? error!(e; "Failed to compact region: {:?}", region_id); @@ -128,6 +124,9 @@ where finish_notifier.notify_one(); }); + #[cfg(test)] + self.pending_tasks.write().await.push(_handle); + Ok(()) } } diff --git a/src/storage/src/region/tests/compact.rs b/src/storage/src/region/tests/compact.rs index 7586ac034e64..298729b6a064 100644 --- a/src/storage/src/region/tests/compact.rs +++ b/src/storage/src/region/tests/compact.rs @@ -24,7 +24,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore; use object_store::services::{Fs, S3}; use object_store::ObjectStore; use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse}; -use tokio::sync::Notify; +use tokio::sync::{Notify, RwLock}; use crate::compaction::{CompactionHandler, SimplePicker}; use crate::config::EngineConfig; @@ -74,7 +74,11 @@ async fn create_region_for_compaction< purge_handler: H, flush_strategy: FlushStrategyRef, s3_bucket: Option, -) -> (RegionImpl, ObjectStore) { +) -> ( + RegionImpl, + ObjectStore, + Arc>>>, +) { let metadata = tests::new_metadata(REGION_NAME); let object_store = new_object_store(store_dir, s3_bucket); @@ -90,7 +94,12 @@ async fn create_region_for_compaction< store_config.flush_strategy = flush_strategy; let picker = SimplePicker::default(); - let handler = CompactionHandler::new(picker); + let pending_compaction_tasks = Arc::new(RwLock::new(vec![])); + let handler = CompactionHandler { + picker, + #[cfg(test)] + pending_tasks: pending_compaction_tasks.clone(), + }; let config = SchedulerConfig::default(); // Overwrite test compaction scheduler and file purger. store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler)); @@ -104,6 +113,7 @@ async fn create_region_for_compaction< ( RegionImpl::create(metadata, store_config).await.unwrap(), object_store, + pending_compaction_tasks, ) } @@ -154,6 +164,7 @@ struct CompactionTester { store_dir: String, engine_config: EngineConfig, flush_strategy: FlushStrategyRef, + pending_tasks: Arc>>>, } impl CompactionTester { @@ -164,7 +175,7 @@ impl CompactionTester { s3_bucket: Option, ) -> CompactionTester { let purge_handler = MockFilePurgeHandler::default(); - let (region, object_store) = create_region_for_compaction( + let (region, object_store, pending_tasks) = create_region_for_compaction( store_dir, engine_config.clone(), purge_handler.clone(), @@ -180,6 +191,7 @@ impl CompactionTester { store_dir: store_dir.to_string(), engine_config, flush_strategy, + pending_tasks, } } @@ -231,6 +243,7 @@ impl CompactionTester { async fn reopen(&mut self) -> Result { // Close the old region. if let Some(base) = self.base.take() { + futures::future::join_all(self.pending_tasks.write().await.drain(..)).await; base.close().await; } @@ -250,7 +263,11 @@ impl CompactionTester { store_config.flush_strategy = self.flush_strategy.clone(); let picker = SimplePicker::default(); - let handler = CompactionHandler::new(picker); + let handler = CompactionHandler { + picker, + #[cfg(test)] + pending_tasks: Arc::new(Default::default()), + }; let config = SchedulerConfig::default(); // Overwrite test compaction scheduler and file purger. store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));