diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs index 209474572cce8..951633a4471cc 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs @@ -21,6 +21,7 @@ use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::Processor; + #[async_trait::async_trait] pub trait AsyncAccumulatingTransform: Send { const NAME: &'static str; diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 5867d038af82c..b1e091d4bb059 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -82,7 +82,7 @@ impl Interpreter for OptimizeTableInterpreter { } impl OptimizeTableInterpreter { - fn build_physical_plan( + pub fn build_physical_plan( parts: Partitions, table_info: TableInfo, snapshot: Arc, diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 830c7193a966a..45fc60e19256b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -16,18 +16,18 @@ use std::collections::HashSet; use std::sync::Arc; use common_base::base::tokio; -use common_catalog::table::CompactTarget; use common_catalog::table::Table; use common_exception::Result; use common_expression::BlockThresholds; use common_storages_fuse::io::SegmentsIO; use common_storages_fuse::operations::BlockCompactMutator; use common_storages_fuse::operations::CompactOptions; -use common_storages_fuse::operations::CompactTaskInfo; +use common_storages_fuse::operations::CompactPartInfo; use common_storages_fuse::statistics::reducers::merge_statistics_mut; -use common_storages_fuse::FuseTable; +use databend_query::interpreters::OptimizeTableInterpreter; use databend_query::pipelines::executor::ExecutorSettings; use databend_query::pipelines::executor::PipelineCompleteExecutor; +use databend_query::schedulers::build_query_pipeline_without_render_result_set; use databend_query::sessions::QueryContext; use databend_query::sessions::TableContext; use databend_query::test_kits::table_test_fixture::execute_command; @@ -106,12 +106,25 @@ async fn test_compact() -> Result<()> { } async fn do_compact(ctx: Arc, table: Arc) -> Result { - let fuse_table = FuseTable::try_from_table(table.as_ref())?; let settings = ctx.get_settings(); let mut pipeline = common_pipeline_core::Pipeline::create(); - fuse_table - .compact(ctx.clone(), CompactTarget::Blocks, None, &mut pipeline) - .await?; + let res = table.compact_blocks(ctx.clone(), None).await?; + + let table_info = table.get_table_info().clone(); + let catalog_info = ctx.get_catalog("default").await?.info(); + if let Some((parts, snapshot)) = res { + let physical_plan = OptimizeTableInterpreter::build_physical_plan( + parts, + table_info, + snapshot, + catalog_info, + false, + )?; + + let build_res = + build_query_pipeline_without_render_result_set(&ctx, &physical_plan, false).await?; + pipeline = build_res.main_pipeline; + }; if !pipeline.is_empty() { pipeline.set_max_threads(settings.get_max_threads()? as usize); @@ -190,13 +203,6 @@ async fn test_safety() -> Result<()> { merge_statistics_mut(&mut summary, &seg.summary, None); } - let mut block_ids = HashSet::new(); - for seg in &segment_infos { - for b in &seg.blocks { - block_ids.insert(b.location.clone()); - } - } - let id = Uuid::new_v4(); let snapshot = TableSnapshot::new( id, @@ -204,7 +210,7 @@ async fn test_safety() -> Result<()> { None, schema.as_ref().clone(), summary, - locations, + locations.clone(), None, None, ); @@ -224,42 +230,57 @@ async fn test_safety() -> Result<()> { operator.clone(), cluster_key_id, ); - block_compact_mutator.target_select().await?; - let selections = block_compact_mutator.compact_tasks; - let mut blocks_number = 0; - - let mut block_ids_after_compaction = HashSet::new(); - for part in selections.partitions.into_iter() { - let part = CompactTaskInfo::from_part(&part)?; - blocks_number += part.blocks.len(); - for b in &part.blocks { - block_ids_after_compaction.insert(b.location.clone()); - } + let selections = block_compact_mutator.target_select().await?; + if selections.is_empty() { + eprintln!("no target select"); + continue; } + assert!(!selections.is_lazy); - for unchanged in block_compact_mutator.unchanged_blocks_map.values() { - blocks_number += unchanged.len(); - for b in unchanged.values() { - block_ids_after_compaction.insert(b.location.clone()); + let mut actual_blocks_number = 0; + let mut compact_segment_indices = HashSet::new(); + let mut actual_block_ids = HashSet::new(); + for part in selections.partitions.into_iter() { + let part = CompactPartInfo::from_part(&part)?; + match part { + CompactPartInfo::CompactExtraInfo(extra) => { + compact_segment_indices.insert(extra.segment_index); + compact_segment_indices.extend(extra.removed_segment_indexes.iter()); + actual_blocks_number += extra.unchanged_blocks.len(); + for b in &extra.unchanged_blocks { + actual_block_ids.insert(b.1.location.clone()); + } + } + CompactPartInfo::CompactTaskInfo(task) => { + compact_segment_indices.insert(task.index.segment_idx); + actual_blocks_number += task.blocks.len(); + for b in &task.blocks { + actual_block_ids.insert(b.location.clone()); + } + } } } - for unchanged_segment in block_compact_mutator.unchanged_segments_map.values() { + eprintln!("compact_segment_indices: {:?}", compact_segment_indices); + let mut except_blocks_number = 0; + let mut except_block_ids = HashSet::new(); + for idx in compact_segment_indices.into_iter() { + let loc = locations.get(idx).unwrap(); let compact_segment = SegmentsIO::read_compact_segment( ctx.get_data_operator()?.operator(), - unchanged_segment.clone(), + loc.clone(), TestFixture::default_table_schema(), false, ) .await?; let segment = SegmentInfo::try_from(compact_segment)?; - blocks_number += segment.blocks.len(); + except_blocks_number += segment.blocks.len(); for b in &segment.blocks { - block_ids_after_compaction.insert(b.location.clone()); + except_block_ids.insert(b.location.clone()); } } - assert_eq!(number_of_blocks, blocks_number); - assert_eq!(block_ids, block_ids_after_compaction); + assert_eq!(except_blocks_number, actual_blocks_number); + assert_eq!(except_block_ids, actual_block_ids); } Ok(()) diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 44eb7caab6ba8..683345af08d9e 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -26,7 +26,7 @@ use common_expression::BlockMetaInfoPtr; use common_expression::BlockThresholds; use common_expression::DataBlock; use common_expression::TableSchemaRef; -use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; use common_sql::executor::MutationKind; use itertools::Itertools; use log::debug; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs index 7dc70d903f47f..ca18f7a0f18da 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs @@ -18,7 +18,7 @@ use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use crate::operations::merge_into::mutator::MatchedAggregator; diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 9f7bd6e5356d8..4d63d9fc8f533 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -40,7 +40,7 @@ pub use common::FillInternalColumnProcessor; pub use common::TransformSerializeBlock; pub use compact::CompactOptions; pub use mutation::BlockCompactMutator; -pub use mutation::CompactTaskInfo; +pub use mutation::CompactPartInfo; pub use mutation::DeletedSegmentInfo; pub use mutation::Mutation; pub use mutation::ReclusterMutator; diff --git a/src/query/storages/fuse/src/operations/mutation/compact/mod.rs b/src/query/storages/fuse/src/operations/mutation/compact/mod.rs index cc7e9e05c1822..f18e8a114f9ea 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/mod.rs @@ -20,6 +20,7 @@ mod segment_compact_mutator; pub use block_compact_mutator::BlockCompactMutator; pub use compact_part::CompactExtraInfo; pub use compact_part::CompactLazyPartInfo; +pub use compact_part::CompactPartInfo; pub use compact_part::CompactTaskInfo; pub use compact_source::CompactSource; pub use segment_compact_mutator::SegmentCompactMutator; diff --git a/src/query/storages/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/src/operations/mutation/mod.rs index 0ca5618cac499..5af83bf7266a5 100644 --- a/src/query/storages/fuse/src/operations/mutation/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/mod.rs @@ -22,6 +22,7 @@ mod recluster_mutator; pub use compact::BlockCompactMutator; pub use compact::CompactExtraInfo; pub use compact::CompactLazyPartInfo; +pub use compact::CompactPartInfo; pub use compact::CompactSource; pub use compact::CompactTaskInfo; pub use compact::SegmentCompactMutator; diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs index 0b3f0b24116fa..b057482104ee9 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs @@ -18,7 +18,7 @@ use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoOperation;