Commit d8e352fa authored by Tomasz Drwięga's avatar Tomasz Drwięga Committed by Gav Wood
Browse files

Extrinsic pool (#182)

* Use latest version of txpool.

* Initial version of the pool.

* Fix abstraction.

* Implement watchers and notifications.

* Return hash from RPC.

* Remove commented code.

* Remove client dep.

* Fix tests.
parent 3de2b3bb
...@@ -61,39 +61,11 @@ mod informant; ...@@ -61,39 +61,11 @@ mod informant;
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::{Sink, Future, Stream}; use futures::{Sink, Future, Stream};
use tokio_core::reactor; use tokio_core::reactor;
use parking_lot::Mutex;
use service::ChainSpec; use service::ChainSpec;
use primitives::block::Extrinsic;
struct RpcTransactionPool {
inner: Arc<Mutex<txpool::TransactionPool>>,
network: Arc<network::Service>,
}
impl substrate_rpc::author::AuthorApi for RpcTransactionPool {
fn submit_extrinsic(&self, xt: Extrinsic) -> substrate_rpc::author::error::Result<()> {
use primitives::hexdisplay::HexDisplay;
use polkadot_runtime::UncheckedExtrinsic;
use codec::Slicable;
info!("Extrinsic submitted: {}", HexDisplay::from(&xt.0));
let decoded = xt.using_encoded(|ref mut s| UncheckedExtrinsic::decode(s))
.ok_or(substrate_rpc::author::error::ErrorKind::InvalidFormat)?;
info!("Correctly formatted: {:?}", decoded);
self.inner.lock().import(decoded)
.map_err(|_| substrate_rpc::author::error::ErrorKind::PoolError)?;
self.network.trigger_repropagate();
Ok(())
}
}
struct Configuration(service::Configuration); struct Configuration(service::Configuration);
...@@ -238,11 +210,12 @@ fn run_until_exit<B, E>(mut core: reactor::Core, service: service::Service<B, E> ...@@ -238,11 +210,12 @@ fn run_until_exit<B, E>(mut core: reactor::Core, service: service::Service<B, E>
let handler = || { let handler = || {
let chain = rpc::apis::chain::Chain::new(service.client(), core.remote()); let chain = rpc::apis::chain::Chain::new(service.client(), core.remote());
let pool = RpcTransactionPool { rpc::rpc_handler(
inner: service.transaction_pool(), service.client(),
network: service.network(), chain,
}; service.transaction_pool(),
rpc::rpc_handler(service.client(), chain, pool, Configuration(config.clone())) Configuration(config.clone()),
)
}; };
( (
start_server(http_address, |address| rpc::start_http(address, handler())), start_server(http_address, |address| rpc::start_http(address, handler())),
......
...@@ -78,7 +78,6 @@ use tokio_core::reactor::{Handle, Timeout, Interval}; ...@@ -78,7 +78,6 @@ use tokio_core::reactor::{Handle, Timeout, Interval};
use futures::prelude::*; use futures::prelude::*;
use futures::future::{self, Shared}; use futures::future::{self, Shared};
use parking_lot::Mutex;
use collation::CollationFetch; use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion; use dynamic_inclusion::DynamicInclusion;
...@@ -226,7 +225,7 @@ pub struct ProposerFactory<C, N, P> { ...@@ -226,7 +225,7 @@ pub struct ProposerFactory<C, N, P> {
/// The client instance. /// The client instance.
pub client: Arc<C>, pub client: Arc<C>,
/// The transaction pool. /// The transaction pool.
pub transaction_pool: Arc<Mutex<TransactionPool>>, pub transaction_pool: Arc<TransactionPool>,
/// The backing network handle. /// The backing network handle.
pub network: N, pub network: N,
/// Parachain collators. /// Parachain collators.
...@@ -319,7 +318,7 @@ pub struct Proposer<C: PolkadotApi, R, P> { ...@@ -319,7 +318,7 @@ pub struct Proposer<C: PolkadotApi, R, P> {
random_seed: Hash, random_seed: Hash,
router: R, router: R,
table: Arc<SharedTable>, table: Arc<SharedTable>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<TransactionPool>,
} }
impl<C, R, P> bft::Proposer for Proposer<C, R, P> impl<C, R, P> bft::Proposer for Proposer<C, R, P>
...@@ -495,17 +494,16 @@ impl<C, R, P> bft::Proposer for Proposer<C, R, P> ...@@ -495,17 +494,16 @@ impl<C, R, P> bft::Proposer for Proposer<C, R, P>
use primitives::bft::{MisbehaviorKind, MisbehaviorReport}; use primitives::bft::{MisbehaviorKind, MisbehaviorReport};
use polkadot_runtime::{Call, Extrinsic, UncheckedExtrinsic, ConsensusCall}; use polkadot_runtime::{Call, Extrinsic, UncheckedExtrinsic, ConsensusCall};
let local_id = self.local_key.public().0; let local_id = self.local_key.public().0;
let mut pool = self.transaction_pool.lock();
let mut next_index = { let mut next_index = {
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client); let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
let cur_index = self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending| pending
pool.cull(None, readiness_evaluator.clone());
let cur_index = pool.pending(readiness_evaluator)
.filter(|tx| tx.as_ref().as_ref().signed == local_id) .filter(|tx| tx.as_ref().as_ref().signed == local_id)
.last() .last()
.map(|tx| Ok(tx.as_ref().as_ref().index)) .map(|tx| Ok(tx.as_ref().as_ref().index))
.unwrap_or_else(|| self.client.index(&self.parent_id, local_id)); .unwrap_or_else(|| self.client.index(&self.parent_id, local_id))
);
match cur_index { match cur_index {
Ok(cur_index) => cur_index + 1, Ok(cur_index) => cur_index + 1,
...@@ -541,7 +539,7 @@ impl<C, R, P> bft::Proposer for Proposer<C, R, P> ...@@ -541,7 +539,7 @@ impl<C, R, P> bft::Proposer for Proposer<C, R, P>
let signature = self.local_key.sign(&extrinsic.encode()).into(); let signature = self.local_key.sign(&extrinsic.encode()).into();
let uxt = UncheckedExtrinsic { extrinsic, signature }; let uxt = UncheckedExtrinsic { extrinsic, signature };
pool.import(uxt).expect("locally signed extrinsic is valid; qed"); self.transaction_pool.import_unchecked_extrinsic(uxt).expect("locally signed extrinsic is valid; qed");
} }
} }
} }
...@@ -609,7 +607,7 @@ pub struct CreateProposal<C: PolkadotApi, R, P: Collators> { ...@@ -609,7 +607,7 @@ pub struct CreateProposal<C: PolkadotApi, R, P: Collators> {
parent_number: BlockNumber, parent_number: BlockNumber,
parent_id: C::CheckedBlockId, parent_id: C::CheckedBlockId,
client: Arc<C>, client: Arc<C>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<TransactionPool>,
collation: CollationFetch<P, C>, collation: CollationFetch<P, C>,
router: R, router: R,
table: Arc<SharedTable>, table: Arc<SharedTable>,
...@@ -631,37 +629,33 @@ impl<C, R, P> CreateProposal<C, R, P> ...@@ -631,37 +629,33 @@ impl<C, R, P> CreateProposal<C, R, P>
candidates, candidates,
)?; )?;
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
{ {
let mut pool = self.transaction_pool.lock(); let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
let mut unqueue_invalid = Vec::new(); let mut unqueue_invalid = Vec::new();
let mut pending_size = 0; self.transaction_pool.cull_and_get_pending(readiness_evaluator, |pending_iterator| {
let mut pending_size = 0;
pool.cull(None, readiness_evaluator.clone()); for pending in pending_iterator {
for pending in pool.pending(readiness_evaluator.clone()) { // skip and cull transactions which are too large.
// skip and cull transactions which are too large. if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
if pending.encoded_size() > MAX_TRANSACTIONS_SIZE { unqueue_invalid.push(pending.hash().clone());
unqueue_invalid.push(pending.hash().clone()); continue
continue }
}
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break } if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
match block_builder.push_extrinsic(pending.as_transaction().clone()) { match block_builder.push_extrinsic(pending.as_transaction().clone()) {
Ok(()) => { Ok(()) => {
pending_size += pending.encoded_size(); pending_size += pending.encoded_size();
} }
Err(e) => { Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e); trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(pending.hash().clone()); unqueue_invalid.push(pending.hash().clone());
}
} }
} }
} });
for tx_hash in unqueue_invalid { self.transaction_pool.remove(&unqueue_invalid, false);
pool.remove(&tx_hash, false);
}
} }
let polkadot_block = block_builder.bake(); let polkadot_block = block_builder.bake();
......
...@@ -28,7 +28,6 @@ use client::{BlockchainEvents, ChainHead}; ...@@ -28,7 +28,6 @@ use client::{BlockchainEvents, ChainHead};
use ed25519; use ed25519;
use futures::prelude::*; use futures::prelude::*;
use futures::{future, Canceled}; use futures::{future, Canceled};
use parking_lot::Mutex;
use polkadot_api::LocalPolkadotApi; use polkadot_api::LocalPolkadotApi;
use polkadot_primitives::AccountId; use polkadot_primitives::AccountId;
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
...@@ -237,7 +236,7 @@ impl Service { ...@@ -237,7 +236,7 @@ impl Service {
client: Arc<C>, client: Arc<C>,
api: Arc<A>, api: Arc<A>,
network: Arc<net::ConsensusService>, network: Arc<net::ConsensusService>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<TransactionPool>,
parachain_empty_duration: Duration, parachain_empty_duration: Duration,
key: ed25519::Pair, key: ed25519::Pair,
) -> Service ) -> Service
......
...@@ -22,7 +22,7 @@ pub use network::NetworkConfiguration; ...@@ -22,7 +22,7 @@ pub use network::NetworkConfiguration;
/// The chain specification (this should eventually be replaced by a more general JSON-based chain /// The chain specification (this should eventually be replaced by a more general JSON-based chain
/// specification). /// specification).
#[derive(Clone)] #[derive(Clone, Copy)]
pub enum ChainSpec { pub enum ChainSpec {
/// Whatever the current runtime is, with just Alice as an auth. /// Whatever the current runtime is, with just Alice as an auth.
Development, Development,
...@@ -33,6 +33,7 @@ pub enum ChainSpec { ...@@ -33,6 +33,7 @@ pub enum ChainSpec {
} }
/// Service configuration. /// Service configuration.
#[derive(Clone)]
pub struct Configuration { pub struct Configuration {
/// Node roles. /// Node roles.
pub roles: Role, pub roles: Role,
...@@ -63,21 +64,3 @@ impl Default for Configuration { ...@@ -63,21 +64,3 @@ impl Default for Configuration {
} }
} }
} }
impl Clone for Configuration {
fn clone(&self) -> Configuration {
Configuration {
roles: self.roles.clone(),
transaction_pool: transaction_pool::Options {
max_count: self.transaction_pool.max_count.clone(),
max_mem_usage: self.transaction_pool.max_mem_usage.clone(),
max_per_sender: self.transaction_pool.max_per_sender.clone(),
},
network: self.network.clone(),
keystore_path: self.keystore_path.clone(),
database_path: self.database_path.clone(),
keys: self.keys.clone(),
chain_spec: self.chain_spec.clone(),
}
}
}
...@@ -50,14 +50,14 @@ extern crate hex_literal; ...@@ -50,14 +50,14 @@ extern crate hex_literal;
mod error; mod error;
mod config; mod config;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::thread; use std::thread;
use futures::prelude::*; use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use codec::Slicable; use codec::Slicable;
use primitives::block::{Id as BlockId, Extrinsic, ExtrinsicHash, HeaderHash, Header}; use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash, Header};
use primitives::{AuthorityId, hashing}; use primitives::{AuthorityId};
use transaction_pool::TransactionPool; use transaction_pool::TransactionPool;
use substrate_executor::NativeExecutor; use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch; use polkadot_executor::Executor as LocalDispatch;
...@@ -80,13 +80,13 @@ pub struct Service<B, E> { ...@@ -80,13 +80,13 @@ pub struct Service<B, E> {
thread: Option<thread::JoinHandle<()>>, thread: Option<thread::JoinHandle<()>>,
client: Arc<Client<B, E>>, client: Arc<Client<B, E>>,
network: Arc<network::Service>, network: Arc<network::Service>,
transaction_pool: Arc<Mutex<TransactionPool>>, transaction_pool: Arc<TransactionPool>,
signal: Option<Signal>, signal: Option<Signal>,
_consensus: Option<consensus::Service>, _consensus: Option<consensus::Service>,
} }
struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync { struct TransactionPoolAdapter<B, E, A> where A: Send + Sync, E: Send + Sync {
pool: Arc<Mutex<TransactionPool>>, pool: Arc<TransactionPool>,
client: Arc<Client<B, E>>, client: Arc<Client<B, E>>,
api: Arc<A>, api: Arc<A>,
} }
...@@ -108,19 +108,22 @@ impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A> ...@@ -108,19 +108,22 @@ impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A>
}; };
let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed."); let id = self.api.check_id(BlockId::Hash(best_block)).expect("Best block is always valid; qed.");
let mut pool = self.pool.lock(); let ready = transaction_pool::Ready::create(id, &*self.api);
pool.cull(None, transaction_pool::Ready::create(id.clone(), &*self.api));
pool.pending(transaction_pool::Ready::create(id, &*self.api)).map(|t| { self.pool.cull_and_get_pending(ready, |pending| pending
let hash = ::primitives::Hash::from(&t.hash()[..]); .map(|t| {
let tx = codec::Slicable::encode(t.as_transaction()); let hash = ::primitives::Hash::from(&t.hash()[..]);
(hash, tx) let tx = codec::Slicable::encode(t.as_transaction());
}).collect() (hash, tx)
})
.collect()
)
} }
fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> { fn import(&self, transaction: &[u8]) -> Option<ExtrinsicHash> {
if let Some(tx) = codec::Slicable::decode(&mut &transaction[..]) { if let Some(uxt) = codec::Slicable::decode(&mut &transaction[..]) {
match self.pool.lock().import(tx) { match self.pool.import_unchecked_extrinsic(uxt) {
Ok(t) => Some(t.hash()[..].into()), Ok(xt) => Some(*xt.hash()),
Err(e) => match *e.kind() { Err(e) => match *e.kind() {
transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()), transaction_pool::ErrorKind::AlreadyImported(hash) => Some(hash[..].into()),
_ => { _ => {
...@@ -134,6 +137,10 @@ impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A> ...@@ -134,6 +137,10 @@ impl<B, E, A> network::TransactionPool for TransactionPoolAdapter<B, E, A>
None None
} }
} }
fn on_broadcasted(&self, propagations: HashMap<ExtrinsicHash, Vec<String>>) {
self.pool.on_broadcasted(propagations)
}
} }
pub struct ChainConfig { pub struct ChainConfig {
...@@ -341,7 +348,7 @@ impl<B, E> Service<B, E> ...@@ -341,7 +348,7 @@ impl<B, E> Service<B, E>
C: Fn( C: Fn(
Arc<Client<B, E>>, Arc<Client<B, E>>,
Arc<network::Service>, Arc<network::Service>,
Arc<Mutex<TransactionPool>>, Arc<TransactionPool>,
&Keystore &Keystore
) -> Result<Option<consensus::Service>, error::Error>, ) -> Result<Option<consensus::Service>, error::Error>,
A: PolkadotApi + Send + Sync + 'static, A: PolkadotApi + Send + Sync + 'static,
...@@ -383,7 +390,7 @@ impl<B, E> Service<B, E> ...@@ -383,7 +390,7 @@ impl<B, E> Service<B, E>
let api = api_creator(client.clone()); let api = api_creator(client.clone());
let best_header = client.best_block_header()?; let best_header = client.best_block_header()?;
info!("Starting Polkadot. Best block is #{}", best_header.number); info!("Starting Polkadot. Best block is #{}", best_header.number);
let transaction_pool = Arc::new(Mutex::new(TransactionPool::new(config.transaction_pool))); let transaction_pool = Arc::new(TransactionPool::new(config.transaction_pool));
let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
pool: transaction_pool.clone(), pool: transaction_pool.clone(),
client: client.clone(), client: client.clone(),
...@@ -414,14 +421,28 @@ impl<B, E> Service<B, E> ...@@ -414,14 +421,28 @@ impl<B, E> Service<B, E>
thread_barrier.wait(); thread_barrier.wait();
let mut core = Core::new().expect("tokio::Core could not be created"); let mut core = Core::new().expect("tokio::Core could not be created");
let events = client.import_notification_stream().for_each(move |notification| {
network.on_block_imported(notification.hash, &notification.header);
prune_imported(&*client, &*txpool, notification.hash);
Ok(()) // block notifications
}); let network1 = network.clone();
let txpool1 = txpool.clone();
let events = client.import_notification_stream()
.for_each(move |notification| {
network1.on_block_imported(notification.hash, &notification.header);
prune_imported(&*api, &*txpool1, notification.hash);
Ok(())
});
core.handle().spawn(events); core.handle().spawn(events);
// transaction notifications
let events = txpool.import_notification_stream()
// TODO [ToDr] Consider throttling?
.for_each(move |_| {
network.trigger_repropagate();
Ok(())
});
core.handle().spawn(events);
if let Err(e) = core.run(exit) { if let Err(e) = core.run(exit) {
debug!("Polkadot service event loop shutdown with {:?}", e); debug!("Polkadot service event loop shutdown with {:?}", e);
} }
...@@ -457,30 +478,22 @@ impl<B, E> Service<B, E> ...@@ -457,30 +478,22 @@ impl<B, E> Service<B, E>
} }
/// Get shared transaction pool instance. /// Get shared transaction pool instance.
pub fn transaction_pool(&self) -> Arc<Mutex<TransactionPool>> { pub fn transaction_pool(&self) -> Arc<TransactionPool> {
self.transaction_pool.clone() self.transaction_pool.clone()
} }
} }
fn prune_transactions(pool: &mut TransactionPool, extrinsics: &[Extrinsic]) {
for extrinsic in extrinsics {
let hash: _ = hashing::blake2_256(&extrinsic.encode()).into();
pool.remove(&hash, true);
}
}
/// Produce a task which prunes any finalized transactions from the pool. /// Produce a task which prunes any finalized transactions from the pool.
pub fn prune_imported<B, E>(client: &Client<B, E>, pool: &Mutex<TransactionPool>, hash: HeaderHash) pub fn prune_imported<A>(api: &A, pool: &TransactionPool, hash: HeaderHash)
where where
B: Backend + Send + Sync, A: PolkadotApi,
E: CallExecutor + Send + Sync,
client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{ {
let id = BlockId::Hash(hash); match api.check_id(BlockId::Hash(hash)) {
match client.body(&id) { Ok(id) => {
Ok(Some(body)) => prune_transactions(&mut *pool.lock(), &body[..]), let ready = transaction_pool::Ready::create(id, api);
Ok(None) => warn!("Missing imported block {:?}", hash), pool.cull(None, ready);
Err(e) => warn!("Failed to fetch block: {:?}", e), },
Err(e) => warn!("Failed to check block id: {:?}", e),
} }
} }
......
...@@ -4,15 +4,14 @@ version = "0.1.0" ...@@ -4,15 +4,14 @@ version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
[dependencies] [dependencies]
log = "0.4.0" log = "0.3.0"
transaction-pool = "1.12.0"
error-chain = "0.11" error-chain = "0.11"
polkadot-api = { path = "../api" } polkadot-api = { path = "../api" }
polkadot-primitives = { path = "../primitives" } polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" } polkadot-runtime = { path = "../runtime" }
substrate-client = { path = "../../substrate/client" } substrate-client = { path = "../../substrate/client" }
substrate-rpc = { path = "../../substrate/rpc" } substrate-codec = { path = "../../substrate/codec" }
substrate-extrinsic-pool = { path = "../../substrate/extrinsic-pool" }
substrate-primitives = { path = "../../substrate/primitives" } substrate-primitives = { path = "../../substrate/primitives" }
substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" } substrate-runtime-primitives = { path = "../../substrate/runtime/primitives" }
substrate-codec = { path = "../../substrate/codec" }
ed25519 = { path = "../../substrate/ed25519" } ed25519 = { path = "../../substrate/ed25519" }
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use extrinsic_pool::{self, txpool};
use primitives::Hash;
use runtime::UncheckedExtrinsic;
error_chain! {
links {
Pool(txpool::Error, txpool::ErrorKind);
}
errors {
/// Unexpected extrinsic format submitted
InvalidExtrinsicFormat {
description("Invalid extrinsic format."),
display("Invalid extrinsic format."),
}
/// Attempted to queue an inherent transaction.
IsInherent(xt: UncheckedExtrinsic) {
description("Inherent transactions cannot be queued."),
display("Inehrent transactions cannot be queued."),
}
/// Attempted to queue a transaction with bad signature.
BadSignature(xt: UncheckedExtrinsic) {
description("Transaction had bad signature."),
display("Transaction had bad signature."),
}
/// Attempted to queue a transaction that is already in the pool.
AlreadyImported(hash: Hash) {
description("Transaction is already in the pool."),
display("Transaction {:?} is already in the pool.", hash),
}
/// Import error.
Import(err: Box<::std::error::Error + Send>) {
description("Error importing transaction"),
display("Error importing transaction: {}", err.description()),
}
}
}