Skip to content

Commit

Permalink
[qs] Support for OptQS Payload in proof manager
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jul 29, 2024
1 parent d6bffb4 commit 1c5d7e7
Show file tree
Hide file tree
Showing 10 changed files with 199 additions and 98 deletions.
2 changes: 2 additions & 0 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct QuorumStoreConfig {
pub num_workers_for_remote_batches: usize,
pub batch_buckets: Vec<u64>,
pub allow_batches_without_pos_in_proposal: bool,
pub enable_opt_quorum_store: bool,
}

impl Default for QuorumStoreConfig {
Expand Down Expand Up @@ -129,6 +130,7 @@ impl Default for QuorumStoreConfig {
num_workers_for_remote_batches: 10,
batch_buckets: DEFAULT_BUCKETS.to_vec(),
allow_batches_without_pos_in_proposal: true,
enable_opt_quorum_store: false,
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
payload::OptQuorumStorePayload,
payload::{OptQuorumStorePayload, PayloadExecutionLimit},
proof_of_store::{BatchInfo, ProofCache, ProofOfStore},
};
use anyhow::bail;
Expand Down Expand Up @@ -280,8 +280,15 @@ impl Payload {
Payload::DirectMempool(_) => {
panic!("Payload is in direct mempool format");
},
Payload::OptQuorumStore(_) => {
unreachable!("OptQuorumStore Payload is incompatible with QuorumStoreV2");
Payload::OptQuorumStore(mut opt_qs_payload) => {
let execution_limits = match max_txns_to_execute {
Some(max_txns_to_execute) => {
PayloadExecutionLimit::MaxTransactionsToExecute(max_txns_to_execute)
},
None => PayloadExecutionLimit::None,
};
opt_qs_payload.set_execution_limit(execution_limits);
Payload::OptQuorumStore(opt_qs_payload)
},
}
}
Expand Down
44 changes: 44 additions & 0 deletions consensus/consensus-types/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ where
}
}

impl<T> From<Vec<T>> for BatchPointer<T>
where
T: TDataInfo,
{
fn from(value: Vec<T>) -> Self {
Self {
batch_summary: value,
status: Arc::new(Mutex::new(None)),
}
}
}

