Skip to content

Commit

Permalink
[State Sync] Replace Box with Arc for ChunkExector.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and bors-libra committed Dec 22, 2021
1 parent 4c2f5fb commit 9eede8d
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 63 deletions.
18 changes: 5 additions & 13 deletions diem-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use diem_vm::DiemVM;
use diemdb::DiemDB;
use event_notifications::EventSubscriptionService;
use executor::{chunk_executor::ChunkExecutor, db_bootstrapper::maybe_bootstrap};
use executor_types::ChunkExecutorTrait;
use futures::channel::mpsc::channel;
use mempool_notifications::MempoolNotificationSender;
use network::application::storage::PeerMetadataStorage;
Expand Down Expand Up @@ -231,10 +230,6 @@ fn fetch_chain_id(db: &DbReaderWriter) -> ChainId {
.chain_id()
}

fn setup_chunk_executor(db: DbReaderWriter) -> Box<dyn ChunkExecutorTrait> {
Box::new(ChunkExecutor::<DiemVM>::new(db).unwrap())
}

fn setup_debug_interface(config: &NodeConfig, logger: Option<Arc<Logger>>) -> NodeDebugService {
let addr = format!(
"{}:{}",
Expand All @@ -259,7 +254,6 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
peer_metadata_storage: Arc<PeerMetadataStorage>,
mempool_notifier: M,
consensus_listener: ConsensusNotificationListener,
chunk_executor: Box<dyn ChunkExecutorTrait>,
waypoint: Waypoint,
event_subscription_service: EventSubscriptionService,
db_rw: DbReaderWriter,
Expand All @@ -285,6 +279,11 @@ fn create_state_sync_runtimes<M: MempoolNotificationSender + 'static>(
diem_data_client.clone(),
);

// Create the chunk executor
let chunk_executor = Arc::new(
ChunkExecutor::<DiemVM>::new(db_rw.clone()).expect("Unable to create the chunk executor!"),
);

// Create the state sync multiplexer
let state_sync_multiplexer = StateSyncMultiplexer::new(
state_sync_network_handles,
Expand Down Expand Up @@ -474,12 +473,6 @@ pub fn setup_environment(node_config: &NodeConfig, logger: Option<Arc<Logger>>)
instant.elapsed().as_millis()
);

instant = Instant::now();
let chunk_executor = setup_chunk_executor(db_rw.clone());
debug!(
"ChunkExecutor setup in {} ms",
instant.elapsed().as_millis()
);
let chain_id = fetch_chain_id(&db_rw);
let mut network_runtimes = vec![];
let mut state_sync_network_handles = vec![];
Expand Down Expand Up @@ -618,7 +611,6 @@ pub fn setup_environment(node_config: &NodeConfig, logger: Option<Arc<Logger>>)
peer_metadata_storage.clone(),
mempool_notifier,
consensus_listener,
chunk_executor,
genesis_waypoint,
event_subscription_service,
db_rw.clone(),
Expand Down
9 changes: 5 additions & 4 deletions state-sync/state-sync-v1/src/bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use event_notifications::EventSubscriptionService;
use executor_types::ChunkExecutorTrait;
use futures::channel::mpsc;
use mempool_notifications::MempoolNotificationSender;
use std::{boxed::Box, collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc};
use storage_interface::DbReader;
use tokio::runtime::{Builder, Runtime};

Expand All @@ -25,12 +25,12 @@ pub struct StateSyncBootstrapper {
}

impl StateSyncBootstrapper {
pub fn bootstrap<M: MempoolNotificationSender + 'static>(
pub fn bootstrap<C: ChunkExecutorTrait + 'static, M: MempoolNotificationSender + 'static>(
network: Vec<(NetworkId, StateSyncSender, StateSyncEvents)>,
mempool_notifier: M,
consensus_listener: ConsensusNotificationListener,
storage: Arc<dyn DbReader>,
executor: Box<dyn ChunkExecutorTrait>,
chunk_executor: Arc<C>,
node_config: &NodeConfig,
waypoint: Waypoint,
event_subscription_service: EventSubscriptionService,
Expand All @@ -42,7 +42,8 @@ impl StateSyncBootstrapper {
.build()
.expect("[State Sync] Failed to create runtime!");

let executor_proxy = ExecutorProxy::new(storage, executor, event_subscription_service);
let executor_proxy =
ExecutorProxy::new(storage, chunk_executor, event_subscription_service);

Self::bootstrap_with_executor_proxy(
runtime,
Expand Down
17 changes: 9 additions & 8 deletions state-sync/state-sync-v1/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1776,6 +1776,7 @@ mod tests {
waypoint::Waypoint,
PeerId,
};
use executor_types::ChunkExecutorTrait;
use futures::{channel::oneshot, executor::block_on};
use mempool_notifications::MempoolNotifier;
use netcore::transport::ConnectionOrigin;
Expand Down Expand Up @@ -2505,8 +2506,8 @@ mod tests {
vec![waypoint_response, target_response, highest_response]
}

fn verify_all_chunk_requests_are_invalid(
coordinator: &mut StateSyncCoordinator<ExecutorProxy, MempoolNotifier>,
fn verify_all_chunk_requests_are_invalid<C: ChunkExecutorTrait>(
coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
peer_network_id: &PeerNetworkId,
requests: &[StateSyncMessage],
) {
Expand All @@ -2520,8 +2521,8 @@ mod tests {
}
}

fn verify_all_chunk_responses_are_invalid(
coordinator: &mut StateSyncCoordinator<ExecutorProxy, MempoolNotifier>,
fn verify_all_chunk_responses_are_invalid<C: ChunkExecutorTrait>(
coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
peer_network_id: &PeerNetworkId,
responses: &[StateSyncMessage],
) {
Expand All @@ -2535,8 +2536,8 @@ mod tests {
}
}

fn verify_all_chunk_responses_are_the_wrong_type(
coordinator: &mut StateSyncCoordinator<ExecutorProxy, MempoolNotifier>,
fn verify_all_chunk_responses_are_the_wrong_type<C: ChunkExecutorTrait>(
coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
peer_network_id: &PeerNetworkId,
responses: &[StateSyncMessage],
) {
Expand All @@ -2550,8 +2551,8 @@ mod tests {
}
}

fn process_new_peer_event(
coordinator: &mut StateSyncCoordinator<ExecutorProxy, MempoolNotifier>,
fn process_new_peer_event<C: ChunkExecutorTrait>(
coordinator: &mut StateSyncCoordinator<ExecutorProxy<C>, MempoolNotifier>,
peer: &PeerNetworkId,
) {
let connection_metadata = ConnectionMetadata::mock_with_role_and_origin(
Expand Down
22 changes: 11 additions & 11 deletions state-sync/state-sync-v1/src/executor_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,27 +52,27 @@ pub trait ExecutorProxyTrait: Send {
fn publish_event_notifications(&mut self, events: Vec<ContractEvent>) -> Result<(), Error>;
}

pub(crate) struct ExecutorProxy {
pub(crate) struct ExecutorProxy<C> {
storage: Arc<dyn DbReader>,
executor: Box<dyn ChunkExecutorTrait>,
chunk_executor: Arc<C>,
event_subscription_service: EventSubscriptionService,
}

impl ExecutorProxy {
impl<C: ChunkExecutorTrait> ExecutorProxy<C> {
pub(crate) fn new(
storage: Arc<dyn DbReader>,
executor: Box<dyn ChunkExecutorTrait>,
chunk_executor: Arc<C>,
event_subscription_service: EventSubscriptionService,
) -> Self {
Self {
storage,
executor,
chunk_executor,
event_subscription_service,
}
}
}

impl ExecutorProxyTrait for ExecutorProxy {
impl<C: ChunkExecutorTrait> ExecutorProxyTrait for ExecutorProxy<C> {
fn get_local_storage_state(&self) -> Result<SyncState, Error> {
let storage_info = self.storage.get_startup_info().map_err(|error| {
Error::UnexpectedError(format!(
Expand Down Expand Up @@ -106,7 +106,7 @@ impl ExecutorProxyTrait for ExecutorProxy {
// track chunk execution time
let timer = counters::EXECUTE_CHUNK_DURATION.start_timer();
let events = self
.executor
.chunk_executor
.execute_and_commit_chunk(
txn_list_with_proof,
&verified_target_li,
Expand Down Expand Up @@ -377,7 +377,7 @@ mod tests {
.unwrap();

// Create an executor proxy
let chunk_executor = Box::new(ChunkExecutor::<DiemVM>::new(db_rw).unwrap());
let chunk_executor = Arc::new(ChunkExecutor::<DiemVM>::new(db_rw).unwrap());
let mut executor_proxy = ExecutorProxy::new(db, chunk_executor, event_subscription_service);

// Publish a subscribed event
Expand Down Expand Up @@ -579,7 +579,7 @@ mod tests {
.unwrap();

// Create an executor
let chunk_executor = Box::new(ChunkExecutor::<DiemVM>::new(db_rw.clone()).unwrap());
let chunk_executor = Arc::new(ChunkExecutor::<DiemVM>::new(db_rw.clone()).unwrap());
let mut executor_proxy = ExecutorProxy::new(db, chunk_executor, event_subscription_service);

// Verify that the initial configs returned to the subscriber don't contain the unknown on-chain config
Expand Down Expand Up @@ -622,7 +622,7 @@ mod tests {
) -> (
Vec<TestValidator>,
Box<BlockExecutor<DiemVM>>,
ExecutorProxy,
ExecutorProxy<ChunkExecutor<DiemVM>>,
ReconfigNotificationListener,
) {
// Generate a genesis change set
Expand Down Expand Up @@ -662,7 +662,7 @@ mod tests {

// Create the executors
let block_executor = Box::new(BlockExecutor::<DiemVM>::new(db_rw.clone()));
let chunk_executor = Box::new(ChunkExecutor::<DiemVM>::new(db_rw).unwrap());
let chunk_executor = Arc::new(ChunkExecutor::<DiemVM>::new(db_rw).unwrap());
let executor_proxy = ExecutorProxy::new(db, chunk_executor, event_subscription_service);

(
Expand Down
7 changes: 5 additions & 2 deletions state-sync/state-sync-v1/src/fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use diem_infallible::Mutex;
use diem_types::{
ledger_info::LedgerInfoWithSignatures, transaction::TransactionListWithProof, PeerId,
};
use diem_vm::DiemVM;
use executor::chunk_executor::ChunkExecutor;
use futures::executor::block_on;
use mempool_notifications::MempoolNotifier;
use once_cell::sync::Lazy;
Expand All @@ -24,8 +26,9 @@ use proptest::{
strategy::Strategy,
};

static STATE_SYNC_COORDINATOR: Lazy<Mutex<StateSyncCoordinator<ExecutorProxy, MempoolNotifier>>> =
Lazy::new(|| Mutex::new(test_utils::create_validator_coordinator()));
static STATE_SYNC_COORDINATOR: Lazy<
Mutex<StateSyncCoordinator<ExecutorProxy<ChunkExecutor<DiemVM>>, MempoolNotifier>>,
> = Lazy::new(|| Mutex::new(test_utils::create_validator_coordinator()));

proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]
Expand Down
12 changes: 6 additions & 6 deletions state-sync/state-sync-v1/src/shared_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ pub(crate) mod test_utils {
pub(crate) fn create_coordinator_with_config_and_waypoint(
node_config: NodeConfig,
waypoint: Waypoint,
) -> StateSyncCoordinator<ExecutorProxy, MempoolNotifier> {
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<DiemVM>>, MempoolNotifier> {
create_state_sync_coordinator_for_tests(node_config, waypoint, false)
}

pub(crate) fn create_validator_coordinator(
) -> StateSyncCoordinator<ExecutorProxy, MempoolNotifier> {
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<DiemVM>>, MempoolNotifier> {
let mut node_config = NodeConfig::default();
node_config.base.role = RoleType::Validator;

Expand All @@ -130,7 +130,7 @@ pub(crate) mod test_utils {

#[cfg(test)]
pub(crate) fn create_full_node_coordinator(
) -> StateSyncCoordinator<ExecutorProxy, MempoolNotifier> {
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<DiemVM>>, MempoolNotifier> {
let mut node_config = NodeConfig::default();
node_config.base.role = RoleType::FullNode;

Expand All @@ -139,7 +139,7 @@ pub(crate) mod test_utils {

#[cfg(test)]
pub(crate) fn create_read_only_coordinator(
) -> StateSyncCoordinator<ExecutorProxy, MempoolNotifier> {
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<DiemVM>>, MempoolNotifier> {
let mut node_config = NodeConfig::default();
node_config.base.role = RoleType::Validator;

Expand All @@ -150,7 +150,7 @@ pub(crate) mod test_utils {
node_config: NodeConfig,
waypoint: Waypoint,
read_only_mode: bool,
) -> StateSyncCoordinator<ExecutorProxy, MempoolNotifier> {
) -> StateSyncCoordinator<ExecutorProxy<ChunkExecutor<DiemVM>>, MempoolNotifier> {
// Generate a genesis change set
let (genesis, _) = vm_genesis::test_genesis_change_set_and_validators(Some(1));

Expand All @@ -175,7 +175,7 @@ pub(crate) mod test_utils {
.unwrap();

// Create executor proxy
let chunk_executor = Box::new(ChunkExecutor::<DiemVM>::new(db_rw).unwrap());
let chunk_executor = Arc::new(ChunkExecutor::<DiemVM>::new(db_rw).unwrap());
let executor_proxy = ExecutorProxy::new(db, chunk_executor, event_subscription_service);

// Get initial state
Expand Down
11 changes: 7 additions & 4 deletions state-sync/state-sync-v2/state-sync-driver/src/driver_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use event_notifications::EventSubscriptionService;
use executor_types::ChunkExecutorTrait;
use futures::channel::mpsc;
use mempool_notifications::MempoolNotificationSender;
use std::{boxed::Box, sync::Arc};
use std::sync::Arc;
use storage_interface::DbReaderWriter;
use tokio::runtime::{Builder, Runtime};

Expand All @@ -29,13 +29,16 @@ pub struct DriverFactory {

impl DriverFactory {
/// Creates and spawns a new state sync driver
pub fn create_and_spawn_driver<M: MempoolNotificationSender + 'static>(
pub fn create_and_spawn_driver<
ChunkExecutor: ChunkExecutorTrait + 'static,
MempoolNotifier: MempoolNotificationSender + 'static,
>(
create_runtime: bool,
node_config: &NodeConfig,
waypoint: Waypoint,
storage: DbReaderWriter,
chunk_executor: Box<dyn ChunkExecutorTrait>,
mempool_notification_sender: M,
chunk_executor: Arc<ChunkExecutor>,
mempool_notification_sender: MempoolNotifier,
consensus_listener: ConsensusNotificationListener,
event_subscription_service: EventSubscriptionService,
diem_data_client: DiemNetDataClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,13 @@ pub trait StorageSynchronizerInterface {
}

/// The implementation of the `StorageSynchronizerInterface` used by state sync
pub struct StorageSynchronizer {
chunk_executor: Box<dyn ChunkExecutorTrait>,
pub struct StorageSynchronizer<ChunkExecutor> {
chunk_executor: Arc<ChunkExecutor>,
storage: Arc<RwLock<DbReaderWriter>>,
}

impl StorageSynchronizer {
pub fn new(
chunk_executor: Box<dyn ChunkExecutorTrait>,
storage: Arc<RwLock<DbReaderWriter>>,
) -> Self {
impl<ChunkExecutor: ChunkExecutorTrait> StorageSynchronizer<ChunkExecutor> {
pub fn new(chunk_executor: Arc<ChunkExecutor>, storage: Arc<RwLock<DbReaderWriter>>) -> Self {
Self {
chunk_executor,
storage,
Expand Down Expand Up @@ -125,7 +122,9 @@ impl StorageSynchronizer {
}
}

impl StorageSynchronizerInterface for StorageSynchronizer {
impl<ChunkExecutor: ChunkExecutorTrait> StorageSynchronizerInterface
for StorageSynchronizer<ChunkExecutor>
{
fn apply_and_commit_transaction_outputs(
&mut self,
output_list_with_proof: TransactionOutputListWithProof,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn create_driver_for_tests(
mempool_notifications::new_mempool_notifier_listener_pair();

// Create the chunk executor
let chunk_executor = Box::new(ChunkExecutor::<DiemVM>::new(db_rw.clone()).unwrap());
let chunk_executor = Arc::new(ChunkExecutor::<DiemVM>::new(db_rw.clone()).unwrap());

// Create a streaming service client
let (streaming_service_client, _) = new_streaming_service_client_listener_pair();
Expand Down
Loading

0 comments on commit 9eede8d

Please sign in to comment.