diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index b1a7eba99ddf1b47b8af99d4cceb2a2034333702..ed39f79eabf357578d1fe1b164c3765c0e38f98e 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -256,6 +256,17 @@ dependencies = [
  "webpki-roots 0.19.0",
 ]
 
+[[package]]
+name = "async-trait"
+version = "0.1.36"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92"
+dependencies = [
+ "proc-macro2 1.0.18",
+ "quote 1.0.7",
+ "syn 1.0.31",
+]
+
 [[package]]
 name = "atty"
 version = "0.2.14"
@@ -4348,6 +4359,25 @@ dependencies = [
  "wasm-timer",
 ]
 
+[[package]]
+name = "polkadot-network-bridge"
+version = "0.1.0"
+dependencies = [
+ "assert_matches",
+ "futures 0.3.5",
+ "futures-timer 3.0.2",
+ "log 0.4.8",
+ "parity-scale-codec",
+ "parking_lot 0.10.2",
+ "polkadot-node-primitives",
+ "polkadot-node-subsystem",
+ "polkadot-primitives",
+ "polkadot-subsystem-test-helpers",
+ "sc-network",
+ "sp-runtime",
+ "streamunordered",
+]
+
 [[package]]
 name = "polkadot-network-test"
 version = "0.8.13"
@@ -4370,36 +4400,39 @@ dependencies = [
 ]
 
 [[package]]
-name = "polkadot-node-messages"
+name = "polkadot-node-primitives"
 version = "0.1.0"
 dependencies = [
- "futures 0.3.5",
- "polkadot-node-primitives",
+ "async-trait",
+ "parity-scale-codec",
  "polkadot-primitives",
  "polkadot-statement-table",
- "sc-network",
+ "sp-runtime",
 ]
 
 [[package]]
-name = "polkadot-node-primitives"
+name = "polkadot-node-subsystem"
 version = "0.1.0"
 dependencies = [
- "parity-scale-codec",
+ "async-trait",
+ "futures 0.3.5",
+ "polkadot-node-primitives",
  "polkadot-primitives",
  "polkadot-statement-table",
- "sp-runtime",
+ "sc-network",
 ]
 
 [[package]]
 name = "polkadot-overseer"
 version = "0.1.0"
 dependencies = [
+ "async-trait",
  "femme",
  "futures 0.3.5",
  "futures-timer 3.0.2",
  "kv-log-macro",
  "log 0.4.8",
- "polkadot-node-messages",
+ "polkadot-node-subsystem",
  "polkadot-primitives",
  "sc-client-api",
  "streamunordered",
@@ -4711,6 +4744,7 @@ dependencies = [
  "parity-scale-codec",
  "parking_lot 0.9.0",
  "polkadot-network",
+ "polkadot-node-subsystem",
  "polkadot-overseer",
  "polkadot-primitives",
  "polkadot-rpc",
@@ -4760,6 +4794,16 @@ dependencies = [
  "sp-core",
 ]
 
+[[package]]
+name = "polkadot-subsystem-test-helpers"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "futures 0.3.5",
+ "parking_lot 0.10.2",
+ "polkadot-node-subsystem",
+]
+
 [[package]]
 name = "polkadot-test-runtime"
 version = "0.8.13"
diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml
index 34f9f14e4e0504e6f3a616b39a40d3f63e275000..4a72ae8dc6f972bb309e2da393a6b548b583b0eb 100644
--- a/polkadot/Cargo.toml
+++ b/polkadot/Cargo.toml
@@ -42,10 +42,12 @@ members = [
 	"service",
 	"validation",
 
-	"node/messages",
+	"node/network/bridge",
 	"node/overseer",
 	"node/primitives",
 	"node/service",
+	"node/subsystem",
+	"node/test-helpers/subsystem",
 
 	"parachain/test-parachains",
 	"parachain/test-parachains/adder",
diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..4f6c8631e2f9118ca541ebfdd02596064ae29d37
--- /dev/null
+++ b/polkadot/node/network/bridge/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "polkadot-network-bridge"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+futures = "0.3.5"
+log = "0.4.8"
+futures-timer = "3.0.2"
+streamunordered = "0.5.1"
+polkadot-primitives = { path = "../../../primitives" }
+node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
+parity-scale-codec = "1.3.0"
+sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+
+[dev-dependencies]
+parking_lot = "0.10.0"
+subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
+assert_matches = "1.3.0"
diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..aef1632a9439d3745ade82c0bca115e2f66f42b9
--- /dev/null
+++ b/polkadot/node/network/bridge/src/lib.rs
@@ -0,0 +1,912 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! The Network Bridge Subsystem - protocol multiplexer for Polkadot.
+
+use parity_scale_codec::{Encode, Decode};
+use futures::prelude::*;
+use futures::future::BoxFuture;
+use futures::stream::BoxStream;
+
+use sc_network::{
+	ObservedRole, ReputationChange, PeerId,
+	Event as NetworkEvent,
+};
+use sp_runtime::ConsensusEngineId;
+
+use polkadot_subsystem::{
+	FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
+	SubsystemResult,
+};
+use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages};
+use node_primitives::{ProtocolId, View};
+use polkadot_primitives::{Block, Hash};
+
+use std::collections::btree_map::{BTreeMap, Entry as BEntry};
+use std::collections::hash_map::{HashMap, Entry as HEntry};
+use std::pin::Pin;
+use std::sync::Arc;
+
+/// The maximum amount of heads a peer is allowed to have in their view at any time.
+///
+/// We use the same limit to compute the view sent to peers locally.
+const MAX_VIEW_HEADS: usize = 5;
+
+/// The engine ID of the polkadot network protocol.
+pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
+/// The protocol name.
+pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2";
+
+const MALFORMED_MESSAGE_COST: ReputationChange
+	= ReputationChange::new(-500, "Malformed Network-bridge message");
+const UNKNOWN_PROTO_COST: ReputationChange
+	= ReputationChange::new(-50, "Message sent to unknown protocol");
+const MALFORMED_VIEW_COST: ReputationChange
+	= ReputationChange::new(-500, "Malformed view");
+
+/// Messages received on the network.
+#[derive(Debug, Encode, Decode, Clone)]
+pub enum WireMessage {
+	/// A message from a peer on a specific protocol.
+	#[codec(index = "1")]
+	ProtocolMessage(ProtocolId, Vec<u8>),
+	/// A view update from a peer.
+	#[codec(index = "2")]
+	ViewUpdate(View),
+}
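+
+// A minimal round-trip sketch (illustration only - this test is not part of the
+// subsystem's behaviour): the explicit codec indices above mean the first byte
+// on the wire is `1` for `ProtocolMessage` and `2` for `ViewUpdate`.
+#[cfg(test)]
+#[test]
+fn wire_message_view_update_roundtrip() {
+	let msg = WireMessage::ViewUpdate(View(vec![Hash::from([1; 32])]));
+	let encoded = msg.encode();
+	assert_eq!(encoded[0], 2); // codec index of `ViewUpdate`.
+
+	match WireMessage::decode(&mut &encoded[..]) {
+		Ok(WireMessage::ViewUpdate(view)) => assert_eq!(view, View(vec![Hash::from([1; 32])])),
+		other => panic!("unexpected decode result: {:?}", other),
+	}
+}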
+
+/// Information about the notifications protocol. Should be used during network configuration
+/// or shortly after startup to register the protocol with the network service.
+pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'static, [u8]>) {
+	(POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME.into())
+}
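+
+// A registration sketch: the `(engine_id, protocol_name)` pair returned above is
+// the shape `sc_network` expects for a notifications protocol. Exactly where it
+// is plugged in (the network configuration at startup vs. a registration call on
+// the running service) depends on the substrate version in use, so this crate
+// only exposes the data rather than performing the registration itself.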
+
+/// An action to be carried out by the network.
+#[derive(PartialEq)]
+pub enum NetworkAction {
+	/// Note a change in reputation for a peer.
+	ReputationChange(PeerId, ReputationChange),
+	/// Write a notification to a given peer.
+	WriteNotification(PeerId, Vec<u8>),
+}
+
+/// An abstraction over networking for the purposes of this subsystem.
+pub trait Network: Send + 'static {
+	/// Get a stream of all events occurring on the network. This may include events unrelated
+	/// to the Polkadot protocol - the user of this function should filter only for events related
+	/// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID).
+	fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;
+
+	/// Get access to an underlying sink for all network actions.
+	fn action_sink<'a>(&'a mut self) -> Pin<
+		Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>
+	>;
+
+	/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
+	fn report_peer(&mut self, who: PeerId, cost_benefit: ReputationChange)
+		-> BoxFuture<SubsystemResult<()>>
+	{
+		async move {
+			self.action_sink().send(NetworkAction::ReputationChange(who, cost_benefit)).await
+		}.boxed()
+	}
+
+	/// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic.
+	fn write_notification(&mut self, who: PeerId, message: Vec<u8>)
+		-> BoxFuture<SubsystemResult<()>>
+	{
+		async move {
+			self.action_sink().send(NetworkAction::WriteNotification(who, message)).await
+		}.boxed()
+	}
+}
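+
+// Note for implementors: `report_peer` and `write_notification` are provided
+// methods that both funnel through `action_sink`, so an implementation only has
+// to supply `event_stream` and `action_sink` - as the test network in the tests
+// module below does.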
+
+impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
+	fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
+		sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
+	}
+
+	fn action_sink<'a>(&'a mut self)
+		-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
+	{
+		use futures::task::{Poll, Context};
+
+		// wrapper around a NetworkService to make it act like a sink.
+		struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>);
+
+		impl<'b> Sink<NetworkAction> for ActionSink<'b> {
+			type Error = SubsystemError;
+
+			fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
+				Poll::Ready(Ok(()))
+			}
+
+			fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
+				match action {
+					NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer(
+						peer,
+						cost_benefit,
+					),
+					NetworkAction::WriteNotification(peer, message) => self.0.write_notification(
+						peer,
+						POLKADOT_ENGINE_ID,
+						message,
+					),
+				}
+
+				Ok(())
+			}
+
+			fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
+				Poll::Ready(Ok(()))
+			}
+
+			fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
+				Poll::Ready(Ok(()))
+			}
+		}
+
+		Box::pin(ActionSink(&**self))
+	}
+}
+
+/// The network bridge subsystem.
+pub struct NetworkBridge<N>(Option<N>);
+
+impl<N> NetworkBridge<N> {
+	/// Create a new network bridge subsystem with underlying network service.
+	///
+	/// This assumes the network service has already registered the notifications protocol for the
+	/// network bridge. See [`notifications_protocol_info`](notifications_protocol_info).
+	pub fn new(net_service: N) -> Self {
+		NetworkBridge(Some(net_service))
+	}
+}
+
+impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
+	where
+		Net: Network,
+		Context: SubsystemContext<Message=NetworkBridgeMessage>,
+{
+	fn start(&mut self, mut ctx: Context) -> SpawnedSubsystem {
+		SpawnedSubsystem(match self.0.take() {
+			None => async move { for _ in ctx.recv().await { } }.boxed(),
+			Some(net) => {
+				// Swallow error because failure is fatal to the node and we log with more precision
+				// within `run_network`.
+				run_network(net, ctx).map(|_| ()).boxed()
+			}
+		})
+	}
+}
+
+struct PeerData {
+	/// Latest view sent by the peer.
+	view: View,
+	/// The role of the peer.
+	role: ObservedRole,
+}
+
+#[derive(Debug)]
+enum Action {
+	RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
+	SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>),
+	ReportPeer(PeerId, ReputationChange),
+	StartWork(Hash),
+	StopWork(Hash),
+
+	PeerConnected(PeerId, ObservedRole),
+	PeerDisconnected(PeerId),
+	PeerMessages(PeerId, Vec<WireMessage>),
+
+	Abort,
+}
+
+fn action_from_overseer_message(
+	res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
+) -> Action {
+	match res {
+		Ok(FromOverseer::Signal(OverseerSignal::StartWork(relay_parent)))
+			=> Action::StartWork(relay_parent),
+		Ok(FromOverseer::Signal(OverseerSignal::StopWork(relay_parent)))
+			=> Action::StopWork(relay_parent),
+		Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
+		Ok(FromOverseer::Communication { msg }) => match msg {
+			NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer)
+				=>  Action::RegisterEventProducer(protocol_id, message_producer),
+			NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
+			NetworkBridgeMessage::SendMessage(peers, protocol, message)
+				=> Action::SendMessage(peers, protocol, message),
+		},
+		Err(e) => {
+			log::warn!("Shutting down Network Bridge due to error {:?}", e);
+			Action::Abort
+		}
+	}
+}
+
+fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> {
+	match event {
+		None => {
+			log::info!("Shutting down Network Bridge: underlying event stream concluded");
+			Some(Action::Abort)
+		}
+		Some(NetworkEvent::Dht(_)) => None,
+		Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => {
+			if engine_id == POLKADOT_ENGINE_ID {
+				Some(Action::PeerConnected(remote, role))
+			} else {
+				None
+			}
+		}
+		Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => {
+			if engine_id == POLKADOT_ENGINE_ID {
+				Some(Action::PeerDisconnected(remote))
+			} else {
+				None
+			}
+		}
+		Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
+			let v: Result<Vec<_>, _> = messages.iter()
+				.filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID)
+				.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
+				.collect();
+
+			match v {
+				Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)),
+				Ok(v) => if v.is_empty() {
+					None
+				} else {
+					Some(Action::PeerMessages(remote, v))
+				}
+			}
+		}
+	}
+}
+
+fn construct_view(live_heads: &[Hash]) -> View {
+	View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect())
+}
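+
+// For illustration: with `live_heads = [a, b, c, d, e, f]` (most recent last, as
+// maintained by `run_network`) and `MAX_VIEW_HEADS = 5`, `construct_view` yields
+// `View([f, e, d, c, b])` - the five most recent heads, newest first.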
+
+async fn dispatch_update_to_all(
+	update: NetworkBridgeEvent,
+	event_producers: impl IntoIterator<Item=&fn(NetworkBridgeEvent) -> AllMessages>,
+	ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>,
+) -> polkadot_subsystem::SubsystemResult<()> {
+	// Collect messages here to avoid the borrow lasting across an await boundary.
+	let messages: Vec<_> = event_producers.into_iter()
+		.map(|producer| producer(update.clone()))
+		.collect();
+
+	ctx.send_messages(messages).await
+}
+
+async fn update_view(
+	peers: &HashMap<PeerId, PeerData>,
+	live_heads: &[Hash],
+	net: &mut impl Network,
+	local_view: &mut View,
+) -> SubsystemResult<Option<NetworkBridgeEvent>> {
+	let new_view = construct_view(live_heads);
+	if *local_view == new_view { return Ok(None) }
+	*local_view = new_view.clone();
+
+	let message = WireMessage::ViewUpdate(new_view.clone()).encode();
+
+	let notifications = peers.keys().cloned()
+		.map(move |peer| Ok(NetworkAction::WriteNotification(peer, message.clone())));
+
+	net.action_sink().send_all(&mut stream::iter(notifications)).await?;
+
+	Ok(Some(NetworkBridgeEvent::OurViewChange(local_view.clone())))
+}
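+
+// Note: `update_view` deliberately returns early when the computed view is
+// unchanged, so `StartWork`/`StopWork` signals that do not alter the most recent
+// `MAX_VIEW_HEADS` heads generate no network traffic and no local event.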
+
+async fn run_network<N: Network>(
+	mut net: N,
+	mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
+) -> SubsystemResult<()> {
+	let mut event_stream = net.event_stream().fuse();
+
+	// Most recent heads are at the back.
+	let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS);
+	let mut local_view = View(Vec::new());
+
+	let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
+	let mut event_producers = BTreeMap::new();
+
+	loop {
+		let action = {
+			let subsystem_next = ctx.recv().fuse();
+			let mut net_event_next = event_stream.next().fuse();
+			futures::pin_mut!(subsystem_next);
+
+			let action = futures::select! {
+				subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)),
+				net_event = net_event_next => action_from_network_message(net_event),
+			};
+
+			match action {
+				Some(a) => a,
+				None => continue,
+			}
+		};
+
+		match action {
+			Action::RegisterEventProducer(protocol_id, event_producer) => {
+				// insert only if none present.
+				if let BEntry::Vacant(entry) = event_producers.entry(protocol_id) {
+					let event_producer = entry.insert(event_producer);
+
+					// Inform the new event producer about all currently-connected
+					// peers and their latest views.
+					let mut messages = Vec::with_capacity(peers.len() * 2);
+					for (peer, data) in &peers {
+						messages.push(event_producer(
+							NetworkBridgeEvent::PeerConnected(peer.clone(), data.role.clone())
+						));
+
+						messages.push(event_producer(
+							NetworkBridgeEvent::PeerViewChange(peer.clone(), data.view.clone())
+						));
+					}
+
+					ctx.send_messages(messages).await?;
+				}
+			}
+			Action::SendMessage(peers, protocol, message) => {
+				let mut message_producer = stream::iter({
+					let n_peers = peers.len();
+					let mut message = Some(
+						WireMessage::ProtocolMessage(protocol, message).encode()
+					);
+
+					peers.iter().cloned().enumerate().map(move |(i, peer)| {
+						// optimization: avoid cloning the message for the last peer in the
+						// list. The message payload can be quite large. If the underlying
+						// network used `Bytes` this would not be necessary.
+						let message = if i == n_peers - 1 {
+							message.take()
+								.expect("Only taken in last iteration of loop, never afterwards; qed")
+						} else {
+							message.as_ref()
+								.expect("Only taken in last iteration of loop, we are not there yet; qed")
+								.clone()
+						};
+
+						Ok(NetworkAction::WriteNotification(peer, message))
+					})
+				});
+
+				net.action_sink().send_all(&mut message_producer).await?;
+			}
+			Action::ReportPeer(peer, rep) => {
+				net.report_peer(peer, rep).await?;
+			}
+			Action::StartWork(relay_parent) => {
+				live_heads.push(relay_parent);
+				if let Some(view_update)
+					= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
+				{
+					if let Err(e) = dispatch_update_to_all(
+						view_update,
+						event_producers.values(),
+						&mut ctx,
+					).await {
+						log::warn!("Aborting - Failure to dispatch messages to overseer");
+						return Err(e)
+					}
+				}
+			}
+			Action::StopWork(relay_parent) => {
+				live_heads.retain(|h| h != &relay_parent);
+				if let Some(view_update)
+					= update_view(&peers, &live_heads, &mut net, &mut local_view).await?
+				{
+					if let Err(e) = dispatch_update_to_all(
+						view_update,
+						event_producers.values(),
+						&mut ctx,
+					).await {
+						log::warn!("Aborting - Failure to dispatch messages to overseer");
+						return Err(e)
+					}
+				}
+			}
+
+			Action::PeerConnected(peer, role) => {
+				match peers.entry(peer.clone()) {
+					HEntry::Occupied(_) => continue,
+					HEntry::Vacant(vacant) => {
+						vacant.insert(PeerData {
+							view: View(Vec::new()),
+							role: role.clone(),
+						});
+
+						if let Err(e) = dispatch_update_to_all(
+							NetworkBridgeEvent::PeerConnected(peer, role),
+							event_producers.values(),
+							&mut ctx,
+						).await {
+							log::warn!("Aborting - Failure to dispatch messages to overseer");
+							return Err(e)
+						}
+					}
+				}
+			}
+			Action::PeerDisconnected(peer) => {
+				if peers.remove(&peer).is_some() {
+					if let Err(e) = dispatch_update_to_all(
+						NetworkBridgeEvent::PeerDisconnected(peer),
+						event_producers.values(),
+						&mut ctx,
+					).await {
+						log::warn!("Aborting - Failure to dispatch messages to overseer");
+						return Err(e)
+					}
+				}
+			},
+			Action::PeerMessages(peer, messages) => {
+				let peer_data = match peers.get_mut(&peer) {
+					None => continue,
+					Some(d) => d,
+				};
+
+				let mut outgoing_messages = Vec::with_capacity(messages.len());
+				for message in messages {
+					match message {
+						WireMessage::ViewUpdate(new_view) => {
+							if new_view.0.len() > MAX_VIEW_HEADS {
+								net.report_peer(
+									peer.clone(),
+									MALFORMED_VIEW_COST,
+								).await?;
+
+								continue
+							}
+
+							if new_view == peer_data.view { continue }
+							peer_data.view = new_view;
+
+							let update = NetworkBridgeEvent::PeerViewChange(
+								peer.clone(),
+								peer_data.view.clone(),
+							);
+
+							outgoing_messages.extend(
+								event_producers.values().map(|producer| producer(update.clone()))
+							);
+						}
+						WireMessage::ProtocolMessage(protocol, message) => {
+							let message = match event_producers.get(&protocol) {
+								Some(producer) => Some(producer(
+									NetworkBridgeEvent::PeerMessage(peer.clone(), message)
+								)),
+								None => {
+									net.report_peer(
+										peer.clone(),
+										UNKNOWN_PROTO_COST,
+									).await?;
+
+									None
+								}
+							};
+
+							if let Some(message) = message {
+								outgoing_messages.push(message);
+							}
+						}
+					}
+				}
+
+				let send_messages = ctx.send_messages(outgoing_messages);
+				if let Err(e) = send_messages.await {
+					log::warn!("Aborting - Failure to dispatch messages to overseer");
+					return Err(e)
+				}
+			},
+
+			Action::Abort => return Ok(()),
+		}
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use futures::channel::mpsc;
+	use futures::executor::{self, ThreadPool};
+
+	use std::sync::Arc;
+	use parking_lot::Mutex;
+	use assert_matches::assert_matches;
+
+	use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage};
+	use subsystem_test::{SingleItemSink, SingleItemStream};
+
+	// The subsystem's view of the network - only supports a single call to `event_stream`.
+	struct TestNetwork {
+		net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
+		action_tx: mpsc::UnboundedSender<NetworkAction>,
+	}
+
+	// The test's view of the network. This receives updates from the subsystem in the form
+	// of `NetworkAction`s.
+	struct TestNetworkHandle {
+		action_rx: mpsc::UnboundedReceiver<NetworkAction>,
+		net_tx: SingleItemSink<NetworkEvent>,
+	}
+
+	fn new_test_network() -> (
+		TestNetwork,
+		TestNetworkHandle,
+	) {
+		let (net_tx, net_rx) = subsystem_test::single_item_sink();
+		let (action_tx, action_rx) = mpsc::unbounded();
+
+		(
+			TestNetwork {
+				net_events: Arc::new(Mutex::new(Some(net_rx))),
+				action_tx,
+			},
+			TestNetworkHandle {
+				action_rx,
+				net_tx,
+			},
+		)
+	}
+
+	impl Network for TestNetwork {
+		fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
+			self.net_events.lock()
+				.take()
+				.expect("Subsystem made more than one call to `event_stream`")
+				.boxed()
+		}
+
+		fn action_sink<'a>(&'a mut self)
+			-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
+		{
+			Box::pin((&mut self.action_tx).sink_map_err(Into::into))
+		}
+	}
+
+	impl TestNetworkHandle {
+		// Get the next network action.
+		async fn next_network_action(&mut self) -> NetworkAction {
+			self.action_rx.next().await.expect("subsystem concluded early")
+		}
+
+		// Wait for the next N network actions.
+		async fn next_network_actions(&mut self, n: usize) -> Vec<NetworkAction> {
+			let mut v = Vec::with_capacity(n);
+			for _ in 0..n {
+				v.push(self.next_network_action().await);
+			}
+
+			v
+		}
+
+		async fn connect_peer(&mut self, peer: PeerId, role: ObservedRole) {
+			self.send_network_event(NetworkEvent::NotificationStreamOpened {
+				remote: peer,
+				engine_id: POLKADOT_ENGINE_ID,
+				role,
+			}).await;
+		}
+
+		async fn disconnect_peer(&mut self, peer: PeerId) {
+			self.send_network_event(NetworkEvent::NotificationStreamClosed {
+				remote: peer,
+				engine_id: POLKADOT_ENGINE_ID,
+			}).await;
+		}
+
+		async fn peer_message(&mut self, peer: PeerId, message: Vec<u8>) {
+			self.send_network_event(NetworkEvent::NotificationsReceived {
+				remote: peer,
+				messages: vec![(POLKADOT_ENGINE_ID, message.into())],
+			}).await;
+		}
+
+		async fn send_network_event(&mut self, event: NetworkEvent) {
+			self.net_tx.send(event).await.expect("subsystem concluded early");
+		}
+	}
+
+	// network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so
+	// we need to use this to prevent fragile reliance on peer ordering.
+	fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool {
+		actions.iter().any(|x| x == action)
+	}
+
+	struct TestHarness {
+		network_handle: TestNetworkHandle,
+		virtual_overseer: subsystem_test::TestSubsystemContextHandle<NetworkBridgeMessage>,
+	}
+
+	fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
+		let pool = ThreadPool::new().unwrap();
+
+		let (network, network_handle) = new_test_network();
+		let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
+
+		let network_bridge = run_network(
+			network,
+			context,
+		)
+			.map_err(|_| panic!("subsystem execution failed"))
+			.map(|_| ());
+
+		let test_fut = test(TestHarness {
+			network_handle,
+			virtual_overseer,
+		});
+
+		futures::pin_mut!(test_fut);
+		futures::pin_mut!(network_bridge);
+
+		executor::block_on(future::select(test_fut, network_bridge));
+	}
+
+	#[test]
+	fn sends_view_updates_to_peers() {
+		test_harness(|test_harness| async move {
+			let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;
+
+			let peer_a = PeerId::random();
+			let peer_b = PeerId::random();
+
+			network_handle.connect_peer(peer_a.clone(), ObservedRole::Full).await;
+			network_handle.connect_peer(peer_b.clone(), ObservedRole::Full).await;
+
+			let hash_a = Hash::from([1; 32]);
+
+			virtual_overseer.send(FromOverseer::Signal(OverseerSignal::StartWork(hash_a))).await;
+
+			let actions = network_handle.next_network_actions(2).await;
+			let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode();
+			assert!(network_actions_contains(
+				&actions,
+				&NetworkAction::WriteNotification(peer_a, wire_message.clone()),
+			));
+
+			assert!(network_actions_contains(
+				&actions,
+				&NetworkAction::WriteNotification(peer_b, wire_message.clone()),
+			));
+		});
+	}
+
+	#[test]
+	fn peer_view_updates_sent_via_overseer() {
+		test_harness(|test_harness| async move {
+			let TestHarness {
+				mut network_handle,
+				mut virtual_overseer,
+			} = test_harness;
+
+			let peer = PeerId::random();
+
+			let proto_statement = *b"abcd";
+			let proto_bitfield = *b"wxyz";
+
+			network_handle.connect_peer(peer.clone(), ObservedRole::Full).await;
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: NetworkBridgeMessage::RegisterEventProducer(
+					proto_statement,
+					|event| AllMessages::StatementDistribution(
+						StatementDistributionMessage::NetworkBridgeUpdate(event)
+					)
+				),
+			}).await;
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: NetworkBridgeMessage::RegisterEventProducer(
+					proto_bitfield,
+					|event| AllMessages::BitfieldDistribution(
+						BitfieldDistributionMessage::NetworkBridgeUpdate(event)
+					)
+				),
+			}).await;
+
+			let view = View(vec![Hash::from([1u8; 32])]);
+
+			// bridge will inform about all previously-connected peers.
+			{
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::StatementDistribution(
+						StatementDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
+						)
+					) if p == peer
+				);
+
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::StatementDistribution(
+						StatementDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerViewChange(p, v)
+						)
+					) if p == peer && v == View(Default::default())
+				);
+
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::BitfieldDistribution(
+						BitfieldDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
+						)
+					) if p == peer
+				);
+
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::BitfieldDistribution(
+						BitfieldDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerViewChange(p, v)
+						)
+					) if p == peer && v == View(Default::default())
+				);
+			}
+
+			network_handle.peer_message(
+				peer.clone(),
+				WireMessage::ViewUpdate(view.clone()).encode(),
+			).await;
+
+			// statement distribution message comes first because handlers are ordered by
+			// protocol ID.
+
+			assert_matches!(
+				virtual_overseer.recv().await,
+				AllMessages::StatementDistribution(
+					StatementDistributionMessage::NetworkBridgeUpdate(
+						NetworkBridgeEvent::PeerViewChange(p, v)
+					)
+				) => {
+					assert_eq!(p, peer);
+					assert_eq!(v, view);
+				}
+			);
+
+			assert_matches!(
+				virtual_overseer.recv().await,
+				AllMessages::BitfieldDistribution(
+					BitfieldDistributionMessage::NetworkBridgeUpdate(
+						NetworkBridgeEvent::PeerViewChange(p, v)
+					)
+				) => {
+					assert_eq!(p, peer);
+					assert_eq!(v, view);
+				}
+			);
+		});
+	}
+
+	#[test]
+	fn peer_messages_sent_via_overseer() {
+		test_harness(|test_harness| async move {
+			let TestHarness {
+				mut network_handle,
+				mut virtual_overseer,
+			} = test_harness;
+
+			let peer = PeerId::random();
+
+			let proto_statement = *b"abcd";
+			let proto_bitfield = *b"wxyz";
+
+			network_handle.connect_peer(peer.clone(), ObservedRole::Full).await;
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: NetworkBridgeMessage::RegisterEventProducer(
+					proto_statement,
+					|event| AllMessages::StatementDistribution(
+						StatementDistributionMessage::NetworkBridgeUpdate(event)
+					)
+				),
+			}).await;
+
+			virtual_overseer.send(FromOverseer::Communication {
+				msg: NetworkBridgeMessage::RegisterEventProducer(
+					proto_bitfield,
+					|event| AllMessages::BitfieldDistribution(
+						BitfieldDistributionMessage::NetworkBridgeUpdate(event)
+					)
+				),
+			}).await;
+
+			// bridge will inform about all previously-connected peers.
+			{
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::StatementDistribution(
+						StatementDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
+						)
+					) if p == peer
+				);
+
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::StatementDistribution(
+						StatementDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerViewChange(p, v)
+						)
+					) if p == peer && v == View(Default::default())
+				);
+
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::BitfieldDistribution(
+						BitfieldDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full)
+						)
+					) if p == peer
+				);
+
+				assert_matches!(
+					virtual_overseer.recv().await,
+					AllMessages::BitfieldDistribution(
+						BitfieldDistributionMessage::NetworkBridgeUpdate(
+							NetworkBridgeEvent::PeerViewChange(p, v)
+						)
+					) if p == peer && v == View(Default::default())
+				);
+			}
+
+			let payload = vec![1, 2, 3];
+
+			network_handle.peer_message(
+				peer.clone(),
+				WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(),
+			).await;
+
+			network_handle.disconnect_peer(peer.clone()).await;
+
+			// statement distribution message comes first because handlers are ordered by
+			// protocol ID, and then a disconnection event comes - indicating that the message
+			// was only sent to the correct protocol.
+
+			assert_matches!(
+				virtual_overseer.recv().await,
+				AllMessages::StatementDistribution(
+					StatementDistributionMessage::NetworkBridgeUpdate(
+						NetworkBridgeEvent::PeerMessage(p, m)
+					)
+				) => {
+					assert_eq!(p, peer);
+					assert_eq!(m, payload);
+				}
+			);
+
+			assert_matches!(
+				virtual_overseer.recv().await,
+				AllMessages::StatementDistribution(
+					StatementDistributionMessage::NetworkBridgeUpdate(
+						NetworkBridgeEvent::PeerDisconnected(p)
+					)
+				) => {
+					assert_eq!(p, peer);
+				}
+			);
+		});
+	}
+}
diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml
index 88626e2e05f3e76c13f89d280b14c301a5cb0d37..6c6ce304e6d4165c157d7569eacb10593187e436 100644
--- a/polkadot/node/overseer/Cargo.toml
+++ b/polkadot/node/overseer/Cargo.toml
@@ -11,7 +11,8 @@ futures-timer = "3.0.2"
 streamunordered = "0.5.1"
 polkadot-primitives = { path = "../../primitives" }
 client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
-messages = { package = "polkadot-node-messages", path = "../messages" }
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
+async-trait = "0.1"
 
 [dev-dependencies]
 futures = { version = "0.3.5", features = ["thread-pool"] }
diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs
index 77b99a3a3b3fab0ab38b4362dbdb5cda35423266..0edc87a6b8db7e63a6364a667cb7c5a1a1d4124e 100644
--- a/polkadot/node/overseer/examples/minimal-example.rs
+++ b/polkadot/node/overseer/examples/minimal-example.rs
@@ -28,16 +28,17 @@ use futures_timer::Delay;
 use kv_log_macro as log;
 
 use polkadot_primitives::parachain::{BlockData, PoVBlock};
-use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};
+use polkadot_overseer::Overseer;
 
-use messages::{
-	AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage
+use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer};
+use polkadot_subsystem::messages::{
+	AllMessages, CandidateBackingMessage, CandidateValidationMessage
 };
 
 struct Subsystem1;
 
 impl Subsystem1 {
-	async fn run(mut ctx: SubsystemContext<CandidateBackingMessage>)  {
+	async fn run(mut ctx: impl SubsystemContext<Message=CandidateBackingMessage>) {
 		loop {
 			match ctx.try_recv().await {
 				Ok(Some(msg)) => {
@@ -56,7 +57,7 @@ impl Subsystem1 {
 			Delay::new(Duration::from_secs(1)).await;
 			let (tx, _) = oneshot::channel();
 
-			ctx.send_msg(AllMessages::CandidateValidation(
+			ctx.send_message(AllMessages::CandidateValidation(
 				CandidateValidationMessage::Validate(
 					Default::default(),
 					Default::default(),
@@ -70,8 +71,10 @@ impl Subsystem1 {
 	}
 }
 
-impl Subsystem<CandidateBackingMessage> for Subsystem1 {
-	fn start(&mut self, ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
+impl<C> Subsystem<C> for Subsystem1
+	where C: SubsystemContext<Message=CandidateBackingMessage>
+{
+	fn start(&mut self, ctx: C) -> SpawnedSubsystem {
 		SpawnedSubsystem(Box::pin(async move {
 			Self::run(ctx).await;
 		}))
@@ -81,7 +84,7 @@ impl Subsystem<CandidateBackingMessage> for Subsystem1 {
 struct Subsystem2;
 
 impl Subsystem2 {
-	async fn run(mut ctx: SubsystemContext<CandidateValidationMessage>)  {
+	async fn run(mut ctx: impl SubsystemContext<Message=CandidateValidationMessage>) {
 		ctx.spawn(Box::pin(async {
 			loop {
 				log::info!("Job tick");
@@ -105,8 +108,10 @@ impl Subsystem2 {
 	}
 }
 
-impl Subsystem<CandidateValidationMessage> for Subsystem2 {
-	fn start(&mut self, ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
+impl<C> Subsystem<C> for Subsystem2
+	where C: SubsystemContext<Message=CandidateValidationMessage>
+{
+	fn start(&mut self, ctx: C) -> SpawnedSubsystem {
 		SpawnedSubsystem(Box::pin(async move {
 			Self::run(ctx).await;
 		}))
diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs
index 0d3c9b7b509552a92e2a3151183490188032114c..8fb8706be429251a22874ac20c7e96e20b5ae3c3 100644
--- a/polkadot/node/overseer/src/lib.rs
+++ b/polkadot/node/overseer/src/lib.rs
@@ -65,8 +65,8 @@ use futures::channel::{mpsc, oneshot};
 use futures::{
 	pending, poll, select,
 	future::{BoxFuture, RemoteHandle},
-	stream::FuturesUnordered,
-	task::{Spawn, SpawnError, SpawnExt},
+	stream::{self, FuturesUnordered},
+	task::{Spawn, SpawnExt},
 	Future, FutureExt, SinkExt, StreamExt,
 };
 use futures_timer::Delay;
@@ -75,50 +75,14 @@ use streamunordered::{StreamYield, StreamUnordered};
 use polkadot_primitives::{Block, BlockNumber, Hash};
 use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
 
-pub use messages::{
-	OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages,
-	FromOverseer,
+use polkadot_subsystem::messages::{
+	CandidateValidationMessage, CandidateBackingMessage, AllMessages
+};
+pub use polkadot_subsystem::{
+	Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
+	SpawnedSubsystem,
 };
 
-/// An error type that describes faults that may happen
-///
-/// These are:
-///   * Channels being closed
-///   * Subsystems dying when they are not expected to
-///   * Subsystems not dying when they are told to die
-///   * etc.
-#[derive(Debug)]
-pub struct SubsystemError;
-
-impl From<mpsc::SendError> for SubsystemError {
-	fn from(_: mpsc::SendError) -> Self {
-		Self
-	}
-}
-
-impl From<oneshot::Canceled> for SubsystemError {
-	fn from(_: oneshot::Canceled) -> Self {
-		Self
-	}
-}
-
-impl From<SpawnError> for SubsystemError {
-    fn from(_: SpawnError) -> Self {
-		Self
-    }
-}
-
-/// A `Result` type that wraps [`SubsystemError`].
-///
-/// [`SubsystemError`]: struct.SubsystemError.html
-pub type SubsystemResult<T> = Result<T, SubsystemError>;
-
-/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`].
-///
-/// In essence it's just a newtype wrapping a `BoxFuture`.
-///
-/// [`Overseer`]: struct.Overseer.html
-pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
 
 // A capacity of bounded channels inside the overseer.
 const CHANNEL_CAPACITY: usize = 1024;
@@ -278,7 +242,7 @@ impl Debug for ToOverseer {
 /// A running instance of some [`Subsystem`].
 ///
 /// [`Subsystem`]: trait.Subsystem.html
-struct SubsystemInstance<M: Debug> {
+struct SubsystemInstance<M> {
 	tx: mpsc::Sender<FromOverseer<M>>,
 }
 
@@ -289,17 +253,17 @@ struct SubsystemInstance<M: Debug> {
 /// [`Overseer`]: struct.Overseer.html
 /// [`Subsystem`]: trait.Subsystem.html
 /// [`SubsystemJob`]: trait.SubsystemJob.html
-pub struct SubsystemContext<M: Debug>{
+#[derive(Debug)]
+pub struct OverseerSubsystemContext<M>{
 	rx: mpsc::Receiver<FromOverseer<M>>,
 	tx: mpsc::Sender<ToOverseer>,
 }
 
-impl<M: Debug> SubsystemContext<M> {
-	/// Try to asyncronously receive a message.
-	///
-	/// This has to be used with caution, if you loop over this without
-	/// using `pending!()` macro you will end up with a busy loop!
-	pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
+#[async_trait::async_trait]
+impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
+	type Message = M;
+
+	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
 		match poll!(self.rx.next()) {
 			Poll::Ready(Some(msg)) => Ok(Some(msg)),
 			Poll::Ready(None) => Err(()),
@@ -307,13 +271,11 @@ impl<M: Debug> SubsystemContext<M> {
 		}
 	}
 
-	/// Receive a message.
-	pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
+	async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
 		self.rx.next().await.ok_or(SubsystemError)
 	}
 
-	/// Spawn a child task on the executor.
-	pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
+	async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
 		let (tx, rx) = oneshot::channel();
 		self.tx.send(ToOverseer::SpawnJob {
 			s,
@@ -323,33 +285,25 @@ impl<M: Debug> SubsystemContext<M> {
 		rx.await?
 	}
 
-	/// Send a direct message to some other `Subsystem`, routed based on message type.
-	pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
+	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
 		self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
 
 		Ok(())
 	}
 
-	fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer>) -> Self {
-		Self {
-			rx,
-			tx,
-		}
+	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
+		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
+	{
+		let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
+		self.tx.send_all(&mut msgs).await?;
+
+		Ok(())
 	}
 }
 
-/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
-///
-/// It is generic over the message type circulating in the system.
-/// The idea that we want some type contaning persistent state that
-/// can spawn actually running subsystems when asked to.
-///
-/// [`Overseer`]: struct.Overseer.html
-/// [`Subsystem`]: trait.Subsystem.html
-pub trait Subsystem<M: Debug> {
-	/// Start this `Subsystem` and return `SpawnedSubsystem`.
-	fn start(&mut self, ctx: SubsystemContext<M>) -> SpawnedSubsystem;
-}
+/// A subsystem compatible with the overseer - one which can be run in the context of the
+/// overseer.
+pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>> + Send>;
 
 /// A subsystem that we oversee.
 ///
@@ -359,8 +313,8 @@ pub trait Subsystem<M: Debug> {
 ///
 /// [`Subsystem`]: trait.Subsystem.html
 #[allow(dead_code)]
-struct OverseenSubsystem<M: Debug> {
-	subsystem: Box<dyn Subsystem<M> + Send>,
+struct OverseenSubsystem<M> {
+	subsystem: CompatibleSubsystem<M>,
 	instance: Option<SubsystemInstance<M>>,
 }
 
@@ -441,16 +395,20 @@ where
 	/// # use std::time::Duration;
 	/// # use futures::{executor, pin_mut, select, FutureExt};
 	/// # use futures_timer::Delay;
-	/// # use polkadot_overseer::{
-	/// #     Overseer, Subsystem, SpawnedSubsystem, SubsystemContext,
-	/// #     CandidateValidationMessage, CandidateBackingMessage,
+	/// # use polkadot_overseer::Overseer;
+	/// # use polkadot_subsystem::{
+	/// #     Subsystem, SpawnedSubsystem, SubsystemContext,
+	/// #     messages::{CandidateValidationMessage, CandidateBackingMessage},
 	/// # };
 	///
 	/// struct ValidationSubsystem;
-	/// impl Subsystem<CandidateValidationMessage> for ValidationSubsystem {
+	///
+	/// impl<C> Subsystem<C> for ValidationSubsystem
+	/// 	where C: SubsystemContext<Message=CandidateValidationMessage>
+	/// {
 	///     fn start(
 	///         &mut self,
-	///         mut ctx: SubsystemContext<CandidateValidationMessage>,
+	///         mut ctx: C,
 	///     ) -> SpawnedSubsystem {
 	///         SpawnedSubsystem(Box::pin(async move {
 	///             loop {
@@ -461,10 +419,12 @@ where
 	/// }
 	///
 	/// struct CandidateBackingSubsystem;
-	/// impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
+	/// impl<C> Subsystem<C> for CandidateBackingSubsystem
+	/// 	where C: SubsystemContext<Message=CandidateBackingMessage>
+	/// {
 	///     fn start(
 	///         &mut self,
-	///         mut ctx: SubsystemContext<CandidateBackingMessage>,
+	///         mut ctx: C,
 	///     ) -> SpawnedSubsystem {
 	///         SpawnedSubsystem(Box::pin(async move {
 	///             loop {
@@ -498,8 +458,8 @@ where
 	/// ```
 	pub fn new(
 		leaves: impl IntoIterator<Item = BlockInfo>,
-		validation: Box<dyn Subsystem<CandidateValidationMessage> + Send>,
-		candidate_backing: Box<dyn Subsystem<CandidateBackingMessage> + Send>,
+		validation: CompatibleSubsystem<CandidateValidationMessage>,
+		candidate_backing: CompatibleSubsystem<CandidateBackingMessage>,
 		mut s: S,
 	) -> SubsystemResult<(Self, OverseerHandler)> {
 		let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
@@ -680,6 +640,12 @@ where
 					let _ = s.tx.send(FromOverseer::Communication { msg }).await;
 				}
 			}
+			_ => {
+				// TODO: temporary catch-all until all subsystems are integrated with overseer.
+				// The overseer is not complete until this is an exhaustive match with all
+				// messages targeting an included subsystem.
+				// https://github.com/paritytech/polkadot/issues/1317
+			}
 		}
 	}
 
@@ -688,15 +654,15 @@ where
 	}
 }
 
-fn spawn<S: Spawn, M: Debug>(
+fn spawn<S: Spawn, M: Send + 'static>(
 	spawner: &mut S,
 	futures: &mut FuturesUnordered<RemoteHandle<()>>,
 	streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
-	mut s: Box<dyn Subsystem<M> + Send>,
+	mut s: CompatibleSubsystem<M>,
 ) -> SubsystemResult<OverseenSubsystem<M>> {
 	let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
 	let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
-	let ctx = SubsystemContext::new(to_rx, from_tx);
+	let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
 	let f = s.start(ctx);
 
 	let handle = spawner.spawn_with_handle(f.0)?;
@@ -723,8 +689,10 @@ mod tests {
 
 	struct TestSubsystem1(mpsc::Sender<usize>);
 
-	impl Subsystem<CandidateValidationMessage> for TestSubsystem1 {
-		fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
+	impl<C> Subsystem<C> for TestSubsystem1
+		where C: SubsystemContext<Message=CandidateValidationMessage>
+	{
+		fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
 			let mut sender = self.0.clone();
 			SpawnedSubsystem(Box::pin(async move {
 				let mut i = 0;
@@ -746,14 +714,16 @@ mod tests {
 
 	struct TestSubsystem2(mpsc::Sender<usize>);
 
-	impl Subsystem<CandidateBackingMessage> for TestSubsystem2 {
-		fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
+	impl<C> Subsystem<C> for TestSubsystem2
+		where C: SubsystemContext<Message=CandidateBackingMessage>
+	{
+		fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
 			SpawnedSubsystem(Box::pin(async move {
 				let mut c: usize = 0;
 				loop {
 					if c < 10 {
 						let (tx, _) = oneshot::channel();
-						ctx.send_msg(
+						ctx.send_message(
 							AllMessages::CandidateValidation(
 								CandidateValidationMessage::Validate(
 									Default::default(),
@@ -786,8 +756,10 @@ mod tests {
 
 	struct TestSubsystem4;
 
-	impl Subsystem<CandidateBackingMessage> for TestSubsystem4 {
-		fn start(&mut self, mut _ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
+	impl<C> Subsystem<C> for TestSubsystem4
+		where C: SubsystemContext<Message=CandidateBackingMessage>
+	{
+		fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem {
 			SpawnedSubsystem(Box::pin(async move {
 				// Do nothing and exit.
 			}))
@@ -871,8 +843,10 @@ mod tests {
 
 	struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
 
-	impl Subsystem<CandidateValidationMessage> for TestSubsystem5 {
-		fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
+	impl<C> Subsystem<C> for TestSubsystem5
+		where C: SubsystemContext<Message=CandidateValidationMessage>
+	{
+		fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
 			let mut sender = self.0.clone();
 
 			SpawnedSubsystem(Box::pin(async move {
@@ -895,8 +869,10 @@ mod tests {
 
 	struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
 
-	impl Subsystem<CandidateBackingMessage> for TestSubsystem6 {
-		fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
+	impl<C> Subsystem<C> for TestSubsystem6
+		where C: SubsystemContext<Message=CandidateBackingMessage>
+	{
+		fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
 			let mut sender = self.0.clone();
 
 			SpawnedSubsystem(Box::pin(async move {
diff --git a/polkadot/node/primitives/Cargo.toml b/polkadot/node/primitives/Cargo.toml
index f317565b2e99ddd0b9a47671974f40c12147a262..b2bc9231ae74fa35b165592ab335deee1572a46f 100644
--- a/polkadot/node/primitives/Cargo.toml
+++ b/polkadot/node/primitives/Cargo.toml
@@ -10,3 +10,4 @@ polkadot-primitives = { path = "../../primitives" }
 polkadot-statement-table = { path = "../../statement-table" }
 parity-scale-codec = { version = "1.3.0", default-features = false, features = ["derive"] }
 runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+async-trait = "0.1"
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index bd43748ab24a0e8b7dce38ad43ba86bce8c4ad53..527e6aaea2742aad899ab2277b5b07b96c8cc474 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -64,6 +64,7 @@ impl EncodeAs<CompactStatement> for Statement {
 pub type SignedFullStatement = Signed<Statement, CompactStatement>;
 
 /// A misbehaviour report.
+#[derive(Debug)]
 pub enum MisbehaviorReport {
 	/// These validator nodes disagree on this candidate's validity, please figure it out
 	///
@@ -79,3 +80,12 @@ pub enum MisbehaviorReport {
 	/// This peer has seconded more than one parachain candidate for this relay parent head
 	DoubleVote(CandidateReceipt, SignedFullStatement, SignedFullStatement),
 }
+
+/// A unique identifier for a network protocol.
+pub type ProtocolId = [u8; 4];
+
+/// A succinct representation of a peer's view. This consists of a bounded number of chain heads.
+///
+/// Up to `N` chain heads; the network bridge currently enforces `N = 5` via its
+/// `MAX_VIEW_HEADS` constant.
+#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
+pub struct View(pub Vec<Hash>);
diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml
index 74069f0233afa8d73e79e2f3bf3ed8cba2c1cf9b..f1a56acfad9524b74c9206b217f2085d0a3ea378 100644
--- a/polkadot/node/service/Cargo.toml
+++ b/polkadot/node/service/Cargo.toml
@@ -15,6 +15,7 @@ hex-literal = "0.2.1"
 polkadot-primitives = { path = "../../primitives" }
 polkadot-runtime = { path = "../../runtime/polkadot" }
 polkadot-overseer = { path = "../overseer" }
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
 kusama-runtime = { path = "../../runtime/kusama" }
 westend-runtime = { path = "../../runtime/westend" }
 polkadot-network = { path = "../../network", optional = true }
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 2c3d77605e1b7e861fe8d8bcd3869615756ab6b6..8b0e27b8a6b7b3cc3e2d65aedb1aacf28873abd0 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -29,10 +29,10 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
 use sc_executor::native_executor_instance;
 use log::info;
 use sp_blockchain::HeaderBackend;
-use polkadot_overseer::{
-	self as overseer,
-	BlockInfo, Overseer, OverseerHandler, Subsystem, SubsystemContext, SpawnedSubsystem,
-	CandidateValidationMessage, CandidateBackingMessage,
+use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler};
+use polkadot_subsystem::{
+	Subsystem, SubsystemContext, SpawnedSubsystem,
+	messages::{CandidateValidationMessage, CandidateBackingMessage},
 };
 pub use service::{
 	Role, PruningMode, TransactionPoolOptions, Error, RuntimeGenesis,
@@ -269,8 +269,10 @@ macro_rules! new_full_start {
 
 struct CandidateValidationSubsystem;
 
-impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {
-	fn start(&mut self, mut ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
+impl<C> Subsystem<C> for CandidateValidationSubsystem
+	where C: SubsystemContext<Message = CandidateValidationMessage>
+{
+	fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
 		SpawnedSubsystem(Box::pin(async move {
 			while let Ok(_) = ctx.recv().await {}
 		}))
@@ -279,8 +281,10 @@ impl Subsystem<CandidateValidationMessage> for CandidateValidationSubsystem {
 
 struct CandidateBackingSubsystem;
 
-impl Subsystem<CandidateBackingMessage> for CandidateBackingSubsystem {
-	fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
+impl<C> Subsystem<C> for CandidateBackingSubsystem
+	where C: SubsystemContext<Message = CandidateBackingMessage>
+{
+	fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
 		SpawnedSubsystem(Box::pin(async move {
 			while let Ok(_) = ctx.recv().await {}
 		}))
diff --git a/polkadot/node/messages/Cargo.toml b/polkadot/node/subsystem/Cargo.toml
similarity index 77%
rename from polkadot/node/messages/Cargo.toml
rename to polkadot/node/subsystem/Cargo.toml
index 9edb5a0519876b3aecfe907fefe3bcca999e2e3b..43712319cb716c0f48e266eb5904f5d0f2b9daf0 100644
--- a/polkadot/node/messages/Cargo.toml
+++ b/polkadot/node/subsystem/Cargo.toml
@@ -1,9 +1,9 @@
 [package]
-name = "polkadot-node-messages"
+name = "polkadot-node-subsystem"
 version = "0.1.0"
 authors = ["Parity Technologies <admin@parity.io>"]
 edition = "2018"
-description = "Message types used by Subsystems"
+description = "Subsystem traits and message definitions"
 
 [dependencies]
 polkadot-primitives = { path = "../../primitives" }
@@ -11,3 +11,4 @@ polkadot-statement-table = { path = "../../statement-table" }
 polkadot-node-primitives = { path = "../primitives" }
 sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
 futures = "0.3.5"
+async-trait = "0.1"
diff --git a/polkadot/node/subsystem/src/lib.rs b/polkadot/node/subsystem/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..31d094907f517f79b37968768998825d218579cb
--- /dev/null
+++ b/polkadot/node/subsystem/src/lib.rs
@@ -0,0 +1,150 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Subsystem trait definitions and message types.
+//!
+//! Node-side logic for Polkadot is mostly composed of Subsystems, which are discrete components
+//! that communicate via message-passing. They are coordinated by an overseer, provided by a
+//! separate crate.
+
+use std::pin::Pin;
+
+use futures::prelude::*;
+use futures::channel::{mpsc, oneshot};
+use futures::future::BoxFuture;
+
+use polkadot_primitives::Hash;
+use async_trait::async_trait;
+
+use crate::messages::AllMessages;
+
+pub mod messages;
+
+/// Signals sent by an overseer to a subsystem.
+#[derive(PartialEq, Clone, Debug)]
+pub enum OverseerSignal {
+	/// `Subsystem` should start working on block-based work, given by the relay-chain block hash.
+	StartWork(Hash),
+	/// `Subsystem` should stop working on block-based work specified by the relay-chain block hash.
+	StopWork(Hash),
+	/// Conclude the work of the `Overseer` and all `Subsystem`s.
+	Conclude,
+}
+
+/// A message type that a subsystem receives from an overseer.
+/// It wraps signals from an overseer and messages that are circulating
+/// between subsystems.
+///
+/// It is generic over the message type `M` that a particular `Subsystem` may use.
+#[derive(Debug)]
+pub enum FromOverseer<M> {
+	/// Signal from the `Overseer`.
+	Signal(OverseerSignal),
+
+	/// Some other `Subsystem`'s message.
+	Communication {
+		msg: M,
+	},
+}
+
+/// An error type that describes faults that may happen.
+///
+/// These are:
+///   * Channels being closed
+///   * Subsystems dying when they are not expected to
+///   * Subsystems not dying when they are told to die
+///   * etc.
+#[derive(Debug)]
+pub struct SubsystemError;
+
+impl From<mpsc::SendError> for SubsystemError {
+	fn from(_: mpsc::SendError) -> Self {
+		Self
+	}
+}
+
+impl From<oneshot::Canceled> for SubsystemError {
+	fn from(_: oneshot::Canceled) -> Self {
+		Self
+	}
+}
+
+impl From<futures::task::SpawnError> for SubsystemError {
+	fn from(_: futures::task::SpawnError) -> Self {
+		Self
+	}
+}
+
+impl From<std::convert::Infallible> for SubsystemError {
+	fn from(e: std::convert::Infallible) -> Self {
+		match e {}
+	}
+}
+
+/// An asynchronous subsystem task.
+///
+/// In essence it's just a newtype wrapping a `BoxFuture`.
+pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
+
+/// A `Result` type that wraps [`SubsystemError`].
+///
+/// [`SubsystemError`]: struct.SubsystemError.html
+pub type SubsystemResult<T> = Result<T, SubsystemError>;
+
+/// A context type that is given to the [`Subsystem`] upon spawning.
+/// It can be used by [`Subsystem`] to communicate with other [`Subsystem`]s
+/// or spawn jobs.
+///
+/// [`Subsystem`]: trait.Subsystem.html
+#[async_trait]
+pub trait SubsystemContext: Send + 'static {
+	/// The message type of this context. Subsystems launched with this context will expect
+	/// to receive messages of this type.
+	type Message: Send;
+
+	/// Try to asynchronously receive a message.
+	///
+	/// This has to be used with caution: if you loop over this without
+	/// using the `pending!()` macro, you will end up with a busy loop!
+	async fn try_recv(&mut self) -> Result<Option<FromOverseer<Self::Message>>, ()>;
+
+	/// Receive a message.
+	async fn recv(&mut self) -> SubsystemResult<FromOverseer<Self::Message>>;
+
+	/// Spawn a child task on the executor.
+	async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()>;
+
+	/// Send a direct message to some other `Subsystem`, routed based on message type.
+	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()>;
+
+	/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
+	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
+		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
+}
+
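+// A hedged sketch of a subsystem main loop built on `SubsystemContext` (the
+// choice of `CandidateValidationMessage` here is illustrative only):
+//
+// async fn run<C>(mut ctx: C) -> SubsystemResult<()>
+// 	where C: SubsystemContext<Message = CandidateValidationMessage>
+// {
+// 	loop {
+// 		match ctx.recv().await? {
+// 			FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
+// 			FromOverseer::Signal(_) => continue,
+// 			FromOverseer::Communication { msg } => {
+// 				// Handle `msg` here, routing any follow-up work with
+// 				// `ctx.send_message(..)` or `ctx.spawn(..)`.
+// 				drop(msg);
+// 			}
+// 		}
+// 	}
+// }
+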
+/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
+///
+/// It is generic over the message type circulating in the system.
+/// The idea is that we want some type containing persistent state that
+/// can spawn actually-running subsystems when asked.
+///
+/// [`Overseer`]: struct.Overseer.html
+/// [`Subsystem`]: trait.Subsystem.html
+pub trait Subsystem<C: SubsystemContext> {
+	/// Start this `Subsystem` and return `SpawnedSubsystem`.
+	fn start(&mut self, ctx: C) -> SpawnedSubsystem;
+}
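+
+// A hedged sketch of implementing `Subsystem`; `LogSubsystem` is a hypothetical
+// example type:
+//
+// struct LogSubsystem;
+//
+// impl<C: SubsystemContext> Subsystem<C> for LogSubsystem {
+// 	fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
+// 		SpawnedSubsystem(Box::pin(async move {
+// 			// Consume messages until the overseer side is dropped.
+// 			while let Ok(msg) = ctx.recv().await {
+// 				drop(msg);
+// 			}
+// 		}))
+// 	}
+// }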
diff --git a/polkadot/node/messages/src/lib.rs b/polkadot/node/subsystem/src/messages.rs
similarity index 87%
rename from polkadot/node/messages/src/lib.rs
rename to polkadot/node/subsystem/src/messages.rs
index 3a413f2c67bbdcc951ed0f820061c4061cad321e..c22581349078bb6c6426d175838ca25cce51d9c6 100644
--- a/polkadot/node/messages/src/lib.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -24,27 +24,16 @@
 
 use futures::channel::{mpsc, oneshot};
 
-use sc_network::{ObservedRole, ReputationChange, PeerId, config::ProtocolId};
+use sc_network::{ObservedRole, ReputationChange, PeerId};
 use polkadot_primitives::{BlockNumber, Hash, Signature};
 use polkadot_primitives::parachain::{
 	AbridgedCandidateReceipt, PoVBlock, ErasureChunk, BackedCandidate, Id as ParaId,
 	SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex,
 };
 use polkadot_node_primitives::{
-	MisbehaviorReport, SignedFullStatement,
+	MisbehaviorReport, SignedFullStatement, View, ProtocolId,
 };
 
-/// Signals sent by an overseer to a subsystem.
-#[derive(PartialEq, Clone, Debug)]
-pub enum OverseerSignal {
-	/// `Subsystem` should start working on block-based work, given by the relay-chain block hash.
-	StartWork(Hash),
-	/// `Subsystem` should stop working on block-based work specified by the relay-chain block hash.
-	StopWork(Hash),
-	/// Conclude the work of the `Overseer` and all `Subsystem`s.
-	Conclude,
-}
-
 /// A notification of a new backed candidate.
 #[derive(Debug)]
 pub struct NewBackedCandidate(pub BackedCandidate);
@@ -90,12 +79,8 @@ pub enum CandidateValidationMessage {
 	),
 }
 
-/// Chain heads.
-///
-/// Up to `N` (5?) chain heads.
-pub struct View(pub Vec<Hash>);
-
 /// Events from network.
+#[derive(Debug, Clone)]
 pub enum NetworkBridgeEvent {
 	/// A peer has connected.
 	PeerConnected(PeerId, ObservedRole),
@@ -114,7 +99,8 @@ pub enum NetworkBridgeEvent {
 }
 
 /// Messages received by the network bridge subsystem.
-pub enum NetworkBridgeSubsystemMessage {
+#[derive(Debug)]
+pub enum NetworkBridgeMessage {
 	/// Register an event producer on startup.
 	RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages),
 
@@ -126,6 +112,7 @@ pub enum NetworkBridgeSubsystemMessage {
 }
 
 /// Availability Distribution Message.
+#[derive(Debug)]
 pub enum AvailabilityDistributionMessage {
 	/// Distribute an availability chunk to other validators.
 	DistributeChunk(Hash, ErasureChunk),
@@ -138,6 +125,7 @@ pub enum AvailabilityDistributionMessage {
 }
 
 /// Bitfield distribution message.
+#[derive(Debug)]
 pub enum BitfieldDistributionMessage {
 	/// Distribute a bitfield via gossip to other validators.
 	DistributeBitfield(Hash, SignedAvailabilityBitfield),
@@ -147,6 +135,7 @@ pub enum BitfieldDistributionMessage {
 }
 
 /// Availability store subsystem message.
+#[derive(Debug)]
 pub enum AvailabilityStoreMessage {
 	/// Query a `PoVBlock` from the AV store.
 	QueryPoV(Hash, oneshot::Sender<Option<PoVBlock>>),
@@ -159,6 +148,7 @@ pub enum AvailabilityStoreMessage {
 }
 
 /// A request to the Runtime API subsystem.
+#[derive(Debug)]
 pub enum RuntimeApiRequest {
 	/// Get the current validator set.
 	Validators(oneshot::Sender<Vec<ValidatorId>>),
@@ -171,19 +161,24 @@ pub enum RuntimeApiRequest {
 }
 
 /// A message to the Runtime API subsystem.
+#[derive(Debug)]
 pub enum RuntimeApiMessage {
 	/// Make a request of the runtime API against the post-state of the given relay-parent.
 	Request(Hash, RuntimeApiRequest),
 }
 
 /// Statement distribution message.
+#[derive(Debug)]
 pub enum StatementDistributionMessage {
 	/// We have originated a signed statement in the context of
 	/// given relay-parent hash and it should be distributed to other validators.
 	Share(Hash, SignedFullStatement),
+	/// Event from the network bridge.
+	NetworkBridgeUpdate(NetworkBridgeEvent),
 }
 
 /// This data becomes intrinsics or extrinsics which should be included in a future relay chain block.
+#[derive(Debug)]
 pub enum ProvisionableData {
 	/// This bitfield indicates the availability of various candidate blocks.
 	Bitfield(Hash, SignedAvailabilityBitfield),
@@ -198,6 +193,7 @@ pub enum ProvisionableData {
 /// Message to the Provisioner.
 ///
 /// In all cases, the Hash is that of the relay parent.
+#[derive(Debug)]
 pub enum ProvisionerMessage {
 	/// This message allows potential block authors to be kept updated with all new authorship data
 	/// as it becomes available.
@@ -213,20 +209,18 @@ pub enum AllMessages {
 	CandidateValidation(CandidateValidationMessage),
 	/// Message for the candidate backing subsystem.
 	CandidateBacking(CandidateBackingMessage),
-}
-
-/// A message type that a subsystem receives from an overseer.
-/// It wraps signals from an overseer and messages that are circulating
-/// between subsystems.
-///
-/// It is generic over over the message type `M` that a particular `Subsystem` may use.
-#[derive(Debug)]
-pub enum FromOverseer<M: std::fmt::Debug> {
-	/// Signal from the `Overseer`.
-	Signal(OverseerSignal),
-
-	/// Some other `Subsystem`'s message.
-	Communication {
-		msg: M,
-	},
+	/// Message for the candidate selection subsystem.
+	CandidateSelection(CandidateSelectionMessage),
+	/// Message for the statement distribution subsystem.
+	StatementDistribution(StatementDistributionMessage),
+	/// Message for the availability distribution subsystem.
+	AvailabilityDistribution(AvailabilityDistributionMessage),
+	/// Message for the bitfield distribution subsystem.
+	BitfieldDistribution(BitfieldDistributionMessage),
+	/// Message for the Provisioner subsystem.
+	Provisioner(ProvisionerMessage),
+	/// Message for the Runtime API subsystem.
+	RuntimeApi(RuntimeApiMessage),
+	/// Message for the availability store subsystem.
+	AvailabilityStore(AvailabilityStoreMessage),
 }
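+
+// A hedged sketch of routing a message via `AllMessages`; the `share_statement`
+// function is illustrative only:
+//
+// fn share_statement(relay_parent: Hash, statement: SignedFullStatement) -> AllMessages {
+// 	AllMessages::StatementDistribution(
+// 		StatementDistributionMessage::Share(relay_parent, statement)
+// 	)
+// }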
diff --git a/polkadot/node/test-helpers/subsystem/Cargo.toml b/polkadot/node/test-helpers/subsystem/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..0fc26a24ea14a619e5dde9a10189f714522aa682
--- /dev/null
+++ b/polkadot/node/test-helpers/subsystem/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "polkadot-subsystem-test-helpers"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+description = "Helpers for testing subsystems"
+
+[dependencies]
+futures = "0.3.5"
+async-trait = "0.1"
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+parking_lot = "0.10.0"
diff --git a/polkadot/node/test-helpers/subsystem/src/lib.rs b/polkadot/node/test-helpers/subsystem/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..c99a33c78d9bbc89036e66aebaf9035f442a953d
--- /dev/null
+++ b/polkadot/node/test-helpers/subsystem/src/lib.rs
@@ -0,0 +1,229 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Utilities for testing subsystems.
+
+use polkadot_subsystem::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError};
+use polkadot_subsystem::messages::AllMessages;
+
+use futures::prelude::*;
+use futures::channel::mpsc;
+use futures::task::{Spawn, SpawnExt};
+use futures::poll;
+use parking_lot::Mutex;
+
+use std::convert::Infallible;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll, Waker};
+
+enum SinkState<T> {
+	Empty {
+		read_waker: Option<Waker>,
+	},
+	Item {
+		item: T,
+		ready_waker: Option<Waker>,
+		flush_waker: Option<Waker>,
+	},
+}
+
+/// The sink half of a single-item sink; sends do not resolve until the item has been read.
+pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);
+
+/// The stream half of a single-item sink.
+pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);
+
+impl<T> Sink<T> for SingleItemSink<T> {
+	type Error = Infallible;
+
+	fn poll_ready(
+		self: Pin<&mut Self>,
+		cx: &mut Context,
+	) -> Poll<Result<(), Infallible>> {
+		let mut state = self.0.lock();
+		match *state {
+			SinkState::Empty { .. } => Poll::Ready(Ok(())),
+			SinkState::Item { ref mut ready_waker, .. } => {
+				*ready_waker = Some(cx.waker().clone());
+				Poll::Pending
+			}
+		}
+	}
+
+	fn start_send(
+		self: Pin<&mut Self>,
+		item: T,
+	) -> Result<(), Infallible> {
+		let mut state = self.0.lock();
+
+		match *state {
+			SinkState::Empty { ref mut read_waker } => {
+				if let Some(waker) = read_waker.take() {
+					waker.wake();
+				}
+			}
+			_ => panic!("start_send called outside of empty sink state ensured by poll_ready"),
+		}
+
+		*state = SinkState::Item {
+			item,
+			ready_waker: None,
+			flush_waker: None,
+		};
+
+		Ok(())
+	}
+
+	fn poll_flush(
+		self: Pin<&mut Self>,
+		cx: &mut Context,
+	) -> Poll<Result<(), Infallible>> {
+		let mut state = self.0.lock();
+		match *state {
+			SinkState::Empty { .. } => Poll::Ready(Ok(())),
+			SinkState::Item { ref mut flush_waker, .. } => {
+				*flush_waker = Some(cx.waker().clone());
+				Poll::Pending
+			}
+		}
+	}
+
+	fn poll_close(
+		self: Pin<&mut Self>,
+		cx: &mut Context,
+	) -> Poll<Result<(), Infallible>> {
+		self.poll_flush(cx)
+	}
+}
+
+impl<T> Stream for SingleItemStream<T> {
+	type Item = T;
+
+	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+		let mut state = self.0.lock();
+
+		let read_waker = Some(cx.waker().clone());
+
+		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
+			SinkState::Empty { .. } => Poll::Pending,
+			SinkState::Item { item, ready_waker, flush_waker } => {
+				if let Some(waker) = ready_waker {
+					waker.wake();
+				}
+
+				if let Some(waker) = flush_waker {
+					waker.wake();
+				}
+
+				Poll::Ready(Some(item))
+			}
+		}
+	}
+}
+
+/// Create a single-item Sink/Stream pair.
+///
+/// The sink's send methods resolve at the point at which the stream reads the item,
+/// not when the item is buffered.
+pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
+	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
+	(
+		SingleItemSink(inner.clone()),
+		SingleItemStream(inner),
+	)
+}
+
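+// A hedged usage sketch: the send future only completes once the stream side has
+// read the item (`futures::executor::block_on` and `join!` are used for brevity):
+//
+// fn demo() {
+// 	let (mut sink, mut stream) = single_item_sink::<u32>();
+// 	futures::executor::block_on(async move {
+// 		futures::join!(
+// 			async { sink.send(5).await.unwrap() },
+// 			async { assert_eq!(stream.next().await, Some(5)) },
+// 		);
+// 	});
+// }
+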
+/// A test subsystem context.
+pub struct TestSubsystemContext<M, S> {
+	tx: mpsc::UnboundedSender<AllMessages>,
+	rx: SingleItemStream<FromOverseer<M>>,
+	spawn: S,
+}
+
+#[async_trait::async_trait]
+impl<M: Send + 'static, S: Spawn + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> {
+	type Message = M;
+
+	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
+		match poll!(self.rx.next()) {
+			Poll::Ready(Some(msg)) => Ok(Some(msg)),
+			Poll::Ready(None) => Err(()),
+			Poll::Pending => Ok(None),
+		}
+	}
+
+	async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
+		self.rx.next().await.ok_or(SubsystemError)
+	}
+
+	async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
+		self.spawn.spawn(s).map_err(Into::into)
+	}
+
+	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
+		self.tx.send(msg).await.expect("test overseer no longer live");
+		Ok(())
+	}
+
+	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
+		where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
+	{
+		let mut iter = stream::iter(msgs.into_iter().map(Ok));
+		self.tx.send_all(&mut iter).await.expect("test overseer no longer live");
+
+		Ok(())
+	}
+}
+
+/// A handle for interacting with the subsystem context.
+pub struct TestSubsystemContextHandle<M> {
+	tx: SingleItemSink<FromOverseer<M>>,
+	rx: mpsc::UnboundedReceiver<AllMessages>,
+}
+
+impl<M> TestSubsystemContextHandle<M> {
+	/// Send a message or signal to the subsystem. This resolves at the point in time where the
+	/// subsystem has _read_ the message.
+	pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
+		self.tx.send(from_overseer).await.expect("Test subsystem no longer live");
+	}
+
+	/// Receive the next message from the subsystem.
+	pub async fn recv(&mut self) -> AllMessages {
+		self.rx.next().await.expect("Test subsystem no longer live")
+	}
+}
+
+/// Make a test subsystem context.
+pub fn make_subsystem_context<M, S>(spawn: S)
+	-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>)
+{
+	let (overseer_tx, overseer_rx) = single_item_sink();
+	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
+
+	(
+		TestSubsystemContext {
+			tx: all_messages_tx,
+			rx: overseer_rx,
+			spawn,
+		},
+		TestSubsystemContextHandle {
+			tx: overseer_tx,
+			rx: all_messages_rx
+		},
+	)
+}
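+
+// A hedged sketch of driving a subsystem under test. `futures::executor::ThreadPool`
+// (which needs the `thread-pool` feature of `futures`) stands in for any `Spawn`
+// implementation, and the `u8` message type is illustrative:
+//
+// fn test_roundtrip() {
+// 	let pool = futures::executor::ThreadPool::new().unwrap();
+// 	let (mut ctx, mut handle) = make_subsystem_context::<u8, _>(pool);
+//
+// 	futures::executor::block_on(async move {
+// 		futures::join!(
+// 			// The subsystem side receives what the test overseer sends.
+// 			async {
+// 				match ctx.recv().await {
+// 					Ok(FromOverseer::Communication { msg }) => assert_eq!(msg, 1),
+// 					_ => panic!("unexpected message"),
+// 				}
+// 			},
+// 			// Resolves only once the subsystem side has read the message.
+// 			handle.send(FromOverseer::Communication { msg: 1 }),
+// 		);
+// 	});
+// }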