Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Jul 25, 2024
1 parent b120d19 commit a1ea209
Show file tree
Hide file tree
Showing 10 changed files with 311 additions and 161 deletions.
6 changes: 3 additions & 3 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ impl Default for ConsensusConfig {
max_sending_block_unique_txns: MAX_SENDING_BLOCK_UNIQUE_TXNS,
max_sending_block_bytes: 3 * 1024 * 1024, // 3MB
max_receiving_block_txns: *MAX_RECEIVING_BLOCK_TXNS,
max_sending_inline_txns: 100,
max_sending_inline_bytes: 200 * 1024, // 200 KB
max_receiving_block_bytes: 6 * 1024 * 1024, // 6MB
max_sending_inline_txns: MAX_SENDING_BLOCK_TXNS,
max_sending_inline_bytes: 3 * 1024 * 1024, // 3 MB
max_receiving_block_bytes: 6 * 1024 * 1024, // 6MB
max_pruned_blocks_in_mem: 100,
mempool_executed_txn_timeout_ms: 1000,
mempool_txn_pull_timeout_ms: 1000,
Expand Down
2 changes: 0 additions & 2 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ use aptos_types::{
account_address::AccountAddress, transaction::SignedTransaction,
validator_verifier::ValidatorVerifier, vm_status::DiscardedVMStatus, PeerId,
};
use futures::future::Lazy;
use once_cell::sync::OnceCell;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::{
backtrace,
collections::HashSet,
fmt::{self, Write},
sync::Arc,
Expand Down
40 changes: 40 additions & 0 deletions consensus/consensus-types/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,18 @@ where
}
}

impl<T> From<Vec<T>> for BatchPointer<T>
where
T: TDataInfo,
{
fn from(value: Vec<T>) -> Self {
Self {
batch_summary: value,
status: Arc::new(Mutex::new(None)),
}
}
}