impl<T: PartialEq> PartialEq for BatchPointer<T> {
fn eq(&self, other: &Self) -> bool {
self.batch_summary == other.batch_summary
Expand Down Expand Up @@ -125,6 +137,15 @@ pub struct InlineBatch {
transactions: Vec<SignedTransaction>,
}

impl InlineBatch {
pub fn new(batch_info: BatchInfo, transactions: Vec<SignedTransaction>) -> Self {
Self {
batch_info,
transactions,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct InlineBatches(Vec<InlineBatch>);

Expand Down Expand Up @@ -162,6 +183,12 @@ impl InlineBatches {
}
}

impl From<Vec<InlineBatch>> for InlineBatches {
fn from(value: Vec<InlineBatch>) -> Self {
Self(value)
}
}

impl Deref for InlineBatches {
type Target = Vec<InlineBatch>;

Expand Down Expand Up @@ -215,6 +242,19 @@ pub enum OptQuorumStorePayload {
}

impl OptQuorumStorePayload {
pub fn new(
inline_batches: InlineBatches,
opt_batches: BatchPointer<BatchInfo>,
proofs: BatchPointer<ProofOfStore>,
) -> Self {
Self::V1(OptQuorumStorePayloadV1 {
inline_batches,
opt_batches,
proofs,
execution_limits: PayloadExecutionLimit::None,
})
}

pub(crate) fn num_txns(&self) -> usize {
self.opt_batches.num_txns() + self.proofs.num_txns() + self.inline_batches.num_txns()
}
Expand Down Expand Up @@ -253,6 +293,10 @@ impl OptQuorumStorePayload {
pub fn opt_batches(&self) -> &BatchPointer<BatchInfo> {
&self.opt_batches
}

pub fn set_execution_limit(&mut self, execution_limits: PayloadExecutionLimit) {
self.execution_limits = execution_limits;
}
}

impl Deref for OptQuorumStorePayload {
Expand Down
9 changes: 8 additions & 1 deletion consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::payload::TDataInfo;
use crate::{payload::TDataInfo, utils::PayloadTxnsSize};
use anyhow::{bail, ensure, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
Expand Down Expand Up @@ -131,6 +131,13 @@ impl BatchInfo {
self.num_bytes
}

pub fn size(&self) -> PayloadTxnsSize {
PayloadTxnsSize {
count: self.num_txns,
bytes: self.num_bytes,
}
}

pub fn gas_bucket_start(&self) -> u64 {
self.gas_bucket_start
}
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,8 @@ impl BlockStore {

#[cfg(any(test, feature = "fuzzing"))]
pub fn set_back_pressure_for_test(&self, back_pressure: bool) {
use std::sync::atomic::Ordering;

self.back_pressure_for_test
.store(back_pressure, Ordering::Relaxed)
}
Expand All @@ -464,6 +466,7 @@ impl BlockStore {
fn vote_back_pressure(&self) -> bool {
#[cfg(any(test, feature = "fuzzing"))]
{
use std::sync::atomic::Ordering;
if self.back_pressure_for_test.load(Ordering::Relaxed) {
return true;
}
Expand Down
39 changes: 25 additions & 14 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ use crate::{
},
DAGRpcResult, RpcHandler,
},
payload_client::PayloadClient,
payload_client::{PayloadClient, PayloadPullParameters},
};
use anyhow::{bail, ensure};
use aptos_collections::BoundedVecDeque;
use aptos_config::config::DagPayloadConfig;
use aptos_consensus_types::common::{Author, Payload, PayloadFilter};
use aptos_consensus_types::{
common::{Author, Payload, PayloadFilter},
utils::PayloadTxnsSize,
};
use aptos_crypto::hash::CryptoHash;
use aptos_infallible::Mutex;
use aptos_logger::{debug, error};
Expand Down Expand Up @@ -255,20 +258,28 @@ impl DagDriver {
let (validator_txns, payload) = match self
.payload_client
.pull_payload(
Duration::from_millis(self.payload_config.payload_pull_max_poll_time_ms),
max_txns,
max_txns,
max_size_bytes,
// TODO: Set max_inline_items and max_inline_bytes correctly
100,
100 * 1024,
PayloadPullParameters {
max_poll_time: Duration::from_millis(
self.payload_config.payload_pull_max_poll_time_ms,
),
max_txns: PayloadTxnsSize {
count: max_txns,
bytes: max_size_bytes,
},
max_unique_txns: max_txns,
max_inline_txns: PayloadTxnsSize {
count: 100,
bytes: 100 * 1024,
},
opt_batch_txns_pct: 0,
user_txn_filter: payload_filter,
pending_ordering: false,
pending_uncommitted_blocks: 0,
recent_max_fill_fraction: 0.0,
block_timestamp: self.time_service.now_unix_time(),
},
sys_payload_filter,
payload_filter,
Box::pin(async {}),
false,
0,
0.0,
self.time_service.now_unix_time(),
)
.await
{
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/quorum_store/batch_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,12 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
&self,
digest: HashValue,
expiration: u64,
signers: Vec<PeerId>,
responders: Vec<PeerId>,
ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
mut subscriber_rx: oneshot::Receiver<PersistedValue>,
) -> Option<(BatchInfo, Vec<SignedTransaction>)> {
let validator_verifier = self.validator_verifier.clone();
let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit);
let mut request_state = BatchRequesterState::new(responders, ret_tx, self.retry_limit);
let network_sender = self.network_sender.clone();
let request_num_peers = self.request_num_peers;
let my_peer_id = self.my_peer_id;
Expand Down
Loading

0 comments on commit 1c5d7e7

Please sign in to comment.