diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index b52b4b9e57c05331cb2efc276d947814a0bce351..b0448d3f6d0fa0bb1d8ea6aa9015ece522364d03 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -6344,6 +6344,7 @@ dependencies = [
  "polkadot-primitives",
  "sp-application-crypto",
  "sp-core",
+ "sp-keyring",
  "sp-keystore",
  "tracing-gum",
 ]
diff --git a/polkadot/node/network/bitfield-distribution/Cargo.toml b/polkadot/node/network/bitfield-distribution/Cargo.toml
index f8007e2c9f907313c374e09ac2abecfe7cf9de82..4c16ca7431a0a995e9f1728f8e64e8c6a48fa302 100644
--- a/polkadot/node/network/bitfield-distribution/Cargo.toml
+++ b/polkadot/node/network/bitfield-distribution/Cargo.toml
@@ -18,6 +18,7 @@ bitvec = { version = "1.0.0", default-features = false, features = ["alloc"] }
 sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
 sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
 maplit = "1.0.2"
 log = "0.4.13"
 env_logger = "0.9.0"
diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs
index 1aee6dc06cdbbd5b0b3a18587854823d0a8abbf0..befdec66b35993dcb3dc8fc8dc60aac515706075 100644
--- a/polkadot/node/network/bitfield-distribution/src/lib.rs
+++ b/polkadot/node/network/bitfield-distribution/src/lib.rs
@@ -27,11 +27,7 @@ use futures::{channel::oneshot, FutureExt};
 use polkadot_node_network_protocol::{
 	v1 as protocol_v1, OurView, PeerId, UnifiedReputationChange as Rep, View,
 };
-use polkadot_node_subsystem_util::{
-	self as util,
-	metrics::{self, prometheus},
-	MIN_GOSSIP_PEERS,
-};
+use polkadot_node_subsystem_util::{self as util, MIN_GOSSIP_PEERS};
 use polkadot_primitives::v2::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
 use polkadot_subsystem::{
 	jaeger, messages::*, overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, PerLeafSpan,
@@ -39,6 +35,10 @@ use polkadot_subsystem::{
 };
 use std::collections::{HashMap, HashSet};
 
+use self::metrics::Metrics;
+
+mod metrics;
+
 #[cfg(test)]
 mod tests;
 
@@ -137,19 +137,20 @@ impl PerRelayParentData {
 		}
 	}
 
-	/// Determines if that particular message signed by a validator is needed by the given peer.
+	/// Determines if that particular message signed by a
+	/// validator is needed by the given peer.
 	fn message_from_validator_needed_by_peer(
 		&self,
 		peer: &PeerId,
-		validator: &ValidatorId,
+		signed_by: &ValidatorId,
 	) -> bool {
 		self.message_sent_to_peer
 			.get(peer)
-			.map(|v| !v.contains(validator))
+			.map(|pubkeys| !pubkeys.contains(signed_by))
 			.unwrap_or(true) &&
 			self.message_received_from_peer
 				.get(peer)
-				.map(|v| !v.contains(validator))
+				.map(|pubkeys| !pubkeys.contains(signed_by))
 				.unwrap_or(true)
 	}
 }
