diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index 325d602c8ef2d188b4806476300cc28258a851b5..7b36a7d08a475ddb192bcde198e3fef886d4b1d4 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -6463,6 +6463,7 @@ dependencies = [
 name = "sc-network-gossip"
 version = "0.8.0-dev"
 dependencies = [
+ "async-std",
  "futures 0.3.4",
  "futures-timer 3.0.2",
  "libp2p",
diff --git a/substrate/client/network-gossip/Cargo.toml b/substrate/client/network-gossip/Cargo.toml
index a4b3f72b043b9c243f687145c75cb439c8743ed6..c6714375fe3fdad55ca7b5a8662baf3227231ffc 100644
--- a/substrate/client/network-gossip/Cargo.toml
+++ b/substrate/client/network-gossip/Cargo.toml
@@ -25,4 +25,5 @@ sp-utils = { version = "2.0.0-dev", path = "../../primitives/utils" }
 wasm-timer = "0.2"
 
 [dev-dependencies]
+async-std = "1.5"
 substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" }
diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs
index b3bfe606ba0c9f9841f3c567b1ffba7fa4f877dd..a15195111e800d5b42ac91a4521e2a9437eed8e3 100644
--- a/substrate/client/network-gossip/src/bridge.rs
+++ b/substrate/client/network-gossip/src/bridge.rs
@@ -21,9 +21,16 @@ use sc_network::{Event, ReputationChange};
 
 use futures::prelude::*;
 use libp2p::PeerId;
+use log::trace;
 use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
-use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}};
-use sp_utils::mpsc::TracingUnboundedReceiver;
+use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
+use std::{
+	borrow::Cow,
+	collections::{HashMap, hash_map::Entry},
+	pin::Pin,
+	sync::Arc,
+	task::{Context, Poll},
+};
 
 /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
 /// top of it.
@@ -31,8 +38,12 @@ pub struct GossipEngine<B: BlockT> {
 	state_machine: ConsensusGossip<B>,
 	network: Box<dyn Network<B> + Send>,
 	periodic_maintenance_interval: futures_timer::Delay,
-	network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
 	engine_id: ConsensusEngineId,
+
+	/// Incoming events from the network.
+	network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>,
+	/// Outgoing events to the consumer.
+	message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>,
 }
 
 impl<B: BlockT> Unpin for GossipEngine<B> {}
@@ -54,8 +65,10 @@ impl<B: BlockT> GossipEngine<B> {
 			state_machine: ConsensusGossip::new(validator, engine_id),
 			network: Box::new(network),
 			periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL),
-			network_event_stream,
 			engine_id,
+
+			network_event_stream,
+			message_sinks: HashMap::new(),
 		}
 	}
 
@@ -85,7 +98,15 @@ impl<B: BlockT> GossipEngine<B> {
 	pub fn messages_for(&mut self, topic: B::Hash)
 		-> TracingUnboundedReceiver<TopicNotification>
 	{
-		self.state_machine.messages_for(topic)
+		let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
+
+		for notification in self.state_machine.messages_for(topic) {
+			tx.unbounded_send(notification).expect("receiver known to be live; qed");
+		}
+
+		self.message_sinks.entry(topic).or_default().push(tx);
+
+		rx
 	}
 
 	/// Send all messages with given topic to a peer.
@@ -147,16 +168,40 @@ impl<B: BlockT> Future for GossipEngine<B> {
 						this.state_machine.peer_disconnected(&mut *this.network, remote);
 					},
 					Event::NotificationsReceived { remote, messages } => {
-						let engine_id = this.engine_id.clone();
-						this.state_machine.on_incoming(
+						let messages = messages.into_iter().filter_map(|(engine, data)| {
+							if engine == this.engine_id {
+								Some(data.to_vec())
+							} else {
+								None
+							}
+						}).collect();
+
+						let to_forward = this.state_machine.on_incoming(
 							&mut *this.network,
 							remote,
-							messages.into_iter()
-								.filter_map(|(engine, data)| if engine == engine_id {
-									Some(data.to_vec())
-								} else { None })
-								.collect()
+							messages,
 						);
+
+						for (topic, notification) in to_forward.into_iter() {
+							if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) {
+								trace!(
+									target: "gossip",
+									"Pushing consensus message to sinks for {}.", topic,
+								);
+								entry.get_mut().retain(move |sink| {
+									if let Err(e) = sink.unbounded_send(notification.clone()) {
+										trace!(
+											target: "gossip",
+											"Error broadcasting message notification: {:?}", e,
+										);
+									}
+									!sink.is_closed()
+								});
+								if entry.get().is_empty() {
+									entry.remove_entry();
+								}
+							}
+						}
 					},
 					Event::Dht(_) => {}
 				}
