diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index 1dbd5404911348b9cb6eca37a8f33b94713206bf..e5192d2c8fe17441ff7f28c48864c6fd722f3966 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -323,6 +323,27 @@ dependencies = [
  "trust-dns-resolver",
 ]
 
+[[package]]
+name = "async-stream"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625"
+dependencies = [
+ "async-stream-impl",
+ "futures-core",
+]
+
+[[package]]
+name = "async-stream-impl"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "async-task"
 version = "4.0.3"
@@ -508,6 +529,7 @@ version = "4.0.0-dev"
 dependencies = [
  "beefy-gadget",
  "beefy-primitives",
+ "derive_more",
  "futures 0.3.16",
  "jsonrpc-core",
  "jsonrpc-core-client",
@@ -515,12 +537,15 @@ dependencies = [
  "jsonrpc-pubsub",
  "log 0.4.14",
  "parity-scale-codec",
+ "parking_lot 0.11.2",
  "sc-rpc",
+ "sc-utils",
  "serde",
  "serde_json",
  "sp-core",
  "sp-runtime",
  "substrate-test-runtime-client",
+ "thiserror",
 ]
 
 [[package]]
@@ -8743,7 +8768,9 @@ dependencies = [
  "futures 0.3.16",
  "futures-timer",
  "lazy_static",
+ "parking_lot 0.11.2",
  "prometheus",
+ "tokio-test",
 ]
 
 [[package]]
@@ -10745,6 +10772,19 @@ dependencies = [
  "tokio-reactor",
 ]
 
+[[package]]
+name = "tokio-test"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
+dependencies = [
+ "async-stream",
+ "bytes 1.1.0",
+ "futures-core",
+ "tokio",
+ "tokio-stream",
+]
+
 [[package]]
 name = "tokio-tls"
 version = "0.2.1"
diff --git a/substrate/client/beefy/rpc/Cargo.toml b/substrate/client/beefy/rpc/Cargo.toml
index bc81732dcd41d863cfc48314f1363cbe37970633..0f29804d6780e2662b3cf83975f90330aeb5283f 100644
--- a/substrate/client/beefy/rpc/Cargo.toml
+++ b/substrate/client/beefy/rpc/Cargo.toml
@@ -8,8 +8,11 @@ repository = "https://github.com/paritytech/substrate"
 description = "RPC for the BEEFY Client gadget for substrate"
 
 [dependencies]
+derive_more = "0.99"
 futures = "0.3.16"
 log = "0.4"
+parking_lot = "0.11"
+thiserror = "1.0"
 serde = { version = "1.0.132", features = ["derive"] }
 
 jsonrpc-core = "18.0.0"
@@ -20,6 +23,7 @@ jsonrpc-pubsub = "18.0.0"
 codec = { version = "2.2.0", package = "parity-scale-codec", features = ["derive"] }
 
 sc-rpc = { version = "4.0.0-dev", path = "../../rpc" }
+sc-utils = { version = "4.0.0-dev", path = "../../utils" }
 
 sp-core = { version = "4.1.0-dev", path = "../../../primitives/core" }
 sp-runtime = { version = "4.0.0", path = "../../../primitives/runtime" }
diff --git a/substrate/client/beefy/rpc/src/lib.rs b/substrate/client/beefy/rpc/src/lib.rs
index be1c9b8691a2a3eb7964a767747d4c743043d47f..dc9ee8b94701b72729dc4daaa101433a49a40df1 100644
--- a/substrate/client/beefy/rpc/src/lib.rs
+++ b/substrate/client/beefy/rpc/src/lib.rs
@@ -20,19 +20,62 @@
 
 #![warn(missing_docs)]
 
+use parking_lot::RwLock;
 use std::sync::Arc;
 
 use sp_runtime::traits::Block as BlockT;
 
-use futures::{FutureExt, SinkExt, StreamExt};
+use futures::{task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt};
 use jsonrpc_derive::rpc;
 use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId};
 use log::warn;
 
-use beefy_gadget::notification::BeefySignedCommitmentStream;
+use beefy_gadget::notification::{BeefyBestBlockStream, BeefySignedCommitmentStream};
 
 mod notification;
 
