Skip to content

Commit

Permalink
[State Sync] Read directly from storage.
Browse files Browse the repository at this point in the history
Closes: #10070
  • Loading branch information
JoshLind authored and bors-libra committed Dec 22, 2021
1 parent e435976 commit 372e031
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 173 deletions.
2 changes: 1 addition & 1 deletion diem-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
state_sync_network_handles,
mempool_notifier,
consensus_listener,
db_rw,
db_rw.reader,
chunk_executor,
node_config,
waypoint,
Expand Down
42 changes: 17 additions & 25 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
driver::DriverConfiguration,
error::Error,
storage_synchronizer::{StorageStateSummary, StorageSynchronizerInterface},
driver::DriverConfiguration, error::Error, storage_synchronizer::StorageSynchronizerInterface,
utils,
};
use data_streaming_service::{
Expand All @@ -25,6 +23,7 @@ use diem_types::{
};
use futures::channel::oneshot;
use std::{collections::BTreeMap, sync::Arc};
use storage_interface::DbReader;

/// A simple container for verified epoch states and epoch ending ledger infos
/// that have been fetched from the network.
Expand Down Expand Up @@ -185,11 +184,8 @@ impl VerifiedEpochStates {
/// Returns the highest known ledger info (including the newly fetch ones)
pub fn get_highest_known_ledger_info(
&self,
latest_storage_summary: StorageStateSummary,
mut highest_known_ledger_info: LedgerInfoWithSignatures,
) -> LedgerInfoWithSignatures {
// Get the current highest versioned ledger info from storage
let mut highest_known_ledger_info = latest_storage_summary.latest_ledger_info;

// Check if we've fetched a higher versioned ledger info from the network
if !self.new_epoch_ending_ledger_infos.is_empty() {
let highest_fetched_ledger_info = self
Expand Down Expand Up @@ -241,6 +237,9 @@ pub struct Bootstrapper<StorageSyncer> {
// The client through which to stream data from the Diem network
streaming_service_client: StreamingServiceClient,

// The interface to read from storage
storage: Arc<dyn DbReader>,

// The storage synchronizer used to update local storage
storage_synchronizer: Arc<Mutex<StorageSyncer>>,

Expand All @@ -252,14 +251,12 @@ impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
pub fn new(
driver_configuration: DriverConfiguration,
streaming_service_client: StreamingServiceClient,
storage: Arc<dyn DbReader>,
storage_synchronizer: Arc<Mutex<StorageSyncer>>,
) -> Self {
// Load the latest epoch state from storage
let latest_storage_summary = storage_synchronizer
.lock()
.get_storage_summary()
.expect("Unable to load storage summary!");
let latest_epoch_state = latest_storage_summary.latest_epoch_state;
let latest_epoch_state = utils::fetch_latest_epoch_state(storage.clone())
.expect("Unable to fetch latest epoch state!");
let verified_epoch_states = VerifiedEpochStates::new(latest_epoch_state);

Self {
Expand All @@ -268,6 +265,7 @@ impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
bootstrapped: false,
driver_configuration,
streaming_service_client,
storage,
storage_synchronizer,
verified_epoch_states,
}
Expand Down Expand Up @@ -347,7 +345,7 @@ impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
}

// Get the highest synced and known ledger info versions
let highest_synced_version = self.get_highest_synced_version()?;
let highest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
let highest_known_ledger_info = self.get_highest_known_ledger_info()?;
let highest_known_ledger_version = highest_known_ledger_info.ledger_info().version();

Expand Down Expand Up @@ -518,10 +516,9 @@ impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
global_data_summary: &GlobalDataSummary,
) -> Result<(), Error> {
// If our storage has already synced beyond our waypoint, nothing needs to be checked
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let latest_ledger_info = latest_storage_summary.latest_ledger_info.ledger_info();
let latest_ledger_info = utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
let waypoint_version = self.driver_configuration.waypoint.version();
if latest_ledger_info.version() >= waypoint_version {
if latest_ledger_info.ledger_info().version() >= waypoint_version {
self.verified_epoch_states.set_verified_waypoint();
return Ok(());
}
Expand Down Expand Up @@ -665,7 +662,7 @@ impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
payload_start_version: Option<Version>,
) -> Result<(), Error> {
// Fetch the highest synced version
let highest_synced_version = self.get_highest_synced_version()?;
let highest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;

// Compare the payload start version with the expected version
if let Some(payload_start_version) = payload_start_version {
Expand Down Expand Up @@ -757,18 +754,13 @@ impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
.get_epoch_ending_ledger_info(payload_end_version))
}

/// Returns the highest synced version in storage
fn get_highest_synced_version(&self) -> Result<Version, Error> {
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
Ok(latest_storage_summary.latest_synced_version)
}

/// Returns the highest known ledger info (including the newly fetch ones)
fn get_highest_known_ledger_info(&self) -> Result<LedgerInfoWithSignatures, Error> {
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
Ok(self
.verified_epoch_states
.get_highest_known_ledger_info(latest_storage_summary))
.get_highest_known_ledger_info(latest_synced_ledger_info))
}

/// Handles the end of stream notification or an invalid payload by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use diem_types::{
transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version},
};
use std::sync::Arc;
use storage_interface::DbReader;

/// A simple component that manages the continuous syncing of the node
pub struct ContinuousSyncer<StorageSyncer> {
Expand All @@ -30,6 +31,9 @@ pub struct ContinuousSyncer<StorageSyncer> {
// The client through which to stream data from the Diem network
streaming_service_client: StreamingServiceClient,

// The interface to read from storage
storage: Arc<dyn DbReader>,

// The storage synchronizer used to update local storage
storage_synchronizer: Arc<Mutex<StorageSyncer>>,
}
Expand All @@ -38,12 +42,14 @@ impl<StorageSyncer: StorageSynchronizerInterface> ContinuousSyncer<StorageSyncer
pub fn new(
driver_configuration: DriverConfiguration,
streaming_service_client: StreamingServiceClient,
storage: Arc<dyn DbReader>,
storage_synchronizer: Arc<Mutex<StorageSyncer>>,
) -> Self {
Self {
active_data_stream: None,
driver_configuration,
streaming_service_client,
storage,
storage_synchronizer,
}
}
Expand Down Expand Up @@ -159,9 +165,8 @@ impl<StorageSyncer: StorageSynchronizerInterface> ContinuousSyncer<StorageSyncer

/// Returns the highest synced version and epoch in storage
fn get_highest_synced_version_and_epoch(&self) -> Result<(Version, Version), Error> {
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let highest_synced_version = latest_storage_summary.latest_synced_version;
let highest_synced_epoch = latest_storage_summary.latest_epoch_state.epoch;
let highest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
let highest_synced_epoch = utils::fetch_latest_epoch_state(self.storage.clone())?.epoch;

Ok((highest_synced_version, highest_synced_epoch))
}
Expand Down Expand Up @@ -298,8 +303,7 @@ impl<StorageSyncer: StorageSynchronizerInterface> ContinuousSyncer<StorageSyncer
}

// Verify the ledger info state and signatures
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let trusted_state = latest_storage_summary.latest_epoch_state;
let trusted_state = utils::fetch_latest_epoch_state(self.storage.clone())?;
if let Err(error) = trusted_state.verify(ledger_info_with_signatures) {
self.terminate_active_stream(notification_id, NotificationFeedback::PayloadProofFailed)
.await?;
Expand Down
87 changes: 58 additions & 29 deletions state-sync/state-sync-v2/state-sync-driver/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
MempoolNotificationHandler,
},
storage_synchronizer::StorageSynchronizerInterface,
utils,
};
use consensus_notifications::{
ConsensusCommitNotification, ConsensusNotification, ConsensusSyncNotification,
Expand All @@ -25,6 +26,7 @@ use event_notifications::EventSubscriptionService;
use futures::StreamExt;
use mempool_notifications::MempoolNotificationSender;
use std::sync::Arc;
use storage_interface::DbReader;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;

Expand Down Expand Up @@ -82,8 +84,8 @@ pub struct StateSyncDriver<DataClient, MempoolNotifier, StorageSyncer> {
// The handler for notifications to mempool
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,

// The storage synchronizer used to update local storage
storage_synchronizer: Arc<Mutex<StorageSyncer>>,
// The interface to read from storage
storage: Arc<dyn DbReader>,
}

impl<
Expand All @@ -102,18 +104,21 @@ impl<
storage_synchronizer: StorageSyncer,
diem_data_client: DataClient,
streaming_service_client: StreamingServiceClient,
storage: Arc<dyn DbReader>,
) -> Self {
let event_subscription_service = Arc::new(Mutex::new(event_subscription_service));
let storage_synchronizer = Arc::new(Mutex::new(storage_synchronizer));
let bootstrapper = Bootstrapper::new(
driver_configuration.clone(),
streaming_service_client.clone(),
storage.clone(),
storage_synchronizer.clone(),
);
let continuous_syncer = ContinuousSyncer::new(
driver_configuration.clone(),
streaming_service_client,
storage_synchronizer.clone(),
storage.clone(),
storage_synchronizer,
);

Self {
Expand All @@ -126,7 +131,7 @@ impl<
driver_configuration,
event_subscription_service,
mempool_notification_handler,
storage_synchronizer,
storage,
}
}

Expand Down Expand Up @@ -236,10 +241,13 @@ impl<
);

