Skip to content

Commit

Permalink
update test case
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Sep 8, 2023
1 parent d45aa6d commit 32265b8
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::Event;
use common_pipeline_core::processors::Processor;

#[async_trait::async_trait]
pub trait AsyncAccumulatingTransform: Send {
const NAME: &'static str;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Interpreter for OptimizeTableInterpreter {
}

impl OptimizeTableInterpreter {
fn build_physical_plan(
pub fn build_physical_plan(
parts: Partitions,
table_info: TableInfo,
snapshot: Arc<TableSnapshot>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ use std::collections::HashSet;
use std::sync::Arc;

use common_base::base::tokio;
use common_catalog::table::CompactTarget;
use common_catalog::table::Table;
use common_exception::Result;
use common_expression::BlockThresholds;
use common_storages_fuse::io::SegmentsIO;
use common_storages_fuse::operations::BlockCompactMutator;
use common_storages_fuse::operations::CompactOptions;
use common_storages_fuse::operations::CompactTaskInfo;
use common_storages_fuse::operations::CompactPartInfo;
use common_storages_fuse::statistics::reducers::merge_statistics_mut;
use common_storages_fuse::FuseTable;
use databend_query::interpreters::OptimizeTableInterpreter;
use databend_query::pipelines::executor::ExecutorSettings;
use databend_query::pipelines::executor::PipelineCompleteExecutor;
use databend_query::schedulers::build_query_pipeline_without_render_result_set;
use databend_query::sessions::QueryContext;
use databend_query::sessions::TableContext;
use databend_query::test_kits::table_test_fixture::execute_command;
Expand Down Expand Up @@ -106,12 +106,25 @@ async fn test_compact() -> Result<()> {
}

async fn do_compact(ctx: Arc<QueryContext>, table: Arc<dyn Table>) -> Result<bool> {
let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let settings = ctx.get_settings();
let mut pipeline = common_pipeline_core::Pipeline::create();
fuse_table
.compact(ctx.clone(), CompactTarget::Blocks, None, &mut pipeline)
.await?;
let res = table.compact_blocks(ctx.clone(), None).await?;

let table_info = table.get_table_info().clone();
let catalog_info = ctx.get_catalog("default").await?.info();
if let Some((parts, snapshot)) = res {
let physical_plan = OptimizeTableInterpreter::build_physical_plan(
parts,
table_info,
snapshot,
catalog_info,
false,
)?;

let build_res =
build_query_pipeline_without_render_result_set(&ctx, &physical_plan, false).await?;
pipeline = build_res.main_pipeline;
};

if !pipeline.is_empty() {
pipeline.set_max_threads(settings.get_max_threads()? as usize);
Expand Down Expand Up @@ -190,21 +203,14 @@ async fn test_safety() -> Result<()> {
merge_statistics_mut(&mut summary, &seg.summary, None);
}

let mut block_ids = HashSet::new();
for seg in &segment_infos {
for b in &seg.blocks {
block_ids.insert(b.location.clone());
}
}

let id = Uuid::new_v4();
let snapshot = TableSnapshot::new(
id,
&None,
None,
schema.as_ref().clone(),
summary,
locations,
locations.clone(),
None,
None,
);
Expand All @@ -224,42 +230,57 @@ async fn test_safety() -> Result<()> {
operator.clone(),
cluster_key_id,
);
block_compact_mutator.target_select().await?;
let selections = block_compact_mutator.compact_tasks;
let mut blocks_number = 0;

let mut block_ids_after_compaction = HashSet::new();
for part in selections.partitions.into_iter() {
let part = CompactTaskInfo::from_part(&part)?;
blocks_number += part.blocks.len();
for b in &part.blocks {
block_ids_after_compaction.insert(b.location.clone());
}
let selections = block_compact_mutator.target_select().await?;
if selections.is_empty() {
eprintln!("no target select");
continue;
}
assert!(!selections.is_lazy);

for unchanged in block_compact_mutator.unchanged_blocks_map.values() {
blocks_number += unchanged.len();
for b in unchanged.values() {
block_ids_after_compaction.insert(b.location.clone());
let mut actual_blocks_number = 0;
let mut compact_segment_indices = HashSet::new();
let mut actual_block_ids = HashSet::new();
for part in selections.partitions.into_iter() {
let part = CompactPartInfo::from_part(&part)?;
match part {
CompactPartInfo::CompactExtraInfo(extra) => {
compact_segment_indices.insert(extra.segment_index);
compact_segment_indices.extend(extra.removed_segment_indexes.iter());
actual_blocks_number += extra.unchanged_blocks.len();
for b in &extra.unchanged_blocks {
actual_block_ids.insert(b.1.location.clone());
}
}
CompactPartInfo::CompactTaskInfo(task) => {
compact_segment_indices.insert(task.index.segment_idx);
actual_blocks_number += task.blocks.len();
for b in &task.blocks {
actual_block_ids.insert(b.location.clone());
}
}
}
}

for unchanged_segment in block_compact_mutator.unchanged_segments_map.values() {
eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
let mut except_blocks_number = 0;
let mut except_block_ids = HashSet::new();
for idx in compact_segment_indices.into_iter() {
let loc = locations.get(idx).unwrap();
let compact_segment = SegmentsIO::read_compact_segment(
ctx.get_data_operator()?.operator(),
unchanged_segment.clone(),
loc.clone(),
TestFixture::default_table_schema(),
false,
)
.await?;
let segment = SegmentInfo::try_from(compact_segment)?;
blocks_number += segment.blocks.len();
except_blocks_number += segment.blocks.len();
for b in &segment.blocks {
block_ids_after_compaction.insert(b.location.clone());
except_block_ids.insert(b.location.clone());
}
}
assert_eq!(number_of_blocks, blocks_number);
assert_eq!(block_ids, block_ids_after_compaction);
assert_eq!(except_blocks_number, actual_blocks_number);
assert_eq!(except_block_ids, actual_block_ids);
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_expression::BlockMetaInfoPtr;
use common_expression::BlockThresholds;
use common_expression::DataBlock;
use common_expression::TableSchemaRef;
use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_sql::executor::MutationKind;
use itertools::Itertools;
use log::debug;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;

use crate::operations::merge_into::mutator::MatchedAggregator;
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/fuse/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub use common::FillInternalColumnProcessor;
pub use common::TransformSerializeBlock;
pub use compact::CompactOptions;
pub use mutation::BlockCompactMutator;
pub use mutation::CompactTaskInfo;
pub use mutation::CompactPartInfo;
pub use mutation::DeletedSegmentInfo;
pub use mutation::Mutation;
pub use mutation::ReclusterMutator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod segment_compact_mutator;
pub use block_compact_mutator::BlockCompactMutator;
pub use compact_part::CompactExtraInfo;
pub use compact_part::CompactLazyPartInfo;
pub use compact_part::CompactPartInfo;
pub use compact_part::CompactTaskInfo;
pub use compact_source::CompactSource;
pub use segment_compact_mutator::SegmentCompactMutator;
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/mutation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod recluster_mutator;
pub use compact::BlockCompactMutator;
pub use compact::CompactExtraInfo;
pub use compact::CompactLazyPartInfo;
pub use compact::CompactPartInfo;
pub use compact::CompactSource;
pub use compact::CompactTaskInfo;
pub use compact::SegmentCompactMutator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform;
use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer;

use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoOperation;
Expand Down

0 comments on commit 32265b8

Please sign in to comment.