
[consensus][qs] support OptQuorumStorePayload in consensus
ibalajiarun committed Jul 29, 2024
1 parent 2a201c8 commit 835d377
Showing 4 changed files with 125 additions and 39 deletions.
11 changes: 10 additions & 1 deletion consensus/src/block_storage/block_store.rs
@@ -36,7 +36,7 @@ use futures::executor::block_on;
#[cfg(test)]
use std::collections::VecDeque;
#[cfg(any(test, feature = "fuzzing"))]
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::AtomicBool;
use std::{sync::Arc, time::Duration};

#[cfg(test)]
@@ -467,6 +467,15 @@ impl BlockStore {
pub fn pending_blocks(&self) -> Arc<Mutex<PendingBlocks>> {
self.pending_blocks.clone()
}

pub async fn wait_for_payload(&self, block: &Block) -> anyhow::Result<()> {
self.payload_manager.get_transactions(block).await?;
Ok(())
}

pub fn check_payload(&self, proposal: &Block) -> bool {
self.payload_manager.check_payload_availability(proposal)
}
}

impl BlockReader for BlockStore {
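
The two helpers added above split payload handling into a cheap, synchronous availability probe (check_payload) and an async path that resolves once the payload manager can produce the transactions (wait_for_payload). A minimal sketch of that check-then-wait flow, using stand-in types rather than the real BlockStore and PayloadManager, and assuming the anyhow crate:

use std::{collections::HashSet, sync::Arc};

// Stand-ins for the real types; only batch digests matter for this sketch.
struct Block {
    batch_digests: Vec<u64>,
}

struct PayloadManager {
    local_batches: HashSet<u64>,
}

impl PayloadManager {
    // Cheap local probe, analogous to check_payload_availability.
    fn check_payload_availability(&self, block: &Block) -> bool {
        block.batch_digests.iter().all(|d| self.local_batches.contains(d))
    }

    // Analogous to get_transactions: would fetch missing batches from peers.
    async fn get_transactions(&self, _block: &Block) -> anyhow::Result<()> {
        Ok(()) // assume the batches eventually arrive
    }
}

async fn handle_proposal(pm: Arc<PayloadManager>, block: Block) -> anyhow::Result<()> {
    if pm.check_payload_availability(&block) {
        return Ok(()); // fast path: everything is already local, vote immediately
    }
    // Slow path: park the proposal until the payload can be materialized.
    pm.get_transactions(&block).await
}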
9 changes: 9 additions & 0 deletions consensus/src/counters.rs
@@ -1162,3 +1162,12 @@ pub static RAND_QUEUE_SIZE: Lazy<IntGauge> = Lazy::new(|| {
)
.unwrap()
});

pub static CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"aptos_consensus_proposal_payload_availability",
"The availability of proposal payload locally",
&["status"]
)
.unwrap()
});
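
A hedged sketch of exercising this metric in a unit test, assuming the usual prometheus IntGaugeVec API (with_label_values returns a child gauge that exposes inc and get); the "missing"/"available" labels mirror the statuses used in round_manager.rs below:

#[test]
fn records_payload_availability() {
    // Simulate a proposal whose payload was not available locally.
    CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY
        .with_label_values(&["missing"])
        .inc();
    assert!(
        CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY
            .with_label_values(&["missing"])
            .get()
            >= 1
    );
}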
36 changes: 36 additions & 0 deletions consensus/src/payload_manager.rs
@@ -45,6 +45,11 @@ pub trait TPayloadManager: Send + Sync {
/// available when block is executed.
fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64);

/// Check if the transactions corresponding to the block's payload are available. This is
/// specific to payload manager implementations. For optimistic quorum store, we only check
/// whether the optimistic batches are available locally.
fn check_payload_availability(&self, block: &Block) -> bool;

/// Get the transactions in a block's payload. This function returns a vector of transactions.
async fn get_transactions(
&self,
@@ -67,6 +72,10 @@ impl TPayloadManager for DirectMempoolPayloadManager {

fn prefetch_payload_data(&self, _payload: &Payload, _timestamp: u64) {}

fn check_payload_availability(&self, _block: &Block) -> bool {
true
}

async fn get_transactions(
&self,
block: &Block,
@@ -275,6 +284,29 @@ impl TPayloadManager for QuorumStorePayloadManager {
};
}

fn check_payload_availability(&self, block: &Block) -> bool {
let Some(payload) = block.payload() else {
return true;
};

match payload {
Payload::DirectMempool(_) => {
unreachable!("QuorumStore doesn't support DirectMempool payload")
},
Payload::InQuorumStore(_) => true,
Payload::InQuorumStoreWithLimit(_) => true,
Payload::QuorumStoreInlineHybrid(_, _, _) => true,
Payload::OptQuorumStore(opt_qs_payload) => {
for batch in opt_qs_payload.opt_batches().deref() {
if self.batch_reader.exists(batch.digest()).is_none() {
return false;
}
}
true
},
}
}

async fn get_transactions(
&self,
block: &Block,
@@ -635,6 +667,10 @@ impl TPayloadManager for ConsensusObserverPayloadManager {
// noop
}

fn check_payload_availability(&self, _block: &Block) -> bool {
unreachable!("this method isn't used in ConsensusObserver")
}

async fn get_transactions(
&self,
block: &Block,
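
The OptQuorumStore arm above is the only one that can return false: optimistic batches are included in a proposal before a quorum has stored them, so a validator may not yet hold every batch locally. A self-contained sketch of the same all-or-nothing digest check, with local stand-ins for HashValue and BatchReader:

use std::collections::HashMap;

type HashValue = [u8; 32];

// Stand-in for BatchReader: exists returns Some(expiration) when the batch
// is persisted locally, None otherwise.
struct BatchReader {
    batches: HashMap<HashValue, u64>,
}

impl BatchReader {
    fn exists(&self, digest: &HashValue) -> Option<u64> {
        self.batches.get(digest).copied()
    }
}

// Mirrors the OptQuorumStore arm: every optimistic batch must be present,
// otherwise the whole payload counts as unavailable.
fn opt_batches_available(reader: &BatchReader, digests: &[HashValue]) -> bool {
    digests.iter().all(|d| reader.exists(d).is_some())
}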
108 changes: 70 additions & 38 deletions consensus/src/round_manager.rs
@@ -52,6 +52,7 @@ use aptos_consensus_types::{
wrapped_ledger_info::WrappedLedgerInfo,
};
use aptos_crypto::HashValue;
use aptos_executor_types::ExecutorError;
use aptos_infallible::{checked, Mutex};
use aptos_logger::prelude::*;
#[cfg(test)]
@@ -65,18 +66,19 @@ use aptos_types::{
ValidatorTxnConfig,
},
randomness::RandMetadata,
transaction::SignedTransaction,
validator_verifier::ValidatorVerifier,
PeerId,
};
use fail::fail_point;
use futures::{channel::oneshot, FutureExt, StreamExt};
use futures::{channel::oneshot, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use futures_channel::mpsc::UnboundedReceiver;
use lru::LruCache;
use serde::Serialize;
use std::{mem::Discriminant, sync::Arc, time::Duration};
use std::{mem::Discriminant, pin::Pin, sync::Arc, time::Duration};
use tokio::{
sync::oneshot as TokioOneshot,
time::{sleep, Instant},
time::{error::Elapsed, sleep, Instant},
};

#[derive(Serialize, Clone)]
@@ -250,6 +252,7 @@ pub struct RoundManager {
// To avoid duplicate broadcasts for the same block, we keep track of blocks for
// which we recently broadcasted fast shares.
blocks_with_broadcasted_fast_shares: LruCache<HashValue, ()>,
futures: FuturesUnordered<Pin<Box<dyn Future<Output = (anyhow::Result<()>, Block)> + Send>>>,
}

impl RoundManager {
@@ -298,6 +301,7 @@ impl RoundManager {
fast_rand_config,
pending_order_votes: PendingOrderVotes::new(),
blocks_with_broadcasted_fast_shares: LruCache::new(5),
futures: FuturesUnordered::new(),
}
}

@@ -359,8 +363,12 @@
#[cfg(feature = "failpoints")]
{
if self.check_whether_to_inject_reconfiguration_error() {
self.attempt_to_inject_reconfiguration_error(&proposal_msg)
.await?;
Self::attempt_to_inject_reconfiguration_error(
self.epoch_state.clone(),
self.network.clone(),
&proposal_msg,
)
.await?;
}
}
self.network.broadcast_proposal(proposal_msg).await;
@@ -485,23 +493,20 @@
block_parent_hash = proposal_msg.proposal().quorum_cert().certified_block().id(),
);

if self
.ensure_round_and_sync_up(
ensure!(
self.ensure_round_and_sync_up(
proposal_msg.proposal().round(),
proposal_msg.sync_info(),
proposal_msg.proposer(),
)
.await
.context("[RoundManager] Process proposal")?
{
self.process_proposal(proposal_msg.take_proposal()).await
} else {
bail!(
"Stale proposal {}, current round {}",
proposal_msg.proposal(),
self.round_state.current_round()
);
}
.context("[RoundManager] Process proposal")?,
"Stale proposal {}, current round {}",
proposal_msg.proposal(),
self.round_state.current_round()
);

self.process_proposal(proposal_msg.take_proposal()).await
}

pub async fn process_delayed_proposal_msg(&mut self, proposal: Block) -> anyhow::Result<()> {
@@ -663,7 +668,7 @@
"Planning to vote for a NIL block {}", nil_block
);
counters::VOTE_NIL_COUNT.inc();
let nil_vote = self.execute_and_vote(nil_block).await?;
let nil_vote = self.vote_block(nil_block).await?;
(true, nil_vote)
},
};
@@ -838,30 +843,47 @@
.insert_block(proposal.clone())
.await
.context("[RoundManager] Failed to execute_and_insert the block")?;
self.resend_verified_proposal_to_self(
Self::resend_verified_proposal_to_self(
self.block_store.clone(),
self.buffered_proposal_tx.clone(),
proposal,
author,
BACK_PRESSURE_POLLING_INTERVAL_MS,
self.local_config.round_initial_timeout_ms,
)
.await;
Ok(())
} else {
counters::CONSENSUS_WITHOLD_VOTE_BACKPRESSURE_TRIGGERED.observe(0.0);
return Ok(());
}

counters::CONSENSUS_WITHOLD_VOTE_BACKPRESSURE_TRIGGERED.observe(0.0);

let block_store = self.block_store.clone();
if block_store.check_payload(&proposal) {
counters::CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY
.with_label_values(&["available"])
.inc();
self.process_verified_proposal(proposal).await
} else {
debug!("Payload not available locally for block: {}", proposal.id());
counters::CONSENSUS_PROPOSAL_PAYLOAD_AVAILABILITY
.with_label_values(&["missing"])
.inc();
let future =
async move { (block_store.wait_for_payload(&proposal).await, proposal) }.boxed();
self.futures.push(future);
Ok(())
}
}

async fn resend_verified_proposal_to_self(
&self,
block_store: Arc<BlockStore>,
self_sender: aptos_channel::Sender<Author, VerifiedEvent>,
proposal: Block,
author: Author,
polling_interval_ms: u64,
timeout_ms: u64,
) {
let start = Instant::now();
let block_store = self.block_store.clone();
let self_sender = self.buffered_proposal_tx.clone();
let event = VerifiedEvent::VerifiedProposalMsg(Box::new(proposal));
tokio::spawn(async move {
while start.elapsed() < Duration::from_millis(timeout_ms) {
@@ -903,7 +925,7 @@
pub async fn process_verified_proposal(&mut self, proposal: Block) -> anyhow::Result<()> {
let proposal_round = proposal.round();
let vote = self
.execute_and_vote(proposal)
.vote_block(proposal)
.await
.context("[RoundManager] Process proposal")?;
self.round_state.record_vote(vote.clone());
Expand All @@ -930,12 +952,12 @@ impl RoundManager {
}

/// The function generates a VoteMsg for a given proposed_block:
/// * first execute the block and add it to the block store
/// * add the block to the block store
/// * then verify the voting rules
/// * save the updated state to consensus DB
/// * return a VoteMsg with the LedgerInfo to be committed in case the vote gathers QC.
async fn execute_and_vote(&mut self, proposed_block: Block) -> anyhow::Result<Vote> {
let executed_block = self
async fn vote_block(&mut self, proposed_block: Block) -> anyhow::Result<Vote> {
let block_arc = self
.block_store
.insert_block(proposed_block)
.await
@@ -953,17 +975,17 @@
"[RoundManager] sync_only flag is set, stop voting"
);

let vote_proposal = executed_block.vote_proposal();
let vote_proposal = block_arc.vote_proposal();
let vote_result = self.safety_rules.lock().construct_and_sign_vote_two_chain(
&vote_proposal,
self.block_store.highest_2chain_timeout_cert().as_deref(),
);
let vote = vote_result.context(format!(
"[RoundManager] SafetyRules Rejected {}",
executed_block.block()
block_arc.block()
))?;
if !executed_block.block().is_nil_block() {
observe_block(executed_block.block().timestamp_usecs(), BlockStage::VOTED);
if !block_arc.block().is_nil_block() {
observe_block(block_arc.block().timestamp_usecs(), BlockStage::VOTED);
}

self.storage
@@ -1387,6 +1409,16 @@
}
}
},
Some((result, block)) = self.futures.next() => {
match result {
Ok(_) => {
if let Err(e) = self.process_delayed_proposal_msg(block).await {
warn!("error {}", e);
}
},
Err(err) => warn!("unable to get transactions: {}", err),
};
},
(peer_id, event) = event_rx.select_next_some() => {
let result = match event {
VerifiedEvent::VoteMsg(vote_msg) => {
@@ -1417,7 +1449,7 @@
warn!(error = ?e, kind = error_kind(&e), RoundStateLogSchema::new(round_state));
}
}
}
},
}
}
info!(epoch = self.epoch_state().epoch, "RoundManager stopped");
@@ -1436,7 +1468,8 @@
/// It's only enabled with fault injection (failpoints feature).
#[cfg(feature = "failpoints")]
async fn attempt_to_inject_reconfiguration_error(
&self,
epoch_state: Arc<EpochState>,
network: Arc<NetworkSender>,
proposal_msg: &ProposalMsg,
) -> anyhow::Result<()> {
let block_data = proposal_msg.proposal().block_data();
@@ -1449,13 +1482,12 @@
block_data.round() == block_data.quorum_cert().certified_block().round() + 1;
let should_inject = direct_suffix && continuous_round;
if should_inject {
let mut half_peers: Vec<_> = self
.epoch_state
let mut half_peers: Vec<_> = epoch_state
.verifier
.get_ordered_account_addresses_iter()
.collect();
half_peers.truncate(half_peers.len() / 2);
self.network
network
.send_proposal(proposal_msg.clone(), half_peers)
.await;
Err(anyhow::anyhow!("Injected error in reconfiguration suffix"))
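
The deferred-proposal machinery added above parks a proposal whose payload is missing as a boxed future in a FuturesUnordered, then revisits it from an arm of the main select! loop once the payload resolves. A minimal, runnable sketch of the same pattern under stand-in types (Block is a local struct, not aptos_consensus_types::Block; assumes the tokio, futures, and anyhow crates):

use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use std::{future::Future, pin::Pin};
use tokio::time::{sleep, Duration};

#[derive(Debug)]
struct Block {
    id: u64,
}

type PendingProposal = Pin<Box<dyn Future<Output = (anyhow::Result<()>, Block)> + Send>>;

// Stand-in for BlockStore::wait_for_payload.
async fn wait_for_payload(block: &Block) -> anyhow::Result<()> {
    sleep(Duration::from_millis(10)).await; // pretend batches arrive from peers
    println!("payload for block {} is available", block.id);
    Ok(())
}

#[tokio::main]
async fn main() {
    let mut futures: FuturesUnordered<PendingProposal> = FuturesUnordered::new();

    // Park a proposal whose payload is missing, as process_proposal does above.
    let proposal = Block { id: 7 };
    futures.push(async move { (wait_for_payload(&proposal).await, proposal) }.boxed());

    // Drain resolved futures; in RoundManager this is one arm of the select! loop.
    while let Some((result, block)) = futures.next().await {
        match result {
            Ok(()) => println!("reprocessing delayed proposal {}", block.id),
            Err(err) => eprintln!("unable to get transactions: {}", err),
        }
    }
}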
