Unverified commit 7c90366c, authored by asynchronous rob, committed by GitHub

ensure all spawned futures are exit-guarded (#59)

parent d5bef61e
Pipeline #27561 passed with stages in 14 minutes and 56 seconds
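
The change applies one pattern throughout: every future handed to an executor is first `select`-ed against an exit future, so background tasks resolve as soon as the node signals shutdown instead of keeping the runtime alive. A minimal sketch of that guard, assuming the futures 0.1 / tokio 0.1 APIs this code uses; `work`, `exit_tx` and `exit_rx` are illustrative stand-ins rather than names from the diff:

```rust
extern crate futures;
extern crate tokio;

use futures::sync::oneshot;
use futures::{future, Future};

fn main() {
    tokio::run(future::lazy(|| {
        // Stand-in for the node's exit signal; in the diff this role is played
        // by the `Exit` future cloned from the service (`inner_exit`,
        // `service.on_exit()`, ...).
        let (exit_tx, exit_rx) = oneshot::channel::<()>();
        let exit = exit_rx.map_err(|_| ());

        // Stand-in for a long-running background task that would otherwise
        // never resolve, like the import-notification streams below.
        let work = future::empty::<(), ()>();

        // The exit guard: whichever side finishes first resolves the `select`,
        // so the spawned task ends as soon as the exit signal fires.
        tokio::spawn(work.select(exit).then(|_| Ok(())));

        // Fire the exit signal; without the guard, `work` would keep the
        // runtime alive forever.
        exit_tx.send(()).expect("exit receiver still alive");
        Ok(())
    }));
}
```

The `.then(|_| Ok(()))` tail discards whichever side won the `select`, keeping the `(Item = (), Error = ())` signature that `tokio::spawn` expects.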

@@ -246,7 +246,7 @@ impl<P, E> IntoExit for CollationNode<P, E> where
 impl<P, E> Worker for CollationNode<P, E> where
 	P: ParachainContext + Send + 'static,
-	E: Future<Item=(),Error=()> + Send + 'static
+	E: Future<Item=(),Error=()> + Clone + Send + 'static
 {
 	type Work = Box<Future<Item=(),Error=()> + Send>;

@@ -267,6 +267,7 @@ impl<P, E> Worker for CollationNode<P, E> where
 		let client = service.client();
 		let network = service.network();
+		let inner_exit = exit.clone();
 		let work = client.import_notification_stream()
 			.for_each(move |notification| {
 				macro_rules! try_fr {

@@ -325,7 +326,7 @@ impl<P, E> Worker for CollationNode<P, E> where
 					}
 				});

-				tokio::spawn(silenced);
+				tokio::spawn(silenced.select(inner_exit.clone()).then(|_| Ok(())));

 				Ok(())
 			});

@@ -137,28 +137,31 @@ pub(crate) fn start<C, N, P>(
 		let consensus = parachain_consensus.clone();
 		let key = key.clone();
-		client.import_notification_stream().for_each(move |notification| {
-			let parent_hash = notification.hash;
-			if notification.is_new_best {
-				let res = client
-					.runtime_api()
-					.authorities(&BlockId::hash(parent_hash))
-					.map_err(Into::into)
-					.and_then(|authorities| {
-						consensus.get_or_instantiate(
-							parent_hash,
-							&authorities,
-							key.clone(),
-						)
-					});
-
-				if let Err(e) = res {
-					warn!("Unable to start parachain consensus on top of {:?}: {}",
-						parent_hash, e);
-				}
-			}
-
-			Ok(())
-		})
+		client.import_notification_stream()
+			.for_each(move |notification| {
+				let parent_hash = notification.hash;
+				if notification.is_new_best {
+					let res = client
+						.runtime_api()
+						.authorities(&BlockId::hash(parent_hash))
+						.map_err(Into::into)
+						.and_then(|authorities| {
+							consensus.get_or_instantiate(
+								parent_hash,
+								&authorities,
+								key.clone(),
+							)
+						});
+
+					if let Err(e) = res {
+						warn!("Unable to start parachain consensus on top of {:?}: {}",
+							parent_hash, e);
+					}
+				}
+
+				Ok(())
+			})
+			.select(exit.clone())
+			.then(|_| Ok(()))
 	};

 	let prune_old_sessions = {

@@ -180,6 +183,8 @@ pub(crate) fn start<C, N, P>(
 				}
 			})
 			.map_err(|e| warn!("Timer error {:?}", e))
+			.select(exit.clone())
+			.then(|_| Ok(()))
 	};

 	runtime.spawn(notifications);

@@ -41,14 +41,17 @@ use router::Router;

 // task that processes all gossipped consensus messages,
 // checking signatures
-struct MessageProcessTask<P> {
+struct MessageProcessTask<P, E> {
 	inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
 	parent_hash: Hash,
 	table_router: Router<P>,
+	exit: E,
 }

-impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P>
-	where P::Api: ParachainHost<Block>,
+impl<P, E> MessageProcessTask<P, E> where
+	P: ProvideRuntimeApi + Send + Sync + 'static,
+	P::Api: ParachainHost<Block>,
+	E: Future<Item=(),Error=()> + Clone + Send + 'static,
 {
 	fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
 		use polkadot_consensus::SignedStatement;

@@ -61,7 +64,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P>
 					statement.sender,
 					&self.parent_hash
 				) {
-					self.table_router.import_statement(statement);
+					self.table_router.import_statement(statement, self.exit.clone());
 				}
 			}

@@ -69,8 +72,10 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> MessageProcessTask<P>
 	}
 }

-impl<P: ProvideRuntimeApi + Send + Sync + 'static> Future for MessageProcessTask<P>
-	where P::Api: ParachainHost<Block>,
+impl<P, E> Future for MessageProcessTask<P, E> where
+	P: ProvideRuntimeApi + Send + Sync + 'static,
+	P::Api: ParachainHost<Block>,
+	E: Future<Item=(),Error=()> + Clone + Send + 'static,
 {
 	type Item = ();
 	type Error = ();

@@ -90,30 +95,34 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Future for MessageProcessTask
 }

 /// Wrapper around the network service
-pub struct ConsensusNetwork<P> {
+pub struct ConsensusNetwork<P, E> {
 	network: Arc<NetworkService>,
 	api: Arc<P>,
+	exit: E,
 }

-impl<P> ConsensusNetwork<P> {
+impl<P, E> ConsensusNetwork<P, E> {
 	/// Create a new consensus networking object.
-	pub fn new(network: Arc<NetworkService>, api: Arc<P>) -> Self {
-		ConsensusNetwork { network, api }
+	pub fn new(network: Arc<NetworkService>, exit: E, api: Arc<P>) -> Self {
+		ConsensusNetwork { network, exit, api }
 	}
 }

-impl<P> Clone for ConsensusNetwork<P> {
+impl<P, E: Clone> Clone for ConsensusNetwork<P, E> {
 	fn clone(&self) -> Self {
 		ConsensusNetwork {
 			network: self.network.clone(),
+			exit: self.exit.clone(),
 			api: self.api.clone(),
 		}
 	}
 }

 /// A long-lived network which can create parachain statement routing processes on demand.
-impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<P>
-	where P::Api: ParachainHost<Block>,
+impl<P, E> Network for ConsensusNetwork<P,E> where
+	P: ProvideRuntimeApi + Send + Sync + 'static,
+	P::Api: ParachainHost<Block>,
+	E: Clone + Future<Item=(),Error=()> + Send + 'static,
 {
 	type TableRouter = Router<P>;

@@ -122,7 +131,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<
 		&self,
 		_validators: &[SessionKey],
 		table: Arc<SharedTable>,
-		task_executor: TaskExecutor
+		task_executor: TaskExecutor,
 	) -> Self::TableRouter {
 		let parent_hash = table.consensus_parent_hash().clone();

@@ -139,22 +148,28 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<
 		);

 		let attestation_topic = table_router.gossip_topic();
+		let exit = self.exit.clone();

 		// spin up a task in the background that processes all incoming statements
 		// TODO: propagate statements on a timer?
 		let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic);
-		task_executor.spawn(self.network.with_spec(|spec, ctx| {
-			spec.new_consensus(ctx, parent_hash, CurrentConsensus {
-				knowledge,
-				local_session_key,
-			});
-
-			MessageProcessTask {
-				inner_stream,
-				parent_hash,
-				table_router: table_router.clone(),
-			}
-		}));
+		let process_task = self.network
+			.with_spec(|spec, ctx| {
+				spec.new_consensus(ctx, parent_hash, CurrentConsensus {
+					knowledge,
+					local_session_key,
+				});
+
+				MessageProcessTask {
+					inner_stream,
+					parent_hash,
+					table_router: table_router.clone(),
+					exit,
+				}
+			})
+			.then(|_| Ok(()));
+
+		task_executor.spawn(process_task);

 		table_router
 	}

@@ -176,7 +191,7 @@ impl Future for AwaitingCollation
 	}
 }

-impl<P: ProvideRuntimeApi + Send + Sync + 'static> Collators for ConsensusNetwork<P>
+impl<P: ProvideRuntimeApi + Send + Sync + 'static, E: Clone> Collators for ConsensusNetwork<P, E>
 	where P::Api: ParachainHost<Block>,
 {
 	type Error = NetworkDown;
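
The networking changes above make `ConsensusNetwork` and `MessageProcessTask` generic over an exit future `E` that is stored in the struct and cloned into the statement-processing work they set off. A rough sketch of that shape, assuming futures 0.1; `Network` and `ExitSignal` are illustrative stand-ins for the real types:

```rust
extern crate futures;
extern crate tokio;

use futures::{future, Async, Future, Poll};

// Illustrative exit future: `Clone`, and resolved when shutdown is requested.
// It resolves immediately here so the example terminates.
#[derive(Clone)]
struct ExitSignal;

impl Future for ExitSignal {
    type Item = ();
    type Error = ();
    fn poll(&mut self) -> Poll<(), ()> {
        Ok(Async::Ready(()))
    }
}

// Illustrative counterpart of `ConsensusNetwork<P, E>`: the exit future is a
// field and gets cloned once per spawned task.
struct Network<E> {
    exit: E,
}

impl<E> Network<E>
where
    E: Future<Item = (), Error = ()> + Clone + Send + 'static,
{
    fn spawn_task(&self) {
        // Each task receives its own clone of the exit future, so all of them
        // resolve once the node signals shutdown.
        let task = future::empty::<(), ()>()
            .select(self.exit.clone())
            .then(|_| Ok(()));
        tokio::spawn(task);
    }
}

fn main() {
    tokio::run(future::lazy(|| {
        Network { exit: ExitSignal }.spawn_task();
        Ok(())
    }));
}
```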

@@ -104,7 +104,9 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
 	where P::Api: ParachainHost<Block>
 {
 	/// Import a statement whose signature has been checked already.
-	pub(crate) fn import_statement(&self, statement: SignedStatement) {
+	pub(crate) fn import_statement<Exit>(&self, statement: SignedStatement, exit: Exit)
+		where Exit: Future<Item=(),Error=()> + Clone + Send + 'static
+	{
 		trace!(target: "p_net", "importing consensus statement {:?}", statement.statement);

 		// defer any statements for which we haven't imported the candidate yet

@@ -143,14 +145,16 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
 		for (producer, statement) in producers.into_iter().zip(statements) {
 			self.knowledge.lock().note_statement(statement.sender, &statement.statement);

-			if let Some(producer) = producer {
+			if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
 				trace!(target: "consensus", "driving statement work to completion");
-				self.dispatch_work(c_hash, producer);
+				self.task_executor.spawn(work.select(exit.clone()).then(|_| Ok(())));
 			}
 		}
 	}

-	fn dispatch_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>) where
+	fn create_work<D, E>(&self, candidate_hash: Hash, producer: StatementProducer<D, E>)
+		-> impl Future<Item=(),Error=()>
+		where
 		D: Future<Item=BlockData,Error=io::Error> + Send + 'static,
 		E: Future<Item=Extrinsic,Error=io::Error> + Send + 'static,
 	{

@@ -173,13 +177,13 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
 		let knowledge = self.knowledge.clone();
 		let attestation_topic = self.attestation_topic.clone();

-		let work = producer.prime(validate)
+		producer.prime(validate)
 			.map(move |produced| {
 				// store the data before broadcasting statements, so other peers can fetch.
 				knowledge.lock().note_candidate(
 					candidate_hash,
 					produced.block_data,
-					produced.extrinsic
+					produced.extrinsic,
 				);

 				if produced.validity.is_none() && produced.availability.is_none() {

@@ -204,9 +208,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
 					);
 				}
 			})
-			.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e));
-
-		self.task_executor.spawn(work);
+			.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
 	}
 }
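
The router change above replaces `dispatch_work`, which spawned its future internally, with `create_work`, which returns the statement-production future so the caller (`import_statement`) can attach the exit guard before spawning. A small sketch of that caller-guards-the-work shape, again assuming futures 0.1 / tokio 0.1; `create_work` below is a stand-in, not the real router method:

```rust
extern crate futures;
extern crate tokio;

use futures::{future, Future};

// Illustrative stand-in for the router's new `create_work`: instead of
// spawning internally, it hands the work back to the caller.
fn create_work() -> impl Future<Item = (), Error = ()> {
    future::ok(())
}

fn main() {
    tokio::run(future::lazy(|| {
        // Stand-in exit signal; in the commit this is the `Exit` future
        // threaded through `import_statement`. It never resolves here, so the
        // already-ready work side wins the `select`.
        let exit = future::empty::<(), ()>();

        // The caller owns the guarding decision, mirroring the new
        // `import_statement` body in the diff above.
        tokio::spawn(create_work().select(exit).then(|_| Ok(())));
        Ok(())
    }));
}
```

Returning the future instead of spawning it keeps the spawn site and the exit policy in one place, which is how this commit keeps every spawned task uniformly exit-guarded.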

@@ -205,7 +205,11 @@ construct_service_factory! {
 				let client = service.client();
 				// collator connections and consensus network both fulfilled by this
-				let consensus_network = ConsensusNetwork::new(service.network(), service.client());
+				let consensus_network = ConsensusNetwork::new(
+					service.network(),
+					service.on_exit(),
+					service.client(),
+				);
 				let proposer_factory = ::consensus::ProposerFactory::new(
 					client.clone(),
 					consensus_network.clone(),