@@ -169,6 +214,11 @@ impl<B: BlockT> Future for GossipEngine<B> {
 		while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) {
 			this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL);
 			this.state_machine.tick(&mut *this.network);
+
+			this.message_sinks.retain(|_, sinks| {
+				sinks.retain(|sink| !sink.is_closed());
+				!sinks.is_empty()
+			});
 		}
 
 		Poll::Pending
@@ -177,23 +227,34 @@ impl<B: BlockT> Future for GossipEngine<B> {
 
 #[cfg(test)]
 mod tests {
-	use super::*;
+	use async_std::task::spawn;
 	use crate::{ValidationResult, ValidatorContext};
+	use futures::{channel::mpsc::{channel, Sender}, executor::block_on_stream};
+	use sc_network::ObservedRole;
+	use sp_runtime::{testing::H256, traits::{Block as BlockT}};
+	use std::sync::{Arc, Mutex};
 	use substrate_test_runtime_client::runtime::Block;
+	use super::*;
 
-	struct TestNetwork {}
+	#[derive(Clone, Default)]
+	struct TestNetwork {
+		inner: Arc<Mutex<TestNetworkInner>>,
+	}
+
+	#[derive(Clone, Default)]
+	struct TestNetworkInner {
+		event_senders: Vec<Sender<Event>>,
+	}
 
-	impl<B: BlockT> Network<B> for Arc<TestNetwork> {
+	impl<B: BlockT> Network<B> for TestNetwork {
 		fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
-			let (_tx, rx) = futures::channel::mpsc::channel(0);
+			let (tx, rx) = channel(100);
+			self.inner.lock().unwrap().event_senders.push(tx);
 
-			// Return rx and drop tx. Thus the given channel will yield `Poll::Ready(None)` on first
-			// poll.
 			Box::pin(rx)
 		}
 
 		fn report_peer(&self, _: PeerId, _: ReputationChange) {
-			unimplemented!();
 		}
 
 		fn disconnect_peer(&self, _: PeerId) {
@@ -211,16 +272,15 @@ mod tests {
 		}
 	}
 
-	struct TestValidator {}
-
-	impl<B: BlockT> Validator<B> for TestValidator {
+	struct AllowAll;
+	impl Validator<Block> for AllowAll {
 		fn validate(
 			&self,
-			_: &mut dyn ValidatorContext<B>,
-			_: &PeerId,
-			_: &[u8]
-		) -> ValidationResult<B::Hash> {
-			unimplemented!();
+			_context: &mut dyn ValidatorContext<Block>,
+			_sender: &PeerId,
+			_data: &[u8],
+		) -> ValidationResult<H256> {
+			ValidationResult::ProcessAndKeep(H256::default())
 		}
 	}
 
@@ -230,13 +290,17 @@ mod tests {
 	/// See https://github.com/paritytech/substrate/issues/5000 for details.
 	#[test]
 	fn returns_when_network_event_stream_closes() {
+		let network = TestNetwork::default();
 		let mut gossip_engine = GossipEngine::<Block>::new(
-			Arc::new(TestNetwork{}),
+			network.clone(),
 			[1, 2, 3, 4],
 			"my_protocol".as_bytes(),
-			Arc::new(TestValidator{}),
+			Arc::new(AllowAll{}),
 		);
 
+		// Drop network event stream sender side.
+		drop(network.inner.lock().unwrap().event_senders.pop());
+
 		futures::executor::block_on(futures::future::poll_fn(move |ctx| {
 			if let Poll::Pending = gossip_engine.poll_unpin(ctx) {
 				panic!(
@@ -247,4 +311,72 @@ mod tests {
 			Poll::Ready(())
 		}))
 	}
+
+	#[test]
+	fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() {
+		let topic = H256::default();
+		let engine_id = [1, 2, 3, 4];
+		let remote_peer = PeerId::random();
+		let network = TestNetwork::default();
+
+		let mut gossip_engine = GossipEngine::<Block>::new(
+			network.clone(),
+			engine_id.clone(),
+			"my_protocol".as_bytes(),
+			Arc::new(AllowAll{}),
+		);
+
+		let mut event_sender = network.inner.lock()
+			.unwrap()
+			.event_senders
+			.pop()
+			.unwrap();
+
+		// Register the remote peer.
+		event_sender.start_send(
+			Event::NotificationStreamOpened {
+				remote: remote_peer.clone(),
+				engine_id: engine_id.clone(),
+				role: ObservedRole::Authority,
+			}
+		).unwrap();
+
+		let messages = vec![vec![1], vec![2]];
+		let events = messages.iter().cloned().map(|m| {
+			Event::NotificationsReceived {
+				remote: remote_peer.clone(),
+				messages: vec![(engine_id, m.into())]
+			}
+		}).collect::<Vec<_>>();
+
+		// Send first event before subscribing.
+		event_sender.start_send(events[0].clone()).unwrap();
+
+		let mut subscribers = vec![];
+		for _ in 0..2 {
+			subscribers.push(gossip_engine.messages_for(topic));
+		}
+
+		// Send second event after subscribing.
+		event_sender.start_send(events[1].clone()).unwrap();
+
+		spawn(gossip_engine);
+
+		let mut subscribers = subscribers.into_iter()
+			.map(|s| block_on_stream(s))
+			.collect::<Vec<_>>();
+
+		// Expect each subscriber to receive both events.
+		for message in messages {
+			for subscriber in subscribers.iter_mut() {
+				assert_eq!(
+					subscriber.next(),
+					Some(TopicNotification {
+						message: message.clone(),
+						sender: Some(remote_peer.clone()),
+					}),
+				);
+			}
+		}
+	}
 }
diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs
index d93003fcfb44ce00d08ae7b12bcbfc3544056e2d..53b5b98245a4181aa331f2d2fd749939b2b0b4fc 100644
--- a/substrate/client/network-gossip/src/state_machine.rs
+++ b/substrate/client/network-gossip/src/state_machine.rs
@@ -16,7 +16,7 @@
 
 use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult};
 
-use std::collections::{HashMap, HashSet, hash_map::Entry};
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::iter;
 use std::time;
@@ -24,7 +24,6 @@ use log::trace;
 use lru::LruCache;
 use libp2p::PeerId;
 use sp_runtime::traits::{Block as BlockT, Hash, HashFor};
-use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver};
 use sp_runtime::ConsensusEngineId;
 use sc_network::ObservedRole;
 use wasm_timer::Instant;
@@ -51,7 +50,7 @@ struct PeerConsensus<H> {
 }
 
 /// Topic stream message with sender.
-#[derive(Debug, Eq, PartialEq)]
+#[derive(Clone, Debug, Eq, PartialEq)]
 pub struct TopicNotification {
 	/// Message data.
 	pub message: Vec<u8>,
@@ -147,7 +146,6 @@ fn propagate<'a, B: BlockT, I>(
 /// Consensus network protocol handler. Manages statements and candidate requests.
 pub struct ConsensusGossip<B: BlockT> {
 	peers: HashMap<PeerId, PeerConsensus<B::Hash>>,
-	live_message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>,
 	messages: Vec<MessageEntry<B>>,
 	known_messages: LruCache<B::Hash, ()>,
 	engine_id: ConsensusEngineId,
@@ -160,7 +158,6 @@ impl<B: BlockT> ConsensusGossip<B> {
 	pub fn new(validator: Arc<dyn Validator<B>>, engine_id: ConsensusEngineId) -> Self {
 		ConsensusGossip {
 			peers: HashMap::new(),
-			live_message_sinks: HashMap::new(),
 			messages: Default::default(),
 			known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE),
 			engine_id,
@@ -256,11 +253,6 @@ impl<B: BlockT> ConsensusGossip<B> {
 	/// Prune old or no longer relevant consensus messages. Provide a predicate
 	/// for pruning, which returns `false` when the items with a given topic should be pruned.
 	pub fn collect_garbage(&mut self) {
-		self.live_message_sinks.retain(|_, sinks| {
-			sinks.retain(|sink| !sink.is_closed());
-			!sinks.is_empty()
-		});
-
 		let known_messages = &mut self.known_messages;
 		let before = self.messages.len();
 
@@ -278,33 +270,24 @@ impl<B: BlockT> ConsensusGossip<B> {
 		}
 	}
 
-	/// Get data of valid, incoming messages for a topic (but might have expired meanwhile)
-	pub fn messages_for(&mut self, topic: B::Hash)
-		-> TracingUnboundedReceiver<TopicNotification>
-	{
-		let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for");
-		for entry in self.messages.iter_mut().filter(|e| e.topic == topic) {
-			tx.unbounded_send(TopicNotification {
-					message: entry.message.clone(),
-					sender: entry.sender.clone(),
-				})
-				.expect("receiver known to be live; qed");
-		}
-
-		self.live_message_sinks.entry(topic).or_default().push(tx);
-
-		rx
+	/// Get valid messages received in the past for a topic (might have expired meanwhile).
+	pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ {
+		self.messages.iter().filter(move |e| e.topic == topic).map(|entry| TopicNotification {
+			message: entry.message.clone(),
+			sender: entry.sender.clone(),
+		})
 	}
 
-	/// Handle an incoming message for topic by who via protocol. Discard message if topic already
-	/// known, the message is old, its source peers isn't a registered peer or the connection to
-	/// them is broken.
+	/// Register incoming messages and return the ones that are new and valid (according to a gossip
+	/// validator) and should thus be forwarded to the upper layers.
 	pub fn on_incoming(
 		&mut self,
 		network: &mut dyn Network<B>,
 		who: PeerId,
 		messages: Vec<Vec<u8>>,
-	) {
+	) -> Vec<(B::Hash, TopicNotification)> {
+		let mut to_forward = vec![];
+
 		if !messages.is_empty() {
 			trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who);
 		}
@@ -335,23 +318,19 @@ impl<B: BlockT> ConsensusGossip<B> {
 				network.report_peer(who.clone(), rep::GOSSIP_SUCCESS);
 				if let Some(ref mut peer) = self.peers.get_mut(&who) {
 					peer.known_messages.insert(message_hash);
-					if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) {
-						trace!(target: "gossip", "Pushing consensus message to sinks for {}.", topic);
-						entry.get_mut().retain(|sink| {
-							if let Err(e) = sink.unbounded_send(TopicNotification {
-								message: message.clone(),
-								sender: Some(who.clone())
-							}) {
-								trace!(target: "gossip", "Error broadcasting message notification: {:?}", e);
-							}
-							!sink.is_closed()
-						});
-						if entry.get().is_empty() {
-							entry.remove_entry();
-						}
-					}
+
+					to_forward.push((topic, TopicNotification {
+						message: message.clone(),
+						sender: Some(who.clone())
+					}));
+
 					if keep {
-						self.register_message_hashed(message_hash, topic, message, Some(who.clone()));
+						self.register_message_hashed(
+							message_hash,
+							topic,
+							message,
+							Some(who.clone()),
+						);
 					}
 				} else {
 					trace!(target:"gossip", "Ignored statement from unregistered peer {}", who);
@@ -361,6 +340,8 @@ impl<B: BlockT> ConsensusGossip<B> {
 				trace!(target:"gossip", "Discard message from peer {}", who);
 			}
 		}
+
+		to_forward
 	}
 
 	/// Send all messages with given topic to a peer.
@@ -437,7 +418,6 @@ impl<B: BlockT> ConsensusGossip<B> {
 mod tests {
 	use std::sync::Arc;
 	use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper};
-	use futures::executor::block_on_stream;
 
 	use super::*;
 
@@ -518,16 +498,18 @@ mod tests {
 	}
 
 	#[test]
-	fn message_stream_include_those_sent_before_asking_for_stream() {
+	fn message_stream_include_those_sent_before_asking() {
 		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
 
+		// Register message.
 		let message = vec![4, 5, 6];
 		let topic = HashFor::<Block>::hash(&[1,2,3]);
-
 		consensus.register_message(topic, message.clone());
-		let mut stream = block_on_stream(consensus.messages_for(topic));
 
-		assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None }));
+		assert_eq!(
+			consensus.messages_for(topic).next(),
+			Some(TopicNotification { message: message, sender: None }),
+		);
 	}
 
 	#[test]
@@ -544,22 +526,6 @@ mod tests {
 		assert_eq!(consensus.messages.len(), 2);
 	}
 
-	#[test]
-	fn can_keep_multiple_subscribers_per_topic() {
-		let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]);
-
-		let message = vec![4, 5, 6];
-		let topic = HashFor::<Block>::hash(&[1, 2, 3]);
-
-		consensus.register_message(topic, message.clone());
-
-		let mut stream1 = block_on_stream(consensus.messages_for(topic));
-		let mut stream2 = block_on_stream(consensus.messages_for(topic));
-
-		assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None }));
-		assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None }));
-	}
-
 	#[test]
 	fn peer_is_removed_on_disconnect() {
 		struct TestNetwork;