+type FutureResult<T> = jsonrpc_core::BoxFuture<Result<T, jsonrpc_core::Error>>;
+
+#[derive(Debug, derive_more::Display, derive_more::From, thiserror::Error)]
+/// Top-level error type for the RPC handler
+pub enum Error {
+	/// The BEEFY RPC endpoint is not ready.
+	#[display(fmt = "BEEFY RPC endpoint not ready")]
+	EndpointNotReady,
+	/// The BEEFY RPC background task failed to spawn.
+	#[display(fmt = "BEEFY RPC background task failed to spawn")]
+	RpcTaskFailure(SpawnError),
+}
+
+/// The error codes returned by jsonrpc.
+pub enum ErrorCode {
+	/// Returned when BEEFY RPC endpoint is not ready.
+	NotReady = 1,
+	/// Returned on BEEFY RPC background task failure.
+	TaskFailure = 2,
+}
+
+impl From<Error> for ErrorCode {
+	fn from(error: Error) -> Self {
+		match error {
+			Error::EndpointNotReady => ErrorCode::NotReady,
+			Error::RpcTaskFailure(_) => ErrorCode::TaskFailure,
+		}
+	}
+}
+
+impl From<Error> for jsonrpc_core::Error {
+	fn from(error: Error) -> Self {
+		let message = format!("{}", error);
+		let code = ErrorCode::from(error);
+		jsonrpc_core::Error {
+			message,
+			code: jsonrpc_core::ErrorCode::ServerError(code as i64),
+			data: None,
+		}
+	}
+}
+
 /// Provides RPC methods for interacting with BEEFY.
 #[rpc]
 pub trait BeefyApi<Notification, Hash> {
@@ -62,26 +105,57 @@ pub trait BeefyApi<Notification, Hash> {
 		metadata: Option<Self::Metadata>,
 		id: SubscriptionId,
 	) -> jsonrpc_core::Result<bool>;
+
+	/// Returns hash of the latest BEEFY finalized block as seen by this client.
+	///
+	/// The latest BEEFY block might not be available if the BEEFY gadget is not running
+	/// in the network or if the client is still initializing or syncing with the network.
+	/// In such case an error would be returned.
+	#[rpc(name = "beefy_getFinalizedHead")]
+	fn latest_finalized(&self) -> FutureResult<Hash>;
 }
 
 /// Implements the BeefyApi RPC trait for interacting with BEEFY.
 pub struct BeefyRpcHandler<Block: BlockT> {
 	signed_commitment_stream: BeefySignedCommitmentStream<Block>,
+	beefy_best_block: Arc<RwLock<Option<Block::Hash>>>,
 	manager: SubscriptionManager,
 }
 
 impl<Block: BlockT> BeefyRpcHandler<Block> {
 	/// Creates a new BeefyRpcHandler instance.
-	pub fn new<E>(signed_commitment_stream: BeefySignedCommitmentStream<Block>, executor: E) -> Self
+	pub fn new<E>(
+		signed_commitment_stream: BeefySignedCommitmentStream<Block>,
+		best_block_stream: BeefyBestBlockStream<Block>,
+		executor: E,
+	) -> Result<Self, Error>
 	where
 		E: futures::task::Spawn + Send + Sync + 'static,
 	{
+		let beefy_best_block = Arc::new(RwLock::new(None));
+
+		let stream = best_block_stream.subscribe();
+		let closure_clone = beefy_best_block.clone();
+		let future = stream.for_each(move |best_beefy| {
+			let async_clone = closure_clone.clone();
+			async move {
+				*async_clone.write() = Some(best_beefy);
+			}
+		});
+
+		executor
+			.spawn_obj(futures::task::FutureObj::new(Box::pin(future)))
+			.map_err(|e| {
+				log::error!("Failed to spawn BEEFY RPC background task; err: {}", e);
+				e
+			})?;
+
 		let manager = SubscriptionManager::new(Arc::new(executor));
-		Self { signed_commitment_stream, manager }
+		Ok(Self { signed_commitment_stream, beefy_best_block, manager })
 	}
 }
 
