// Copyright 2017-2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Collation node logic. //! //! A collator node lives on a distinct parachain and submits a proposal for //! a state transition, along with a proof for its validity //! (what we might call a witness or block data). //! //! One of collators' other roles is to route messages between chains. //! Each parachain produces a list of "egress" posts of messages for each other //! parachain on each block, for a total of N^2 lists all together. //! //! We will refer to the egress list at relay chain block X of parachain A with //! destination B as egress(X)[A -> B] //! //! On every block, each parachain will be intended to route messages from some //! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3) //! //! Since the egress information is unique to every block, when routing from a //! parachain a collator must gather all egress posts from that parachain //! up to the last point in history that messages were successfully routed //! from that parachain, accounting for relay chain blocks where no candidate //! from the collator's parachain was produced. //! //! In the case that all parachains route to each other and a candidate for the //! collator's parachain was included in the last relay chain block, the collator //! only has to gather egress posts from other parachains one block back in relay //! chain history. //! //! This crate defines traits which provide context necessary for collation logic //! to be performed, as the collation logic itself. use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::time::Duration; use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn}; use log::warn; use sc_client::BlockchainEvents; use sp_core::Pair; use sp_runtime::traits::BlakeTwo256; use polkadot_primitives::{ BlockId, Hash, Block, parachain::{ self, BlockData, DutyRoster, HeadData, Id as ParaId, PoVBlock, ValidatorId, CollatorPair, LocalValidationData } }; use polkadot_cli::{ ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama, service::{self, Roles, SelectChain} }; use polkadot_network::legacy::validation::ValidationNetwork; pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration}; pub use polkadot_validation::SignedStatement; pub use polkadot_primitives::parachain::CollatorId; pub use sc_network::PeerId; pub use service::RuntimeApiCollection; const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); /// An abstraction over the `Network` with useful functions for a `Collator`. pub trait Network: Send + Sync { /// Convert the given `CollatorId` to a `PeerId`. fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> Box> + Send>; /// Create a `Stream` of checked statements for the given `relay_parent`. /// /// The returned stream will not terminate, so it is required to make sure that the stream is /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. fn checked_statements(&self, relay_parent: Hash) -> Box>; } impl Network for ValidationNetwork where P: 'static + Send + Sync, SP: 'static + Spawn + Clone + Send + Sync, { fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> Box> + Send> { Box::new(Self::collator_id_to_peer_id(self, collator_id)) } fn checked_statements(&self, relay_parent: Hash) -> Box> { Box::new(Self::checked_statements(self, relay_parent)) } } /// Error to return when the head data was invalid. #[derive(Clone, Copy, Debug)] pub struct InvalidHead; /// Collation errors. #[derive(Debug)] pub enum Error { /// Error on the relay-chain side of things. Polkadot(String), /// Error on the collator side of things. Collator(InvalidHead), } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match *self { Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err), Error::Collator(_) => write!(f, "Collator node error: Invalid head data"), } } } /// The Polkadot client type. pub type PolkadotClient = sc_client::Client; /// Something that can build a `ParachainContext`. pub trait BuildParachainContext { /// The parachain context produced by the `build` function. type ParachainContext: self::ParachainContext; /// Build the `ParachainContext`. fn build( self, client: Arc>, spawner: SP, network: Arc, ) -> Result where PolkadotClient: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: RuntimeApiCollection, // Rust bug: https://github.com/rust-lang/rust/issues/24159 < as ProvideRuntimeApi>::Api as sp_api::ApiExt>::StateBackend: sp_api::StateBackend, Extrinsic: codec::Codec + Send + Sync + 'static, E: sc_client::CallExecutor + Clone + Send + Sync + 'static, SP: Spawn + Clone + Send + Sync + 'static, R: Send + Sync + 'static, B: sc_client_api::Backend + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 B::State: sp_api::StateBackend; } /// Parachain context needed for collation. /// /// This can be implemented through an externally attached service or a stub. /// This is expected to be a lightweight, shared type like an Arc. pub trait ParachainContext: Clone { type ProduceCandidate: Future>; /// Produce a candidate, given the relay parent hash, the latest ingress queue information /// and the last parachain head. fn produce_candidate( &mut self, relay_parent: Hash, status: LocalValidationData, ) -> Self::ProduceCandidate; } /// Produce a candidate for the parachain, with given contexts, parent head, and signing key. pub async fn collate