impl<T: PartialEq> PartialEq for BatchPointer<T> {
fn eq(&self, other: &Self) -> bool {
self.batch_summary == other.batch_summary
Expand Down Expand Up @@ -122,6 +134,15 @@ pub struct InlineBatch {
transactions: Vec<SignedTransaction>,
}

impl InlineBatch {
pub fn new(batch_info: BatchInfo, transactions: Vec<SignedTransaction>) -> Self {
Self {
batch_info,
transactions,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
pub struct InlineBatches(Vec<InlineBatch>);

Expand Down Expand Up @@ -159,6 +180,12 @@ impl InlineBatches {
}
}

impl From<Vec<InlineBatch>> for InlineBatches {
fn from(value: Vec<InlineBatch>) -> Self {
Self(value)
}
}

impl Deref for InlineBatches {
type Target = Vec<InlineBatch>;

Expand Down Expand Up @@ -212,6 +239,19 @@ pub enum OptQuorumStorePayload {
}

impl OptQuorumStorePayload {
pub fn new(
inline_batches: InlineBatches,
opt_batches: BatchPointer<BatchInfo>,
proofs: BatchPointer<ProofOfStore>,
) -> Self {
Self::V1(OptQuorumStorePayloadV1 {
inline_batches,
opt_batches,
proofs,
execution_limits: PayloadExecutionLimit::None,
})
}

pub(crate) fn num_txns(&self) -> usize {
self.opt_batches.num_txns() + self.proofs.num_txns() + self.inline_batches.num_txns()
}
Expand Down
9 changes: 8 additions & 1 deletion consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::payload::TDataInfo;
use crate::{payload::TDataInfo, request_response::PayloadTxnsSize};
use anyhow::{bail, ensure, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
Expand Down Expand Up @@ -131,6 +131,13 @@ impl BatchInfo {
self.num_bytes
}

pub fn size(&self) -> PayloadTxnsSize {
PayloadTxnsSize {
max_count: self.num_txns,
max_bytes: self.num_bytes,
}
}

pub fn gas_bucket_start(&self) -> u64 {
self.gas_bucket_start
}
Expand Down
150 changes: 114 additions & 36 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,127 @@ use anyhow::Result;
use futures::channel::oneshot;
use std::{fmt, fmt::Formatter, time::Duration};

#[derive(Debug, Clone, Copy)]
pub struct PayloadTxnsSize {
pub max_count: u64,
pub max_bytes: u64,
}

impl PayloadTxnsSize {
pub fn zero() -> Self {
Self {
max_count: 0,
max_bytes: 0,
}
}

pub fn compute_pct(self, pct: u8) -> Self {
Self {
max_count: self.max_count * pct as u64 / 100,
max_bytes: self.max_bytes * pct as u64 / 100,
}
}

pub fn saturating_sub(self, rhs: Self) -> Self {
Self {
max_count: self.max_count.saturating_sub(rhs.max_count),
max_bytes: self.max_bytes.saturating_sub(rhs.max_bytes),
}
}
}

impl std::ops::Add for PayloadTxnsSize {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
Self {
max_count: self.max_count + rhs.max_count,
max_bytes: self.max_bytes + rhs.max_bytes,
}
}
}

impl std::ops::AddAssign for PayloadTxnsSize {
fn add_assign(&mut self, rhs: Self) {
self.max_count += rhs.max_count;
self.max_bytes += rhs.max_bytes;
}
}

impl std::ops::Sub for PayloadTxnsSize {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self {
max_count: self.max_count - rhs.max_count,
max_bytes: self.max_bytes - rhs.max_bytes,
}
}
}

impl PartialEq for PayloadTxnsSize {
fn eq(&self, other: &Self) -> bool {
self.max_count == other.max_count && self.max_bytes == other.max_bytes
}
}

impl PartialOrd for PayloadTxnsSize {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
// If the size in bytes is less than the count, then we cannot compare
// the two quantities.
if self.max_count > self.max_bytes || other.max_count > other.max_bytes {
return None;
}

Some(self.cmp(other))
}
}

impl Eq for PayloadTxnsSize {}

impl Ord for PayloadTxnsSize {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
if self.max_count == other.max_count && self.max_bytes == other.max_bytes {
return std::cmp::Ordering::Equal;
}

if self.max_count > other.max_count && self.max_bytes > other.max_bytes {
return std::cmp::Ordering::Greater;
}

std::cmp::Ordering::Less
}
}

#[derive(Debug)]
pub struct GetPayloadRequest {
// max txns
pub max_txns: PayloadTxnsSize,
pub max_txns_after_filtering: u64,
// target txns with proof in max_txns as pct
pub txns_with_proofs_pct: u8,
// max inline txns
pub max_inline_txns: PayloadTxnsSize,
// return non full
pub return_non_full: bool,
// block payloads to exclude from the requested block
pub filter: PayloadFilter,
// callback to respond to
pub callback: oneshot::Sender<Result<GetPayloadResponse>>,
// block timestamp
pub block_timestamp: Duration,
}

pub enum GetPayloadCommand {
/// Request to pull block to submit to consensus.
GetPayloadRequest(
// max number of transactions in the block
u64,
// max number of unique transactions in the block
u64,
// max byte size
u64,
// max number of inline transactions (transactions without a proof of store)
u64,
// max byte size of inline transactions (transactions without a proof of store)
u64,
// return non full
bool,
// block payloads to exclude from the requested block
PayloadFilter,
// callback to respond to
oneshot::Sender<Result<GetPayloadResponse>>,
// block timestamp
Duration,
),
GetPayloadRequest(GetPayloadRequest),
}

impl fmt::Display for GetPayloadCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
GetPayloadCommand::GetPayloadRequest(
max_txns,
max_txns_after_filtering,
max_bytes,
max_inline_txns,
max_inline_bytes,
return_non_full,
excluded,
_,
block_timestamp,
) => {
write!(
f,
"GetPayloadRequest [max_txns: {}, max_txns_after_filtering: {}, max_bytes: {}, max_inline_txns: {}, max_inline_bytes:{}, return_non_full: {}, excluded: {}, block_timestamp: {:?}]",
max_txns, max_txns_after_filtering, max_bytes, max_inline_txns, max_inline_bytes, return_non_full, excluded, block_timestamp
)
GetPayloadCommand::GetPayloadRequest(request) => {
write!(f, "{:?}", request)
},
}
}
Expand Down
30 changes: 17 additions & 13 deletions consensus/src/payload_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,27 @@ pub mod mixed;
pub mod user;
pub mod validator;

pub struct PayloadPullParameters {
pub max_poll_time: Duration,
pub max_items: u64,
pub max_unique_items: u64,
pub max_bytes: u64,
pub max_inline_items: u64,
pub max_inline_bytes: u64,
pub validator_txn_filter: TransactionFilter,
pub user_txn_filter: PayloadFilter,
pub wait_callback: BoxFuture<'static, ()>,
pub pending_ordering: bool,
pub pending_uncommitted_blocks: usize,
pub recent_max_fill_fraction: f32,
pub block_timestamp: Duration,
}

#[async_trait::async_trait]
pub trait PayloadClient: Send + Sync {
async fn pull_payload(
&self,
max_poll_time: Duration,
max_items: u64,
max_unique_items: u64,
max_bytes: u64,
max_inline_items: u64,
max_inline_bytes: u64,
validator_txn_filter: TransactionFilter,
user_txn_filter: PayloadFilter,
wait_callback: BoxFuture<'static, ()>,
pending_ordering: bool,
pending_uncommitted_blocks: usize,
recent_max_fill_fraction: f32,
block_timestamp: Duration,
config: PayloadPullParameters,
) -> anyhow::Result<(Vec<ValidatorTransaction>, Payload), QuorumStoreError>;

fn trace_payloads(&self) {}
Expand Down
5 changes: 3 additions & 2 deletions consensus/src/quorum_store/batch_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,17 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
}
}

/// TODO(ibalajiarun): implement RPCWithFallback
pub(crate) async fn request_batch(
&self,
digest: HashValue,
expiration: u64,
signers: Vec<PeerId>,
proof_signers: Vec<PeerId>,
ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
mut subscriber_rx: oneshot::Receiver<PersistedValue>,
) -> Option<(BatchInfo, Vec<SignedTransaction>)> {
let validator_verifier = self.validator_verifier.clone();
let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit);
let mut request_state = BatchRequesterState::new(proof_signers, ret_tx, self.retry_limit);
let network_sender = self.network_sender.clone();
let request_num_peers = self.request_num_peers;
let my_peer_id = self.my_peer_id;
Expand Down
5 changes: 2 additions & 3 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,18 @@ use crate::{
},
};
use anyhow::bail;
use aptos_consensus_types::proof_of_store::{BatchInfo, ProofOfStore, SignedBatchInfo};
use aptos_consensus_types::proof_of_store::SignedBatchInfo;
use aptos_crypto::HashValue;
use aptos_executor_types::{ExecutorError, ExecutorResult};
use aptos_logger::prelude::*;
use aptos_types::{transaction::SignedTransaction, validator_signer::ValidatorSigner, PeerId};
use dashmap::{
mapref::entry::Entry::{self, Occupied, Vacant},
mapref::entry::Entry::{Occupied, Vacant},
DashMap,
};
use fail::fail_point;
use once_cell::sync::OnceCell;
use std::{
hash::Hash,
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex,
Expand Down
Loading

0 comments on commit a1ea209

Please sign in to comment.