Commit 84f816c

add backup support of table info db.

larry-aptos committed Aug 24, 2024
1 parent 3a4cd44 commit 84f816c

Showing 4 changed files with 97 additions and 2 deletions.
9 changes: 9 additions & 0 deletions config/src/config/indexer_table_info_config.rs
@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
// Useful defaults
pub const DEFAULT_PARSER_TASK_COUNT: u16 = 20;
pub const DEFAULT_PARSER_BATCH_SIZE: u16 = 1000;
pub const DEFAULT_TABLE_INFO_BUCKET: &str = "default-table-info";

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
@@ -20,6 +21,12 @@ pub struct IndexerTableInfoConfig {
pub parser_batch_size: u16,

pub enable_expensive_logging: bool,

/// Enable backup service
pub db_backup_enabled: bool,

/// GCS bucket name used by the backup and restore service
pub gcs_bucket_name: String,
}

// Reminder, #[serde(default)] on IndexerTableInfoConfig means that the default values for
@@ -32,6 +39,8 @@ impl Default for IndexerTableInfoConfig {
parser_task_count: DEFAULT_PARSER_TASK_COUNT,
parser_batch_size: DEFAULT_PARSER_BATCH_SIZE,
enable_expensive_logging: false,
db_backup_enabled: false,
gcs_bucket_name: DEFAULT_TABLE_INFO_BUCKET.to_owned(),
}
}
}
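
Since the struct is marked `#[serde(default, deny_unknown_fields)]`, any key omitted from the node's YAML falls back to the `Default` impl above, and unrecognized keys are rejected outright. A minimal sketch of that behavior, assuming the serde (with derive) and serde_yaml crates and a simplified stand-in struct rather than the real `IndexerTableInfoConfig`:

// Minimal sketch (simplified stand-in, not the crate's actual type) of how
// `#[serde(default, deny_unknown_fields)]` treats the new backup fields.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(default, deny_unknown_fields)]
struct TableInfoConfigSketch {
    db_backup_enabled: bool,
    gcs_bucket_name: String,
}

impl Default for TableInfoConfigSketch {
    fn default() -> Self {
        Self {
            db_backup_enabled: false,
            gcs_bucket_name: "default-table-info".to_owned(),
        }
    }
}

fn main() {
    // Omitted keys fall back to the Default impl.
    let cfg: TableInfoConfigSketch =
        serde_yaml::from_str("db_backup_enabled: true").unwrap();
    assert!(cfg.db_backup_enabled);
    assert_eq!(cfg.gcs_bucket_name, "default-table-info");

    // Unknown keys are rejected because of `deny_unknown_fields`.
    assert!(serde_yaml::from_str::<TableInfoConfigSketch>("no_such_key: 1").is_err());
}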
2 changes: 2 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/README.md
@@ -15,5 +15,7 @@ Follow instructions on how to run a fullnode against an existing network.
enabled: true
parser_task_count: 10
parser_batch_size: 1000
db_backup_enabled: false
gcs_bucket_name: "table-info"
* Run fullnode `cargo run -p aptos-node --release -- -f ./fullnode.yaml`
11 changes: 11 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
backup_restore::gcs::GcsBackupRestoreOperator,
internal_indexer_db_service::InternalIndexerDBService, table_info_service::TableInfoService,
};
use aptos_api::context::Context;
@@ -66,26 +67,36 @@ pub fn bootstrap(
let db =
open_db(db_path, &rocksdb_config).expect("Failed to open up indexer async v2 db initially");

let table_info_gcs_bucket_name = node_config.indexer_table_info.gcs_bucket_name.clone();
let indexer_async_v2 =
Arc::new(IndexerAsyncV2::new(db).expect("Failed to initialize indexer async v2"));
let indexer_async_v2_clone = Arc::clone(&indexer_async_v2);

// Spawn a task on the runtime for table info parsing
runtime.spawn(async move {
let backup_restore_operator: Arc<GcsBackupRestoreOperator> =
Arc::new(GcsBackupRestoreOperator::new(table_info_gcs_bucket_name).await);
let context = Arc::new(Context::new(
chain_id,
db_rw.reader.clone(),
mp_sender,
node_config.clone(),
None,
));
// DB backup is optional
let backup_restore_operator = if node_config.indexer_table_info.db_backup_enabled {
Some(backup_restore_operator)
} else {
None
};

let mut parser = TableInfoService::new(
context,
indexer_async_v2_clone.next_version(),
node_config.indexer_table_info.parser_task_count,
node_config.indexer_table_info.parser_batch_size,
node_config.indexer_table_info.enable_expensive_logging,
backup_restore_operator,
indexer_async_v2_clone,
);

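`GcsBackupRestoreOperator::new` is async, which is why bootstrap constructs the operator inside the spawned task and only afterwards gates it on `db_backup_enabled`. A minimal sketch of the same pattern, assuming tokio and a hypothetical `StubOperator` in place of the real type (none of these names are the crate's API):

use std::sync::Arc;

// Hypothetical stand-in for GcsBackupRestoreOperator.
#[allow(dead_code)]
struct StubOperator {
    bucket: String,
}

impl StubOperator {
    // Async constructor, mirroring GcsBackupRestoreOperator::new(bucket).await.
    async fn new(bucket: String) -> Self {
        Self { bucket }
    }
}

#[tokio::main]
async fn main() {
    let db_backup_enabled = false; // would come from node_config.indexer_table_info
    let bucket = "table-info".to_string();

    let handle = tokio::spawn(async move {
        // The operator is built inside the spawned task because its constructor is async...
        let operator = Arc::new(StubOperator::new(bucket).await);
        // ...then gated on the config flag, so downstream code receives an Option<Arc<_>>.
        let operator: Option<Arc<StubOperator>> = db_backup_enabled.then_some(operator);
        assert!(operator.is_none());
    });
    handle.await.unwrap();
}

One observable consequence of this ordering is that the operator is constructed even when backups are disabled and then discarded; building it only inside the enabled branch would avoid creating an unused GCS client.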
77 changes: 75 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/table_info_service.rs
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::backup_restore::gcs::GcsBackupRestoreOperator;
use anyhow::Error;
use aptos_api::context::Context;
use aptos_api_types::TransactionOnChainData;
@@ -18,13 +19,18 @@ type EndVersion = u64;
const LEDGER_VERSION_RETRY_TIME_MILLIS: u64 = 10;
const SERVICE_TYPE: &str = "table_info_service";

/// TableInfoService is responsible for parsing table info from transactions and writing them to rocksdb.
/// Not thread safe.
pub struct TableInfoService {
pub current_version: u64,
pub parser_task_count: u16,
pub parser_batch_size: u16,
pub context: Arc<Context>,
pub enable_expensive_logging: bool,
pub indexer_async_v2: Arc<IndexerAsyncV2>,

// Backup and restore service. If not enabled, this will be None.
pub backup_restore_operator: Option<Arc<GcsBackupRestoreOperator>>,
}

impl TableInfoService {
@@ -34,6 +40,7 @@ impl TableInfoService {
parser_task_count: u16,
parser_batch_size: u16,
enable_expensive_logging: bool,
backup_restore_operator: Option<Arc<GcsBackupRestoreOperator>>,
indexer_async_v2: Arc<IndexerAsyncV2>,
) -> Self {
Self {
@@ -42,6 +49,7 @@
parser_batch_size,
context,
enable_expensive_logging,
backup_restore_operator,
indexer_async_v2,
}
}
@@ -52,6 +60,7 @@
/// 4. write parsed table info to rocksdb
/// 5. after all batches from the loop complete, if pending on items not empty, move on to 6, otherwise, start from 1 again
/// 6. retry all the txns in the loop sequentially to clean up the pending on items
/// 7. try to back up a rocksdb snapshot if a new epoch has been found
pub async fn run(&mut self) {
loop {
let start_time = std::time::Instant::now();
@@ -61,12 +70,32 @@
.process_multiple_batches(self.indexer_async_v2.clone(), batches, ledger_version)
.await;
let max_version = self.get_max_batch_version(results).unwrap_or_default();
- let versions_processed = max_version - self.current_version + 1;
+ let versions_processed = max_version + 1 - self.current_version;
let context = self.context.clone();
let backup_restore_operator = self.backup_restore_operator.clone();
let start_version = self.current_version;
let indexer_async_v2 = self.indexer_async_v2.clone();

// If a new epoch has been found, try uploading a rocksdb snapshot by taking a full db checkpoint and saving it to GCS.
// The backup logic is spawned as a separate task so that slow GCS operations do not block the main table info parsing loop.
// TODO: move this to a separate thread.
if let Some(backup_restore_operator) = backup_restore_operator {
tokio::spawn(async move {
Self::try_backup_db_snapshot_for_new_epoch(
context.clone(),
max_version,
indexer_async_v2.clone(),
backup_restore_operator,
)
.await;
});
}

log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::TableInfoProcessed,
- Some(self.current_version as i64),
+ Some(start_version as i64),
Some(max_version as i64),
None,
None,
@@ -294,6 +323,50 @@ impl TableInfoService {
Ok(())
}

/// Tries to upload a snapshot of the database when a new epoch is found.
/// This function is called periodically to back up the database state to Google Cloud Storage (GCS).
/// It checks the latest epoch of data already backed up in GCS and compares it with the current epoch.
async fn try_backup_db_snapshot_for_new_epoch(
context: Arc<Context>,
last_version: u64,
indexer_async_v2: Arc<IndexerAsyncV2>,
backup_restore_operator: Arc<GcsBackupRestoreOperator>,
) {
let metadata_epoch = backup_restore_operator.clone().get_metadata_epoch();
let (_, _, block_event) = context
.db
.get_block_info_by_version(last_version)
.unwrap_or_else(|_| {
panic!("Could not get block_info for last version {}", last_version,)
});
let block_event_epoch = block_event.epoch();
// If the most recent epoch recorded in the GCS metadata is behind the current epoch, take a snapshot of rocksdb and upload it
if metadata_epoch < block_event_epoch {
let start_time = std::time::Instant::now();
// temporary path to store the snapshot
let snapshot_dir = context
.node_config
.get_data_dir()
.join(block_event_epoch.to_string());
let ledger_chain_id = context.chain_id().id();
backup_restore_operator
.backup_db_snapshot(
ledger_chain_id as u64,
block_event_epoch,
indexer_async_v2,
snapshot_dir.clone(),
)
.await
.expect("Failed to upload snapshot in table info service");
// TODO: use log_grpc_step to log the backup step.
info!(
backup_epoch = block_event_epoch,
backup_millis = start_time.elapsed().as_millis(),
"[Table Info] Table info db backed up successfully"
);
}
}

/// TODO(jill): consolidate it with `ensure_highest_known_version`
/// Will keep looping and checking the latest ledger info to see if there are new transactions
/// If there are, it will update the ledger version
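
Putting the service-side pieces together: `run` fires the backup off as a detached task on each iteration, and `try_backup_db_snapshot_for_new_epoch` only uploads when the epoch recorded in GCS metadata is behind the chain's current epoch. A condensed sketch of that control flow, assuming tokio and anyhow, with hypothetical stubs (`OperatorStub`, `try_backup_for_new_epoch`) standing in for the real operator and snapshot logic:

use std::sync::Arc;

// Hypothetical stubs; the real types are GcsBackupRestoreOperator and IndexerAsyncV2.
struct OperatorStub {
    metadata_epoch: u64,
}

impl OperatorStub {
    // Latest epoch already backed up in GCS, per the bucket's metadata.
    fn get_metadata_epoch(&self) -> u64 {
        self.metadata_epoch
    }

    async fn backup_db_snapshot(&self, epoch: u64) -> anyhow::Result<()> {
        println!("uploading snapshot for epoch {epoch}");
        Ok(())
    }
}

async fn try_backup_for_new_epoch(operator: Arc<OperatorStub>, current_epoch: u64) {
    // Upload only when GCS is behind the chain's current epoch.
    if operator.get_metadata_epoch() < current_epoch {
        operator
            .backup_db_snapshot(current_epoch)
            .await
            .expect("Failed to upload snapshot");
    }
}

#[tokio::main]
async fn main() {
    let operator = Arc::new(OperatorStub { metadata_epoch: 2 });
    // Spawned as its own task so a slow GCS upload never blocks the parsing loop.
    let handle = tokio::spawn(try_backup_for_new_epoch(operator, 3));
    handle.await.unwrap();
}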
