From 0dbdfef95e7e1272aeb7dc70286be351f1ebe899 Mon Sep 17 00:00:00 2001
From: Robert Klotzner <eskimor@users.noreply.github.com>
Date: Mon, 3 May 2021 21:41:14 +0200
Subject: [PATCH] More secure `Signed` implementation (#2963)

* Remove signature verification in backing.

`SignedFullStatement` now signals that the signature has already been
checked (a sketch of the checked/unchecked split is included at the end of
this message).

* Remove unused check_payload function.

* Introduced unchecked signed variants.

* Fix inclusion to use unchecked variant.

* More unchecked variants.

* Use unchecked variants in protocols.

* Start fixing statement-distribution.

* Fixup statement distribution.

* Fix inclusion.

* Fix warning.

* Fix backing properly.

* Fix bitfield distribution.

* Make crypto store optional for `RuntimeInfo`.

* Factor out utility functions.

* get_group_rotation_info

* WIP: Collator cleanup + check signatures.

* Convenience signature checking functions.

* Check signature on collator-side.

* Fix warnings.

* Fix collator side tests.

* Get rid of warnings.

* Better Signed/UncheckedSigned implementation.

Also get rid of Encode/Decode for Signed! *party*

* Get rid of dead code.

* Move Signed in its own module.

* into_checked -> try_into_checked

* Fix merge.
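
A minimal, self-contained sketch of the checked/unchecked pattern this PR
introduces. The method names `try_into_checked`, `unchecked_payload` and
`as_unchecked` follow the new API as used in the diff below; the `Verifier`
trait and the byte payloads are illustrative stand-ins, not the real
`polkadot-primitives` types:

```rust
/// Something that can verify a signature over an opaque byte payload.
/// (Stand-in for the real crypto; an assumption for this sketch.)
trait Verifier {
    fn verify(&self, payload: &[u8], signature: &[u8]) -> bool;
}

/// A signed payload whose signature has NOT been verified yet.
/// This is the form that arrives over the network.
#[derive(Debug, Clone)]
struct UncheckedSigned<P> {
    payload: P,
    signature: Vec<u8>,
}

/// A signed payload whose signature HAS been verified. In this sketch it can
/// only be obtained via `try_into_checked`, so holding a `Signed<P>` proves
/// the check already happened.
#[derive(Debug, Clone)]
struct Signed<P>(UncheckedSigned<P>);

impl<P: AsRef<[u8]>> UncheckedSigned<P> {
    /// Inspect the payload without any signature guarantee.
    fn unchecked_payload(&self) -> &P {
        &self.payload
    }

    /// Verify the signature; on success return the checked variant,
    /// on failure hand the unchecked value back to the caller.
    fn try_into_checked<V: Verifier>(self, verifier: &V) -> Result<Signed<P>, Self> {
        if verifier.verify(self.payload.as_ref(), &self.signature) {
            Ok(Signed(self))
        } else {
            Err(self)
        }
    }
}

impl<P> Signed<P> {
    /// Payload access after the signature has been checked.
    fn payload(&self) -> &P {
        &self.0.payload
    }

    /// Borrow the unchecked representation again, e.g. for re-gossiping.
    fn as_unchecked(&self) -> &UncheckedSigned<P> {
        &self.0
    }
}

/// Downstream code (backing, after this PR) takes the checked type and
/// therefore does not re-check the signature itself.
fn import_statement<P: std::fmt::Debug>(statement: &Signed<P>) {
    println!("importing verified statement: {:?}", statement.payload());
}

fn main() {
    struct AcceptAll;
    impl Verifier for AcceptAll {
        fn verify(&self, _payload: &[u8], _signature: &[u8]) -> bool {
            true
        }
    }

    let unchecked = UncheckedSigned {
        payload: b"Seconded(candidate)".to_vec(),
        signature: vec![0u8; 64],
    };
    println!("claimed payload (unverified): {:?}", unchecked.unchecked_payload());

    match unchecked.try_into_checked(&AcceptAll) {
        Ok(checked) => {
            // Re-gossiping would hand out the unchecked form again.
            let _for_gossip = checked.as_unchecked();
            import_statement(&checked);
        }
        Err(_) => eprintln!("invalid signature, punish the peer"),
    }
}
```

In this sketch a `Signed` value can only come out of `try_into_checked`, which
is the property that lets consumers such as backing drop their own signature
checks.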
---
 polkadot/Cargo.lock                           |   1 -
 polkadot/node/core/backing/src/lib.rs         |  50 +-
 .../node/core/candidate-selection/src/lib.rs  |  13 +-
 .../node/core/parachains-inherent/src/lib.rs  |   2 +-
 .../availability-distribution/src/lib.rs      |   2 +-
 .../src/pov_requester/mod.rs                  |   2 +-
 .../src/requester/mod.rs                      |  32 +-
 .../network/bitfield-distribution/Cargo.toml  |   1 -
 .../network/bitfield-distribution/src/lib.rs  |  72 +--
 .../collator-protocol/src/collator_side.rs    | 592 ++++++++----------
 .../network/collator-protocol/src/error.rs    |  92 +++
 .../node/network/collator-protocol/src/lib.rs |  28 +-
 .../collator-protocol/src/validator_side.rs   |  13 +-
 polkadot/node/network/protocol/src/lib.rs     |  23 +-
 .../network/statement-distribution/src/lib.rs | 301 ++++-----
 polkadot/node/primitives/src/lib.rs           |   5 +-
 polkadot/node/subsystem-util/src/lib.rs       |  15 -
 .../node/subsystem-util/src/runtime/mod.rs    |  95 ++-
 polkadot/node/subsystem/src/messages.rs       |   2 +-
 polkadot/primitives/src/v0.rs                 | 159 -----
 polkadot/primitives/src/{v1.rs => v1/mod.rs}  |  19 +-
 polkadot/primitives/src/v1/signed.rs          | 282 +++++++++
 .../src/types/overseer-protocol.md            |   8 +-
 polkadot/runtime/parachains/src/inclusion.rs  |  71 ++-
 24 files changed, 1014 insertions(+), 866 deletions(-)
 create mode 100644 polkadot/node/network/collator-protocol/src/error.rs
 rename polkadot/primitives/src/{v1.rs => v1/mod.rs} (98%)
 create mode 100644 polkadot/primitives/src/v1/signed.rs

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 03d27165118..81249cd0006 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -5582,7 +5582,6 @@ dependencies = [
  "futures 0.3.14",
  "log",
  "maplit",
- "parity-scale-codec",
  "polkadot-node-network-protocol",
  "polkadot-node-subsystem",
  "polkadot-node-subsystem-test-helpers",
diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs
index b269ab6762f..49d739ff2c3 100644
--- a/polkadot/node/core/backing/src/lib.rs
+++ b/polkadot/node/core/backing/src/lib.rs
@@ -195,7 +195,6 @@ const fn group_quorum(n_validators: usize) -> usize {
 
 #[derive(Default)]
 struct TableContext {
-	signing_context: SigningContext,
 	validator: Option<Validator>,
 	groups: HashMap<ParaId, Vec<ValidatorIndex>>,
 	validators: Vec<ValidatorId>,
@@ -870,7 +869,6 @@ impl CandidateBackingJob {
 					.with_candidate(statement.payload().candidate_hash())
 					.with_relay_parent(_relay_parent);
 
-				self.check_statement_signature(&statement)?;
 				match self.maybe_validate_and_import(&root_span, sender, statement).await {
 					Err(Error::ValidationFailed(_)) => return Ok(()),
 					Err(e) => return Err(e),
@@ -1028,22 +1026,6 @@ impl CandidateBackingJob {
 		Some(signed)
 	}
 
-	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
-	fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
-		let idx = statement.validator_index().0 as usize;
-
-		if self.table_context.validators.len() > idx {
-			statement.check_signature(
-				&self.table_context.signing_context,
-				&self.table_context.validators[idx],
-			).map_err(|_| Error::InvalidSignature)?;
-		} else {
-			return Err(Error::InvalidSignature);
-		}
-
-		Ok(())
-	}
-
 	/// Insert or get the unbacked-span for the given candidate hash.
 	fn insert_or_get_unbacked_span(
 		&mut self,
@@ -1204,7 +1186,6 @@ impl util::JobTrait for CandidateBackingJob {
 			let table_context = TableContext {
 				groups,
 				validators,
-				signing_context,
 				validator,
 			};
 
@@ -1658,14 +1639,9 @@ mod tests {
 				AllMessages::StatementDistribution(
 					StatementDistributionMessage::Share(
 						parent_hash,
-						signed_statement,
+						_signed_statement,
 					)
-				) if parent_hash == test_state.relay_parent => {
-					signed_statement.check_signature(
-						&test_state.signing_context,
-						&test_state.validator_public[0],
-					).unwrap();
-				}
+				) if parent_hash == test_state.relay_parent => {}
 			);
 
 			assert_matches!(
@@ -1708,11 +1684,6 @@ mod tests {
 			}.build();
 
 			let candidate_a_hash = candidate_a.hash();
-			let public0 = CryptoStore::sr25519_generate_new(
-				&*test_state.keystore,
-				ValidatorId::ID,
-				Some(&test_state.validators[0].to_seed()),
-			).await.expect("Insert key into keystore");
 			let public1 = CryptoStore::sr25519_generate_new(
 				&*test_state.keystore,
 				ValidatorId::ID,
@@ -1795,10 +1766,9 @@ mod tests {
 			assert_matches!(
 				virtual_overseer.recv().await,
 				AllMessages::StatementDistribution(
-					StatementDistributionMessage::Share(hash, stmt)
+					StatementDistributionMessage::Share(hash, _stmt)
 				) => {
 					assert_eq!(test_state.relay_parent, hash);
-					stmt.check_signature(&test_state.signing_context, &public0.into()).expect("Is signed correctly");
 				}
 			);
 
@@ -2092,11 +2062,6 @@ mod tests {
 						signed_statement,
 					)
 				) if relay_parent == test_state.relay_parent => {
-					signed_statement.check_signature(
-						&test_state.signing_context,
-						&test_state.validator_public[0],
-					).unwrap();
-
 					assert_eq!(*signed_statement.payload(), Statement::Valid(candidate_a_hash));
 				}
 			);
@@ -2257,11 +2222,6 @@ mod tests {
 						signed_statement,
 					)
 				) if parent_hash == test_state.relay_parent => {
-					signed_statement.check_signature(
-						&test_state.signing_context,
-						&test_state.validator_public[0],
-					).unwrap();
-
 					assert_eq!(*signed_statement.payload(), Statement::Seconded(candidate_b));
 				}
 			);
@@ -2593,10 +2553,7 @@ mod tests {
 		use sp_core::Encode;
 		use std::convert::TryFrom;
 
-		let relay_parent = [1; 32].into();
 		let para_id = ParaId::from(10);
-		let session_index = 5;
-		let signing_context = SigningContext { parent_hash: relay_parent, session_index };
 		let validators = vec![
 			Sr25519Keyring::Alice,
 			Sr25519Keyring::Bob,
@@ -2614,7 +2571,6 @@ mod tests {
 		};
 
 		let table_context = TableContext {
-			signing_context,
 			validator: None,
 			groups: validator_groups,
 			validators: validator_public.clone(),
diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs
index 25180f5004b..37433ec6345 100644
--- a/polkadot/node/core/candidate-selection/src/lib.rs
+++ b/polkadot/node/core/candidate-selection/src/lib.rs
@@ -249,12 +249,12 @@ impl CandidateSelectionJob {
 						.with_relay_parent(_relay_parent);
 					self.handle_invalid(sender, candidate_receipt).await;
 				}
-				Some(CandidateSelectionMessage::Seconded(_relay_parent, statement)) => {
+				Some(CandidateSelectionMessage::Seconded(relay_parent, statement)) => {
 					let _span = span.child("handle-seconded")
 						.with_stage(jaeger::Stage::CandidateSelection)
 						.with_candidate(statement.payload().candidate_hash())
-						.with_relay_parent(_relay_parent);
-					self.handle_seconded(sender, statement).await;
+						.with_relay_parent(relay_parent);
+					self.handle_seconded(sender, relay_parent, statement).await;
 				}
 				None => break,
 			}
@@ -345,6 +345,7 @@ impl CandidateSelectionJob {
 	async fn handle_seconded(
 		&mut self,
 		sender: &mut impl SubsystemSender,
+		relay_parent: Hash,
 		statement: SignedFullStatement,
 	) {
 		let received_from = match &self.seconded_candidate {
@@ -368,7 +369,11 @@ impl CandidateSelectionJob {
 			.await;
 
 		sender.send_message(
-			CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement).into()
+			CollatorProtocolMessage::NotifyCollationSeconded(
+				received_from.clone(),
+				relay_parent,
+				statement
+			).into()
 		).await;
 	}
 }
diff --git a/polkadot/node/core/parachains-inherent/src/lib.rs b/polkadot/node/core/parachains-inherent/src/lib.rs
index 9c6b8ed0dbb..e8ffb573658 100644
--- a/polkadot/node/core/parachains-inherent/src/lib.rs
+++ b/polkadot/node/core/parachains-inherent/src/lib.rs
@@ -79,7 +79,7 @@ impl ParachainsInherentDataProvider {
 
 		let inherent_data = match res {
 			Ok(pd) => ParachainsInherentData {
-				bitfields: pd.bitfields,
+				bitfields: pd.bitfields.into_iter().map(Into::into).collect(),
 				backed_candidates: pd.backed_candidates,
 				disputes: pd.disputes,
 				parent_header,
diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs
index f0c80eb2b2d..9ebc0af130b 100644
--- a/polkadot/node/network/availability-distribution/src/lib.rs
+++ b/polkadot/node/network/availability-distribution/src/lib.rs
@@ -85,7 +85,7 @@ impl AvailabilityDistributionSubsystem {
 
 	/// Create a new instance of the availability distribution.
 	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
-		let runtime = RuntimeInfo::new(keystore.clone());
+		let runtime = RuntimeInfo::new(Some(keystore.clone()));
 		Self { keystore, runtime,  metrics }
 	}
 
diff --git a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs
index e53fdd4b241..a7d856fefbc 100644
--- a/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/pov_requester/mod.rs
@@ -280,7 +280,7 @@ mod tests {
 		let (mut context, mut virtual_overseer) =
 			test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
 		let keystore = make_ferdie_keystore();
-		let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(keystore);
+		let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(Some(keystore));
 
 		let (tx, rx) = oneshot::channel();
 		let testee = async {
diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs
index eaef4e5f3cc..052b7cded43 100644
--- a/polkadot/node/network/availability-distribution/src/requester/mod.rs
+++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs
@@ -32,14 +32,14 @@ use futures::{
 
 use sp_keystore::SyncCryptoStorePtr;
 
-use polkadot_node_subsystem_util::request_availability_cores;
-use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore};
+use polkadot_node_subsystem_util::runtime::get_occupied_cores;
+use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
 use polkadot_subsystem::{
 	messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
 };
 
-use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
-use crate::error::Error;
+use super::{session_cache::SessionCache, LOG_TARGET, Metrics};
+
 
 /// A task fetching a particular chunk.
 mod fetch_task;
@@ -125,7 +125,7 @@ impl Requester {
 		Context: SubsystemContext,
 	{
 		for ActivatedLeaf { hash: leaf, .. } in new_heads {
-			let cores = query_occupied_cores(ctx, leaf).await?;
+			let cores = get_occupied_cores(ctx, leaf).await?;
 			tracing::trace!(
 				target: LOG_TARGET,
 				occupied_cores = ?cores,
@@ -226,25 +226,3 @@ impl Stream for Requester {
 	}
 }
 
-/// Query all hashes and descriptors of candidates pending availability at a particular block.
-#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
-async fn query_occupied_cores<Context>(
-	ctx: &mut Context,
-	relay_parent: Hash,
-) -> Result<Vec<OccupiedCore>, Error>
-where
-	Context: SubsystemContext,
-{
-	let cores = recv_runtime(request_availability_cores(relay_parent, ctx.sender()).await).await?;
-
-	Ok(cores
-		.into_iter()
-		.filter_map(|core_state| {
-			if let CoreState::Occupied(occupied) = core_state {
-				Some(occupied)
-			} else {
-				None
-			}
-		})
-		.collect())
-}
diff --git a/polkadot/node/network/bitfield-distribution/Cargo.toml b/polkadot/node/network/bitfield-distribution/Cargo.toml
index f19807a91be..712c95c5dff 100644
--- a/polkadot/node/network/bitfield-distribution/Cargo.toml
+++ b/polkadot/node/network/bitfield-distribution/Cargo.toml
@@ -7,7 +7,6 @@ edition = "2018"
 [dependencies]
 futures = "0.3.12"
 tracing = "0.1.25"
-parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
 polkadot-primitives = { path = "../../../primitives" }
 polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
 polkadot-node-subsystem-util = { path = "../../subsystem-util" }
diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs
index dbe9a61cf56..92c160cfc23 100644
--- a/polkadot/node/network/bitfield-distribution/src/lib.rs
+++ b/polkadot/node/network/bitfield-distribution/src/lib.rs
@@ -22,7 +22,6 @@
 
 #![deny(unused_crate_dependencies)]
 
-use parity_scale_codec::{Decode, Encode};
 use futures::{channel::oneshot, FutureExt};
 
 use polkadot_subsystem::messages::*;
@@ -49,7 +48,7 @@ const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Valid message");
 
 /// Checked signed availability bitfield that is distributed
 /// to other peers.
-#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 struct BitfieldGossipMessage {
 	/// The relay parent this message is relative to.
 	relay_parent: Hash,
@@ -69,7 +68,7 @@ impl BitfieldGossipMessage {
 	{
 		protocol_v1::BitfieldDistributionMessage::Bitfield(
 			self.relay_parent,
-			self.signed_availability,
+			self.signed_availability.into(),
 		)
 	}
 }
@@ -392,19 +391,26 @@ async fn process_incoming_peer_message<Context>(
 	state: &mut ProtocolState,
 	metrics: &Metrics,
 	origin: PeerId,
-	message: BitfieldGossipMessage,
+	message: protocol_v1::BitfieldDistributionMessage,
 )
 where
 	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
 {
+	let protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) = message;
+	tracing::trace!(
+		target: LOG_TARGET,
+		peer_id = %origin,
+		?relay_parent,
+		"received bitfield gossip from peer"
+	);
 	// we don't care about this, not part of our view.
-	if !state.view.contains(&message.relay_parent) {
+	if !state.view.contains(&relay_parent) {
 		modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
 		return;
 	}
 
 	// Ignore anything the overseer did not tell this subsystem to work on.
-	let mut job_data = state.per_relay_parent.get_mut(&message.relay_parent);
+	let mut job_data = state.per_relay_parent.get_mut(&relay_parent);
 	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
 		job_data
 	} else {
@@ -412,17 +418,19 @@ where
 		return;
 	};
 
+	let validator_index = bitfield.unchecked_validator_index();
+
 	let mut _span = job_data.span
 			.child("msg-received")
 			.with_peer_id(&origin)
-			.with_claimed_validator_index(message.signed_availability.validator_index())
+			.with_claimed_validator_index(validator_index)
 			.with_stage(jaeger::Stage::BitfieldDistribution);
 
 	let validator_set = &job_data.validator_set;
 	if validator_set.is_empty() {
 		tracing::trace!(
 			target: LOG_TARGET,
-			relay_parent = %message.relay_parent,
+			relay_parent = %relay_parent,
 			?origin,
 			"Validator set is empty",
 		);
@@ -433,8 +441,7 @@ where
 	// Use the (untrusted) validator index provided by the signed payload
 	// and see if that one actually signed the availability bitset.
 	let signing_context = job_data.signing_context.clone();
-	let validator_index = message.signed_availability.validator_index().0 as usize;
-	let validator = if let Some(validator) = validator_set.get(validator_index) {
+	let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
 		validator.clone()
 	} else {
 		modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
@@ -454,7 +461,7 @@ where
 	} else {
 		tracing::trace!(
 			target: LOG_TARGET,
-			validator_index,
+			?validator_index,
 			?origin,
 			"Duplicate message",
 		);
@@ -468,22 +475,26 @@ where
 	if let Some(old_message) = one_per_validator.get(&validator) {
 		tracing::trace!(
 			target: LOG_TARGET,
-			validator_index,
+			?validator_index,
 			"already received a message for validator",
 		);
-		if old_message.signed_availability == message.signed_availability {
+		if old_message.signed_availability.as_unchecked() == &bitfield {
 			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
 		}
 		return;
 	}
-	if message
-		.signed_availability
-		.check_signature(&signing_context, &validator)
-		.is_err()
-	{
-		modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
-		return;
-	}
+	let signed_availability = match bitfield.try_into_checked(&signing_context, &validator) {
+		Err(_) => {
+			modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
+			return;
+		},
+		Ok(bitfield) => bitfield,
+	};
+
+	let message = BitfieldGossipMessage {
+		relay_parent,
+		signed_availability,
+	};
 
 	metrics.on_bitfield_received();
 	one_per_validator.insert(validator.clone(), message.clone());
@@ -544,23 +555,8 @@ where
 			);
 			handle_our_view_change(state, view);
 		}
-		NetworkBridgeEvent::PeerMessage(remote, message) => {
-			match message {
-				protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
-					tracing::trace!(
-						target: LOG_TARGET,
-						peer_id = %remote,
-						?relay_parent,
-						"received bitfield gossip from peer"
-					);
-					let gossiped_bitfield = BitfieldGossipMessage {
-						relay_parent,
-						signed_availability: bitfield,
-					};
-					process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await;
-				}
-			}
-		}
+		NetworkBridgeEvent::PeerMessage(remote, message) =>
+			process_incoming_peer_message(ctx, state, metrics, remote, message).await,
 	}
 }
 
diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs
index 99f6344aa98..a873f623cce 100644
--- a/polkadot/node/network/collator-protocol/src/collator_side.rs
+++ b/polkadot/node/network/collator-protocol/src/collator_side.rs
@@ -14,36 +14,36 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
 
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, hash_map::Entry};
 
-use super::{LOG_TARGET,  Result};
-
-use futures::{select, FutureExt, channel::oneshot};
+use futures::{FutureExt, channel::oneshot, channel::mpsc};
 use sp_core::Pair;
 
-use polkadot_primitives::v1::{
-	CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, Hash,
-	Id as ParaId, ValidatorId
-};
+use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId};
 use polkadot_subsystem::{
-	jaeger, PerLeafSpan,
-	FromOverseer, OverseerSignal, SubsystemContext,
-	messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, NetworkBridgeEvent},
+	FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, jaeger,
+	messages::{
+		AllMessages, CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeMessage,
+	},
 };
 use polkadot_node_network_protocol::{
 	OurView, PeerId, View, peer_set::PeerSet,
-	request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}},
-	v1 as protocol_v1, UnifiedReputationChange as Rep,
+	request_response::{
+		IncomingRequest,
+		v1::{CollationFetchingRequest, CollationFetchingResponse},
+	},
+	v1 as protocol_v1,
+	UnifiedReputationChange as Rep,
 };
 use polkadot_node_subsystem_util::{
-	validator_discovery,
-	request_validators,
-	request_validator_groups,
-	request_availability_cores,
 	metrics::{self, prometheus},
+	runtime::{RuntimeInfo, get_availability_cores, get_group_rotation_info}
 };
 use polkadot_node_primitives::{SignedFullStatement, Statement, PoV};
 
+use crate::error::{Fatal, NonFatal, log_error};
+use super::{LOG_TARGET,  Result};
+
 const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
 
 #[derive(Clone, Default)]
@@ -62,11 +62,6 @@ impl Metrics {
 		}
 	}
 
-	/// Provide a timer for handling `ConnectionRequest` which observes on drop.
-	fn time_handle_connection_request(&self) -> Option<prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.handle_connection_request.start_timer())
-	}
-
 	/// Provide a timer for `process_msg` which observes on drop.
 	fn time_process_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
 		self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
@@ -77,7 +72,6 @@ impl Metrics {
 struct MetricsInner {
 	advertisements_made: prometheus::Counter<prometheus::U64>,
 	collations_sent: prometheus::Counter<prometheus::U64>,
-	handle_connection_request: prometheus::Histogram,
 	process_msg: prometheus::Histogram,
 }
 
@@ -100,15 +94,6 @@ impl metrics::Metrics for Metrics {
 				)?,
 				registry,
 			)?,
-			handle_connection_request: prometheus::register(
-				prometheus::Histogram::with_opts(
-					prometheus::HistogramOpts::new(
-						"parachain_collator_protocol_collator_handle_connection_request",
-						"Time spent within `collator_protocol_collator::handle_connection_request`",
-					)
-				)?,
-				registry,
-			)?,
 			process_msg: prometheus::register(
 				prometheus::Histogram::with_opts(
 					prometheus::HistogramOpts::new(
@@ -129,50 +114,36 @@ impl metrics::Metrics for Metrics {
 /// This structure is responsible for keeping track of which validators belong to a certain group for a para. It also
 /// stores a mapping from [`PeerId`] to [`ValidatorId`] as we learn about it over the lifetime of this object. Besides
 /// that it also keeps track to which validators we advertised our collation.
+#[derive(Debug)]
 struct ValidatorGroup {
-	/// All [`ValidatorId`]'s that are assigned to us in this group.
-	validator_ids: HashSet<ValidatorId>,
-	/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s from the
-	/// authority discovery. It is not ensured that this will contain *all* validators of this group.
-	peer_ids: HashMap<PeerId, ValidatorId>,
+	/// All [`AuthorityDiscoveryId`]'s that are assigned to us in this group.
+	discovery_ids: HashSet<AuthorityDiscoveryId>,
 	/// All [`ValidatorId`]'s of the current group to that we advertised our collation.
-	advertised_to: HashSet<ValidatorId>,
+	advertised_to: HashSet<AuthorityDiscoveryId>,
 }
 
 impl ValidatorGroup {
 	/// Returns `true` if we should advertise our collation to the given peer.
-	fn should_advertise_to(&self, peer: &PeerId) -> bool {
-		match self.peer_ids.get(peer) {
-			Some(validator_id) => !self.advertised_to.contains(validator_id),
+	fn should_advertise_to(&self, peer_ids: &HashMap<PeerId, AuthorityDiscoveryId>, peer: &PeerId)
+		-> bool {
+		match peer_ids.get(peer) {
+			Some(discovery_id) => !self.advertised_to.contains(discovery_id),
 			None => false,
 		}
 	}
 
 	/// Should be called after we advertised our collation to the given `peer` to keep track of it.
-	fn advertised_to_peer(&mut self, peer: &PeerId) {
-		if let Some(validator_id) = self.peer_ids.get(peer) {
+	fn advertised_to_peer(&mut self, peer_ids: &HashMap<PeerId, AuthorityDiscoveryId>, peer: &PeerId) {
+		if let Some(validator_id) = peer_ids.get(peer) {
 			self.advertised_to.insert(validator_id.clone());
 		}
 	}
-
-	/// Add a [`PeerId`] that belongs to the given [`ValidatorId`].
-	///
-	/// This returns `true` if the given validator belongs to this group and we could insert its [`PeerId`].
-	fn add_peer_id_for_validator(&mut self, peer_id: &PeerId, validator_id: &ValidatorId) -> bool {
-		if !self.validator_ids.contains(validator_id) {
-			false
-		} else {
-			self.peer_ids.insert(peer_id.clone(), validator_id.clone());
-			true
-		}
-	}
 }
 
-impl From<HashSet<ValidatorId>> for ValidatorGroup {
-	fn from(validator_ids: HashSet<ValidatorId>) -> Self {
+impl From<HashSet<AuthorityDiscoveryId>> for ValidatorGroup {
+	fn from(discovery_ids: HashSet<AuthorityDiscoveryId>) -> Self {
 		Self {
-			validator_ids,
-			peer_ids: HashMap::new(),
+			discovery_ids,
 			advertised_to: HashSet::new(),
 		}
 	}
@@ -243,8 +214,11 @@ struct State {
 	/// Our validator groups per active leaf.
 	our_validators_groups: HashMap<Hash, ValidatorGroup>,
 
-	/// The connection requests to validators per relay parent.
-	connection_requests: validator_discovery::ConnectionRequests,
+	/// The mapping from [`PeerId`] to [`AuthorityDiscoveryId`]. This is filled over time as we learn the [`PeerId`]s via `PeerConnected` events.
+	peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,
+
+	/// The connection handles to validators per group we are interested in.
+	connection_handles: HashMap<GroupIndex, mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,
 
 	/// Metrics.
 	metrics: Metrics,
@@ -265,13 +239,18 @@ impl State {
 			collations: Default::default(),
 			collation_result_senders: Default::default(),
 			our_validators_groups: Default::default(),
-			connection_requests: Default::default(),
+			peer_ids: Default::default(),
+			connection_handles: Default::default(),
 		}
 	}
 
-	/// Returns `true` if the given `peer` is interested in the leaf that is represented by `relay_parent`.
-	fn peer_interested_in_leaf(&self, peer: &PeerId, relay_parent: &Hash) -> bool {
-		self.peer_views.get(peer).map(|v| v.contains(relay_parent)).unwrap_or(false)
+	/// Get all peers which have the given relay parent in their view.
+	fn peers_interested_in_leaf(&self, relay_parent: &Hash) -> Vec<PeerId> {
+		self.peer_views
+			.iter()
+			.filter(|(_, v)| v.contains(relay_parent))
+			.map(|(peer, _)| *peer)
+			.collect()
 	}
 }
 
@@ -283,9 +262,10 @@ impl State {
 /// or the relay-parent isn't in the active-leaves set, we ignore the message
 /// as it must be invalid in that case - although this indicates a logic error
 /// elsewhere in the node.
-#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, runtime, state, pov), fields(subsystem = LOG_TARGET))]
 async fn distribute_collation(
-	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
+	ctx: &mut impl SubsystemContext,
+	runtime: &mut RuntimeInfo,
 	state: &mut State,
 	id: ParaId,
 	receipt: CandidateReceipt,
@@ -327,9 +307,10 @@ async fn distribute_collation(
 	};
 
 	// Determine the group on that core and the next group on that core.
-	let (current_validators, next_validators) = determine_our_validators(ctx, our_core, num_cores, relay_parent).await?;
+	let (current_validators, next_validators) =
+		determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent,).await?;
 
-	if current_validators.is_empty() && next_validators.is_empty() {
+	if current_validators.validators.is_empty() && next_validators.validators.is_empty() {
 		tracing::warn!(
 			target: LOG_TARGET,
 			core = ?our_core,
@@ -350,16 +331,26 @@ async fn distribute_collation(
 		?next_validators,
 		"Accepted collation, connecting to validators."
 	);
-	// Issue a discovery request for the validators of the current group and the next group.
+
+	// Drop obsolete connections:
+	let new_groups: HashSet<_> = vec![current_validators.group, next_validators.group].into_iter().collect();
+	state.connection_handles.retain(|k, _| new_groups.contains(k));
+
+	let validator_group: HashSet<_> = current_validators.validators.iter().map(Clone::clone).collect();
+
+	// Issue a discovery request for the validators of the current group and the next group:
+	connect_to_validators(
+		ctx,
+		state,
+		current_validators,
+	).await;
 	connect_to_validators(
 		ctx,
-		relay_parent,
-		id,
 		state,
-		current_validators.union(&next_validators).cloned().collect(),
-	).await?;
+		next_validators,
+	).await;
 
-	state.our_validators_groups.insert(relay_parent, current_validators.into());
+	state.our_validators_groups.insert(relay_parent, validator_group.into());
 
 	if let Some(result_sender) = result_sender {
 		state.collation_result_senders.insert(receipt.hash(), result_sender);
@@ -367,6 +358,11 @@ async fn distribute_collation(
 
 	state.collations.insert(relay_parent, Collation { receipt, pov, status: CollationStatus::Created });
 
+	// Make sure already connected peers get collations:
+	for peer_id in state.peers_interested_in_leaf(&relay_parent) {
+		advertise_collation(ctx, state, relay_parent, peer_id).await;
+	}
+
 	Ok(())
 }
 
@@ -374,11 +370,11 @@ async fn distribute_collation(
 /// and the total number of cores.
 #[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
 async fn determine_core(
-	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
+	ctx: &mut impl SubsystemContext,
 	para_id: ParaId,
 	relay_parent: Hash,
 ) -> Result<Option<(CoreIndex, usize)>> {
-	let cores = request_availability_cores(relay_parent, ctx.sender()).await.await??;
+	let cores = get_availability_cores(ctx, relay_parent).await?;
 
 	for (idx, core) in cores.iter().enumerate() {
 		if let CoreState::Scheduled(occupied) = core {
@@ -387,35 +383,54 @@ async fn determine_core(
 			}
 		}
 	}
-
 	Ok(None)
 }
 
+/// Validators of a particular group index.
+#[derive(Debug)]
+struct GroupValidators {
+	/// The group those validators belong to.
+	group: GroupIndex,
+	/// The validators of the above group (their discovery keys).
+	validators: Vec<AuthorityDiscoveryId>,
+}
+
 /// Figure out current and next group of validators assigned to the para being collated on.
 ///
 /// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`.
-#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, runtime), fields(subsystem = LOG_TARGET))]
 async fn determine_our_validators(
-	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
+	ctx: &mut impl SubsystemContext,
+	runtime: &mut RuntimeInfo,
 	core_index: CoreIndex,
 	cores: usize,
 	relay_parent: Hash,
-) -> Result<(HashSet<ValidatorId>, HashSet<ValidatorId>)> {
-	let groups = request_validator_groups(relay_parent, ctx.sender()).await;
+) -> Result<(GroupValidators, GroupValidators)> {
+	let info = &runtime.get_session_info(ctx, relay_parent).await?.session_info;
+	tracing::debug!(target: LOG_TARGET, ?info, "Received info");
+	let groups = &info.validator_groups;
+	let rotation_info = get_group_rotation_info(ctx, relay_parent).await?;
 
-	let groups = groups.await??;
+	let current_group_index = rotation_info.group_for_core(core_index, cores);
+	let current_validators = groups.get(current_group_index.0 as usize).map(|v| v.as_slice()).unwrap_or_default();
 
-	let current_group_index = groups.1.group_for_core(core_index, cores);
-	let current_validators = groups.0.get(current_group_index.0 as usize).map(|v| v.as_slice()).unwrap_or_default();
+	let next_group_idx = (current_group_index.0 as usize + 1) % groups.len();
+	let next_validators = groups.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
 
-	let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len();
-	let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
-
-	let validators = request_validators(relay_parent, ctx.sender()).await.await??;
+	let validators = &info.discovery_keys;
 
 	let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
 	let next_validators = next_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
 
+	let current_validators = GroupValidators {
+		group: current_group_index,
+		validators: current_validators,
+	};
+	let next_validators = GroupValidators {
+		group: GroupIndex(next_group_idx as u32),
+		validators: next_validators,
+	};
+
 	Ok((current_validators, next_validators))
 }
 
@@ -448,22 +463,20 @@ async fn declare(
 /// revoke the previous connection request.
 #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
 async fn connect_to_validators(
-	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
-	relay_parent: Hash,
-	para_id: ParaId,
+	ctx: &mut impl SubsystemContext,
 	state: &mut State,
-	validators: Vec<ValidatorId>,
-) -> Result<()> {
-	let request = validator_discovery::connect_to_validators(
-		ctx,
-		relay_parent,
-		validators,
-		PeerSet::Collation,
-	).await?;
-
-	state.connection_requests.put(relay_parent, para_id, request);
-
-	Ok(())
+	group: GroupValidators,
+)  {
+	match state.connection_handles.entry(group.group) {
+		Entry::Occupied(_) => {}
+		Entry::Vacant(vacant) => {
+			let (tx, rx) = mpsc::channel(0);
+			ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
+				validator_ids: group.validators, peer_set: PeerSet::Collation, connected: tx
+			})).await;
+			vacant.insert(rx);
+		}
+	}
 }
 
 /// Advertise collation to the given `peer`.
@@ -472,14 +485,14 @@ async fn connect_to_validators(
 /// set as validator for our para at the given `relay_parent`.
 #[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
 async fn advertise_collation(
-	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
+	ctx: &mut impl SubsystemContext,
 	state: &mut State,
 	relay_parent: Hash,
 	peer: PeerId,
 ) {
 	let should_advertise = state.our_validators_groups
 		.get(&relay_parent)
-		.map(|g| g.should_advertise_to(&peer))
+		.map(|g| g.should_advertise_to(&state.peer_ids, &peer))
 		.unwrap_or(false);
 
 	match (state.collations.get_mut(&relay_parent), should_advertise) {
@@ -524,16 +537,17 @@ async fn advertise_collation(
 	)).await;
 
 	if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
-		validators.advertised_to_peer(&peer);
+		validators.advertised_to_peer(&state.peer_ids, &peer);
 	}
 
 	state.metrics.on_advertisment_made();
 }
 
 /// The main incoming message dispatching switch.
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, runtime, state), fields(subsystem = LOG_TARGET))]
 async fn process_msg(
 	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
+	runtime: &mut RuntimeInfo,
 	state: &mut State,
 	msg: CollatorProtocolMessage,
 ) -> Result<()> {
@@ -561,7 +575,7 @@ async fn process_msg(
 					);
 				}
 				Some(id) => {
-					distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
+					distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender).await?;
 				}
 				None => {
 					tracing::warn!(
@@ -590,7 +604,7 @@ async fn process_msg(
 				"NoteGoodCollation message is not expected on the collator side of the protocol",
 			);
 		}
-		NotifyCollationSeconded(_, _) => {
+		NotifyCollationSeconded(_, _, _) => {
 			tracing::warn!(
 				target: LOG_TARGET,
 				"NotifyCollationSeconded message is not expected on the collator side of the protocol",
@@ -599,6 +613,7 @@ async fn process_msg(
 		NetworkBridgeUpdateV1(event) => {
 			if let Err(e) = handle_network_msg(
 				ctx,
+				runtime,
 				state,
 				event,
 			).await {
@@ -670,9 +685,10 @@ async fn send_collation(
 }
 
 /// A networking messages switch.
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, runtime, state), fields(subsystem = LOG_TARGET))]
 async fn handle_incoming_peer_message(
 	ctx: &mut impl SubsystemContext,
+	runtime: &mut RuntimeInfo,
 	state: &mut State,
 	origin: PeerId,
 	msg: protocol_v1::CollatorProtocolMessage,
@@ -708,22 +724,31 @@ async fn handle_incoming_peer_message(
 				NetworkBridgeMessage::DisconnectPeer(origin, PeerSet::Collation).into()
 			).await;
 		}
-		CollationSeconded(statement) => {
-			if !matches!(statement.payload(), Statement::Seconded(_)) {
+		CollationSeconded(relay_parent, statement) => {
+			if !matches!(statement.unchecked_payload(), Statement::Seconded(_)) {
 				tracing::warn!(
 					target: LOG_TARGET,
 					?statement,
 					?origin,
 					"Collation seconded message received with none-seconded statement.",
 				);
-			} else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) {
-				tracing::trace!(
-					target: LOG_TARGET,
-					?statement,
-					?origin,
-					"received a `CollationSeconded`",
-				);
-				let _ = sender.send(statement);
+			} else {
+				let statement = runtime.check_signature(ctx, relay_parent, statement)
+					.await?
+					.map_err(NonFatal::InvalidStatementSignature)?;
+
+				let removed = state.collation_result_senders
+					.remove(&statement.payload().candidate_hash());
+
+				if let Some(sender) = removed {
+					tracing::trace!(
+						target: LOG_TARGET,
+						?statement,
+						?origin,
+						"received a `CollationSeconded`",
+					);
+					let _ = sender.send(statement);
+				}
 			}
 		}
 	}
@@ -750,48 +775,18 @@ async fn handle_peer_view_change(
 	}
 }
 
-/// A validator is connected.
-///
-/// `Declare` that we are a collator with a given `CollatorId`.
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
-async fn handle_validator_connected(
-	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
-	state: &mut State,
-	peer_id: PeerId,
-	validator_id: ValidatorId,
-	relay_parent: Hash,
-) {
-	tracing::trace!(
-		target: LOG_TARGET,
-		?validator_id,
-		"Connected to requested validator"
-	);
-
-	// Store the PeerId and find out if we should advertise to this peer.
-	//
-	// If this peer does not belong to the para validators, we also don't need to try to advertise our collation.
-	let advertise = if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
-		validators.add_peer_id_for_validator(&peer_id, &validator_id)
-	} else {
-		false
-	};
-
-	if advertise && state.peer_interested_in_leaf(&peer_id, &relay_parent) {
-		advertise_collation(ctx, state, relay_parent, peer_id).await;
-	}
-}
-
 /// Bridge messages switch.
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
+#[tracing::instrument(level = "trace", skip(ctx, runtime, state), fields(subsystem = LOG_TARGET))]
 async fn handle_network_msg(
 	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
+	runtime: &mut RuntimeInfo,
 	state: &mut State,
 	bridge_message: NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>,
 ) -> Result<()> {
 	use NetworkBridgeEvent::*;
 
 	match bridge_message {
-		PeerConnected(peer_id, observed_role, _) => {
+		PeerConnected(peer_id, observed_role, maybe_authority) => {
 			// If it is possible that a disconnected validator would attempt a reconnect
 			// it should be handled here.
 			tracing::trace!(
@@ -800,9 +795,17 @@ async fn handle_network_msg(
 				?observed_role,
 				"Peer connected",
 			);
+			if let Some(authority) = maybe_authority {
+				tracing::trace!(
+					target: LOG_TARGET,
+					?authority,
+					?peer_id,
+					"Connected to requested validator"
+				);
+				state.peer_ids.insert(peer_id, authority);
 
-			// Always declare to every peer. We should be connecting only to validators.
-			declare(ctx, state, peer_id.clone()).await;
+				declare(ctx, state, peer_id).await;
+			}
 		}
 		PeerViewChange(peer_id, view) => {
 			tracing::trace!(
@@ -820,6 +823,7 @@ async fn handle_network_msg(
 				"Peer disconnected",
 			);
 			state.peer_views.remove(&peer_id);
+			state.peer_ids.remove(&peer_id);
 		}
 		OurViewChange(view) => {
 			tracing::trace!(
@@ -830,7 +834,7 @@ async fn handle_network_msg(
 			handle_our_view_change(state, view).await?;
 		}
 		PeerMessage(remote, msg) => {
-			handle_incoming_peer_message(ctx, state, remote, msg).await?;
+			handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
 		}
 	}
 
@@ -871,7 +875,6 @@ async fn handle_our_view_change(
 			}
 		}
 		state.our_validators_groups.remove(removed);
-		state.connection_requests.remove_all(removed);
 		state.span_per_relay_parent.remove(removed);
 	}
 
@@ -892,30 +895,20 @@ pub(crate) async fn run(
 	use OverseerSignal::*;
 
 	let mut state = State::new(local_peer_id, collator_pair, metrics);
+	let mut runtime = RuntimeInfo::new(None);
 
 	loop {
-		select! {
-			res = state.connection_requests.next().fuse() => {
-				let _timer = state.metrics.time_handle_connection_request();
-
-				handle_validator_connected(
-					&mut ctx,
-					&mut state,
-					res.peer_id,
-					res.validator_id,
-					res.relay_parent,
-				).await;
+		let msg = ctx.recv().fuse().await.map_err(Fatal::SubsystemReceive)?;
+		match msg {
+			Communication { msg } => {
+				log_error(
+					process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
+					"Failed to process message"
+				)?;
 			},
-			msg = ctx.recv().fuse() => match msg? {
-				Communication { msg } => {
-					if let Err(e) = process_msg(&mut ctx, &mut state, msg).await {
-						tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to process message");
-					}
-				},
-				Signal(ActiveLeaves(_update)) => {}
-				Signal(BlockFinalized(..)) => {}
-				Signal(Conclude) => return Ok(()),
-			}
+			Signal(ActiveLeaves(_update)) => {}
+			Signal(BlockFinalized(..)) => {}
+			Signal(Conclude) => return Ok(()),
 		}
 	}
 }
@@ -939,10 +932,7 @@ mod tests {
 		request_response::request::IncomingRequest,
 	};
 	use polkadot_node_subsystem_util::TimeoutExt;
-	use polkadot_primitives::v1::{
-		AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo,
-		ScheduledCore, SessionIndex, SessionInfo, ValidatorIndex,
-	};
+	use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
 	use polkadot_node_primitives::BlockData;
 	use polkadot_subsystem::{
 		jaeger,
@@ -977,10 +967,9 @@ mod tests {
 	struct TestState {
 		para_id: ParaId,
 		validators: Vec<Sr25519Keyring>,
-		validator_public: Vec<ValidatorId>,
-		validator_authority_id: Vec<AuthorityDiscoveryId>,
+		session_info: SessionInfo,
+		group_rotation_info: GroupRotationInfo,
 		validator_peer_id: Vec<PeerId>,
-		validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
 		relay_parent: Hash,
 		availability_core: CoreState,
 		local_peer_id: PeerId,
@@ -1009,10 +998,10 @@ mod tests {
 			];
 
 			let validator_public = validator_pubkeys(&validators);
-			let validator_authority_id = validator_authority_id(&validators);
+			let discovery_keys = validator_authority_id(&validators);
 
 			let validator_peer_id = std::iter::repeat_with(|| PeerId::random())
-				.take(validator_public.len())
+				.take(discovery_keys.len())
 				.collect();
 
 			let validator_groups = vec![vec![2, 0, 4], vec![3, 2, 4]]
@@ -1022,7 +1011,6 @@ mod tests {
 				group_rotation_frequency: 100,
 				now: 1,
 			};
-			let validator_groups = (validator_groups, group_rotation_info);
 
 			let availability_core = CoreState::Scheduled(ScheduledCore {
 				para_id,
@@ -1037,10 +1025,14 @@ mod tests {
 			Self {
 				para_id,
 				validators,
-				validator_public,
-				validator_authority_id,
+				session_info: SessionInfo {
+					validators: validator_public,
+					discovery_keys, 
+					validator_groups,
+					..Default::default()
+				},
+				group_rotation_info,
 				validator_peer_id,
-				validator_groups,
 				relay_parent,
 				availability_core,
 				local_peer_id,
@@ -1052,7 +1044,7 @@ mod tests {
 
 	impl TestState {
 		fn current_group_validator_indices(&self) -> &[ValidatorIndex] {
-			&self.validator_groups.0[0]
+			&self.session_info.validator_groups[0]
 		}
 
 		fn current_session_index(&self) -> SessionIndex {
@@ -1066,25 +1058,7 @@ mod tests {
 		fn current_group_validator_authority_ids(&self) -> Vec<AuthorityDiscoveryId> {
 			self.current_group_validator_indices()
 				.iter()
-				.map(|i| self.validator_authority_id[i.0 as usize].clone())
-				.collect()
-		}
-
-		fn current_group_validator_ids(&self) -> Vec<ValidatorId> {
-			self.current_group_validator_indices()
-				.iter()
-				.map(|i| self.validator_public[i.0 as usize].clone())
-				.collect()
-		}
-
-		fn next_group_validator_indices(&self) -> &[ValidatorIndex] {
-			&self.validator_groups.0[1]
-		}
-
-		fn next_group_validator_authority_ids(&self) -> Vec<AuthorityDiscoveryId> {
-			self.next_group_validator_indices()
-				.iter()
-				.map(|i| self.validator_authority_id[i.0 as usize].clone())
+				.map(|i| self.session_info.discovery_keys[i.0 as usize].clone())
 				.collect()
 		}
 
@@ -1218,7 +1192,7 @@ mod tests {
 	/// Result of [`distribute_collation`]
 	struct DistributeCollation {
 		/// Should be used to inform the subsystem about connected validators.
-		connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
+		connected: Vec<mpsc::Sender<(AuthorityDiscoveryId, PeerId)>>,
 		candidate: CandidateReceipt,
 		pov_block: PoV,
 	}
@@ -1227,6 +1201,8 @@ mod tests {
 	async fn distribute_collation(
 		virtual_overseer: &mut VirtualOverseer,
 		test_state: &TestState,
+		// Whether the currently active validators are already connected.
+		already_connected: bool,
 	) -> DistributeCollation {
 		// Now we want to distribute a PoVBlock
 		let pov_block = PoV {
@@ -1259,91 +1235,88 @@ mod tests {
 			}
 		);
 
-		// Obtain the validator groups
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::ValidatorGroups(tx)
-			)) => {
-				assert_eq!(relay_parent, test_state.relay_parent);
-				tx.send(Ok(test_state.validator_groups.clone())).unwrap();
-			}
-		);
-
-		// obtain the validators per relay parent
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::Validators(tx),
-			)) => {
-				assert_eq!(relay_parent, test_state.relay_parent);
-				tx.send(Ok(test_state.validator_public.clone())).unwrap();
-			}
-		);
-
-		// obtain the validator_id to authority_id mapping
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::SessionIndexForChild(tx),
-			)) => {
-				assert_eq!(relay_parent, test_state.relay_parent);
-				tx.send(Ok(test_state.current_session_index())).unwrap();
-			}
-		);
-
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::RuntimeApi(RuntimeApiMessage::Request(
-				relay_parent,
-				RuntimeApiRequest::SessionInfo(index, tx),
-			)) => {
-				assert_eq!(relay_parent, test_state.relay_parent);
-				assert_eq!(index, test_state.current_session_index());
-
-				let validators = test_state.current_group_validator_ids();
-				let current_discovery_keys = test_state.current_group_validator_authority_ids();
-				let next_discovery_keys = test_state.next_group_validator_authority_ids();
-
-				let discovery_keys = [&current_discovery_keys[..], &next_discovery_keys[..]].concat();
+		// We don't know precisely what is going to come as session info might be cached:
+		loop {
+			match overseer_recv(virtual_overseer).await {
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionIndexForChild(tx),
+				)) => {
+					assert_eq!(relay_parent, test_state.relay_parent);
+					tx.send(Ok(test_state.current_session_index())).unwrap();
+				}
 
-				tx.send(Ok(Some(SessionInfo {
-					validators,
-					discovery_keys,
-					..Default::default()
-				}))).unwrap();
-			}
-		);
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::SessionInfo(index, tx),
+				)) => {
+					assert_eq!(relay_parent, test_state.relay_parent);
+					assert_eq!(index, test_state.current_session_index());
 
-		assert_matches!(
-			overseer_recv(virtual_overseer).await,
-			AllMessages::NetworkBridge(
-				NetworkBridgeMessage::ConnectToValidators {
-					connected,
-					..
+					tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
 				}
-			) => {
-				DistributeCollation {
-					connected,
-					candidate,
-					pov_block,
+
+				AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+					relay_parent,
+					RuntimeApiRequest::ValidatorGroups(tx)
+				)) => {
+					assert_eq!(relay_parent, test_state.relay_parent);
+					tx.send(Ok((
+						test_state.session_info.validator_groups.clone(),
+						test_state.group_rotation_info.clone(),
+					))).unwrap();
+					// This call is mandatory - we are done:
+					break;
 				}
+				other =>
+					panic!("Unexpected message received: {:?}", other),
 			}
-		)
+		}
+
+		let connected = if already_connected {
+			Vec::new()
+		} else {
+			let connected_current = assert_matches!(
+				overseer_recv(virtual_overseer).await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::ConnectToValidators {
+						connected,
+						..
+					}
+				) => { connected }
+			);
+			let connected_next = assert_matches!(
+				overseer_recv(virtual_overseer).await,
+				AllMessages::NetworkBridge(
+					NetworkBridgeMessage::ConnectToValidators {
+						connected,
+						..
+					}
+				) => { connected }
+			);
+			vec![connected_current, connected_next]
+		};
+
+		DistributeCollation {
+			connected, 
+			candidate,
+			pov_block,
+		}
 	}
 
 	/// Connect a peer
-	async fn connect_peer(virtual_overseer: &mut VirtualOverseer, peer: PeerId) {
+	async fn connect_peer(
+		virtual_overseer: &mut VirtualOverseer,
+		peer: PeerId,
+		authority_id: Option<AuthorityDiscoveryId>
+	) {
 		overseer_send(
 			virtual_overseer,
 			CollatorProtocolMessage::NetworkBridgeUpdateV1(
 				NetworkBridgeEvent::PeerConnected(
 					peer.clone(),
 					polkadot_node_network_protocol::ObservedRole::Authority,
-					None,
+					authority_id,
 				),
 			),
 		).await;
@@ -1446,15 +1419,14 @@ mod tests {
 
 			setup_system(&mut virtual_overseer, &test_state).await;
 
-			let DistributeCollation { mut connected, candidate, pov_block } =
-				distribute_collation(&mut virtual_overseer, &test_state).await;
+			let DistributeCollation { connected: _connected, candidate, pov_block } =
+				distribute_collation(&mut virtual_overseer, &test_state, false).await;
 
 			for (val, peer) in test_state.current_group_validator_authority_ids()
 				.into_iter()
 				.zip(test_state.current_group_validator_peer_ids())
 			{
-				connect_peer(&mut virtual_overseer, peer.clone()).await;
-				connected.try_send((val, peer)).unwrap();
+				connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
 			}
 
 			// We declare to the connected validators that we are a collator.
@@ -1531,15 +1503,8 @@ mod tests {
 
 			assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
 
-			let DistributeCollation { mut connected, .. } =
-				distribute_collation(&mut virtual_overseer, &test_state).await;
-
-			for (val, peer) in test_state.current_group_validator_authority_ids()
-				.into_iter()
-				.zip(test_state.current_group_validator_peer_ids())
-			{
-				connected.try_send((val, peer)).unwrap();
-			}
+			let DistributeCollation { connected: _connected, .. } =
+				distribute_collation(&mut virtual_overseer, &test_state, true).await;
 
 			// Send info about peer's view.
 			overseer_send(
@@ -1567,11 +1532,12 @@ mod tests {
 			let mut virtual_overseer = test_harness.virtual_overseer;
 
 			let peer = test_state.validator_peer_id[0].clone();
+			let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
 
 			setup_system(&mut virtual_overseer, &test_state).await;
 
 			// A validator connected to us
-			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 			virtual_overseer
 		})
@@ -1595,10 +1561,10 @@ mod tests {
 			setup_system(&mut virtual_overseer, &test_state).await;
 
 			// A validator connected to us
-			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
 
 			// Connect the second validator
-			connect_peer(&mut virtual_overseer, peer2.clone()).await;
+			connect_peer(&mut virtual_overseer, peer2.clone(), Some(validator_id2)).await;
 
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
@@ -1606,9 +1572,7 @@ mod tests {
 			// And let it tell us that it is has the same view.
 			send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
 
-			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
-			connected.try_send((validator_id, peer.clone())).unwrap();
-			connected.try_send((validator_id2, peer2.clone())).unwrap();
+			let _connected = distribute_collation(&mut virtual_overseer, &test_state, false).await.connected;
 
 			expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
 
@@ -1639,26 +1603,22 @@ mod tests {
 			setup_system(&mut virtual_overseer, &test_state).await;
 
 			// A validator connected to us
-			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
 
 			// Connect the second validator
-			connect_peer(&mut virtual_overseer, peer2.clone()).await;
+			connect_peer(&mut virtual_overseer, peer2.clone(), Some(validator_id2)).await;
 
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
 
-			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
-			connected.try_send((validator_id.clone(), peer.clone())).unwrap();
-			connected.try_send((validator_id2.clone(), peer2.clone())).unwrap();
+			let _connected = distribute_collation(&mut virtual_overseer, &test_state, false).await.connected;
 
 			let old_relay_parent = test_state.relay_parent;
 
 			// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
 			test_state.advance_to_new_round(&mut virtual_overseer, true).await;
 
-			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
-			connected.try_send((validator_id, peer.clone())).unwrap();
-			connected.try_send((validator_id2, peer2.clone())).unwrap();
+			let _connected = distribute_collation(&mut virtual_overseer, &test_state, true).await.connected;
 
 			send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await;
 			expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await;
@@ -1685,18 +1645,17 @@ mod tests {
 			setup_system(&mut virtual_overseer, &test_state).await;
 
 			// A validator connected to us
-			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 
-			let mut connected = distribute_collation(&mut virtual_overseer, &test_state).await.connected;
-			connected.try_send((validator_id.clone(), peer.clone())).unwrap();
+			let _ = distribute_collation(&mut virtual_overseer, &test_state, false).await.connected;
 
 			send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
 			expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
 
 			// Disconnect and reconnect directly
 			disconnect_peer(&mut virtual_overseer, peer.clone()).await;
-			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 
 			send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
@@ -1717,11 +1676,12 @@ mod tests {
 			let mut virtual_overseer = test_harness.virtual_overseer;
 
 			let peer = test_state.current_group_validator_peer_ids()[0].clone();
+			let validator_id = test_state.current_group_validator_authority_ids()[0].clone();
 
 			setup_system(&mut virtual_overseer, &test_state).await;
 
 			// A validator connected to us
-			connect_peer(&mut virtual_overseer, peer.clone()).await;
+			connect_peer(&mut virtual_overseer, peer.clone(), Some(validator_id)).await;
 			expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
 
 			overseer_send(
diff --git a/polkadot/node/network/collator-protocol/src/error.rs b/polkadot/node/network/collator-protocol/src/error.rs
new file mode 100644
index 00000000000..37f8df0731b
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/error.rs
@@ -0,0 +1,92 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+//
+
+//! Error handling related code and Error/Result definitions.
+
+use polkadot_node_primitives::UncheckedSignedFullStatement;
+use polkadot_subsystem::SubsystemError;
+use thiserror::Error;
+
+use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
+
+use crate::LOG_TARGET;
+
+/// General result.
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Result for fatal only failures.
+pub type FatalResult<T> = std::result::Result<T, Fatal>;
+
+/// Errors of the collator protocol subsystem.
+#[derive(Debug, Error)]
+#[error(transparent)]
+pub struct Error(pub Fault<NonFatal, Fatal>);
+
+impl From<NonFatal> for Error {
+	fn from(e: NonFatal) -> Self {
+		Self(Fault::from_non_fatal(e))
+	}
+}
+
+impl From<Fatal> for Error {
+	fn from(f: Fatal) -> Self {
+		Self(Fault::from_fatal(f))
+	}
+}
+
+impl From<runtime::Error> for Error {
+	fn from(o: runtime::Error) -> Self {
+		Self(Fault::from_other(o))
+	}
+}
+
+/// Fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum Fatal {
+	/// Receiving subsystem message from overseer failed.
+	#[error("Receiving message from overseer failed")]
+	SubsystemReceive(#[source] SubsystemError),
+
+	/// Errors coming from runtime::Runtime.
+	#[error("Error while accessing runtime information")]
+	Runtime(#[from] #[source] runtime::Fatal),
+}
+
+/// Non-fatal errors of this subsystem.
+#[derive(Debug, Error)]
+pub enum NonFatal {
+	/// Signature was invalid on received statement.
+	#[error("CollationSeconded contained statement with invalid signature.")]
+	InvalidStatementSignature(UncheckedSignedFullStatement),
+
+	/// Errors coming from runtime::Runtime.
+	#[error("Error while accessing runtime information")]
+	Runtime(#[from] #[source] runtime::NonFatal),
+}
+
+/// Utility for consuming top-level errors and logging them.
+///
+/// We almost always want to continue operating on error. This function consumes non-fatal
+/// top-level errors by simply logging them, while fatal errors are passed on to the caller.
+pub fn log_error(result: Result<()>, ctx: &'static str)
+	-> FatalResult<()>
+{
+	if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
+		tracing::warn!(target: LOG_TARGET, error = ?error, ctx)
+	}
+	Ok(())
+}
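+
+// A minimal usage sketch (the `run_loop`/`process_next_message` names are hypothetical, not
+// part of this subsystem): fatal errors are propagated by `?` and terminate the subsystem,
+// while non-fatal ones are only logged by `log_error`, so the loop keeps running.
+//
+//   async fn run_loop(ctx: &mut impl SubsystemContext) -> FatalResult<()> {
+//       loop {
+//           let result: Result<()> = process_next_message(ctx).await;
+//           log_error(result, "process_next_message")?;
+//       }
+//   }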
diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index 2f85f4c0c9e..24ae2407d12 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -22,41 +22,25 @@
 
 use std::time::Duration;
 
-use futures::{channel::oneshot, FutureExt, TryFutureExt};
-use thiserror::Error;
+use futures::{FutureExt, TryFutureExt};
 
 use sp_keystore::SyncCryptoStorePtr;
 
 use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange as Rep};
-use polkadot_node_subsystem_util::{self as util, metrics::prometheus};
 use polkadot_primitives::v1::CollatorPair;
 use polkadot_subsystem::{
-	errors::RuntimeApiError,
 	messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage},
 	SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
 };
 
+mod error;
+use error::Result;
+
 mod collator_side;
 mod validator_side;
 
 const LOG_TARGET: &'static str = "parachain::collator-protocol";
 
-#[derive(Debug, Error)]
-enum Error {
-	#[error(transparent)]
-	Subsystem(#[from] SubsystemError),
-	#[error(transparent)]
-	Oneshot(#[from] oneshot::Canceled),
-	#[error(transparent)]
-	RuntimeApi(#[from] RuntimeApiError),
-	#[error(transparent)]
-	UtilError(#[from] util::Error),
-	#[error(transparent)]
-	Prometheus(#[from] prometheus::PrometheusError),
-}
-
-type Result<T> = std::result::Result<T, Error>;
-
 /// A collator eviction policy - how fast to evict collators which are inactive.
 #[derive(Debug, Clone, Copy)]
 pub struct CollatorEvictionPolicy {
@@ -124,9 +108,7 @@ impl CollatorProtocolSubsystem {
 				collator_pair,
 				metrics,
 			).await,
-		}.map_err(|e| {
-			SubsystemError::with_origin("collator-protocol", e).into()
-		})
+		}
 	}
 }
 
diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs
index ae5ee6fd12c..2a161c65d83 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side.rs
@@ -47,6 +47,8 @@ use polkadot_subsystem::{
 	FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, SubsystemSender,
 };
 
+use crate::error::Fatal;
+
 use super::{modify_reputation, Result, LOG_TARGET};
 
 const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
@@ -540,6 +542,7 @@ async fn notify_collation_seconded(
 	ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
 	peer_data: &HashMap<PeerId, PeerData>,
 	id: CollatorId,
+	relay_parent: Hash,
 	statement: SignedFullStatement,
 ) {
 	if !matches!(statement.payload(), Statement::Seconded(_)) {
@@ -552,7 +555,7 @@ async fn notify_collation_seconded(
 	}
 
 	if let Some(peer_id) = collator_peer_id(peer_data, &id) {
-		let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement);
+		let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into());
 
 		ctx.send_message(AllMessages::NetworkBridge(
 			NetworkBridgeMessage::SendCollationMessage(
@@ -782,7 +785,7 @@ where
 				}
 			}
 		}
-		CollationSeconded(_) => {
+		CollationSeconded(_, _) => {
 			tracing::warn!(
 				target: LOG_TARGET,
 				peer_id = ?origin,
@@ -934,8 +937,8 @@ where
 		NoteGoodCollation(id) => {
 			note_good_collation(ctx, &state.peer_data, id).await;
 		}
-		NotifyCollationSeconded(id, statement) => {
-			notify_collation_seconded(ctx, &state.peer_data, id, statement).await;
+		NotifyCollationSeconded(id, relay_parent, statement) => {
+			notify_collation_seconded(ctx, &state.peer_data, id, relay_parent, statement).await;
 		}
 		NetworkBridgeUpdateV1(event) => {
 			if let Err(e) = handle_network_msg(
@@ -1003,7 +1006,7 @@ pub(crate) async fn run<Context>(
 
 			if let Poll::Ready(res) = futures::poll!(s) {
 				Some(match res {
-					Either::Left((msg, _)) => Either::Left(msg?),
+					Either::Left((msg, _)) => Either::Left(msg.map_err(Fatal::SubsystemReceive)?),
 					Either::Right((_, _)) => Either::Right(()),
 				})
 			} else {
diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs
index 33cb2034adf..5f19a4af899 100644
--- a/polkadot/node/network/protocol/src/lib.rs
+++ b/polkadot/node/network/protocol/src/lib.rs
@@ -291,18 +291,23 @@ pub mod v1 {
 	use parity_scale_codec::{Encode, Decode};
 	use std::convert::TryFrom;
 
-	use polkadot_primitives::v1::{CandidateHash, CandidateIndex, CollatorId, CollatorSignature, CompactStatement, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex, ValidatorSignature};
+	use polkadot_primitives::v1::{
+		CandidateHash, CandidateIndex, CollatorId, CollatorSignature,
+		CompactStatement, Hash, Id as ParaId, UncheckedSignedAvailabilityBitfield,
+		ValidatorIndex, ValidatorSignature
+	};
 	use polkadot_node_primitives::{
 		approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
-		SignedFullStatement,
+		UncheckedSignedFullStatement,
 	};
 
 	/// Network messages used by the bitfield distribution subsystem.
 	#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)]
 	pub enum BitfieldDistributionMessage {
 		/// A signed availability bitfield for a given relay-parent hash.
 		#[codec(index = 0)]
-		Bitfield(Hash, SignedAvailabilityBitfield),
+		Bitfield(Hash, UncheckedSignedAvailabilityBitfield),
 	}
 
 	/// Network messages used by the statement distribution subsystem.
@@ -310,7 +315,7 @@ pub mod v1 {
 	pub enum StatementDistributionMessage {
 		/// A signed full statement under a given relay-parent.
 		#[codec(index = 0)]
-		Statement(Hash, SignedFullStatement),
+		Statement(Hash, UncheckedSignedFullStatement),
 		/// Seconded statement with large payload (e.g. containing a runtime upgrade).
 		///
 		/// We only gossip the hash in that case, actual payloads can be fetched from sending node
@@ -338,9 +343,9 @@ pub mod v1 {
 			match self {
 				Self::Statement(relay_parent, statement) => StatementMetadata {
 					relay_parent: *relay_parent,
-					candidate_hash: statement.payload().candidate_hash(),
-					signed_by: statement.validator_index(),
-					signature: statement.signature().clone(),
+					candidate_hash: statement.unchecked_payload().candidate_hash(),
+					signed_by: statement.unchecked_validator_index(),
+					signature: statement.unchecked_signature().clone(),
 				},
 				Self::LargeStatement(metadata) => metadata.clone(),
 			}
@@ -350,7 +355,7 @@ pub mod v1 {
 		pub fn get_fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
 			match self {
 				Self::Statement(_, statement) =>
-					(statement.payload().to_compact(), statement.validator_index()),
+					(statement.unchecked_payload().to_compact(), statement.unchecked_validator_index()),
 				Self::LargeStatement(meta) =>
 					(CompactStatement::Seconded(meta.candidate_hash), meta.signed_by),
 			}
@@ -400,7 +405,7 @@ pub mod v1 {
 		AdvertiseCollation(Hash),
 		/// A collation sent to a validator was seconded.
 		#[codec(index = 4)]
-		CollationSeconded(SignedFullStatement),
+		CollationSeconded(Hash, UncheckedSignedFullStatement),
 	}
 
 	/// All network messages on the validation peer-set.
diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs
index b7ddb74dff3..3af3d16c001 100644
--- a/polkadot/node/network/statement-distribution/src/lib.rs
+++ b/polkadot/node/network/statement-distribution/src/lib.rs
@@ -37,7 +37,7 @@ use polkadot_node_subsystem_util::{
 	metrics::{self, prometheus},
 	self as util, MIN_GOSSIP_PEERS,
 };
-use polkadot_node_primitives::{SignedFullStatement, Statement};
+use polkadot_node_primitives::{SignedFullStatement, UncheckedSignedFullStatement, Statement};
 use polkadot_primitives::v1::{
 	CandidateHash, CommittedCandidateReceipt, CompactStatement, Hash,
 	SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, AuthorityDiscoveryId,
@@ -54,7 +54,7 @@ use polkadot_node_network_protocol::{
 
 use futures::{channel::mpsc, future::RemoteHandle, prelude::*};
 use futures::channel::oneshot;
-use indexmap::{IndexSet, IndexMap, map::Entry as IEntry};
+use indexmap::{IndexMap, map::Entry as IEntry};
 use sp_keystore::SyncCryptoStorePtr;
 use util::{Fault, runtime::RuntimeInfo};
 
@@ -461,10 +461,10 @@ impl PeerData {
 }
 
 // A statement stored while a relay chain head is active.
-#[derive(Debug)]
-struct StoredStatement {
-	comparator: StoredStatementComparator,
-	statement: SignedFullStatement,
+#[derive(Debug, Copy, Clone)]
+struct StoredStatement<'a> {
+	comparator: &'a StoredStatementComparator,
+	statement: &'a SignedFullStatement,
 }
 
 // A value used for comparison of stored statements to each other.
@@ -480,40 +480,26 @@ struct StoredStatementComparator {
 	signature: ValidatorSignature,
 }
 
-impl StoredStatement {
-	fn compact(&self) -> &CompactStatement {
-		&self.comparator.compact
-	}
-
-	fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
-		(self.comparator.compact.clone(), self.statement.validator_index())
+impl<'a> From<(&'a StoredStatementComparator, &'a SignedFullStatement)> for StoredStatement<'a> {
+	fn from((comparator, statement): (&'a StoredStatementComparator, &'a SignedFullStatement)) -> Self {
+		Self { comparator, statement }
 	}
 }
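+
+// `StoredStatement` is now only a borrowed view into the `IndexMap` of statements kept per
+// active head; the tuple `From` impl above exists so that `IndexMap::get_key_value` results
+// can be converted with `.into()` (see `ActiveHeadData::note_statement`).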
 
-impl std::borrow::Borrow<StoredStatementComparator> for StoredStatement {
-	fn borrow(&self) -> &StoredStatementComparator {
-		&self.comparator
-	}
-}
-
-impl std::hash::Hash for StoredStatement {
-	fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-		self.comparator.hash(state)
+impl<'a> StoredStatement<'a> {
+	fn compact(&self) -> &'a CompactStatement {
+		&self.comparator.compact
 	}
-}
 
-impl std::cmp::PartialEq for StoredStatement {
-	fn eq(&self, other: &Self) -> bool {
-		&self.comparator == &other.comparator
+	fn fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
+		(self.comparator.compact.clone(), self.statement.validator_index())
 	}
 }
 
-impl std::cmp::Eq for StoredStatement {}
-
 #[derive(Debug)]
 enum NotedStatement<'a> {
 	NotUseful,
-	Fresh(&'a StoredStatement),
+	Fresh(StoredStatement<'a>),
 	UsefulButKnown
 }
 
@@ -588,7 +574,7 @@ struct ActiveHeadData {
 	///
 	/// These are iterable in insertion order, and `Seconded` statements are always
 	/// accepted before dependent statements.
-	statements: IndexSet<StoredStatement>,
+	statements: IndexMap<StoredStatementComparator, SignedFullStatement>,
 	/// Large statements we are waiting for with associated meta data.
 	waiting_large_statements: HashMap<CandidateHash, LargeStatementStatus>,
 	/// The validators at this head.
@@ -641,11 +627,6 @@ impl ActiveHeadData {
 			signature: statement.signature().clone(),
 		};
 
-		let stored = StoredStatement {
-			comparator: comparator.clone(),
-			statement,
-		};
-
 		match comparator.compact {
 			CompactStatement::Seconded(h) => {
 				let seconded_so_far = self.seconded_counts.entry(validator_index).or_insert(0);
@@ -653,34 +634,36 @@ impl ActiveHeadData {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?stored.statement,
+						?statement,
 						"Extra statement is ignored"
 					);
 					return NotedStatement::NotUseful;
 				}
 
 				self.candidates.insert(h);
-				if self.statements.insert(stored) {
-					*seconded_so_far += 1;
-
+				if let Some(old) = self.statements.insert(comparator.clone(), statement) {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?self.statements.last().expect("Just inserted").statement,
-						"Noted new statement"
+						statement = ?old,
+						"Known statement"
 					);
-					// This will always return `Some` because it was just inserted.
-					NotedStatement::Fresh(self.statements.get(&comparator)
-						.expect("Statement was just inserted; qed"))
+					NotedStatement::UsefulButKnown
 				} else {
+					*seconded_so_far += 1;
+
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?self.statements.get(&comparator)
-							.expect("Existence was just checked; qed").statement,
-						"Known statement"
+						statement = ?self.statements.last().expect("Just inserted").1,
+						"Noted new statement"
 					);
-					NotedStatement::UsefulButKnown
+					// This will always return `Some` because it was just inserted.
+					let key_value = self.statements
+						.get_key_value(&comparator)
+						.expect("Statement was just inserted; qed");
+
+					NotedStatement::Fresh(key_value.into())
 				}
 			}
 			CompactStatement::Valid(h) => {
@@ -688,31 +671,34 @@ impl ActiveHeadData {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?stored.statement,
+						?statement,
 						"Statement for unknown candidate"
 					);
 					return NotedStatement::NotUseful;
 				}
 
-				if self.statements.insert(stored) {
+				if let Some(old) = self.statements.insert(comparator.clone(), statement) {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?self.statements.last().expect("Just inserted").statement,
-						"Noted new statement"
+						statement = ?old,
+						"Known statement"
 					);
-					// This will always return `Some` because it was just inserted.
-					NotedStatement::Fresh(self.statements.get(&comparator)
-						.expect("Statement was just inserted; qed"))
+					NotedStatement::UsefulButKnown
 				} else {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?self.statements.get(&comparator)
-							.expect("Existence was just checked; qed").statement,
-						"Known statement"
+						statement = ?self.statements.last().expect("Just inserted").1,
+						"Noted new statement"
 					);
-					NotedStatement::UsefulButKnown
+					// This will always return `Some` because it was just inserted.
+					NotedStatement::Fresh(
+						self.statements
+							.get_key_value(&comparator)
+							.expect("Statement was just inserted; qed")
+							.into()
+					)
 				}
 			}
 		}
@@ -720,20 +706,15 @@ impl ActiveHeadData {
 
 	/// Returns an error if the statement is already known or not useful
 	/// without modifying the internal state.
-	fn check_useful_or_unknown(&self, statement: SignedFullStatement)
+	fn check_useful_or_unknown(&self, statement: &UncheckedSignedFullStatement)
 		-> std::result::Result<(), DeniedStatement>
 	{
-		let validator_index = statement.validator_index();
-		let compact = statement.payload().to_compact();
+		let validator_index = statement.unchecked_validator_index();
+		let compact = statement.unchecked_payload().to_compact();
 		let comparator = StoredStatementComparator {
 			compact: compact.clone(),
 			validator_index,
-			signature: statement.signature().clone(),
-		};
-
-		let stored = StoredStatement {
-			comparator,
-			statement,
+			signature: statement.unchecked_signature().clone(),
 		};
 
 		match compact {
@@ -743,17 +724,17 @@ impl ActiveHeadData {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?stored.statement,
+						?statement,
 						"Extra statement is ignored",
 					);
 					return Err(DeniedStatement::NotUseful);
 				}
 
-				if self.statements.contains(&stored) {
+				if self.statements.contains_key(&comparator) {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?stored.statement,
+						?statement,
 						"Known statement",
 					);
 					return Err(DeniedStatement::UsefulButKnown);
@@ -764,17 +745,17 @@ impl ActiveHeadData {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?stored.statement,
+						?statement,
 						"Statement for unknown candidate",
 					);
 					return Err(DeniedStatement::NotUseful);
 				}
 
-				if self.statements.contains(&stored) {
+				if self.statements.contains_key(&comparator) {
 					tracing::trace!(
 						target: LOG_TARGET,
 						?validator_index,
-						statement = ?stored.statement,
+						?statement,
 						"Known statement",
 					);
 					return Err(DeniedStatement::UsefulButKnown);
@@ -785,14 +766,13 @@ impl ActiveHeadData {
 	}
 
 	/// Get an iterator over all statements for the active head. Seconded statements come first.
-	fn statements(&self) -> impl Iterator<Item = &'_ StoredStatement> + '_ {
-		self.statements.iter()
+	fn statements(&self) -> impl Iterator<Item = StoredStatement<'_>> + '_ {
+		self.statements.iter().map(Into::into)
 	}
 
 	/// Get an iterator over all statements for the active head that are for a particular candidate.
 	fn statements_about(&self, candidate_hash: CandidateHash)
-		-> impl Iterator<Item = &'_ StoredStatement> + '_
-	{
+		-> impl Iterator<Item = StoredStatement<'_>> + '_ {
 		self.statements().filter(move |s| s.compact().candidate_hash() == &candidate_hash)
 	}
 }
@@ -801,16 +781,17 @@ impl ActiveHeadData {
 fn check_statement_signature(
 	head: &ActiveHeadData,
 	relay_parent: Hash,
-	statement: &SignedFullStatement,
-) -> std::result::Result<(), ()> {
+	statement: UncheckedSignedFullStatement,
+) -> std::result::Result<SignedFullStatement, UncheckedSignedFullStatement> {
 	let signing_context = SigningContext {
 		session_index: head.session_index,
 		parent_hash: relay_parent,
 	};
 
-	head.validators.get(statement.validator_index().0 as usize)
-		.ok_or(())
-		.and_then(|v| statement.check_signature(&signing_context, v))
+	head.validators
+		.get(statement.unchecked_validator_index().0 as usize)
+		.ok_or_else(|| statement.clone())
+		.and_then(|v| statement.try_into_checked(&signing_context, v))
 }
 
 /// Places the statement in storage if it is new, and then
@@ -887,7 +868,7 @@ fn statement_message(relay_parent: Hash, statement: SignedFullStatement)
 			}
 		)
 	} else {
-		protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement)
+		protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement.into())
 	};
 
 	protocol_v1::ValidationProtocol::StatementDistribution(msg)
@@ -902,7 +883,7 @@ fn is_statement_large(statement: &SignedFullStatement) -> bool {
 				return true
 			}
 			// No runtime upgrade, now we need to be more nuanced:
-			let size = statement.encoded_size();
+			let size = statement.as_unchecked().encoded_size();
 
 			// Half max size seems to be a good threshold to start not using notifications:
 			let threshold =
@@ -919,11 +900,11 @@ fn is_statement_large(statement: &SignedFullStatement) -> bool {
 /// Circulates a statement to all peers who have not seen it yet, and returns
 /// an iterator over peers who need to have dependent statements sent.
 #[tracing::instrument(level = "trace", skip(peers, ctx), fields(subsystem = LOG_TARGET))]
-async fn circulate_statement(
+async fn circulate_statement<'a>(
 	peers: &mut HashMap<PeerId, PeerData>,
 	ctx: &mut impl SubsystemContext,
 	relay_parent: Hash,
-	stored: &StoredStatement,
+	stored: StoredStatement<'a>,
 	mut priority_peers: Vec<PeerId>,
 ) -> Vec<PeerId> {
 	let fingerprint = stored.fingerprint();
@@ -1092,7 +1073,7 @@ async fn retrieve_statement_from_message<'a>(
 	ctx: &mut impl SubsystemContext,
 	req_sender: &mpsc::Sender<RequesterMessage>,
 	metrics: &Metrics,
-) -> Option<SignedFullStatement> {
+) -> Option<UncheckedSignedFullStatement> {
 
 	let fingerprint = message.get_fingerprint();
 	let candidate_hash = *fingerprint.0.candidate_hash();
@@ -1100,7 +1081,7 @@ async fn retrieve_statement_from_message<'a>(
 	// Immediately return any Seconded statement:
 	let message =
 		if let protocol_v1::StatementDistributionMessage::Statement(h, s) = message {
-			if let Statement::Seconded(_) = s.payload() {
+			if let Statement::Seconded(_) = s.unchecked_payload() {
 				return Some(s)
 			}
 			protocol_v1::StatementDistributionMessage::Statement(h, s)
@@ -1148,40 +1129,12 @@ async fn retrieve_statement_from_message<'a>(
 							return Some(s)
 						}
 						protocol_v1::StatementDistributionMessage::LargeStatement(metadata) => {
-
-							let validator_id = active_head.validators.get(metadata.signed_by.0 as usize);
-
-							if let Some(validator_id) = validator_id {
-								let signing_context = SigningContext {
-									session_index: active_head.session_index,
-									parent_hash: metadata.relay_parent,
-								};
-
-								let statement = SignedFullStatement::new(
-									Statement::Seconded(committed.clone()),
+							return Some(UncheckedSignedFullStatement::new(
+								Statement::Seconded(
+									committed.clone()),
 									metadata.signed_by,
 									metadata.signature.clone(),
-									&signing_context,
-									validator_id,
-								);
-
-								if let Some(statement) = statement {
-									return Some(statement)
-								} else {
-									tracing::debug!(
-										target: LOG_TARGET,
-										validator_index = ?metadata.signed_by,
-										"Building statement failed - invalid signature!"
-									);
-									report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
-								}
-							} else {
-								tracing::debug!(
-									target: LOG_TARGET,
-									validator_index = ?metadata.signed_by,
-									"Error loading statement, could not find key for validator."
-								);
-							}
+							))
 						}
 					}
 				}
@@ -1309,7 +1262,7 @@ async fn handle_incoming_message<'a>(
 	message: protocol_v1::StatementDistributionMessage,
 	req_sender: &mpsc::Sender<RequesterMessage>,
 	metrics: &Metrics,
-) -> Option<(Hash, &'a StoredStatement)> {
+) -> Option<(Hash, StoredStatement<'a>)> {
 	let relay_parent = message.get_relay_parent();
 
 	let active_head = match active_heads.get_mut(&relay_parent) {
@@ -1356,7 +1309,7 @@ async fn handle_incoming_message<'a>(
 		metrics,
 	).await?;
 
-	match active_head.check_useful_or_unknown(statement.clone()) {
+	match active_head.check_useful_or_unknown(&statement) {
 		Ok(()) => {},
 		Err(DeniedStatement::NotUseful) => {
 			return None;
@@ -1368,16 +1321,19 @@ async fn handle_incoming_message<'a>(
 	}
 
 	// check the signature on the statement.
-	if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {
-		tracing::debug!(
-			target: LOG_TARGET,
-			?peer,
-			?statement,
-			"Invalid statement signature"
-		);
-		report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
-		return None;
-	}
+	let statement = match check_statement_signature(&active_head, relay_parent, statement) {
+		Err(statement) => {
+			tracing::debug!(
+				target: LOG_TARGET,
+				?peer,
+				?statement,
+				"Invalid statement signature"
+			);
+			report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
+			return None
+		}
+		Ok(statement) => statement,
+	};
 
 	// Ensure the statement is stored in the peer data.
 	//
@@ -1553,7 +1509,7 @@ impl StatementDistribution {
 		let mut authorities: HashMap<AuthorityDiscoveryId, PeerId> = HashMap::new();
 		let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();
 
-		let mut runtime = RuntimeInfo::new(self.keystore.clone());
+		let mut runtime = RuntimeInfo::new(Some(self.keystore.clone()));
 
 		// Sender/Receiver for getting news from our statement fetching tasks.
 		let (req_sender, mut req_receiver) = mpsc::channel(1);
@@ -2116,14 +2072,14 @@ mod tests {
 			ValidatorIndex(0),
 			&alice_public.into(),
 		)).ok().flatten().expect("should be signed");
-		assert!(head_data.check_useful_or_unknown(a_seconded_val_0.clone()).is_ok());
+		assert!(head_data.check_useful_or_unknown(&a_seconded_val_0.clone().into()).is_ok());
 		let noted = head_data.note_statement(a_seconded_val_0.clone());
 
 		assert_matches!(noted, NotedStatement::Fresh(_));
 
 		// note A (duplicate)
 		assert_eq!(
-			head_data.check_useful_or_unknown(a_seconded_val_0.clone()),
+			head_data.check_useful_or_unknown(&a_seconded_val_0.clone().into()),
 			Err(DeniedStatement::UsefulButKnown),
 		);
 		let noted = head_data.note_statement(a_seconded_val_0);
@@ -2138,7 +2094,7 @@ mod tests {
 			ValidatorIndex(0),
 			&alice_public.into(),
 		)).ok().flatten().expect("should be signed");
-		assert!(head_data.check_useful_or_unknown(statement.clone()).is_ok());
+		assert!(head_data.check_useful_or_unknown(&statement.clone().into()).is_ok());
 		let noted = head_data.note_statement(statement);
 		assert_matches!(noted, NotedStatement::Fresh(_));
 
@@ -2151,7 +2107,7 @@ mod tests {
 			&alice_public.into(),
 		)).ok().flatten().expect("should be signed");
 		assert_eq!(
-			head_data.check_useful_or_unknown(statement.clone()),
+			head_data.check_useful_or_unknown(&statement.clone().into()),
 			Err(DeniedStatement::NotUseful),
 		);
 		let noted = head_data.note_statement(statement);
@@ -2165,7 +2121,7 @@ mod tests {
 			ValidatorIndex(1),
 			&bob_public.into(),
 		)).ok().flatten().expect("should be signed");
-		assert!(head_data.check_useful_or_unknown(statement.clone()).is_ok());
+		assert!(head_data.check_useful_or_unknown(&statement.clone().into()).is_ok());
 		let noted = head_data.note_statement(statement);
 		assert_matches!(noted, NotedStatement::Fresh(_));
 
@@ -2177,7 +2133,7 @@ mod tests {
 			ValidatorIndex(1),
 			&bob_public.into(),
 		)).ok().flatten().expect("should be signed");
-		assert!(head_data.check_useful_or_unknown(statement.clone()).is_ok());
+		assert!(head_data.check_useful_or_unknown(&statement.clone().into()).is_ok());
 		let noted = head_data.note_statement(statement);
 		assert_matches!(noted, NotedStatement::Fresh(_));
 	}
@@ -2406,7 +2362,7 @@ mod tests {
 				ValidatorIndex(0),
 				&alice_public.into(),
 			)).ok().flatten().expect("should be signed");
-			assert!(data.check_useful_or_unknown(statement.clone()).is_ok());
+			assert!(data.check_useful_or_unknown(&statement.clone().into()).is_ok());
 			let noted = data.note_statement(statement);
 
 			assert_matches!(noted, NotedStatement::Fresh(_));
@@ -2418,7 +2374,7 @@ mod tests {
 				ValidatorIndex(1),
 				&bob_public.into(),
 			)).ok().flatten().expect("should be signed");
-			assert!(data.check_useful_or_unknown(statement.clone()).is_ok());
+			assert!(data.check_useful_or_unknown(&statement.clone().into()).is_ok());
 			let noted = data.note_statement(statement);
 
 			assert_matches!(noted, NotedStatement::Fresh(_));
@@ -2430,7 +2386,7 @@ mod tests {
 				ValidatorIndex(2),
 				&charlie_public.into(),
 			)).ok().flatten().expect("should be signed");
-			assert!(data.check_useful_or_unknown(statement.clone()).is_ok());
+			assert!(data.check_useful_or_unknown(&statement.clone().into()).is_ok());
 			let noted = data.note_statement(statement);
 			assert_matches!(noted, NotedStatement::Fresh(_));
 
@@ -2553,40 +2509,39 @@ mod tests {
 				::<StatementDistributionMessage,_>(pool);
 
 		executor::block_on(async move {
-			let statement = {
-				let signing_context = SigningContext {
-					parent_hash: hash_b,
-					session_index,
-				};
+			let signing_context = SigningContext {
+				parent_hash: hash_b,
+				session_index,
+			};
 
-				let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
-				let alice_public = CryptoStore::sr25519_generate_new(
-					&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
-				).await.unwrap();
+			let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
+			let alice_public = CryptoStore::sr25519_generate_new(
+				&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
+			).await.unwrap();
 
-				let statement = SignedFullStatement::sign(
-					&keystore,
-					Statement::Seconded(candidate),
-					&signing_context,
-					ValidatorIndex(0),
-					&alice_public.into(),
-				).await.ok().flatten().expect("should be signed");
-
-				StoredStatement {
-					comparator: StoredStatementComparator {
-						compact: statement.payload().to_compact(),
-						validator_index: ValidatorIndex(0),
-						signature: statement.signature().clone()
-					},
-					statement,
-				}
+			let statement = SignedFullStatement::sign(
+				&keystore,
+				Statement::Seconded(candidate),
+				&signing_context,
+				ValidatorIndex(0),
+				&alice_public.into(),
+			).await.ok().flatten().expect("should be signed");
+
+			let comparator = StoredStatementComparator {
+				compact: statement.payload().to_compact(),
+				validator_index: ValidatorIndex(0),
+				signature: statement.signature().clone(),
+			};
+			let statement = StoredStatement {
+				comparator: &comparator,
+				statement: &statement,
 			};
 
 			let needs_dependents = circulate_statement(
 				&mut peer_data,
 				&mut ctx,
 				hash_b,
-				&statement,
+				statement,
 				Vec::new(),
 			).await;
 
@@ -2746,7 +2701,7 @@ mod tests {
 				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
 					NetworkBridgeEvent::PeerMessage(
 						peer_a.clone(),
-						protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()),
+						protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into()),
 					)
 				)
 			}).await;
@@ -2777,7 +2732,7 @@ mod tests {
 				) => {
 					assert_eq!(recipients, vec![peer_b.clone()]);
 					assert_eq!(r, hash_a);
-					assert_eq!(s, statement);
+					assert_eq!(s, statement.into());
 				}
 			);
 			handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
@@ -2951,7 +2906,7 @@ mod tests {
 			};
 
 			let metadata =
-				protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();
+				protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into()).get_metadata();
 
 			handle.send(FromOverseer::Communication {
 				msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
@@ -3454,7 +3409,7 @@ mod tests {
 			};
 
 			let metadata =
-				protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();
+				protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into()).get_metadata();
 
 			handle.send(FromOverseer::Communication {
 				msg: StatementDistributionMessage::Share(hash_a, statement.clone())
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index 8c45e843556..a59f6cd1e05 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -33,7 +33,7 @@ pub use sp_consensus_babe::{
 	Epoch as BabeEpoch, BabeEpochConfiguration, AllowedSlots as BabeAllowedSlots,
 };
 
-use polkadot_primitives::v1::{CandidateCommitments, CandidateHash, CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HeadData, Id as ParaId, OutboundHrmpMessage, PersistedValidationData, Signed, UpwardMessage, ValidationCode, BlakeTwo256, HashT, ValidatorIndex};
+use polkadot_primitives::v1::{BlakeTwo256, CandidateCommitments, CandidateHash, CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs, Hash, HashT, HeadData, Id as ParaId, OutboundHrmpMessage, PersistedValidationData, Signed, UncheckedSigned, UpwardMessage, ValidationCode, ValidatorIndex};
 pub use polkadot_parachain::primitives::BlockData;
 
 pub mod approval;
@@ -114,6 +114,9 @@ impl EncodeAs<CompactStatement> for Statement {
 /// Only the compact `SignedStatement` is suitable for submission to the chain.
 pub type SignedFullStatement = Signed<Statement, CompactStatement>;
 
+/// Variant of `SignedFullStatement` where the signature has not yet been verified.
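+///
+/// This is the form sent over the wire (see the `protocol_v1` network messages); the
+/// signature has to be checked before it can be turned back into a `SignedFullStatement`.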
+pub type UncheckedSignedFullStatement = UncheckedSigned<Statement, CompactStatement>;
+
 /// Candidate invalidity details
 #[derive(Debug)]
 pub enum InvalidCandidate {
diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs
index 98f3dda1a34..287b530440b 100644
--- a/polkadot/node/subsystem-util/src/lib.rs
+++ b/polkadot/node/subsystem-util/src/lib.rs
@@ -322,21 +322,6 @@ impl Validator {
 	) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
 		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key).await
 	}
-
-	/// Validate the payload with this validator
-	///
-	/// Validation can only succeed if `signed.validator_index() == self.index()`.
-	/// Normally, this will always be the case for a properly operating program,
-	/// but it's double-checked here anyway.
-	pub fn check_payload<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
-		&self,
-		signed: Signed<Payload, RealPayload>,
-	) -> Result<(), ()> {
-		if signed.validator_index() != self.index {
-			return Err(());
-		}
-		signed.check_signature(&self.signing_context, &self.id())
-	}
 }
 
 struct AbortOnDrop(future::AbortHandle);
diff --git a/polkadot/node/subsystem-util/src/runtime/mod.rs b/polkadot/node/subsystem-util/src/runtime/mod.rs
index 0012c9f6b44..f07db076302 100644
--- a/polkadot/node/subsystem-util/src/runtime/mod.rs
+++ b/polkadot/node/subsystem-util/src/runtime/mod.rs
@@ -18,15 +18,18 @@
 
 use lru::LruCache;
 
+use parity_scale_codec::Encode;
 use sp_application_crypto::AppKey;
 use sp_core::crypto::Public;
 use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
 
-use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
+use polkadot_primitives::v1::{CoreState, EncodeAs, GroupIndex, GroupRotationInfo, Hash, OccupiedCore, SessionIndex, SessionInfo, Signed, SigningContext, UncheckedSigned, ValidatorId, ValidatorIndex};
 use polkadot_node_subsystem::SubsystemContext;
 
 use crate::{
 	request_session_index_for_child, request_session_info,
+	request_availability_cores,
+	request_validator_groups,
 };
 
 /// Errors that can happen on runtime fetches.
@@ -49,7 +52,7 @@ pub struct RuntimeInfo {
 	session_info_cache: LruCache<SessionIndex, ExtendedSessionInfo>,
 
 	/// Key store for determining whether we are a validator and what `ValidatorIndex` we have.
-	keystore: SyncCryptoStorePtr,
+	keystore: Option<SyncCryptoStorePtr>,
 }
 
 /// SessionInfo with additional useful data for validator nodes.
@@ -72,7 +75,7 @@ pub struct ValidatorInfo {
 
 impl RuntimeInfo {
 	/// Create a new `RuntimeInfo` for convenient runtime fetches.
-	pub fn new(keystore: SyncCryptoStorePtr) -> Self {
+	pub fn new(keystore: Option<SyncCryptoStorePtr>) -> Self {
 		Self {
 			// Adjust, depending on how many forks we want to support.
 			session_index_cache: LruCache::new(10),
@@ -150,6 +153,23 @@ impl RuntimeInfo {
 		)
 	}
 
+	/// Convenience function for checking the signature of something signed.
+	pub async fn check_signature<Context, Payload, RealPayload>(
+		&mut self,
+		ctx: &mut Context,
+		parent: Hash,
+		signed: UncheckedSigned<Payload, RealPayload>,
+	) -> Result<std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>>>
+	where
+		Context: SubsystemContext,
+		Payload: EncodeAs<RealPayload> + Clone,
+		RealPayload: Encode + Clone,
+	{
+		let session_index = self.get_session_index(ctx, parent).await?;
+		let info = self.get_session_info_by_index(ctx, parent, session_index).await?;
+		Ok(check_signature(session_index, &info.session_info, parent, signed))
+	}
+
 	/// Build `ValidatorInfo` for the current session.
 	///
 	///
@@ -187,8 +207,9 @@ impl RuntimeInfo {
 	///
 	/// Returns: None if we are not a validator.
 	async fn get_our_index(&self, validators: &[ValidatorId]) -> Option<ValidatorIndex> {
+		let keystore = self.keystore.as_ref()?;
 		for (i, v) in validators.iter().enumerate() {
-			if CryptoStore::has_keys(&*self.keystore, &[(v.to_raw_vec(), ValidatorId::ID)])
+			if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), ValidatorId::ID)])
 				.await
 			{
 				return Some(ValidatorIndex(i as u32));
@@ -197,3 +218,69 @@ impl RuntimeInfo {
 		None
 	}
 }
+
+/// Convenience function for quickly checking the signature on signed data.
+pub fn check_signature<Payload, RealPayload>(
+	session_index: SessionIndex,
+	session_info: &SessionInfo,
+	relay_parent: Hash,
+	signed: UncheckedSigned<Payload, RealPayload>,
+) -> std::result::Result<Signed<Payload, RealPayload>, UncheckedSigned<Payload, RealPayload>> 
+where
+	Payload: EncodeAs<RealPayload> + Clone,
+	RealPayload: Encode + Clone,
+{
+	let signing_context = SigningContext {
+		session_index,
+		parent_hash: relay_parent,
+	};
+
+	session_info.validators
+		.get(signed.unchecked_validator_index().0 as usize)
+		.ok_or_else(|| signed.clone())
+		.and_then(|v| signed.try_into_checked(&signing_context, v))
+}
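+
+// A minimal caller sketch (binding names are illustrative): given the session index, the
+// matching `SessionInfo` and the relay parent, an unchecked signed item either becomes a
+// verified `Signed` value or is handed back unchanged, e.g. for punishing the sending peer.
+//
+//   match check_signature(session_index, &session_info, relay_parent, unchecked_bitfield) {
+//       Ok(checked) => { /* signature valid, use the `Signed` value */ }
+//       Err(unchecked) => { /* unknown validator index or invalid signature */ }
+//   }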
+
+/// Request availability cores from the runtime.
+pub async fn get_availability_cores<Context>(ctx: &mut Context, relay_parent: Hash)
+	-> Result<Vec<CoreState>> 
+	where
+		Context: SubsystemContext,
+{
+	recv_runtime(request_availability_cores(relay_parent, ctx.sender()).await).await
+}
+
+/// Variant of `get_availability_cores` that only returns occupied cores.
+pub async fn get_occupied_cores<Context>(
+	ctx: &mut Context,
+	relay_parent: Hash,
+) -> Result<Vec<OccupiedCore>>
+where
+	Context: SubsystemContext,
+{
+	let cores = get_availability_cores(ctx, relay_parent).await?;
+
+	Ok(cores
+		.into_iter()
+		.filter_map(|core_state| {
+			if let CoreState::Occupied(occupied) = core_state {
+				Some(occupied)
+			} else {
+				None
+			}
+		})
+		.collect()
+	)
+}
+
+/// Get group rotation info based on the given relay_parent.
+pub async fn get_group_rotation_info<Context>(ctx: &mut Context, relay_parent: Hash)
+	-> Result<GroupRotationInfo>
+	where
+		Context: SubsystemContext
+{
+	// We drop `groups` here as we don't need them; `RuntimeInfo` already provides that
+	// information. Ideally we would not fetch them in the first place.
+	let (_, info) = recv_runtime(request_validator_groups(relay_parent, ctx.sender()).await).await?;
+	Ok(info)
+}
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index 7d6d39938ec..55b733864f0 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -200,7 +200,7 @@ pub enum CollatorProtocolMessage {
 	/// Note a collator as having provided a good collation.
 	NoteGoodCollation(CollatorId),
 	/// Notify a collator that its collation was seconded.
-	NotifyCollationSeconded(CollatorId, SignedFullStatement),
+	NotifyCollationSeconded(CollatorId, Hash, SignedFullStatement),
 	/// Get a network bridge update.
 	#[from]
 	NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>),
diff --git a/polkadot/primitives/src/v0.rs b/polkadot/primitives/src/v0.rs
index 9a7112bccd0..dc3c8dceb7d 100644
--- a/polkadot/primitives/src/v0.rs
+++ b/polkadot/primitives/src/v0.rs
@@ -18,8 +18,6 @@
 //! perspective.
 
 use sp_std::prelude::*;
-#[cfg(feature = "std")]
-use sp_std::convert::TryInto;
 use sp_std::cmp::Ordering;
 
 use parity_scale_codec::{Encode, Decode};
@@ -29,13 +27,9 @@ use serde::{Serialize, Deserialize};
 #[cfg(feature = "std")]
 use parity_util_mem::{MallocSizeOf, MallocSizeOfOps};
 
-#[cfg(feature = "std")]
-use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError};
 use primitives::RuntimeDebug;
 use runtime_primitives::traits::{AppVerify, Block as BlockT};
 use inherents::InherentIdentifier;
-#[cfg(feature = "std")]
-use application_crypto::AppKey;
 use application_crypto::KeyTypeId;
 
 pub use runtime_primitives::traits::{BlakeTwo256, Hash as HashT, Verify, IdentifyAccount};
@@ -731,9 +725,6 @@ impl CompactStatement {
 	}
 }
 
-/// A signed compact statement, suitable to be sent to the chain.
-pub type SignedStatement = Signed<CompactStatement>;
-
 /// An either implicit or explicit attestation to the validity of a parachain
 /// candidate.
 #[derive(Clone, Eq, PartialEq, Decode, Encode, RuntimeDebug)]
@@ -866,156 +857,6 @@ pub mod id {
 	pub const PARACHAIN_HOST: ApiId = *b"parahost";
 }
 
-/// This helper trait ensures that we can encode Statement as CompactStatement,
-/// and anything as itself.
-///
-/// This resembles `parity_scale_codec::EncodeLike`, but it's distinct:
-/// EncodeLike is a marker trait which asserts at the typesystem level that
-/// one type's encoding is a valid encoding for another type. It doesn't
-/// perform any type conversion when encoding.
-///
-/// This trait, on the other hand, provides a method which can be used to
-/// simultaneously convert and encode one type as another.
-pub trait EncodeAs<T> {
-	/// Convert Self into T, then encode T.
-	///
-	/// This is useful when T is a subset of Self, reducing encoding costs;
-	/// its signature also means that we do not need to clone Self in order
-	/// to retain ownership, as we would if we were to do
-	/// `self.clone().into().encode()`.
-	fn encode_as(&self) -> Vec<u8>;
-}
-
-impl<T: Encode> EncodeAs<T> for T {
-	fn encode_as(&self) -> Vec<u8> {
-		self.encode()
-	}
-}
-
-/// A signed type which encapsulates the common desire to sign some data and validate a signature.
-///
-/// Note that the internal fields are not public; they are all accessable by immutable getters.
-/// This reduces the chance that they are accidentally mutated, invalidating the signature.
-#[derive(Clone, PartialEq, Eq, Encode, Decode, RuntimeDebug)]
-pub struct Signed<Payload, RealPayload = Payload> {
-	/// The payload is part of the signed data. The rest is the signing context,
-	/// which is known both at signing and at validation.
-	payload: Payload,
-	/// The index of the validator signing this statement.
-	validator_index: ValidatorIndex,
-	/// The signature by the validator of the signed payload.
-	signature: ValidatorSignature,
-	/// This ensures the real payload is tracked at the typesystem level.
-	real_payload: sp_std::marker::PhantomData<RealPayload>,
-}
-
-// We can't bound this on `Payload: Into<RealPayload>` beacuse that conversion consumes
-// the payload, and we don't want that. We can't bound it on `Payload: AsRef<RealPayload>`
-// because there's no blanket impl of `AsRef<T> for T`. In the end, we just invent our
-// own trait which does what we need: EncodeAs.
-impl<Payload: EncodeAs<RealPayload>, RealPayload: Encode> Signed<Payload, RealPayload> {
-	fn payload_data<H: Encode>(payload: &Payload, context: &SigningContext<H>) -> Vec<u8> {
-		// equivalent to (real_payload, context).encode()
-		let mut out = payload.encode_as();
-		out.extend(context.encode());
-		out
-	}
-
-	/// Used to create a `Signed` from already existing parts.
-	#[cfg(feature = "std")]
-	pub fn new<H: Encode>(
-		payload: Payload,
-		validator_index: ValidatorIndex,
-		signature: ValidatorSignature,
-		context: &SigningContext<H>,
-		key: &ValidatorId,
-	) -> Option<Self> {
-		let s = Self {
-			payload,
-			validator_index,
-			signature,
-			real_payload: std::marker::PhantomData,
-		};
-
-		s.check_signature(context, key).ok()?;
-
-		Some(s)
-	}
-
-	/// Sign this payload with the given context and key, storing the validator index.
-	#[cfg(feature = "std")]
-	pub async fn sign<H: Encode>(
-		keystore: &SyncCryptoStorePtr,
-		payload: Payload,
-		context: &SigningContext<H>,
-		validator_index: ValidatorIndex,
-		key: &ValidatorId,
-	) -> Result<Option<Self>, KeystoreError> {
-		let data = Self::payload_data(&payload, context);
-		let signature = CryptoStore::sign_with(
-			&**keystore,
-			ValidatorId::ID,
-			&key.into(),
-			&data,
-		).await?;
-
-		let signature = match signature {
-			Some(sig) => sig.try_into().map_err(|_| KeystoreError::KeyNotSupported(ValidatorId::ID))?,
-			None => return Ok(None),
-		};
-
-		Ok(Some(Self {
-			payload,
-			validator_index,
-			signature,
-			real_payload: std::marker::PhantomData,
-		}))
-	}
-
-	/// Validate the payload given the context and public key.
-	pub fn check_signature<H: Encode>(&self, context: &SigningContext<H>, key: &ValidatorId) -> Result<(), ()> {
-		let data = Self::payload_data(&self.payload, context);
-		if self.signature.verify(data.as_slice(), key) { Ok(()) } else { Err(()) }
-	}
-
-	/// Immutably access the payload.
-	#[inline]
-	pub fn payload(&self) -> &Payload {
-		&self.payload
-	}
-
-	/// Immutably access the validator index.
-	#[inline]
-	pub fn validator_index(&self) -> ValidatorIndex {
-		self.validator_index
-	}
-
-	/// Immutably access the signature.
-	#[inline]
-	pub fn signature(&self) -> &ValidatorSignature {
-		&self.signature
-	}
-
-	/// Discard signing data, get the payload
-	// Note: can't `impl<P, R> From<Signed<P, R>> for P` because the orphan rule exception doesn't
-	// handle this case yet. Likewise can't `impl<P, R> Into<P> for Signed<P, R>` because it might
-	// potentially conflict with the global blanket impl, even though it currently doesn't.
-	#[inline]
-	pub fn into_payload(self) -> Payload {
-		self.payload
-	}
-
-	/// Convert `Payload` into `RealPayload`.
-	pub fn convert_payload(&self) -> Signed<RealPayload> where for<'a> &'a Payload: Into<RealPayload> {
-		Signed {
-			signature: self.signature.clone(),
-			validator_index: self.validator_index,
-			payload: self.payload().into(),
-			real_payload: sp_std::marker::PhantomData,
-		}
-	}
-}
-
 /// Custom validity errors used in Polkadot while validating transactions.
 #[repr(u8)]
 pub enum ValidityError {
diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1/mod.rs
similarity index 98%
rename from polkadot/primitives/src/v1.rs
rename to polkadot/primitives/src/v1/mod.rs
index 4fdab1f3e1a..f0d44905b1c 100644
--- a/polkadot/primitives/src/v1.rs
+++ b/polkadot/primitives/src/v1/mod.rs
@@ -44,8 +44,8 @@ pub use polkadot_parachain::primitives::{
 // Export some basic parachain primitives from v0.
 pub use crate::v0::{
 	CollatorId, CollatorSignature, PARACHAIN_KEY_TYPE_ID, ValidatorId, ValidatorIndex,
-	ValidatorSignature, SigningContext, Signed, ValidityAttestation,
-	CompactStatement, SignedStatement, EncodeAs,
+	ValidatorSignature, SigningContext, ValidityAttestation,
+	CompactStatement,
 };
 
 #[cfg(feature = "std")]
@@ -58,6 +58,10 @@ pub use crate::v0::{ValidatorPair, CollatorPair};
 pub use sp_staking::SessionIndex;
 pub use sp_authority_discovery::AuthorityId as AuthorityDiscoveryId;
 
+/// Signed data.
+mod signed;
+pub use signed::{Signed, UncheckedSigned, EncodeAs};
+
 /// A declarations of storage keys where an external observer can find some interesting data.
 pub mod well_known_keys {
 	use super::{Id, HrmpChannelId};
@@ -461,11 +466,19 @@ impl From<BitVec<bitvec::order::Lsb0, u8>> for AvailabilityBitfield {
 	}
 }
 
+
+/// A signed compact statement, suitable to be sent to the chain.
+pub type SignedStatement = Signed<CompactStatement>;
+
 /// A bitfield signed by a particular validator about the availability of pending candidates.
 pub type SignedAvailabilityBitfield = Signed<AvailabilityBitfield>;
+/// A signed bitfield with signature not yet checked.
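+///
+/// This is the form that is gossiped over the network and included in the parachains
+/// inherent; the signature has to be checked before it can be used as a
+/// `SignedAvailabilityBitfield`.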
+pub type UncheckedSignedAvailabilityBitfield = UncheckedSigned<AvailabilityBitfield>;
 
 /// A set of signed availability bitfields. Should be sorted by validator index, ascending.
 pub type SignedAvailabilityBitfields = Vec<SignedAvailabilityBitfield>;
+/// A set of unchecked signed availability bitfields. Should be sorted by validator index, ascending.
+pub type UncheckedSignedAvailabilityBitfields = Vec<UncheckedSignedAvailabilityBitfield>;
 
 /// A backed (or backable, depending on context) candidate.
 #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
@@ -1120,7 +1133,7 @@ pub struct DisputeState<N = BlockNumber> {
 #[derive(Encode, Decode, Clone, PartialEq, RuntimeDebug)]
 pub struct InherentData<HDR: HeaderT = Header> {
 	/// Signed bitfields by validators about availability.
-	pub bitfields: SignedAvailabilityBitfields,
+	pub bitfields: UncheckedSignedAvailabilityBitfields,
 	/// Backed candidates for inclusion in the block.
 	pub backed_candidates: Vec<BackedCandidate<HDR::Hash>>,
 	/// Sets of dispute votes for inclusion,
diff --git a/polkadot/primitives/src/v1/signed.rs b/polkadot/primitives/src/v1/signed.rs
new file mode 100644
index 00000000000..3aa6d964342
--- /dev/null
+++ b/polkadot/primitives/src/v1/signed.rs
@@ -0,0 +1,282 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use parity_scale_codec::{Encode, Decode};
+
+use sp_std::prelude::Vec;
+#[cfg(feature = "std")]
+use sp_std::convert::TryInto;
+#[cfg(feature = "std")]
+use application_crypto::AppKey;
+#[cfg(feature = "std")]
+use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError};
+
+use primitives::RuntimeDebug;
+use runtime_primitives::traits::AppVerify;
+
+use crate::v0::{SigningContext, ValidatorId, ValidatorSignature, ValidatorIndex};
+
+/// Signed data with signature already verified.
+///
+/// NOTE: This type does not have an Encode/Decode instance, as this would cancel out our
+/// valid signature guarantees. If you need to encode/decode you have to convert into an
+/// `UncheckedSigned` first.
+///
+/// `Signed` can easily be converted into `UncheckedSigned`; converting back via
+/// `try_into_checked` enforces a valid signature again.
+#[derive(Clone, PartialEq, Eq, RuntimeDebug)]
+pub struct Signed<Payload, RealPayload = Payload>(UncheckedSigned<Payload, RealPayload>);
+
+/// Unchecked signed data; it can be converted to `Signed` by checking the signature.
+#[derive(Clone, PartialEq, Eq, RuntimeDebug, Encode, Decode)]
+pub struct UncheckedSigned<Payload, RealPayload = Payload> {
+	/// The payload is part of the signed data. The rest is the signing context,
+	/// which is known both at signing and at validation.
+	payload: Payload,
+	/// The index of the validator signing this statement.
+	validator_index: ValidatorIndex,
+	/// The signature by the validator of the signed payload.
+	signature: ValidatorSignature,
+	/// This ensures the real payload is tracked at the typesystem level.
+	real_payload: sp_std::marker::PhantomData<RealPayload>,
+}
+
+impl<Payload: EncodeAs<RealPayload>, RealPayload: Encode> Signed<Payload, RealPayload> {
+	/// Used to create a `Signed` from already existing parts.
+	///
+	/// The signature is checked as part of the process.
+	#[cfg(feature = "std")]
+	pub fn new<H: Encode>(
+		payload: Payload,
+		validator_index: ValidatorIndex,
+		signature: ValidatorSignature,
+		context: &SigningContext<H>,
+		key: &ValidatorId,
+	) -> Option<Self> {
+		let s = UncheckedSigned {
+			payload,
+			validator_index,
+			signature,
+			real_payload: std::marker::PhantomData,
+		};
+
+		s.check_signature(context, key).ok()?;
+
+		Some(Self(s))
+	}
+
+	/// Create a new `Signed` by signing data.
+	#[cfg(feature = "std")]
+	pub async fn sign<H: Encode>(
+		keystore: &SyncCryptoStorePtr,
+		payload: Payload,
+		context: &SigningContext<H>,
+		validator_index: ValidatorIndex,
+		key: &ValidatorId,
+	) -> Result<Option<Self>, KeystoreError> {
+		let r = UncheckedSigned::sign(keystore, payload, context, validator_index, key).await?;
+		Ok(r.map(Self))
+	}
+
+	/// Try to convert from `UncheckedSigned` by checking the signature.
+	pub fn try_from_unchecked<H: Encode>(
+		unchecked: UncheckedSigned<Payload, RealPayload>,
+		context: &SigningContext<H>,
+		key: &ValidatorId
+	) -> Result<Self, UncheckedSigned<Payload, RealPayload>> {
+		if unchecked.check_signature(context, key).is_ok() {
+			Ok(Self(unchecked))
+		} else {
+			Err(unchecked)
+		}
+	}
+
+	/// Get a reference to data as unchecked.
+	pub fn as_unchecked(&self) -> &UncheckedSigned<Payload, RealPayload> {
+		&self.0
+	}
+
+	/// Immutably access the payload.
+	#[inline]
+	pub fn payload(&self) -> &Payload {
+		&self.0.payload
+	}
+
+	/// Immutably access the validator index.
+	#[inline]
+	pub fn validator_index(&self) -> ValidatorIndex {
+		self.0.validator_index
+	}
+
+	/// Immutably access the signature.
+	#[inline]
+	pub fn signature(&self) -> &ValidatorSignature {
+		&self.0.signature
+	}
+
+	/// Discard signing data, get the payload
+	#[inline]
+	pub fn into_payload(self) -> Payload {
+		self.0.payload
+	}
+
+	/// Convert the payload to `RealPayload`, producing a `Signed<RealPayload>`.
+	pub fn convert_payload(&self) -> Signed<RealPayload> where for<'a> &'a Payload: Into<RealPayload> {
+		Signed(self.0.unchecked_convert_payload())
+	}
+}
+
+// We can't bound this on `Payload: Into<RealPayload>` because that conversion consumes
+// the payload, and we don't want that. We can't bound it on `Payload: AsRef<RealPayload>`
+// because there's no blanket impl of `AsRef<T> for T`. In the end, we just invent our
+// own trait which does what we need: EncodeAs.
+impl<Payload: EncodeAs<RealPayload>, RealPayload: Encode> UncheckedSigned<Payload, RealPayload> {
+	/// Used to create an `UncheckedSigned` from already existing parts.
+	///
+	/// Signature is not checked here, hence `UncheckedSigned`.
+	#[cfg(feature = "std")]
+	pub fn new(
+		payload: Payload,
+		validator_index: ValidatorIndex,
+		signature: ValidatorSignature,
+	) -> Self {
+		Self {
+			payload,
+			validator_index,
+			signature,
+			real_payload: std::marker::PhantomData,
+		}
+	}
+
+	/// Check signature and convert to `Signed` if successful.
+	pub fn try_into_checked<H: Encode>(
+		self,
+		context: &SigningContext<H>,
+		key: &ValidatorId
+	) -> Result<Signed<Payload, RealPayload>, Self> {
+		Signed::try_from_unchecked(self, context, key)
+	}
+
+	/// Immutably access the payload.
+	#[inline]
+	pub fn unchecked_payload(&self) -> &Payload {
+		&self.payload
+	}
+
+	/// Immutably access the validator index.
+	#[inline]
+	pub fn unchecked_validator_index(&self) -> ValidatorIndex {
+		self.validator_index
+	}
+
+	/// Immutably access the signature.
+	#[inline]
+	pub fn unchecked_signature(&self) -> &ValidatorSignature {
+		&self.signature
+	}
+
+	/// Discard the signing data and return the payload.
+	#[inline]
+	pub fn unchecked_into_payload(self) -> Payload {
+		self.payload
+	}
+
+	/// Convert the payload to `RealPayload`, producing an `UncheckedSigned<RealPayload>`.
+	pub fn unchecked_convert_payload(&self) -> UncheckedSigned<RealPayload> where for<'a> &'a Payload: Into<RealPayload> {
+		UncheckedSigned {
+			signature: self.signature.clone(),
+			validator_index: self.validator_index,
+			payload: (&self.payload).into(),
+			real_payload: sp_std::marker::PhantomData,
+		}
+	}
+
+	fn payload_data<H: Encode>(payload: &Payload, context: &SigningContext<H>) -> Vec<u8> {
+		// equivalent to (real_payload, context).encode()
+		let mut out = payload.encode_as();
+		out.extend(context.encode());
+		out
+	}
+
+	/// Sign this payload with the given context and key, storing the validator index.
+	#[cfg(feature = "std")]
+	async fn sign<H: Encode>(
+		keystore: &SyncCryptoStorePtr,
+		payload: Payload,
+		context: &SigningContext<H>,
+		validator_index: ValidatorIndex,
+		key: &ValidatorId,
+	) -> Result<Option<Self>, KeystoreError> {
+		let data = Self::payload_data(&payload, context);
+		let signature = CryptoStore::sign_with(
+			&**keystore,
+			ValidatorId::ID,
+			&key.into(),
+			&data,
+		).await?;
+
+		let signature = match signature {
+			Some(sig) => sig.try_into().map_err(|_| KeystoreError::KeyNotSupported(ValidatorId::ID))?,
+			None => return Ok(None),
+		};
+
+		Ok(Some(Self {
+			payload,
+			validator_index,
+			signature,
+			real_payload: std::marker::PhantomData,
+		}))
+	}
+
+	/// Validate the payload given the context and public key.
+	fn check_signature<H: Encode>(&self, context: &SigningContext<H>, key: &ValidatorId) -> Result<(), ()> {
+		let data = Self::payload_data(&self.payload, context);
+		if self.signature.verify(data.as_slice(), key) { Ok(()) } else { Err(()) }
+	}
+
+}
+
+impl<Payload, RealPayload> From<Signed<Payload, RealPayload>> for UncheckedSigned<Payload, RealPayload> {
+	fn from(signed: Signed<Payload, RealPayload>) -> Self {
+		signed.0
+	}
+}
+
+/// This helper trait ensures that we can encode Statement as CompactStatement,
+/// and anything as itself.
+///
+/// This resembles `parity_scale_codec::EncodeLike`, but it's distinct:
+/// EncodeLike is a marker trait which asserts at the typesystem level that
+/// one type's encoding is a valid encoding for another type. It doesn't
+/// perform any type conversion when encoding.
+///
+/// This trait, on the other hand, provides a method which can be used to
+/// simultaneously convert and encode one type as another.
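+///
+/// A hedged illustration with made-up types (in this crate the concrete use is encoding a
+/// full statement as its compact form):
+///
+/// ```ignore
+/// #[derive(Encode)]
+/// struct Compact { hash: [u8; 32] }
+///
+/// struct Full { hash: [u8; 32], large_blob: Vec<u8> }
+///
+/// impl EncodeAs<Compact> for Full {
+/// 	fn encode_as(&self) -> Vec<u8> {
+/// 		// Only the compact projection is encoded (and thus signed); `Full` is not cloned.
+/// 		Compact { hash: self.hash }.encode()
+/// 	}
+/// }
+/// ```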
+pub trait EncodeAs<T> {
+	/// Convert Self into T, then encode T.
+	///
+	/// This is useful when T is a subset of Self, reducing encoding costs;
+	/// its signature also means that we do not need to clone Self in order
+	/// to retain ownership, as we would if we were to do
+	/// `self.clone().into().encode()`.
+	fn encode_as(&self) -> Vec<u8>;
+}
+
+impl<T: Encode> EncodeAs<T> for T {
+	fn encode_as(&self) -> Vec<u8> {
+		self.encode()
+	}
+}
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
index 9e082b8d1b6..8bd002c5e83 100644
--- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
+++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
@@ -83,7 +83,7 @@ enum ApprovalVotingMessage {
     ///
     /// The base number is typically the number of the last finalized block, but in GRANDPA it is
     /// possible for the base to be slightly higher than the last finalized block.
-    /// 
+    ///
     /// The `BlockNumber` provided is the number of the block's ancestor which is the
     /// earliest possible vote.
     ///
@@ -91,7 +91,7 @@ enum ApprovalVotingMessage {
     /// Return `None` if the input hash is unrecognized.
     ApprovedAncestor {
         target_hash: Hash,
-        base_number: BlockNumber, 
+        base_number: BlockNumber,
         rx: ResponseChannel<Option<(Hash, BlockNumber, Vec<(Hash, Vec<CandidateHash>)>)>>
     },
 }
@@ -334,7 +334,7 @@ enum CollatorProtocolMessage {
     /// Note a collator as having provided a good collation.
     NoteGoodCollation(CollatorId, SignedFullStatement),
     /// Notify a collator that its collation was seconded.
-    NotifyCollationSeconded(CollatorId, SignedFullStatement),
+    NotifyCollationSeconded(CollatorId, Hash, SignedFullStatement),
 }
 ```
 
@@ -378,7 +378,7 @@ enum DisputeCoordinatorMessage {
     /// Sign and issue local dispute votes. A value of `true` indicates validity, and `false` invalidity.
     IssueLocalStatement(SessionIndex, CandidateHash, CandidateReceipt, bool),
     /// Determine the highest undisputed block within the given chain, based on where candidates
-    /// were included. If even the base block should not be finalized due to a dispute, 
+    /// were included. If even the base block should not be finalized due to a dispute,
     /// then `None` should be returned on the channel.
     ///
     /// The block descriptions begin counting upwards from the block after the given `base_number`. The `base_number`
diff --git a/polkadot/runtime/parachains/src/inclusion.rs b/polkadot/runtime/parachains/src/inclusion.rs
index 733e3bc138e..a884656f7ad 100644
--- a/polkadot/runtime/parachains/src/inclusion.rs
+++ b/polkadot/runtime/parachains/src/inclusion.rs
@@ -23,7 +23,7 @@
 use sp_std::prelude::*;
 use primitives::v1::{
 	CandidateCommitments, CandidateDescriptor, ValidatorIndex, Id as ParaId,
-	AvailabilityBitfield as AvailabilityBitfield, SignedAvailabilityBitfields, SigningContext,
+	AvailabilityBitfield as AvailabilityBitfield, UncheckedSignedAvailabilityBitfields, SigningContext,
 	BackedCandidate, CoreIndex, GroupIndex, CommittedCandidateReceipt,
 	CandidateReceipt, HeadData, CandidateHash,
 };
@@ -236,7 +236,7 @@ impl<T: Config> Module<T> {
 	/// becoming available.
 	pub(crate) fn process_bitfields(
 		expected_bits: usize,
-		signed_bitfields: SignedAvailabilityBitfields,
+		unchecked_bitfields: UncheckedSignedAvailabilityBitfields,
 		core_lookup: impl Fn(CoreIndex) -> Option<ParaId>,
 	) -> Result<Vec<CoreIndex>, DispatchError> {
 		let validators = shared::Module::<T>::active_validator_keys();
@@ -247,12 +247,13 @@ impl<T: Config> Module<T> {
 			.map(|core_para| core_para.map(|p| (p, PendingAvailability::<T>::get(&p))))
 			.collect();
 
+
 		// do sanity checks on the bitfields:
 		// 1. no more than one bitfield per validator
 		// 2. bitfields are ascending by validator index.
 		// 3. each bitfield has exactly `expected_bits`
 		// 4. signature is valid.
-		{
+		let signed_bitfields = {
 			let occupied_bitmask: BitVec<BitOrderLsb0, u8> = assigned_paras_record.iter()
 				.map(|p| p.as_ref()
 					.map_or(false, |(_id, pending_availability)| pending_availability.is_some())
@@ -266,37 +267,42 @@ impl<T: Config> Module<T> {
 				session_index,
 			};
 
-			for signed_bitfield in &signed_bitfields {
+			let mut signed_bitfields = Vec::with_capacity(unchecked_bitfields.len());
+
+			for unchecked_bitfield in unchecked_bitfields {
 				ensure!(
-					signed_bitfield.payload().0.len() == expected_bits,
+					unchecked_bitfield.unchecked_payload().0.len() == expected_bits,
 					Error::<T>::WrongBitfieldSize,
 				);
 
 				ensure!(
-					last_index.map_or(true, |last| last < signed_bitfield.validator_index()),
+					last_index.map_or(true, |last| last < unchecked_bitfield.unchecked_validator_index()),
 					Error::<T>::BitfieldDuplicateOrUnordered,
 				);
 
 				ensure!(
-					(signed_bitfield.validator_index().0 as usize) < validators.len(),
+					(unchecked_bitfield.unchecked_validator_index().0 as usize) < validators.len(),
 					Error::<T>::ValidatorIndexOutOfBounds,
 				);
 
 				ensure!(
-					occupied_bitmask.clone() & signed_bitfield.payload().0.clone() == signed_bitfield.payload().0,
+					occupied_bitmask.clone() & unchecked_bitfield.unchecked_payload().0.clone() == unchecked_bitfield.unchecked_payload().0,
 					Error::<T>::UnoccupiedBitInBitfield,
 				);
 
-				let validator_public = &validators[signed_bitfield.validator_index().0 as usize];
+				let validator_public = &validators[unchecked_bitfield.unchecked_validator_index().0 as usize];
 
-				signed_bitfield.check_signature(
-					&signing_context,
-					validator_public,
-				).map_err(|_| Error::<T>::InvalidBitfieldSignature)?;
+				last_index = Some(unchecked_bitfield.unchecked_validator_index());
 
-				last_index = Some(signed_bitfield.validator_index());
+				signed_bitfields.push(
+					unchecked_bitfield.try_into_checked(
+						&signing_context,
+						validator_public,
+					).map_err(|_| Error::<T>::InvalidBitfieldSignature)?
+				);
 			}
-		}
+			signed_bitfields
+		};
 
 		let now = <frame_system::Pallet<T>>::block_number();
 		for signed_bitfield in signed_bitfields {
@@ -902,7 +908,7 @@ mod tests {
 
 	use std::sync::Arc;
 	use futures::executor::block_on;
-	use primitives::v0::PARACHAIN_KEY_TYPE_ID;
+	use primitives::{v0::PARACHAIN_KEY_TYPE_ID, v1::UncheckedSignedAvailabilityBitfield};
 	use primitives::v1::{BlockNumber, Hash};
 	use primitives::v1::{
 		SignedAvailabilityBitfield, CompactStatement as Statement, ValidityAttestation, CollatorId,
@@ -1257,7 +1263,7 @@ mod tests {
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits(),
-					vec![signed],
+					vec![signed.into()],
 					&core_lookup,
 				).is_err());
 			}
@@ -1275,7 +1281,7 @@ mod tests {
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits() + 1,
-					vec![signed],
+					vec![signed.into()],
 					&core_lookup,
 				).is_err());
 			}
@@ -1283,13 +1289,14 @@ mod tests {
 			// duplicate.
 			{
 				let bare_bitfield = default_bitfield();
-				let signed = block_on(sign_bitfield(
-					&keystore,
-					&validators[0],
-					ValidatorIndex(0),
-					bare_bitfield,
-					&signing_context,
-				));
+				let signed: UncheckedSignedAvailabilityBitfield =
+					block_on(sign_bitfield(
+						&keystore,
+						&validators[0],
+						ValidatorIndex(0),
+						bare_bitfield,
+						&signing_context,
+				)).into();
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits(),
@@ -1307,7 +1314,7 @@ mod tests {
 					ValidatorIndex(0),
 					bare_bitfield.clone(),
 					&signing_context,
-				));
+				)).into();
 
 				let signed_1 = block_on(sign_bitfield(
 					&keystore,
@@ -1315,7 +1322,7 @@ mod tests {
 					ValidatorIndex(1),
 					bare_bitfield,
 					&signing_context,
-				));
+				)).into();
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits(),
@@ -1338,7 +1345,7 @@ mod tests {
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits(),
-					vec![signed],
+					vec![signed.into()],
 					&core_lookup,
 				).is_err());
 			}
@@ -1356,7 +1363,7 @@ mod tests {
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits(),
-					vec![signed],
+					vec![signed.into()],
 					&core_lookup,
 				).is_ok());
 			}
@@ -1391,7 +1398,7 @@ mod tests {
 
 				assert!(Inclusion::process_bitfields(
 					expected_bits(),
-					vec![signed],
+					vec![signed.into()],
 					&core_lookup,
 				).is_ok());
 
@@ -1430,7 +1437,7 @@ mod tests {
 				assert_eq!(
 					Inclusion::process_bitfields(
 						expected_bits(),
-						vec![signed],
+						vec![signed.into()],
 						&core_lookup,
 					),
 					Ok(vec![]),
@@ -1549,7 +1556,7 @@ mod tests {
 					ValidatorIndex(i as _),
 					to_sign,
 					&signing_context,
-				)))
+				)).into())
 			}).collect();
 
 			assert!(Inclusion::process_bitfields(
-- 
GitLab