Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Prefer fetching small PoVs from backing group #7173

Merged
merged 7 commits into from
May 5, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
enable fetching from backing group for small pov
Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
  • Loading branch information
sandreim committed May 3, 2023
commit 045d03c2f829586a96970e1d086f9cd0782a0e31
97 changes: 90 additions & 7 deletions node/network/availability-recovery/src/lib.rs
Original file line number Diff line number Diff line change
@@ -99,15 +99,20 @@ const TIMEOUT_START_NEW_REQUESTS: Duration = CHUNK_REQUEST_TIMEOUT;
#[cfg(test)]
const TIMEOUT_START_NEW_REQUESTS: Duration = Duration::from_millis(100);

/// PoV size limit in bytes (128 KiB): when the estimated PoV size is below this
/// threshold we prefer fetching the full available data from the backing group
/// instead of recovering it chunk-by-chunk from the whole validator set.
const SMALL_POV_LIMIT: usize = 128 * 1024;

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
/// Do not request data from the availability store.
/// This is useful for nodes where the
/// availability-store subsystem is not expected to run,
/// such as collators.
bypass_availability_store: bool,

/// If true, we first try backers.
fast_path: bool,
/// Large PoV threshold. Only used when `fast_path` is `false`.
small_pov_limit: Option<usize>,
/// Receiver for available data requests.
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
/// Metrics for this subsystem.
@@ -863,10 +868,11 @@ async fn launch_recovery_task<Context>(
ctx: &mut Context,
session_info: SessionInfo,
receipt: CandidateReceipt,
backing_group: Option<GroupIndex>,
mut backing_group: Option<GroupIndex>,
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
small_pov_limit: Option<usize>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

@@ -880,6 +886,29 @@ async fn launch_recovery_task<Context>(
bypass_availability_store,
};

if let Some(small_pov_limit) = small_pov_limit {
// Get our own chunk size to get an estimate of the PoV size.
let chunk_size: Result<Option<usize>, error::Error> =
query_chunk_size(ctx, candidate_hash).await;
if let Ok(Some(chunk_size)) = chunk_size {
let pov_size_estimate = chunk_size.saturating_mul(session_info.validators.len() / 3);
let prefer_backing_group = pov_size_estimate < small_pov_limit;

gum::debug!(
target: LOG_TARGET,
pov_size_estimate,
small_pov_limit,
enabled = prefer_backing_group,
"Prefer fetch from backing group",
);

backing_group = backing_group.filter(|_| {
// We keep the backing group only if `1/3` of chunks sum up to less than `small_pov_limit`.
prefer_backing_group
});
}
}

let phase = backing_group
.and_then(|g| session_info.validator_groups.get(g))
.map(|group| Source::RequestFromBackers(RequestFromBackers::new(group.clone())))
@@ -919,6 +948,7 @@ async fn handle_recover<Context>(
response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
bypass_availability_store: bool,
metrics: &Metrics,
small_pov_limit: Option<usize>,
) -> error::Result<()> {
let candidate_hash = receipt.hash();

@@ -963,6 +993,7 @@ async fn handle_recover<Context>(
response_sender,
bypass_availability_store,
metrics,
small_pov_limit,
)
.await,
None => {
@@ -988,6 +1019,18 @@ async fn query_full_data<Context>(
rx.await.map_err(error::Error::CanceledQueryFullData)
}

/// Queries a chunk from av-store.
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
async fn query_chunk_size<Context>(
ctx: &mut Context,
candidate_hash: CandidateHash,
) -> error::Result<Option<usize>> {
let (tx, rx) = oneshot::channel();
ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
.await;

rx.await.map_err(error::Error::CanceledQueryFullData)
}
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
/// Create a new instance of `AvailabilityRecoverySubsystem` which never requests the
@@ -996,7 +1039,13 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: true, req_receiver, metrics }
Self {
fast_path: false,
small_pov_limit: None,
bypass_availability_store: true,
req_receiver,
metrics,
}
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which starts with a fast path to
@@ -1005,20 +1054,53 @@ impl AvailabilityRecoverySubsystem {
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: true, bypass_availability_store: false, req_receiver, metrics }
Self {
fast_path: true,
small_pov_limit: None,
bypass_availability_store: false,
req_receiver,
metrics,
}
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests only chunks
pub fn with_chunks_only(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self { fast_path: false, bypass_availability_store: false, req_receiver, metrics }
Self {
fast_path: false,
small_pov_limit: None,
bypass_availability_store: false,
req_receiver,
metrics,
}
}

/// Create a new instance of `AvailabilityRecoverySubsystem` which requests chunks if PoV is
/// above a threshold.
pub fn with_chunks_if_pov_large(
req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
metrics: Metrics,
) -> Self {
Self {
fast_path: false,
small_pov_limit: Some(SMALL_POV_LIMIT),
bypass_availability_store: false,
req_receiver,
metrics,
}
}

async fn run<Context>(self, mut ctx: Context) -> SubsystemResult<()> {
let mut state = State::default();
let Self { fast_path, mut req_receiver, metrics, bypass_availability_store } = self;
let Self {
fast_path,
small_pov_limit,
mut req_receiver,
metrics,
bypass_availability_store,
} = self;

loop {
let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
@@ -1045,10 +1127,11 @@ impl AvailabilityRecoverySubsystem {
&mut ctx,
receipt,
session_index,
maybe_backing_group.filter(|_| fast_path),
maybe_backing_group.filter(|_| fast_path || small_pov_limit.is_some()),
response_sender,
bypass_availability_store,
&metrics,
small_pov_limit,
).await {
gum::warn!(
target: LOG_TARGET,
Loading