( relay_parent: Hash, local_id: ParaId, local_validation_data: LocalValidationData, mut para_context: P, key: Arc, ) -> Result where P: ParachainContext, P::ProduceCandidate: Send, { let (block_data, head_data) = para_context.produce_candidate( relay_parent, local_validation_data, ).map_err(Error::Collator).await?; let pov_block = PoVBlock { block_data, }; let pov_block_hash = pov_block.hash(); let signature = key.sign(¶chain::collator_signature_payload( &relay_parent, &local_id, &pov_block_hash, )); let info = parachain::CollationInfo { parachain_index: local_id, relay_parent, collator: key.public(), signature, head_data, pov_block_hash, }; let collation = parachain::Collation { info, pov: pov_block, }; Ok(collation) } fn build_collator_service( service: S, para_id: ParaId, key: Arc, build_parachain_context: P, ) -> Result where S: AbstractService, sc_client::Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: RuntimeApiCollection< Extrinsic, Error = sp_blockchain::Error, StateBackend = sc_client_api::StateBackendFor >, // Rust bug: https://github.com/rust-lang/rust/issues/24159 S::Backend: service::Backend, // Rust bug: https://github.com/rust-lang/rust/issues/24159 >::State: sp_api::StateBackend>, // Rust bug: https://github.com/rust-lang/rust/issues/24159 S::CallExecutor: service::CallExecutor, // Rust bug: https://github.com/rust-lang/rust/issues/24159 S::SelectChain: service::SelectChain, P: BuildParachainContext, P::ParachainContext: Send + 'static, ::ProduceCandidate: Send, Extrinsic: service::Codec + Send + Sync + 'static, { let spawner = service.spawn_task_handle(); let client = service.client(); let network = service.network(); let known_oracle = client.clone(); let select_chain = if let Some(select_chain) = service.select_chain() { select_chain } else { return Err("The node cannot work because it can't select chain.".into()) }; let is_known = move |block_hash: &Hash| { use consensus_common::BlockStatus; use polkadot_network::legacy::gossip::Known; match known_oracle.block_status(&BlockId::hash(*block_hash)) { Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, Ok(BlockStatus::KnownBad) => Some(Known::Bad), Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => match select_chain.leaves() { Err(_) => None, Ok(leaves) => if leaves.contains(block_hash) { Some(Known::Leaf) } else { Some(Known::Old) }, } } }; let message_validator = polkadot_network::legacy::gossip::register_validator( network.clone(), (is_known, client.clone()), &spawner, ); let validation_network = Arc::new(ValidationNetwork::new( message_validator, client.clone(), spawner.clone(), )); let parachain_context = match build_parachain_context.build( client.clone(), spawner, validation_network.clone(), ) { Ok(ctx) => ctx, Err(()) => { return Err("Could not build the parachain context!".into()) } }; let work = async move { let mut notification_stream = client.import_notification_stream(); while let Some(notification) = notification_stream.next().await { macro_rules! try_fr { ($e:expr) => { match $e { Ok(x) => x, Err(e) => return future::Either::Left(future::err(Error::Polkadot( format!("{:?}", e) ))), } } } let relay_parent = notification.hash; let id = BlockId::hash(relay_parent); let network = network.clone(); let client = client.clone(); let key = key.clone(); let parachain_context = parachain_context.clone(); let work = future::lazy(move |_| { let api = client.runtime_api(); let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) { Some(local_validation) => local_validation, None => return future::Either::Left(future::ok(())), }; let validators = try_fr!(api.validators(&id)); let targets = compute_targets( para_id, validators.as_slice(), try_fr!(api.duty_roster(&id)), ); let collation_work = collate( relay_parent, para_id, local_validation, parachain_context, key, ).map_ok(move |collation| { network.with_spec(move |spec, ctx| { spec.add_local_collation( ctx, relay_parent, targets, collation, ); }) }); future::Either::Right(collation_work) }); let deadlined = future::select( work.then(|f| f).boxed(), futures_timer::Delay::new(COLLATION_TIMEOUT) ); let silenced = deadlined .map(|either| { if let future::Either::Right(_) = either { warn!("Collation failure: timeout"); } }); let future = silenced.map(drop); tokio::spawn(future); } }.boxed(); service.spawn_essential_task("collation", work); Ok(service) } /// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext` /// built by the given `BuildParachainContext` and arguments to the underlying polkadot node. pub async fn start_collator

( build_parachain_context: P, para_id: ParaId, key: Arc, config: Configuration, ) -> Result<(), polkadot_service::Error> where P: BuildParachainContext, P::ParachainContext: Send + 'static, ::ProduceCandidate: Send, { match (config.expect_chain_spec().is_kusama(), config.roles) { (true, Roles::LIGHT) => build_collator_service( service::kusama_new_light(config, Some((key.public(), para_id)))?, para_id, key, build_parachain_context, )?.await, (true, _) => build_collator_service( service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?, para_id, key, build_parachain_context, )?.await, (false, Roles::LIGHT) => build_collator_service( service::polkadot_new_light(config, Some((key.public(), para_id)))?, para_id, key, build_parachain_context, )?.await, (false, _) => build_collator_service( service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?, para_id, key, build_parachain_context, )?.await, } } fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet { use polkadot_primitives::parachain::Chain; roster.validator_duty.iter().enumerate() .filter(|&(_, c)| c == &Chain::Parachain(para_id)) .filter_map(|(i, _)| session_keys.get(i)) .cloned() .collect() } /// Run a collator node with the given `RelayChainContext` and `ParachainContext` /// built by the given `BuildParachainContext` and arguments to the underlying polkadot node. /// /// This function blocks until done. pub fn run_collator

( build_parachain_context: P, para_id: ParaId, key: Arc, config: Configuration, ) -> polkadot_cli::Result<()> where P: BuildParachainContext, P::ParachainContext: Send + 'static, ::ProduceCandidate: Send, { match (config.expect_chain_spec().is_kusama(), config.roles) { (true, Roles::LIGHT) => sc_cli::run_service_until_exit(config, |config| { build_collator_service( service::kusama_new_light(config, Some((key.public(), para_id)))?, para_id, key, build_parachain_context, ) }), (true, _) => sc_cli::run_service_until_exit(config, |config| { build_collator_service( service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?, para_id, key, build_parachain_context, ) }), (false, Roles::LIGHT) => sc_cli::run_service_until_exit(config, |config| { build_collator_service( service::polkadot_new_light(config, Some((key.public(), para_id)))?, para_id, key, build_parachain_context, ) }), (false, _) => sc_cli::run_service_until_exit(config, |config| { build_collator_service( service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?, para_id, key, build_parachain_context, ) }), } } #[cfg(test)] mod tests { use super::*; #[derive(Clone)] struct DummyParachainContext; impl ParachainContext for DummyParachainContext { type ProduceCandidate = future::Ready>; fn produce_candidate( &mut self, _relay_parent: Hash, _local_validation: LocalValidationData, ) -> Self::ProduceCandidate { // send messages right back. future::ok(( BlockData(vec![1, 2, 3, 4, 5,]), HeadData(vec![9, 9, 9]), )) } } }