Skip to content
Snippets Groups Projects
Commit 1628ba33 authored by Tomasz Drwięga's avatar Tomasz Drwięga Committed by Gavin Wood
Browse files

Revalidate some transactions on every block import. (#4292)

* Revalidate some transactions on every block import.

* Fix endless loop in revalidate_ready.

* Clean up logging a bit.

* More clean ups.

* Print status after resubmitting.

* Remove env_logger.

* Remove redundant log.
parent 925b23a3
No related merge requests found
......@@ -5568,7 +5568,6 @@ dependencies = [
"assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"criterion 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"derive_more 0.99.2 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-scale-codec 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
......
......@@ -16,7 +16,6 @@ txpool-api = { package = "sp-transaction-pool-api", path = "../../../primitives/
[dev-dependencies]
assert_matches = "1.3.0"
env_logger = "0.7.0"
codec = { package = "parity-scale-codec", version = "1.0.0" }
test_runtime = { package = "substrate-test-runtime", path = "../../../test/utils/runtime" }
criterion = "0.3"
......
......@@ -175,13 +175,39 @@ impl<B: ChainApi> Pool<B> {
///
/// Returns future that performs validation of all ready transactions and
/// then resubmits all transactions back to the pool.
pub fn revalidate_ready(&self, at: &BlockId<B::Block>) -> impl Future<Output=Result<(), B::Error>> {
pub fn revalidate_ready(
&self,
at: &BlockId<B::Block>,
max: Option<usize>,
) -> impl Future<Output=Result<(), B::Error>> {
use std::time::Instant;
log::debug!(target: "txpool",
"Fetching ready transactions (up to: {})",
max.map(|x| format!("{}", x)).unwrap_or_else(|| "all".into())
);
let validated_pool = self.validated_pool.clone();
let ready = self.validated_pool.ready().map(|tx| tx.data.clone());
let ready = self.validated_pool.ready()
.map(|tx| tx.data.clone())
.take(max.unwrap_or_else(usize::max_value));
let now = Instant::now();
self.verify(at, ready, false)
.map(move |revalidated_transactions| revalidated_transactions.map(
move |revalidated_transactions| validated_pool.resubmit(revalidated_transactions)
))
.map(move |revalidated_transactions| {
log::debug!(target: "txpool",
"Re-verified transactions, took {} ms. Resubmitting.",
now.elapsed().as_millis()
);
let now = Instant::now();
let res = revalidated_transactions.map(
|revalidated_transactions| validated_pool.resubmit(revalidated_transactions)
);
log::debug!(target: "txpool",
"Resubmitted. Took {} ms. Status: {:?}",
now.elapsed().as_millis(),
validated_pool.status()
);
res
})
}
/// Prunes known ready transactions.
......@@ -927,7 +953,6 @@ mod tests {
#[test]
fn should_handle_pruning_in_the_middle_of_import() {
let _ = env_logger::try_init();
// given
let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
let (tx, rx) = std::sync::mpsc::sync_channel(1);
......@@ -1014,7 +1039,7 @@ mod tests {
pool.validated_pool.api().invalidate.lock().insert(hash3);
pool.validated_pool.api().clear_requirements.lock().insert(hash1);
pool.validated_pool.api().add_requirements.lock().insert(hash0);
block_on(pool.revalidate_ready(&BlockId::Number(0))).unwrap();
block_on(pool.revalidate_ready(&BlockId::Number(0), None)).unwrap();
// then
// hash0 now has unsatisfied requirements => it is moved to the future queue
......
......@@ -22,7 +22,7 @@ use std::{
};
use serde::Serialize;
use log::debug;
use log::trace;
use parking_lot::RwLock;
use sp_runtime::traits::Member;
use sp_runtime::transaction_validity::{
......@@ -267,7 +267,7 @@ impl<Hash: hash::Hash + Member + Serialize, Ex> ReadyTransactions<Hash, Ex> {
to_remove.append(&mut tx.unlocks);
// add to removed
debug!(target: "txpool", "[{:?}] Removed as invalid: ", hash);
trace!(target: "txpool", "[{:?}] Removed as part of the subtree.", hash);
removed.push(tx.transaction.transaction);
}
}
......
......@@ -106,7 +106,12 @@ impl<B: ChainApi> ValidatedPool<B> {
.map(|validated_tx| self.submit_one(validated_tx))
.collect::<Vec<_>>();
let removed = self.enforce_limits();
// only enforce limits if there is at least one imported transaction
let removed = if results.iter().any(|res| res.is_ok()) {
self.enforce_limits()
} else {
Default::default()
};
results.into_iter().map(|res| match res {
Ok(ref hash) if removed.contains(hash) => Err(error::Error::ImmediatelyDropped.into()),
......@@ -236,6 +241,8 @@ impl<B: ChainApi> ValidatedPool<B> {
initial_statuses.insert(removed_hash.clone(), Status::Ready);
txs_to_resubmit.push((removed_hash, tx_to_resubmit));
}
// make sure to remove the hash even if it's not present in the pool any more.
updated_transactions.remove(&hash);
}
// if we're rejecting future transactions, then insertion order matters here:
......
......@@ -23,7 +23,7 @@ use futures::{
Future, FutureExt,
future::{Either, join, ready},
};
use log::{warn, debug};
use log::{warn, debug, trace};
use parking_lot::Mutex;
use client_api::{
......@@ -74,6 +74,11 @@ where
id: &BlockId<Block>,
retracted: &[Block::Hash],
) -> Box<dyn Future<Output=()> + Send + Unpin> {
let now = std::time::Instant::now();
let took = move || format!("Took {} ms", now.elapsed().as_millis());
let id = *id;
trace!(target: "txpool", "[{:?}] Starting pool maintainance", id);
// Put transactions from retracted blocks back into the pool.
let client_copy = self.client.clone();
let retracted_transactions = retracted.to_vec().into_iter()
......@@ -82,13 +87,14 @@ where
// if signed information is not present, attempt to resubmit anyway.
.filter(|tx| tx.is_signed().unwrap_or(true));
let resubmit_future = self.pool
.submit_at(id, retracted_transactions, true)
.then(|resubmit_result| ready(match resubmit_result {
Ok(_) => (),
Err(e) => {
debug!(target: "txpool", "Error re-submitting transactions: {:?}", e);
()
}
.submit_at(&id, retracted_transactions, true)
.then(move |resubmit_result| ready(match resubmit_result {
Ok(_) => trace!(target: "txpool",
"[{:?}] Re-submitting retracted done. {}", id, took()
),
Err(e) => debug!(target: "txpool",
"[{:?}] Error re-submitting transactions: {:?}", id, e
),
}));
// Avoid calling into runtime if there is nothing to prune from the pool anyway.
......@@ -96,28 +102,42 @@ where
return Box::new(resubmit_future)
}
let block = (self.client.header(*id), self.client.block_body(id));
match block {
let block = (self.client.header(id), self.client.block_body(&id));
let prune_future = match block {
(Ok(Some(header)), Ok(Some(extrinsics))) => {
let parent_id = BlockId::hash(*header.parent_hash());
let prune_future = self.pool
.prune(id, &parent_id, &extrinsics)
.then(|prune_result| ready(match prune_result {
Ok(_) => (),
Err(e) => {
warn!("Error pruning transactions: {:?}", e);
()
}
.prune(&id, &parent_id, &extrinsics)
.then(move |prune_result| ready(match prune_result {
Ok(_) => trace!(target: "txpool",
"[{:?}] Pruning done. {}", id, took()
),
Err(e) => warn!(target: "txpool",
"[{:?}] Error pruning transactions: {:?}", id, e
),
}));
Box::new(resubmit_future.then(|_| prune_future))
Either::Left(resubmit_future.then(|_| prune_future))
},
(Ok(_), Ok(_)) => Box::new(resubmit_future),
(Ok(_), Ok(_)) => Either::Right(resubmit_future),
err => {
warn!("Error reading block: {:?}", err);
Box::new(resubmit_future)
warn!(target: "txpool", "[{:?}] Error reading block: {:?}", id, err);
Either::Right(resubmit_future)
},
}
};
let revalidate_future = self.pool
.revalidate_ready(&id, Some(16))
.then(move |result| ready(match result {
Ok(_) => debug!(target: "txpool",
"[{:?}] Revalidation done: {}", id, took()
),
Err(e) => warn!(target: "txpool",
"[{:?}] Encountered errors while revalidating transactions: {:?}", id, e
),
}));
Box::new(prune_future.then(|_| revalidate_future))
}
}
......@@ -228,7 +248,7 @@ impl<Block, Client, PoolApi, F> LightBasicPoolMaintainer<Block, Client, PoolApi,
true => {
let revalidation_status = self.revalidation_status.clone();
Either::Left(self.pool
.revalidate_ready(id)
.revalidate_ready(id, None)
.map(|r| r.map_err(|e| warn!("Error revalidating known transactions: {}", e)))
.map(move |_| revalidation_status.lock().clear()))
},
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment