diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index bffc55f98f78b324596b10bdce2bdc3dee525374..9294c90408ea25d2f0edda614c6a78da94f920e6 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -3308,6 +3308,7 @@ dependencies = [
  "futures-timer 3.0.2",
  "kv-log-macro",
  "log 0.4.8",
+ "polkadot-primitives",
  "streamunordered",
 ]
 
diff --git a/polkadot/overseer/Cargo.toml b/polkadot/overseer/Cargo.toml
index bcd0a8e9e5298187a7bd54a652d19184849a25c4..0bef4442be4f70ce497e86cdce32dfcea7b08032 100644
--- a/polkadot/overseer/Cargo.toml
+++ b/polkadot/overseer/Cargo.toml
@@ -9,6 +9,7 @@ futures = "0.3.5"
 log = "0.4.8"
 futures-timer = "3.0.2"
 streamunordered = "0.5.1"
+polkadot-primitives = { path = "../primitives" }
 
 [dev-dependencies]
 futures = { version = "0.3.5", features = ["thread-pool"] }
diff --git a/polkadot/overseer/examples/minimal-example.rs b/polkadot/overseer/examples/minimal-example.rs
index 4dd37dbafea16836106b9b58300609f677ac7269..4f5f3bdaff88b3716aa3d58084457a5ef9f95a61 100644
--- a/polkadot/overseer/examples/minimal-example.rs
+++ b/polkadot/overseer/examples/minimal-example.rs
@@ -111,6 +111,7 @@ fn main() {
 		});
 
 		let (overseer, _handler) = Overseer::new(
+			vec![],
 			Box::new(Subsystem2),
 			Box::new(Subsystem1),
 			spawner,
diff --git a/polkadot/overseer/src/lib.rs b/polkadot/overseer/src/lib.rs
index f7ac6cac5079d6bf128c6e9764560c0756122d25..7f0849fcb5920fda3b06b6683d6111f43eaf67c0 100644
--- a/polkadot/overseer/src/lib.rs
+++ b/polkadot/overseer/src/lib.rs
@@ -58,6 +58,7 @@ use std::fmt::Debug;
 use std::pin::Pin;
 use std::task::Poll;
 use std::time::Duration;
+use std::collections::HashSet;
 
 use futures::channel::{mpsc, oneshot};
 use futures::{
@@ -70,6 +71,8 @@ use futures::{
 use futures_timer::Delay;
 use streamunordered::{StreamYield, StreamUnordered};
 
+use polkadot_primitives::{BlockNumber, Hash};
+
 /// An error type that describes faults that may happen
 ///
 /// These are:
@@ -136,10 +139,25 @@ enum ToOverseer {
 	},
 }
 
+/// An event telling the `Overseer` on the particular block
+/// that has been imported or finalized.
+///
+/// This structure exists solely for the purposes of decoupling
+/// `Overseer` code from the client code and the necessity to call
+/// `HeaderBackend::block_number_from_id()`.
+pub struct BlockInfo {
+	/// hash of the block.
+	pub hash: Hash,
+	/// hash of the parent block.
+	pub parent_hash: Hash,
+	/// block's number.
+	pub number: BlockNumber,
+}
+
 /// Some event from outer world.
 enum Event {
-	BlockImport,
-	BlockFinalized,
+	BlockImported(BlockInfo),
+	BlockFinalized(BlockInfo),
 	MsgToSubsystem(AllMessages),
 	Stop,
 }
@@ -160,8 +178,8 @@ pub struct OverseerHandler {
 
 impl OverseerHandler {
 	/// Inform the `Overseer` that that some block was imported.
-	pub async fn block_imported(&mut self) -> SubsystemResult<()> {
-		self.events_tx.send(Event::BlockImport).await?;
+	pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
+		self.events_tx.send(Event::BlockImported(block)).await?;
 
 		Ok(())
 	}
@@ -174,8 +192,8 @@ impl OverseerHandler {
 	}
 
 	/// Inform the `Overseer` that that some block was finalized.
-	pub async fn block_finalized(&mut self) -> SubsystemResult<()> {
-		self.events_tx.send(Event::BlockFinalized).await?;
+	pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
+		self.events_tx.send(Event::BlockFinalized(block)).await?;
 
 		Ok(())
 	}
@@ -222,12 +240,12 @@ pub struct SubsystemContext<M: Debug>{
 ///
 /// [`Overseer`]: struct.Overseer.html
 /// [`Subsystem`]: trait.Subsystem.html
-#[derive(Debug)]
+#[derive(PartialEq, Clone, Debug)]
 pub enum OverseerSignal {
 	/// `Subsystem` should start working.
-	StartWork,
+	StartWork(Hash),
 	/// `Subsystem` should stop working.
-	StopWork,
+	StopWork(Hash),
 	/// Conclude the work of the `Overseer` and all `Subsystem`s.
 	Conclude,
 }
@@ -366,6 +384,14 @@ pub struct Overseer<S: Spawn> {
 
 	/// Events that are sent to the overseer from the outside world
 	events_rx: mpsc::Receiver<Event>,
+
+	/// A set of leaves that `Overseer` starts working with.
+	///
+	/// Drained at the beginning of `run` and never used again.
+	leaves: Vec<(Hash, BlockNumber)>,
+
+	/// The set of the "active leaves".
+	active_leaves: HashSet<(Hash, BlockNumber)>,
 }
 
 impl<S> Overseer<S>
@@ -452,6 +478,7 @@ where
 	/// # fn main() { executor::block_on(async move {
 	/// let spawner = executor::ThreadPool::new().unwrap();
 	/// let (overseer, _handler) = Overseer::new(
+	///     vec![],
 	///     Box::new(ValidationSubsystem),
 	///     Box::new(CandidateBackingSubsystem),
 	///     spawner,
@@ -471,6 +498,7 @@ where
 	/// # }); }
 	/// ```
 	pub fn new(
+		leaves: impl IntoIterator<Item = BlockInfo>,
 		validation: Box<dyn Subsystem<ValidationSubsystemMessage> + Send>,
 		candidate_backing: Box<dyn Subsystem<CandidateBackingSubsystemMessage> + Send>,
 		mut s: S,
@@ -498,6 +526,13 @@ where
 			candidate_backing,
 		)?;
 
+		let active_leaves = HashSet::new();
+
+		let leaves = leaves
+			.into_iter()
+			.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
+			.collect();
+
 		let this = Self {
 			validation_subsystem,
 			candidate_backing_subsystem,
@@ -505,6 +540,8 @@ where
 			running_subsystems,
 			running_subsystems_rx,
 			events_rx,
+			leaves,
+			active_leaves,
 		};
 
 		Ok((this, handler))
@@ -537,6 +574,13 @@ where
 
 	/// Run the `Overseer`.
 	pub async fn run(mut self) -> SubsystemResult<()> {
+		let leaves = std::mem::take(&mut self.leaves);
+
+		for leaf in leaves.into_iter() {
+			self.broadcast_signal(OverseerSignal::StartWork(leaf.0)).await?;
+			self.active_leaves.insert(leaf);
+		}
+
 		loop {
 			while let Poll::Ready(Some(msg)) = poll!(&mut self.events_rx.next()) {
 				match msg {
@@ -547,7 +591,12 @@ where
 						self.stop().await;
 						return Ok(());
 					}
-					_ => ()
+					Event::BlockImported(block) => {
+						self.block_imported(block).await?;
+					}
+					Event::BlockFinalized(block) => {
+						self.block_finalized(block).await?;
+					}
 				}
 			}
 
@@ -576,6 +625,50 @@ where
 		}
 	}
 
+	async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
+		if let Some(parent) = self.active_leaves.take(&(block.parent_hash, block.number - 1)) {
+			self.broadcast_signal(OverseerSignal::StopWork(parent.0)).await?;
+		}
+
+		if !self.active_leaves.contains(&(block.hash, block.number)) {
+			self.broadcast_signal(OverseerSignal::StartWork(block.hash)).await?;
+			self.active_leaves.insert((block.hash, block.number));
+		}
+
+		Ok(())
+	}
+
+	async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
+		let mut stop_these = Vec::new();
+
+		self.active_leaves.retain(|(h, n)| {
+			if *n <= block.number {
+				stop_these.push(*h);
+				false
+			} else {
+				true
+			}
+		});
+
+		for hash in stop_these.into_iter() {
+			self.broadcast_signal(OverseerSignal::StopWork(hash)).await?
+		}
+
+		Ok(())
+	}
+
+	async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> {
+		if let Some(ref mut s) = self.validation_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal.clone())).await?;
+		}
+
+		if let Some(ref mut s) = self.candidate_backing_subsystem.instance {
+			s.tx.send(FromOverseer::Signal(signal)).await?;
+		}
+
+		Ok(())
+	}
+
 	async fn route_message(&mut self, msg: AllMessages) {
 		match msg {
 			AllMessages::Validation(msg) => {
@@ -591,7 +684,6 @@ where
 		}
 	}
 
-
 	fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
 		self.s.spawn(j).map_err(|_| SubsystemError)
 	}
