Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

approval-distribution: batched approval/assignment sending #6401

Merged
merged 12 commits into from
Dec 13, 2022
Prev Previous commit
Next Next commit
Add batch tests
Signed-off-by: Andrei Sandu <andrei-mihail@parity.io>
  • Loading branch information
sandreim committed Dec 6, 2022
commit a938c47cb69b01edb41a8662e1d93dbb0231d368
128 changes: 128 additions & 0 deletions node/network/approval-distribution/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2276,3 +2276,131 @@ fn resends_messages_periodically() {
virtual_overseer
});
}

fn batch_test_round(message_count: usize) {
use polkadot_node_subsystem::SubsystemContext;
let pool = sp_core::testing::TaskExecutor::new();
let mut state = State::default();

let (mut context, mut virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = ApprovalDistribution::new(Default::default());
let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(12345);
let mut sender = context.sender().clone();
let subsystem = subsystem.run_inner(context, &mut state, &mut rng);

let test_fut = async move {
let overseer = &mut virtual_overseer;
let validators = 0..message_count;
let assignments: Vec<_> = validators
.clone()
.map(|index| (fake_assignment_cert(Hash::zero(), ValidatorIndex(index as u32)), 0))
.collect();

let approvals: Vec<_> = validators
.map(|index| IndirectSignedApprovalVote {
block_hash: Hash::zero(),
candidate_index: 0,
validator: ValidatorIndex(index as u32),
signature: dummy_signature(),
})
.collect();

let peer = PeerId::random();
send_assignments_batched(&mut sender, assignments.clone(), peer).await;
send_approvals_batched(&mut sender, approvals.clone(), peer).await;

// Check expected assignments batches.
for assignment_index in (0..assignments.len()).step_by(super::MAX_BATCH_SIZE) {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
peers,
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Assignments(sent_assignments)
))
)) => {
// Last batch should cover all remaining messages.
if sent_assignments.len() < super::MAX_BATCH_SIZE {
assert_eq!(sent_assignments.len() + assignment_index, assignments.len());
} else {
assert_eq!(sent_assignments.len(), super::MAX_BATCH_SIZE);
}

assert_eq!(peers.len(), 1);

for (message_index, assignment) in sent_assignments.iter().enumerate() {
assert_eq!(assignment.0, assignments[assignment_index + message_index].0);
assert_eq!(assignment.1, 0);
}
}
);
}

// Check approval vote batching.
for approval_index in (0..approvals.len()).step_by(super::MAX_BATCH_SIZE) {
assert_matches!(
overseer_recv(overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::SendValidationMessage(
peers,
Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
protocol_v1::ApprovalDistributionMessage::Approvals(sent_approvals)
))
)) => {
// Last batch should cover all remaining messages.
if sent_approvals.len() < super::MAX_BATCH_SIZE {
assert_eq!(sent_approvals.len() + approval_index, approvals.len());
} else {
assert_eq!(sent_approvals.len(), super::MAX_BATCH_SIZE);
}

assert_eq!(peers.len(), 1);

for (message_index, approval) in sent_approvals.iter().enumerate() {
assert_eq!(approval, &approvals[approval_index + message_index]);
}
}
);
}
virtual_overseer
};

futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);

executor::block_on(future::join(
async move {
let mut overseer = test_fut.await;
overseer
.send(FromOrchestra::Signal(OverseerSignal::Conclude))
.timeout(TIMEOUT)
.await
.expect("Conclude send timeout");
},
subsystem,
));
}

#[test]
fn batch_sending_1_msg() {
batch_test_round(1);
}

#[test]
fn batch_sending_exactly_one_batch() {
batch_test_round(super::MAX_BATCH_SIZE);
}

#[test]
fn batch_sending_partial_batch() {
batch_test_round(super::MAX_BATCH_SIZE * 2 + 4);
}

#[test]
fn batch_sending_multiple_same_len() {
batch_test_round(super::MAX_BATCH_SIZE * 10);
}

#[test]
fn batch_sending_half_batch() {
batch_test_round(super::MAX_BATCH_SIZE / 2);
}