From fe8d2bc7f45c3e96d1429606c33f20ebbef462cc Mon Sep 17 00:00:00 2001 From: Adrian Catangiu <adrian@parity.io> Date: Thu, 6 Jan 2022 15:43:11 +0200 Subject: [PATCH] Add BEEFY `latestFinalized` RPC and deduplicate code between BEEFY and GRANDPA (#10568) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * beefy: add dummy latest_finalized() RPC * beefy: rpc latest_best_beefy() using shared mem * beefy: rpc populate latest_best_beefy() * beefy: rpc handle readiness * beefy: best block over channel - wip Not working because channel can't be simply opened and receiver passed to `rpc_extensions_builder` because `rpc_extensions_builder` has to be `Fn` and not `FnOnce`... and and Receiver side of mpsc can't be cloned yay!.. * beefy: make notification channels payload-agnostic * beefy: use notification mechanism instead of custom channel * beefy: add tracing key to notif channels * sc-utils: add notification channel - wip * beefy: use sc-utils generic notification channel * grandpa: use sc-utils generic notification channel * fix grumbles * beefy-rpc: get best block header instead of number * beefy-rpc: rename to `beefy_getFinalizedHead` * fix nitpicks * client-rpc-notifications: move generic Error from struct to fn * beefy: use header from notification instead of getting from database * beefy-rpc: get best block hash instead of header * beefy-rpc: fix and improve latestHead test * beefy-rpc: bubble up errors from rpc-handler instantiation * update lockfile * Apply suggestions from code review Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> * fix errors and warnings * fix nit Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com> --- substrate/Cargo.lock | 40 ++++ substrate/client/beefy/rpc/Cargo.toml | 4 + substrate/client/beefy/rpc/src/lib.rs | 184 ++++++++++++++++-- .../client/beefy/rpc/src/notification.rs | 8 +- substrate/client/beefy/src/lib.rs | 8 +- substrate/client/beefy/src/notification.rs | 111 +++-------- substrate/client/beefy/src/worker.rs | 42 +++- .../client/finality-grandpa/rpc/src/lib.rs | 2 +- .../finality-grandpa/src/notification.rs | 87 +-------- substrate/client/utils/Cargo.toml | 4 + substrate/client/utils/src/lib.rs | 1 + substrate/client/utils/src/notification.rs | 151 ++++++++++++++ 12 files changed, 448 insertions(+), 194 deletions(-) create mode 100644 substrate/client/utils/src/notification.rs diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 1dbd5404911..e5192d2c8fe 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -323,6 +323,27 @@ dependencies = [ "trust-dns-resolver", ] +[[package]] +name = "async-stream" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "171374e7e3b2504e0e5236e3b59260560f9fe94bfe9ac39ba5e4e929c5590625" +dependencies = [ + "async-stream-impl", + "futures-core", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.0.3" @@ -508,6 +529,7 @@ version = "4.0.0-dev" dependencies = [ "beefy-gadget", "beefy-primitives", + "derive_more", "futures 0.3.16", "jsonrpc-core", "jsonrpc-core-client", @@ -515,12 +537,15 @@ dependencies = [ "jsonrpc-pubsub", "log 0.4.14", "parity-scale-codec", + "parking_lot 0.11.2", "sc-rpc", + "sc-utils", "serde", "serde_json", "sp-core", "sp-runtime", "substrate-test-runtime-client", + "thiserror", ] [[package]] @@ -8743,7 +8768,9 @@ dependencies = [ "futures 0.3.16", "futures-timer", "lazy_static", + "parking_lot 0.11.2", "prometheus", + "tokio-test", ] [[package]] @@ -10745,6 +10772,19 @@ dependencies = [ "tokio-reactor", ] +[[package]] +name = "tokio-test" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" +dependencies = [ + "async-stream", + "bytes 1.1.0", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tls" version = "0.2.1" diff --git a/substrate/client/beefy/rpc/Cargo.toml b/substrate/client/beefy/rpc/Cargo.toml index bc81732dcd4..0f29804d678 100644 --- a/substrate/client/beefy/rpc/Cargo.toml +++ b/substrate/client/beefy/rpc/Cargo.toml @@ -8,8 +8,11 @@ repository = "https://github.com/paritytech/substrate" description = "RPC for the BEEFY Client gadget for substrate" [dependencies] +derive_more = "0.99" futures = "0.3.16" log = "0.4" +parking_lot = "0.11" +thiserror = "1.0" serde = { version = "1.0.132", features = ["derive"] } jsonrpc-core = "18.0.0" @@ -20,6 +23,7 @@ jsonrpc-pubsub = "18.0.0" codec = { version = "2.2.0", package = "parity-scale-codec", features = ["derive"] } sc-rpc = { version = "4.0.0-dev", path = "../../rpc" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sp-core = { version = "4.1.0-dev", path = "../../../primitives/core" } sp-runtime = { version = "4.0.0", path = "../../../primitives/runtime" } diff --git a/substrate/client/beefy/rpc/src/lib.rs b/substrate/client/beefy/rpc/src/lib.rs index be1c9b8691a..dc9ee8b9470 100644 --- a/substrate/client/beefy/rpc/src/lib.rs +++ b/substrate/client/beefy/rpc/src/lib.rs @@ -20,19 +20,62 @@ #![warn(missing_docs)] +use parking_lot::RwLock; use std::sync::Arc; use sp_runtime::traits::Block as BlockT; -use futures::{FutureExt, SinkExt, StreamExt}; +use futures::{task::SpawnError, FutureExt, SinkExt, StreamExt, TryFutureExt}; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{manager::SubscriptionManager, typed::Subscriber, SubscriptionId}; use log::warn; -use beefy_gadget::notification::BeefySignedCommitmentStream; +use beefy_gadget::notification::{BeefyBestBlockStream, BeefySignedCommitmentStream}; mod notification; +type FutureResult<T> = jsonrpc_core::BoxFuture<Result<T, jsonrpc_core::Error>>; + +#[derive(Debug, derive_more::Display, derive_more::From, thiserror::Error)] +/// Top-level error type for the RPC handler +pub enum Error { + /// The BEEFY RPC endpoint is not ready. + #[display(fmt = "BEEFY RPC endpoint not ready")] + EndpointNotReady, + /// The BEEFY RPC background task failed to spawn. + #[display(fmt = "BEEFY RPC background task failed to spawn")] + RpcTaskFailure(SpawnError), +} + +/// The error codes returned by jsonrpc. +pub enum ErrorCode { + /// Returned when BEEFY RPC endpoint is not ready. + NotReady = 1, + /// Returned on BEEFY RPC background task failure. + TaskFailure = 2, +} + +impl From<Error> for ErrorCode { + fn from(error: Error) -> Self { + match error { + Error::EndpointNotReady => ErrorCode::NotReady, + Error::RpcTaskFailure(_) => ErrorCode::TaskFailure, + } + } +} + +impl From<Error> for jsonrpc_core::Error { + fn from(error: Error) -> Self { + let message = format!("{}", error); + let code = ErrorCode::from(error); + jsonrpc_core::Error { + message, + code: jsonrpc_core::ErrorCode::ServerError(code as i64), + data: None, + } + } +} + /// Provides RPC methods for interacting with BEEFY. #[rpc] pub trait BeefyApi<Notification, Hash> { @@ -62,26 +105,57 @@ pub trait BeefyApi<Notification, Hash> { metadata: Option<Self::Metadata>, id: SubscriptionId, ) -> jsonrpc_core::Result<bool>; + + /// Returns hash of the latest BEEFY finalized block as seen by this client. + /// + /// The latest BEEFY block might not be available if the BEEFY gadget is not running + /// in the network or if the client is still initializing or syncing with the network. + /// In such case an error would be returned. + #[rpc(name = "beefy_getFinalizedHead")] + fn latest_finalized(&self) -> FutureResult<Hash>; } /// Implements the BeefyApi RPC trait for interacting with BEEFY. pub struct BeefyRpcHandler<Block: BlockT> { signed_commitment_stream: BeefySignedCommitmentStream<Block>, + beefy_best_block: Arc<RwLock<Option<Block::Hash>>>, manager: SubscriptionManager, } impl<Block: BlockT> BeefyRpcHandler<Block> { /// Creates a new BeefyRpcHandler instance. - pub fn new<E>(signed_commitment_stream: BeefySignedCommitmentStream<Block>, executor: E) -> Self + pub fn new<E>( + signed_commitment_stream: BeefySignedCommitmentStream<Block>, + best_block_stream: BeefyBestBlockStream<Block>, + executor: E, + ) -> Result<Self, Error> where E: futures::task::Spawn + Send + Sync + 'static, { + let beefy_best_block = Arc::new(RwLock::new(None)); + + let stream = best_block_stream.subscribe(); + let closure_clone = beefy_best_block.clone(); + let future = stream.for_each(move |best_beefy| { + let async_clone = closure_clone.clone(); + async move { + *async_clone.write() = Some(best_beefy); + } + }); + + executor + .spawn_obj(futures::task::FutureObj::new(Box::pin(future))) + .map_err(|e| { + log::error!("Failed to spawn BEEFY RPC background task; err: {}", e); + e + })?; + let manager = SubscriptionManager::new(Arc::new(executor)); - Self { signed_commitment_stream, manager } + Ok(Self { signed_commitment_stream, beefy_best_block, manager }) } } -impl<Block> BeefyApi<notification::SignedCommitment, Block> for BeefyRpcHandler<Block> +impl<Block> BeefyApi<notification::EncodedSignedCommitment, Block::Hash> for BeefyRpcHandler<Block> where Block: BlockT, { @@ -90,12 +164,12 @@ where fn subscribe_justifications( &self, _metadata: Self::Metadata, - subscriber: Subscriber<notification::SignedCommitment>, + subscriber: Subscriber<notification::EncodedSignedCommitment>, ) { let stream = self .signed_commitment_stream .subscribe() - .map(|x| Ok::<_, ()>(Ok(notification::SignedCommitment::new::<Block>(x)))); + .map(|x| Ok::<_, ()>(Ok(notification::EncodedSignedCommitment::new::<Block>(x)))); self.manager.add(subscriber, |sink| { stream @@ -111,6 +185,17 @@ where ) -> jsonrpc_core::Result<bool> { Ok(self.manager.cancel(id)) } + + fn latest_finalized(&self) -> FutureResult<Block::Hash> { + let result: Result<Block::Hash, jsonrpc_core::Error> = self + .beefy_best_block + .read() + .as_ref() + .cloned() + .ok_or(Error::EndpointNotReady.into()); + let future = async move { result }.boxed(); + future.map_err(jsonrpc_core::Error::from).boxed() + } } #[cfg(test)] @@ -118,16 +203,30 @@ mod tests { use super::*; use jsonrpc_core::{types::Params, Notification, Output}; - use beefy_gadget::notification::{BeefySignedCommitmentSender, SignedCommitment}; + use beefy_gadget::notification::{BeefySignedCommitment, BeefySignedCommitmentSender}; use beefy_primitives::{known_payload_ids, Payload}; use codec::{Decode, Encode}; + use sp_runtime::traits::{BlakeTwo256, Hash}; use substrate_test_runtime_client::runtime::Block; fn setup_io_handler( ) -> (jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>, BeefySignedCommitmentSender<Block>) { - let (commitment_sender, commitment_stream) = BeefySignedCommitmentStream::channel(); + let (_, stream) = BeefyBestBlockStream::<Block>::channel(); + setup_io_handler_with_best_block_stream(stream) + } + + fn setup_io_handler_with_best_block_stream( + best_block_stream: BeefyBestBlockStream<Block>, + ) -> (jsonrpc_core::MetaIoHandler<sc_rpc::Metadata>, BeefySignedCommitmentSender<Block>) { + let (commitment_sender, commitment_stream) = + BeefySignedCommitmentStream::<Block>::channel(); - let handler = BeefyRpcHandler::new(commitment_stream, sc_rpc::testing::TaskExecutor); + let handler: BeefyRpcHandler<Block> = BeefyRpcHandler::new( + commitment_stream, + best_block_stream, + sc_rpc::testing::TaskExecutor, + ) + .unwrap(); let mut io = jsonrpc_core::MetaIoHandler::default(); io.extend_with(BeefyApi::to_delegate(handler)); @@ -141,6 +240,56 @@ mod tests { (meta, rx) } + #[test] + fn uninitialized_rpc_handler() { + let (io, _) = setup_io_handler(); + + let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#; + let response = r#"{"jsonrpc":"2.0","error":{"code":1,"message":"BEEFY RPC endpoint not ready"},"id":1}"#; + + let meta = sc_rpc::Metadata::default(); + assert_eq!(Some(response.into()), io.handle_request_sync(request, meta)); + } + + #[test] + fn latest_finalized_rpc() { + let (sender, stream) = BeefyBestBlockStream::<Block>::channel(); + let (io, _) = setup_io_handler_with_best_block_stream(stream); + + let hash = BlakeTwo256::hash(b"42"); + let r: Result<(), ()> = sender.notify(|| Ok(hash)); + r.unwrap(); + + // Verify RPC `beefy_getFinalizedHead` returns expected hash. + let request = r#"{"jsonrpc":"2.0","method":"beefy_getFinalizedHead","params":[],"id":1}"#; + let expected = "{\ + \"jsonrpc\":\"2.0\",\ + \"result\":\"0x2f0039e93a27221fcf657fb877a1d4f60307106113e885096cb44a461cd0afbf\",\ + \"id\":1\ + }"; + let not_ready = "{\ + \"jsonrpc\":\"2.0\",\ + \"error\":{\"code\":1,\"message\":\"BEEFY RPC endpoint not ready\"},\ + \"id\":1\ + }"; + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + while std::time::Instant::now() < deadline { + let meta = sc_rpc::Metadata::default(); + let response = io.handle_request_sync(request, meta); + // Retry "not ready" responses. + if response != Some(not_ready.into()) { + assert_eq!(response, Some(expected.into())); + // Success + return + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } + panic!( + "Deadline reached while waiting for best BEEFY block to update. Perhaps the background task is broken?" + ); + } + #[test] fn subscribe_and_unsubscribe_to_justifications() { let (io, _) = setup_io_handler(); @@ -159,7 +308,7 @@ mod tests { // Unsubscribe let unsub_req = format!( - "{{\"jsonrpc\":\"2.0\",\"method\":\"beefy_unsubscribeJustifications\",\"params\":[{}],\"id\":1}}", + r#"{{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":[{}],"id":1}}"#, sub_id ); assert_eq!( @@ -170,7 +319,7 @@ mod tests { // Unsubscribe again and fail assert_eq!( io.handle_request_sync(&unsub_req, meta), - Some("{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32602,\"message\":\"Invalid subscription id.\"},\"id\":1}".into()), + Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()), ); } @@ -192,13 +341,13 @@ mod tests { r#"{"jsonrpc":"2.0","method":"beefy_unsubscribeJustifications","params":["FOO"],"id":1}"#, meta.clone() ), - Some("{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32602,\"message\":\"Invalid subscription id.\"},\"id\":1}".into()) + Some(r#"{"jsonrpc":"2.0","error":{"code":-32602,"message":"Invalid subscription id."},"id":1}"#.into()) ); } - fn create_commitment() -> SignedCommitment<Block> { + fn create_commitment() -> BeefySignedCommitment<Block> { let payload = Payload::new(known_payload_ids::MMR_ROOT_ID, "Hello World!".encode()); - SignedCommitment::<Block> { + BeefySignedCommitment::<Block> { commitment: beefy_primitives::Commitment { payload, block_number: 5, @@ -223,7 +372,8 @@ mod tests { // Notify with commitment let commitment = create_commitment(); - commitment_sender.notify(commitment.clone()); + let r: Result<(), ()> = commitment_sender.notify(|| Ok(commitment.clone())); + r.unwrap(); // Inspect what we received let recv = futures::executor::block_on(receiver.take(1).collect::<Vec<_>>()); @@ -236,7 +386,7 @@ mod tests { let recv_sub_id: String = serde_json::from_value(json_map["subscription"].take()).unwrap(); let recv_commitment: sp_core::Bytes = serde_json::from_value(json_map["result"].take()).unwrap(); - let recv_commitment: SignedCommitment<Block> = + let recv_commitment: BeefySignedCommitment<Block> = Decode::decode(&mut &recv_commitment[..]).unwrap(); assert_eq!(recv.method, "beefy_justifications"); diff --git a/substrate/client/beefy/rpc/src/notification.rs b/substrate/client/beefy/rpc/src/notification.rs index 53c0bb618c5..2f58c7c6bb5 100644 --- a/substrate/client/beefy/rpc/src/notification.rs +++ b/substrate/client/beefy/rpc/src/notification.rs @@ -25,15 +25,15 @@ use sp_runtime::traits::Block as BlockT; /// The given bytes should be the SCALE-encoded representation of a /// `beefy_primitives::SignedCommitment`. #[derive(Clone, Serialize, Deserialize)] -pub struct SignedCommitment(sp_core::Bytes); +pub struct EncodedSignedCommitment(sp_core::Bytes); -impl SignedCommitment { +impl EncodedSignedCommitment { pub fn new<Block>( - signed_commitment: beefy_gadget::notification::SignedCommitment<Block>, + signed_commitment: beefy_gadget::notification::BeefySignedCommitment<Block>, ) -> Self where Block: BlockT, { - SignedCommitment(signed_commitment.encode().into()) + EncodedSignedCommitment(signed_commitment.encode().into()) } } diff --git a/substrate/client/beefy/src/lib.rs b/substrate/client/beefy/src/lib.rs index 7d2c3b57b1f..9b2bf383df8 100644 --- a/substrate/client/beefy/src/lib.rs +++ b/substrate/client/beefy/src/lib.rs @@ -31,6 +31,8 @@ use sp_runtime::traits::Block; use beefy_primitives::BeefyApi; +use crate::notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}; + mod error; mod gossip; mod keystore; @@ -121,7 +123,9 @@ where /// Gossip network pub network: N, /// BEEFY signed commitment sender - pub signed_commitment_sender: notification::BeefySignedCommitmentSender<B>, + pub signed_commitment_sender: BeefySignedCommitmentSender<B>, + /// BEEFY best block sender + pub beefy_best_block_sender: BeefyBestBlockSender<B>, /// Minimal delta between blocks, BEEFY should vote for pub min_block_delta: u32, /// Prometheus metric registry @@ -147,6 +151,7 @@ where key_store, network, signed_commitment_sender, + beefy_best_block_sender, min_block_delta, prometheus_registry, protocol_name, @@ -174,6 +179,7 @@ where backend, key_store: key_store.into(), signed_commitment_sender, + beefy_best_block_sender, gossip_engine, gossip_validator, min_block_delta, diff --git a/substrate/client/beefy/src/notification.rs b/substrate/client/beefy/src/notification.rs index cd410ec60bb..7c18d809f6e 100644 --- a/substrate/client/beefy/src/notification.rs +++ b/substrate/client/beefy/src/notification.rs @@ -16,98 +16,41 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use std::sync::Arc; +use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr}; +use sp_runtime::traits::{Block as BlockT, NumberFor}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_runtime::traits::{Block, NumberFor}; - -use parking_lot::Mutex; - -/// Stream of signed commitments returned when subscribing. -pub type SignedCommitment<Block> = +/// A commitment with matching BEEFY authorities' signatures. +pub type BeefySignedCommitment<Block> = beefy_primitives::SignedCommitment<NumberFor<Block>, beefy_primitives::crypto::Signature>; -/// Stream of signed commitments returned when subscribing. -type SignedCommitmentStream<Block> = TracingUnboundedReceiver<SignedCommitment<Block>>; - -/// Sending endpoint for notifying about signed commitments. -type SignedCommitmentSender<Block> = TracingUnboundedSender<SignedCommitment<Block>>; - -/// Collection of channel sending endpoints shared with the receiver side so they can register -/// themselves. -type SharedSignedCommitmentSenders<Block> = Arc<Mutex<Vec<SignedCommitmentSender<Block>>>>; - -/// The sending half of the signed commitment channel(s). -/// -/// Used to send notifications about signed commitments generated at the end of a BEEFY round. -#[derive(Clone)] -pub struct BeefySignedCommitmentSender<B> -where - B: Block, -{ - subscribers: SharedSignedCommitmentSenders<B>, -} - -impl<B> BeefySignedCommitmentSender<B> -where - B: Block, -{ - /// The `subscribers` should be shared with a corresponding `SignedCommitmentSender`. - fn new(subscribers: SharedSignedCommitmentSenders<B>) -> Self { - Self { subscribers } - } +/// The sending half of the notifications channel(s) used to send +/// notifications about best BEEFY block from the gadget side. +pub type BeefyBestBlockSender<Block> = NotificationSender<<Block as BlockT>::Hash>; - /// Send out a notification to all subscribers that a new signed commitment is available for a - /// block. - pub fn notify(&self, signed_commitment: SignedCommitment<B>) { - let mut subscribers = self.subscribers.lock(); +/// The receiving half of a notifications channel used to receive +/// notifications about best BEEFY blocks determined on the gadget side. +pub type BeefyBestBlockStream<Block> = + NotificationStream<<Block as BlockT>::Hash, BeefyBestBlockTracingKey>; - // do an initial prune on closed subscriptions - subscribers.retain(|n| !n.is_closed()); +/// The sending half of the notifications channel(s) used to send notifications +/// about signed commitments generated at the end of a BEEFY round. +pub type BeefySignedCommitmentSender<Block> = NotificationSender<BeefySignedCommitment<Block>>; - if !subscribers.is_empty() { - subscribers.retain(|n| n.unbounded_send(signed_commitment.clone()).is_ok()); - } - } -} +/// The receiving half of a notifications channel used to receive notifications +/// about signed commitments generated at the end of a BEEFY round. +pub type BeefySignedCommitmentStream<Block> = + NotificationStream<BeefySignedCommitment<Block>, BeefySignedCommitmentTracingKey>; -/// The receiving half of the signed commitments channel. -/// -/// Used to receive notifications about signed commitments generated at the end of a BEEFY round. -/// The `BeefySignedCommitmentStream` entity stores the `SharedSignedCommitmentSenders` so it can be -/// used to add more subscriptions. +/// Provides tracing key for BEEFY best block stream. #[derive(Clone)] -pub struct BeefySignedCommitmentStream<B> -where - B: Block, -{ - subscribers: SharedSignedCommitmentSenders<B>, +pub struct BeefyBestBlockTracingKey; +impl TracingKeyStr for BeefyBestBlockTracingKey { + const TRACING_KEY: &'static str = "mpsc_beefy_best_block_notification_stream"; } -impl<B> BeefySignedCommitmentStream<B> -where - B: Block, -{ - /// Creates a new pair of receiver and sender of signed commitment notifications. - pub fn channel() -> (BeefySignedCommitmentSender<B>, Self) { - let subscribers = Arc::new(Mutex::new(vec![])); - let receiver = BeefySignedCommitmentStream::new(subscribers.clone()); - let sender = BeefySignedCommitmentSender::new(subscribers); - (sender, receiver) - } - - /// Create a new receiver of signed commitment notifications. - /// - /// The `subscribers` should be shared with a corresponding `BeefySignedCommitmentSender`. - fn new(subscribers: SharedSignedCommitmentSenders<B>) -> Self { - Self { subscribers } - } - - /// Subscribe to a channel through which signed commitments are sent at the end of each BEEFY - /// voting round. - pub fn subscribe(&self) -> SignedCommitmentStream<B> { - let (sender, receiver) = tracing_unbounded("mpsc_signed_commitments_notification_stream"); - self.subscribers.lock().push(sender); - receiver - } +/// Provides tracing key for BEEFY signed commitments stream. +#[derive(Clone)] +pub struct BeefySignedCommitmentTracingKey; +impl TracingKeyStr for BeefySignedCommitmentTracingKey { + const TRACING_KEY: &'static str = "mpsc_beefy_signed_commitments_notification_stream"; } diff --git a/substrate/client/beefy/src/worker.rs b/substrate/client/beefy/src/worker.rs index d3aa988b8ee..0c7d8d4ffdc 100644 --- a/substrate/client/beefy/src/worker.rs +++ b/substrate/client/beefy/src/worker.rs @@ -46,7 +46,8 @@ use crate::{ keystore::BeefyKeystore, metric_inc, metric_set, metrics::Metrics, - notification, round, Client, + notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}, + round, Client, }; pub(crate) struct WorkerParams<B, BE, C> @@ -56,7 +57,8 @@ where pub client: Arc<C>, pub backend: Arc<BE>, pub key_store: BeefyKeystore, - pub signed_commitment_sender: notification::BeefySignedCommitmentSender<B>, + pub signed_commitment_sender: BeefySignedCommitmentSender<B>, + pub beefy_best_block_sender: BeefyBestBlockSender<B>, pub gossip_engine: GossipEngine<B>, pub gossip_validator: Arc<GossipValidator<B>>, pub min_block_delta: u32, @@ -73,7 +75,7 @@ where client: Arc<C>, backend: Arc<BE>, key_store: BeefyKeystore, - signed_commitment_sender: notification::BeefySignedCommitmentSender<B>, + signed_commitment_sender: BeefySignedCommitmentSender<B>, gossip_engine: Arc<Mutex<GossipEngine<B>>>, gossip_validator: Arc<GossipValidator<B>>, /// Min delta in block numbers between two blocks, BEEFY should vote on @@ -85,6 +87,8 @@ where best_grandpa_block: NumberFor<B>, /// Best block a BEEFY voting round has been concluded for best_beefy_block: Option<NumberFor<B>>, + /// Used to keep RPC worker up to date on latest/best beefy + beefy_best_block_sender: BeefyBestBlockSender<B>, /// Validator set id for the last signed commitment last_signed_id: u64, // keep rustc happy @@ -110,6 +114,7 @@ where backend, key_store, signed_commitment_sender, + beefy_best_block_sender, gossip_engine, gossip_validator, min_block_delta, @@ -130,6 +135,7 @@ where best_grandpa_block: client.info().finalized_number, best_beefy_block: None, last_signed_id: 0, + beefy_best_block_sender, _backend: PhantomData, } } @@ -242,6 +248,9 @@ where debug!(target: "beefy", "🥩 New Rounds for id: {:?}", id); self.best_beefy_block = Some(*notification.header.number()); + self.beefy_best_block_sender + .notify(|| Ok::<_, ()>(notification.hash.clone())) + .expect("forwards closure result; the closure always returns Ok; qed."); // this metric is kind of 'fake'. Best BEEFY block should only be updated once we // have a signed commitment for the block. Remove once the above TODO is done. @@ -329,22 +338,23 @@ where // id is stored for skipped session metric calculation self.last_signed_id = rounds.validator_set_id(); + let block_num = round.1; let commitment = Commitment { payload: round.0, - block_number: round.1, + block_number: block_num, validator_set_id: self.last_signed_id, }; let signed_commitment = SignedCommitment { commitment, signatures }; - metric_set!(self, beefy_round_concluded, round.1); + metric_set!(self, beefy_round_concluded, block_num); info!(target: "beefy", "🥩 Round #{} concluded, committed: {:?}.", round.1, signed_commitment); if self .backend .append_justification( - BlockId::Number(round.1), + BlockId::Number(block_num), ( BEEFY_ENGINE_ID, VersionedFinalityProof::V1(signed_commitment.clone()).encode(), @@ -356,11 +366,23 @@ where // conclude certain rounds multiple times. trace!(target: "beefy", "🥩 Failed to append justification: {:?}", signed_commitment); } + self.signed_commitment_sender + .notify(|| Ok::<_, ()>(signed_commitment)) + .expect("forwards closure result; the closure always returns Ok; qed."); + + self.best_beefy_block = Some(block_num); + if let Err(err) = self.client.hash(block_num).map(|h| { + if let Some(hash) = h { + self.beefy_best_block_sender + .notify(|| Ok::<_, ()>(hash)) + .expect("forwards closure result; the closure always returns Ok; qed."); + } + }) { + error!(target: "beefy", "🥩 Failed to get hash for block number {}; err: {:?}", + block_num, err); + } - self.signed_commitment_sender.notify(signed_commitment); - self.best_beefy_block = Some(round.1); - - metric_set!(self, beefy_best_block, round.1); + metric_set!(self, beefy_best_block, block_num); } } } diff --git a/substrate/client/finality-grandpa/rpc/src/lib.rs b/substrate/client/finality-grandpa/rpc/src/lib.rs index e509d435af4..bde2e5612b2 100644 --- a/substrate/client/finality-grandpa/rpc/src/lib.rs +++ b/substrate/client/finality-grandpa/rpc/src/lib.rs @@ -469,7 +469,7 @@ mod tests { // Notify with a header and justification let justification = create_justification(); - justification_sender.notify(|| Ok(justification.clone())).unwrap(); + justification_sender.notify(|| Ok::<_, ()>(justification.clone())).unwrap(); // Inspect what we received let recv = futures::executor::block_on(receiver.take(1).collect::<Vec<_>>()); diff --git a/substrate/client/finality-grandpa/src/notification.rs b/substrate/client/finality-grandpa/src/notification.rs index 0d154fb3357..1d6e25e55dc 100644 --- a/substrate/client/finality-grandpa/src/notification.rs +++ b/substrate/client/finality-grandpa/src/notification.rs @@ -16,61 +16,15 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see <https://www.gnu.org/licenses/>. -use parking_lot::Mutex; -use std::sync::Arc; +use sc_utils::notification::{NotificationSender, NotificationStream, TracingKeyStr}; -use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use sp_runtime::traits::Block as BlockT; - -use crate::{justification::GrandpaJustification, Error}; - -// Stream of justifications returned when subscribing. -type JustificationStream<Block> = TracingUnboundedReceiver<GrandpaJustification<Block>>; - -// Sending endpoint for notifying about justifications. -type JustificationSender<Block> = TracingUnboundedSender<GrandpaJustification<Block>>; - -// Collection of channel sending endpoints shared with the receiver side so they can register -// themselves. -type SharedJustificationSenders<Block> = Arc<Mutex<Vec<JustificationSender<Block>>>>; +use crate::justification::GrandpaJustification; /// The sending half of the Grandpa justification channel(s). /// /// Used to send notifications about justifications generated /// at the end of a Grandpa round. -#[derive(Clone)] -pub struct GrandpaJustificationSender<Block: BlockT> { - subscribers: SharedJustificationSenders<Block>, -} - -impl<Block: BlockT> GrandpaJustificationSender<Block> { - /// The `subscribers` should be shared with a corresponding - /// `GrandpaJustificationStream`. - fn new(subscribers: SharedJustificationSenders<Block>) -> Self { - Self { subscribers } - } - - /// Send out a notification to all subscribers that a new justification - /// is available for a block. - pub fn notify( - &self, - justification: impl FnOnce() -> Result<GrandpaJustification<Block>, Error>, - ) -> Result<(), Error> { - let mut subscribers = self.subscribers.lock(); - - // do an initial prune on closed subscriptions - subscribers.retain(|n| !n.is_closed()); - - // if there's no subscribers we avoid creating - // the justification which is a costly operation - if !subscribers.is_empty() { - let justification = justification()?; - subscribers.retain(|n| n.unbounded_send(justification.clone()).is_ok()); - } - - Ok(()) - } -} +pub type GrandpaJustificationSender<Block> = NotificationSender<GrandpaJustification<Block>>; /// The receiving half of the Grandpa justification channel. /// @@ -78,33 +32,12 @@ impl<Block: BlockT> GrandpaJustificationSender<Block> { /// at the end of a Grandpa round. /// The `GrandpaJustificationStream` entity stores the `SharedJustificationSenders` /// so it can be used to add more subscriptions. -#[derive(Clone)] -pub struct GrandpaJustificationStream<Block: BlockT> { - subscribers: SharedJustificationSenders<Block>, -} - -impl<Block: BlockT> GrandpaJustificationStream<Block> { - /// Creates a new pair of receiver and sender of justification notifications. - pub fn channel() -> (GrandpaJustificationSender<Block>, Self) { - let subscribers = Arc::new(Mutex::new(vec![])); - let receiver = GrandpaJustificationStream::new(subscribers.clone()); - let sender = GrandpaJustificationSender::new(subscribers.clone()); - (sender, receiver) - } +pub type GrandpaJustificationStream<Block> = + NotificationStream<GrandpaJustification<Block>, GrandpaJustificationsTracingKey>; - /// Create a new receiver of justification notifications. - /// - /// The `subscribers` should be shared with a corresponding - /// `GrandpaJustificationSender`. - fn new(subscribers: SharedJustificationSenders<Block>) -> Self { - Self { subscribers } - } - - /// Subscribe to a channel through which justifications are sent - /// at the end of each Grandpa voting round. - pub fn subscribe(&self) -> JustificationStream<Block> { - let (sender, receiver) = tracing_unbounded("mpsc_justification_notification_stream"); - self.subscribers.lock().push(sender); - receiver - } +/// Provides tracing key for GRANDPA justifications stream. +#[derive(Clone)] +pub struct GrandpaJustificationsTracingKey; +impl TracingKeyStr for GrandpaJustificationsTracingKey { + const TRACING_KEY: &'static str = "mpsc_grandpa_justification_notification_stream"; } diff --git a/substrate/client/utils/Cargo.toml b/substrate/client/utils/Cargo.toml index 827164b702c..24075f932b5 100644 --- a/substrate/client/utils/Cargo.toml +++ b/substrate/client/utils/Cargo.toml @@ -12,9 +12,13 @@ readme = "README.md" [dependencies] futures = "0.3.9" lazy_static = "1.4.0" +parking_lot = "0.11" prometheus = { version = "0.13.0", default-features = false } futures-timer = "3.0.2" [features] default = ["metered"] metered = [] + +[dev-dependencies] +tokio-test = "0.4.2" diff --git a/substrate/client/utils/src/lib.rs b/substrate/client/utils/src/lib.rs index fab4365c8ed..b3fb8400b12 100644 --- a/substrate/client/utils/src/lib.rs +++ b/substrate/client/utils/src/lib.rs @@ -38,4 +38,5 @@ pub mod metrics; pub mod mpsc; +pub mod notification; pub mod status_sinks; diff --git a/substrate/client/utils/src/notification.rs b/substrate/client/utils/src/notification.rs new file mode 100644 index 00000000000..21d01c5f99f --- /dev/null +++ b/substrate/client/utils/src/notification.rs @@ -0,0 +1,151 @@ +// This file is part of Substrate. + +// Copyright (C) 2021-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <https://www.gnu.org/licenses/>. + +//! Provides mpsc notification channel that can be instantiated +//! _after_ it's been shared to the consumer and producers entities. +//! +//! Useful when building RPC extensions where, at service definition time, we +//! don't know whether the specific interface where the RPC extension will be +//! exposed is safe or not and we want to lazily build the RPC extension +//! whenever we bind the service to an interface. +//! +//! See [`sc-service::builder::RpcExtensionBuilder`] for more details. + +use std::{marker::PhantomData, sync::Arc}; + +use crate::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; + +use parking_lot::Mutex; + +/// Collection of channel sending endpoints shared with the receiver side +/// so they can register themselves. +type SharedSenders<Payload> = Arc<Mutex<Vec<TracingUnboundedSender<Payload>>>>; + +/// Trait used to define the "tracing key" string used to tag +/// and identify the mpsc channels. +pub trait TracingKeyStr { + /// Const `str` representing the "tracing key" used to tag and identify + /// the mpsc channels owned by the object implemeting this trait. + const TRACING_KEY: &'static str; +} + +/// The sending half of the notifications channel(s). +/// +/// Used to send notifications from the BEEFY gadget side. +#[derive(Clone)] +pub struct NotificationSender<Payload: Clone> { + subscribers: SharedSenders<Payload>, +} + +impl<Payload: Clone> NotificationSender<Payload> { + /// The `subscribers` should be shared with a corresponding `NotificationStream`. + fn new(subscribers: SharedSenders<Payload>) -> Self { + Self { subscribers } + } + + /// Send out a notification to all subscribers that a new payload is available for a + /// block. + pub fn notify<Error>( + &self, + payload: impl FnOnce() -> Result<Payload, Error>, + ) -> Result<(), Error> { + let mut subscribers = self.subscribers.lock(); + + // do an initial prune on closed subscriptions + subscribers.retain(|n| !n.is_closed()); + + if !subscribers.is_empty() { + let payload = payload()?; + subscribers.retain(|n| n.unbounded_send(payload.clone()).is_ok()); + } + + Ok(()) + } +} + +/// The receiving half of the notifications channel. +/// +/// The `NotificationStream` entity stores the `SharedSenders` so it can be +/// used to add more subscriptions. +#[derive(Clone)] +pub struct NotificationStream<Payload: Clone, TK: TracingKeyStr> { + subscribers: SharedSenders<Payload>, + _trace_key: PhantomData<TK>, +} + +impl<Payload: Clone, TK: TracingKeyStr> NotificationStream<Payload, TK> { + /// Creates a new pair of receiver and sender of `Payload` notifications. + pub fn channel() -> (NotificationSender<Payload>, Self) { + let subscribers = Arc::new(Mutex::new(vec![])); + let receiver = NotificationStream::new(subscribers.clone()); + let sender = NotificationSender::new(subscribers); + (sender, receiver) + } + + /// Create a new receiver of `Payload` notifications. + /// + /// The `subscribers` should be shared with a corresponding `NotificationSender`. + fn new(subscribers: SharedSenders<Payload>) -> Self { + Self { subscribers, _trace_key: PhantomData } + } + + /// Subscribe to a channel through which the generic payload can be received. + pub fn subscribe(&self) -> TracingUnboundedReceiver<Payload> { + let (sender, receiver) = tracing_unbounded(TK::TRACING_KEY); + self.subscribers.lock().push(sender); + receiver + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::StreamExt; + + #[derive(Clone)] + pub struct DummyTracingKey; + impl TracingKeyStr for DummyTracingKey { + const TRACING_KEY: &'static str = "test_notification_stream"; + } + + type StringStream = NotificationStream<String, DummyTracingKey>; + + #[test] + fn notification_channel_simple() { + let (sender, stream) = StringStream::channel(); + + let test_payload = String::from("test payload"); + let closure_payload = test_payload.clone(); + + // Create a future to receive a single notification + // from the stream and verify its payload. + let future = stream.subscribe().take(1).for_each(move |payload| { + let test_payload = closure_payload.clone(); + async move { + assert_eq!(payload, test_payload); + } + }); + + // Send notification. + let r: std::result::Result<(), ()> = sender.notify(|| Ok(test_payload)); + r.unwrap(); + + // Run receiver future. + tokio_test::block_on(future); + } +} -- GitLab