refactor(torii-grpc): chunk schema joins to avoid sqlite join limit #2839
Conversation
Walkthrough
Ohayo, sensei! The pull request introduces a new asynchronous function, fetch_entities, which chunks schema joins so queries stay under SQLite's join limit.
Sensei, the changes look solid! Ohayo and happy coding! 🍵🥷
Actionable comments posted: 1
🧹 Nitpick comments (1)
crates/torii/core/src/model.rs (1)
414-453: Ohayo sensei! Consider reducing duplication of collect_columns logic.
This block replicates the existing approach from build_sql_query. You might refactor it into a shared helper function if feasible.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
crates/torii/core/src/model.rs (1 hunks)
crates/torii/grpc/src/server/mod.rs (3 hunks)
🔇 Additional comments (9)
crates/torii/core/src/model.rs (7)
401-413: Ohayo sensei! The function signature looks consistent and well-structured.
The parameters for pagination and optional clauses provide good flexibility in constructing customized queries.
455-459: Ohayo sensei! Good job chunking schemas to avoid hitting SQLite's join limit.
This approach is valuable for large schema sets, ensuring stable queries.
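A minimal sketch of that chunking idea, using the standard library's `chunks` (the schema names and printout are illustrative, not torii's actual types):

```rust
// SQLite's default compile-time ceiling is 64 joins per query.
const MAX_JOINS: usize = 64;

fn main() {
    let schemas: Vec<String> = (0..150).map(|i| format!("model_{i}")).collect();
    // Each chunk becomes one SQL statement with at most MAX_JOINS LEFT JOINs.
    for (i, chunk) in schemas.chunks(MAX_JOINS).enumerate() {
        println!("query {i}: joins {} model tables", chunk.len());
    }
}
```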
480-495: Ohayo sensei! The count query approach is sound.
Generating a subselect to group by id ensures accurate row counting before retrieving paginated data.
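To make the shape concrete, here is a hedged sketch of such a count query builder (the table and clause strings are placeholders, not the crate's exact SQL):

```rust
// GROUP BY id inside the subselect collapses the joined rows back to one row
// per entity, so the outer COUNT(*) counts entities rather than join rows.
fn count_query(table: &str, join_clause: &str, where_clause: &str) -> String {
    format!(
        "SELECT COUNT(*) FROM (SELECT [{table}].id FROM [{table}] {join_clause} \
         WHERE {where_clause} GROUP BY [{table}].id)"
    )
}

fn main() {
    println!("{}", count_query("entities", "LEFT JOIN [ns-Position] ON ...", "1=1"));
}
```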
497-503: Ohayo sensei! Validate bind parameter usage.
All bind values are correctly attached to the count statement, keeping the queries parameterized. Great job reducing SQL-injection risk.
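The binding pattern in question looks roughly like this sketch (assuming a SQLite pool and string bind values; the helper name is hypothetical):

```rust
use sqlx::{Pool, Sqlite};

// Every user-supplied value goes through bind(), so the SQL text itself never
// embeds untrusted input.
async fn run_count(
    pool: &Pool<Sqlite>,
    query: &str,
    bind_values: &[String],
) -> Result<u32, sqlx::Error> {
    let mut stmt = sqlx::query_scalar(query);
    for value in bind_values {
        stmt = stmt.bind(value);
    }
    stmt.fetch_one(pool).await
}
```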
546-547: Ohayo sensei! Final return block is tidy.
Everything looks in order, returning both rows and total_count in a single result.
460-479: Ohayo sensei! Watch out for the potential overhead of large JOIN sequences.
Even with chunking, performance can degrade if many tables remain joined in the same query. Monitoring query times or using a better indexing strategy might help.
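One lightweight way to get that visibility is a timing wrapper around each chunked query; a std-only sketch (production code might prefer the tracing crate):

```rust
use std::time::Instant;

// Runs a closure and reports its wall-clock duration.
fn timed<T>(label: &str, f: impl FnOnce() -> T) -> T {
    let started = Instant::now();
    let result = f();
    eprintln!("{label} took {:?}", started.elapsed());
    result
}

fn main() {
    let rows: Vec<u32> = timed("chunk query", || (0..1_000).collect());
    println!("{} rows", rows.len());
}
```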
505-543: Ohayo sensei! The main query logic is comprehensive, but it might produce repeated partial sets.
You’re correctly reusing bind values, but confirm that chunk iteration yields an intended union of rows rather than duplicates.
✅ Verification successful
Let me gather more context about the chunk handling logic.
Ohayo sensei! Your code is handling chunks perfectly fine!
The implementation is actually quite elegant and safe:
- Schemas are chunked using `MAX_JOINS` (64) to avoid SQLite join limits
- Each chunk processes a distinct set of models with no overlap
- Results are properly accumulated using `all_rows.extend(chunk_rows)`
- Total count is correctly maintained by adding each `chunk_count`
The chunking here is a pagination mechanism for JOIN operations, not for the result set itself, so there's no risk of duplicate rows.
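A compact sketch of the accumulation pattern described above; `fetch_chunk` is a hypothetical stand-in for the per-chunk SQL round-trip:

```rust
const MAX_JOINS: usize = 64;

// Placeholder for the real query: pretend each schema in the chunk yields one row.
fn fetch_chunk(chunk: &[&str]) -> (Vec<String>, u32) {
    (chunk.iter().map(|s| s.to_string()).collect(), chunk.len() as u32)
}

fn main() {
    let schemas: Vec<&str> = vec!["Position"; 150];
    let mut all_rows = Vec::new();
    let mut total_count = 0u32;
    for chunk in schemas.chunks(MAX_JOINS) {
        let (chunk_rows, chunk_count) = fetch_chunk(chunk);
        all_rows.extend(chunk_rows); // chunks cover disjoint model sets, so no duplicates
        total_count += chunk_count;
    }
    println!("{} rows, total_count = {total_count}", all_rows.len());
}
```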
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```bash
#!/bin/bash
# We'll examine usage of fetch_entities across the codebase to verify we handle chunk results properly
rg -A 5 'fetch_entities'
```
Length of output: 785
Script:
```bash
#!/bin/bash
# Let's examine the full context of fetch_entities implementation
ast-grep --pattern 'pub async fn fetch_entities($$$)'
# Also check if there are any chunk-related variables or logic
rg -B 3 -A 3 'chunk' crates/torii/core/src/model.rs
```
Length of output: 1347
crates/torii/grpc/src/server/mod.rs (2)
445-448: Ohayo sensei! This mapping of string models to tag selectors is neat.
Ensure you handle errors if a model name is invalid or doesn't match an existing schema.
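A hedged sketch of such validation (the selector mapping and error strings are hypothetical, not torii's actual API):

```rust
use std::collections::HashSet;

// Reject unknown model names before building a tag selector for the query.
fn tag_selector(model: &str, known_models: &HashSet<String>) -> Result<String, String> {
    if !known_models.contains(model) {
        return Err(format!("no schema registered for model '{model}'"));
    }
    Ok(format!("selector:{model}")) // placeholder for the real selector computation
}

fn main() {
    let known: HashSet<String> = ["ns-Position".to_string()].into_iter().collect();
    assert!(tag_selector("ns-Position", &known).is_ok());
    assert!(tag_selector("ns-Typo", &known).is_err());
}
```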
499-511: Ohayo sensei! The call to fetch_entities is consistent with the chunking logic.
Just double-check that your limit/offset values align with the chunk handling inside fetch_entities.
crates/torii/grpc/src/server/mod.rs (outdated)
```rust
let count_query = format!(
    r#"
    SELECT COUNT(*) FROM {table}
    JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
    WHERE {where_clause}
    GROUP BY {table}.event_id
"#
);
let total_count = sqlx::query_scalar(&count_query);
for value in &bind_values {
    total_count = total_count.bind(value);
}
let total_count = total_count.fetch_one(&self.pool).await?;
if total_count == 0 {
    return Ok((Vec::new(), 0));
}

let entities = self.fetch_historical_event_messages(
```
Ohayo sensei! Reassigning total_count might cause compilation issues.
You must declare total_count as mutable, then rebind it:
```diff
-let total_count = sqlx::query_scalar(&count_query);
+let mut total_count = sqlx::query_scalar(&count_query);
```
Without `mut`, reassigning `total_count` inside the loop is a compile error.
Actionable comments posted: 0
🧹 Nitpick comments (6)
crates/torii/core/src/model.rs (1)
509-543: Consider adding batch size configuration for better performance tuning.
While the implementation is correct, consider making the chunk size configurable rather than hardcoding it. This would allow for performance tuning based on:
- Available system resources
- Specific database workload patterns
- Query complexity
```diff
-const MAX_JOINS: usize = 64;
+/// Maximum number of joins per chunk, defaulting to SQLite's limit of 64
+const DEFAULT_MAX_JOINS: usize = 64;
+
+pub struct FetchOptions {
+    /// Maximum number of joins per chunk
+    pub max_joins: usize,
+}
+
+impl Default for FetchOptions {
+    fn default() -> Self {
+        Self {
+            max_joins: DEFAULT_MAX_JOINS,
+        }
+    }
+}
```
crates/torii/grpc/src/server/mod.rs (5)
Line range hint 315-342: Improve WHERE clause construction readability.
Consider extracting the WHERE clause construction into a separate function for better readability and maintainability.
```diff
+fn build_where_clause(table: &str, ids: &[String], entity_updated_after: Option<&str>) -> String {
+    let ids_clause = ids.join(" OR ");
+    match entity_updated_after {
+        Some(_) => format!("{} AND {}.updated_at >= ?", ids_clause, table),
+        None => ids_clause,
+    }
+}
+
 let where_clause = match &hashed_keys {
     Some(hashed_keys) => {
-        let ids =
-            hashed_keys.hashed_keys.iter().map(|_| "{table}.id = ?").collect::<Vec<_>>();
-        format!(
-            "{} {}",
-            ids.join(" OR "),
-            if entity_updated_after.is_some() {
-                format!("AND {table}.updated_at >= ?")
-            } else {
-                String::new()
-            }
-        )
+        let ids = hashed_keys.hashed_keys.iter().map(|_| format!("{}.id = ?", table)).collect::<Vec<_>>();
+        build_where_clause(table, &ids, entity_updated_after.as_deref())
     }
     None => {
-        if entity_updated_after.is_some() {
-            format!("{table}.updated_at >= ?")
-        } else {
-            String::new()
-        }
+        entity_updated_after.map_or(String::new(), |_| format!("{}.updated_at >= ?", table))
     }
 };
```
365-396: Consider extracting common SQL query patterns.
The historical event messages query construction could be refactored to reuse common SQL patterns.
```diff
+const HISTORICAL_EVENTS_BASE_QUERY: &str = r#"
+    SELECT {columns}
+    FROM {table}
+    JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
+    WHERE {where_clause}
+    GROUP BY {table}.event_id
+    ORDER BY {table}.event_id DESC
+"#;
+
-let count_query = format!(
-    r#"
-    SELECT COUNT(*) FROM {table}
-    JOIN {model_relation_table} ON {table}.id = {model_relation_table}.entity_id
-    WHERE {where_clause}
-    GROUP BY {table}.event_id
-"#
-);
+let count_query = HISTORICAL_EVENTS_BASE_QUERY.replace("{columns}", "COUNT(*)");
```
400-414: Enhance error handling for the fetch_entities call.
While the parallel processing is efficient, consider adding more specific error handling for different failure scenarios.
```diff
-let (rows, total_count) = fetch_entities(
+let fetch_result = fetch_entities(
     &self.pool,
     &schemas,
     table,
     model_relation_table,
     entity_relation_column,
     if !where_clause.is_empty() { Some(&where_clause) } else { None },
     if !having_clause.is_empty() { Some(&having_clause) } else { None },
     order_by,
     limit,
     offset,
     bind_values,
 )
-.await?;
+.await
+.map_err(|e| match e {
+    Error::Sql(sqlx::Error::RowNotFound) => Error::from(QueryError::NoResults),
+    Error::Sql(sqlx::Error::Database(e)) if e.code().as_deref() == Some("HY000") => {
+        Error::from(QueryError::JoinLimitExceeded)
+    }
+    _ => e,
+})?;
+let (rows, total_count) = fetch_result;
```
Line range hint 636-672: Enhance error messages for schema fetching.
Consider adding more descriptive error messages when schemas cannot be fetched.
```diff
 let schemas = self
     .model_cache
     .models(&model_ids)
     .await?
     .into_iter()
     .map(|m| m.schema)
     .collect::<Vec<_>>();
+
+if schemas.is_empty() {
+    return Err(Error::from(QueryError::InvalidModel(format!(
+        "No valid schemas found for models: {:?}",
+        model_ids.iter().map(|id| format!("{:#x}", id)).collect::<Vec<_>>()
+    ))));
+}
```
Line range hint 715-734: Consider adding query optimization hints.
For complex composite queries, consider adding SQL hints to help the query optimizer.
```diff
 let (rows, total_count) = fetch_entities(
     &self.pool,
     &schemas,
     table,
     model_relation_table,
     entity_relation_column,
-    if where_clause.is_empty() { None } else { Some(&where_clause) },
+    if where_clause.is_empty() {
+        None
+    } else {
+        // Add INDEXED BY hint for better query performance
+        Some(&format!("{} INDEXED BY idx_{}_id", where_clause, table))
+    },
     if having_clause.is_empty() { None } else { Some(&having_clause) },
     order_by,
     limit,
     offset,
     bind_values,
 )
 .await?;
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
crates/torii/core/src/model.rs (1 hunks)
crates/torii/grpc/src/server/mod.rs (11 hunks)
🔇 Additional comments (3)
crates/torii/core/src/model.rs (3)
455-456: Ohayo sensei! Smart chunking approach to handle SQLite join limits!
The implementation correctly uses SQLite's maximum join limit of 64 and efficiently chunks the schemas using the standard library's chunks iterator.
464-481: Well-structured query construction with proper SQL safety!
The implementation:
- Properly escapes table and column names with square brackets (sketched below)
- Correctly constructs LEFT JOINs for model schemas
- Maintains proper JOIN conditions
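A sketch of one such bracket-escaped LEFT JOIN (identifiers are placeholders chosen for illustration):

```rust
// Square brackets let SQLite accept identifiers containing '-', e.g. "ns-Position".
fn join_clause(entity_table: &str, model_table: &str, relation_column: &str) -> String {
    format!(
        "LEFT JOIN [{model_table}] ON [{entity_table}].id = [{model_table}].[{relation_column}]"
    )
}

fn main() {
    println!("{}", join_clause("entities", "ns-Position", "entity_id"));
}
```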
487-505: Secure and efficient count query implementation!
The code:
- Uses parameterized queries to prevent SQL injection
- Properly accumulates counts across chunks
- Maintains proper grouping for accurate counts
Actionable comments posted: 1
🧹 Nitpick comments (4)
crates/torii/core/src/model.rs (4)
401-414: Ohayo sensei! Consider grouping parameters into a dedicated struct.
The function has 11 parameters, which could be grouped into a dedicated struct for better maintainability and readability.
```diff
+#[derive(Debug)]
+pub struct EntityQueryParams {
+    pub schemas: Vec<Ty>,
+    pub table_name: String,
+    pub model_relation_table: String,
+    pub entity_relation_column: String,
+    pub where_clause: Option<String>,
+    pub having_clause: Option<String>,
+    pub order_by: Option<String>,
+    pub limit: Option<u32>,
+    pub offset: Option<u32>,
+    pub bind_values: Vec<String>,
+}
+
-pub async fn fetch_entities(
-    pool: &Pool<sqlx::Sqlite>,
-    schemas: &[Ty],
-    table_name: &str,
-    model_relation_table: &str,
-    entity_relation_column: &str,
-    where_clause: Option<&str>,
-    having_clause: Option<&str>,
-    order_by: Option<&str>,
-    limit: Option<u32>,
-    offset: Option<u32>,
-    bind_values: Vec<String>,
-) -> Result<(Vec<sqlx::sqlite::SqliteRow>, u32), Error>
+pub async fn fetch_entities(
+    pool: &Pool<sqlx::Sqlite>,
+    params: EntityQueryParams,
+) -> Result<(Vec<sqlx::sqlite::SqliteRow>, u32), Error>
```
456-460: Document the MAX_JOINS constant with SQLite-specific context.
The MAX_JOINS constant is crucial for handling SQLite's join limits, but its significance isn't documented.
```diff
-    const MAX_JOINS: usize = 64;
+    /// SQLite has a compile-time limit on the maximum number of joins in a single query.
+    /// This constant ensures we stay below this limit by chunking our queries.
+    /// See: https://www.sqlite.org/limits.html
+    const MAX_JOINS: usize = 64;
```
488-498: Optimize the count query for better performance.
The count query includes unnecessary columns in the subquery. For counting, we only need the ID column.
```diff
-    "SELECT COUNT(*) FROM (SELECT {}.id, group_concat({}.model_id) as model_ids FROM [{}] \
+    "SELECT COUNT(*) FROM (SELECT {}.id FROM [{}] \
      {} {} GROUP BY {}.id {})",
     table_name,
-    model_relation_table,
     table_name,
```
501-505: Add error context for database operations.
Enhance error handling by providing context about which query failed.
```diff
-    let chunk_count: u32 = count_stmt.fetch_one(pool).await?;
+    let chunk_count: u32 = count_stmt
+        .fetch_one(pool)
+        .await
+        .map_err(|e| Error::Database(format!("Failed to execute count query: {}", e)))?;
```
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/torii/core/src/model.rs (1 hunks)
🔇 Additional comments (1)
crates/torii/core/src/model.rs (1)
547-548: LGTM! Return type is well-suited for pagination.
The function returns both the rows and total count, which is perfect for implementing pagination.
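For example, a caller can derive page metadata directly from that pair (the struct and helper below are illustrative, not part of the crate):

```rust
struct Page<T> {
    items: Vec<T>,
    total: u32,
    has_more: bool,
}

// Combine one page of rows with the overall count to tell the client whether
// another page exists beyond the current offset.
fn to_page<T>(rows: Vec<T>, total_count: u32, offset: u32) -> Page<T> {
    let has_more = offset as usize + rows.len() < total_count as usize;
    Page { items: rows, total: total_count, has_more }
}

fn main() {
    let page = to_page(vec!["row"; 10], 25, 10);
    assert!(page.has_more);
    assert_eq!(page.items.len(), 10);
    assert_eq!(page.total, 25);
}
```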
```rust
// Helper function to collect columns (existing implementation)
fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
    match ty {
        Ty::Struct(s) => {
            for child in &s.children {
                let new_path = if path.is_empty() {
                    child.name.clone()
                } else {
                    format!("{}.{}", path, child.name)
                };
                collect_columns(table_prefix, &new_path, &child.ty, selections);
            }
        }
        Ty::Tuple(t) => {
            for (i, child) in t.iter().enumerate() {
                let new_path =
                    if path.is_empty() { format!("{}", i) } else { format!("{}.{}", path, i) };
                collect_columns(table_prefix, &new_path, child, selections);
            }
        }
        Ty::Enum(e) => {
            // Add the enum variant column with table prefix and alias
            selections.push(format!("[{table_prefix}].[{path}] as \"{table_prefix}.{path}\"",));

            // Add columns for each variant's value (if not empty tuple)
            for option in &e.options {
                if let Ty::Tuple(t) = &option.ty {
                    if t.is_empty() {
                        continue;
                    }
                }
                let variant_path = format!("{}.{}", path, option.name);
                collect_columns(table_prefix, &variant_path, &option.ty, selections);
            }
        }
        Ty::Array(_) | Ty::Primitive(_) | Ty::ByteArray(_) => {
            selections.push(format!("[{table_prefix}].[{path}] as \"{table_prefix}.{path}\"",));
        }
    }
}
```
🛠️ Refactor suggestion
Extract duplicated helper function to a shared location.
The collect_columns helper function is duplicated from build_sql_query. Consider extracting it to a module-level function to maintain DRY principles.
```diff
+impl ModelSQLReader {
+    fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
+        // Move the existing implementation here
+    }
+}
+
-fn collect_columns(table_prefix: &str, path: &str, ty: &Ty, selections: &mut Vec<String>) {
-    // Remove duplicated implementation
-}
```
Committable suggestion skipped: line range outside the PR's diff.
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #2839      +/-   ##
==========================================
+ Coverage   56.27%   56.36%   +0.08%
==========================================
  Files         439      439
  Lines       56276    56400     +124
==========================================
+ Hits        31671    31789     +118
- Misses      24605    24611       +6
```

☔ View full report in Codecov by Sentry.
Summary by CodeRabbit
- Refactor: chunk schema joins in the query_by_keys and query_by_hashed_keys methods to avoid SQLite's join limit.