diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 490b17ba05ebef45ec7031a5c9eb14397aec0ded..de0a36e096f2696c3ed4792662dbee3f13dcafbb 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -246,7 +246,7 @@ impl<P, E> IntoExit for CollationNode<P, E> where impl<P, E> Worker for CollationNode<P, E> where P: ParachainContext + Send + 'static, - E: Future<Item=(),Error=()> + Send + 'static + E: Future<Item=(),Error=()> + Clone + Send + 'static { type Work = Box<Future<Item=(),Error=()> + Send>; @@ -267,6 +267,7 @@ impl<P, E> Worker for CollationNode<P, E> where let client = service.client(); let network = service.network(); + let inner_exit = exit.clone(); let work = client.import_notification_stream() .for_each(move |notification| { macro_rules! try_fr { @@ -325,7 +326,7 @@ impl<P, E> Worker for CollationNode<P, E> where } }); - tokio::spawn(silenced); + tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(()))); Ok(()) }); diff --git a/polkadot/consensus/src/attestation_service.rs b/polkadot/consensus/src/attestation_service.rs index afc47a92ae21b5bdf186210d5da3a5c62745173e..fc87f83d9e7e8e09d0d77ec033b1a4657f2f3a04 100644 --- a/polkadot/consensus/src/attestation_service.rs +++ b/polkadot/consensus/src/attestation_service.rs @@ -137,28 +137,31 @@ pub(crate) fn start<C, N, P>( let consensus = parachain_consensus.clone(); let key = key.clone(); - client.import_notification_stream().for_each(move |notification| { - let parent_hash = notification.hash; - if notification.is_new_best { - let res = client - .runtime_api() - .authorities(&BlockId::hash(parent_hash)) - .map_err(Into::into) - .and_then(|authorities| { - consensus.get_or_instantiate( - parent_hash, - &authorities, - key.clone(), - ) - }); - - if let Err(e) = res { - warn!("Unable to start parachain consensus on top of {:?}: {}", - parent_hash, e); + client.import_notification_stream() + .for_each(move |notification| { + let parent_hash = notification.hash; + if notification.is_new_best { + let res = client + .runtime_api() + .authorities(&BlockId::hash(parent_hash)) + .map_err(Into::into) + .and_then(|authorities| { + consensus.get_or_instantiate( + parent_hash, + &authorities, + key.clone(), + ) + }); + + if let Err(e) = res { + warn!("Unable to start parachain consensus on top of {:?}: {}", + parent_hash, e); + } } - } - Ok(()) - }) + Ok(()) + }) + .select(exit.clone()) + .then(|_| Ok(())) }; let prune_old_sessions = { @@ -180,6 +183,8 @@ pub(crate) fn start<C, N, P>( } }) .map_err(|e| warn!("Timer error {:?}", e)) + .select(exit.clone()) + .then(|_| Ok(())) }; runtime.spawn(notifications); diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index cd83f96250f3c65623cc89b2ec6997469d61d9a4..04263afed0f3e6051a33951a0e5ecf6860acb00c 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -41,14 +41,17 @@ use router::Router; // task that processes all gossipped consensus messages, // checking signatures -struct MessageProcessTask<P> { +struct MessageProcessTask<P, E> { inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>, parent_hash: Hash, table_router: Router<P>, + exit: E, } -impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P> - where P::Api: ParachainHost<Block>, +impl<P, E> MessageProcessTask<P, E> where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost<Block>, + E: Future<Item=(),Error=()> + Clone + Send + 'static, { fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> { use polkadot_consensus::SignedStatement; @@ -61,7 +64,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P> statement.sender, &self.parent_hash ) { - self.table_router.import_statement(statement); + self.table_router.import_statement(statement, self.exit.clone()); } } @@ -69,8 +72,10 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P> } } -impl<P: ProvideRuntimeApi + Send + Sync + 'static> Future for MessageProcessTask<P> - where P::Api: ParachainHost<Block>, +impl<P, E> Future for MessageProcessTask<P, E> where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost<Block>, + E: Future<Item=(),Error=()> + Clone + Send + 'static, { type Item = (); type Error = (); @@ -90,30 +95,34 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Future for MessageProcessTask } /// Wrapper around the network service -pub struct ConsensusNetwork<P> { +pub struct ConsensusNetwork<P, E> { network: Arc<NetworkService>, api: Arc<P>, + exit: E, } -impl<P> ConsensusNetwork<P> { +impl<P, E> ConsensusNetwork<P, E> { /// Create a new consensus networking object. - pub fn new(network: Arc<NetworkService>, api: Arc<P>) -> Self { - ConsensusNetwork { network, api } + pub fn new(network: Arc<NetworkService>, exit: E, api: Arc<P>) -> Self { + ConsensusNetwork { network, exit, api } } } -impl<P> Clone for ConsensusNetwork<P> { +impl<P, E: Clone> Clone for ConsensusNetwork<P, E> { fn clone(&self) -> Self { ConsensusNetwork { network: self.network.clone(), + exit: self.exit.clone(), api: self.api.clone(), } } } /// A long-lived network which can create parachain statement routing processes on demand. -impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<P> - where P::Api: ParachainHost<Block>, +impl<P, E> Network for ConsensusNetwork<P,E> where + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost<Block>, + E: Clone + Future<Item=(),Error=()> + Send + 'static, { type TableRouter = Router<P>; @@ -122,7 +131,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork< &self, _validators: &[SessionKey], table: Arc<SharedTable>, - task_executor: TaskExecutor + task_executor: TaskExecutor, ) -> Self::TableRouter { let parent_hash = table.consensus_parent_hash().clone(); @@ -139,22 +148,28 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork< ); let attestation_topic = table_router.gossip_topic(); + let exit = self.exit.clone(); // spin up a task in the background that processes all incoming statements // TODO: propagate statements on a timer? let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic); - task_executor.spawn(self.network.with_spec(|spec, ctx| { - spec.new_consensus(ctx, parent_hash, CurrentConsensus { - knowledge, - local_session_key, - }); - - MessageProcessTask { - inner_stream, - parent_hash, - table_router: table_router.clone(), - } - })); + let process_task = self.network + .with_spec(|spec, ctx| { + spec.new_consensus(ctx, parent_hash, CurrentConsensus { + knowledge, + local_session_key, + }); + + MessageProcessTask { + inner_stream, + parent_hash, + table_router: table_router.clone(), + exit, + } + }) + .then(|_| Ok(())); + + task_executor.spawn(process_task); table_router } @@ -176,7 +191,7 @@ impl Future for AwaitingCollation { } } -impl<P: ProvideRuntimeApi + Send + Sync + 'static> Collators for ConsensusNetwork<P> +impl<P: ProvideRuntimeApi + Send + Sync + 'static, E: Clone> Collators for ConsensusNetwork<P, E> where P::Api: ParachainHost<Block>, { type Error = NetworkDown; diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index e2828865affe44e4651e9df08c767397922d717a..205aff1e65e615f8156765338c462e2fa1d44f47 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -104,7 +104,9 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P> where P::Api: ParachainHost<Block> { /// Import a statement whose signature has been checked already. - pub(crate) fn import_statement(&self, statement: SignedStatement) { + pub(crate) fn import_statement<Exit>(&self, statement: SignedStatement, exit: Exit) + where Exit: Future<Item=(),Error=()> + Clone + Send + 'static + { trace!(target: "p_net", "importing consensus statement {:?}", statement.statement); // defer any statements for which we haven't imported the candidate yet @@ -143,14 +145,16 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P> for (producer, statement) in producers.into_iter().zip(statements) { self.knowledge.lock().note_statement(statement.sender, &statement.statement); - if let Some(producer) = producer { + if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { trace!(target: "consensus", "driving statement work to completion"); - self.dispatch_work(c_hash, producer); + self.task_executor.spawn(work.select(exit.clone()).then(|_| Ok(()))); } } } - fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>) where + fn create_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>) + -> impl Future<Item=(),Error=()> + where D: Future<Item=BlockData,Error=io::Error> + Send + 'static, E: Future<Item=Extrinsic,Error=io::Error> + Send + 'static, { @@ -173,13 +177,13 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P> let knowledge = self.knowledge.clone(); let attestation_topic = self.attestation_topic.clone(); - let work = producer.prime(validate) + producer.prime(validate) .map(move |produced| { // store the data before broadcasting statements, so other peers can fetch. knowledge.lock().note_candidate( candidate_hash, produced.block_data, - produced.extrinsic + produced.extrinsic, ); if produced.validity.is_none() && produced.availability.is_none() { @@ -204,9 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P> ); } }) - .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)); - - self.task_executor.spawn(work); + .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) } } diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 05427b752cb65f4752a9ee183e5ba449b631249c..82c641b26bc282098510e00739589986e1cdc3ef 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -205,7 +205,11 @@ construct_service_factory! { let client = service.client(); // collator connections and consensus network both fulfilled by this - let consensus_network = ConsensusNetwork::new(service.network(), service.client()); + let consensus_network = ConsensusNetwork::new( + service.network(), + service.on_exit(), + service.client(), + ); let proposer_factory = ::consensus::ProposerFactory::new( client.clone(), consensus_network.clone(),