Skip to content

Commit

Permalink
[State Sync] Use decoupled execute/apply and commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and bors-libra committed Dec 22, 2021
1 parent 74bc0cf commit e435976
Show file tree
Hide file tree
Showing 7 changed files with 498 additions and 380 deletions.
157 changes: 43 additions & 114 deletions state-sync/state-sync-v2/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::{
driver::DriverConfiguration,
error::Error,
notification_handlers::MempoolNotificationHandler,
storage_synchronizer::{StorageStateSummary, StorageSynchronizerInterface},
utils,
};
Expand All @@ -18,16 +17,13 @@ use diem_data_client::GlobalDataSummary;
use diem_infallible::Mutex;
use diem_logger::*;
use diem_types::{
contract_event::ContractEvent,
epoch_change::Verifier,
epoch_state::EpochState,
ledger_info::LedgerInfoWithSignatures,
transaction::{Transaction, TransactionListWithProof, TransactionOutputListWithProof, Version},
transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version},
waypoint::Waypoint,
};
use event_notifications::EventSubscriptionService;
use futures::channel::oneshot;
use mempool_notifications::MempoolNotificationSender;
use std::{collections::BTreeMap, sync::Arc};

/// A simple container for verified epoch states and epoch ending ledger infos
Expand Down Expand Up @@ -229,7 +225,7 @@ impl VerifiedEpochStates {
}

