Commit 3367658d authored by asynchronous rob's avatar asynchronous rob Committed by Gav Wood
Browse files

Integrate transaction pool to the proposal logic (#80)

* reshuffle consensus libraries

* polkadot-useful type definitions for statement table

* begin BftService

* primary selection logic

* bft service implementation without I/O

* extract out `BlockImport` trait

* allow bft primitives to compile on wasm

* Block builder (substrate)

* take polkadot-consensus down to the core.

* test for preemption

* fix test build

* Fix wasm build

* Bulid on any block

* Test for block builder.

* Block import tests for client.

* Tidy ups

* clean up block builder instantiation

* justification verification logic

* JustifiedHeader and import

* Propert block generation for tests

* network and tablerouter trait

* use statement import to drive creation of further statements

* Fixed rpc tests

* custom error type for consensus

* create proposer

* asynchronous proposal evaluation

* inherent transactions in polkadot runtime

* fix tests to match real polkadot block constraints

* implicitly generate inherent functions

* add inherent transaction functionality to block body

* block builder logic for polkadot

* some tests for the polkadot API

* avoid redundancy in native code compatibility check

* helper for extracting nonce

* transaction pool implementation

* transaction pool

* integrate transaction pool with proposer

* indentation

* kill storage keys module

* accept new transactions to replace old
parent 05583c34
......@@ -36,7 +36,7 @@ use polkadot_runtime::runtime;
use polkadot_executor::Executor as LocalDispatch;
use substrate_executor::{NativeExecutionDispatch, NativeExecutor};
use state_machine::OverlayedChanges;
use primitives::{AccountId, SessionKey, Timestamp};
use primitives::{AccountId, SessionKey, Timestamp, TxOrder};
use primitives::block::{Id as BlockId, Block, Header, Body};
use primitives::transaction::UncheckedTransaction;
use primitives::parachain::DutyRoster;
......@@ -85,6 +85,7 @@ impl From<client::error::Error> for Error {
}
}
/// A builder for blocks.
pub trait BlockBuilder: Sized {
/// Push a non-inherent transaction.
fn push_transaction(&mut self, transaction: UncheckedTransaction) -> Result<()>;
......@@ -93,40 +94,64 @@ pub trait BlockBuilder: Sized {
fn bake(self) -> Block;
}
/// A checked block identifier.
pub trait CheckedBlockId: Clone {
/// Yield the underlying block ID.
fn block_id(&self) -> &BlockId;
}
/// Trait encapsulating the Polkadot API.
///
/// All calls should fail when the exact runtime is unknown.
pub trait PolkadotApi {
/// A checked block ID. Used to avoid redundancy of code check.
type CheckedBlockId: CheckedBlockId;
/// The type used to build blocks.
type BlockBuilder: BlockBuilder;
/// Check whether requests at the given block ID can be served.
///
/// It should not be possible to instantiate this type without going
/// through this function.
fn check_id(&self, id: BlockId) -> Result<Self::CheckedBlockId>;
/// Get session keys at a given block.
fn session_keys(&self, at: &BlockId) -> Result<Vec<SessionKey>>;
fn session_keys(&self, at: &Self::CheckedBlockId) -> Result<Vec<SessionKey>>;
/// Get validators at a given block.
fn validators(&self, at: &BlockId) -> Result<Vec<AccountId>>;
fn validators(&self, at: &Self::CheckedBlockId) -> Result<Vec<AccountId>>;
/// Get the authority duty roster at a block.
fn duty_roster(&self, at: &BlockId) -> Result<DutyRoster>;
fn duty_roster(&self, at: &Self::CheckedBlockId) -> Result<DutyRoster>;
/// Get the timestamp registered at a block.
fn timestamp(&self, at: &BlockId) -> Result<Timestamp>;
fn timestamp(&self, at: &Self::CheckedBlockId) -> Result<Timestamp>;
/// Get the nonce of an account at a block.
fn nonce(&self, at: &Self::CheckedBlockId, account: AccountId) -> Result<TxOrder>;
/// Evaluate a block and see if it gives an error.
fn evaluate_block(&self, at: &BlockId, block: Block) -> Result<()>;
fn evaluate_block(&self, at: &Self::CheckedBlockId, block: Block) -> Result<()>;
/// Create a block builder on top of the parent block.
fn build_block(&self, parent: &BlockId, timestamp: u64) -> Result<Self::BlockBuilder>;
fn build_block(&self, parent: &Self::CheckedBlockId, timestamp: u64) -> Result<Self::BlockBuilder>;
}
/// A checked block ID used for the substrate-client implementation of CheckedBlockId;
#[derive(Debug, Clone, Copy)]
pub struct CheckedId(BlockId);
impl CheckedBlockId for CheckedId {
fn block_id(&self) -> &BlockId {
&self.0
}
}
// set up the necessary scaffolding to execute the runtime.
macro_rules! with_runtime {
($client: ident, $at: expr, $exec: expr) => {{
// bail if the code is not the same as the natively linked.
if $client.code_at($at)? != LocalDispatch::native_equivalent() {
bail!(ErrorKind::UnknownRuntime);
}
$client.state_at($at).map_err(Error::from).and_then(|state| {
$client.state_at($at.block_id()).map_err(Error::from).and_then(|state| {
let mut changes = Default::default();
let mut ext = state_machine::Ext {
overlay: &mut changes,
......@@ -141,33 +166,44 @@ macro_rules! with_runtime {
impl<B: Backend> PolkadotApi for Client<B, NativeExecutor<LocalDispatch>>
where ::client::error::Error: From<<<B as Backend>::State as state_machine::backend::Backend>::Error>
{
type CheckedBlockId = CheckedId;
type BlockBuilder = ClientBlockBuilder<B::State>;
fn session_keys(&self, at: &BlockId) -> Result<Vec<SessionKey>> {
fn check_id(&self, id: BlockId) -> Result<CheckedId> {
// bail if the code is not the same as the natively linked.
if self.code_at(&id)? != LocalDispatch::native_equivalent() {
bail!(ErrorKind::UnknownRuntime);
}
Ok(CheckedId(id))
}
fn session_keys(&self, at: &CheckedId) -> Result<Vec<SessionKey>> {
with_runtime!(self, at, ::runtime::consensus::authorities)
}
fn validators(&self, at: &BlockId) -> Result<Vec<AccountId>> {
fn validators(&self, at: &CheckedId) -> Result<Vec<AccountId>> {
with_runtime!(self, at, ::runtime::session::validators)
}
fn duty_roster(&self, at: &BlockId) -> Result<DutyRoster> {
fn duty_roster(&self, at: &CheckedId) -> Result<DutyRoster> {
with_runtime!(self, at, ::runtime::parachains::calculate_duty_roster)
}
fn timestamp(&self, at: &BlockId) -> Result<Timestamp> {
fn timestamp(&self, at: &CheckedId) -> Result<Timestamp> {
with_runtime!(self, at, ::runtime::timestamp::get)
}
fn evaluate_block(&self, at: &BlockId, block: Block) -> Result<()> {
fn evaluate_block(&self, at: &CheckedId, block: Block) -> Result<()> {
with_runtime!(self, at, || ::runtime::system::internal::execute_block(block))
}
fn build_block(&self, parent: &BlockId, timestamp: Timestamp) -> Result<Self::BlockBuilder> {
if self.code_at(parent)? != LocalDispatch::native_equivalent() {
bail!(ErrorKind::UnknownRuntime);
}
fn nonce(&self, at: &Self::CheckedBlockId, account: AccountId) -> Result<TxOrder> {
with_runtime!(self, at, || ::runtime::system::nonce(account))
}
fn build_block(&self, parent: &CheckedId, timestamp: Timestamp) -> Result<Self::BlockBuilder> {
let parent = parent.block_id();
let header = Header {
parent_hash: self.block_hash_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))?,
number: self.block_number_from_id(parent)?.ok_or(ErrorKind::UnknownBlock(*parent))? + 1,
......@@ -316,23 +352,24 @@ mod tests {
#[test]
fn gets_session_and_validator_keys() {
let client = client();
assert_eq!(client.session_keys(&BlockId::Number(0)).unwrap(), validators());
assert_eq!(client.validators(&BlockId::Number(0)).unwrap(), validators());
let id = client.check_id(BlockId::Number(0)).unwrap();
assert_eq!(client.session_keys(&id).unwrap(), validators());
assert_eq!(client.validators(&id).unwrap(), validators());
}
#[test]
fn build_block() {
let client = client();
let block_builder = client.build_block(&BlockId::Number(0), 1_000_000).unwrap();
let id = client.check_id(BlockId::Number(0)).unwrap();
let block_builder = client.build_block(&id, 1_000_000).unwrap();
let block = block_builder.bake();
assert_eq!(block.header.number, 1);
}
#[test]
fn cannot_build_block_on_unknown_parent() {
let client = client();
assert!(client.build_block(&BlockId::Number(100), 1_000_000).is_err());
fn fails_to_check_id_for_unknown_block() {
assert!(client().check_id(BlockId::Number(100)).is_err());
}
}
......@@ -13,6 +13,7 @@ polkadot-api = { path = "../api" }
polkadot-collator = { path = "../collator" }
polkadot-primitives = { path = "../primitives" }
polkadot-statement-table = { path = "../statement-table" }
polkadot-transaction-pool = { path = "../transaction-pool" }
substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
......@@ -41,6 +41,13 @@ error_chain! {
description("Proposal had wrong parent hash."),
display("Proposal had wrong parent hash. Expected {:?}, got {:?}", expected, got),
}
ProposalTooLarge(size: usize) {
description("Proposal exceeded the maximum size."),
display(
"Proposal exceeded the maximum size of {} by {} bytes.",
::MAX_TRANSACTIONS_SIZE, ::MAX_TRANSACTIONS_SIZE.saturating_sub(*size)
),
}
}
}
......
......@@ -37,6 +37,7 @@ extern crate polkadot_api;
extern crate polkadot_collator as collator;
extern crate polkadot_statement_table as table;
extern crate polkadot_primitives;
extern crate polkadot_transaction_pool as transaction_pool;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
......@@ -56,6 +57,7 @@ use polkadot_primitives::block::Block as PolkadotBlock;
use polkadot_primitives::parachain::{Id as ParaId, DutyRoster, BlockData, Extrinsic, CandidateReceipt};
use primitives::block::{Block as SubstrateBlock, Header as SubstrateHeader, HeaderHash, Id as BlockId};
use primitives::AuthorityId;
use transaction_pool::TransactionPool;
use futures::prelude::*;
use futures::future;
......@@ -65,6 +67,9 @@ pub use self::error::{ErrorKind, Error};
mod error;
// block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
/// A handle to a statement table router.
pub trait TableRouter {
/// Errors when fetching data from the network.
......@@ -455,6 +460,8 @@ fn make_group_info(roster: DutyRoster, authorities: &[AuthorityId]) -> Result<Ha
pub struct ProposerFactory<C, N> {
/// The client instance.
pub client: Arc<C>,
/// The transaction pool.
pub transaction_pool: Arc<Mutex<TransactionPool>>,
/// The backing network handle.
pub network: N,
}
......@@ -465,7 +472,9 @@ impl<C: PolkadotApi, N: Network> bft::ProposerFactory for ProposerFactory<C, N>
fn init(&self, parent_header: &SubstrateHeader, authorities: &[AuthorityId], sign_with: Arc<ed25519::Pair>) -> Result<Self::Proposer, Error> {
let parent_hash = parent_header.hash();
let duty_roster = self.client.duty_roster(&BlockId::Hash(parent_hash))?;
let checked_id = self.client.check_id(BlockId::Hash(parent_hash))?;
let duty_roster = self.client.duty_roster(&checked_id)?;
let group_info = make_group_info(duty_roster, authorities)?;
let table = Arc::new(SharedTable::new(group_info, sign_with, parent_hash));
......@@ -474,9 +483,11 @@ impl<C: PolkadotApi, N: Network> bft::ProposerFactory for ProposerFactory<C, N>
// TODO [PoC-2]: kick off collation process.
Ok(Proposer {
parent_hash,
parent_id: checked_id,
_table: table,
_router: router,
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
})
}
}
......@@ -490,9 +501,11 @@ fn current_timestamp() -> Timestamp {
}
/// The Polkadot proposer logic.
pub struct Proposer<C, R> {
pub struct Proposer<C: PolkadotApi, R> {
parent_hash: HeaderHash,
parent_id: C::CheckedBlockId,
client: Arc<C>,
transaction_pool: Arc<Mutex<TransactionPool>>,
_table: Arc<SharedTable>,
_router: R,
}
......@@ -503,14 +516,45 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
type Evaluate = Result<bool, Error>;
fn propose(&self) -> Result<SubstrateBlock, Error> {
use transaction_pool::Ready;
// TODO: handle case when current timestamp behind that in state.
let polkadot_block = self.client.build_block(
&BlockId::Hash(self.parent_hash),
let mut block_builder = self.client.build_block(
&self.parent_id,
current_timestamp()
)?.bake();
)?;
let readiness_evaluator = Ready::create(self.parent_id.clone(), &*self.client);
{
let mut pool = self.transaction_pool.lock();
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
for pending in pool.pending(readiness_evaluator.clone()) {
// skip and cull transactions which are too large.
if pending.encoded_size() > MAX_TRANSACTIONS_SIZE {
unqueue_invalid.push(pending.hash().clone());
continue
}
// TODO: integrate transaction queue and `push_transaction`s.
if pending_size + pending.encoded_size() >= MAX_TRANSACTIONS_SIZE { break }
match block_builder.push_transaction(pending.as_transaction().clone()) {
Ok(()) => {
pending_size += pending.encoded_size();
}
Err(_) => {
unqueue_invalid.push(pending.hash().clone());
}
}
}
for tx_hash in unqueue_invalid {
pool.remove(&tx_hash, false);
}
}
let polkadot_block = block_builder.bake();
let substrate_block = Slicable::decode(&mut polkadot_block.encode().as_slice())
.expect("polkadot blocks defined to serialize to substrate blocks correctly; qed");
......@@ -519,7 +563,7 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
// TODO: certain kinds of errors here should lead to a misbehavior report.
fn evaluate(&self, proposal: &SubstrateBlock) -> Result<bool, Error> {
evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash)
evaluate_proposal(proposal, &*self.client, current_timestamp(), &self.parent_hash, &self.parent_id)
}
}
......@@ -528,6 +572,7 @@ fn evaluate_proposal<C: PolkadotApi>(
client: &C,
now: Timestamp,
parent_hash: &HeaderHash,
parent_id: &C::CheckedBlockId,
) -> Result<bool, Error> {
const MAX_TIMESTAMP_DRIFT: Timestamp = 4;
......@@ -535,6 +580,14 @@ fn evaluate_proposal<C: PolkadotApi>(
let proposal = PolkadotBlock::decode(&mut &encoded[..])
.ok_or_else(|| ErrorKind::ProposalNotForPolkadot)?;
let transactions_size = proposal.body.transactions.iter().fold(0, |a, tx| {
a + Slicable::encode(tx).len()
});
if transactions_size > MAX_TRANSACTIONS_SIZE {
bail!(ErrorKind::ProposalTooLarge(transactions_size))
}
if proposal.header.parent_hash != *parent_hash {
bail!(ErrorKind::WrongParentHash(*parent_hash, proposal.header.parent_hash));
}
......@@ -551,6 +604,6 @@ fn evaluate_proposal<C: PolkadotApi>(
}
// execute the block.
client.evaluate_block(&BlockId::Hash(*parent_hash), proposal)?;
client.evaluate_block(parent_id, proposal)?;
Ok(true)
}
......@@ -237,7 +237,10 @@ impl Function {
///
/// Transactions containing inherent functions should not be signed.
pub fn is_inherent(&self) -> bool {
self.inherent_index().is_some()
match *self {
Function::Inherent(_) => true,
_ => false,
}
}
/// If this function is inherent, returns the index it should occupy
......@@ -370,6 +373,11 @@ impl UncheckedTransaction {
}
}
/// Whether this transaction invokes an inherent function.
pub fn is_inherent(&self) -> bool {
self.transaction.function.is_inherent()
}
/// Create a new inherent-style transaction from the given function.
pub fn inherent(function: InherentFunction) -> Self {
UncheckedTransaction {
......
......@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use runtime::{system, parachains, consensus, session, timestamp};
use runtime::{system, parachains, consensus, session};
impl_stubs!(
execute_block => |block| system::internal::execute_block(block),
......@@ -24,5 +24,6 @@ impl_stubs!(
validators => |()| session::validators(),
authorities => |()| consensus::authorities(),
duty_roster => |()| parachains::calculate_duty_roster(),
get_timestamp => |()| timestamp::get()
timestamp => |()| ::runtime::timestamp::get(),
nonce => |account_id| system::nonce(account_id)
);
......@@ -31,9 +31,9 @@ extern crate polkadot_primitives;
#[cfg(test)] #[macro_use] extern crate hex_literal;
pub mod api;
pub mod environment;
pub mod runtime;
pub mod api;
#[cfg(feature = "std")] pub mod genesismap;
......
......@@ -17,19 +17,26 @@
//! System manager: Handles all of the top-level stuff; executing block/transaction, setting code
//! and depositing logs.
use rstd::prelude::*;
use rstd::mem;
use runtime_io::{print, storage_root, enumerated_trie_root};
use rstd::prelude::*;
use codec::{KeyedVec, Slicable};
use runtime_support::{Hashable, storage};
use environment::with_env;
use polkadot_primitives::{AccountId, Hash, TxOrder, BlockNumber, Block, Header,
UncheckedTransaction, Function, InherentFunction, Log};
use polkadot_primitives::{
AccountId, Hash, TxOrder, BlockNumber, Block, Header,
UncheckedTransaction, Function, InherentFunction, Log
};
use runtime_io::{print, storage_root, enumerated_trie_root};
use runtime_support::{Hashable, storage};
use runtime::{staking, session};
const NONCE_OF: &[u8] = b"sys:non:";
const BLOCK_HASH_AT: &[u8] = b"sys:old:";
const TEMP_TRANSACTION_NUMBER: &[u8] = b"temp:txcount:";
/// Prefixes account ID and stores u64 nonce.
pub const NONCE_OF: &[u8] = b"sys:non:";
/// Prefixes block number and stores hash of that block.
pub const BLOCK_HASH_AT: &[u8] = b"sys:old:";
/// Stores the temporary current transaction number.
pub const TEMP_TRANSACTION_NUMBER: &[u8] = b"temp:txcount";
/// The current block number being processed. Set by `execute_block`.
pub fn block_number() -> BlockNumber {
......@@ -53,8 +60,6 @@ pub mod privileged {
pub mod internal {
use super::*;
struct CheckedTransaction(UncheckedTransaction);
/// Deposits a log and ensures it matches the blocks log data.
pub fn deposit_log(log: Log) {
with_env(|e| e.digest.logs.push(log));
......@@ -141,6 +146,12 @@ pub mod internal {
}
}
/// Get an account's current nonce.
pub fn nonce(account: AccountId) -> TxOrder {
let nonce_key = account.to_keyed_vec(NONCE_OF);
storage::get_or(&nonce_key, 0)
}
/// Dispatch a function.
fn dispatch_function(function: &Function, transactor: &AccountId) {
match *function {
......
[package]
name = "polkadot-transaction-pool"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
transaction-pool = "1.9.0"
error-chain = "0.11"
polkadot-api = { path = "../api" }
polkadot-primitives = { path = "../primitives" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-codec = { path = "../../substrate/codec" }
ed25519 = { path = "../../substrate/ed25519" }
ethereum-types = "0.2"
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
extern crate transaction_pool;
extern crate polkadot_api;
extern crate polkadot_primitives as primitives;
extern crate substrate_primitives as substrate_primitives;
extern crate substrate_codec as codec;
extern crate ed25519;
extern crate ethereum_types;
#[macro_use]
extern crate error_chain;
use std::collections::HashMap;
use std::cmp::Ordering;
use std::sync::Arc;
use polkadot_api::PolkadotApi;
use primitives::AccountId;
use primitives::transaction::UncheckedTransaction;
use transaction_pool::{Pool, Readiness};
use transaction_pool::scoring::{Change, Choice};
// TODO: make queue generic over hash and sender so we don't need ethereum-types
pub use ethereum_types::{Address as TruncatedAccountId, H256 as TransactionHash};
pub use transaction_pool::{Options, Status, LightStatus, NoopListener, VerifiedTransaction as VerifiedTransactionOps};
/// Truncate an account ID to 160 bits.
pub fn truncate_id(id: &AccountId) -> TruncatedAccountId {
TruncatedAccountId::from_slice(&id[..20])
}
/// Iterator over pending transactions.
pub type PendingIterator<'a, C> =
transaction_pool::PendingIterator<'a, VerifiedTransaction, Ready<'a, C>, Scoring, NoopListener>;
error_chain! {
errors {
/// Attempted to queue an inherent transaction.
IsInherent(tx: UncheckedTransaction) {
description("Inherent transactions cannot be queued."),
display("Inehrent transactions cannot be queued."),
}
/// Attempted to queue a transaction with bad signature.
BadSignature(tx: UncheckedTransaction) {
description("Transaction had bad signature."),
display("Transaction had bad signature."),
}
/// Import error.
Import(err: Box<::std::error::Error + Send>) {
description("Error importing transaction"),
display("Error importing transaction: {}", err.description()),
}
}
}
/// A verified transaction which should be includable and non-inherent.
#[derive(Debug, Clone)]
pub struct VerifiedTransaction {
inner: UncheckedTransaction,
hash: TransactionHash,
address: TruncatedAccountId,
insertion_id: u64,
encoded_size: usize,
}
impl VerifiedTransaction {
/// Attempt to verify a transaction.
fn create(tx: UncheckedTransaction, insertion_id: u64) -> Result<Self> {
if tx.is_inherent() {
bail!(ErrorKind::IsInherent(tx))
}
let message = codec::Slicable::encode(&tx);
if ed25519::verify(&*tx.signature, &message, &tx.transaction.signed[..]) {
// TODO: make transaction-pool use generic types.
let hash = substrate_primitives::hashing::blake2_256(&message);
let address = truncate_id(&tx.transaction.signed);
Ok(VerifiedTransaction {
inner: tx,
hash: hash.into(),
encoded_size: message.len(),
address,
insertion_id,
})
} else {
Err(ErrorKind::BadSignature(tx).into())
}
}
/// Access the underlying transaction.
pub fn as_transaction(&self) -> &UncheckedTransaction {
self.as_ref()
}
/// Consume the verified transaciton, yielding the unchecked counterpart.
pub fn into_inner(self) -> UncheckedTransaction {
self.inner
}
/// Get the 256-bit hash of this transaction.
pub fn hash(&self) -> &TransactionHash {
&self.hash
}
/// Get the truncated account ID of the sender of this transaction.
pub fn sender(&self) -> &TruncatedAccountId {
&self.address
}