Skip to content

Commit

Permalink
refactor: Define DatabaseIdIdent with TIdent
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed May 12, 2024
1 parent 21bbd83 commit 3de2ea1
Show file tree
Hide file tree
Showing 13 changed files with 269 additions and 162 deletions.
17 changes: 17 additions & 0 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,23 @@ use crate::kv_pb_api::errors::PbApiWriteError;

/// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value.
pub trait KVPbApi: KVApi {
/// Update a protobuf encoded value by kvapi::Key.
///
/// Equivalent to `update_pb(UpsertPB::update(key,value))`
/// but returns the transition before and after executing the operation.
fn update_pb<K>(
&self,
key: K,
value: K::ValueType,
) -> impl Future<Output = Result<Change<K::ValueType>, Self::Error>> + Send
where
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send,
Self::Error: From<PbApiWriteError<Self::Error>>,
{
async move { self.upsert_pb(&UpsertPB::update(key, value)).await }
}

/// Update or insert a protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the value is encoded by `FromToProto`.
Expand Down
3 changes: 3 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ use databend_common_meta_app::schema::UpdateVirtualColumnReq;
use databend_common_meta_app::schema::UpsertTableOptionReply;
use databend_common_meta_app::schema::UpsertTableOptionReq;
use databend_common_meta_app::schema::VirtualColumnMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::SeqV;
Expand Down Expand Up @@ -212,11 +213,13 @@ pub trait SchemaApi: Send + Sync {

async fn mget_table_names_by_ids(
&self,
tenant: &Tenant,
table_ids: &[MetaId],
) -> Result<Vec<Option<String>>, KVAppError>;

async fn mget_database_names_by_ids(
&self,
tenant: &Tenant,
db_ids: &[MetaId],
) -> Result<Vec<Option<String>>, KVAppError>;

Expand Down
132 changes: 83 additions & 49 deletions src/meta/api/src/schema_api_impl.rs

Large diffs are not rendered by default.

68 changes: 29 additions & 39 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_meta_app::schema::CreateTableReq;
use databend_common_meta_app::schema::CreateVirtualColumnReq;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DatabaseId;
use databend_common_meta_app::schema::DatabaseIdHistoryIdent;
use databend_common_meta_app::schema::DatabaseIdIdent;
use databend_common_meta_app::schema::DatabaseIdToName;
use databend_common_meta_app::schema::DatabaseInfo;
use databend_common_meta_app::schema::DatabaseMeta;
Expand Down Expand Up @@ -130,13 +130,14 @@ use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::Operation;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use log::debug;
use log::info;
use minitrace::func_name;

use crate::deserialize_struct;
use crate::kv_app_error::KVAppError;
use crate::serialize_struct;
use crate::kv_pb_api::KVPbApi;
use crate::testing::get_kv_data;
use crate::testing::get_kv_u64_data;
use crate::DatamaskApi;
Expand Down Expand Up @@ -232,20 +233,16 @@ fn calc_and_compare_drop_on_table_result(result: Vec<Arc<TableInfo>>, expected:
assert_eq!(get, expected_map);
}

async fn upsert_test_data(
async fn upsert_test_data<K: kvapi::Key + Send>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
key: &impl kvapi::Key,
value: Vec<u8>,
) -> Result<u64, KVAppError> {
let res = kv_api
.upsert_kv(UpsertKVReq {
key: key.to_string_key(),
seq: MatchSeq::GE(0),
value: Operation::Update(value),
value_meta: None,
})
.await?;

key: K,
value: K::ValueType,
) -> Result<u64, KVAppError>
where
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send,
{
let res = kv_api.update_pb(key, value).await?;
let seq_v = res.result.unwrap();
Ok(seq_v.seq)
}
Expand Down Expand Up @@ -1918,7 +1915,7 @@ impl SchemaApiTestSuite {
{
let undrop_table_req = UndropTableByIdReq {
name_ident: create_table_req.name_ident.clone(),
db_id: create_table_as_dropped_resp.db_id,
db_id_ident: create_table_as_dropped_resp.database_id_ident.clone(),
table_id: create_table_as_dropped_resp.table_id,
table_id_seq: create_table_as_dropped_resp.table_id_seq.unwrap(),
force_replace: true,
Expand All @@ -1937,7 +1934,7 @@ impl SchemaApiTestSuite {
// undrop-table-by-id with force_replace set to false should fail.
let undrop_table_req = UndropTableByIdReq {
name_ident: create_table_req.name_ident.clone(),
db_id: create_table_as_dropped_resp.db_id,
db_id_ident: create_table_as_dropped_resp.database_id_ident.clone(),
table_id: create_table_as_dropped_resp.table_id,
table_id_seq: create_table_as_dropped_resp.table_id_seq.unwrap(),
force_replace: false,
Expand Down Expand Up @@ -3263,9 +3260,9 @@ impl SchemaApiTestSuite {
drop_on,
..Default::default()
};
let id_key = DatabaseId { db_id };
let data = serialize_struct(&drop_data)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;

let db_id_ident = DatabaseIdIdent::new(&tenant, db_id);
upsert_test_data(mt.as_kv_api(), db_id_ident, drop_data).await?;

let res = mt
.get_database_history(ListDatabaseReq {
Expand Down Expand Up @@ -3309,9 +3306,8 @@ impl SchemaApiTestSuite {
drop_on,
..Default::default()
};
let id_key = DatabaseId { db_id };
let data = serialize_struct(&drop_data)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;
let id_key = DatabaseIdIdent::new(db_name.tenant(), db_id);
upsert_test_data(mt.as_kv_api(), id_key, drop_data).await?;

if delete {
delete_test_data(mt.as_kv_api(), &db_name).await?;
Expand Down Expand Up @@ -3387,7 +3383,7 @@ impl SchemaApiTestSuite {

// assert old db meta and id to name mapping has been removed
for db_id in old_id_list.iter() {
let id_key = DatabaseId { db_id: *db_id };
let id_key = DatabaseIdIdent::new(&tenant, *db_id);
let id_mapping = DatabaseIdToName { db_id: *db_id };

let meta_res: Result<DatabaseMeta, KVAppError> =
Expand Down Expand Up @@ -3452,8 +3448,7 @@ impl SchemaApiTestSuite {
};

let id_key = TableId { table_id };
let data = serialize_struct(&drop_data)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;
upsert_test_data(mt.as_kv_api(), id_key, drop_data.clone()).await?;

if delete {
delete_test_data(mt.as_kv_api(), &dbid_tbname).await?;
Expand Down Expand Up @@ -3731,9 +3726,8 @@ impl SchemaApiTestSuite {
drop_on,
..Default::default()
};
let id_key = DatabaseId { db_id };
let data = serialize_struct(&drop_data)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;
let id_key = DatabaseIdIdent::new(&tenant, db_id);
upsert_test_data(mt.as_kv_api(), id_key, drop_data).await?;

let dbid_idlist1 = DatabaseIdHistoryIdent::new(&tenant, db1_name);
let old_id_list: DbIdList = get_kv_data(mt.as_kv_api(), &dbid_idlist1).await?;
Expand Down Expand Up @@ -3772,7 +3766,7 @@ impl SchemaApiTestSuite {

// assert old db meta and id to name mapping has been removed
for db_id in old_id_list.id_list.iter() {
let id_key = DatabaseId { db_id: *db_id };
let id_key = DatabaseIdIdent::new(&tenant, *db_id);
let id_mapping = DatabaseIdToName { db_id: *db_id };

let meta_res: Result<DatabaseMeta, KVAppError> =
Expand Down Expand Up @@ -3914,9 +3908,8 @@ impl SchemaApiTestSuite {
drop_on: Some(created_on - Duration::days(1)),
..TableMeta::default()
};
let data = serialize_struct(&create_drop_table_meta)?;

upsert_test_data(mt.as_kv_api(), &tbid, data).await?;
upsert_test_data(mt.as_kv_api(), tbid, create_drop_table_meta).await?;
// assert not return out of retention time data
let res = mt
.get_table_history(ListTableReq::new(&tenant, db_name))
Expand Down Expand Up @@ -4070,8 +4063,7 @@ impl SchemaApiTestSuite {
let table_id = resp.table_id;
let id_key = TableId { table_id };
table_meta.drop_on = Some(created_on + Duration::seconds(100));
let data = serialize_struct(&table_meta)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;
upsert_test_data(mt.as_kv_api(), id_key, table_meta).await?;
}

info!("--- create db2.tb3");
Expand All @@ -4098,9 +4090,8 @@ impl SchemaApiTestSuite {
// change db meta to make this db drop time outof filter time
let mut drop_db_meta = create_db_req.meta.clone();
drop_db_meta.drop_on = Some(created_on + Duration::seconds(100));
let id_key = DatabaseId { db_id };
let data = serialize_struct(&drop_db_meta)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;
let id_key = DatabaseIdIdent::new(&tenant, db_id);
upsert_test_data(mt.as_kv_api(), id_key, drop_db_meta).await?;
}

// third create a database not dropped, but has a table drop within filter time
Expand Down Expand Up @@ -4170,8 +4161,7 @@ impl SchemaApiTestSuite {
let table_id = resp.table_id;
let id_key = TableId { table_id };
table_meta.drop_on = Some(created_on + Duration::seconds(100));
let data = serialize_struct(&table_meta)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;
upsert_test_data(mt.as_kv_api(), id_key, table_meta).await?;
}

info!("--- create db3.tb3");
Expand Down
58 changes: 4 additions & 54 deletions src/meta/app/src/schema/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,29 +42,6 @@ pub struct DatabaseIdent {
pub seq: u64,
}

#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord)]
pub struct DatabaseId {
pub db_id: u64,
}

impl DatabaseId {
pub fn new(db_id: u64) -> Self {
DatabaseId { db_id }
}
}

impl From<u64> for DatabaseId {
fn from(db_id: u64) -> Self {
DatabaseId { db_id }
}
}

impl Display for DatabaseId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.db_id)
}
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct DatabaseIdToName {
pub db_id: u64,
Expand Down Expand Up @@ -327,31 +304,9 @@ mod kvapi_key_impl {
use databend_common_meta_kvapi::kvapi;

use crate::schema::database_name_ident::DatabaseNameIdentRaw;
use crate::schema::DatabaseId;
use crate::schema::DatabaseIdIdent;
use crate::schema::DatabaseIdToName;
use crate::schema::DatabaseMeta;

impl kvapi::KeyCodec for DatabaseId {
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
b.push_u64(self.db_id)
}

fn decode_key(parser: &mut kvapi::KeyParser) -> Result<Self, kvapi::KeyError> {
let db_id = parser.next_u64()?;
Ok(Self { db_id })
}
}

/// "__fd_database_by_id/<db_id>"
impl kvapi::Key for DatabaseId {
const PREFIX: &'static str = "__fd_database_by_id";

type ValueType = DatabaseMeta;

fn parent(&self) -> Option<String> {
None
}
}
use crate::tenant::Tenant;

impl kvapi::KeyCodec for DatabaseIdToName {
fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder {
Expand All @@ -371,13 +326,8 @@ mod kvapi_key_impl {
type ValueType = DatabaseNameIdentRaw;

fn parent(&self) -> Option<String> {
Some(DatabaseId::new(self.db_id).to_string_key())
}
}

impl kvapi::Value for DatabaseMeta {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
[]
// TODO(TIdent): add real tenant
Some(DatabaseIdIdent::new(Tenant::new_literal("dummy"), self.db_id).to_string_key())
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/meta/app/src/schema/database_id_history_ident.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ mod kvapi_impl {
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::Key;

use crate::schema::DatabaseId;
use crate::schema::DatabaseIdIdent;
use crate::schema::DbIdList;
use crate::tenant::Tenant;
use crate::tenant_key::resource::TenantResource;

pub struct Resource;
Expand All @@ -53,9 +54,10 @@ mod kvapi_impl {

impl kvapi::Value for DbIdList {
fn dependency_keys(&self) -> impl IntoIterator<Item = String> {
// TODO(TIdent): add real tenant:
self.id_list
.iter()
.map(|id| DatabaseId::new(*id).to_string_key())
.map(|id| DatabaseIdIdent::new(Tenant::new_literal("dummy"), *id).to_string_key())
}
}

Expand Down
Loading

0 comments on commit 3de2ea1

Please sign in to comment.