Skip to content

Commit

Permalink
feat: Add column supports at first or after the existing columns (Gre…
Browse files Browse the repository at this point in the history
…ptimeTeam#1621)

* feat: Add column supports at first or after the existing columns

* Update src/common/query/Cargo.toml

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
  • Loading branch information
lizhemingi and killme2008 authored Jun 1, 2023
1 parent 70e17ea commit 5467ea4
Show file tree
Hide file tree
Showing 19 changed files with 798 additions and 47 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ pub mod prometheus {

pub mod v1;

pub use greptime_proto;
pub use prost::DecodeError;
101 changes: 100 additions & 1 deletion src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::add_column::location::LocationType;
use api::v1::add_column::Location;
use api::v1::alter_expr::Kind;
use api::v1::{column_def, AlterExpr, CreateTableExpr, DropColumns, RenameTable};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::AddColumnLocation;
use datatypes::schema::{ColumnSchema, RawSchema};
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
Expand All @@ -24,9 +27,12 @@ use table::requests::{

use crate::error::{
ColumnNotFoundSnafu, InvalidColumnDefSnafu, MissingFieldSnafu, MissingTimestampColumnSnafu,
Result, UnrecognizedTableOptionSnafu,
Result, UnknownLocationTypeSnafu, UnrecognizedTableOptionSnafu,
};

const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
Expand All @@ -50,6 +56,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
Ok(AddColumnRequest {
column_schema: schema,
is_key: ac.is_key,
location: parse_location(ac.location)?,
})
})
.collect::<Result<Vec<_>>>()?;
Expand Down Expand Up @@ -186,8 +193,26 @@ pub fn create_expr_to_request(
})
}

fn parse_location(location: Option<Location>) -> Result<Option<AddColumnLocation>> {
match location {
Some(Location {
location_type: LOCATION_TYPE_FIRST,
..
}) => Ok(Some(AddColumnLocation::First)),
Some(Location {
location_type: LOCATION_TYPE_AFTER,
after_cloumn_name,
}) => Ok(Some(AddColumnLocation::After {
column_name: after_cloumn_name,
})),
Some(Location { location_type, .. }) => UnknownLocationTypeSnafu { location_type }.fail(),
None => Ok(None),
}
}

#[cfg(test)]
mod tests {
use api::v1::add_column::location::LocationType;
use api::v1::{AddColumn, AddColumns, ColumnDataType, ColumnDef, DropColumn};
use datatypes::prelude::ConcreteDataType;

Expand Down Expand Up @@ -229,6 +254,80 @@ mod tests {
ConcreteDataType::float64_datatype(),
add_column.column_schema.data_type
);
assert_eq!(None, add_column.location);
}

#[test]
fn test_alter_expr_with_location_to_request() {
let expr = AlterExpr {
catalog_name: "".to_string(),
schema_name: "".to_string(),
table_name: "monitor".to_string(),

kind: Some(Kind::AddColumns(AddColumns {
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "mem_usage".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
default_constraint: vec![],
}),
is_key: false,
location: Some(Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
}),
},
AddColumn {
column_def: Some(ColumnDef {
name: "cpu_usage".to_string(),
datatype: ColumnDataType::Float64 as i32,
is_nullable: false,
default_constraint: vec![],
}),
is_key: false,
location: Some(Location {
location_type: LocationType::After.into(),
after_cloumn_name: "ts".to_string(),
}),
},
],
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);

let mut add_columns = match alter_request.alter_kind {
AlterKind::AddColumns { columns } => columns,
_ => unreachable!(),
};

let add_column = add_columns.pop().unwrap();
assert!(!add_column.is_key);
assert_eq!("cpu_usage", add_column.column_schema.name);
assert_eq!(
ConcreteDataType::float64_datatype(),
add_column.column_schema.data_type
);
assert_eq!(
Some(AddColumnLocation::After {
column_name: "ts".to_string()
}),
add_column.location
);

let add_column = add_columns.pop().unwrap();
assert!(!add_column.is_key);
assert_eq!("mem_usage", add_column.column_schema.name);
assert_eq!(
ConcreteDataType::float64_datatype(),
add_column.column_schema.data_type
);
assert_eq!(Some(AddColumnLocation::First), add_column.location);
}

