From 835d377362935d32bb3ea268c013518a461136ae Mon Sep 17 00:00:00 2001
From: Balaji Arun
Date: Wed, 24 Jul 2024 09:07:10 -0700
Subject: [PATCH] [consensus][qs] support OptQuorumStorePayload in consensus

---
 consensus/src/block_storage/block_store.rs |  11 ++-
 consensus/src/counters.rs                  |   9 ++
 consensus/src/payload_manager.rs           |  36 +++++++
 consensus/src/round_manager.rs             | 108 +++++++++++++--------
 4 files changed, 125 insertions(+), 39 deletions(-)

diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs
index e01d87aafeb6d2..4d5b26f76b9550 100644
--- a/consensus/src/block_storage/block_store.rs
+++ b/consensus/src/block_storage/block_store.rs
@@ -36,7 +36,7 @@ use futures::executor::block_on;
 #[cfg(test)]
 use std::collections::VecDeque;
 #[cfg(any(test, feature = "fuzzing"))]
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::AtomicBool;
 use std::{sync::Arc, time::Duration};
 
 #[cfg(test)]
@@ -467,6 +467,15 @@ impl BlockStore {
     pub fn pending_blocks(&self) -> Arc<Mutex<PendingBlocks>> {
         self.pending_blocks.clone()
     }
+
+    pub async fn wait_for_payload(&self, block: &Block) -> anyhow::Result<()> {
+        self.payload_manager.get_transactions(block).await?;
+        Ok(())
+    }
+
+    pub fn check_payload(&self, proposal: &Block) -> bool {
+        self.payload_manager.check_payload_availability(proposal)
+    }
 }
 
 impl BlockReader for BlockStore {
diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs
index ba63c48f005db7..98ad522086fe84 100644
--- a/consensus/src/counters.rs
+++ b/consensus/src/counters.rs
@@ -1162,3 +1162,12 @@ pub static RAND_QUEUE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
     )
     .unwrap()
 });
+
+pub static CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "aptos_consensus_proposal_payload_availability",
+        "The availability of proposal payload locally",
+        &["status"]
+    )
+    .unwrap()
+});
diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs
index 3ce5c00bdc758f..0a8e65fb9dcda1 100644
--- a/consensus/src/payload_manager.rs
+++ b/consensus/src/payload_manager.rs
@@ -45,6 +45,11 @@ pub trait TPayloadManager: Send + Sync {
     /// available when block is executed.
     fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64);
 