// Handle the commit notification
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let latest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
commit_notification
.handle_commit_notification(
&latest_storage_summary,
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
Expand All @@ -256,15 +264,17 @@ impl<
&mut self,
sync_notification: ConsensusSyncNotification,
) -> Result<(), Error> {
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let latest_synced_version = utils::fetch_latest_synced_version(self.storage.clone())?;
debug!(
"Received a consensus sync notification! Target version: {:?}. Latest storage summary: {:?}",
sync_notification.target, &latest_storage_summary,
"Received a consensus sync notification! Target version: {:?}. Latest synced version: {:?}",
sync_notification.target, latest_synced_version,
);

// Initialize a new sync request
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
self.consensus_notification_handler
.initialize_sync_request(sync_notification, latest_storage_summary)
.initialize_sync_request(sync_notification, latest_synced_ledger_info)
.await
}

Expand All @@ -288,23 +298,40 @@ impl<
commit_notification.events.len()
);

// Handle the commit notification
match self.storage_synchronizer.lock().get_storage_summary() {
Ok(latest_storage_summary) => {
if let Err(error) = commit_notification
.handle_commit_notification(
&latest_storage_summary,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await
{
error!("Failed to handle a commit notification. Error {:?}", error);
// Fetch the latest synced version and ledger info from storage
let (latest_synced_version, latest_synced_ledger_info) =
match utils::fetch_latest_synced_version(self.storage.clone()) {
Ok(latest_synced_version) => {
match utils::fetch_latest_synced_ledger_info(self.storage.clone()) {
Ok(latest_synced_ledger_info) => {
(latest_synced_version, latest_synced_ledger_info)
}
Err(error) => {
error!(
"Failed to fetch latest synced ledger info! Error: {:?}",
error
);
return;
}
}
}
}
Err(error) => {
error!("Failed to fetch storage summary! Error: {:?}", error);
}
Err(error) => {
error!("Failed to fetch latest synced version! Error: {:?}", error);
return;
}
};

// Handle the commit notification
if let Err(error) = commit_notification
.handle_commit_notification(
latest_synced_version,
latest_synced_ledger_info,
self.mempool_notification_handler.clone(),
self.event_subscription_service.clone(),
)
.await
{
error!("Failed to handle a commit notification! Error: {:?}", error);
}

// Update the last commit timestamp for the sync request
Expand All @@ -322,9 +349,10 @@ impl<
return Ok(());
}

let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
let latest_synced_ledger_info =
utils::fetch_latest_synced_ledger_info(self.storage.clone())?;
self.consensus_notification_handler
.check_sync_request_progress(&latest_storage_summary)
.check_sync_request_progress(latest_synced_ledger_info)
.await
}

Expand All @@ -342,6 +370,7 @@ impl<
// Fetch the global data summary and verify we have active peers
let global_data_summary = self.diem_data_client.get_global_data_summary();
if global_data_summary.is_empty() {
// TODO(joshlind): what if we have no peers? i.e., we're only a single node deployment?
trace!("The global data summary is empty! It's likely that we have no active peers.");
return;
}
Expand Down
Loading

0 comments on commit 372e031

Please sign in to comment.