feat(iceberg): support iceberg sink create table (#18362)
chenzl25 authored Sep 3, 2024
1 parent 5d8b165 commit 5ab2a59
Showing 5 changed files with 199 additions and 17 deletions.
55 changes: 47 additions & 8 deletions e2e_test/sink/iceberg_sink.slt
@@ -48,23 +48,65 @@ CREATE SOURCE iceberg_demo_source WITH (
    table.name='e2e_demo_table'
);

statement ok
CREATE SINK s7 AS select mv6.v1 as v1, mv6.v2 as v2, mv6.v3 as v3 from mv6 WITH (
    connector = 'iceberg',
    type = 'upsert',
    primary_key = 'v1',
    warehouse.path = 's3a://hummock001',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = secret iceberg_s3_access_key,
    s3.secret.key = secret iceberg_s3_secret_key,
    s3.region = 'us-east-1',
    catalog.name = 'demo',
    catalog.type = 'storage',
    database.name='demo_db',
    table.name='e2e_auto_create_table',
    commit_checkpoint_interval = 1,
    create_table_if_not_exists = 'true'
);

statement ok
CREATE SOURCE iceberg_e2e_auto_create_table WITH (
    connector = 'iceberg',
    warehouse.path = 's3a://hummock001',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.access.key = secret iceberg_s3_access_key,
    s3.secret.key = secret iceberg_s3_secret_key,
    s3.region = 'us-east-1',
    catalog.name = 'demo',
    catalog.type = 'storage',
    database.name='demo_db',
    table.name='e2e_auto_create_table'
);

statement ok
INSERT INTO t6 VALUES (1, 2, '1-2'), (2, 2, '2-2'), (3, 2, '3-2'), (5, 2, '5-2'), (8, 2, '8-2'), (13, 2, '13-2'), (21, 2, '21-2');

statement ok
FLUSH;

sleep 5s
sleep 20s

query I
select count(*) from rw_iceberg_snapshots where source_name = 'iceberg_demo_source';
----
1

query I
select count(*) from rw_iceberg_snapshots where source_name = 'iceberg_e2e_auto_create_table';
----
1

query I
select sum(record_count) from rw_iceberg_files where source_name = 'iceberg_demo_source';
----
7

query I
select count(*) from rw_iceberg_snapshots where source_name = 'iceberg_demo_source';
select sum(record_count) from rw_iceberg_files where source_name = 'iceberg_e2e_auto_create_table';
----
1
7

statement ok
INSERT INTO t6 VALUES (1, 50, '1-50');
@@ -78,10 +120,7 @@ statement ok
DROP SOURCE iceberg_demo_source;

statement ok
DROP SINK s6;

statement ok
DROP MATERIALIZED VIEW mv6;
DROP SOURCE iceberg_e2e_auto_create_table;

statement ok
DROP TABLE t6;
DROP TABLE t6 cascade;
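
Taken together, the updated test covers the feature end to end: sink `s7` writes into `e2e_auto_create_table` with `create_table_if_not_exists = 'true'`, the `iceberg_e2e_auto_create_table` source reads the auto-created table back, and snapshot and record counts are asserted for both the pre-created and the auto-created table. The sleep grows from 5s to 20s, presumably to give both sinks time to commit, and the teardown now relies on `DROP TABLE t6 cascade` to remove the dependent materialized view and sinks in one statement.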
88 changes: 86 additions & 2 deletions src/connector/src/sink/iceberg/mod.rs
@@ -31,7 +31,7 @@ use async_trait::async_trait;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::table::Table as TableV2;
use iceberg::{Catalog as CatalogV2, TableIdent};
use iceberg::{Catalog as CatalogV2, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_glue::{AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY};
use icelake::catalog::{
    load_catalog, load_iceberg_base_catalog_config, BaseCatalogConfig, CatalogRef, CATALOG_NAME,
@@ -43,9 +43,10 @@ use icelake::io_v2::{
    DataFileWriterBuilder, EqualityDeltaWriterBuilder, IcebergWriterBuilder, DELETE_OP, INSERT_OP,
};
use icelake::transaction::Transaction;
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile};
use icelake::types::{data_file_from_json, data_file_to_json, Any, DataFile, COLUMN_ID_META_KEY};
use icelake::{Table, TableIdentifier};
use itertools::Itertools;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::bail;
@@ -151,6 +152,9 @@ pub struct IcebergConfig {
    #[serde(default = "default_commit_checkpoint_interval")]
    #[serde_as(as = "DisplayFromStr")]
    pub commit_checkpoint_interval: u64,

    #[serde(default, deserialize_with = "deserialize_bool_from_string")]
    pub create_table_if_not_exists: bool,
}

impl IcebergConfig {
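
The new field is wired through serde with `deserialize_with = "deserialize_bool_from_string"`, since options in a SQL `WITH (...)` clause arrive as strings (the e2e test above passes `'true'`). The helper is defined elsewhere in the connector crate; a minimal sketch of what such a deserializer typically looks like, assuming plain serde:

```rust
use serde::{Deserialize, Deserializer};

// Hypothetical sketch; the real `deserialize_bool_from_string` lives
// elsewhere in the connector crate and may differ in detail.
fn deserialize_bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    let s = String::deserialize(deserializer)?;
    match s.to_ascii_lowercase().as_str() {
        "true" => Ok(true),
        "false" => Ok(false),
        other => Err(serde::de::Error::custom(format!(
            "expected 'true' or 'false', got '{other}'"
        ))),
    }
}
```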
@@ -701,6 +705,10 @@ impl Debug for IcebergSink {

impl IcebergSink {
    async fn create_and_validate_table(&self) -> Result<Table> {
        if self.config.create_table_if_not_exists {
            self.create_table_if_not_exists().await?;
        }

        let table = self
            .config
            .load_table()
@@ -722,6 +730,79 @@ impl IcebergSink {
        Ok(table)
    }

    async fn create_table_if_not_exists(&self) -> Result<()> {
        let catalog = self.config.create_catalog_v2().await?;
        let table_id = self
            .config
            .full_table_name_v2()
            .context("Unable to parse table name")?;
        if !catalog
            .table_exists(&table_id)
            .await
            .map_err(|e| SinkError::Iceberg(anyhow!(e)))?
        {
            let namespace = if let Some(database_name) = &self.config.database_name {
                NamespaceIdent::new(database_name.clone())
            } else {
                bail!("database name must be set if you want to create table")
            };

            // convert risingwave schema -> arrow schema -> iceberg schema
            let arrow_fields = self
                .param
                .columns
                .iter()
                .map(|column| {
                    let mut arrow_field = IcebergArrowConvert
                        .to_arrow_field(&column.name, &column.data_type)
                        .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                        .context(format!(
                            "failed to convert {}: {} to arrow type",
                            &column.name, &column.data_type
                        ))?;
                    let mut metadata = HashMap::new();
                    metadata.insert(
                        PARQUET_FIELD_ID_META_KEY.to_string(),
                        column.column_id.get_id().to_string(),
                    );
                    metadata.insert(
                        COLUMN_ID_META_KEY.to_string(),
                        column.column_id.get_id().to_string(),
                    );
                    arrow_field.set_metadata(metadata);
                    Ok(arrow_field)
                })
                .collect::<Result<Vec<ArrowField>>>()?;
            let arrow_schema = arrow_schema_iceberg::Schema::new(arrow_fields);
            let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to convert arrow schema to iceberg schema")?;

            let location = {
                let mut names = namespace.clone().inner();
                names.push(self.config.table_name.to_string());
                if self.config.path.ends_with('/') {
                    format!("{}{}", self.config.path, names.join("/"))
                } else {
                    format!("{}/{}", self.config.path, names.join("/"))
                }
            };

            let table_creation = TableCreation::builder()
                .name(self.config.table_name.clone())
                .schema(iceberg_schema)
                .location(location)
                .build();

            catalog
                .create_table(&namespace, table_creation)
                .await
                .map_err(|e| SinkError::Iceberg(anyhow!(e)))
                .context("failed to create iceberg table")?;
        }
        Ok(())
    }
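
The schema round trip above (RisingWave column → Arrow field → Iceberg schema) hinges on the field-ID metadata: Iceberg identifies columns by numeric ID, and iceberg-rust's `arrow_schema_to_schema` reads the ID from each Arrow field's `PARQUET:field_id` metadata entry, which is why both `PARQUET_FIELD_ID_META_KEY` and icelake's `COLUMN_ID_META_KEY` are stamped with the RisingWave column ID. A standalone sketch of the same conversion — `field_with_id` is a made-up helper here, and the sink uses RisingWave's `arrow_schema_iceberg` re-export rather than a direct `arrow_schema` dependency:

```rust
use std::collections::HashMap;

use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY; // the "PARQUET:field_id" key

// Hypothetical helper: attach an explicit Iceberg field ID to an Arrow field.
fn field_with_id(name: &str, data_type: DataType, id: i32) -> Field {
    let metadata = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())]);
    Field::new(name, data_type, true).with_metadata(metadata)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let arrow_schema = Schema::new(vec![
        field_with_id("v1", DataType::Int32, 1),
        field_with_id("v2", DataType::Utf8, 2),
    ]);
    // Without the metadata above, the converter has no stable IDs to work from.
    let iceberg_schema = iceberg::arrow::arrow_schema_to_schema(&arrow_schema)?;
    println!("{iceberg_schema:?}");
    Ok(())
}
```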

    pub fn new(config: IcebergConfig, param: SinkParam) -> Result<Self> {
        let unique_column_ids = if config.r#type == SINK_TYPE_UPSERT && !config.force_append_only {
            if let Some(pk) = &config.primary_key {
@@ -1292,6 +1373,8 @@ pub fn try_matches_arrow_schema(

    let compatible = match (&converted_arrow_data_type, arrow_field.data_type()) {
        (ArrowDataType::Decimal128(_, _), ArrowDataType::Decimal128(_, _)) => true,
        (ArrowDataType::Binary, ArrowDataType::LargeBinary) => true,
        (ArrowDataType::LargeBinary, ArrowDataType::Binary) => true,
        (left, right) => left == right,
    };
    if !compatible {
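
The two new arms loosen validation so `Binary` and `LargeBinary` count as the same width; plausibly needed because a schema that round-trips through the auto-created Iceberg table can come back with the large variant while RisingWave's conversion produces the regular one (or vice versa), even though the logical Iceberg type is the same `binary`.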
@@ -1394,6 +1477,7 @@ mod test {
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
            commit_checkpoint_interval: DEFAULT_COMMIT_CHECKPOINT_INTERVAL,
            create_table_if_not_exists: false,
        };

        assert_eq!(iceberg_config, expected_iceberg_config);
65 changes: 58 additions & 7 deletions src/connector/src/sink/iceberg/storage_catalog.rs
@@ -18,7 +18,7 @@ use std::collections::HashMap;

use async_trait::async_trait;
use iceberg::io::{FileIO, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg::spec::TableMetadata;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation,
@@ -218,18 +218,54 @@ impl Catalog for StorageCatalog {
    /// Create a new table inside the namespace.
    async fn create_table(
        &self,
        _namespace: &NamespaceIdent,
        _creation: TableCreation,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> iceberg::Result<Table> {
        todo!()
        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
        let table_path = {
            let mut names = table_ident.namespace.clone().inner();
            names.push(table_ident.name.to_string());
            if self.warehouse.ends_with('/') {
                format!("{}{}", self.warehouse, names.join("/"))
            } else {
                format!("{}/{}", self.warehouse, names.join("/"))
            }
        };

        // Create the metadata directory
        let metadata_path = format!("{table_path}/metadata");

        // Create the initial table metadata
        let table_metadata = TableMetadataBuilder::from_table_creation(creation)?.build()?;

        // Write the initial metadata file
        let metadata_file_path = format!("{metadata_path}/v1.metadata.json");
        let metadata_json = serde_json::to_string(&table_metadata)?;
        let output = self.file_io.new_output(&metadata_file_path)?;
        output.write(metadata_json.into()).await?;

        // Write the version hint file
        let version_hint_path = format!("{table_path}/metadata/version-hint.text");
        let version_hint_output = self.file_io.new_output(&version_hint_path)?;
        version_hint_output.write("1".into()).await?;

        Ok(Table::builder()
            .metadata(table_metadata)
            .identifier(table_ident)
            .file_io(self.file_io.clone())
            .build())
    }
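
This turns the storage catalog's write path from a `todo!()` into the HadoopCatalog-style layout: all table state lives under the warehouse path, metadata versions go to `metadata/v<N>.metadata.json`, and `metadata/version-hint.text` records the latest version number (initialized to "1" here). The read side resolves a table by following the hint; a condensed sketch of that lookup, assuming iceberg-rust's `FileIO` API (the real logic is split across `is_version_hint_exist` and `read_version_hint` in this file):

```rust
// Condensed, hypothetical sketch of the version-hint lookup.
async fn resolve_latest_metadata(
    file_io: &iceberg::io::FileIO,
    table_path: &str,
) -> iceberg::Result<String> {
    // The hint file holds just the latest version number, e.g. "1"...
    let hint = file_io
        .new_input(format!("{table_path}/metadata/version-hint.text"))?
        .read()
        .await?;
    let version = String::from_utf8_lossy(&hint).trim().to_string();
    // ...which names the metadata file to load.
    Ok(format!("{table_path}/metadata/v{version}.metadata.json"))
}
```

Because only the hint file is consulted, `table_exists` below can simply probe for `version-hint.text`.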

    /// Load table from the catalog.
    async fn load_table(&self, table: &TableIdent) -> iceberg::Result<Table> {
        let table_path = {
            let mut names = table.namespace.clone().inner();
            names.push(table.name.to_string());
            format!("{}/{}", self.warehouse, names.join("/"))
            if self.warehouse.ends_with('/') {
                format!("{}{}", self.warehouse, names.join("/"))
            } else {
                format!("{}/{}", self.warehouse, names.join("/"))
            }
        };
        let path = if self.is_version_hint_exist(&table_path).await? {
            let version_hint = self.read_version_hint(&table_path).await?;
@@ -262,8 +298,23 @@
    }

    /// Check if a table exists in the catalog.
    async fn table_exists(&self, _table: &TableIdent) -> iceberg::Result<bool> {
        todo!()
    async fn table_exists(&self, table: &TableIdent) -> iceberg::Result<bool> {
        let table_path = {
            let mut names = table.namespace.clone().inner();
            names.push(table.name.to_string());
            if self.warehouse.ends_with('/') {
                format!("{}{}", self.warehouse, names.join("/"))
            } else {
                format!("{}/{}", self.warehouse, names.join("/"))
            }
        };
        let metadata_path = format!("{table_path}/metadata/version-hint.text");
        self.file_io.is_exist(&metadata_path).await.map_err(|err| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to check if table exists: {}", err.as_report()),
            )
        })
    }

    /// Rename a table in the catalog.
4 changes: 4 additions & 0 deletions src/connector/with_options_sink.yaml
@@ -348,6 +348,10 @@ IcebergConfig:
    comments: Commit every n(>0) checkpoints, default is 10.
    required: false
    default: DEFAULT_COMMIT_CHECKPOINT_INTERVAL
  - name: create_table_if_not_exists
    field_type: bool
    required: false
    default: Default::default
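
For a `bool`, `Default::default` is `false`, so auto-creation stays opt-in: nothing changes unless a sink sets `create_table_if_not_exists = 'true'`.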
KafkaConfig:
  fields:
  - name: properties.bootstrap.server
4 changes: 4 additions & 0 deletions src/frontend/src/handler/create_sink.rs
@@ -342,6 +342,10 @@ pub async fn get_partition_compute_info(
async fn get_partition_compute_info_for_iceberg(
    iceberg_config: &IcebergConfig,
) -> Result<Option<PartitionComputeInfo>> {
    // TODO: check table if exists
    if iceberg_config.create_table_if_not_exists {
        return Ok(None);
    }
    let table = iceberg_config.load_table().await?;
    let Some(partition_spec) = table.current_table_metadata().current_partition_spec().ok() else {
        return Ok(None);
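
The early `Ok(None)` skips the partition-aware write optimization whenever auto-creation is requested: at planning time the table may not exist yet, so there is no partition spec to fetch. The `TODO` hints that a later change could still load the spec when the table does already exist.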