+    /// Check if the transactions corresponding to the block are available. This is specific to
+    /// payload manager implementations. For optimistic quorum store, we only check if the
+    /// optimistic batches are available locally.
+    fn check_payload_availability(&self, block: &Block) -> bool;
+
     /// Get the transactions in a block's payload. This function returns a vector of transactions.
     async fn get_transactions(
         &self,
@@ -67,6 +72,10 @@ impl TPayloadManager for DirectMempoolPayloadManager {
 
     fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}
 
+    fn check_payload_availability(&self, _block: &Block) -> bool {
+        true
+    }
+
     async fn get_transactions(
         &self,
         block: &Block,
@@ -275,6 +284,29 @@ impl TPayloadManager for QuorumStorePayloadManager {
         };
     }
 
+    fn check_payload_availability(&self, block: &Block) -> bool {
+        let Some(payload) = block.payload() else {
+            return true;
+        };
+
+        match payload {
+            Payload::DirectMempool(_) => {
+                unreachable!("QuorumStore doesn't support DirectMempool payload")
+            },
+            Payload::InQuorumStore(_) => true,
+            Payload::InQuorumStoreWithLimit(_) => true,
+            Payload::QuorumStoreInlineHybrid(_, _, _) => true,
+            Payload::OptQuorumStore(opt_qs_payload) => {
+                for batch in opt_qs_payload.opt_batches().deref() {
+                    if self.batch_reader.exists(batch.digest()).is_none() {
+                        return false;
+                    }
+                }
+                true
+            },
+        }
+    }
+
     async fn get_transactions(
         &self,
         block: &Block,
@@ -635,6 +667,10 @@ impl TPayloadManager for ConsensusObserverPayloadManager {
         // noop
     }
 
+    fn check_payload_availability(&self, _block: &Block) -> bool {
+        unreachable!("this method isn't used in ConsensusObserver")
+    }
+
     async fn get_transactions(
         &self,
         block: &Block,
diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs
index 92522577794b8a..3f572e2c964e9c 100644
--- a/consensus/src/round_manager.rs
+++ b/consensus/src/round_manager.rs
@@ -52,6 +52,7 @@ use aptos_consensus_types::{
     wrapped_ledger_info::WrappedLedgerInfo,
 };
 use aptos_crypto::HashValue;
+use aptos_executor_types::ExecutorError;
 use aptos_infallible::{checked, Mutex};
 use aptos_logger::prelude::*;
 #[cfg(test)]
@@ -65,18 +66,19 @@ use aptos_types::{
         ValidatorTxnConfig,
     },
     randomness::RandMetadata,
+    transaction::SignedTransaction,
     validator_verifier::ValidatorVerifier,
     PeerId,
 };
 use fail::fail_point;
-use futures::{channel::oneshot, FutureExt, StreamExt};
+use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
 use futures_channel::mpsc::UnboundedReceiver;
 use lru::LruCache;
 use serde::Serialize;
-use std::{mem::Discriminant, sync::Arc, time::Duration};
+use std::{mem::Discriminant, pin::Pin, sync::Arc, time::Duration};
 use tokio::{
     sync::oneshot as TokioOneshot,
-    time::{sleep, Instant},
+    time::{error::Elapsed, sleep, Instant},
 };
 
 #[derive(Serialize, Clone)]
@@ -250,6 +252,7 @@ pub struct RoundManager {
     // To avoid duplicate broadcasts for the same block, we keep track of blocks for
     // which we recently broadcasted fast shares.
     blocks_with_broadcasted_fast_shares: LruCache<HashValue, ()>,
+    futures: FuturesUnordered<Pin<Box<dyn Future<Output = (anyhow::Result<()>, Block)> + Send>>>,
 }
 
 impl RoundManager {
@@ -298,6 +301,7 @@ impl RoundManager {
             fast_rand_config,
             pending_order_votes: PendingOrderVotes::new(),
             blocks_with_broadcasted_fast_shares: LruCache::new(5),
+            futures: FuturesUnordered::new(),
         }
     }
 
@@ -359,8 +363,12 @@
         #[cfg(feature = "failpoints")]
         {
             if self.check_whether_to_inject_reconfiguration_error() {
-                self.attempt_to_inject_reconfiguration_error(&proposal_msg)
-                    .await?;
+                Self::attempt_to_inject_reconfiguration_error(
+                    self.epoch_state.clone(),
+                    self.network.clone(),
+                    &proposal_msg,
+                )
+                .await?;
             }
         }
         self.network.broadcast_proposal(proposal_msg).await;
@@ -485,23 +493,20 @@
             block_parent_hash = proposal_msg.proposal().quorum_cert().certified_block().id(),
         );
 
-        if self
-            .ensure_round_and_sync_up(
+        ensure!(
+            self.ensure_round_and_sync_up(
                 proposal_msg.proposal().round(),
                 proposal_msg.sync_info(),
                 proposal_msg.proposer(),
             )
             .await
-            .context("[RoundManager] Process proposal")?
-        {
-            self.process_proposal(proposal_msg.take_proposal()).await
-        } else {
-            bail!(
-                "Stale proposal {}, current round {}",
-                proposal_msg.proposal(),
-                self.round_state.current_round()
-            );
-        }
+            .context("[RoundManager] Process proposal")?,
+            "Stale proposal {}, current round {}",
+            proposal_msg.proposal(),
+            self.round_state.current_round()
+        );
+
+        self.process_proposal(proposal_msg.take_proposal()).await
     }
 
     pub async fn process_delayed_proposal_msg(&mut self, proposal: Block) -> anyhow::Result<()> {
@@ -663,7 +668,7 @@
                     "Planning to vote for a NIL block {}", nil_block
                 );
                 counters::VOTE_NIL_COUNT.inc();
-                let nil_vote = self.execute_and_vote(nil_block).await?;
+                let nil_vote = self.vote_block(nil_block).await?;
                 (true, nil_vote)
             },
         };
@@ -838,30 +843,47 @@
                 .insert_block(proposal.clone())
                 .await
                 .context("[RoundManager] Failed to execute_and_insert the block")?;