/// A simple component that manages the bootstrapping of the node
pub struct Bootstrapper<MempoolNotifier, StorageSyncer> {
pub struct Bootstrapper<StorageSyncer> {
// The currently active data stream (provided by the data streaming service)
active_data_stream: Option<DataStreamListener>,

Expand All @@ -242,12 +238,6 @@ pub struct Bootstrapper<MempoolNotifier, StorageSyncer> {
// The config of the state sync driver
driver_configuration: DriverConfiguration,

// The event subscription service to notify listeners of on-chain events
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,

// The handler for notifications to mempool
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,

// The client through which to stream data from the Diem network
streaming_service_client: StreamingServiceClient,

Expand All @@ -258,13 +248,9 @@ pub struct Bootstrapper<MempoolNotifier, StorageSyncer> {
verified_epoch_states: VerifiedEpochStates,
}

impl<MempoolNotifier: MempoolNotificationSender, StorageSyncer: StorageSynchronizerInterface>
Bootstrapper<MempoolNotifier, StorageSyncer>
{
impl<StorageSyncer: StorageSynchronizerInterface> Bootstrapper<StorageSyncer> {
pub fn new(
driver_configuration: DriverConfiguration,
event_subscription_service: Arc<Mutex<EventSubscriptionService>>,
mempool_notification_handler: MempoolNotificationHandler<MempoolNotifier>,
streaming_service_client: StreamingServiceClient,
storage_synchronizer: Arc<Mutex<StorageSyncer>>,
) -> Self {
Expand All @@ -281,8 +267,6 @@ impl<MempoolNotifier: MempoolNotificationSender, StorageSyncer: StorageSynchroni
bootstrap_notifier_channel: None,
bootstrapped: false,
driver_configuration,
event_subscription_service,
mempool_notification_handler,
streaming_service_client,
storage_synchronizer,
verified_epoch_states,
Expand Down Expand Up @@ -629,71 +613,49 @@ impl<MempoolNotifier: MempoolNotificationSender, StorageSyncer: StorageSynchroni
let highest_known_ledger_info = self.get_highest_known_ledger_info()?;

// Execute/apply and commit the transactions/outputs
let (committed_events, committed_transactions) =
match self.driver_configuration.config.bootstrapping_mode {
BootstrappingMode::ApplyTransactionOutputsFromGenesis => {
if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof {
let committed_transactions = transaction_outputs_with_proof
.transactions_and_outputs
.iter()
.map(|(txn, _)| txn.clone())
.collect();
let committed_events = self
.storage_synchronizer
.lock()
.apply_and_commit_transaction_outputs(
transaction_outputs_with_proof,
highest_known_ledger_info,
end_of_epoch_ledger_info,
);
(committed_events, committed_transactions)
} else {
self.terminate_active_stream(
notification_id,
NotificationFeedback::PayloadTypeIsIncorrect,
)
.await?;
return Err(Error::InvalidPayload(
"Did not receive transaction outputs with proof!".into(),
));
}
}
BootstrappingMode::ExecuteTransactionsFromGenesis => {
if let Some(transaction_list_with_proof) = transaction_list_with_proof {
let committed_transactions =
transaction_list_with_proof.transactions.clone();
let committed_events = self
.storage_synchronizer
.lock()
.execute_and_commit_transactions(
transaction_list_with_proof,
highest_known_ledger_info,
end_of_epoch_ledger_info,
);
(committed_events, committed_transactions)
} else {
self.terminate_active_stream(
notification_id,
NotificationFeedback::PayloadTypeIsIncorrect,
)
.await?;
return Err(Error::InvalidPayload(
"Did not receive transactions with proof!".into(),
));
}
match self.driver_configuration.config.bootstrapping_mode {
BootstrappingMode::ApplyTransactionOutputsFromGenesis => {
if let Some(transaction_outputs_with_proof) = transaction_outputs_with_proof {
self.storage_synchronizer.lock().apply_transaction_outputs(
transaction_outputs_with_proof,
highest_known_ledger_info,
end_of_epoch_ledger_info,
)?;
} else {
self.terminate_active_stream(
notification_id,
NotificationFeedback::PayloadTypeIsIncorrect,
)
.await?;
return Err(Error::InvalidPayload(
"Did not receive transaction outputs with proof!".into(),
));
}
bootstrapping_mode => {
unimplemented!("Bootstrapping mode not supported: {:?}", bootstrapping_mode)
}
BootstrappingMode::ExecuteTransactionsFromGenesis => {
if let Some(transaction_list_with_proof) = transaction_list_with_proof {
self.storage_synchronizer.lock().execute_transactions(
transaction_list_with_proof,
highest_known_ledger_info,
end_of_epoch_ledger_info,
)?;
} else {
self.terminate_active_stream(
notification_id,
NotificationFeedback::PayloadTypeIsIncorrect,
)
.await?;
return Err(Error::InvalidPayload(
"Did not receive transactions with proof!".into(),
));
}
};
}
bootstrapping_mode => {
unimplemented!("Bootstrapping mode not supported: {:?}", bootstrapping_mode)
}
};

// Notify listeners of committed events and transactions
self.notify_committed_events_and_transactions(
notification_id,
committed_events,
committed_transactions,
)
.await
Ok(())
}

/// Verifies the first payload version matches the version we wish to sync
Expand Down Expand Up @@ -795,39 +757,6 @@ impl<MempoolNotifier: MempoolNotificationSender, StorageSyncer: StorageSynchroni
.get_epoch_ending_ledger_info(payload_end_version))
}

/// Notifies mempool of the committed transactions and notifies the event
/// subscription service of committed events.
async fn notify_committed_events_and_transactions(
&mut self,
notification_id: NotificationId,
committed_events: Result<Vec<ContractEvent>, Error>,
committed_transactions: Vec<Transaction>,
) -> Result<(), Error> {
match committed_events {
Ok(committed_events) => {
let latest_storage_summary =
self.storage_synchronizer.lock().get_storage_summary()?;

utils::notify_committed_events_and_transactions(
&latest_storage_summary,
self.mempool_notification_handler.clone(),
committed_transactions,
self.event_subscription_service.clone(),
committed_events,
)
.await
}
Err(error) => {
self.terminate_active_stream(
notification_id,
NotificationFeedback::InvalidPayloadData,
)
.await?;
Err(error)
}
}
}

/// Returns the highest synced version in storage
fn get_highest_synced_version(&self) -> Result<Version, Error> {
let latest_storage_summary = self.storage_synchronizer.lock().get_storage_summary()?;
Expand Down
Loading

0 comments on commit e435976

Please sign in to comment.