@@ -178,21 +179,29 @@ impl BitfieldDistribution {
 		loop {
 			let message = match ctx.recv().await {
 				Ok(message) => message,
-				Err(e) => {
-					gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
+				Err(err) => {
+					gum::error!(
+						target: LOG_TARGET,
+						?err,
+						"Failed to receive a message from Overseer, exiting"
+					);
 					return
 				},
 			};
 			match message {
 				FromOverseer::Communication {
-					msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
+					msg:
+						BitfieldDistributionMessage::DistributeBitfield(
+							relay_parent,
+							signed_availability,
+						),
 				} => {
-					gum::trace!(target: LOG_TARGET, ?hash, "Processing DistributeBitfield");
+					gum::trace!(target: LOG_TARGET, ?relay_parent, "Processing DistributeBitfield");
 					handle_bitfield_distribution(
 						&mut ctx,
 						&mut state,
 						&self.metrics,
-						hash,
+						relay_parent,
 						signed_availability,
 					)
 					.await;
@@ -213,7 +222,7 @@ impl BitfieldDistribution {
 					for activated in activated {
 						let relay_parent = activated.hash;
 
-						gum::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "activated");
+						gum::trace!(target: LOG_TARGET, ?relay_parent, "activated");
 						let span = PerLeafSpan::new(activated.span, "bitfield-distribution");
 						let _span = span.child("query-basics");
 
@@ -230,18 +239,18 @@ impl BitfieldDistribution {
 									PerRelayParentData::new(signing_context, validator_set, span),
 								);
 							},
-							Err(e) => {
-								gum::warn!(target: LOG_TARGET, err = ?e, "query_basics has failed");
+							Err(err) => {
+								gum::warn!(target: LOG_TARGET, ?err, "query_basics has failed");
 							},
 							_ => {},
 						}
 					}
 				},
 				FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
-					gum::trace!(target: LOG_TARGET, hash = %hash, number = %number, "block finalized");
+					gum::trace!(target: LOG_TARGET, ?hash, %number, "block finalized");
 				},
 				FromOverseer::Signal(OverseerSignal::Conclude) => {
-					gum::trace!(target: LOG_TARGET, "Conclude");
+					gum::info!(target: LOG_TARGET, "Conclude");
 					return
 				},
 			}
@@ -250,11 +259,11 @@ impl BitfieldDistribution {
 }
 
 /// Modify the reputation of a peer based on its behavior.
-async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
+async fn modify_reputation<Context>(ctx: &mut Context, relay_parent: Hash, peer: PeerId, rep: Rep)
 where
 	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
 {
-	gum::trace!(target: LOG_TARGET, ?rep, peer_id = %peer, "reputation change");
+	gum::trace!(target: LOG_TARGET, ?relay_parent, ?rep, %peer, "reputation change");
 
 	ctx.send_message(NetworkBridgeMessage::ReportPeer(peer, rep)).await
 }
@@ -278,9 +287,9 @@ async fn handle_bitfield_distribution<Context>(
 	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
 		job_data
 	} else {
-		gum::trace!(
+		gum::debug!(
 			target: LOG_TARGET,
-			relay_parent = %relay_parent,
+			?relay_parent,
 			"Not supposed to work on relay parent related data",
 		);
 
@@ -288,7 +297,7 @@ async fn handle_bitfield_distribution<Context>(
 	};
 	let validator_set = &job_data.validator_set;
 	if validator_set.is_empty() {
-		gum::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "validator set is empty");
+		gum::debug!(target: LOG_TARGET, ?relay_parent, "validator set is empty");
 		return
 	}
 
@@ -296,7 +305,7 @@ async fn handle_bitfield_distribution<Context>(
 	let validator = if let Some(validator) = validator_set.get(validator_index) {
 		validator.clone()
 	} else {
-		gum::trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
+		gum::debug!(target: LOG_TARGET, validator_index, "Could not find a validator for index");
 		return
 	};
 
@@ -306,7 +315,7 @@ async fn handle_bitfield_distribution<Context>(
 	let peer_views = &mut state.peer_views;
 	relay_message(ctx, job_data, gossip_peers, peer_views, validator, msg).await;
 
-	metrics.on_own_bitfield_gossipped();
+	metrics.on_own_bitfield_sent();
 }
 
 /// Distribute a given valid and signature checked bitfield message.
@@ -322,13 +331,14 @@ async fn relay_message<Context>(
 ) where
 	Context: SubsystemContext<Message = BitfieldDistributionMessage>,
 {
+	let relay_parent = message.relay_parent;
 	let span = job_data.span.child("relay-msg");
 
 	let _span = span.child("provisionable");
 	// notify the overseer about a new and valid signed bitfield
 	ctx.send_message(ProvisionerMessage::ProvisionableData(
-		message.relay_parent,
-		ProvisionableData::Bitfield(message.relay_parent, message.signed_availability.clone()),
+		relay_parent,
+		ProvisionableData::Bitfield(relay_parent, message.signed_availability.clone()),
 	))
 	.await;
 
@@ -372,7 +382,7 @@ async fn relay_message<Context>(
 	if interested_peers.is_empty() {
 		gum::trace!(
 			target: LOG_TARGET,
-			relay_parent = %message.relay_parent,
+			?relay_parent,
 			"no peers are interested in gossip for relay parent",
 		);
 	} else {
@@ -398,13 +408,13 @@ async fn process_incoming_peer_message<Context>(
 	let protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) = message;
 	gum::trace!(
 		target: LOG_TARGET,
-		peer_id = %origin,
+		peer = %origin,
 		?relay_parent,
 		"received bitfield gossip from peer"
 	);
 	// we don't care about this, not part of our view.
 	if !state.view.contains(&relay_parent) {
-		modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
+		modify_reputation(ctx, relay_parent, origin, COST_NOT_IN_VIEW).await;
 		return
 	}
 
@@ -413,7 +423,7 @@ async fn process_incoming_peer_message<Context>(
 	let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
 		job_data
 	} else {
-		modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
+		modify_reputation(ctx, relay_parent, origin, COST_NOT_IN_VIEW).await;
 		return
 	};
 
@@ -423,18 +433,14 @@ async fn process_incoming_peer_message<Context>(
 		.span
 		.child("msg-received")
 		.with_peer_id(&origin)
+		.with_relay_parent(relay_parent)
 		.with_claimed_validator_index(validator_index)
 		.with_stage(jaeger::Stage::BitfieldDistribution);
 
 	let validator_set = &job_data.validator_set;
 	if validator_set.is_empty() {
-		gum::trace!(
-			target: LOG_TARGET,
-			relay_parent = %relay_parent,
-			?origin,
-			"Validator set is empty",
-		);
-		modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
+		gum::trace!(target: LOG_TARGET, ?relay_parent, ?origin, "Validator set is empty",);
+		modify_reputation(ctx, relay_parent, origin, COST_MISSING_PEER_SESSION_KEY).await;
 		return
 	}
 
@@ -444,7 +450,7 @@ async fn process_incoming_peer_message<Context>(
 	let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
 		validator.clone()
 	} else {
-		modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
+		modify_reputation(ctx, relay_parent, origin, COST_VALIDATOR_INDEX_INVALID).await;
 		return
 	};
 
@@ -457,13 +463,13 @@ async fn process_incoming_peer_message<Context>(
 		received_set.insert(validator.clone());
 	} else {
 		gum::trace!(target: LOG_TARGET, ?validator_index, ?origin, "Duplicate message");
-		modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
+		modify_reputation(ctx, relay_parent, origin, COST_PEER_DUPLICATE_MESSAGE).await;
 		return
 	};
 
 	let one_per_validator = &mut (job_data.one_per_validator);
 
-	// only relay_message a message of a validator once
+	// relay a message received from a validator at most _once_
 	if let Some(old_message) = one_per_validator.get(&validator) {
 		gum::trace!(
 			target: LOG_TARGET,
@@ -471,13 +477,13 @@ async fn process_incoming_peer_message<Context>(
 			"already received a message for validator",
 		);
 		if old_message.signed_availability.as_unchecked() == &bitfield {
-			modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
+			modify_reputation(ctx, relay_parent, origin, BENEFIT_VALID_MESSAGE).await;
 		}
 		return
 	}
 	let signed_availability = match bitfield.try_into_checked(&signing_context, &validator) {
 		Err(_) => {
-			modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
+			modify_reputation(ctx, relay_parent, origin, COST_SIGNATURE_INVALID).await;
 			return
 		},
 		Ok(bitfield) => bitfield,
@@ -491,7 +497,7 @@ async fn process_incoming_peer_message<Context>(
 	relay_message(ctx, job_data, &state.gossip_peers, &mut state.peer_views, validator, message)
 		.await;
 
-	modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
+	modify_reputation(ctx, relay_parent, origin, BENEFIT_VALID_MESSAGE_FIRST).await
 }
 
 /// Deal with network bridge updates and track what needs to be tracked
@@ -507,32 +513,35 @@ async fn handle_network_msg<Context>(
 	let _timer = metrics.time_handle_network_msg();
 
 	match bridge_message {
-		NetworkBridgeEvent::PeerConnected(peerid, role, _) => {
-			gum::trace!(target: LOG_TARGET, ?peerid, ?role, "Peer connected");
+		NetworkBridgeEvent::PeerConnected(peer, role, _) => {
+			gum::trace!(target: LOG_TARGET, ?peer, ?role, "Peer connected");
 			// insert if none already present
-			state.peer_views.entry(peerid).or_default();
+			state.peer_views.entry(peer).or_default();
 		},
-		NetworkBridgeEvent::PeerDisconnected(peerid) => {
-			gum::trace!(target: LOG_TARGET, ?peerid, "Peer disconnected");
+		NetworkBridgeEvent::PeerDisconnected(peer) => {
+			gum::trace!(target: LOG_TARGET, ?peer, "Peer disconnected");
 			// get rid of superfluous data
-			state.peer_views.remove(&peerid);
+			state.peer_views.remove(&peer);
 		},
 		NetworkBridgeEvent::NewGossipTopology(peers) => {
 			let newly_added: Vec<PeerId> = peers.difference(&state.gossip_peers).cloned().collect();
 			state.gossip_peers = peers;
-			for peer in newly_added {
-				if let Some(view) = state.peer_views.remove(&peer) {
-					handle_peer_view_change(ctx, state, peer, view).await;
+			for new_peer in newly_added {
+				// in case we already knew that peer in the past
+				// it might have had an existing view, we use to initialize
+				// and minimize the delta on `PeerViewChange` to be sent
+				if let Some(old_view) = state.peer_views.remove(&new_peer) {
+					handle_peer_view_change(ctx, state, new_peer, old_view).await;
 				}
 			}
 		},
-		NetworkBridgeEvent::PeerViewChange(peerid, view) => {
-			gum::trace!(target: LOG_TARGET, ?peerid, ?view, "Peer view change");
-			handle_peer_view_change(ctx, state, peerid, view).await;
+		NetworkBridgeEvent::PeerViewChange(peerid, new_view) => {
+			gum::trace!(target: LOG_TARGET, ?peerid, ?new_view, "Peer view change");
+			handle_peer_view_change(ctx, state, peerid, new_view).await;
 		},
-		NetworkBridgeEvent::OurViewChange(view) => {
-			gum::trace!(target: LOG_TARGET, ?view, "Our view change");
-			handle_our_view_change(state, view);
+		NetworkBridgeEvent::OurViewChange(new_view) => {
+			gum::trace!(target: LOG_TARGET, ?new_view, "Our view change");
+			handle_our_view_change(state, new_view);
 		},
 		NetworkBridgeEvent::PeerMessage(remote, message) =>
 			process_incoming_peer_message(ctx, state, metrics, remote, message).await,
@@ -545,10 +554,12 @@ fn handle_our_view_change(state: &mut ProtocolState, view: OurView) {
 
 	for added in state.view.difference(&old_view) {
 		if !state.per_relay_parent.contains_key(&added) {
-			gum::warn!(
+			// Is guaranteed to be handled in `ActiveHead` update
+			// so this should never happen.
+			gum::error!(
 				target: LOG_TARGET,
-				added = %added,
-				"Our view contains {} but the overseer never told use we should work on this",
+				%added,
+				"Our view contains {}, but not in active heads",
 				&added
 			);
 		}
@@ -692,100 +703,16 @@ where
 	.await;
 
 	match (validators_rx.await?, session_rx.await?) {
-		(Ok(v), Ok(s)) =>
-			Ok(Some((v, SigningContext { parent_hash: relay_parent, session_index: s }))),
-		(Err(e), _) | (_, Err(e)) => {
-			gum::warn!(target: LOG_TARGET, err = ?e, "Failed to fetch basics from runtime API");
+		(Ok(validators), Ok(session_index)) =>
+			Ok(Some((validators, SigningContext { parent_hash: relay_parent, session_index }))),
+		(Err(err), _) | (_, Err(err)) => {
+			gum::warn!(
+				target: LOG_TARGET,
+				?relay_parent,
+				?err,
+				"Failed to fetch basics from runtime API"
+			);
 			Ok(None)
 		},
 	}
 }
-
-#[derive(Clone)]
-struct MetricsInner {
-	gossipped_own_availability_bitfields: prometheus::Counter<prometheus::U64>,
-	received_availability_bitfields: prometheus::Counter<prometheus::U64>,
-	active_leaves_update: prometheus::Histogram,
-	handle_bitfield_distribution: prometheus::Histogram,
-	handle_network_msg: prometheus::Histogram,
-}
-
-/// Bitfield Distribution metrics.
-#[derive(Default, Clone)]
-pub struct Metrics(Option<MetricsInner>);
-
-impl Metrics {
-	fn on_own_bitfield_gossipped(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.gossipped_own_availability_bitfields.inc();
-		}
-	}
-
-	fn on_bitfield_received(&self) {
-		if let Some(metrics) = &self.0 {
-			metrics.received_availability_bitfields.inc();
-		}
-	}
-
-	/// Provide a timer for `active_leaves_update` which observes on drop.
-	fn time_active_leaves_update(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
-	}
-
-	/// Provide a timer for `handle_bitfield_distribution` which observes on drop.
-	fn time_handle_bitfield_distribution(
-		&self,
-	) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0
-			.as_ref()
-			.map(|metrics| metrics.handle_bitfield_distribution.start_timer())
-	}
-
-	/// Provide a timer for `handle_network_msg` which observes on drop.
-	fn time_handle_network_msg(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
-		self.0.as_ref().map(|metrics| metrics.handle_network_msg.start_timer())
-	}
-}
-
-impl metrics::Metrics for Metrics {
-	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
-		let metrics = MetricsInner {
-			gossipped_own_availability_bitfields: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_gossipped_own_availabilty_bitfields_total",
-					"Number of own availability bitfields sent to other peers.",
-				)?,
-				registry,
-			)?,
-			received_availability_bitfields: prometheus::register(
-				prometheus::Counter::new(
-					"polkadot_parachain_received_availabilty_bitfields_total",
-					"Number of valid availability bitfields received from other peers.",
-				)?,
-				registry,
-			)?,
-			active_leaves_update: prometheus::register(
-				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
-					"polkadot_parachain_bitfield_distribution_active_leaves_update",
-					"Time spent within `bitfield_distribution::active_leaves_update`",
-				))?,
-				registry,
-			)?,
-			handle_bitfield_distribution: prometheus::register(
-				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
-					"polkadot_parachain_bitfield_distribution_handle_bitfield_distribution",
-					"Time spent within `bitfield_distribution::handle_bitfield_distribution`",
-				))?,
-				registry,
-			)?,
-			handle_network_msg: prometheus::register(
-				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
-					"polkadot_parachain_bitfield_distribution_handle_network_msg",
-					"Time spent within `bitfield_distribution::handle_network_msg`",
-				))?,
-				registry,
-			)?,
-		};
-		Ok(Metrics(Some(metrics)))
-	}
-}
diff --git a/polkadot/node/network/bitfield-distribution/src/metrics.rs b/polkadot/node/network/bitfield-distribution/src/metrics.rs
new file mode 100644
index 0000000000000000000000000000000000000000..2a1fae5a948f72bdc52c90b0315d9fa01fb66f2e
--- /dev/null
+++ b/polkadot/node/network/bitfield-distribution/src/metrics.rs
@@ -0,0 +1,108 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+use polkadot_node_subsystem_util::metrics::{prometheus, Metrics as MetricsTrait};
+
+#[derive(Clone)]
+struct MetricsInner {
+	sent_own_availability_bitfields: prometheus::Counter<prometheus::U64>,
+	received_availability_bitfields: prometheus::Counter<prometheus::U64>,
+	active_leaves_update: prometheus::Histogram,
+	handle_bitfield_distribution: prometheus::Histogram,
+	handle_network_msg: prometheus::Histogram,
+}
+
+/// Bitfield Distribution metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option<MetricsInner>);
+
+impl Metrics {
+	pub(crate) fn on_own_bitfield_sent(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.sent_own_availability_bitfields.inc();
+		}
+	}
+
+	pub(crate) fn on_bitfield_received(&self) {
+		if let Some(metrics) = &self.0 {
+			metrics.received_availability_bitfields.inc();
+		}
+	}
+
+	/// Provide a timer for `active_leaves_update` which observes on drop.
+	pub(crate) fn time_active_leaves_update(
+		&self,
+	) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.active_leaves_update.start_timer())
+	}
+
+	/// Provide a timer for `handle_bitfield_distribution` which observes on drop.
+	pub(crate) fn time_handle_bitfield_distribution(
+		&self,
+	) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0
+			.as_ref()
+			.map(|metrics| metrics.handle_bitfield_distribution.start_timer())
+	}
+
+	/// Provide a timer for `handle_network_msg` which observes on drop.
+	pub(crate) fn time_handle_network_msg(&self) -> Option<prometheus::prometheus::HistogramTimer> {
+		self.0.as_ref().map(|metrics| metrics.handle_network_msg.start_timer())
+	}
+}
+
+impl MetricsTrait for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			sent_own_availability_bitfields: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_sent_own_availabilty_bitfields_total",
+					"Number of own availability bitfields sent to other peers.",
+				)?,
+				registry,
+			)?,
+			received_availability_bitfields: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_received_availabilty_bitfields_total",
+					"Number of valid availability bitfields received from other peers.",
+				)?,
+				registry,
+			)?,
+			active_leaves_update: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_bitfield_distribution_active_leaves_update",
+					"Time spent within `bitfield_distribution::active_leaves_update`",
+				))?,
+				registry,
+			)?,
+			handle_bitfield_distribution: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_bitfield_distribution_handle_bitfield_distribution",
+					"Time spent within `bitfield_distribution::handle_bitfield_distribution`",
+				))?,
+				registry,
+			)?,
+			handle_network_msg: prometheus::register(
+				prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(
+					"polkadot_parachain_bitfield_distribution_handle_network_msg",
+					"Time spent within `bitfield_distribution::handle_network_msg`",
+				))?,
+				registry,
+			)?,
+		};
+		Ok(Metrics(Some(metrics)))
+	}
+}
diff --git a/polkadot/node/network/bitfield-distribution/src/tests.rs b/polkadot/node/network/bitfield-distribution/src/tests.rs
index f50a8a06d7c0474df4650778f0a9442804fbe4f7..b16d9b7789a6c8396e2273d6f946bc03170b1a12 100644
--- a/polkadot/node/network/bitfield-distribution/src/tests.rs
+++ b/polkadot/node/network/bitfield-distribution/src/tests.rs
@@ -23,9 +23,15 @@ use polkadot_node_network_protocol::{our_view, view, ObservedRole};
 use polkadot_node_subsystem_test_helpers::make_subsystem_context;
 use polkadot_node_subsystem_util::TimeoutExt;
 use polkadot_primitives::v2::{AvailabilityBitfield, Signed, ValidatorIndex};
