Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adjust fuse_time_travel_size() #17164

Merged
merged 3 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@ pub trait SchemaApi: Send + Sync {

/// Get history of all tables in the specified database,
/// that are dropped after retention boundary time, i.e., the tables that can be undropped.
async fn list_retainable_tables(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError>;
async fn list_history_tables(
&self,
include_non_retainable: bool,
req: ListTableReq,
) -> Result<Vec<TableNIV>, KVAppError>;

/// List all tables in the database.
///
Expand Down
16 changes: 11 additions & 5 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1551,12 +1551,16 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
return Ok(vec![]);
};

get_retainable_table_metas(self, &Utc::now(), seq_table_id_list.data).await
get_history_table_metas(self, false, &Utc::now(), seq_table_id_list.data).await
}

#[logcall::logcall]
#[fastrace::trace]
async fn list_retainable_tables(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError> {
async fn list_history_tables(
&self,
include_non_retainable: bool,
req: ListTableReq,
) -> Result<Vec<TableNIV>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

// List tables by tenant, db_id.
Expand All @@ -1575,7 +1579,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
for (ident, history) in ident_histories {
debug!(name :% =(&ident); "get_tables_history");

let id_metas = get_retainable_table_metas(self, &now, history.data).await?;
let id_metas =
get_history_table_metas(self, include_non_retainable, &now, history.data).await?;

let table_nivs = id_metas.into_iter().map(|(table_id, seq_meta)| {
let name = DBIdTableName::new(ident.database_id, ident.table_name.clone());
Expand Down Expand Up @@ -3058,8 +3063,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

async fn get_retainable_table_metas(
async fn get_history_table_metas(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
include_non_retainable: bool,
now: &DateTime<Utc>,
tb_id_list: TableIdList,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError> {
Expand All @@ -3075,7 +3081,7 @@ async fn get_retainable_table_metas(
continue;
};

if is_drop_time_retainable(table_meta.drop_on, *now) {
if include_non_retainable || is_drop_time_retainable(table_meta.drop_on, *now) {
tb_metas.push((k, table_meta));
}
}
Expand Down
29 changes: 17 additions & 12 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4002,7 +4002,7 @@ impl SchemaApiTestSuite {
assert!(table_id >= 1, "table id >= 1");

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;

assert_eq!(res.len(), 1);
Expand All @@ -4020,10 +4020,15 @@ impl SchemaApiTestSuite {
upsert_test_data(mt.as_kv_api(), &tbid, data).await?;
// assert not return out of retention time data
let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;

assert_eq!(res.len(), 0);

let res = mt
.list_history_tables(true, ListTableReq::new(&tenant, db_id))
.await?;
assert_eq!(res.len(), 1);
}

Ok(())
Expand Down Expand Up @@ -4658,7 +4663,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4690,7 +4695,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4708,7 +4713,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4735,7 +4740,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4758,7 +4763,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4786,7 +4791,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4803,7 +4808,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4836,7 +4841,7 @@ impl SchemaApiTestSuite {
let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
let _res = mt.create_table(req.clone()).await?;
let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
assert!(old_db.meta.seq < cur_db.meta.seq);
Expand Down Expand Up @@ -4875,7 +4880,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
Expand Down Expand Up @@ -4904,7 +4909,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.list_history_tables(false, ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
Expand Down
5 changes: 4 additions & 1 deletion src/query/catalog/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ pub trait Database: DynClone + Sync + Send {
}

#[async_backtrace::framed]
async fn list_tables_history(&self) -> Result<Vec<Arc<dyn Table>>> {
async fn list_tables_history(
&self,
_include_non_retainable: bool,
) -> Result<Vec<Arc<dyn Table>>> {
Err(ErrorCode::Unimplemented(format!(
"UnImplement list_tables_history in {} Database",
self.name()
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/catalogs/default/mutable_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ impl Catalog for MutableCatalog {
db_name: &str,
) -> Result<Vec<Arc<dyn Table>>> {
let db = self.get_database(tenant, db_name).await?;
db.list_tables_history().await
db.list_tables_history(false).await
}

async fn get_drop_table_infos(
Expand Down
13 changes: 8 additions & 5 deletions src/query/service/src/databases/default/default_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ impl Database for DefaultDatabase {
}

#[async_backtrace::framed]
async fn list_tables_history(&self) -> Result<Vec<Arc<dyn Table>>> {
async fn list_tables_history(
&self,
include_non_retainable: bool,
) -> Result<Vec<Arc<dyn Table>>> {
// `get_table_history` will not fetch the tables that created before the
// "metasrv time travel functions" is added.
// thus, only the table-infos of dropped tables are used.
Expand All @@ -202,10 +205,10 @@ impl Database for DefaultDatabase {
let mut dropped = self
.ctx
.meta
.list_retainable_tables(ListTableReq::new(
self.get_tenant(),
self.db_info.database_id,
))
.list_history_tables(
include_non_retainable,
ListTableReq::new(self.get_tenant(), self.db_info.database_id),
)
.await?
.into_iter()
.filter(|i| i.value().drop_on.is_some())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl SimpleArgFunc for FuseTimeTravelSize {
}
None => {
let start = std::time::Instant::now();
let tables = db.list_tables_history().await?;
let tables = db.list_tables_history(true).await?;
info!("list_tables cost: {:?}", start.elapsed());
tables
}
Expand Down
Loading