validation.rs 29.4 KiB
Newer Older
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! The "validation session" networking code built on top of the base network service.
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head.
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
use substrate_network::Context as NetContext;
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{
	Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt,
	CollatorId, ValidatorId, PoVBlock,
};
use codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
use futures::sync::oneshot::{self, Receiver};
use std::collections::hash_map::{HashMap, Entry};
use std::io;
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;

use router::Router;
use gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData};
use super::PolkadotProtocol;

pub use polkadot_validation::Incoming;

/// An executor suitable for dispatching async consensus tasks.
pub trait Executor {
	fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F);
}

/// A wrapped futures::future::Executor.
///
/// Newtype around an inner executor `T`; the inner value is public and
/// reachable as `.0`. The `Executor` impl in this file forwards spawned
/// futures to it as boxed trait objects.
pub struct WrappedExecutor<T>(pub T);

impl<T> Executor for WrappedExecutor<T>
	where T: FutureExecutor<Box<Future<Item=(),Error=()> + Send + 'static>>
{
	fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F) {
		if let Err(e) = self.0.execute(Box::new(f)) {
			warn!(target: "validation", "could not spawn consensus task: {:?}", e);
		}
	}
}

impl Executor for TaskExecutor {
	fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F) {
		TaskExecutor::spawn(self, f)
	}
}

/// Basic functionality that a network has to fulfill.
///
/// Implementors must be `Send + Sync + 'static`.
pub trait NetworkService: Send + Sync + 'static {
	/// Get a stream of gossip messages for a given hash.
	///
	/// Messages are delivered as raw bytes on an unbounded channel.
	fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>>;

	/// Gossip a message (raw bytes) on the given topic.
	fn gossip_message(&self, topic: Hash, message: Vec<u8>);

	/// Drop a gossip topic.
	fn drop_gossip(&self, topic: Hash);

	/// Execute a closure with the polkadot protocol.
	///
	/// The closure receives mutable access to the protocol state and the
	/// network context, and is called at most once.
	fn with_spec<F>(&self, with: F)
		where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>) + Send + 'static;
}

impl NetworkService for super::NetworkService {
	fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
		let (tx, rx) = std::sync::mpsc::channel();

		self.with_gossip(move |gossip, _| {
			let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
			let _ = tx.send(inner_rx);
		});

		match rx.recv() {
			Ok(rx) => rx,
			Err(_) => mpsc::unbounded().1, // return empty channel.
		}
	}

	fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
		self.gossip_consensus_message(topic, POLKADOT_ENGINE_ID, message, false);
	fn drop_gossip(&self, _topic: Hash) { }

	fn with_spec<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
	{
		super::NetworkService::with_spec(self, with)
	}
}
/// Params to a current validation session.
pub struct SessionParams {
	/// The local session key.
	pub local_session_key: Option<SessionKey>,
	/// The parent hash.
	pub parent_hash: Hash,
	/// The authorities.
	pub authorities: Vec<SessionKey>,
pub struct ValidationNetwork<P, E, N, T> {
	network: Arc<N>,
	executor: T,
	message_validator: RegisteredMessageValidator,
impl<P, E, N, T> ValidationNetwork<P, E, N, T> {
	/// Create a new consensus networking object.
	pub fn new(
		network: Arc<N>,
		exit: E,
		message_validator: RegisteredMessageValidator,
		api: Arc<P>,
		executor: T,
	) -> Self {
		ValidationNetwork { network, exit, message_validator, api, executor }
impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
			exit: self.exit.clone(),
			executor: self.executor.clone(),
			message_validator: self.message_validator.clone(),
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
	P: ProvideRuntimeApi + Send + Sync + 'static,
	P::Api: ParachainHost<Block>,
	E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
	N: NetworkService,
	T: Clone + Executor + Send + Sync + 'static,
{
	/// Instantiate session data fetcher at a parent hash.
	///
	/// If the used session key is new, it will be broadcast to peers.
	/// If a validation session was already instantiated at this parent hash,
	/// the underlying instance will be shared.
	///
	/// If there was already a validation session instantiated and a different
	/// session key was set, then the new key will be ignored.
	///
	/// This implies that there can be multiple services instantiating validation
	/// session instances safely, but they should all be coordinated on which session keys
	/// are being used.
	pub fn instantiate_session(&self, params: SessionParams)
		-> oneshot::Receiver<SessionDataFetcher<P, E, N, T>>
	{
		let parent_hash = params.parent_hash;
		let network = self.network.clone();
		let api = self.api.clone();
		let task_executor = self.executor.clone();
		let exit = self.exit.clone();
		let message_validator = self.message_validator.clone();

		let (tx, rx) = oneshot::channel();
		self.network.with_spec(move |spec, ctx| {
			// before requesting messages, note live consensus session.
			message_validator.note_session(
				parent_hash,
				MessageValidationData { authorities: params.authorities.clone() },
			);

Loading full blame...