-use polkadot_subsystem::jaeger;
+use polkadot_subsystem::{
+	jaeger,
+	jaeger::{PerLeafSpan, Span},
+};
 use sp_application_crypto::AppKey;
+use sp_core::Pair as PairT;
+use sp_keyring::Sr25519Keyring;
 use sp_keystore::{testing::KeyStore, SyncCryptoStore, SyncCryptoStorePtr};
+
 use std::{iter::FromIterator as _, sync::Arc, time::Duration};
 
 macro_rules! launch {
@@ -720,3 +726,63 @@ fn do_not_send_message_back_to_origin() {
 		);
 	});
 }
+
+#[test]
+fn need_message_works() {
+	let validators = vec![Sr25519Keyring::Alice.pair(), Sr25519Keyring::Bob.pair()];
+
+	let validator_set = Vec::from_iter(validators.iter().map(|k| ValidatorId::from(k.public())));
+
+	let signing_context = SigningContext { session_index: 1, parent_hash: Hash::repeat_byte(0x00) };
+	let mut state = PerRelayParentData::new(
+		signing_context,
+		validator_set.clone(),
+		PerLeafSpan::new(Arc::new(Span::Disabled), "foo"),
+	);
+
+	let peer_a = PeerId::random();
+	let peer_b = PeerId::random();
+	assert_ne!(peer_a, peer_b);
+
+	let pretend_send =
+		|state: &mut PerRelayParentData, dest_peer: PeerId, signed_by: &ValidatorId| -> bool {
+			if state.message_from_validator_needed_by_peer(&dest_peer, signed_by) {
+				state
+					.message_sent_to_peer
+					.entry(dest_peer)
+					.or_default()
+					.insert(signed_by.clone());
+				true
+			} else {
+				false
+			}
+		};
+
+	let pretend_receive =
+		|state: &mut PerRelayParentData, source_peer: PeerId, signed_by: &ValidatorId| {
+			state
+				.message_received_from_peer
+				.entry(source_peer)
+				.or_default()
+				.insert(signed_by.clone());
+		};
+
+	assert!(true == pretend_send(&mut state, peer_a, &validator_set[0]));
+	assert!(true == pretend_send(&mut state, peer_b, &validator_set[1]));
+	// sending the same thing must not be allowed
+	assert!(false == pretend_send(&mut state, peer_a, &validator_set[0]));
+
+	// receive by Alice
+	pretend_receive(&mut state, peer_a, &validator_set[0]);
+	// must be marked as not needed by Alice, so attempt to send to Alice must be false
+	assert!(false == pretend_send(&mut state, peer_a, &validator_set[0]));
+	// but ok for Bob
+	assert!(false == pretend_send(&mut state, peer_b, &validator_set[1]));
+
+	// receive by Bob
+	pretend_receive(&mut state, peer_a, &validator_set[0]);
+	// not ok for Alice
+	assert!(false == pretend_send(&mut state, peer_a, &validator_set[0]));
+	// also not ok for Bob
+	assert!(false == pretend_send(&mut state, peer_b, &validator_set[1]));
+}
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 73a5e3213178ffe80f61906a5c1ebceff2c64201..89067dbcef22e48cffd07ae8956cab1335e4f3dd 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -577,7 +577,7 @@ where
 	SupportsParachains: HeadSupportsParachains,
 	S: SpawnNamed,
 {
-	/// Stop the overseer.
+	/// Stop the `Overseer`.
 	async fn stop(mut self) {
 		let _ = self.wait_terminate(OverseerSignal::Conclude, Duration::from_secs(1_u64)).await;
 	}
diff --git a/polkadot/node/subsystem-util/src/tests.rs b/polkadot/node/subsystem-util/src/tests.rs
index eb30c177f7773f942b6caa49b0dc0ac0989b7801..c7c6cbf6d80c0fd3be3b66d005a5b59391fb94fe 100644
--- a/polkadot/node/subsystem-util/src/tests.rs
+++ b/polkadot/node/subsystem-util/src/tests.rs
@@ -245,3 +245,14 @@ fn tick_tack_metronome() {
 		)
 	});
 }
+
+#[test]
+fn subset_generation_check() {
+	let values = (0_u8..=25).collect::<Vec<_>>();
+	// 12 even numbers exist
+	let mut chosen = choose_random_subset::<u8, _>(|v| v & 0x01 == 0, values, 12);
+	chosen.sort();
+	for (idx, v) in dbg!(chosen).into_iter().enumerate() {
+		assert_eq!(v as usize, idx * 2);
+	}
+}