-impl<Block> BeefyApi<notification::SignedCommitment, Block> for BeefyRpcHandler<Block>
+impl<Block> BeefyApi<notification::EncodedSignedCommitment, Block::Hash> for BeefyRpcHandler<Block>
 where
 	Block: BlockT,
 {
@@ -90,12 +164,12 @@ where
 	fn subscribe_justifications(
 		&self,
 		_metadata: Self::Metadata,
-		subscriber: Subscriber<notification::SignedCommitment>,
+		subscriber: Subscriber<notification::EncodedSignedCommitment>,
 	) {
 		let stream = self
 			.signed_commitment_stream
 			.subscribe()
-			.map(|x| Ok::<_, ()>(Ok(notification::SignedCommitment::new::<Block>(x))));
+			.map(|x| Ok::<_, ()>(Ok(notification::EncodedSignedCommitment::new::<Block>(x))));
 
 		self.manager.add(subscriber, |sink| {
 			stream
@@ -111,6 +185,17 @@ where
 	) -> jsonrpc_core::Result<bool> {
 		Ok(self.manager.cancel(id))
 	}
+
+	fn latest_finalized(&self) -> FutureResult<Block::Hash> {
+		let result: Result<Block::Hash, jsonrpc_core::Error> = self
+			.beefy_best_block
+			.read()
+			.as_ref()
+			.cloned()
+			.ok_or(Error::EndpointNotReady.into());
+		let future = async move { result }.boxed();
+		future.map_err(jsonrpc_core::Error::from).boxed()
+	}
 }
 
 #[cfg(test)]
@@ -118,16 +203,30 @@ mod tests {
 	use super::*;
 	use jsonrpc_core::{types::Params, Notification, Output};
 
-	use beefy_gadget::notification::{BeefySignedCommitmentSender, SignedCommitment};
+	use beefy_gadget::notification::{BeefySignedCommitment, BeefySignedCommitmentSender};
 	use beefy_primitives::{known_payload_ids, Payload};
 	use codec::{Decode, Encode};
+	use sp_runtime::traits::{BlakeTwo256, Hash};
 	use substrate_test_runtime_client::runtime::Block;
 
 	fn setup_io_handler(
 	) -> (jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>, BeefySignedCommitmentSender<Block>) {
-		let (commitment_sender, commitment_stream) = BeefySignedCommitmentStream::channel();
+		let (_, stream) = BeefyBestBlockStream::<Block>::channel();
+		setup_io_handler_with_best_block_stream(stream)
+	}
+
+	fn setup_io_handler_with_best_block_stream(
+		best_block_stream: BeefyBestBlockStream<Block>,
+	) -> (jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>, BeefySignedCommitmentSender<Block>) {
+		let (commitment_sender, commitment_stream) =
+			BeefySignedCommitmentStream::<Block>::channel();
 
-		let handler = BeefyRpcHandler::new(commitment_stream, sc_rpc::testing::TaskExecutor);
+		let handler: BeefyRpcHandler<Block> = BeefyRpcHandler::new(
+			commitment_stream,
+			best_block_stream,
+			sc_rpc::testing::TaskExecutor,
+		)
+		.unwrap();
 
 		let mut io = jsonrpc_core::MetaIoHandler::default();
 		io.extend_with(BeefyApi::to_delegate(handler));
@@ -141,6 +240,56 @@ mod tests {
 		(meta, rx)
 	}
 
+	#[test]
+	fn uninitialized_rpc_handler() {
+		let (io, _) = setup_io_handler();
+
+		let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
+		let response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#;
+
+		let meta = sc_rpc::Metadata::default();
+		assert_eq!(Some(response.into()), io.handle_request_sync(request, meta));
+	}
+
+	#[test]
+	fn latest_finalized_rpc() {
+		let (sender, stream) = BeefyBestBlockStream::<Block>::channel();
+		let (io, _) = setup_io_handler_with_best_block_stream(stream);
+
+		let hash = BlakeTwo256::hash(b"42");
+		let r: Result<(), ()> = sender.notify(|| Ok(hash));
+		r.unwrap();
+
+		// Verify RPC `beefy_getFinalizedHead` returns expected hash.
+		let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#;
+		let expected = "{\
+			\"jsonrpc\":\"2.0\",\
+			\"result\":\"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf\",\
+			\"id\":1\
+		}";
+		let not_ready = "{\
+			\"jsonrpc\":\"2.0\",\
+			\"error\":{\"code\":1,\"message\":\"BEEFY RPC endpoint not ready\"},\
+			\"id\":1\
+		}";
+
+		let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
+		while std::time::Instant::now() < deadline {
+			let meta = sc_rpc::Metadata::default();
+			let response = io.handle_request_sync(request, meta);
+			// Retry "not ready" responses.
+			if response != Some(not_ready.into()) {
+				assert_eq!(response, Some(expected.into()));
+				// Success
+				return
+			}
+			std::thread::sleep(std::time::Duration::from_millis(50));
+		}
+		panic!(
+			"Deadline reached while waiting for best BEEFY block to update. Perhaps the background task is broken?"
+		);
+	}
+
 	#[test]
 	fn subscribe_and_unsubscribe_to_justifications() {
 		let (io, _) = setup_io_handler();
@@ -159,7 +308,7 @@ mod tests {
 
 		// Unsubscribe
 		let unsub_req = format!(
-			"{{\"jsonrpc\":\"2.0\",\"method\":\"beefy_unsubscribeJustifications\",\"params\":[{}],\"id\":1}}",
+			r#"{{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":[{}],"id":1}}"#,
 			sub_id
 		);
 		assert_eq!(
@@ -170,7 +319,7 @@ mod tests {
 		// Unsubscribe again and fail
 		assert_eq!(
 			io.handle_request_sync(&unsub_req, meta),
-			Some("{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32602,\"message\":\"Invalid subscription id.\"},\"id\":1}".into()),
+			Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()),
 		);
 	}
 
@@ -192,13 +341,13 @@ mod tests {
 				r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#,
 				meta.clone()
 			),
-			Some("{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32602,\"message\":\"Invalid subscription id.\"},\"id\":1}".into())
+			Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into())
 		);
 	}
 
-	fn create_commitment() -> SignedCommitment<Block> {
+	fn create_commitment() -> BeefySignedCommitment<Block> {
 		let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, "Hello World!".encode());
-		SignedCommitment::<Block> {
+		BeefySignedCommitment::<Block> {
 			commitment: beefy_primitives::Commitment {
 				payload,
 				block_number: 5,
@@ -223,7 +372,8 @@ mod tests {
 
 		// Notify with commitment
 		let commitment = create_commitment();
-		commitment_sender.notify(commitment.clone());
+		let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone()));
+		r.unwrap();
 
 		// Inspect what we received
 		let recv = futures::executor::block_on(receiver.take(1).collect::<Vec<_>>());
@@ -236,7 +386,7 @@ mod tests {
 		let recv_sub_id: String = serde_json::from_value(json_map["subscription"].take()).unwrap();
 		let recv_commitment: sp_core::Bytes =
 			serde_json::from_value(json_map["result"].take()).unwrap();
-		let recv_commitment: SignedCommitment<Block> =
+		let recv_commitment: BeefySignedCommitment<Block> =
 			Decode::decode(&mut &recv_commitment[..]).unwrap();
 
 		assert_eq!(recv.method, "beefy_justifications");
diff --git a/substrate/client/beefy/rpc/src/notification.rs b/substrate/client/beefy/rpc/src/notification.rs
index 53c0bb618c5d5a1ea817213d282c35d79ac1b922..2f58c7c6bb5dc8a1e146f4d75f04ac0b9b57d35c 100644
--- a/substrate/client/beefy/rpc/src/notification.rs
+++ b/substrate/client/beefy/rpc/src/notification.rs
@@ -25,15 +25,15 @@ use sp_runtime::traits::Block as BlockT;
 /// The given bytes should be the SCALE-encoded representation of a
 /// `beefy_primitives::SignedCommitment`.
 #[derive(Clone, Serialize, Deserialize)]
-pub struct SignedCommitment(sp_core::Bytes);
+pub struct EncodedSignedCommitment(sp_core::Bytes);
 
-impl SignedCommitment {
+impl EncodedSignedCommitment {
 	pub fn new<Block>(
-		signed_commitment: beefy_gadget::notification::SignedCommitment<Block>,
+		signed_commitment: beefy_gadget::notification::BeefySignedCommitment<Block>,
 	) -> Self
 	where
 		Block: BlockT,
 	{
-		SignedCommitment(signed_commitment.encode().into())
+		EncodedSignedCommitment(signed_commitment.encode().into())
 	}
 }
diff --git a/substrate/client/beefy/src/lib.rs b/substrate/client/beefy/src/lib.rs
index 7d2c3b57b1f70ec41e180d84d70a2b6a39c3fccf..9b2bf383df8ef7a2b6696d1557fa0fb8b7b0b9ca 100644
--- a/substrate/client/beefy/src/lib.rs
+++ b/substrate/client/beefy/src/lib.rs
@@ -31,6 +31,8 @@ use sp_runtime::traits::Block;
 
 use beefy_primitives::BeefyApi;
 
+use crate::notification::{BeefyBestBlockSender, BeefySignedCommitmentSender};
+
 mod error;
 mod gossip;
 mod keystore;
@@ -121,7 +123,9 @@ where
 	/// Gossip network
 	pub network: N,
 	/// BEEFY signed commitment sender
-	pub signed_commitment_sender: notification::BeefySignedCommitmentSender<B>,
+	pub signed_commitment_sender: BeefySignedCommitmentSender<B>,
+	/// BEEFY best block sender
+	pub beefy_best_block_sender: BeefyBestBlockSender<B>,
 	/// Minimal delta between blocks, BEEFY should vote for
 	pub min_block_delta: u32,
 	/// Prometheus metric registry
@@ -147,6 +151,7 @@ where
 		key_store,
 		network,
 		signed_commitment_sender,
+		beefy_best_block_sender,
 		min_block_delta,
 		prometheus_registry,
 		protocol_name,
@@ -174,6 +179,7 @@ where
 		backend,
 		key_store: key_store.into(),
 		signed_commitment_sender,
+		beefy_best_block_sender,
 		gossip_engine,
 		gossip_validator,
 		min_block_delta,
diff --git a/substrate/client/beefy/src/notification.rs b/substrate/client/beefy/src/notification.rs
index cd410ec60bb3c453f41fc0bd90acf62bb4dda5b3..7c18d809f6efb1c3230509b2433d41a7fb0fd936 100644
--- a/substrate/client/beefy/src/notification.rs
+++ b/substrate/client/beefy/src/notification.rs
@@ -16,98 +16,41 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-use std::sync::Arc;
+use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr};
+use sp_runtime::traits::{Block as BlockT, NumberFor};
 
-use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
-use sp_runtime::traits::{Block, NumberFor};
-
-use parking_lot::Mutex;
-
-/// Stream of signed commitments returned when subscribing.
-pub type SignedCommitment<Block> =
+/// A commitment with matching BEEFY authorities' signatures.
+pub type BeefySignedCommitment<Block> =
 	beefy_primitives::SignedCommitment<NumberFor<Block>, beefy_primitives::crypto::Signature>;
 
-/// Stream of signed commitments returned when subscribing.
-type SignedCommitmentStream<Block> = TracingUnboundedReceiver<SignedCommitment<Block>>;
-
-/// Sending endpoint for notifying about signed commitments.
-type SignedCommitmentSender<Block> = TracingUnboundedSender<SignedCommitment<Block>>;
-
-/// Collection of channel sending endpoints shared with the receiver side so they can register
-/// themselves.
-type SharedSignedCommitmentSenders<Block> = Arc<Mutex<Vec<SignedCommitmentSender<Block>>>>;
-
-/// The sending half of the signed commitment channel(s).
-///
-/// Used to send notifications about signed commitments generated at the end of a BEEFY round.
-#[derive(Clone)]
-pub struct BeefySignedCommitmentSender<B>
-where
-	B: Block,
-{
-	subscribers: SharedSignedCommitmentSenders<B>,
-}
-
-impl<B> BeefySignedCommitmentSender<B>
-where
-	B: Block,
-{
-	/// The `subscribers` should be shared with a corresponding `SignedCommitmentSender`.
-	fn new(subscribers: SharedSignedCommitmentSenders<B>) -> Self {
-		Self { subscribers }
-	}
+/// The sending half of the notifications channel(s) used to send
+/// notifications about best BEEFY block from the gadget side.
+pub type BeefyBestBlockSender<Block> = NotificationSender<<Block as BlockT>::Hash>;
 
-	/// Send out a notification to all subscribers that a new signed commitment is available for a
-	/// block.
-	pub fn notify(&self, signed_commitment: SignedCommitment<B>) {
-		let mut subscribers = self.subscribers.lock();
+/// The receiving half of a notifications channel used to receive
+/// notifications about best BEEFY blocks determined on the gadget side.
+pub type BeefyBestBlockStream<Block> =
+	NotificationStream<<Block as BlockT>::Hash, BeefyBestBlockTracingKey>;
 
-		// do an initial prune on closed subscriptions
-		subscribers.retain(|n| !n.is_closed());
+/// The sending half of the notifications channel(s) used to send notifications
+/// about signed commitments generated at the end of a BEEFY round.
+pub type BeefySignedCommitmentSender<Block> = NotificationSender<BeefySignedCommitment<Block>>;
 
-		if !subscribers.is_empty() {
-			subscribers.retain(|n| n.unbounded_send(signed_commitment.clone()).is_ok());
-		}
-	}
-}
+/// The receiving half of a notifications channel used to receive notifications
+/// about signed commitments generated at the end of a BEEFY round.
+pub type BeefySignedCommitmentStream<Block> =
+	NotificationStream<BeefySignedCommitment<Block>, BeefySignedCommitmentTracingKey>;
 
-/// The receiving half of the signed commitments channel.
-///
-/// Used to receive notifications about signed commitments generated at the end of a BEEFY round.
-/// The `BeefySignedCommitmentStream` entity stores the `SharedSignedCommitmentSenders` so it can be
-/// used to add more subscriptions.
+/// Provides tracing key for BEEFY best block stream.
 #[derive(Clone)]
-pub struct BeefySignedCommitmentStream<B>
-where
-	B: Block,
-{
-	subscribers: SharedSignedCommitmentSenders<B>,
+pub struct BeefyBestBlockTracingKey;
+impl TracingKeyStr for BeefyBestBlockTracingKey {
+	const TRACING_KEY: &'static str = "mpsc_beefy_best_block_notification_stream";
 }
 
-impl<B> BeefySignedCommitmentStream<B>
-where
-	B: Block,
-{
-	/// Creates a new pair of receiver and sender of signed commitment notifications.
-	pub fn channel() -> (BeefySignedCommitmentSender<B>, Self) {
-		let subscribers = Arc::new(Mutex::new(vec![]));
-		let receiver = BeefySignedCommitmentStream::new(subscribers.clone());
-		let sender = BeefySignedCommitmentSender::new(subscribers);
-		(sender, receiver)
-	}
-
-	/// Create a new receiver of signed commitment notifications.
-	///
-	/// The `subscribers` should be shared with a corresponding `BeefySignedCommitmentSender`.
-	fn new(subscribers: SharedSignedCommitmentSenders<B>) -> Self {
-		Self { subscribers }
-	}
-
-	/// Subscribe to a channel through which signed commitments are sent at the end of each BEEFY
-	/// voting round.
-	pub fn subscribe(&self) -> SignedCommitmentStream<B> {
-		let (sender, receiver) = tracing_unbounded("mpsc_signed_commitments_notification_stream");
-		self.subscribers.lock().push(sender);
-		receiver
-	}
+/// Provides tracing key for BEEFY signed commitments stream.
+#[derive(Clone)]
+pub struct BeefySignedCommitmentTracingKey;
+impl TracingKeyStr for BeefySignedCommitmentTracingKey {
+	const TRACING_KEY: &'static str = "mpsc_beefy_signed_commitments_notification_stream";
 }
diff --git a/substrate/client/beefy/src/worker.rs b/substrate/client/beefy/src/worker.rs
index d3aa988b8ee27e366af3dac68e99c75a4165687a..0c7d8d4ffdc9c873b666e683b9b190b5f38faeb8 100644
--- a/substrate/client/beefy/src/worker.rs
+++ b/substrate/client/beefy/src/worker.rs
@@ -46,7 +46,8 @@ use crate::{
 	keystore::BeefyKeystore,
 	metric_inc, metric_set,
 	metrics::Metrics,
-	notification, round, Client,
+	notification::{BeefyBestBlockSender, BeefySignedCommitmentSender},
+	round, Client,
 };
 
 pub(crate) struct WorkerParams<B, BE, C>
@@ -56,7 +57,8 @@ where
 	pub client: Arc<C>,
 	pub backend: Arc<BE>,
 	pub key_store: BeefyKeystore,
-	pub signed_commitment_sender: notification::BeefySignedCommitmentSender<B>,
+	pub signed_commitment_sender: BeefySignedCommitmentSender<B>,
+	pub beefy_best_block_sender: BeefyBestBlockSender<B>,
 	pub gossip_engine: GossipEngine<B>,
 	pub gossip_validator: Arc<GossipValidator<B>>,
 	pub min_block_delta: u32,
@@ -73,7 +75,7 @@ where
 	client: Arc<C>,
 	backend: Arc<BE>,
 	key_store: BeefyKeystore,
-	signed_commitment_sender: notification::BeefySignedCommitmentSender<B>,
+	signed_commitment_sender: BeefySignedCommitmentSender<B>,
 	gossip_engine: Arc<Mutex<GossipEngine<B>>>,
 	gossip_validator: Arc<GossipValidator<B>>,
 	/// Min delta in block numbers between two blocks, BEEFY should vote on
@@ -85,6 +87,8 @@ where
 	best_grandpa_block: NumberFor<B>,
 	/// Best block a BEEFY voting round has been concluded for
 	best_beefy_block: Option<NumberFor<B>>,
+	/// Used to keep RPC worker up to date on latest/best beefy
+	beefy_best_block_sender: BeefyBestBlockSender<B>,
 	/// Validator set id for the last signed commitment
 	last_signed_id: u64,
 	// keep rustc happy
@@ -110,6 +114,7 @@ where
 			backend,
 			key_store,
 			signed_commitment_sender,
+			beefy_best_block_sender,
 			gossip_engine,
 			gossip_validator,
 			min_block_delta,
@@ -130,6 +135,7 @@ where
 			best_grandpa_block: client.info().finalized_number,
 			best_beefy_block: None,
 			last_signed_id: 0,
+			beefy_best_block_sender,
 			_backend: PhantomData,
 		}
 	}
@@ -242,6 +248,9 @@ where
 				debug!(target: "beefy", "🥩 New Rounds for id: {:?}", id);
 
 				self.best_beefy_block = Some(*notification.header.number());
+				self.beefy_best_block_sender
+					.notify(|| Ok::<_, ()>(notification.hash.clone()))
+					.expect("forwards closure result; the closure always returns Ok; qed.");
 
 				// this metric is kind of 'fake'. Best BEEFY block should only be updated once we
 				// have a signed commitment for the block. Remove once the above TODO is done.
@@ -329,22 +338,23 @@ where
 				// id is stored for skipped session metric calculation
 				self.last_signed_id = rounds.validator_set_id();
 
+				let block_num = round.1;
 				let commitment = Commitment {
 					payload: round.0,
-					block_number: round.1,
+					block_number: block_num,
 					validator_set_id: self.last_signed_id,
 				};
 
 				let signed_commitment = SignedCommitment { commitment, signatures };
 
-				metric_set!(self, beefy_round_concluded, round.1);
+				metric_set!(self, beefy_round_concluded, block_num);
 
 				info!(target: "beefy", "🥩 Round #{} concluded, committed: {:?}.", round.1, signed_commitment);
 
 				if self
 					.backend
 					.append_justification(
-						BlockId::Number(round.1),
+						BlockId::Number(block_num),
 						(
 							BEEFY_ENGINE_ID,
 							VersionedFinalityProof::V1(signed_commitment.clone()).encode(),
@@ -356,11 +366,23 @@ where
 					// conclude certain rounds multiple times.
 					trace!(target: "beefy", "🥩 Failed to append justification: {:?}", signed_commitment);
 				}
+				self.signed_commitment_sender
+					.notify(|| Ok::<_, ()>(signed_commitment))
+					.expect("forwards closure result; the closure always returns Ok; qed.");
+
+				self.best_beefy_block = Some(block_num);
+				if let Err(err) = self.client.hash(block_num).map(|h| {
+					if let Some(hash) = h {
+						self.beefy_best_block_sender
+							.notify(|| Ok::<_, ()>(hash))
+							.expect("forwards closure result; the closure always returns Ok; qed.");
+					}
+				}) {
+					error!(target: "beefy", "🥩 Failed to get hash for block number {}; err: {:?}",
+						block_num, err);
+				}
 
-				self.signed_commitment_sender.notify(signed_commitment);
-				self.best_beefy_block = Some(round.1);
-
-				metric_set!(self, beefy_best_block, round.1);
+				metric_set!(self, beefy_best_block, block_num);
 			}
 		}
 	}
diff --git a/substrate/client/finality-grandpa/rpc/src/lib.rs b/substrate/client/finality-grandpa/rpc/src/lib.rs
index e509d435af4ad846f03b5e39afa9cdf4eefe150d..bde2e5612b2e6da3c96b49b5cc4a9065442f86b0 100644
--- a/substrate/client/finality-grandpa/rpc/src/lib.rs
+++ b/substrate/client/finality-grandpa/rpc/src/lib.rs
@@ -469,7 +469,7 @@ mod tests {
 
 		// Notify with a header and justification
 		let justification = create_justification();
-		justification_sender.notify(|| Ok(justification.clone())).unwrap();
+		justification_sender.notify(|| Ok::<_, ()>(justification.clone())).unwrap();
 
 		// Inspect what we received
 		let recv = futures::executor::block_on(receiver.take(1).collect::<Vec<_>>());
diff --git a/substrate/client/finality-grandpa/src/notification.rs b/substrate/client/finality-grandpa/src/notification.rs
index 0d154fb3357e48369dabd90f297e251cc66bc085..1d6e25e55dc65bf98fc75482fa487945b27d61af 100644
--- a/substrate/client/finality-grandpa/src/notification.rs
+++ b/substrate/client/finality-grandpa/src/notification.rs
@@ -16,61 +16,15 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
-use parking_lot::Mutex;
-use std::sync::Arc;
+use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr};
 
-use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
-use sp_runtime::traits::Block as BlockT;
-
-use crate::{justification::GrandpaJustification, Error};
-
-// Stream of justifications returned when subscribing.
-type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>;
-
-// Sending endpoint for notifying about justifications.
-type JustificationSender<Block> = TracingUnboundedSender<GrandpaJustification<Block>>;
-
-// Collection of channel sending endpoints shared with the receiver side so they can register
-// themselves.
-type SharedJustificationSenders<Block> = Arc<Mutex<Vec<JustificationSender<Block>>>>;
+use crate::justification::GrandpaJustification;
 
 /// The sending half of the Grandpa justification channel(s).
 ///
 /// Used to send notifications about justifications generated
 /// at the end of a Grandpa round.
-#[derive(Clone)]
-pub struct GrandpaJustificationSender<Block: BlockT> {
-	subscribers: SharedJustificationSenders<Block>,
-}
-
-impl<Block: BlockT> GrandpaJustificationSender<Block> {
-	/// The `subscribers` should be shared with a corresponding
-	/// `GrandpaJustificationStream`.
-	fn new(subscribers: SharedJustificationSenders<Block>) -> Self {
-		Self { subscribers }
-	}
-
-	/// Send out a notification to all subscribers that a new justification
-	/// is available for a block.
-	pub fn notify(
-		&self,
-		justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>,
-	) -> Result<(), Error> {
-		let mut subscribers = self.subscribers.lock();
-
-		// do an initial prune on closed subscriptions
-		subscribers.retain(|n| !n.is_closed());
-
-		// if there's no subscribers we avoid creating
-		// the justification which is a costly operation
-		if !subscribers.is_empty() {
-			let justification = justification()?;
-			subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok());
-		}
-
-		Ok(())
-	}
-}
+pub type GrandpaJustificationSender<Block> = NotificationSender<GrandpaJustification<Block>>;
 
 /// The receiving half of the Grandpa justification channel.
 ///
@@ -78,33 +32,12 @@ impl<Block: BlockT> GrandpaJustificationSender<Block> {
 /// at the end of a Grandpa round.
 /// The `GrandpaJustificationStream` entity stores the `SharedJustificationSenders`
 /// so it can be used to add more subscriptions.
-#[derive(Clone)]
-pub struct GrandpaJustificationStream<Block: BlockT> {
-	subscribers: SharedJustificationSenders<Block>,
-}
-
-impl<Block: BlockT> GrandpaJustificationStream<Block> {
-	/// Creates a new pair of receiver and sender of justification notifications.
-	pub fn channel() -> (GrandpaJustificationSender<Block>, Self) {
-		let subscribers = Arc::new(Mutex::new(vec![]));
-		let receiver = GrandpaJustificationStream::new(subscribers.clone());
-		let sender = GrandpaJustificationSender::new(subscribers.clone());
-		(sender, receiver)
-	}
+pub type GrandpaJustificationStream<Block> =
+	NotificationStream<GrandpaJustification<Block>, GrandpaJustificationsTracingKey>;
 
-	/// Create a new receiver of justification notifications.
-	///
-	/// The `subscribers` should be shared with a corresponding
-	/// `GrandpaJustificationSender`.
-	fn new(subscribers: SharedJustificationSenders<Block>) -> Self {
-		Self { subscribers }
-	}
-
-	/// Subscribe to a channel through which justifications are sent
-	/// at the end of each Grandpa voting round.
-	pub fn subscribe(&self) -> JustificationStream<Block> {
-		let (sender, receiver) = tracing_unbounded("mpsc_justification_notification_stream");
-		self.subscribers.lock().push(sender);
-		receiver
-	}
+/// Provides tracing key for GRANDPA justifications stream.
+#[derive(Clone)]
+pub struct GrandpaJustificationsTracingKey;
+impl TracingKeyStr for GrandpaJustificationsTracingKey {
+	const TRACING_KEY: &'static str = "mpsc_grandpa_justification_notification_stream";
 }
diff --git a/substrate/client/utils/Cargo.toml b/substrate/client/utils/Cargo.toml
index 827164b702c6f63d0f30f3ed9582b7314f661d46..24075f932b5085c205ed6ec98b03486433332ca7 100644
--- a/substrate/client/utils/Cargo.toml
+++ b/substrate/client/utils/Cargo.toml
@@ -12,9 +12,13 @@ readme = "README.md"
 [dependencies]
 futures = "0.3.9"
 lazy_static = "1.4.0"
+parking_lot = "0.11"
 prometheus = { version = "0.13.0", default-features = false }
 futures-timer = "3.0.2"
 
 [features]
 default = ["metered"]
 metered = []
+
+[dev-dependencies]
+tokio-test = "0.4.2"
diff --git a/substrate/client/utils/src/lib.rs b/substrate/client/utils/src/lib.rs
index fab4365c8ed832cfff8f1aac7e3377b6ed2e5539..b3fb8400b12f6401934e1eb6bf4b8a2815811fd8 100644
--- a/substrate/client/utils/src/lib.rs
+++ b/substrate/client/utils/src/lib.rs
@@ -38,4 +38,5 @@
 
 pub mod metrics;
 pub mod mpsc;
+pub mod notification;
 pub mod status_sinks;
diff --git a/substrate/client/utils/src/notification.rs b/substrate/client/utils/src/notification.rs
new file mode 100644
index 0000000000000000000000000000000000000000..21d01c5f99fef6172dc2cee2373641a516f8660b
--- /dev/null
+++ b/substrate/client/utils/src/notification.rs
@@ -0,0 +1,151 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Provides mpsc notification channel that can be instantiated
+//! _after_ it's been shared to the consumer and producers entities.
+//!
+//! Useful when building RPC extensions where, at service definition time, we
+//! don't know whether the specific interface where the RPC extension will be
+//! exposed is safe or not and we want to lazily build the RPC extension
+//! whenever we bind the service to an interface.
+//!
+//! See [`sc-service::builder::RpcExtensionBuilder`] for more details.
+
+use std::{marker::PhantomData, sync::Arc};
+
+use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
+
+use parking_lot::Mutex;
+
+/// Collection of channel sending endpoints shared with the receiver side
+/// so they can register themselves.
+type SharedSenders<Payload> = Arc<Mutex<Vec<TracingUnboundedSender<Payload>>>>;
+
+/// Trait used to define the "tracing key" string used to tag
+/// and identify the mpsc channels.
+pub trait TracingKeyStr {
+	/// Const `str` representing the "tracing key" used to tag and identify
+	/// the mpsc channels owned by the object implemeting this trait.
+	const TRACING_KEY: &'static str;
+}
+
+/// The sending half of the notifications channel(s).
+///
+/// Used to send notifications from the BEEFY gadget side.
+#[derive(Clone)]
+pub struct NotificationSender<Payload: Clone> {
+	subscribers: SharedSenders<Payload>,
+}
+
+impl<Payload: Clone> NotificationSender<Payload> {
+	/// The `subscribers` should be shared with a corresponding `NotificationStream`.
+	fn new(subscribers: SharedSenders<Payload>) -> Self {
+		Self { subscribers }
+	}
+
+	/// Send out a notification to all subscribers that a new payload is available for a
+	/// block.
+	pub fn notify<Error>(
+		&self,
+		payload: impl FnOnce() -> Result<Payload, Error>,
+	) -> Result<(), Error> {
+		let mut subscribers = self.subscribers.lock();
+
+		// do an initial prune on closed subscriptions
+		subscribers.retain(|n| !n.is_closed());
+
+		if !subscribers.is_empty() {
+			let payload = payload()?;
+			subscribers.retain(|n| n.unbounded_send(payload.clone()).is_ok());
+		}
+
+		Ok(())
+	}
+}
+
+/// The receiving half of the notifications channel.
+///
+/// The `NotificationStream` entity stores the `SharedSenders` so it can be
+/// used to add more subscriptions.
+#[derive(Clone)]
+pub struct NotificationStream<Payload: Clone, TK: TracingKeyStr> {
+	subscribers: SharedSenders<Payload>,
+	_trace_key: PhantomData<TK>,
+}
+
+impl<Payload: Clone, TK: TracingKeyStr> NotificationStream<Payload, TK> {
+	/// Creates a new pair of receiver and sender of `Payload` notifications.
+	pub fn channel() -> (NotificationSender<Payload>, Self) {
+		let subscribers = Arc::new(Mutex::new(vec![]));
+		let receiver = NotificationStream::new(subscribers.clone());
+		let sender = NotificationSender::new(subscribers);
+		(sender, receiver)
+	}
+
+	/// Create a new receiver of `Payload` notifications.
+	///
+	/// The `subscribers` should be shared with a corresponding `NotificationSender`.
+	fn new(subscribers: SharedSenders<Payload>) -> Self {
+		Self { subscribers, _trace_key: PhantomData }
+	}
+
+	/// Subscribe to a channel through which the generic payload can be received.
+	pub fn subscribe(&self) -> TracingUnboundedReceiver<Payload> {
+		let (sender, receiver) = tracing_unbounded(TK::TRACING_KEY);
+		self.subscribers.lock().push(sender);
+		receiver
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use futures::StreamExt;
+
+	#[derive(Clone)]
+	pub struct DummyTracingKey;
+	impl TracingKeyStr for DummyTracingKey {
+		const TRACING_KEY: &'static str = "test_notification_stream";
+	}
+
+	type StringStream = NotificationStream<String, DummyTracingKey>;
+
+	#[test]
+	fn notification_channel_simple() {
+		let (sender, stream) = StringStream::channel();
+
+		let test_payload = String::from("test payload");
+		let closure_payload = test_payload.clone();
+
+		// Create a future to receive a single notification
+		// from the stream and verify its payload.
+		let future = stream.subscribe().take(1).for_each(move |payload| {
+			let test_payload = closure_payload.clone();
+			async move {
+				assert_eq!(payload, test_payload);
+			}
+		});
+
+		// Send notification.
+		let r: std::result::Result<(), ()> = sender.notify(|| Ok(test_payload));
+		r.unwrap();
+
+		// Run receiver future.
+		tokio_test::block_on(future);
+	}
+}