-            self.resend_verified_proposal_to_self(
+            Self::resend_verified_proposal_to_self(
+                self.block_store.clone(),
+                self.buffered_proposal_tx.clone(),
                 proposal,
                 author,
                 BACK_PRESSURE_POLLING_INTERVAL_MS,
                 self.local_config.round_initial_timeout_ms,
             )
             .await;
-            Ok(())
-        } else {
-            counters::CONSENSUS_WITHOLD_VOTE_BACKPRESSURE_TRIGGERED.observe(0.0);
+            return Ok(());
+        }
+
+        counters::CONSENSUS_WITHOLD_VOTE_BACKPRESSURE_TRIGGERED.observe(0.0);
+
+        let block_store = self.block_store.clone();
+        if block_store.check_payload(&proposal) {
+            counters::CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY
+                .with_label_values(&["available"])
+                .inc();
             self.process_verified_proposal(proposal).await
+        } else {
+            debug!("Payload not available locally for block: {}", proposal.id());
+            counters::CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY
+                .with_label_values(&["missing"])
+                .inc();
+            let future =
+                async move { (block_store.wait_for_payload(&proposal).await, proposal) }.boxed();
+            self.futures.push(future);
+            Ok(())
         }
     }
 
     async fn resend_verified_proposal_to_self(
-        &self,
+        block_store: Arc<BlockStore>,
+        self_sender: aptos_channel::Sender<Author, VerifiedEvent>,
         proposal: Block,
         author: Author,
         polling_interval_ms: u64,
         timeout_ms: u64,
     ) {
         let start = Instant::now();
-        let block_store = self.block_store.clone();
-        let self_sender = self.buffered_proposal_tx.clone();
         let event = VerifiedEvent::VerifiedProposalMsg(Box::new(proposal));
         tokio::spawn(async move {
             while start.elapsed() < Duration::from_millis(timeout_ms) {
@@ -903,7 +925,7 @@
     pub async fn process_verified_proposal(&mut self, proposal: Block) -> anyhow::Result<()> {
         let proposal_round = proposal.round();
         let vote = self
-            .execute_and_vote(proposal)
+            .vote_block(proposal)
             .await
             .context("[RoundManager] Process proposal")?;
         self.round_state.record_vote(vote.clone());
@@ -930,12 +952,12 @@
     }
 
     /// The function generates a VoteMsg for a given proposed_block:
-    /// * first execute the block and add it to the block store
+    /// * add the block to the block store
     /// * then verify the voting rules
    /// * save the updated state to consensus DB
     /// * return a VoteMsg with the LedgerInfo to be committed in case the vote gathers QC.
-    async fn execute_and_vote(&mut self, proposed_block: Block) -> anyhow::Result<Vote> {
-        let executed_block = self
+    async fn vote_block(&mut self, proposed_block: Block) -> anyhow::Result<Vote> {
+        let block_arc = self
             .block_store
             .insert_block(proposed_block)
             .await
@@ -953,17 +975,17 @@
             "[RoundManager] sync_only flag is set, stop voting"
         );
 
-        let vote_proposal = executed_block.vote_proposal();
+        let vote_proposal = block_arc.vote_proposal();
         let vote_result = self.safety_rules.lock().construct_and_sign_vote_two_chain(
             &vote_proposal,
             self.block_store.highest_2chain_timeout_cert().as_deref(),
         );
         let vote = vote_result.context(format!(
             "[RoundManager] SafetyRules Rejected {}",
-            executed_block.block()
+            block_arc.block()
         ))?;
-        if !executed_block.block().is_nil_block() {
-            observe_block(executed_block.block().timestamp_usecs(), BlockStage::VOTED);
+        if !block_arc.block().is_nil_block() {
+            observe_block(block_arc.block().timestamp_usecs(), BlockStage::VOTED);
         }
 
         self.storage
@@ -1387,6 +1409,16 @@
                         }
                     }
                 },
+                Some((result, block)) = self.futures.next() => {
+                    match result {
+                        Ok(_) => {
+                            if let Err(e) = self.process_delayed_proposal_msg(block).await {
+                                warn!("error {}", e);
+                            }
+                        },
+                        Err(err) => warn!("unable to get transactions: {}", err),
+                    };
+                },
                 (peer_id, event) = event_rx.select_next_some() => {
                     let result = match event {
                         VerifiedEvent::VoteMsg(vote_msg) => {
@@ -1417,7 +1449,7 @@
                             warn!(error = ?e, kind = error_kind(&e), RoundStateLogSchema::new(round_state));
                         }
                     }
-                }
+                },
             }
         }
         info!(epoch = self.epoch_state().epoch, "RoundManager stopped");
@@ -1436,7 +1468,8 @@
     /// It's only enabled with fault injection (failpoints feature).
     #[cfg(feature = "failpoints")]
     async fn attempt_to_inject_reconfiguration_error(
-        &self,
+        epoch_state: Arc<EpochState>,
+        network: Arc<NetworkSender>,
         proposal_msg: &ProposalMsg,
     ) -> anyhow::Result<()> {
         let block_data = proposal_msg.proposal().block_data();
@@ -1449,13 +1482,12 @@
             block_data.round() == block_data.quorum_cert().certified_block().round() + 1;
         let should_inject = direct_suffix && continuous_round;
         if should_inject {
-            let mut half_peers: Vec<_> = self
-                .epoch_state
+            let mut half_peers: Vec<_> = epoch_state
                 .verifier
                 .get_ordered_account_addresses_iter()
                 .collect();
             half_peers.truncate(half_peers.len() / 2);
-            self.network
+            network
                 .send_proposal(proposal_msg.clone(), half_peers)
                 .await;
             Err(anyhow::anyhow!("Injected error in reconfiguration suffix"))