Unverified Commit 13b58b13 authored by asynchronous rob, committed by GitHub

Authorship works again (#50)

* provide through inherent-data when authoring

* remove unneeded codec round-trip in proposer

* refactor polkadot-consensus service architecture

* integrate block authorship into polkadot service

* remove unused extern substrate-network crate in service

* write wrapper for unifying errors in consensus proposer

* extend wrapper further

* switch temporarily to macro-changing branch

* runtime compiles

* implement `inherent_extrinsics` for runtime

* block authorship works

* add GRANDPA to polkadot runtime

* get everything compiling

* use substrate master branch again

* remove some unneeded params

* update WASM

* parse only extrinsics when pruning availability store

* update recent deps

* runtime almost compiles

* need to expose trait type in build: I had to add PhantomData manually

* finish updating authorship to latest GRANDPA and Aura

* fix tests

* update wasm
parent 6430a001
Pipeline #26924 passed with stages in 14 minutes and 52 seconds
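The headline change in this commit is that block authorship now goes through inherent data: when proposing, the node hands the runtime an InherentData value (timestamp plus attested parachain candidates), and the runtime's new `inherent_extrinsics` entry point expands it into the extrinsics that open the block. Below is a minimal, self-contained Rust sketch of that flow; every type and function in it is an illustrative stand-in, not the actual Polkadot or Substrate API.

// Illustrative sketch only: simplified stand-ins for the commit's
// "provide through inherent-data when authoring" idea. Nothing here is
// the real polkadot-runtime or substrate API.
#[derive(Debug, Clone)]
struct InherentData {
    timestamp: u64,
    parachain_heads: Vec<u32>, // stand-in for attested candidate receipts
}

#[derive(Debug)]
enum Extrinsic {
    SetTimestamp(u64),
    SetParachainHeads(Vec<u32>),
    Transfer { from: u8, to: u8, amount: u64 },
}

// Runtime-side entry point: turn the proposer-supplied inherent data into
// the extrinsics that must appear at the start of the block.
fn inherent_extrinsics(data: InherentData) -> Vec<Extrinsic> {
    vec![
        Extrinsic::SetTimestamp(data.timestamp),
        Extrinsic::SetParachainHeads(data.parachain_heads),
    ]
}

// Proposer-side flow: inherents first, then transactions from the pool.
fn propose(data: InherentData, pool: Vec<Extrinsic>) -> Vec<Extrinsic> {
    let mut block = inherent_extrinsics(data);
    block.extend(pool);
    block
}

fn main() {
    let data = InherentData { timestamp: 1_543_000_000, parachain_heads: vec![100, 200] };
    let pool = vec![Extrinsic::Transfer { from: 1, to: 2, amount: 10 }];
    println!("{:#?}", propose(data, pool));
}

Judging from the imports in `propose_with` at the end of this diff, the real proposer follows a similar shape: build the inherent extrinsics from InherentData, then append transactions taken from the pool.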
......@@ -10,6 +10,6 @@ parking_lot = "0.4"
log = "0.3"
parity-codec = "2.1"
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
kvdb = { git = "https://github.com/paritytech/parity-common.git" }
kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common.git" }
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common.git" }
kvdb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
kvdb-rocksdb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
kvdb-memorydb = { git = "https://github.com/paritytech/parity-common", rev="616b40150ded71f57f650067fcbc5c99d7c343e6" }
......@@ -77,7 +77,7 @@ fn extrinsic_key(relay_parent: &Hash, candidate_hash: &Hash) -> Vec<u8> {
/// Handle to the availability store.
#[derive(Clone)]
pub struct Store {
inner: Arc<KeyValueDB>,
inner: Arc<dyn KeyValueDB>,
}
impl Store {
......
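The only functional change in the hunk above is the trait-object spelling: `Arc<KeyValueDB>` becomes `Arc<dyn KeyValueDB>`, the explicit form newer Rust compilers expect for trait objects. A tiny generic illustration of the same pattern, with a made-up trait rather than the real kvdb API:

use std::sync::Arc;

trait KeyValue {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
}

struct MemoryDb;

impl KeyValue for MemoryDb {
    fn get(&self, _key: &[u8]) -> Option<Vec<u8>> {
        None
    }
}

// `dyn KeyValue` marks the trait object explicitly; the bare `Arc<KeyValue>`
// spelling is deprecated on newer compilers.
struct Store {
    inner: Arc<dyn KeyValue>,
}

fn main() {
    let store = Store { inner: Arc::new(MemoryDb) };
    assert!(store.inner.get(b"missing").is_none());
}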
......@@ -102,7 +102,7 @@ pub fn run<I, T, W>(args: I, worker: W, version: cli::VersionInfo) -> error::Res
let (spec, mut config) = cli::parse_matches::<service::Factory, _>(load_spec, version, "parity-polkadot", &matches)?;
match cli::execute_default::<service::Factory, _,>(spec, worker, &matches)? {
match cli::execute_default::<service::Factory, _,>(spec, worker, &matches, &config)? {
cli::Action::ExecutedInternally => (),
cli::Action::RunService(worker) => {
info!("Parity ·:· Polkadot");
......
......@@ -16,7 +16,6 @@ polkadot-parachain = { path = "../parachain" }
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
polkadot-statement-table = { path = "../statement-table" }
substrate-consensus-aura = { git = "https://github.com/paritytech/substrate" }
substrate-finality-grandpa = { git = "https://github.com/paritytech/substrate" }
substrate-consensus-common = { git = "https://github.com/paritytech/substrate" }
substrate-primitives = { git = "https://github.com/paritytech/substrate" }
......
......@@ -14,10 +14,10 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Consensus service.
//! Attestation service.
/// Consensus service. A long running service that manages BFT agreement and parachain
/// candidate agreement over the network.
/// Attestation service. A long running service that creates and manages parachain attestation
/// instances.
///
/// This uses a handle to an underlying thread pool to dispatch heavy work
/// such as candidate verification while performing event-driven work
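For orientation, the threading pattern this doc comment describes (event-driven work on a dedicated thread driving a current-thread runtime, heavy work dispatched through a shared thread-pool handle) looks roughly like the sketch below, in the tokio 0.1 / futures 0.1 style used throughout this diff. The printed tasks are placeholders, not the real attestation logic.

extern crate futures;
extern crate tokio;

use futures::future;
use futures::prelude::*;
use std::thread;

fn main() {
    // Shared thread pool; its TaskExecutor handle is cloned into services.
    let pool = tokio::runtime::Runtime::new().expect("failed to create thread pool");
    let thread_pool = pool.executor();

    let service_thread = thread::spawn(move || {
        // Light, event-driven work stays on this thread...
        let mut runtime = tokio::runtime::current_thread::Runtime::new()
            .expect("failed to create local runtime");
        runtime.spawn(future::lazy(|| {
            println!("event-driven task on the local runtime");
            Ok(())
        }));

        // ...while heavy work (think candidate verification) goes to the pool.
        thread_pool.spawn(future::lazy(|| {
            println!("heavy task on the thread pool");
            Ok(())
        }));

        // Drive the local runtime until its tasks finish.
        runtime.run().expect("local runtime failed");
    });

    service_thread.join().expect("service thread panicked");
    pool.shutdown_on_idle().wait().expect("pool shutdown failed");
}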
......@@ -33,65 +33,57 @@ use client::blockchain::HeaderBackend;
use client::runtime_api::Core;
use primitives::ed25519;
use futures::prelude::*;
use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::{Block, BlockId, InherentData};
use polkadot_primitives::parachain::ParachainHost;
use extrinsic_store::Store as ExtrinsicStore;
use runtime_primitives::traits::ProvideRuntimeApi;
use transaction_pool::txpool::{ChainApi as PoolChainApi, Pool};
use tokio::runtime::TaskExecutor as ThreadPoolHandle;
use tokio::runtime::TaskExecutor;
use tokio::runtime::current_thread::Runtime as LocalRuntime;
use tokio::timer::Interval;
use super::{Network, Collators, ProposerFactory};
use super::{Network, Collators};
// creates a task to prune redundant entries in availability store upon block finalization
//
// NOTE: this will need to be changed to finality notification rather than
// block import notifications when the consensus switches to non-instant finality.
fn prune_unneeded_availability<C>(client: Arc<C>, extrinsic_store: ExtrinsicStore)
fn prune_unneeded_availability<P>(client: Arc<P>, extrinsic_store: ExtrinsicStore)
-> impl Future<Item=(),Error=()> + Send
where C: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
where P: Send + Sync + BlockchainEvents<Block> + BlockBody<Block> + 'static
{
use codec::{Encode, Decode};
use polkadot_primitives::BlockId;
enum NotifyError {
NoBody,
BodyFetch(::client::error::Error),
UnexpectedFormat,
}
impl NotifyError {
fn log(&self, hash: &::polkadot_primitives::Hash) {
match *self {
NotifyError::NoBody => warn!("No block body for imported block {:?}", hash),
NotifyError::BodyFetch(ref err) => warn!("Failed to fetch block body for imported block {:?}: {:?}", hash, err),
NotifyError::UnexpectedFormat => warn!("Consensus outdated: Block {:?} has unexpected body format", hash),
}
}
}
client.finality_notification_stream()
.for_each(move |notification| {
use polkadot_runtime::{Call, ParachainsCall};
use polkadot_runtime::{Call, ParachainsCall, UncheckedExtrinsic as RuntimeExtrinsic};
let hash = notification.hash;
let parent_hash = notification.header.parent_hash;
let runtime_block = client.block_body(&BlockId::hash(hash))
.map_err(NotifyError::BodyFetch)
.and_then(|maybe_body| maybe_body.ok_or(NotifyError::NoBody))
.map(|extrinsics| Block { header: notification.header, extrinsics })
.map(|b: Block| ::polkadot_runtime::Block::decode(&mut b.encode().as_slice()))
.and_then(|maybe_block| maybe_block.ok_or(NotifyError::UnexpectedFormat));
let runtime_block = match runtime_block {
let extrinsics = client.block_body(&BlockId::hash(hash))
.map_err(NotifyError::BodyFetch);
let extrinsics = match extrinsics {
Ok(r) => r,
Err(e) => { e.log(&hash); return Ok(()) }
};
let candidate_hashes = match runtime_block.extrinsics
let candidate_hashes = match extrinsics
.iter()
.filter_map(|ex| RuntimeExtrinsic::decode(&mut ex.encode().as_slice()))
.filter_map(|ex| match ex.function {
Call::Parachains(ParachainsCall::set_heads(ref heads)) =>
Some(heads.iter().map(|c| c.candidate.hash()).collect()),
......@@ -111,130 +103,107 @@ fn prune_unneeded_availability<C>(client: Arc<C>, extrinsic_store: ExtrinsicStor
})
}
/// Consensus service. Starts working when created.
pub struct Service {
/// Parachain candidate attestation service handle.
pub(crate) struct ServiceHandle {
thread: Option<thread::JoinHandle<()>>,
exit_signal: Option<::exit_future::Signal>,
}
impl Service {
/// Create and start a new instance.
pub fn new<C, N, TxApi>(
client: Arc<C>,
network: N,
transaction_pool: Arc<Pool<TxApi>>,
thread_pool: ThreadPoolHandle,
parachain_empty_duration: Duration,
key: ed25519::Pair,
extrinsic_store: ExtrinsicStore,
) -> Service
where
C: BlockchainEvents<Block> + ChainHead<Block> + BlockBody<Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + Core<Block> + BlockBuilder<Block>,
N: Network + Collators + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<N::Collation as IntoFuture>::Future: Send + 'static,
TxApi: PoolChainApi<Block=Block> + Send + 'static,
{
use parking_lot::Mutex;
use std::collections::HashMap;
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || {
let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
let key = Arc::new(key);
let parachain_consensus = Arc::new(::ParachainConsensus{
client: client.clone(),
network: network.clone(),
collators: network.clone(),
handle: thread_pool.clone(),
extrinsic_store: extrinsic_store.clone(),
parachain_empty_duration,
live_instances: Mutex::new(HashMap::new()),
});
let factory = ProposerFactory::new(
parachain_consensus.clone(),
transaction_pool
/// Create and start a new instance of the attestation service.
pub(crate) fn start<C, N, P>(
client: Arc<P>,
parachain_consensus: Arc<::ParachainConsensus<C, N, P>>,
thread_pool: TaskExecutor,
key: Arc<ed25519::Pair>,
extrinsic_store: ExtrinsicStore,
) -> ServiceHandle
where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
P: BlockchainEvents<Block> + ChainHead<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + Core<Block> + BlockBuilder<Block, InherentData>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
{
const TIMER_DELAY: Duration = Duration::from_secs(5);
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
let (signal, exit) = ::exit_future::signal();
let thread = thread::spawn(move || {
let mut runtime = LocalRuntime::new().expect("Could not create local runtime");
let notifications = {
let client = client.clone();
let consensus = parachain_consensus.clone();
let key = key.clone();
client.import_notification_stream().for_each(move |notification| {
let parent_hash = notification.hash;
if notification.is_new_best {
let res = client
.runtime_api()
.authorities(&BlockId::hash(parent_hash))
.map_err(Into::into)
.and_then(|authorities| {
consensus.get_or_instantiate(
parent_hash,
&authorities,
key.clone(),
)
});
if let Err(e) = res {
warn!("Unable to start parachain consensus on top of {:?}: {}",
parent_hash, e);
}
}
Ok(())
})
};
let prune_old_sessions = {
let client = client.clone();
let interval = Interval::new(
Instant::now() + TIMER_DELAY,
TIMER_INTERVAL,
);
let notifications = {
let client = client.clone();
let consensus = parachain_consensus.clone();
let key = key.clone();
client.import_notification_stream().for_each(move |notification| {
let parent_hash = notification.hash;
if notification.is_new_best {
let res = client
.runtime_api()
.authorities(&BlockId::hash(parent_hash))
.map_err(Into::into)
.and_then(|authorities| {
consensus.get_or_instantiate(
parent_hash,
&authorities,
key.clone(),
)
});
if let Err(e) = res {
warn!("Unable to start parachain consensus on top of {:?}: {}",
parent_hash, e);
}
interval
.for_each(move |_| match client.leaves() {
Ok(leaves) => {
parachain_consensus.retain(|h| leaves.contains(h));
Ok(())
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
Ok(())
}
Ok(())
})
};
.map_err(|e| warn!("Timer error {:?}", e))
};
let prune_old_sessions = {
let client = client.clone();
let interval = Interval::new(
Instant::now() + TIMER_DELAY,
TIMER_INTERVAL,
);
interval
.for_each(move |_| match client.leaves() {
Ok(leaves) => {
parachain_consensus.retain(|h| leaves.contains(h));
Ok(())
}
Err(e) => {
warn!("Error fetching leaves from client: {:?}", e);
Ok(())
}
})
.map_err(|e| warn!("Timer error {:?}", e))
};
runtime.spawn(notifications);
thread_pool.spawn(prune_old_sessions);
runtime.spawn(notifications);
thread_pool.spawn(prune_old_sessions);
let prune_available = prune_unneeded_availability(client, extrinsic_store)
.select(exit.clone())
.then(|_| Ok(()));
let prune_available = prune_unneeded_availability(client, extrinsic_store)
.select(exit.clone())
.then(|_| Ok(()));
// spawn this on the tokio executor since it's fine on a thread pool.
thread_pool.spawn(prune_available);
// spawn this on the tokio executor since it's fine on a thread pool.
thread_pool.spawn(prune_available);
if let Err(e) = runtime.block_on(exit) {
debug!("BFT event loop error {:?}", e);
}
});
Service {
thread: Some(thread),
exit_signal: Some(signal),
if let Err(e) = runtime.block_on(exit) {
debug!("BFT event loop error {:?}", e);
}
});
ServiceHandle {
thread: Some(thread),
exit_signal: Some(signal),
}
}
impl Drop for Service {
impl Drop for ServiceHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
......
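The ServiceHandle above owns the attestation thread plus an exit signal; its Drop impl fires the signal so the thread's `block_on(exit)` returns, and presumably joins the thread just after the point where the hunk is cut off. A standard-library-only sketch of that shutdown pattern, with an mpsc channel standing in for the exit-future signalling (all names here are hypothetical):

use std::sync::mpsc::{channel, Sender};
use std::thread;

// Worker thread that runs until it receives an exit signal, joined when
// the handle is dropped.
struct ServiceHandle {
    thread: Option<thread::JoinHandle<()>>,
    exit_signal: Option<Sender<()>>,
}

impl ServiceHandle {
    fn start() -> Self {
        let (signal, exit) = channel::<()>();
        let thread = thread::spawn(move || {
            // Stand-in for `runtime.block_on(exit)`: block until signalled
            // (or until the sender is dropped).
            let _ = exit.recv();
            println!("service thread shutting down");
        });
        ServiceHandle { thread: Some(thread), exit_signal: Some(signal) }
    }
}

impl Drop for ServiceHandle {
    fn drop(&mut self) {
        if let Some(signal) = self.exit_signal.take() {
            let _ = signal.send(()); // fire the exit signal
        }
        if let Some(thread) = self.thread.take() {
            let _ = thread.join(); // wait for the worker to wind down
        }
    }
}

fn main() {
    let _handle = ServiceHandle::start();
    // dropping `_handle` at the end of main runs the shutdown sequence
}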
......@@ -74,9 +74,6 @@ impl DynamicInclusion {
Some(now + until)
}
}
/// Get the start instant.
pub fn started_at(&self) -> Instant { self.start }
}
#[cfg(test)]
......
......@@ -45,7 +45,6 @@ extern crate substrate_client as client;
extern crate exit_future;
extern crate tokio;
extern crate substrate_consensus_common as consensus;
extern crate substrate_consensus_aura as aura;
extern crate substrate_finality_grandpa as grandpa;
extern crate substrate_transaction_pool as transaction_pool;
......@@ -65,14 +64,17 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{self, Duration, Instant};
use aura::ExtraVerification;
use client::{BlockchainEvents, ChainHead, BlockBody};
use client::blockchain::HeaderBackend;
use client::block_builder::api::BlockBuilder;
use codec::{Decode, Encode};
use client::block_builder::api::BlockBuilder as BlockBuilderApi;
use client::runtime_api::Core;
use codec::Encode;
use extrinsic_store::Store as ExtrinsicStore;
use parking_lot::Mutex;
use polkadot_primitives::{Hash, Block, BlockId, BlockNumber, Header, Timestamp, SessionKey};
use polkadot_primitives::{Compact, UncheckedExtrinsic};
use polkadot_primitives::{
Hash, Block, BlockId, BlockNumber, Header, Timestamp, SessionKey, InherentData
};
use polkadot_primitives::Compact;
use polkadot_primitives::parachain::{Id as ParaId, Chain, DutyRoster, BlockData, Extrinsic as ParachainExtrinsic, CandidateReceipt, CandidateSignature};
use polkadot_primitives::parachain::{AttestedCandidate, ParachainHost, Statement as PrimitiveStatement};
use primitives::{AuthorityId, ed25519};
......@@ -81,6 +83,7 @@ use tokio::runtime::TaskExecutor;
use tokio::timer::{Delay, Interval};
use transaction_pool::txpool::{Pool, ChainApi as PoolChainApi};
use attestation_service::ServiceHandle;
use futures::prelude::*;
use futures::future::{self, Either};
use collation::CollationFetch;
......@@ -89,12 +92,11 @@ use dynamic_inclusion::DynamicInclusion;
pub use self::collation::{validate_collation, Collators};
pub use self::error::{ErrorKind, Error};
pub use self::shared_table::{SharedTable, StatementProducer, ProducedStatements, Statement, SignedStatement, GenericStatement};
pub use service::Service;
mod attestation_service;
mod dynamic_inclusion;
mod evaluation;
mod error;
mod service;
mod shared_table;
pub mod collation;
......@@ -255,7 +257,7 @@ impl<C, N, P> ParachainConsensus<C, N, P> where
C: Collators + Send + 'static,
N: Network,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilder<Block>,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
<C::Collation as IntoFuture>::Future: Send + 'static,
N::TableRouter: Send + 'static,
{
......@@ -271,8 +273,6 @@ impl<C, N, P> ParachainConsensus<C, N, P> where
)
-> Result<Arc<AttestationTracker>, Error>
{
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
let mut live_instances = self.live_instances.lock();
if let Some(tracker) = live_instances.get(&parent_hash) {
return Ok(tracker.clone());
......@@ -280,10 +280,6 @@ impl<C, N, P> ParachainConsensus<C, N, P> where
let id = BlockId::hash(parent_hash);
let duty_roster = self.client.runtime_api().duty_roster(&id)?;
let random_seed = self.client.runtime_api().random_seed(&id)?;
let _random_seed = BlakeTwo256::hash(random_seed.as_ref());
let _validators = self.client.runtime_api().validators(&id)?;
let (group_info, local_duty) = make_group_info(
duty_roster,
......@@ -298,7 +294,6 @@ impl<C, N, P> ParachainConsensus<C, N, P> where
debug!(target: "consensus", "Active parachains: {:?}", active_parachains);
let _n_parachains = active_parachains.len();
let table = Arc::new(SharedTable::new(group_info, sign_with.clone(), parent_hash, self.extrinsic_store.clone()));
let router = self.network.communication_for(
authorities,
......@@ -358,22 +353,55 @@ struct AttestationTracker {
}
/// Polkadot proposer factory.
struct ProposerFactory<C, N, P, TxApi: PoolChainApi> {
pub struct ProposerFactory<C, N, P, TxApi: PoolChainApi> {
parachain_consensus: Arc<ParachainConsensus<C, N, P>>,
transaction_pool: Arc<Pool<TxApi>>,
_service_handle: ServiceHandle,
}
impl<C, N, P, TxApi> ProposerFactory<C, N, P, TxApi> where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
P: BlockchainEvents<Block> + ChainHead<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + Core<Block> + BlockBuilderApi<Block, InherentData>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
TxApi: PoolChainApi,
{
/// Create a new proposer factory.
fn new(
parachain_consensus: Arc<ParachainConsensus<C, N, P>>,
pub fn new(
client: Arc<P>,
network: N,
collators: C,
transaction_pool: Arc<Pool<TxApi>>,
thread_pool: TaskExecutor,
parachain_empty_duration: Duration,
key: Arc<ed25519::Pair>,
extrinsic_store: ExtrinsicStore,
) -> Self {
let parachain_consensus = Arc::new(ParachainConsensus {
client: client.clone(),
network,
collators,
handle: thread_pool.clone(),
extrinsic_store: extrinsic_store.clone(),
parachain_empty_duration,
live_instances: Mutex::new(HashMap::new()),
});
let service_handle = ::attestation_service::start(
client,
parachain_consensus.clone(),
thread_pool,
key,
extrinsic_store,
);
ProposerFactory {
parachain_consensus,
transaction_pool,
_service_handle: service_handle,
}
}
}
......@@ -383,7 +411,7 @@ impl<C, N, P, TxApi> consensus::Environment<Block> for ProposerFactory<C, N, P,
N: Network,
TxApi: PoolChainApi<Block=Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilder<Block>,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
<C::Collation as IntoFuture>::Future: Send + 'static,
N::TableRouter: Send + 'static,
{
......@@ -495,7 +523,7 @@ pub struct Proposer<C: Send + Sync, TxApi: PoolChainApi> where
impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
C::Api: ParachainHost<Block> + BlockBuilder<Block>,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
{
type Error = Error;
type Create = Either<
......@@ -533,55 +561,6 @@ impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
}
}
/// Does verification before importing blocks.
/// Should be used for further verification in aura.
pub struct BlockVerifier;
impl ExtraVerification<Block> for BlockVerifier {
type Verified = Either<
future::FutureResult<(), String>,
Box<dyn Future<Item=(), Error=String>>,
>;
fn verify(&self, _header: &Header, body: Option<&[UncheckedExtrinsic]>) -> Self::Verified {
use polkadot_runtime::{Call, UncheckedExtrinsic, TimestampCall};
let body = match body {
None => return Either::A(future::ok(())),
Some(body) => body,
};
// TODO: reintroduce or revisit necessity for includability tracker.
let timestamp = current_timestamp();
let maybe_in_block = body.iter()
.filter_map(|ex| {
let encoded = ex.encode();
let runtime_ex = UncheckedExtrinsic::decode(&mut &encoded[..])?;
match runtime_ex.function {
Call::Timestamp(TimestampCall::set(t)) => Some(t),
_ => None,
}
})
.next();
let timestamp_in_block = match maybe_in_block {
None => return Either::A(future::ok(())),
Some(t) => t,
};
// we wait until the block timestamp is earlier than current.
if timestamp.0 < timestamp_in_block.0 {
let diff_secs = timestamp_in_block.0 - timestamp.0;
let delay = Delay::new(Instant::now() + Duration::from_secs(diff_secs))
.map_err(move |e| format!("Error waiting for {} seconds: {:?}", diff_secs, e));
Either::B(Box::new(delay))
} else {
Either::A(future::ok(()))
}
}
}
fn current_timestamp() -> Timestamp {
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
......@@ -605,7 +584,7 @@ impl ProposalTiming {
//
// this interval is just meant to produce periodic task wakeups
// that lead to the `dynamic_inclusion` getting updated as necessary.
if let Async::Ready(x) = self.attempt_propose.poll().map_err(ErrorKind::Timer)? {
while let Async::Ready(x) = self.attempt_propose.poll().map_err(ErrorKind::Timer)? {
x.expect("timer still alive; intervals never end; qed");
}
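The `if let` to `while let` change above is about draining the interval: tokio 0.1's Interval is a Stream, so if several ticks accumulated while the proposer was busy, polling it once consumes a single tick and the task can be woken again straight away. Looping until the stream reports NotReady swallows every pending tick in one wakeup. A small fragment showing the drain, meant to be called from inside a task's poll; the helper name is made up.

extern crate futures;
extern crate tokio;

use futures::{Async, Stream};
use tokio::timer::{self, Interval};

// Drain every queued tick; each poll of the interval also re-registers the
// current task to be woken when the next tick fires.
fn drain_ticks(attempt_propose: &mut Interval) -> Result<(), timer::Error> {
    while let Async::Ready(tick) = attempt_propose.poll()? {
        // an Interval never terminates, so the item is always Some
        tick.expect("intervals never end; qed");
    }
    Ok(())
}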
......@@ -641,26 +620,31 @@ pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
impl<C, TxApi> CreateProposal<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
C::Api: ParachainHost<Block> + BlockBuilder<Block>,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block, InherentData>,
{
fn propose_with(&self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
use client::block_builder::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
use polkadot_primitives::InherentData;