refactor: clustering_information support specify cluster keys (#15783)
improve clustering_information
zhyass authored Jun 13, 2024
1 parent d807ad1 commit b416ac8
Showing 17 changed files with 455 additions and 227 deletions.
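For orientation, a minimal sketch of the queries this refactor targets, in the same format!-based style the updated test below uses. The two-argument call is confirmed by that test; the third argument carrying user-specified cluster keys is an assumption inferred from the new analyze_cluster_keys helper added in expression_parser.rs, so the exact syntax may differ.

fn main() {
    // Sketch only: `db`/`tbl` are placeholder names, and the three-argument
    // form with explicit cluster keys is assumed, not confirmed by this diff.
    let (db, tbl) = ("default", "t");

    // Existing form, as exercised by the updated test in this commit:
    let with_defined_keys = format!(
        "select * exclude(timestamp) from clustering_information('{}', '{}')",
        db, tbl
    );

    // Assumed new form that names the cluster keys to analyze:
    let with_explicit_keys = format!(
        "select * exclude(timestamp) from clustering_information('{}', '{}', '(id)')",
        db, tbl
    );

    println!("{with_defined_keys}\n{with_explicit_keys}");
}
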
44 changes: 22 additions & 22 deletions scripts/ci/ci-setup-chaos-meta.sh
@@ -13,21 +13,21 @@ k3d cluster create --config ./scripts/ci/meta-chaos/k3d.yaml meta-chaos
echo "127.0.0.1 k3d-registry.localhost" | sudo tee -a /etc/hosts

if kubectl version --client; then
echo "kubectl client already installed"
echo "kubectl client already installed"
else
echo "install kubectl client"
curl -LO "https://dl.k8s.io/release/v1.29.5/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
echo "install kubectl client"
curl -LO "https://dl.k8s.io/release/v1.29.5/bin/linux/amd64/kubectl"
chmod +x kubectl
sudo mv kubectl /usr/local/bin/
fi

if helm version; then
echo "helm already installed"
echo "helm already installed"
else
echo "install helm"
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh
echo "install helm"
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3
chmod 700 get_helm.sh
./get_helm.sh
fi

echo "make databend-meta image"
@@ -60,19 +60,19 @@ kubectl delete pvc --namespace databend data-test-databend-meta-0 data-test-data

helm repo add databend https://charts.databend.rs
helm install test databend/databend-meta \
--namespace databend \
--create-namespace \
--values scripts/ci/meta-chaos/meta-ha.yaml \
--set image.repository=k3d-registry.localhost:5111/databend-meta \
--set image.tag=meta-chaos \
--wait || true
--namespace databend \
--create-namespace \
--values scripts/ci/meta-chaos/meta-ha.yaml \
--set image.repository=k3d-registry.localhost:5111/databend-meta \
--set image.tag=meta-chaos \
--wait || true

sleep 10
echo "check if databend-meta nodes is ready"
kubectl -n databend wait \
--for=condition=ready pod \
-l app.kubernetes.io/name=databend-meta \
--timeout 120s || true
--for=condition=ready pod \
-l app.kubernetes.io/name=databend-meta \
--timeout 120s || true

kubectl get pods -A -o wide

@@ -83,9 +83,9 @@ kubectl apply -f scripts/ci/meta-chaos/verifier.yaml

echo "check if databend-metaverifier node is ready"
kubectl -n databend wait \
--for=condition=ready pod \
-l app.kubernetes.io/name=databend-metaverifier \
--timeout 120s || true
--for=condition=ready pod \
-l app.kubernetes.io/name=databend-metaverifier \
--timeout 120s || true

echo "logs databend-metaverifier.."
kubectl logs databend-metaverifier --namespace databend
@@ -12,21 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use databend_common_base::base::tokio;
use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::table_args::TableArgs;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
use databend_common_expression::SendableDataBlockStream;
use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan;
use databend_common_storages_fuse::table_functions::ClusteringInformationTable;
use databend_query::sessions::QueryContext;
use databend_query::stream::ReadDataBlockStream;
use databend_query::test_kits::*;
use tokio_stream::StreamExt;

@@ -41,43 +30,21 @@ async fn test_clustering_information_table_read() -> Result<()> {
fixture.create_default_database().await?;
fixture.create_default_table().await?;

// func args
let arg_db = Scalar::String(db.clone());
let arg_tbl = Scalar::String(tbl.clone());

{
let expected = vec![
"+----------+----------+----------+----------+----------+----------+----------+",
"| Column 0 | Column 1 | Column 2 | Column 3 | Column 4 | Column 5 | Column 6 |",
"+----------+----------+----------+----------+----------+----------+----------+",
"| '(id)' | 0 | 0 | 0 | 0 | 0 | '{}' |",
"+----------+----------+----------+----------+----------+----------+----------+",
];

expects_ok(
"empty_data_set",
test_drive_clustering_information(
TableArgs::new_positioned(vec![arg_db.clone(), arg_tbl.clone()]),
ctx.clone(),
)
.await,
expected,
)
.await?;
}

{
let qry = format!("insert into {}.{} values(1, (2, 3)),(2, (4, 6))", db, tbl);
let _ = execute_query(ctx.clone(), qry.as_str()).await?;
let expected = vec![
"+----------+----------+----------+----------+----------+----------+---------------+",
"| Column 0 | Column 1 | Column 2 | Column 3 | Column 4 | Column 5 | Column 6 |",
"+----------+----------+----------+----------+----------+----------+---------------+",
"| '(id)' | 1 | 0 | 0 | 0 | 1 | '{\"00001\":1}' |",
"+----------+----------+----------+----------+----------+----------+---------------+",
"+----------+----------+----------+----------+----------+---------------+",
"| Column 0 | Column 1 | Column 2 | Column 3 | Column 4 | Column 5 |",
"+----------+----------+----------+----------+----------+---------------+",
"| '(id)' | 1 | 0 | 0 | 1 | '{\"00001\":1}' |",
"+----------+----------+----------+----------+----------+---------------+",
];

let qry = format!("select * from clustering_information('{}', '{}')", db, tbl);
let qry = format!(
"select * exclude(timestamp) from clustering_information('{}', '{}')",
db, tbl
);

expects_ok(
"clustering_information",
@@ -107,19 +74,3 @@ async fn test_clustering_information_table_read() -> Result<()> {

Ok(())
}

async fn test_drive_clustering_information(
tbl_args: TableArgs,
ctx: Arc<QueryContext>,
) -> Result<SendableDataBlockStream> {
let func = ClusteringInformationTable::create("system", "clustering_information", 1, tbl_args)?;
let source_plan = func
.clone()
.as_table()
.read_plan(ctx.clone(), Some(PushDownInfo::default()), true)
.await?;
ctx.set_partitions(source_plan.parts.clone())?;
func.as_table()
.read_data_block_stream(ctx, &source_plan)
.await
}
78 changes: 77 additions & 1 deletion src/query/sql/src/planner/expression_parser.rs
@@ -400,7 +400,7 @@ pub fn parse_cluster_keys(
&name_resolution_ctx,
metadata,
&[],
false,
true,
)?;

let tokens = tokenize_sql(cluster_key_str)?;
@@ -462,6 +462,82 @@ pub fn parse_cluster_keys(
Ok(exprs)
}

pub fn analyze_cluster_keys(
ctx: Arc<dyn TableContext>,
table_meta: Arc<dyn Table>,
sql: &str,
) -> Result<(String, Vec<Expr>)> {
let sql_dialect = ctx.get_settings().get_sql_dialect().unwrap_or_default();
let tokens = tokenize_sql(sql)?;
let mut ast_exprs = parse_comma_separated_exprs(&tokens, sql_dialect)?;
// unwrap tuple.
if ast_exprs.len() == 1 {
if let AExpr::Tuple { exprs, .. } = &ast_exprs[0] {
ast_exprs = exprs.clone();
}
}

let (mut bind_context, metadata) = bind_one_table(table_meta)?;
let settings = Settings::create(Tenant::new_literal("dummy"));
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let mut type_checker = TypeChecker::try_create(
&mut bind_context,
ctx,
&name_resolution_ctx,
metadata,
&[],
true,
)?;

let mut exprs = Vec::with_capacity(ast_exprs.len());
let mut cluster_keys = Vec::with_capacity(exprs.len());
for ast in ast_exprs {
let (scalar, _) = *type_checker.resolve(&ast)?;
if scalar.used_columns().len() != 1 || !scalar.evaluable() {
return Err(ErrorCode::InvalidClusterKeys(format!(
"Cluster by expression `{:#}` is invalid",
ast
)));
}

let expr = scalar.as_expr()?.project_column_ref(|col| col.index);
if !expr.is_deterministic(&BUILTIN_FUNCTIONS) {
return Err(ErrorCode::InvalidClusterKeys(format!(
"Cluster by expression `{:#}` is not deterministic",
ast
)));
}

let data_type = expr.data_type().remove_nullable();
if !matches!(
data_type,
DataType::Number(_)
| DataType::String
| DataType::Timestamp
| DataType::Date
| DataType::Boolean
| DataType::Decimal(_)
) {
return Err(ErrorCode::InvalidClusterKeys(format!(
"Unsupported data type '{}' for cluster by expression `{:#}`",
data_type, ast
)));
}

exprs.push(expr);

let mut cluster_by = ast.clone();
let mut normalizer = IdentifierNormalizer {
ctx: &name_resolution_ctx,
};
cluster_by.drive_mut(&mut normalizer);
cluster_keys.push(format!("{:#}", &cluster_by));
}

let cluster_by_str = format!("({})", cluster_keys.join(", "));
Ok((cluster_by_str, exprs))
}

#[derive(Default)]
struct DummyTable {
info: TableInfo,
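A hedged sketch of how the new helper might be called, written as if it sat alongside analyze_cluster_keys in expression_parser.rs so the imports there are in scope. The signature and return value follow this hunk; the ctx/table handles and the example key expression "(id, upper(name))" are assumptions for illustration only.

// Sketch only: `ctx` and `table` are assumed to come from the caller, and
// "(id, upper(name))" is an illustrative cluster-key expression.
fn normalized_cluster_by(
    ctx: Arc<dyn TableContext>,
    table: Arc<dyn Table>,
) -> Result<String> {
    // Keys that do not reference exactly one column, are non-deterministic,
    // or have an unsupported data type are rejected with InvalidClusterKeys,
    // per the checks above.
    let (cluster_by_str, _exprs) = analyze_cluster_keys(ctx, table, "(id, upper(name))")?;
    // `cluster_by_str` is the normalized form, e.g. "(id, upper(name))";
    // `_exprs` are the type-checked expressions over the table's columns.
    Ok(cluster_by_str)
}
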
@@ -574,9 +574,13 @@ impl ReclusterMutator {
});
}

let mut selected_idx = IndexSet::new();
if !unfinished_parts.is_empty() {
warn!("Recluster: unfinished_parts is not empty after calculate the blocks overlaps");
// todo: re-sort the unfinished parts firstly.
// re-sort the unfinished parts firstly.
unfinished_parts.keys().for_each(|idx| {
selected_idx.insert(*idx);
});
}

let sum_depth: usize = block_depths.iter().sum();
@@ -585,7 +589,6 @@
(10000.0 * sum_depth as f64 / block_depths.len() as f64).round() / 10000.0;

// find the max point, gather the indices.
let mut selected_idx = IndexSet::new();
if average_depth > depth_threshold {
point_overlaps[max_point].iter().for_each(|idx| {
selected_idx.insert(*idx);