Skip to content

Commit

Permalink
fix: wait for compaction task to finish (GreptimeTeam#1783)
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r authored Jun 16, 2023
1 parent 1eeb5b4 commit 69854c0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 13 deletions.
2 changes: 1 addition & 1 deletion src/datanode/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl Instance {
fn create_compaction_scheduler<S: LogStore>(opts: &DatanodeOptions) -> CompactionSchedulerRef<S> {
let picker = SimplePicker::default();
let config = SchedulerConfig::from(opts);
let handler = CompactionHandler::new(picker);
let handler = CompactionHandler { picker };
let scheduler = LocalScheduler::new(config, handler);
Arc::new(scheduler)
}
Expand Down
13 changes: 6 additions & 7 deletions src/storage/src/compaction/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,8 @@ impl<S: LogStore> CompactionRequestImpl<S> {

pub struct CompactionHandler<P> {
pub picker: P,
}

impl<P> CompactionHandler<P> {
pub fn new(picker: P) -> Self {
Self { picker }
}
#[cfg(test)]
pub pending_tasks: Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}

#[async_trait::async_trait]
Expand All @@ -111,7 +107,7 @@ where

debug!("Compaction task, region: {:?}, task: {:?}", region_id, task);
// TODO(hl): we need to keep a track of task handle here to allow task cancellation.
common_runtime::spawn_bg(async move {
let _handle = common_runtime::spawn_bg(async move {
if let Err(e) = task.run().await {
// TODO(hl): maybe resubmit compaction task on failure?
error!(e; "Failed to compact region: {:?}", region_id);
Expand All @@ -128,6 +124,9 @@ where
finish_notifier.notify_one();
});

#[cfg(test)]
self.pending_tasks.write().await.push(_handle);

Ok(())
}
}
27 changes: 22 additions & 5 deletions src/storage/src/region/tests/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use object_store::services::{Fs, S3};
use object_store::ObjectStore;
use store_api::storage::{FlushContext, FlushReason, OpenOptions, Region, WriteResponse};
use tokio::sync::Notify;
use tokio::sync::{Notify, RwLock};

use crate::compaction::{CompactionHandler, SimplePicker};
use crate::config::EngineConfig;
Expand Down Expand Up @@ -74,7 +74,11 @@ async fn create_region_for_compaction<
purge_handler: H,
flush_strategy: FlushStrategyRef,
s3_bucket: Option<String>,
) -> (RegionImpl<RaftEngineLogStore>, ObjectStore) {
) -> (
RegionImpl<RaftEngineLogStore>,
ObjectStore,
Arc<tokio::sync::RwLock<Vec<tokio::task::JoinHandle<()>>>>,
) {
let metadata = tests::new_metadata(REGION_NAME);

let object_store = new_object_store(store_dir, s3_bucket);
Expand All @@ -90,7 +94,12 @@ async fn create_region_for_compaction<
store_config.flush_strategy = flush_strategy;

let picker = SimplePicker::default();
let handler = CompactionHandler::new(picker);
let pending_compaction_tasks = Arc::new(RwLock::new(vec![]));
let handler = CompactionHandler {
picker,
#[cfg(test)]
pending_tasks: pending_compaction_tasks.clone(),
};
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
Expand All @@ -104,6 +113,7 @@ async fn create_region_for_compaction<
(
RegionImpl::create(metadata, store_config).await.unwrap(),
object_store,
pending_compaction_tasks,
)
}

Expand Down Expand Up @@ -154,6 +164,7 @@ struct CompactionTester {
store_dir: String,
engine_config: EngineConfig,
flush_strategy: FlushStrategyRef,
pending_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
}

impl CompactionTester {
Expand All @@ -164,7 +175,7 @@ impl CompactionTester {
s3_bucket: Option<String>,
) -> CompactionTester {
let purge_handler = MockFilePurgeHandler::default();
let (region, object_store) = create_region_for_compaction(
let (region, object_store, pending_tasks) = create_region_for_compaction(
store_dir,
engine_config.clone(),
purge_handler.clone(),
Expand All @@ -180,6 +191,7 @@ impl CompactionTester {
store_dir: store_dir.to_string(),
engine_config,
flush_strategy,
pending_tasks,
}
}

Expand Down Expand Up @@ -231,6 +243,7 @@ impl CompactionTester {
async fn reopen(&mut self) -> Result<bool> {
// Close the old region.
if let Some(base) = self.base.take() {
futures::future::join_all(self.pending_tasks.write().await.drain(..)).await;
base.close().await;
}

Expand All @@ -250,7 +263,11 @@ impl CompactionTester {
store_config.flush_strategy = self.flush_strategy.clone();

let picker = SimplePicker::default();
let handler = CompactionHandler::new(picker);
let handler = CompactionHandler {
picker,
#[cfg(test)]
pending_tasks: Arc::new(Default::default()),
};
let config = SchedulerConfig::default();
// Overwrite test compaction scheduler and file purger.
store_config.compaction_scheduler = Arc::new(LocalScheduler::new(config, handler));
Expand Down

0 comments on commit 69854c0

Please sign in to comment.