@@ -642,7 +734,7 @@ mod tests {
 							i += 1;
 							continue;
 						}
-						Ok(FromOverseer::Signal(OverseerSignal::StopWork)) => return,
+						Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
 						Err(_) => return,
 						_ => (),
 					}
@@ -668,7 +760,7 @@ mod tests {
 						continue;
 					}
 					match ctx.try_recv().await {
-						Ok(Some(FromOverseer::Signal(OverseerSignal::StopWork))) => {
+						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => {
 							break;
 						}
 						Ok(Some(_)) => {
@@ -703,6 +795,7 @@ mod tests {
 			let (s2_tx, mut s2_rx) = mpsc::channel(64);
 
 			let (overseer, mut handler) = Overseer::new(
+				vec![],
 				Box::new(TestSubsystem1(s1_tx)),
 				Box::new(TestSubsystem2(s2_tx)),
 				spawner,
@@ -752,6 +845,7 @@ mod tests {
 		executor::block_on(async move {
 			let (s1_tx, _) = mpsc::channel(64);
 			let (overseer, _handle) = Overseer::new(
+				vec![],
 				Box::new(TestSubsystem1(s1_tx)),
 				Box::new(TestSubsystem4),
 				spawner,
@@ -765,4 +859,227 @@ mod tests {
 			}
 		})
 	}
+
+	struct TestSubsystem5(mpsc::Sender<OverseerSignal>);
+
+	impl Subsystem<ValidationSubsystemMessage> for TestSubsystem5 {
+		fn start(&mut self, mut ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
+			let mut sender = self.0.clone();
+
+			SpawnedSubsystem(Box::pin(async move {
+				loop {
+					match ctx.try_recv().await {
+						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
+						Ok(Some(FromOverseer::Signal(s))) => {
+							sender.send(s).await.unwrap();
+							continue;
+						},
+						Ok(Some(_)) => continue,
+						Err(_) => return,
+						_ => (),
+					}
+					pending!();
+				}
+			}))
+		}
+	}
+
+	struct TestSubsystem6(mpsc::Sender<OverseerSignal>);
+
+	impl Subsystem<CandidateBackingSubsystemMessage> for TestSubsystem6 {
+		fn start(&mut self, mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
+			let mut sender = self.0.clone();
+
+			SpawnedSubsystem(Box::pin(async move {
+				loop {
+					match ctx.try_recv().await {
+						Ok(Some(FromOverseer::Signal(OverseerSignal::Conclude))) => break,
+						Ok(Some(FromOverseer::Signal(s))) => {
+							sender.send(s).await.unwrap();
+							continue;
+						},
+						Ok(Some(_)) => continue,
+						Err(_) => return,
+						_ => (),
+					}
+					pending!();
+				}
+			}))
+		}
+	}
+
+	// Tests that starting with a defined set of leaves and receiving
+	// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
+	#[test]
+	fn overseer_start_stop_works() {
+		let spawner = executor::ThreadPool::new().unwrap();
+
+		executor::block_on(async move {
+			let first_block_hash = [1; 32].into();
+			let second_block_hash = [2; 32].into();
+			let third_block_hash = [3; 32].into();
+
+			let first_block = BlockInfo {
+				hash: first_block_hash,
+				parent_hash: [0; 32].into(),
+				number: 1,
+			};
+			let second_block = BlockInfo {
+				hash: second_block_hash,
+				parent_hash: first_block_hash,
+				number: 2,
+			};
+			let third_block = BlockInfo {
+				hash: third_block_hash,
+				parent_hash: second_block_hash,
+				number: 3,
+			};
+
+			let (tx_5, mut rx_5) = mpsc::channel(64);
+			let (tx_6, mut rx_6) = mpsc::channel(64);
+
+			let (overseer, mut handler) = Overseer::new(
+				vec![first_block],
+				Box::new(TestSubsystem5(tx_5)),
+				Box::new(TestSubsystem6(tx_6)),
+				spawner,
+			).unwrap();
+
+			let overseer_fut = overseer.run().fuse();
+			pin_mut!(overseer_fut);
+
+			let mut ss5_results = Vec::new();
+			let mut ss6_results = Vec::new();
+
+			handler.block_imported(second_block).await.unwrap();
+			handler.block_imported(third_block).await.unwrap();
+
+			let expected_heartbeats = vec![
+				OverseerSignal::StartWork(first_block_hash),
+				OverseerSignal::StopWork(first_block_hash),
+				OverseerSignal::StartWork(second_block_hash),
+				OverseerSignal::StopWork(second_block_hash),
+				OverseerSignal::StartWork(third_block_hash),
+			];
+
+			loop {
+				select! {
+					res = overseer_fut => {
+						assert!(res.is_ok());
+						break;
+					},
+					res = rx_5.next() => {
+						if let Some(res) = res {
+							ss5_results.push(res);
+						}
+					}
+					res = rx_6.next() => {
+						if let Some(res) = res {
+							ss6_results.push(res);
+						}
+					}
+					complete => break,
+				}
+
+				if ss5_results.len() == expected_heartbeats.len() &&
+					ss6_results.len() == expected_heartbeats.len() {
+						handler.stop().await.unwrap();
+				}
+			}
+
+			assert_eq!(ss5_results, expected_heartbeats);
+			assert_eq!(ss6_results, expected_heartbeats);
+		});
+	}
+
+	// Tests that starting with a defined set of leaves and receiving
+	// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
+	#[test]
+	fn overseer_finalize_works() {
+		let spawner = executor::ThreadPool::new().unwrap();
+
+		executor::block_on(async move {
+			let first_block_hash = [1; 32].into();
+			let second_block_hash = [2; 32].into();
+			let third_block_hash = [3; 32].into();
+
+			let first_block = BlockInfo {
+				hash: first_block_hash,
+				parent_hash: [0; 32].into(),
+				number: 1,
+			};
+			let second_block = BlockInfo {
+				hash: second_block_hash,
+				parent_hash: [42; 32].into(),
+				number: 2,
+			};
+			let third_block = BlockInfo {
+				hash: third_block_hash,
+				parent_hash: second_block_hash,
+				number: 3,
+			};
+
+			let (tx_5, mut rx_5) = mpsc::channel(64);
+			let (tx_6, mut rx_6) = mpsc::channel(64);
+
+			// start with two forks of different height.
+			let (overseer, mut handler) = Overseer::new(
+				vec![first_block, second_block],
+				Box::new(TestSubsystem5(tx_5)),
+				Box::new(TestSubsystem6(tx_6)),
+				spawner,
+			).unwrap();
+
+			let overseer_fut = overseer.run().fuse();
+			pin_mut!(overseer_fut);
+
+			let mut ss5_results = Vec::new();
+			let mut ss6_results = Vec::new();
+
+			// this should stop work on both forks we started with earlier.
+			handler.block_finalized(third_block).await.unwrap();
+
+			let expected_heartbeats = vec![
+				OverseerSignal::StartWork(first_block_hash),
+				OverseerSignal::StartWork(second_block_hash),
+				OverseerSignal::StopWork(first_block_hash),
+				OverseerSignal::StopWork(second_block_hash),
+			];
+
+			loop {
+				select! {
+					res = overseer_fut => {
+						assert!(res.is_ok());
+						break;
+					},
+					res = rx_5.next() => {
+						if let Some(res) = res {
+							ss5_results.push(res);
+						}
+					}
+					res = rx_6.next() => {
+						if let Some(res) = res {
+							ss6_results.push(res);
+						}
+					}
+					complete => break,
+				}
+
+				if ss5_results.len() == expected_heartbeats.len() &&
+					ss6_results.len() == expected_heartbeats.len() {
+						handler.stop().await.unwrap();
+				}
+			}
+
+			assert_eq!(ss5_results.len(), expected_heartbeats.len());
+			assert_eq!(ss6_results.len(), expected_heartbeats.len());
+
+			// Notifications on finality for multiple blocks at once
+			// may be received in different orders.
+			for expected in expected_heartbeats {
+				assert!(ss5_results.contains(&expected));
+				assert!(ss6_results.contains(&expected));
+			}
+		});
+	}
 }