Skip to content

Commit

Permalink
chore(cubesql): Do not call async Node functions while planning
Browse files Browse the repository at this point in the history
  • Loading branch information
MazterQyou committed Oct 9, 2024
1 parent 583e566 commit 81edfd1
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 108 deletions.
79 changes: 75 additions & 4 deletions rust/cubesql/cubesql/src/compile/query_engine.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::compile::engine::df::planner::CubeQueryPlanner;
use std::{backtrace::Backtrace, collections::HashMap, future::Future, pin::Pin, sync::Arc};
use std::{
backtrace::Backtrace, collections::HashMap, future::Future, pin::Pin, sync::Arc,
time::SystemTime,
};

use crate::{
compile::{
Expand Down Expand Up @@ -43,6 +46,7 @@ use datafusion::{
sql::{parser::Statement as DFStatement, planner::SqlToRel},
variable::VarType,
};
use uuid::Uuid;

#[async_trait::async_trait]
pub trait QueryEngine {
Expand Down Expand Up @@ -74,6 +78,11 @@ pub trait QueryEngine {

fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType;

async fn get_compiler_id_and_refresh_cache_if_needed(
&self,
state: Arc<SessionState>,
) -> Result<Uuid, CompilationError>;

async fn plan(
&self,
stmt: Self::AstStatementType,
Expand All @@ -82,6 +91,28 @@ pub trait QueryEngine {
meta: Arc<MetaContext>,
state: Arc<SessionState>,
) -> CompilationResult<QueryPlan> {
let compiler_id = self
.get_compiler_id_and_refresh_cache_if_needed(state.clone())
.await?;

let planning_start = SystemTime::now();
if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = state.auth_context() {
self.transport_ref()
.log_load_state(
Some(span_id.clone()),
auth_context,
state.get_load_request_meta(),
"SQL API Query Planning".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
}),
)
.await
.map_err(|e| CompilationError::internal(e.to_string()))?;
}
}

let ctx = self.create_session_ctx(state.clone())?;
let cube_ctx = self.create_cube_ctx(state.clone(), meta.clone(), ctx.clone())?;

Expand Down Expand Up @@ -140,7 +171,7 @@ pub trait QueryEngine {
let mut finalized_graph = self
.compiler_cache_ref()
.rewrite(
state.auth_context().unwrap(),
compiler_id,
cube_ctx.clone(),
converter.take_egraph(),
&query_params.unwrap(),
Expand Down Expand Up @@ -186,7 +217,13 @@ pub trait QueryEngine {
let mut rewriter = Rewriter::new(finalized_graph, cube_ctx.clone());

let result = rewriter
.find_best_plan(root, state.auth_context().unwrap(), qtrace, span_id.clone())
.find_best_plan(
root,
compiler_id,
state.auth_context().unwrap(),
qtrace,
span_id.clone(),
)
.await
.map_err(|e| match e.cause {
CubeErrorCauseType::Internal(_) => CompilationError::Internal(
Expand Down Expand Up @@ -233,12 +270,31 @@ pub trait QueryEngine {
// TODO: We should find what optimizers will be safety to use for OLAP queries
guard.optimizer.rules = vec![];
}
if let Some(span_id) = span_id {
if let Some(span_id) = &span_id {
span_id.set_is_data_query(true).await;
}
};

log::debug!("Rewrite: {:#?}", rewrite_plan);

if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = state.auth_context() {
self.transport_ref()
.log_load_state(
Some(span_id.clone()),
auth_context,
state.get_load_request_meta(),
"SQL API Query Planning Success".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
"duration": planning_start.elapsed().unwrap().as_millis() as u64,
}),
)
.await
.map_err(|e| CompilationError::internal(e.to_string()))?;
}
}

let rewrite_plan = Self::evaluate_wrapped_sql(
self.transport_ref().clone(),
Arc::new(state.get_load_request_meta()),
Expand Down Expand Up @@ -493,6 +549,21 @@ impl QueryEngine for SqlQueryEngine {
fn sanitize_statement(&self, stmt: &Self::AstStatementType) -> Self::AstStatementType {
    // Produce a copy of the statement with sensitive data scrubbed,
    // suitable for logging or error reporting.
    let sanitizer = SensitiveDataSanitizer::new();
    sanitizer.replace(stmt.clone())
}

async fn get_compiler_id_and_refresh_cache_if_needed(
    &self,
    state: Arc<SessionState>,
) -> Result<Uuid, CompilationError> {
    // Resolve the auth context up front; planning cannot proceed without one.
    let auth_context = state.auth_context().ok_or_else(|| {
        CompilationError::internal("Unable to get auth context".to_string())
    })?;

    // Ask the compiler cache for the current compiler id, refreshing the
    // cached compiler if it is stale; surface cache errors as internal
    // compilation errors.
    self.compiler_cache_ref()
        .get_compiler_id_and_refresh_if_needed(auth_context, state.protocol.clone())
        .await
        .map_err(|e| CompilationError::internal(e.to_string()))
}
}

fn is_olap_query(parent: &LogicalPlan) -> Result<bool, CompilationError> {
Expand Down
8 changes: 5 additions & 3 deletions rust/cubesql/cubesql/src/compile/rewrite/rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
sync::Arc,
time::Duration,
};
use uuid::Uuid;

pub struct Rewriter {
graph: EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>,
Expand Down Expand Up @@ -229,7 +230,7 @@ impl Rewriter {

pub async fn run_rewrite_to_completion(
&mut self,
auth_context: AuthContextRef,
compiler_id: Uuid,
qtrace: &mut Option<Qtrace>,
) -> Result<EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, CubeError> {
let cube_context = self.cube_context.clone();
Expand All @@ -243,7 +244,7 @@ impl Rewriter {
.server
.compiler_cache
.rewrite_rules(
auth_context.clone(),
compiler_id,
cube_context.session_state.protocol.clone(),
false,
)
Expand Down Expand Up @@ -311,6 +312,7 @@ impl Rewriter {
pub async fn find_best_plan(
&mut self,
root: Id,
compiler_id: Uuid,
auth_context: AuthContextRef,
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
Expand All @@ -326,7 +328,7 @@ impl Rewriter {
.server
.compiler_cache
.rewrite_rules(
auth_context.clone(),
compiler_id,
cube_context.session_state.protocol.clone(),
true,
)
Expand Down
48 changes: 3 additions & 45 deletions rust/cubesql/cubesql/src/compile/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::compile::{
StatusFlags,
};
use sqlparser::ast;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::SystemTime};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};

use crate::{
compile::{
Expand Down Expand Up @@ -61,50 +61,8 @@ impl QueryRouter {
qtrace: &mut Option<Qtrace>,
span_id: Option<Arc<SpanId>>,
) -> CompilationResult<QueryPlan> {
let planning_start = SystemTime::now();
if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = self.state.auth_context() {
self.session_manager
.server
.transport
.log_load_state(
Some(span_id.clone()),
auth_context,
self.state.get_load_request_meta(),
"SQL API Query Planning".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
}),
)
.await
.map_err(|e| CompilationError::internal(e.to_string()))?;
}
}
let result = self
.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
.await?;

if let Some(span_id) = span_id.as_ref() {
if let Some(auth_context) = self.state.auth_context() {
self.session_manager
.server
.transport
.log_load_state(
Some(span_id.clone()),
auth_context,
self.state.get_load_request_meta(),
"SQL API Query Planning Success".to_string(),
serde_json::json!({
"query": span_id.query_key.clone(),
"duration": planning_start.elapsed().unwrap().as_millis() as u64,
}),
)
.await
.map_err(|e| CompilationError::internal(e.to_string()))?;
}
}

return Ok(result);
self.create_df_logical_plan(stmt.clone(), qtrace, span_id.clone())
.await
}

pub async fn plan(
Expand Down
Loading

0 comments on commit 81edfd1

Please sign in to comment.