From c86a774b9d3508fe455c3317b6cac9797f61660d Mon Sep 17 00:00:00 2001
From: Robert Klotzner <eskimor@users.noreply.github.com>
Date: Tue, 27 Apr 2021 21:47:32 +0200
Subject: [PATCH] Send statements to own backing group first (#2927)

* Factor out runtime module into utils.

* First fatal error design.

* Better error handling infra.

* Error handling cleanup.

* Send to peers of our group first.

* Finish backing group prioritization.

* Little cleanup.

* More cleanup.

* Forgot to check in error.rs.

* Notes.

* Runtime -> RuntimeInfo

* qed in debug assert.

* PolkaErr -> Fault.
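
The `Fault` type introduced here splits subsystem errors into non-fatal
ones (just logged, processing continues) and fatal ones (the subsystem
shuts down). A rough sketch of its shape, inferred from the call sites in
this patch (the real definition lives in
polkadot/node/subsystem-util/src/error_handling.rs):

    pub enum Fault<E, F> {
        /// Non-fatal error: log it and carry on.
        Err(E),
        /// Fatal error: tear the subsystem down.
        Fatal(F),
    }

On top of that infrastructure, statement distribution now sends statements
to the peers of our own backing group first, giving backers a chance to be
among the first to request any large statement data.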
---
 polkadot/Cargo.lock                           |   1 +
 .../availability-distribution/src/error.rs    | 112 +--
 .../availability-distribution/src/lib.rs      |  18 +-
 .../src/pov_requester/mod.rs                  |  43 +-
 .../src/requester/fetch_task/mod.rs           |   4 +-
 .../src/responder.rs                          |  16 +-
 .../src/session_cache.rs                      |   8 +-
 .../network/statement-distribution/Cargo.toml |   2 +
 .../statement-distribution/src/error.rs       | 122 +++
 .../network/statement-distribution/src/lib.rs | 708 +++++++++++++-----
 .../statement-distribution/src/requester.rs   |   2 +-
 polkadot/node/service/src/lib.rs              |   1 +
 .../node/subsystem-util/src/error_handling.rs | 201 +++++
 polkadot/node/subsystem-util/src/lib.rs       |   5 +
 .../node/subsystem-util/src/runtime/error.rs  |  36 +-
 .../node/subsystem-util/src/runtime/mod.rs    |  10 +-
 polkadot/node/subsystem/src/messages.rs       |   4 +
 17 files changed, 1022 insertions(+), 271 deletions(-)
 create mode 100644 polkadot/node/network/statement-distribution/src/error.rs
 create mode 100644 polkadot/node/subsystem-util/src/error_handling.rs
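
Note for downstream users: `StatementDistribution::new` now takes the
keystore in addition to metrics (hence the one-line change in
polkadot/node/service/src/lib.rs). A hypothetical call-site sketch, with
`Metrics::register` assumed from the usual subsystem metrics pattern:

    let statement_dist = StatementDistribution::new(
        keystore.clone(),
        Metrics::register(registry)?, // assumed metrics constructor
    );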

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index c80aaee005b..536f131b6bb 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6568,6 +6568,7 @@ dependencies = [
  "sp-keystore",
  "sp-staking",
  "sp-tracing",
+ "thiserror",
  "tracing",
 ]
 
diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs
index c15669f406f..6b6f62ae988 100644
--- a/polkadot/node/network/availability-distribution/src/error.rs
+++ b/polkadot/node/network/availability-distribution/src/error.rs
@@ -23,30 +23,67 @@ use thiserror::Error;
 
 use futures::channel::oneshot;
 
-use polkadot_node_subsystem_util::{
-	runtime,
-	Error as UtilError,
-};
+use polkadot_node_subsystem_util::{Fault, Error as UtilError, runtime, unwrap_non_fatal};
 use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
 
 use crate::LOG_TARGET;
 
-/// Errors of this subsystem.
 #[derive(Debug, Error)]
-pub enum Error {
-	#[error("Response channel to obtain chunk failed")]
-	QueryChunkResponseChannel(#[source] oneshot::Canceled),
+#[error(transparent)]
+pub struct Error(pub Fault<NonFatal, Fatal>);
 
-	#[error("Response channel to obtain available data failed")]
-	QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
+impl From<NonFatal> for Error {
+	fn from(e: NonFatal) -> Self {
+		Self(Fault::from_non_fatal(e))
+	}
+}
 
-	#[error("Receive channel closed")]
-	IncomingMessageChannel(#[source] SubsystemError),
+impl From<Fatal> for Error {
+	fn from(f: Fatal) -> Self {
+		Self(Fault::from_fatal(f))
+	}
+}
+
+impl From<runtime::Error> for Error {
+	fn from(o: runtime::Error) -> Self {
+		Self(Fault::from_other(o))
+	}
+}
 
+/// Fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum Fatal {
 	/// Spawning a subsystem task failed.
 	#[error("Spawning subsystem task failed")]
 	SpawnTask(#[source] SubsystemError),
 
+	/// Runtime API subsystem is down, which means we're shutting down.
+	#[error("Runtime request canceled")]
+	RuntimeRequestCanceled(oneshot::Canceled),
+
+	/// Requester stream exhausted.
+	#[error("Erasure chunk requester stream exhausted")]
+	RequesterExhausted,
+
+	#[error("Receive channel closed")]
+	IncomingMessageChannel(#[source] SubsystemError),
+
+	/// Errors coming from runtime::RuntimeInfo.
+	#[error("Error while accessing runtime information")]
+	Runtime(#[from] #[source] runtime::Fatal),
+}
+
+/// Non-fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum NonFatal {
+	/// av-store will drop the sender on any error that happens.
+	#[error("Response channel to obtain chunk failed")]
+	QueryChunkResponseChannel(#[source] oneshot::Canceled),
+
+	/// av-store will drop the sender on any error that happens.
+	#[error("Response channel to obtain available data failed")]
+	QueryAvailableDataResponseChannel(#[source] oneshot::Canceled),
+
 	/// We tried accessing a session that was not cached.
 	#[error("Session is not cached.")]
 	NoSuchCachedSession,
@@ -55,11 +92,7 @@ pub enum Error {
 	#[error("Not a validator.")]
 	NotAValidator,
 
-	/// Requester stream exhausted.
-	#[error("Erasure chunk requester stream exhausted")]
-	RequesterExhausted,
-
-	/// Sending response failed.
+	/// Sending a request's response failed (can happen on timeouts, for example).
 	#[error("Sending a request's response failed.")]
 	SendResponse,
 
@@ -68,10 +101,6 @@ pub enum Error {
 	#[error("Utility request failed")]
 	UtilRequest(UtilError),
 
-	/// Runtime API subsystem is down, which means we're shutting down.
-	#[error("Runtime request canceled")]
-	RuntimeRequestCanceled(oneshot::Canceled),
-
 	/// Some request to the runtime failed.
 	/// For example if we prune a block we're requesting info about.
 	#[error("Runtime API error")]
@@ -98,39 +127,30 @@ pub enum Error {
 
 	/// Errors coming from runtime::RuntimeInfo.
 	#[error("Error while accessing runtime information")]
-	Runtime(#[source] runtime::Error),
+	Runtime(#[from] #[source] runtime::NonFatal),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
 
-impl From<runtime::Error> for Error {
-	fn from(err: runtime::Error) -> Self {
-		Self::Runtime(err)
-	}
-}
-
-impl From<SubsystemError> for Error {
-	fn from(err: SubsystemError) -> Self {
-		Self::IncomingMessageChannel(err)
-	}
-}
-
-/// Receive a response from a runtime request and convert errors.
-pub(crate) async fn recv_runtime<V>(
-	r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
-) -> std::result::Result<V, Error> {
-	r.await
-		.map_err(Error::RuntimeRequestCanceled)?
-		.map_err(Error::RuntimeRequest)
-}
-
-
 /// Utility for eating top-level errors and logging them.
 ///
 /// We basically always want to try and continue on error. This utility function is meant to
 /// consume top-level errors by simply logging them.
-pub fn log_error(result: Result<()>, ctx: &'static str) {
-	if let Err(error) = result {
+pub fn log_error(result: Result<()>, ctx: &'static str)
+	-> std::result::Result<(), Fatal>
+{
+	if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
 		tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
 	}
+	Ok(())
+}
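+
+// Usage sketch (see the call sites in `lib.rs`): fatal errors propagate to the
+// caller via `?` and bring the subsystem down, while non-fatal errors are merely
+// logged and processing continues:
+//
+//     log_error(
+//         requester.get_mut().update_fetching_heads(&mut ctx, update).await,
+//         "Error in Requester::update_fetching_heads"
+//     )?;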
+
+/// Receive a response from a runtime request and convert errors.
+pub(crate) async fn recv_runtime<V>(
+	r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
+) -> Result<V> {
+	let result = r.await
+		.map_err(Fatal::RuntimeRequestCanceled)?
+		.map_err(NonFatal::RuntimeRequest)?;
+	Ok(result)
 }
diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs
index 7c522859a07..f0c80eb2b2d 100644
--- a/polkadot/node/network/availability-distribution/src/lib.rs
+++ b/polkadot/node/network/availability-distribution/src/lib.rs
@@ -25,10 +25,10 @@ use polkadot_subsystem::{
 
 /// Error and [`Result`] type for this subsystem.
 mod error;
-pub use error::Error;
+pub use error::{Fatal, NonFatal};
 use error::{Result, log_error};
 
-use polkadot_node_subsystem_util::runtime::Runtime;
+use polkadot_node_subsystem_util::runtime::RuntimeInfo;
 
 /// `Requester` taking care of requesting chunks for candidates pending availability.
 mod requester;
@@ -59,7 +59,7 @@ pub struct AvailabilityDistributionSubsystem {
 	/// Pointer to a keystore, which is required for determining this node's validator index.
 	keystore: SyncCryptoStorePtr,
 	/// Easy and efficient runtime access for this subsystem.
-	runtime: Runtime,
+	runtime: RuntimeInfo,
 	/// Prometheus metrics.
 	metrics: Metrics,
 }
@@ -85,12 +85,12 @@ impl AvailabilityDistributionSubsystem {
 
 	/// Create a new instance of the availability distribution.
 	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
-		let runtime = Runtime::new(keystore.clone());
+		let runtime = RuntimeInfo::new(keystore.clone());
 		Self { keystore, runtime,  metrics }
 	}
 
 	/// Start processing work as passed on from the Overseer.
-	async fn run<Context>(mut self, mut ctx: Context) -> Result<()>
+	async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
 	where
 		Context: SubsystemContext<Message = AvailabilityDistributionMessage> + Sync + Send,
 	{
@@ -108,10 +108,10 @@ impl AvailabilityDistributionSubsystem {
 			// Handle task messages sending:
 			let message = match action {
 				Either::Left(subsystem_msg) => {
-					subsystem_msg.map_err(|e| Error::IncomingMessageChannel(e))?
+					subsystem_msg.map_err(|e| Fatal::IncomingMessageChannel(e))?
 				}
 				Either::Right(from_task) => {
-					let from_task = from_task.ok_or(Error::RequesterExhausted)?;
+					let from_task = from_task.ok_or(Fatal::RequesterExhausted)?;
 					ctx.send_message(from_task).await;
 					continue;
 				}
@@ -133,7 +133,7 @@ impl AvailabilityDistributionSubsystem {
 					log_error(
 						requester.get_mut().update_fetching_heads(&mut ctx, update).await,
 						"Error in Requester::update_fetching_heads"
-					);
+					)?;
 				}
 				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
 				FromOverseer::Signal(OverseerSignal::Conclude) => {
@@ -169,7 +169,7 @@ impl AvailabilityDistributionSubsystem {
 							tx,
 						).await,
 						"PoVRequester::fetch_pov"
-					);
+					)?;
 				}
 			}
 		}
diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs
index 7bb5f253979..e53fdd4b241 100644
--- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs
@@ -33,9 +33,10 @@ use polkadot_subsystem::{
 	ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
 	messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
 };
-use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo};
+use polkadot_node_subsystem_util::runtime::{RuntimeInfo, ValidatorInfo};
 
-use crate::error::{Error, log_error};
+use crate::error::{Fatal, NonFatal};
+use crate::LOG_TARGET;
 
 /// Number of sessions we want to keep in the LRU.
 const NUM_SESSIONS: usize = 2;
@@ -63,7 +64,7 @@ impl PoVRequester {
 	pub async fn update_connected_validators<Context>(
 		&mut self,
 		ctx: &mut Context,
-		runtime: &mut Runtime,
+		runtime: &mut RuntimeInfo,
 		update: &ActiveLeavesUpdate,
 	) -> super::Result<()>
 	where
@@ -87,7 +88,7 @@ impl PoVRequester {
 	pub async fn fetch_pov<Context>(
 		&self,
 		ctx: &mut Context,
-		runtime: &mut Runtime,
+		runtime: &mut RuntimeInfo,
 		parent: Hash,
 		from_validator: ValidatorIndex,
 		candidate_hash: CandidateHash,
@@ -99,7 +100,7 @@ impl PoVRequester {
 	{
 		let info = &runtime.get_session_info(ctx, parent).await?.session_info;
 		let authority_id = info.discovery_keys.get(from_validator.0 as usize)
-			.ok_or(Error::InvalidValidatorIndex)?
+			.ok_or(NonFatal::InvalidValidatorIndex)?
 			.clone();
 		let (req, pending_response) = OutgoingRequest::new(
 			Recipient::Authority(authority_id),
@@ -125,7 +126,8 @@ impl PoVRequester {
 			.with_relay_parent(parent);
 		ctx.spawn("pov-fetcher", fetch_pov_job(pov_hash, pending_response.boxed(), span, tx).boxed())
 			.await
-			.map_err(|e| Error::SpawnTask(e))
+			.map_err(|e| Fatal::SpawnTask(e))?;
+		Ok(())
 	}
 }
 
@@ -136,10 +138,13 @@ async fn fetch_pov_job(
 	span: jaeger::Span,
 	tx: oneshot::Sender<PoV>,
 ) {
-	log_error(
-		do_fetch_pov(pov_hash, pending_response, span, tx).await,
-		"fetch_pov_job",
-	)
+	if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx).await {
+		tracing::warn!(
+			target: LOG_TARGET,
+			?err,
+			"fetch_pov_job"
+		);
+	}
 }
 
 /// Do the actual work of waiting for the response.
@@ -149,24 +154,24 @@ async fn do_fetch_pov(
 	_span: jaeger::Span,
 	tx: oneshot::Sender<PoV>,
 )
-	-> super::Result<()>
+	-> std::result::Result<(), NonFatal>
 {
-	let response = pending_response.await.map_err(Error::FetchPoV)?;
+	let response = pending_response.await.map_err(NonFatal::FetchPoV)?;
 	let pov = match response {
 		PoVFetchingResponse::PoV(pov) => pov,
 		PoVFetchingResponse::NoSuchPoV => {
-			return Err(Error::NoSuchPoV)
+			return Err(NonFatal::NoSuchPoV)
 		}
 	};
 	if pov.hash() == pov_hash {
-		tx.send(pov).map_err(|_| Error::SendResponse)
+		tx.send(pov).map_err(|_| NonFatal::SendResponse)
 	} else {
-		Err(Error::UnexpectedPoV)
+		Err(NonFatal::UnexpectedPoV)
 	}
 }
 
 /// Get the session indices for the given relay chain parents.
-async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut Runtime, new_heads: impl Iterator<Item = &Hash>)
+async fn get_activated_sessions<Context>(ctx: &mut Context, runtime: &mut RuntimeInfo, new_heads: impl Iterator<Item = &Hash>)
 	-> super::Result<impl Iterator<Item = (Hash, SessionIndex)>>
 where
 	Context: SubsystemContext,
@@ -181,7 +186,7 @@ where
 /// Connect to validators of our validator group.
 async fn connect_to_relevant_validators<Context>(
 	ctx: &mut Context,
-	runtime: &mut Runtime,
+	runtime: &mut RuntimeInfo,
 	parent: Hash,
 	session: SessionIndex
 )
@@ -206,7 +211,7 @@ where
 /// Return: `None` if not a validator.
 async fn determine_relevant_validators<Context>(
 	ctx: &mut Context,
-	runtime: &mut Runtime,
+	runtime: &mut RuntimeInfo,
 	parent: Hash,
 	session: SessionIndex,
 )
@@ -275,7 +280,7 @@ mod tests {
 		let (mut context, mut virtual_overseer) =
 			test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
 		let keystore = make_ferdie_keystore();
-		let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore);
+		let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(keystore);
 
 		let (tx, rx) = oneshot::channel();
 		let testee = async {
diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
index c5527eedc4b..8fe099770fb 100644
--- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/mod.rs
@@ -34,7 +34,7 @@ use polkadot_subsystem::messages::{
 use polkadot_subsystem::{SubsystemContext, jaeger};
 
 use crate::{
-	error::{Error, Result},
+	error::{Fatal, Result},
 	session_cache::{BadValidators, SessionInfo},
 	LOG_TARGET,
 	metrics::{Metrics, SUCCEEDED, FAILED},
@@ -191,7 +191,7 @@ impl FetchTask {
 
 			ctx.spawn("chunk-fetcher", running.run(kill).boxed())
 				.await
-				.map_err(|e| Error::SpawnTask(e))?;
+				.map_err(|e| Fatal::SpawnTask(e))?;
 
 			Ok(FetchTask {
 				live_in,
diff --git a/polkadot/node/network/availability-distribution/src/responder.rs b/polkadot/node/network/availability-distribution/src/responder.rs
index da3b87a7c37..a811607d0f3 100644
--- a/polkadot/node/network/availability-distribution/src/responder.rs
+++ b/polkadot/node/network/availability-distribution/src/responder.rs
@@ -28,7 +28,7 @@ use polkadot_subsystem::{
 	SubsystemContext, jaeger,
 };
 
-use crate::error::{Error, Result};
+use crate::error::{NonFatal, Result};
 use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}};
 
 /// Variant of `answer_pov_request` that does Prometheus metric and logging on errors.
@@ -107,7 +107,7 @@ where
 		}
 	};
 
-	req.send_response(response).map_err(|_| Error::SendResponse)?;
+	req.send_response(response).map_err(|_| NonFatal::SendResponse)?;
 	Ok(result)
 }
 
@@ -144,7 +144,7 @@ where
 		Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()),
 	};
 
-	req.send_response(response).map_err(|_| Error::SendResponse)?;
+	req.send_response(response).map_err(|_| NonFatal::SendResponse)?;
 	Ok(result)
 }
 
@@ -164,7 +164,7 @@ where
 	))
 	.await;
 
-	rx.await.map_err(|e| {
+	let result = rx.await.map_err(|e| {
 		tracing::trace!(
 			target: LOG_TARGET,
 			?validator_index,
@@ -172,8 +172,9 @@ where
 			error = ?e,
 			"Error retrieving chunk",
 		);
-		Error::QueryChunkResponseChannel(e)
-	})
+		NonFatal::QueryChunkResponseChannel(e)
+	})?;
+	Ok(result)
 }
 
 /// Query PoV from the availability store.
@@ -191,5 +192,6 @@ where
 	))
 	.await;
 
-	rx.await.map_err(|e| Error::QueryAvailableDataResponseChannel(e))
+	let result = rx.await.map_err(|e| NonFatal::QueryAvailableDataResponseChannel(e))?;
+	Ok(result)
 }
diff --git a/polkadot/node/network/availability-distribution/src/session_cache.rs b/polkadot/node/network/availability-distribution/src/session_cache.rs
index 0b2b519bb64..bbd40e2a5db 100644
--- a/polkadot/node/network/availability-distribution/src/session_cache.rs
+++ b/polkadot/node/network/availability-distribution/src/session_cache.rs
@@ -33,7 +33,7 @@ use polkadot_primitives::v1::{
 use polkadot_subsystem::SubsystemContext;
 
 use super::{
-	error::{recv_runtime, Error},
+	error::{recv_runtime, Error, NonFatal},
 	LOG_TARGET,
 };
 
@@ -189,9 +189,9 @@ impl SessionCache {
 		let session = self
 			.session_info_cache
 			.get_mut(&report.session_index)
-			.ok_or(Error::NoSuchCachedSession)?
+			.ok_or(NonFatal::NoSuchCachedSession)?
 			.as_mut()
-			.ok_or(Error::NotAValidator)?;
+			.ok_or(NonFatal::NotAValidator)?;
 		let group = session
 			.validator_groups
 			.get_mut(report.group_index.0 as usize)
@@ -231,7 +231,7 @@ impl SessionCache {
 			..
 		} = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
 			.await?
-			.ok_or(Error::NoSuchSession(session_index))?;
+			.ok_or(NonFatal::NoSuchSession(session_index))?;
 
 		if let Some(our_index) = self.get_our_index(validators).await {
 			// Get our group index:
diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml
index b4648e1ac2d..a958c6df62d 100644
--- a/polkadot/node/network/statement-distribution/Cargo.toml
+++ b/polkadot/node/network/statement-distribution/Cargo.toml
@@ -10,6 +10,7 @@ futures = "0.3.12"
 tracing = "0.1.25"
 polkadot-primitives = { path = "../../../primitives" }
 sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
 polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
 polkadot-node-primitives = { path = "../../primitives" }
@@ -18,6 +19,7 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
 arrayvec = "0.5.2"
 indexmap = "1.6.1"
 parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
+thiserror = "1.0.23"
 
 [dev-dependencies]
 polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
diff --git a/polkadot/node/network/statement-distribution/src/error.rs b/polkadot/node/network/statement-distribution/src/error.rs
new file mode 100644
index 00000000000..82e918dfd18
--- /dev/null
+++ b/polkadot/node/network/statement-distribution/src/error.rs
@@ -0,0 +1,122 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+//
+
+//! Error handling related code and Error/Result definitions.
+
+use polkadot_node_network_protocol::PeerId;
+use polkadot_primitives::v1::{CandidateHash, Hash};
+use polkadot_subsystem::SubsystemError;
+use thiserror::Error;
+
+use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
+
+use crate::LOG_TARGET;
+
+/// General result.
+pub type Result<T> = std::result::Result<T, Error>;
+/// Result for non-fatal-only failures.
+pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
+/// Result for fatal-only failures.
+pub type FatalResult<T> = std::result::Result<T, Fatal>;
+
+/// Errors for statement distribution.
+#[derive(Debug, Error)]
+#[error(transparent)]
+pub struct Error(pub Fault<NonFatal, Fatal>);
+
+impl From<NonFatal> for Error {
+	fn from(e: NonFatal) -> Self {
+		Self(Fault::from_non_fatal(e))
+	}
+}
+
+impl From<Fatal> for Error {
+	fn from(f: Fatal) -> Self {
+		Self(Fault::from_fatal(f))
+	}
+}
+
+impl From<runtime::Error> for Error {
+	fn from(o: runtime::Error) -> Self {
+		Self(Fault::from_other(o))
+	}
+}
+
+/// Fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum Fatal {
+	/// Requester channel is never supposed to close.
+	#[error("Requester receiver stream finished.")]
+	RequesterReceiverFinished,
+
+	/// Responder channel is never supposed to close.
+	#[error("Responder receiver stream finished.")]
+	ResponderReceiverFinished,
+
+	/// Spawning a subsystem task failed.
+	#[error("Spawning subsystem task failed")]
+	SpawnTask(#[source] SubsystemError),
+
+	/// Receiving subsystem message from overseer failed.
+	#[error("Receiving message from overseer failed")]
+	SubsystemReceive(#[source] SubsystemError),
+
+	/// Errors coming from runtime::RuntimeInfo.
+	#[error("Error while accessing runtime information")]
+	Runtime(#[from] #[source] runtime::Fatal),
+}
+
+/// Non-fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum NonFatal {
+	/// Errors coming from runtime::RuntimeInfo.
+	#[error("Error while accessing runtime information")]
+	Runtime(#[from] #[source] runtime::NonFatal),
+
+	/// Relay parent was not present in active heads.
+	#[error("Relay parent could not be found in active heads")]
+	NoSuchHead(Hash),
+
+	/// Peer requested statement data for candidate that was never announced to it.
+	#[error("Peer requested data for candidate it never received a notification for")]
+	RequestedUnannouncedCandidate(PeerId, CandidateHash),
+
+	/// A large statement status was requested, which could not be found.
+	#[error("Statement status does not exist")]
+	NoSuchLargeStatementStatus(Hash, CandidateHash),
+
+	/// A fetched large statement was requested, but could not be found.
+	#[error("Fetched large statement does not exist")]
+	NoSuchFetchedLargeStatement(Hash, CandidateHash),
+
+	/// Responder no longer waits for our data. (Should not happen right now.)
+	#[error("Oneshot `GetData` channel closed")]
+	ResponderGetDataCanceled,
+}
+
+/// Utility for eating top-level errors and logging them.
+///
+/// We basically always want to try and continue on error. This utility function is meant to
+/// consume top-level errors by simply logging them.
+pub fn log_error(result: Result<()>, ctx: &'static str)
+	-> FatalResult<()>
+{
+	if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
+		tracing::debug!(target: LOG_TARGET, error = ?error, ctx)
+	}
+	Ok(())
+}
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index 97d527ebbb7..b7ddb74dff3 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -22,14 +22,15 @@
 #![deny(unused_crate_dependencies)]
 #![warn(missing_docs)]
 
+use error::{FatalResult, NonFatalResult, log_error};
 use parity_scale_codec::Encode;
 
 use polkadot_subsystem::{
 	ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan, SpawnedSubsystem, Subsystem,
-	SubsystemContext, SubsystemError, SubsystemResult, jaeger,
+	SubsystemContext, SubsystemError, jaeger,
 	messages::{
 		AllMessages, NetworkBridgeMessage, StatementDistributionMessage,
-		CandidateBackingMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent,
+		CandidateBackingMessage, NetworkBridgeEvent,
 	},
 };
 use polkadot_node_subsystem_util::{
@@ -39,7 +40,7 @@ use polkadot_node_subsystem_util::{
 use polkadot_node_primitives::{SignedFullStatement, Statement};
 use polkadot_primitives::v1::{
 	CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash,
-	SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature
+	SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, AuthorityDiscoveryId,
 };
 use polkadot_node_network_protocol::{
 	IfDisconnected, PeerId, UnifiedReputationChange as Rep, View,
@@ -54,9 +55,14 @@ use polkadot_node_network_protocol::{
 use futures::{channel::mpsc, future::RemoteHandle, prelude::*};
 use futures::channel::oneshot;
 use indexmap::{IndexSet, IndexMap, map::Entry as IEntry};
+use sp_keystore::SyncCryptoStorePtr;
+use util::{Fault, runtime::RuntimeInfo};
 
 use std::collections::{HashMap, HashSet, hash_map::Entry};
 
+mod error;
+pub use error::{Error, NonFatal, Fatal, Result};
+
 /// Background task logic for requesting of large statements.
 mod requester;
 use requester::{RequesterMessage, fetch};
@@ -89,6 +95,8 @@ const LOG_TARGET: &str = "parachain::statement-distribution";
 
 /// The statement distribution subsystem.
 pub struct StatementDistribution {
+	/// Pointer to a keystore, which is required for determining this node's validator index.
+	keystore: SyncCryptoStorePtr,
 	// Prometheus metrics
 	metrics: Metrics,
 }
@@ -101,15 +109,19 @@ impl<C> Subsystem<C> for StatementDistribution
 		// within `run`.
 		SpawnedSubsystem {
 			name: "statement-distribution-subsystem",
-			future: self.run(ctx).boxed(),
+			future: self
+				.run(ctx)
+				.map_err(|e| SubsystemError::with_origin("statement-distribution", e))
+				.boxed(),
 		}
 	}
 }
 
 impl StatementDistribution {
 	/// Create a new Statement Distribution Subsystem
-	pub fn new(metrics: Metrics) -> StatementDistribution {
+	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> StatementDistribution {
 		StatementDistribution {
+			keystore,
 			metrics,
 		}
 	}
@@ -262,7 +274,7 @@ impl PeerRelayParentKnowledge {
 		&mut self,
 		fingerprint: &(CompactStatement, ValidatorIndex),
 		max_message_count: usize,
-	) -> Result<bool, Rep> {
+	) -> std::result::Result<bool, Rep> {
 		// We don't check `sent_statements` because a statement could be in-flight from both
 		// sides at the same time.
 		if self.received_statements.contains(fingerprint) {
@@ -313,7 +325,7 @@ impl PeerRelayParentKnowledge {
 		&self,
 		fingerprint: &(CompactStatement, ValidatorIndex),
 		max_message_count: usize,
-	) -> Result<(), Rep> {
+	) -> std::result::Result<(), Rep> {
 		// We don't check `sent_statements` because a statement could be in-flight from both
 		// sides at the same time.
 		if self.received_statements.contains(fingerprint) {
@@ -361,6 +373,8 @@ impl PeerRelayParentKnowledge {
 struct PeerData {
 	view: View,
 	view_knowledge: HashMap<Hash, PeerRelayParentKnowledge>,
+	// Peer might be an authority.
+	maybe_authority: Option<AuthorityDiscoveryId>,
 }
 
 impl PeerData {
@@ -423,7 +437,7 @@ impl PeerData {
 		relay_parent: &Hash,
 		fingerprint: &(CompactStatement, ValidatorIndex),
 		max_message_count: usize,
-	) -> Result<bool, Rep> {
+	) -> std::result::Result<bool, Rep> {
 		self.view_knowledge
 			.get_mut(relay_parent)
 			.ok_or(COST_UNEXPECTED_STATEMENT)?
@@ -438,7 +452,7 @@ impl PeerData {
 		relay_parent: &Hash,
 		fingerprint: &(CompactStatement, ValidatorIndex),
 		max_message_count: usize,
-	) -> Result<(), Rep> {
+	) -> std::result::Result<(), Rep> {
 		self.view_knowledge
 			.get(relay_parent)
 			.ok_or(COST_UNEXPECTED_STATEMENT)?
@@ -534,7 +548,7 @@ struct FetchingInfo {
 /// Messages to be handled in this subsystem.
 enum Message {
 	/// Messages from other subsystems.
-	Subsystem(SubsystemResult<FromOverseer<StatementDistributionMessage>>),
+	Subsystem(FatalResult<FromOverseer<StatementDistributionMessage>>),
 	/// Messages from spawned requester background tasks.
 	Requester(Option<RequesterMessage>),
 	/// Messages from spawned responder background task.
@@ -554,7 +568,7 @@ impl Message {
 		let from_responder = from_responder.next().fuse();
 		futures::pin_mut!(from_overseer, from_requester, from_responder);
 		futures::select!(
-			msg = from_overseer => Message::Subsystem(msg),
+			msg = from_overseer => Message::Subsystem(msg.map_err(Fatal::SubsystemReceive)),
 			msg = from_requester => Message::Requester(msg),
 			msg = from_responder => Message::Responder(msg),
 		)
@@ -706,7 +720,9 @@ impl ActiveHeadData {
 
 	/// Returns an error if the statement is already known or not useful
 	/// without modifying the internal state.
-	fn check_useful_or_unknown(&self, statement: SignedFullStatement) -> Result<(), DeniedStatement> {
+	fn check_useful_or_unknown(&self, statement: SignedFullStatement)
+		-> std::result::Result<(), DeniedStatement>
+	{
 		let validator_index = statement.validator_index();
 		let compact = statement.payload().to_compact();
 		let comparator = StoredStatementComparator {
@@ -786,7 +802,7 @@ fn check_statement_signature(
 	head: &ActiveHeadData,
 	relay_parent: Hash,
 	statement: &SignedFullStatement,
-) -> Result<(), ()> {
+) -> std::result::Result<(), ()> {
 	let signing_context = SigningContext {
 		session_index: head.session_index,
 		parent_hash: relay_parent,
@@ -808,6 +824,7 @@ async fn circulate_statement_and_dependents(
 	ctx: &mut impl SubsystemContext,
 	relay_parent: Hash,
 	statement: SignedFullStatement,
+	priority_peers: Vec<PeerId>,
 	metrics: &Metrics,
 ) {
 	let active_head = match active_heads.get_mut(&relay_parent) {
@@ -827,7 +844,7 @@ async fn circulate_statement_and_dependents(
 			{
 				Some((
 					*stored.compact().candidate_hash(),
-					circulate_statement(peers, ctx, relay_parent, stored).await,
+					circulate_statement(peers, ctx, relay_parent, stored, priority_peers).await,
 				))
 			},
 			_ => None,
@@ -907,17 +924,45 @@ async fn circulate_statement(
 	ctx: &mut impl SubsystemContext,
 	relay_parent: Hash,
 	stored: &StoredStatement,
+	mut priority_peers: Vec<PeerId>,
 ) -> Vec<PeerId> {
 	let fingerprint = stored.fingerprint();
 
-	let peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| {
+	let mut peers_to_send: Vec<PeerId> = peers.iter().filter_map(|(peer, data)| {
 		if data.can_send(&relay_parent, &fingerprint) {
 			Some(peer.clone())
 		} else {
 			None
 		}
 	}).collect();
-	let peers_to_send = util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS);
+
+	let good_peers: HashSet<&PeerId> = peers_to_send.iter().collect();
+	// Only take priority peers we can send data to:
+	priority_peers.retain(|p| good_peers.contains(p));
+
+	// Avoid duplicates:
+	let priority_set: HashSet<&PeerId> = priority_peers.iter().collect();
+	peers_to_send.retain(|p| !priority_set.contains(p));
+
+	let mut peers_to_send =
+		util::choose_random_sqrt_subset(peers_to_send, MIN_GOSSIP_PEERS);
+	// We don't want to use fewer peers than we would without any priority peers:
+	let min_size = std::cmp::max(peers_to_send.len(), MIN_GOSSIP_PEERS);
+	// Fill the set up to min_size:
+	let needed_peers = min_size as i64 - priority_peers.len() as i64;
+	if needed_peers > 0 {
+		peers_to_send.truncate(needed_peers as usize);
+		// Order is important here: priority peers are placed first, so they will be sent
+		// first. This gives backers a chance to be among the first to request any large
+		// statement data.
+		priority_peers.append(&mut peers_to_send);
+	}
+	peers_to_send = priority_peers;
+	// We must not have duplicates:
+	debug_assert!(
+		peers_to_send.len() == peers_to_send.clone().into_iter().collect::<HashSet<_>>().len(),
+		"We filter out duplicates above. qed.",
+	);
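+	// Worked example (hypothetical numbers): if the sqrt subset yields 25 peers and
+	// 4 priority peers survived the `good_peers` filter, then
+	// `min_size = max(25, MIN_GOSSIP_PEERS)` and `needed_peers = min_size - 4`, so
+	// the final set is the 4 priority peers followed by `needed_peers` random ones.
+	// If the priority peers alone already reach `min_size`, only they are sent.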
 	let peers_to_send: Vec<(PeerId, bool)> = peers_to_send.into_iter()
 		.map(|peer_id| {
 			let new = peers.get_mut(&peer_id)
@@ -1246,6 +1291,7 @@ async fn handle_incoming_message_and_circulate<'a>(
 			ctx,
 			relay_parent,
 			statement,
+			Vec::new(),
 		).await;
 	}
 }
@@ -1425,6 +1471,7 @@ async fn update_peer_view_and_send_unlocked(
 
 async fn handle_network_update(
 	peers: &mut HashMap<PeerId, PeerData>,
+	authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
 	active_heads: &mut HashMap<Hash, ActiveHeadData>,
 	ctx: &mut impl SubsystemContext,
 	req_sender: &mpsc::Sender<RequesterMessage>,
@@ -1432,7 +1479,7 @@ async fn handle_network_update(
 	metrics: &Metrics,
 ) {
 	match update {
-		NetworkBridgeEvent::PeerConnected(peer, role, _) => {
+		NetworkBridgeEvent::PeerConnected(peer, role, maybe_authority) => {
 			tracing::trace!(
 				target: LOG_TARGET,
 				?peer,
@@ -1442,7 +1489,11 @@ async fn handle_network_update(
 			peers.insert(peer, PeerData {
 				view: Default::default(),
 				view_knowledge: Default::default(),
+				maybe_authority: maybe_authority.clone(),
 			});
+			if let Some(authority) = maybe_authority {
+				authorities.insert(authority, peer);
+			}
 		}
 		NetworkBridgeEvent::PeerDisconnected(peer) => {
 			tracing::trace!(
@@ -1450,7 +1501,9 @@ async fn handle_network_update(
 				?peer,
 				"Peer disconnected",
 			);
-			peers.remove(&peer);
+			if let Some(auth_id) = peers.remove(&peer).and_then(|p| p.maybe_authority) {
+				authorities.remove(&auth_id);
+			}
 		}
 		NetworkBridgeEvent::PeerMessage(peer, message) => {
 			handle_incoming_message_and_circulate(
@@ -1495,9 +1548,13 @@ impl StatementDistribution {
 	async fn run(
 		self,
 		mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>,
-	) -> SubsystemResult<()> {
+	) -> std::result::Result<(), Fatal> {
 		let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
+		let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
 		let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
+
+		let mut runtime = RuntimeInfo::new(self.keystore.clone());
+
 		// Sender/Receiver for getting news from our statement fetching tasks.
 		let (req_sender, mut req_receiver) = mpsc::channel(1);
 		// Sender/Receiver for getting news from our responder task.
@@ -1505,43 +1562,48 @@ impl StatementDistribution {
 
 		loop {
 			let message = Message::receive(&mut ctx, &mut req_receiver, &mut res_receiver).await;
-			let finished = match message {
-				Message::Subsystem(result) =>
-					self.handle_subsystem_message(
+			match message {
+				Message::Subsystem(result) => {
+					let result = self.handle_subsystem_message(
 						&mut ctx,
+						&mut runtime,
 						&mut peers,
+						&mut authorities,
 						&mut active_heads,
 						&req_sender,
 						&res_sender,
 						result?,
 					)
-					.await?,
-				Message::Requester(result) =>
-					self.handle_requester_message(
+					.await;
+					match result {
+						Ok(true) => break,
+						Ok(false) => {}
+						Err(Error(Fault::Fatal(f))) => return Err(f), 
+						Err(Error(Fault::Err(error))) =>
+							tracing::debug!(target: LOG_TARGET, ?error)
+					}
+				}
+				Message::Requester(result) => {
+					let result = self.handle_requester_message(
 						&mut ctx,
 						&mut peers,
 						&mut active_heads,
 						&req_sender,
-						result.ok_or(SubsystemError::Context(
-							"Failed to read from requester receiver (stream finished)"
-								.to_string()
-						))?
+						result.ok_or(Fatal::RequesterReceiverFinished)?
 					)
-					.await?,
-				Message::Responder(result) =>
-					self.handle_responder_message(
+					.await;
+					log_error(result.map_err(From::from), "handle_requester_message")?;
+				}
+				Message::Responder(result) => {
+					let result = self.handle_responder_message(
 						&peers,
 						&mut active_heads,
-						result.ok_or(SubsystemError::Context(
-							"Failed to read from responder receiver (stream finished)"
-								.to_string()
-						))?
+						result.ok_or(Fatal::ResponderReceiverFinished)?
 					)
-					.await?,
+					.await;
+					log_error(result.map_err(From::from), "handle_responder_message")?;
+				}
 			};
-			if finished {
-				break
-			}
 		}
 		Ok(())
 	}
@@ -1552,7 +1614,7 @@ impl StatementDistribution {
 		peers: &HashMap<PeerId, PeerData>,
 		active_heads: &mut HashMap<Hash, ActiveHeadData>,
 		message: ResponderMessage,
-	) -> SubsystemResult<bool> {
+	) -> NonFatalResult<()> {
 		match message {
 			ResponderMessage::GetData {
 				requesting_peer,
@@ -1566,39 +1628,28 @@ impl StatementDistribution {
 					&relay_parent,
 					&candidate_hash
 				) {
-					tracing::warn!(
-						target: LOG_TARGET,
-						"Peer requested candidate, although we never announced it to that peer."
-					);
-					return Ok(false)
+					return Err(
+						NonFatal::RequestedUnannouncedCandidate(requesting_peer, candidate_hash)
+					)
 				}
 
-				let active_head = match active_heads.get(&relay_parent) {
-					Some(head) => head,
-					None => return Ok(false),
-				};
+				let active_head = active_heads
+						.get(&relay_parent)
+						.ok_or(NonFatal::NoSuchHead(relay_parent))?;
+
 				let committed = match active_head.waiting_large_statements.get(&candidate_hash) {
 					Some(LargeStatementStatus::Fetched(committed)) => committed.clone(),
 					_ => {
-						tracing::debug!(
-							target: LOG_TARGET,
-							?candidate_hash,
-							"Requested data not found - this should not happen under normal circumstances."
-						);
-						return Ok(false)
+						return Err(
+							NonFatal::NoSuchFetchedLargeStatement(relay_parent, candidate_hash)
+						)
 					}
 				};
 
-				if let Err(_) = tx.send(committed) {
-					tracing::debug!(
-						target: LOG_TARGET,
-						"Sending data to responder failed"
-					);
-					return Ok(false)
-				}
+				tx.send(committed).map_err(|_| NonFatal::ResponderGetDataCanceled)?;
 			}
 		}
-		Ok(false)
+		Ok(())
 	}
 
 	async fn handle_requester_message(
@@ -1608,7 +1659,7 @@ impl StatementDistribution {
 		active_heads: &mut HashMap<Hash, ActiveHeadData>,
 		req_sender: &mpsc::Sender<RequesterMessage>,
 		message: RequesterMessage,
-	) -> SubsystemResult<bool> {
+	) -> NonFatalResult<()> {
 		match message {
 			RequesterMessage::Finished {
 				relay_parent,
@@ -1622,10 +1673,9 @@ impl StatementDistribution {
 				}
 				report_peer(ctx, from_peer, BENEFIT_VALID_RESPONSE).await;
 
-				let active_head = match active_heads.get_mut(&relay_parent) {
-					Some(head) => head,
-					None => return Ok(false),
-				};
+				let active_head = active_heads
+					.get_mut(&relay_parent)
+					.ok_or(NonFatal::NoSuchHead(relay_parent))?;
 
 				let status = active_head
 					.waiting_large_statements
@@ -1634,15 +1684,12 @@ impl StatementDistribution {
 				let info = match status {
 					Some(LargeStatementStatus::Fetching(info)) => info,
 					Some(LargeStatementStatus::Fetched(_)) => {
-						debug_assert!(false, "On status fetched, fetching task already succeeded. qed.");
-						return Ok(false)
+						panic!("On status fetched, fetching task already succeeded. qed.");
 					}
 					None => {
-						tracing::warn!(
-							target: LOG_TARGET,
-							"Received finished task event for non existent status - not supposed to happen."
-						);
-						return Ok(false)
+						return Err(
+							NonFatal::NoSuchLargeStatementStatus(relay_parent, candidate_hash)
+						)
 					}
 				};
 
@@ -1682,10 +1729,9 @@ impl StatementDistribution {
 				candidate_hash,
 				tx,
 			} => {
-				let active_head = match active_heads.get_mut(&relay_parent) {
-					Some(head) => head,
-					None => return Ok(false),
-				};
+				let active_head = active_heads
+					.get_mut(&relay_parent)
+					.ok_or(NonFatal::NoSuchHead(relay_parent))?;
 
 				let status = active_head
 					.waiting_large_statements
@@ -1693,16 +1739,12 @@ impl StatementDistribution {
 
 				let info = match status {
 					Some(LargeStatementStatus::Fetching(info)) => info,
-					Some(LargeStatementStatus::Fetched(_)) => {
-						debug_assert!(false, "On status fetched, fetching task already succeeded. qed.");
-						return Ok(false)
-					}
+					Some(LargeStatementStatus::Fetched(_)) =>
+						panic!("On status fetched, fetching task already succeeded. qed."),
 					None => {
-						tracing::warn!(
-							target: LOG_TARGET,
-							"Received 'get more peers' event for non existent status - not supposed to happen."
-						);
-						return Ok(false)
+						return Err(
+							NonFatal::NoSuchLargeStatementStatus(relay_parent, candidate_hash)
+						)
 					}
 				};
 
@@ -1719,18 +1761,21 @@ impl StatementDistribution {
 			RequesterMessage::ReportPeer(peer, rep) =>
 				report_peer(ctx, peer, rep).await,
 		}
-		Ok(false)
+		Ok(())
 	}
 
+
 	async fn handle_subsystem_message(
 		&self,
 		ctx: &mut impl SubsystemContext,
+		runtime: &mut RuntimeInfo,
 		peers: &mut HashMap<PeerId, PeerData>,
+		authorities: &mut HashMap<AuthorityDiscoveryId, PeerId>,
 		active_heads: &mut HashMap<Hash, ActiveHeadData>,
 		req_sender: &mpsc::Sender<RequesterMessage>,
 		res_sender: &mpsc::Sender<ResponderMessage>,
 		message: FromOverseer<StatementDistributionMessage>,
-	) -> SubsystemResult<bool> {
+	) -> Result<bool> {
 		let metrics = &self.metrics;
 
 		match message {
@@ -1746,45 +1791,12 @@ impl StatementDistribution {
 						"New active leaf",
 					);
 
-					let (validators, session_index) = {
-						let (val_tx, val_rx) = oneshot::channel();
-						let (session_tx, session_rx) = oneshot::channel();
-
-						let val_message = AllMessages::RuntimeApi(
-							RuntimeApiMessage::Request(
-								relay_parent,
-								RuntimeApiRequest::Validators(val_tx),
-							),
-						);
-						let session_message = AllMessages::RuntimeApi(
-							RuntimeApiMessage::Request(
-								relay_parent,
-								RuntimeApiRequest::SessionIndexForChild(session_tx),
-							),
-						);
-						ctx.send_messages(
-							std::iter::once(val_message).chain(std::iter::once(session_message))
-						).await;
-
-						match (val_rx.await?, session_rx.await?) {
-							(Ok(v), Ok(s)) => (v, s),
-							(Err(e), _) | (_, Err(e)) => {
-								tracing::warn!(
-									target: LOG_TARGET,
-									err = ?e,
-									"Failed to fetch runtime API data for active leaf",
-								);
-
-								// Lacking this bookkeeping might make us behave funny, although
-								// not in any slashable way. But we shouldn't take down the node
-								// on what are likely spurious runtime API errors.
-								return Ok(false)
-							}
-						}
-					};
+					let session_index = runtime.get_session_index(ctx, relay_parent).await?;
+					let info = runtime.get_session_info_by_index(ctx, relay_parent, session_index).await?;
+					let session_info = &info.session_info;
 
 					active_heads.entry(relay_parent)
-						.or_insert(ActiveHeadData::new(validators, session_index, span));
+						.or_insert(ActiveHeadData::new(session_info.validators.clone(), session_index, span));
 
 					active_heads.retain(|h, _| {
 						let live = !deactivated.contains(h);
@@ -1810,19 +1822,11 @@ impl StatementDistribution {
 					// Make sure we have data in cache:
 					if is_statement_large(&statement) {
 						if let Statement::Seconded(committed) = &statement.payload() {
-							let active_head = match active_heads.get_mut(&relay_parent) {
-								Some(h) => h,
-								None => {
-									// This should never be out-of-sync with our view if the view updates
-									// correspond to actual `StartWork` messages. So we just log and ignore.
-									tracing::warn!(
-										target: LOG_TARGET,
-										%relay_parent,
-										"our view out-of-sync with active heads; head not found",
-									);
-									return Ok(false)
-								}
-							};
+							let active_head = active_heads
+								.get_mut(&relay_parent)
+								// This should never be out-of-sync with our view if the view
+								// updates correspond to actual `StartWork` messages.
+								.ok_or(NonFatal::NoSuchHead(relay_parent))?;
 							active_head.waiting_large_statements.insert(
 								statement.payload().candidate_hash(),
 								LargeStatementStatus::Fetched(committed.clone())
@@ -1830,12 +1834,36 @@ impl StatementDistribution {
 						}
 					}
 
+					let info = runtime.get_session_info(ctx, relay_parent).await?;
+					let session_info = &info.session_info;
+					let validator_info = &info.validator_info;
+
+					// Get peers in our group, so we can make sure they get our statement
+					// directly:
+					let group_peers = {
+						if let Some(our_group) = validator_info.our_group {
+							let our_group = &session_info.validator_groups[our_group.0 as usize];
+
+							our_group.into_iter()
+								.filter_map(|i| {
+									if Some(*i) == validator_info.our_index {
+										return None
+									}
+									let authority_id = &session_info.discovery_keys[i.0 as usize];
+									authorities.get(authority_id).map(|p| *p)
+								})
+								.collect()
+						} else {
+							Vec::new()
+						}
+					};
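+					// E.g. if our group is {Alice, Bob, Charlie, us}, `group_peers` ends
+					// up with the connected `PeerId`s of Alice, Bob and Charlie: our own
+					// index is filtered out, as are authorities we have no connected
+					// peer for.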
 					circulate_statement_and_dependents(
 						peers,
 						active_heads,
 						ctx,
 						relay_parent,
 						statement,
+						group_peers,
 						metrics,
 					).await;
 				}
@@ -1844,6 +1872,7 @@ impl StatementDistribution {
 
 					handle_network_update(
 						peers,
+						authorities,
 						active_heads,
 						ctx,
 						req_sender,
@@ -1855,7 +1884,9 @@ impl StatementDistribution {
 					ctx.spawn(
 						"large-statement-responder",
 						respond(receiver, res_sender.clone()).boxed()
-					).await?;
+					)
+					.await
+					.map_err(Fatal::SpawnTask)?;
 				}
 			}
 		}
@@ -2009,16 +2040,16 @@ mod tests {
 	use super::*;
 	use std::sync::Arc;
 	use sp_keyring::Sr25519Keyring;
-	use sp_application_crypto::AppKey;
+	use sp_application_crypto::{AppKey, sr25519::Pair, Pair as TraitPair};
 	use polkadot_node_primitives::Statement;
-	use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode};
+	use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode, SessionInfo};
 	use assert_matches::assert_matches;
 	use futures::executor::{self, block_on};
 	use futures_timer::Delay;
 	use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
 	use sc_keystore::LocalKeystore;
 	use polkadot_node_network_protocol::{view, ObservedRole, request_response::Recipient};
-	use polkadot_subsystem::{jaeger, ActivatedLeaf};
+	use polkadot_subsystem::{jaeger, ActivatedLeaf, messages::{RuntimeApiMessage, RuntimeApiRequest}};
 	use polkadot_node_network_protocol::request_response::{
 		Requests,
 		v1::{
@@ -2418,6 +2449,7 @@ mod tests {
 
 				k
 			},
+			maybe_authority: None,
 		};
 
 		let pool = sp_core::testing::TaskExecutor::new();
@@ -2505,6 +2537,7 @@ mod tests {
 		let peer_data_from_view = |view: View| PeerData {
 			view: view.clone(),
 			view_knowledge: view.iter().map(|v| (v.clone(), Default::default())).collect(),
+			maybe_authority: None,
 		};
 
 		let mut peer_data: HashMap<_, _> = vec![
@@ -2554,6 +2587,7 @@ mod tests {
 				&mut ctx,
 				hash_b,
 				&statement,
+				Vec::new(),
 			).await;
 
 			{
@@ -2611,18 +2645,20 @@ mod tests {
 		let peer_b = PeerId::random();
 
 		let validators = vec![
-			Sr25519Keyring::Alice.public().into(),
-			Sr25519Keyring::Bob.public().into(),
-			Sr25519Keyring::Charlie.public().into(),
+			Sr25519Keyring::Alice.pair(),
+			Sr25519Keyring::Bob.pair(),
+			Sr25519Keyring::Charlie.pair(),
 		];
 
+		let session_info = make_session_info(validators, vec![]);
+
 		let session_index = 1;
 
 		let pool = sp_core::testing::TaskExecutor::new();
 		let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
 
 		let bg = async move {
-			let s = StatementDistribution { metrics: Default::default() };
+			let s = StatementDistribution { metrics: Default::default(), keystore: Arc::new(LocalKeystore::in_memory()) };
 			s.run(ctx).await.unwrap();
 		};
 
@@ -2640,22 +2676,22 @@ mod tests {
 			assert_matches!(
 				handle.recv().await,
 				AllMessages::RuntimeApi(
-					RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx))
+					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
 				)
 					if r == hash_a
 				=> {
-					let _ = tx.send(Ok(validators));
+					let _ = tx.send(Ok(session_index));
 				}
 			);
 
 			assert_matches!(
 				handle.recv().await,
 				AllMessages::RuntimeApi(
-					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
+					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
 				)
-					if r == hash_a
+					if r == hash_a && sess_index == session_index
 				=> {
-					let _ = tx.send(Ok(session_index));
+					let _ = tx.send(Ok(Some(session_info)));
 				}
 			);
 
@@ -2767,24 +2803,31 @@ mod tests {
 			c
 		};
 
-		let peer_a = PeerId::random();
-		let peer_b = PeerId::random();
-		let peer_c = PeerId::random();
-		let peer_bad = PeerId::random();
+		let peer_a = PeerId::random(); // Alice
+		let peer_b = PeerId::random(); // Bob
+		let peer_c = PeerId::random(); // Charlie
+		let peer_bad = PeerId::random(); // No validator
 
 		let validators = vec![
-			Sr25519Keyring::Alice.public().into(),
-			Sr25519Keyring::Bob.public().into(),
-			Sr25519Keyring::Charlie.public().into(),
+			Sr25519Keyring::Alice.pair(),
+			Sr25519Keyring::Bob.pair(),
+			Sr25519Keyring::Charlie.pair(),
+			// We:
+			Sr25519Keyring::Ferdie.pair(),
 		];
 
+		let session_info = make_session_info(
+			validators,
+			vec![vec![0,1,2,4], vec![3]]
+		);
+
 		let session_index = 1;
 
 		let pool = sp_core::testing::TaskExecutor::new();
 		let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
 
 		let bg = async move {
-			let s = StatementDistribution { metrics: Default::default() };
+			let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()};
 			s.run(ctx).await.unwrap();
 		};
 
@@ -2808,40 +2851,52 @@ mod tests {
 			assert_matches!(
 				handle.recv().await,
 				AllMessages::RuntimeApi(
-					RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx))
+					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
 				)
 					if r == hash_a
 				=> {
-					let _ = tx.send(Ok(validators));
+					let _ = tx.send(Ok(session_index));
 				}
 			);
 
 			assert_matches!(
 				handle.recv().await,
 				AllMessages::RuntimeApi(
-					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
+					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
 				)
-					if r == hash_a
+					if r == hash_a && sess_index == session_index
 				=> {
-					let _ = tx.send(Ok(session_index));
+					let _ = tx.send(Ok(Some(session_info)));
 				}
 			);
 
 			// notify of peers and view
 			handle.send(FromOverseer::Communication {
 				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
-					NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None)
+					NetworkBridgeEvent::PeerConnected(
+						peer_a.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Alice.public().into())
+					)
 				)
 			}).await;
 
 			handle.send(FromOverseer::Communication {
 				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
-					NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None)
+					NetworkBridgeEvent::PeerConnected(
+						peer_b.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Bob.public().into())
+					)
 				)
 			}).await;
 			handle.send(FromOverseer::Communication {
 				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
-					NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full, None)
+					NetworkBridgeEvent::PeerConnected(
+						peer_c.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Charlie.public().into())
+					)
 				)
 			}).await;
 			handle.send(FromOverseer::Communication {
@@ -3111,7 +3166,15 @@ mod tests {
 						),
 					)
 				) => {
-					assert_eq!(recipients.sort(), vec![peer_b.clone(), peer_c.clone()].sort());
+					tracing::debug!(
+						target: LOG_TARGET,
+						?recipients,
+						"Recipients received"
+					);
+					recipients.sort();
+					let mut expected = vec![peer_b, peer_c, peer_bad];
+					expected.sort();
+					assert_eq!(recipients, expected);
 					assert_eq!(meta.relay_parent, hash_a);
 					assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
 					assert_eq!(meta.signed_by, statement.validator_index());
@@ -3181,4 +3244,307 @@ mod tests {
 
 		executor::block_on(future::join(test_fut, bg));
 	}
+
+	#[test]
+	fn share_prioritizes_backing_group() {
+		sp_tracing::try_init_simple();
+		let hash_a = Hash::repeat_byte(1);
+
+		let candidate = {
+			let mut c = CommittedCandidateReceipt::default();
+			c.descriptor.relay_parent = hash_a;
+			c.descriptor.para_id = 1.into();
+			c.commitments.new_validation_code = Some(ValidationCode(vec![1,2,3]));
+			c
+		};
+
+		let peer_a = PeerId::random(); // Alice
+		let peer_b = PeerId::random(); // Bob
+		let peer_c = PeerId::random(); // Charlie
+		let peer_bad = PeerId::random(); // No validator
+		let peer_other_group = PeerId::random(); // Dave
+
+		let mut validators = vec![
+			Sr25519Keyring::Alice.pair(),
+			Sr25519Keyring::Bob.pair(),
+			Sr25519Keyring::Charlie.pair(),
+			// other group
+			Sr25519Keyring::Dave.pair(),
+			// We:
+			Sr25519Keyring::Ferdie.pair(),
+		];
+
+		// Strictly speaking we only need MIN_GOSSIP_PEERS - 3 to make sure only priority peers
+		// will be served, but by using a larger value we test for overflow errors:
+		let dummy_count = MIN_GOSSIP_PEERS;
+
+		// We artificially inflate our group so there won't be any free slots for other peers
+		// (we want to test that our group is prioritized):
+		let dummy_pairs: Vec<_> = std::iter::repeat_with(|| Pair::generate().0).take(dummy_count).collect();
+		let dummy_peers: Vec<_> = std::iter::repeat_with(|| PeerId::random()).take(dummy_count).collect();
+
+		validators = validators.into_iter().chain(dummy_pairs.clone()).collect();
+
+		let mut first_group = vec![0,1,2,4];
+		first_group.append(&mut (0..dummy_count as u32).map(|v| v + 5).collect());
+		let session_info = make_session_info(
+			validators,
+			vec![first_group, vec![3]]
+		);
+
+		let session_index = 1;
+
+		let pool = sp_core::testing::TaskExecutor::new();
+		let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
+
+		let bg = async move {
+			let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()};
+			s.run(ctx).await.unwrap();
+		};
+
+		let (mut tx_reqs, rx_reqs) = mpsc::channel(1);
+
+		let test_fut = async move {
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::StatementFetchingReceiver(rx_reqs)
+			}).await;
+
+			// register our active heads.
+			handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
+				activated: vec![ActivatedLeaf {
+					hash: hash_a,
+					number: 1,
+					span: Arc::new(jaeger::Span::Disabled),
+				}].into(),
+				deactivated: vec![].into(),
+			}))).await;
+
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::RuntimeApi(
+					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
+				)
+					if r == hash_a
+				=> {
+					let _ = tx.send(Ok(session_index));
+				}
+			);
+
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::RuntimeApi(
+					RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
+				)
+					if r == hash_a && sess_index == session_index
+				=> {
+					let _ = tx.send(Ok(Some(session_info)));
+				}
+			);
+
+			// notify of dummy peers and view
+			for (peer, pair) in dummy_peers.clone().into_iter().zip(dummy_pairs) {
+				handle.send(FromOverseer::Communication {
+					msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+						NetworkBridgeEvent::PeerConnected(
+							peer,
+							ObservedRole::Full,
+							Some(pair.public().into()),
+						)
+					)
+				}).await;
+
+				handle.send(FromOverseer::Communication {
+					msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+						NetworkBridgeEvent::PeerViewChange(peer, view![hash_a])
+					)
+				}).await;
+			}
+
+			// notify of peers and view
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(
+						peer_a.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Alice.public().into())
+					)
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(
+						peer_b.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Bob.public().into())
+					)
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(
+						peer_c.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Charlie.public().into())
+					)
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None)
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerConnected(
+						peer_other_group.clone(),
+						ObservedRole::Full,
+						Some(Sr25519Keyring::Dave.public().into())
+					)
+				)
+			}).await;
+
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
+				)
+			}).await;
+
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a])
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a])
+				)
+			}).await;
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
+					NetworkBridgeEvent::PeerViewChange(peer_other_group.clone(), view![hash_a])
+				)
+			}).await;
+
+			// Share a seconded statement signed by us (Ferdie). It should be distributed to our
+			// backing group first and we should afterwards be able to serve the full candidate
+			// data on request.
+			let statement = {
+				let signing_context = SigningContext {
+					parent_hash: hash_a,
+					session_index,
+				};
+
+				let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
+				let ferdie_public = CryptoStore::sr25519_generate_new(
+					&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Ferdie.to_seed())
+				).await.unwrap();
+
+				SignedFullStatement::sign(
+					&keystore,
+					Statement::Seconded(candidate.clone()),
+					&signing_context,
+					ValidatorIndex(4),
+					&ferdie_public.into(),
+				).await.ok().flatten().expect("should be signed")
+			};
+
+			let metadata =
+				protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();
+
+			handle.send(FromOverseer::Communication {
+				msg: StatementDistributionMessage::Share(hash_a, statement.clone())
+			}).await;
+
+			// Messages should go out:
+			assert_matches!(
+				handle.recv().await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::SendValidationMessage(
+						mut recipients,
+						protocol_v1::ValidationProtocol::StatementDistribution(
+							protocol_v1::StatementDistributionMessage::LargeStatement(meta)
+						),
+					)
+				) => {
+					tracing::debug!(
+						target: LOG_TARGET,
+						?recipients,
+						"Recipients received"
+					);
+					recipients.sort();
+					// We expect only our backing group to be the recipients, due to the inflated
+					// test group above:
+					let mut expected: Vec<_> = vec![peer_a, peer_b, peer_c].into_iter().chain(dummy_peers).collect();
+					expected.sort();
+					assert_eq!(recipients.len(), expected.len());
+					assert_eq!(recipients, expected);
+					assert_eq!(meta.relay_parent, hash_a);
+					assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
+					assert_eq!(meta.signed_by, statement.validator_index());
+					assert_eq!(&meta.signature, statement.signature());
+				}
+			);
+
+			// Now that it has the candidate it should answer requests accordingly:
+
+			let (pending_response, response_rx) = oneshot::channel();
+			let inner_req = StatementFetchingRequest {
+				relay_parent: metadata.relay_parent,
+				candidate_hash: metadata.candidate_hash,
+			};
+			let req = sc_network::config::IncomingRequest {
+				peer: peer_b,
+				payload: inner_req.encode(),
+				pending_response,
+			};
+			tx_reqs.send(req).await.unwrap();
+			let StatementFetchingResponse::Statement(committed) =
+				Decode::decode(&mut response_rx.await.unwrap().result.unwrap().as_ref()).unwrap();
+			assert_eq!(committed, candidate);
+
+			handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
+		};
+
+		futures::pin_mut!(test_fut);
+		futures::pin_mut!(bg);
+
+		executor::block_on(future::join(test_fut, bg));
+	}
+
+	fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {
+
+		let validator_groups: Vec<Vec<ValidatorIndex>> = groups
+			.iter().map(|g| g.into_iter().map(|v| ValidatorIndex(*v)).collect()).collect();
+
+		SessionInfo {
+			discovery_keys: validators.iter().map(|k| k.public().into()).collect(),
+			// Not used:
+			n_cores: validator_groups.len() as u32,
+			validator_groups,
+			validators: validators.iter().map(|k| k.public().into()).collect(),
+			// Not used values:
+			assignment_keys: Vec::new(),
+			zeroth_delay_tranche_width: 0,
+			relay_vrf_modulo_samples: 0,
+			n_delay_tranches: 0,
+			no_show_slots: 0,
+			needed_approvals: 0,
+		}
+	}
+
+	pub fn make_ferdie_keystore() -> SyncCryptoStorePtr {
+		let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
+		SyncCryptoStore::sr25519_generate_new(
+			&*keystore,
+			ValidatorId::ID,
+			Some(&Sr25519Keyring::Ferdie.to_seed()),
+		)
+		.expect("Insert key into keystore");
+		keystore
+	}
 }
diff --git a/polkadot/node/network/statement-distribution/src/requester.rs b/polkadot/node/network/statement-distribution/src/requester.rs
index 66a7979eda2..2368da3cd2c 100644
--- a/polkadot/node/network/statement-distribution/src/requester.rs
+++ b/polkadot/node/network/statement-distribution/src/requester.rs
@@ -20,7 +20,7 @@ use futures::{SinkExt, channel::{mpsc, oneshot}};
 
 use polkadot_node_network_protocol::{
 	PeerId, UnifiedReputationChange,
-    request_response::{
+	request_response::{
 		OutgoingRequest, Recipient, Requests,
 		v1::{
 			StatementFetchingRequest, StatementFetchingResponse
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 28989d23f35..13582d3d233 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -530,6 +530,7 @@ where
 			spawner.clone(),
 		),
 		statement_distribution: StatementDistributionSubsystem::new(
+			keystore.clone(),
 			Metrics::register(registry)?,
 		),
 		approval_distribution: ApprovalDistributionSubsystem::new(
diff --git a/polkadot/node/subsystem-util/src/error_handling.rs b/polkadot/node/subsystem-util/src/error_handling.rs
new file mode 100644
index 00000000000..b2040e53ab0
--- /dev/null
+++ b/polkadot/node/subsystem-util/src/error_handling.rs
@@ -0,0 +1,201 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Utilities for general error handling in Polkadot.
+//!
+//! Goals:
+//!
+//! - Ergonomic API with little repetition.
+//! - Still be explicit where it matters - fatal errors should be visible and justified.
+//! - Easy recovery from non-fatal errors.
+//! - Errors start out as non-fatal and can be made fatal at the level where it is really clear
+//!   they are fatal. E.g. cancellation of a oneshot might be fatal in one case, but absolutely
+//!   expected in another (see the sketch below).
+//! - Good error messages. Fatal errors don't need to be properly structured (as we won't handle
+//!   them), but should provide good error messages about what is going on.
+//! - Encourage many error types. One per module or even per function is totally fine - it makes
+//!   error handling robust, as you only need to handle errors that can actually happen; error
+//!   messages also get better.
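+//!
+//! A minimal sketch of that escalation pattern (all type and function names here are
+//! hypothetical):
+//!
+//! ```ignore
+//! // The callee treats a canceled oneshot as non-fatal ...
+//! let chunk = rx.await.map_err(NonFatal::ChunkRequestCanceled)?;
+//!
+//! // ... while a caller that knows the sender is the overseer escalates that very
+//! // same condition into a fatal error:
+//! let chunk = match fetch_chunk(rx).await {
+//! 	Err(Error(Fault::Err(NonFatal::ChunkRequestCanceled(c)))) =>
+//! 		return Err(Fatal::OverseerDown(c).into()),
+//! 	other => other?,
+//! };
+//! ```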
+
+use thiserror::Error;
+
+/// Error abstraction.
+///
+/// Errors are either fatal and should bring the subsystem down, or are, at least at the point
+/// of occurrence, deemed potentially recoverable.
+///
+/// Upper layers might have a better view and may turn a non-fatal error of a called function
+/// into a fatal one. The opposite should not happen, so don't make an error fatal unless you
+/// know it is fatal in all cases.
+///
+/// Usage pattern:
+///
+/// ```
+/// use thiserror::Error;
+/// use polkadot_node_subsystem::errors::RuntimeApiError;
+/// use polkadot_primitives::v1::SessionIndex;
+/// use futures::channel::oneshot;
+/// use polkadot_node_subsystem_util::{Fault, runtime};
+///
+/// #[derive(Debug, Error)]
+/// #[error(transparent)]
+/// pub struct Error(pub Fault<NonFatal, Fatal>);
+///
+/// pub type Result<T> = std::result::Result<T, Error>;
+/// pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
+/// pub type FatalResult<T> = std::result::Result<T, Fatal>;
+///
+/// // Make an error from a `NonFatal` one.
+/// impl From<NonFatal> for Error {
+/// 	fn from(e: NonFatal) -> Self {
+/// 		Self(Fault::from_non_fatal(e))
+/// 	}
+/// }
+/// 
+/// // Make an Error from a `Fatal` one.
+/// impl From<Fatal> for Error {
+/// 	fn from(f: Fatal) -> Self {
+/// 		Self(Fault::from_fatal(f))
+/// 	}
+/// }
+/// 
+/// // Easy conversion from sub error types from other modules:
+/// impl From<runtime::Error> for Error {
+/// 	fn from(o: runtime::Error) -> Self {
+/// 		Self(Fault::from_other(o))
+/// 	}
+/// }
+///
+/// #[derive(Debug, Error)]
+/// pub enum Fatal {
+///		/// Really fatal stuff.
+///		#[error("Something fatal happened.")]
+///		SomeFatalError,
+///		/// Errors coming from runtime::Runtime.
+///		#[error("Error while accessing runtime information")]
+///		Runtime(#[from] #[source] runtime::Fatal),
+/// }
+///
+/// #[derive(Debug, Error)]
+/// pub enum NonFatal {
+///		/// Some non fatal error.
+///		/// For example if we prune a block we're requesting info about.
+///		#[error("Non fatal error happened.")]
+///		SomeNonFatalError,
+///
+///		/// Errors coming from runtime::Runtime.
+///		#[error("Error while accessing runtime information")]
+///		Runtime(#[from] #[source] runtime::NonFatal),
+/// }
+/// ```
+/// Then mostly use `Error` in functions; you may also use `NonFatal` and `Fatal` directly in
+/// functions that strictly fail only non-fatally or only fatally, as `Fatal` and `NonFatal` can
+/// automatically be converted into the above defined `Error`.
+#[derive(Debug, Error)]
+pub enum Fault<E, F>
+	where
+		E: std::fmt::Debug + std::error::Error + 'static,
+		F: std::fmt::Debug + std::error::Error + 'static,
+{
+	/// Error is fatal and should be escalated up.
+	///
+	/// While we usually won't want to pattern match on those, a concrete descriptive enum is
+	/// still a good idea: it makes auditing what can go wrong in a module easy and also makes
+	/// for good error messages thanks to `thiserror`.
+	#[error("Fatal error occurred.")]
+	Fatal(#[source] F),
+	/// Error that is not fatal, at least not yet at this level of execution.
+	#[error("Non fatal error occurred.")]
+	Err(#[source] E),
+}
+
+/// Due to type system constraints we cannot provide the following methods as standard
+/// `From::from` implementations. So there are no auto conversions by default, but a simple
+/// `Result::map_err` is not too bad.
+impl<E, F> Fault<E, F>
+	where
+		E: std::fmt::Debug + std::error::Error + 'static,
+		F: std::fmt::Debug + std::error::Error + 'static,
+{
+	/// Build a `Fault` from a compatible fatal error.
+	pub fn from_fatal<F1: Into<F>>(f: F1) -> Self {
+		Self::Fatal(f.into())
+	}
+
+	/// Build a `Fault` from a compatible non-fatal error.
+	pub fn from_non_fatal<E1: Into<E>>(e: E1) -> Self {
+		Self::Err(e.into())
+	}
+
+	/// Build a `Fault` from another compatible `Fault`.
+	pub fn from_other<E1, F1>(e: Fault<E1, F1>) -> Self
+	where
+		E1: Into<E> + std::fmt::Debug + std::error::Error + 'static,
+		F1: Into<F> + std::fmt::Debug + std::error::Error + 'static,
+	{
+		match e {
+			Fault::Fatal(f) => Self::from_fatal(f),
+			Fault::Err(e) => Self::from_non_fatal(e),
+		}
+	}
+}
+
+/// Unwrap a non-fatal error and report a fatal one.
+///
+/// This function is useful for top level error handling. Fatal errors will be extracted,
+/// non-fatal errors will be returned for handling.
+///
+/// Usage:
+///
+/// ```no_run
+/// # use thiserror::Error;
+/// # use polkadot_node_subsystem_util::{Fault, unwrap_non_fatal};
+/// # use polkadot_node_subsystem::SubsystemError;
+/// # #[derive(Error, Debug)]
+/// # enum Fatal {
+/// # }
+/// # #[derive(Error, Debug)]
+/// # enum NonFatal {
+/// # }
+/// # fn computation() -> Result<(), Fault<NonFatal, Fatal>> {
+/// # 	panic!();
+/// # }
+/// #
+/// // Use run like so:
+/// //	run(ctx)
+/// //		.map_err(|e| SubsystemError::with_origin("subsystem-name", e))
+/// fn run() -> std::result::Result<(), Fatal> {
+///		loop {
+///			// ....
+///			if let Some(err) = unwrap_non_fatal(computation())? {
+///				println!("Something bad happened: {}", err);
+///				continue
+///			}
+///		}
+/// }
+///
+/// ```
+pub fn unwrap_non_fatal<E,F>(result: Result<(), Fault<E,F>>) -> Result<Option<E>, F>
+	where
+		E: std::fmt::Debug + std::error::Error + 'static,
+		F: std::fmt::Debug + std::error::Error + Send + Sync + 'static
+{
+	match result {
+		Ok(()) => Ok(None),
+		Err(Fault::Fatal(f)) => Err(f),
+		Err(Fault::Err(e)) => Ok(Some(e)),
+	}
+}
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index eb55d31cae3..98f3dda1a34 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -55,6 +55,11 @@ pub mod validator_discovery;
 pub use metered_channel as metered;
 pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
 
+mod error_handling;
+
+/// Error classification.
+pub use error_handling::{Fault, unwrap_non_fatal};
+
 /// These reexports are required so that external crates can use the `delegated_subsystem` macro properly.
 pub mod reexports {
 	pub use sp_core::traits::SpawnNamed;
diff --git a/polkadot/node/subsystem-util/src/runtime/error.rs b/polkadot/node/subsystem-util/src/runtime/error.rs
index 9b298f5279e..94cbe05e945 100644
--- a/polkadot/node/subsystem-util/src/runtime/error.rs
+++ b/polkadot/node/subsystem-util/src/runtime/error.rs
@@ -23,15 +23,36 @@ use futures::channel::oneshot;
 use polkadot_node_subsystem::errors::RuntimeApiError;
 use polkadot_primitives::v1::SessionIndex;
 
+use crate::Fault;
+
 pub type Result<T> = std::result::Result<T, Error>;
 
-/// Errors for fetching of runtime information.
+/// Errors for the `RuntimeInfo` cache.
+pub type Error = Fault<NonFatal, Fatal>;
+
+impl From<NonFatal> for Error {
+	fn from(e: NonFatal) -> Self {
+		Self::from_non_fatal(e)
+	}
+}
+
+impl From<Fatal> for Error {
+	fn from(f: Fatal) -> Self {
+		Self::from_fatal(f)
+	}
+}
+
+/// Fatal runtime errors.
 #[derive(Debug, Error)]
-pub enum Error {
+pub enum Fatal {
 	/// Runtime API subsystem is down, which means we're shutting down.
-	#[error("Runtime request canceled")]
+	#[error("Runtime request got canceled")]
 	RuntimeRequestCanceled(oneshot::Canceled),
+}
 
+/// Non-fatal errors for fetching runtime information.
+#[derive(Debug, Error)]
+pub enum NonFatal {
 	/// Some request to the runtime failed.
 	/// For example if we prune a block we're requesting info about.
 	#[error("Runtime API error")]
@@ -45,8 +66,9 @@ pub enum Error {
 /// Receive a response from a runtime request and convert errors.
 pub(crate) async fn recv_runtime<V>(
 	r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
-) -> std::result::Result<V, Error> {
-	r.await
-		.map_err(Error::RuntimeRequestCanceled)?
-		.map_err(Error::RuntimeRequest)
+) -> Result<V> {
+	let result = r.await
+		.map_err(Fatal::RuntimeRequestCanceled)?
+		.map_err(NonFatal::RuntimeRequest)?;
+	Ok(result)
 }
diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs
index 266415e0f5e..0012c9f6b44 100644
--- a/polkadot/node/subsystem-util/src/runtime/mod.rs
+++ b/polkadot/node/subsystem-util/src/runtime/mod.rs
@@ -33,12 +33,12 @@ use crate::{
 mod error;
 
 use error::{recv_runtime, Result};
-pub use error::Error;
+pub use error::{Error, NonFatal, Fatal};
 
 /// Caching of session info.
 ///
 /// It should be ensured that a cached session stays live in the cache as long as we might need it.
-pub struct Runtime {
+pub struct RuntimeInfo {
 	/// Get the session index for a given relay parent.
 	///
 	/// We query this up to a 100 times per block, so caching it here without roundtrips over the
@@ -70,8 +70,8 @@ pub struct ValidatorInfo {
 	pub our_group: Option<GroupIndex>,
 }
 
-impl Runtime {
-	/// Create a new `Runtime` for convenient runtime fetches.
+impl RuntimeInfo {
+	/// Create a new `RuntimeInfo` for convenient runtime fetches.
 	pub fn new(keystore: SyncCryptoStorePtr) -> Self {
 		Self {
 			// Adjust, depending on how many forks we want to support.
@@ -134,7 +134,7 @@ impl Runtime {
 			let session_info =
 				recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
 					.await?
-					.ok_or(Error::NoSuchSession(session_index))?;
+					.ok_or(NonFatal::NoSuchSession(session_index))?;
 			let validator_info = self.get_validator_info(&session_info).await?;
 
 			let full_info = ExtendedSessionInfo {
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 1c8fa9ab91c..7d6d39938ec 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -224,9 +224,13 @@ pub enum NetworkBridgeMessage {
 	SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),
 
 	/// Send a batch of validation messages.
+	///
+	/// NOTE: Messages will be processed in order (at least statement distribution relies on this).
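+	///
+	/// A sketch of how a subsystem can rely on that ordering (the peer lists here are
+	/// hypothetical; the actual prioritization logic lives in statement distribution):
+	///
+	/// ```ignore
+	/// // Our own backing group comes first in the batch; in-order processing guarantees
+	/// // its peers are served before the remaining gossip peers.
+	/// NetworkBridgeMessage::SendValidationMessages(vec![
+	/// 	(group_peers, message.clone()),
+	/// 	(other_peers, message),
+	/// ])
+	/// ```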
 	SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>),
 
 	/// Send a batch of collation messages.
+	///
+	/// NOTE: Messages will be processed in order.
 	SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
 
 	/// Send requests via substrate request/response.
-- 
GitLab