Skip to content

Commit

Permalink
feat: supports CTE query (GreptimeTeam#1674)
Browse files Browse the repository at this point in the history
* feat: supports CTE query

* test: move cte test to standalone
  • Loading branch information
killme2008 authored and paomian committed Oct 19, 2023
1 parent 6b6f99c commit d7a06ba
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 31 deletions.
7 changes: 0 additions & 7 deletions src/datatypes/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ pub enum Error {
#[snafu(display("Invalid timestamp index: {}", index))]
InvalidTimestampIndex { index: usize, location: Location },

#[snafu(display("Duplicate timestamp index, exists: {}, new: {}", exists, new))]
DuplicateTimestampIndex {
exists: usize,
new: usize,
location: Location,
},

#[snafu(display("{}", msg))]
CastType { msg: String, location: Location },

Expand Down
17 changes: 2 additions & 15 deletions src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,7 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
let mut name_to_index = HashMap::with_capacity(column_schemas.len());
let mut timestamp_index = None;
for (index, column_schema) in column_schemas.iter().enumerate() {
if column_schema.is_time_index() {
ensure!(
timestamp_index.is_none(),
error::DuplicateTimestampIndexSnafu {
exists: timestamp_index.unwrap(),
new: index,
}
);
if column_schema.is_time_index() && timestamp_index.is_none() {
timestamp_index = Some(index);
}
let field = Field::try_from(column_schema)?;
Expand Down Expand Up @@ -302,14 +295,8 @@ impl TryFrom<Arc<ArrowSchema>> for Schema {
for (index, column_schema) in column_schemas.iter().enumerate() {
if column_schema.is_time_index() {
validate_timestamp_index(&column_schemas, index)?;
ensure!(
timestamp_index.is_none(),
error::DuplicateTimestampIndexSnafu {
exists: timestamp_index.unwrap(),
new: index,
}
);
timestamp_index = Some(index);
break;
}
}

Expand Down
14 changes: 7 additions & 7 deletions src/query/src/datafusion/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ async fn resolve_tables(
.context(CatalogSnafu)?;

if let Entry::Vacant(v) = tables.entry(resolved_name.to_string()) {
let table = table_provider
.resolve_table(table_name)
.await
.context(CatalogSnafu)?;

v.insert(table);
// Try our best to resolve the tables here, but don't return an error if a table is not found:
// the name may be a temporary name of a CTE or view, which can't be resolved until plan
// execution.
if let Ok(table) = table_provider.resolve_table(table_name).await {
v.insert(table);
}
}
}
Ok(tables)
Expand All @@ -100,7 +100,7 @@ impl ContextProvider for DfContextProviderAdapter {
self.tables
.get(&table_ref.to_string())
.cloned()
.ok_or_else(|| DataFusionError::Plan(format!("table '{}' not found", table_ref)))
.ok_or_else(|| DataFusionError::Plan(format!("Table not found: {}", table_ref)))
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/standalone/alter/rename_table.result
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ Error: 4001(TableNotFound), Table not found: t

SELECT * FROM t;

Error: 4001(TableNotFound), Table not found: greptime.public.t
Error: 3000(PlanQuery), Error during planning: Table not found: greptime.public.t

CREATE TABLE t(i INTEGER, j BIGINT TIME INDEX);

Expand Down
2 changes: 1 addition & 1 deletion tests/cases/standalone/common/catalog/schema.result
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ Error: 1001(Unsupported), SQL statement is not supported: DROP SCHEMA test_publi

SELECT * FROM test_public_schema.hello;

Error: 4001(TableNotFound), Table not found: greptime.test_public_schema.hello
Error: 3000(PlanQuery), Error during planning: Table not found: greptime.test_public_schema.hello

USE public;

Expand Down
126 changes: 126 additions & 0 deletions tests/cases/standalone/cte/cte.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
create table a(i bigint time index);

Affected Rows: 0

insert into a values (42);

Affected Rows: 1

with cte1 as (Select i as j from a) select * from cte1;

+----+
| j |
+----+
| 42 |
+----+

with cte1 as (Select i as j from a) select x from cte1 t1(x);

+----+
| x |
+----+
| 42 |
+----+

with cte1(xxx) as (Select i as j from a) select xxx from cte1;

+-----+
| xxx |
+-----+
| 42 |
+-----+

with cte1(xxx) as (Select i as j from a) select x from cte1 t1(x);

+----+
| x |
+----+
| 42 |
+----+

with cte1 as (Select i as j from a), cte2 as (select ref.j as k from cte1 as ref), cte3 as (select ref2.j+1 as i from cte1 as ref2) select * from cte2 , cte3;

+----+----+
| k | i |
+----+----+
| 42 | 43 |
+----+----+

with cte1 as (select i as j from a), cte2 as (select ref.j as k from cte1 as ref), cte3 as (select ref2.j+1 as i from cte1 as ref2) select * from cte2 union all select * FROM cte3 order by 1;

+----+
| k |
+----+
| 42 |
| 43 |
+----+

with cte1 as (select 42), cte1 as (select 42) select * FROM cte1;

Error: 3000(PlanQuery), sql parser error: WITH query name "cte1" specified more than once

with cte3 as (select ref2.j as i from cte1 as ref2), cte1 as (Select i as j from a), cte2 as (select ref.j+1 as k from cte1 as ref) select * from cte2 union all select * FROM cte3;

Error: 3000(PlanQuery), Error during planning: Table not found: greptime.public.cte1

with cte1 as (Select i as j from a) select * from cte1 cte11, cte1 cte12;

+----+----+
| j | j |
+----+----+
| 42 | 42 |
+----+----+

with cte1 as (Select i as j from a) select * from cte1 where j = (select max(j) from cte1 as cte2);

+----+
| j |
+----+
| 42 |
+----+

with cte1(x, y) as (select 42 a, 84 b) select zzz, y from cte1 t1(zzz, y);

+-----+----+
| zzz | y |
+-----+----+
| 42 | 84 |
+-----+----+

SELECT 1 UNION ALL (WITH cte AS (SELECT 42) SELECT * FROM cte) order by 1;

+----------+
| Int64(1) |
+----------+
| 1 |
| 42 |
+----------+

WITH RECURSIVE cte(d) AS (
SELECT 1
UNION ALL
(WITH c(d) AS (SELECT * FROM cte)
SELECT d + 1
FROM c
WHERE FALSE
)
)
SELECT max(d) FROM cte;

Error: 3000(PlanQuery), This feature is not implemented: Recursive CTEs are not supported

with cte (a) as (
select 1
)
select
a as alias1,
alias1 as alias2
from cte
where alias2 > 0;

Error: 3000(PlanQuery), No field named alias2. Valid fields are a.

drop table a;

Affected Rows: 1

52 changes: 52 additions & 0 deletions tests/cases/standalone/cte/cte.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
create table a(i bigint time index);

insert into a values (42);

with cte1 as (Select i as j from a) select * from cte1;

with cte1 as (Select i as j from a) select x from cte1 t1(x);

with cte1(xxx) as (Select i as j from a) select xxx from cte1;

with cte1(xxx) as (Select i as j from a) select x from cte1 t1(x);

with cte1 as (Select i as j from a), cte2 as (select ref.j as k from cte1 as ref), cte3 as (select ref2.j+1 as i from cte1 as ref2) select * from cte2 , cte3;

with cte1 as (select i as j from a), cte2 as (select ref.j as k from cte1 as ref), cte3 as (select ref2.j+1 as i from cte1 as ref2) select * from cte2 union all select * FROM cte3 order by 1;

with cte1 as (select 42), cte1 as (select 42) select * FROM cte1;

-- Reference to a CTE before it's actually defined; this is not supported by datafusion
with cte3 as (select ref2.j as i from cte1 as ref2), cte1 as (Select i as j from a), cte2 as (select ref.j+1 as k from cte1 as ref) select * from cte2 union all select * FROM cte3;

with cte1 as (Select i as j from a) select * from cte1 cte11, cte1 cte12;

with cte1 as (Select i as j from a) select * from cte1 where j = (select max(j) from cte1 as cte2);

with cte1(x, y) as (select 42 a, 84 b) select zzz, y from cte1 t1(zzz, y);

SELECT 1 UNION ALL (WITH cte AS (SELECT 42) SELECT * FROM cte) order by 1;

-- Recursive CTEs are not supported in datafusion
WITH RECURSIVE cte(d) AS (
SELECT 1
UNION ALL
(WITH c(d) AS (SELECT * FROM cte)
SELECT d + 1
FROM c
WHERE FALSE
)
)
SELECT max(d) FROM cte;

-- Nested aliases are not supported in datafusion
with cte (a) as (
select 1
)
select
a as alias1,
alias1 as alias2
from cte
where alias2 > 0;

drop table a;
68 changes: 68 additions & 0 deletions tests/cases/standalone/cte/cte_in_cte.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
create table a(i bigint time index);

Affected Rows: 0

insert into a values (42);

Affected Rows: 1

with cte1 as (Select i as j from a) select * from cte1;

+----+
| j |
+----+
| 42 |
+----+

with cte1 as (with b as (Select i as j from a) Select j from b) select x from cte1 t1(x);

+----+
| x |
+----+
| 42 |
+----+

with cte1(xxx) as (with ncte(yyy) as (Select i as j from a) Select yyy from ncte) select xxx from cte1;

+-----+
| xxx |
+-----+
| 42 |
+-----+

with cte1 as (with b as (Select i as j from a) select j from b), cte2 as (with c as (select ref.j+1 as k from cte1 as ref) select k from c) select * from cte1 , cte2;

+----+----+
| j | k |
+----+----+
| 42 | 43 |
+----+----+

with cte1 as (select 42), cte1 as (select 42) select * FROM cte1;

Error: 3000(PlanQuery), sql parser error: WITH query name "cte1" specified more than once

with cte1 as (Select i as j from a) select * from (with cte2 as (select max(j) as j from cte1) select * from cte2) f;

+----+
| j |
+----+
| 42 |
+----+

with cte1 as (Select i as j from a) select * from cte1 where j = (with cte2 as (select max(j) as j from cte1) select j from cte2);

Error: 3001(EngineExecuteQuery), This feature is not implemented: Physical plan does not support logical expression (<subquery>)

with cte as (Select i as j from a) select * from cte where j = (with cte as (select max(j) as j from cte) select j from cte);

Error: 3000(PlanQuery), sql parser error: WITH query name "cte" specified more than once

with cte as (select * from cte) select * from cte;

Error: 3000(PlanQuery), Error during planning: Table not found: greptime.public.cte

drop table a;

Affected Rows: 1

28 changes: 28 additions & 0 deletions tests/cases/standalone/cte/cte_in_cte.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
create table a(i bigint time index);

insert into a values (42);

with cte1 as (Select i as j from a) select * from cte1;

with cte1 as (with b as (Select i as j from a) Select j from b) select x from cte1 t1(x);

with cte1(xxx) as (with ncte(yyy) as (Select i as j from a) Select yyy from ncte) select xxx from cte1;

with cte1 as (with b as (Select i as j from a) select j from b), cte2 as (with c as (select ref.j+1 as k from cte1 as ref) select k from c) select * from cte1 , cte2;

with cte1 as (select 42), cte1 as (select 42) select * FROM cte1;

with cte1 as (Select i as j from a) select * from (with cte2 as (select max(j) as j from cte1) select * from cte2) f;

-- Refer to CTE in subquery expression,
-- this feature is not implemented in datafusion
with cte1 as (Select i as j from a) select * from cte1 where j = (with cte2 as (select max(j) as j from cte1) select j from cte2);

-- Refer to a same-named CTE in a subquery expression;
-- this is rejected because the WITH query name is specified more than once
with cte as (Select i as j from a) select * from cte where j = (with cte as (select max(j) as j from cte) select j from cte);

-- Self-reference to a non-existent CTE
with cte as (select * from cte) select * from cte;

drop table a;

0 comments on commit d7a06ba

Please sign in to comment.