#[test]
Expand Down
12 changes: 9 additions & 3 deletions src/common/grpc-expr/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ pub enum Error {

#[snafu(display("The column name already exists, column: {}", column))]
ColumnAlreadyExists { column: String, location: Location },

#[snafu(display("Unknown location type: {}", location_type))]
UnknownLocationType {
location_type: i32,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -103,9 +109,9 @@ impl ErrorExt for Error {
Error::MissingField { .. } => StatusCode::InvalidArguments,
Error::InvalidColumnDef { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
Error::UnexpectedValuesLength { .. } | Error::ColumnAlreadyExists { .. } => {
StatusCode::InvalidArguments
}
Error::UnexpectedValuesLength { .. }
| Error::ColumnAlreadyExists { .. }
| Error::UnknownLocationType { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
api = { path = "../../api" }
async-trait.workspace = true
common-error = { path = "../error" }
common-recordbatch = { path = "../recordbatch" }
Expand All @@ -13,6 +14,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes = { path = "../../datatypes" }
serde.workspace = true
snafu.workspace = true
statrs = "0.16"

Expand Down
24 changes: 24 additions & 0 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

use std::fmt::{Debug, Formatter};

use api::greptime_proto::v1::add_column::location::LocationType;
use api::greptime_proto::v1::add_column::Location;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use serde::{Deserialize, Serialize};

pub mod columnar_value;
pub mod error;
Expand Down Expand Up @@ -44,3 +47,24 @@ impl Debug for Output {
}

pub use datafusion::physical_plan::ExecutionPlan as DfPhysicalPlan;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum AddColumnLocation {
First,
After { column_name: String },
}

impl From<&AddColumnLocation> for Location {
fn from(value: &AddColumnLocation) -> Self {
match value {
AddColumnLocation::First => Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
},
AddColumnLocation::After { column_name } => Location {
location_type: LocationType::After.into(),
after_cloumn_name: column_name.to_string(),
},
}
}
}
62 changes: 46 additions & 16 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ async fn new_dummy_catalog_list(

#[cfg(test)]
mod test {
use api::v1::add_column::location::LocationType;
use api::v1::add_column::Location;
use api::v1::column::{SemanticType, Values};
use api::v1::{
alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef,
Expand Down Expand Up @@ -364,16 +366,44 @@ mod test {
schema_name: "my_database".to_string(),
table_name: "my_table".to_string(),
kind: Some(alter_expr::Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: None,
}],
add_columns: vec![
AddColumn {
column_def: Some(ColumnDef {
name: "b".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: None,
},
AddColumn {
column_def: Some(ColumnDef {
name: "c".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: Some(Location {
location_type: LocationType::First.into(),
after_cloumn_name: "".to_string(),
}),
},
AddColumn {
column_def: Some(ColumnDef {
name: "d".to_string(),
datatype: ColumnDataType::Int32 as i32,
is_nullable: true,
default_constraint: vec![],
}),
is_key: true,
location: Some(Location {
location_type: LocationType::After.into(),
after_cloumn_name: "a".to_string(),
}),
},
],
})),
})),
});
Expand All @@ -389,15 +419,15 @@ mod test {
.unwrap();
assert!(matches!(output, Output::AffectedRows(1)));

let output = exec_selection(instance, "SELECT ts, a, b FROM my_database.my_table").await;
let output = exec_selection(instance, "SELECT * FROM my_database.my_table").await;
let Output::Stream(stream) = output else { unreachable!() };
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+---------------------+---+---+
| ts | a | b |
+---------------------+---+---+
| 2022-12-30T07:09:00 | s | 1 |
+---------------------+---+---+";
+---+---+---+---------------------+---+
| c | a | d | ts | b |
+---+---+---+---------------------+---+
| | s | | 2022-12-30T07:09:00 | 1 |
+---+---+---+---------------------+---+";
assert_eq!(recordbatches.pretty_print().unwrap(), expected);
}

Expand Down
6 changes: 5 additions & 1 deletion src/datanode/src/sql/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ impl SqlHandler {
}
.fail()
}
AlterTableOperation::AddColumn { column_def } => AlterKind::AddColumns {
AlterTableOperation::AddColumn {
column_def,
location,
} => AlterKind::AddColumns {
columns: vec![AddColumnRequest {
column_schema: column_def_to_schema(column_def, false)
.context(error::ParseSqlSnafu)?,
// FIXME(dennis): supports adding key column
is_key: false,
location: location.clone(),
}],
},
AlterTableOperation::DropColumn { name } => AlterKind::DropColumns {
Expand Down
7 changes: 5 additions & 2 deletions src/frontend/src/expr_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,18 @@ pub(crate) fn to_alter_expr(
}
.fail();
}
AlterTableOperation::AddColumn { column_def } => Kind::AddColumns(AddColumns {
AlterTableOperation::AddColumn {
column_def,
location,
} => Kind::AddColumns(AddColumns {
add_columns: vec![AddColumn {
column_def: Some(
sql_column_def_to_grpc_column_def(column_def)
.map_err(BoxedError::new)
.context(ExternalSnafu)?,
),
is_key: false,
location: None,
location: location.as_ref().map(From::from),
}],
}),
AlterTableOperation::DropColumn { name } => Kind::DropColumns(DropColumns {
Expand Down
Loading

0 comments on commit 5467ea4

Please sign in to comment.