Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block availability data enum #6866

Open
wants to merge 8 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 50 additions & 60 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use crate::block_verification_types::{
pub use crate::canonical_head::CanonicalHead;
use crate::chain_config::ChainConfig;
use crate::data_availability_checker::{
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
DataColumnReconstructionResult,
Availability, AvailabilityCheckError, AvailableBlock, AvailableBlockData,
DataAvailabilityChecker, DataColumnReconstructionResult,
};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -3170,7 +3170,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
// process_engine_blobs is called for both pre and post PeerDAS. However, post PeerDAS
// consumers don't expect the blobs event to fire erraticly.
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
if !self
.spec
.is_peer_das_enabled_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()))
{
self.emit_sse_blob_sidecar_events(&block_root, blobs.iter().flatten().map(Arc::as_ref));
}

let r = self
.check_engine_blob_availability_and_import(slot, block_root, blobs, data_column_recv)
Expand Down Expand Up @@ -3641,9 +3648,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;
let availability = self.data_availability_checker.put_engine_blobs(
block_root,
slot.epoch(T::EthSpec::slots_per_epoch()),
blobs,
data_column_recv,
)?;

self.process_availability(slot, availability, || Ok(()))
.await
Expand Down Expand Up @@ -3728,7 +3738,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
Expand Down Expand Up @@ -3756,7 +3765,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block,
parent_eth1_finalization_data,
consensus_context,
data_column_recv,
)
},
"payload_verification_handle",
Expand Down Expand Up @@ -3795,7 +3803,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
Expand Down Expand Up @@ -3893,7 +3900,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
signed_block.clone(),
&signed_block,
proto_block,
&state,
&self.spec,
Expand Down Expand Up @@ -3962,15 +3969,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// If the write fails, revert fork choice to the version from disk, else we can
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
let (_, signed_block, block_data) = signed_block.deconstruct();

match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
match self.get_blobs_or_columns_store_op(block_root, block_data) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
Expand Down Expand Up @@ -7219,29 +7220,34 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

fn get_blobs_or_columns_store_op(
pub(crate) fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: Option<BlobSidecarList<T::EthSpec>>,
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
block_data: AvailableBlockData<T::EthSpec>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let _custody_columns_count = self.data_availability_checker.get_sampling_column_count();

let custody_columns_available = data_columns
.as_ref()
.as_ref()
.is_some_and(|columns| columns.len() == custody_columns_count);

let data_columns_to_persist = if custody_columns_available {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
data_columns
} else if let Some(data_column_recv) = data_column_recv {
match block_data {
AvailableBlockData::NoData => Ok(None),
AvailableBlockData::Blobs(blobs, _) => {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
Ok(Some(StoreOp::PutBlobs(block_root, blobs)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the is_empty check for both blobs and columns so we dont't store empty values?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AvailableBlockData::Blobs enum by definition must have more than zero blobs. If we do a check here we should error, instead of ignoring the Op

}
AvailableBlockData::DataColumns(data_columns) => {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)))
}
AvailableBlockData::DataColumnsRecv(data_column_recv) => {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
Expand All @@ -7251,34 +7257,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
Some(computed_data_columns)
} else {
// No blobs in the block.
None
};

if let Some(data_columns) = data_columns_to_persist {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
}
}
} else if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => blobs.len(),
"count" => computed_data_columns.len(),
);
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
// TODO(das): Store only this node's custody columns
Ok(Some(StoreOp::PutDataColumns(
jimmygchen marked this conversation as resolved.
Show resolved Hide resolved
block_root,
computed_data_columns,
)))
}
}

Ok(None)
}
}

Expand Down
1 change: 0 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1707,7 +1707,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})
Expand Down
16 changes: 3 additions & 13 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use derivative::Derivative;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::oneshot;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
Expand Down Expand Up @@ -265,7 +264,6 @@ impl<E: EthSpec> ExecutedBlock<E> {

/// A block that has completed all pre-deneb block processing checks including verification
/// by an EL client **and** has all requisite blob data to be imported into fork choice.
#[derive(PartialEq)]
pub struct AvailableExecutedBlock<E: EthSpec> {
pub block: AvailableBlock<E>,
pub import_data: BlockImportData<E>,
Expand Down Expand Up @@ -338,21 +336,14 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}

#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
#[derive(Debug, PartialEq)]
pub struct BlockImportData<E: EthSpec> {
pub block_root: Hash256,
pub state: BeaconState<E>,
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<E>,
#[derivative(PartialEq = "ignore")]
/// An optional receiver for `DataColumnSidecarList`.
///
/// This field is `Some` when data columns are being computed asynchronously.
/// The resulting `DataColumnSidecarList` will be sent through this receiver.
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
}

impl<E: EthSpec> BlockImportData<E> {
Expand All @@ -371,7 +362,6 @@ impl<E: EthSpec> BlockImportData<E> {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
}
}
}
Expand Down
Loading