Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Fix revalidation not revalidating multiple times #5065

Merged
merged 1 commit into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

//! Substrate transaction pool implementation.

#![recursion_limit="256"]
#![warn(missing_docs)]
#![warn(unused_extern_crates)]

Expand Down
34 changes: 32 additions & 2 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ async fn batch_revalidate<Api: ChainApi>(
}

pool.validated_pool().remove_invalid(&invalid_hashes);
pool.resubmit(revalidated);
if revalidated.len() > 0 {
pool.resubmit(revalidated);
}
}

impl<Api: ChainApi> RevalidationWorker<Api> {
Expand Down Expand Up @@ -149,6 +151,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
} else {
for xt in &to_queue {
extrinsics.remove(xt);
self.members.remove(xt);
Copy link
Contributor Author

@NikVolf NikVolf Feb 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that only this is required to a fix it.

The rest is defensive programming and logging :)

}
}
left -= to_queue.len();
Expand All @@ -163,14 +166,26 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
queued_exts
}

fn len(&self) -> usize {
self.block_ordered.iter().map(|b| b.1.len()).sum()
}

fn push(&mut self, worker_payload: WorkerPayload<Api>) {
// we don't add something that already scheduled for revalidation
let transactions = worker_payload.transactions;
let block_number = worker_payload.at;

for ext_hash in transactions {
// we don't add something that already scheduled for revalidation
if self.members.contains_key(&ext_hash) { continue; }
if self.members.contains_key(&ext_hash) {
log::debug!(
target: "txpool",
"[{:?}] Skipped adding for revalidation: Already there.",
ext_hash,
);

continue;
}

self.block_ordered.entry(block_number)
.and_modify(|value| { value.insert(ext_hash.clone()); })
Expand Down Expand Up @@ -198,7 +213,18 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
futures::select! {
_ = interval.next() => {
let next_batch = this.prepare_batch();
let batch_len = next_batch.len();

batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;

if batch_len > 0 || this.len() > 0 {
log::debug!(
target: "txpool",
"Revalidated {} transactions. Left in the queue for revalidation: {}.",
batch_len,
this.len(),
);
}
},
workload = from_queue.next() => {
match workload {
Expand Down Expand Up @@ -264,6 +290,10 @@ where
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Added {} transactions to revalidation queue", transactions.len());
}

if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
Expand Down
28 changes: 28 additions & 0 deletions client/transaction-pool/src/testing/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,34 @@ fn should_not_retain_invalid_hashes_from_retracted() {
assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_revalidate_transaction_multiple_times() {
let xt = uxt(Alice, 209);

let (pool, _guard) = maintained_pool();

block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(1, vec![xt.clone()]);

// maintenance is in background
block_on(pool.maintain(block_event(1)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
assert_eq!(pool.status().ready, 1);

pool.api.push_block(2, vec![]);
pool.api.add_invalid(&xt);

// maintenance is in background
block_on(pool.maintain(block_event(2)));
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));

assert_eq!(pool.status().ready, 0);
}

#[test]
fn should_push_watchers_during_maintaince() {
fn alice_uxt(nonce: u64) -> Extrinsic {
Expand Down