From 5db336c7437aa5351915a978eecc0f8d0b65cc55 Mon Sep 17 00:00:00 2001 From: Max Inden <mail@max-inden.de> Date: Mon, 20 Apr 2020 22:40:44 +0200 Subject: [PATCH] client/network-gossip: Move sink IO outside of state_machine (#5669) * client/network-gossip: Move sink IO outside of state_machine `ConsensusGossip` is supposed to be a deterministic state machine. `GossipEngine` wrapping `ConsensusGossip` should handle IO operations. This commit moves the `message_sink` IO operations to `GossipEngine`. More specifically on incoming messages a `GossipEngine` calls `ConsensusGossip::on_incoming` to validate and register the messages. `ConsensusGossip` returns the valid messages which are then forwarded by `GossipEngine` to the upper layer via the `message_sinks`. * client/network-gossip: Adjust and extend tests * Update client/network-gossip/src/bridge.rs Co-authored-by: Benjamin Kampmann <ben.kampmann@googlemail.com> --- substrate/Cargo.lock | 1 + substrate/client/network-gossip/Cargo.toml | 1 + substrate/client/network-gossip/src/bridge.rs | 190 +++++++++++++++--- .../network-gossip/src/state_machine.rs | 100 +++------ 4 files changed, 196 insertions(+), 96 deletions(-) diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 325d602c8ef..7b36a7d08a4 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -6463,6 +6463,7 @@ dependencies = [ name = "sc-network-gossip" version = "0.8.0-dev" dependencies = [ + "async-std", "futures 0.3.4", "futures-timer 3.0.2", "libp2p", diff --git a/substrate/client/network-gossip/Cargo.toml b/substrate/client/network-gossip/Cargo.toml index a4b3f72b043..c6714375fe3 100644 --- a/substrate/client/network-gossip/Cargo.toml +++ b/substrate/client/network-gossip/Cargo.toml @@ -25,4 +25,5 @@ sp-utils = { version = "2.0.0-dev", path = "../../primitives/utils" } wasm-timer = "0.2" [dev-dependencies] +async-std = "1.5" substrate-test-runtime-client = { version = "2.0.0-dev", path = "../../test-utils/runtime/client" } diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index b3bfe606ba0..a15195111e8 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -21,9 +21,16 @@ use sc_network::{Event, ReputationChange}; use futures::prelude::*; use libp2p::PeerId; +use log::trace; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; -use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}}; -use sp_utils::mpsc::TracingUnboundedReceiver; +use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; +use std::{ + borrow::Cow, + collections::{HashMap, hash_map::Entry}, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on /// top of it. @@ -31,8 +38,12 @@ pub struct GossipEngine<B: BlockT> { state_machine: ConsensusGossip<B>, network: Box<dyn Network<B> + Send>, periodic_maintenance_interval: futures_timer::Delay, - network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>, engine_id: ConsensusEngineId, + + /// Incoming events from the network. + network_event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>, + /// Outgoing events to the consumer. + message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>, } impl<B: BlockT> Unpin for GossipEngine<B> {} @@ -54,8 +65,10 @@ impl<B: BlockT> GossipEngine<B> { state_machine: ConsensusGossip::new(validator, engine_id), network: Box::new(network), periodic_maintenance_interval: futures_timer::Delay::new(PERIODIC_MAINTENANCE_INTERVAL), - network_event_stream, engine_id, + + network_event_stream, + message_sinks: HashMap::new(), } } @@ -85,7 +98,15 @@ impl<B: BlockT> GossipEngine<B> { pub fn messages_for(&mut self, topic: B::Hash) -> TracingUnboundedReceiver<TopicNotification> { - self.state_machine.messages_for(topic) + let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for"); + + for notification in self.state_machine.messages_for(topic) { + tx.unbounded_send(notification).expect("receiver known to be live; qed"); + } + + self.message_sinks.entry(topic).or_default().push(tx); + + rx } /// Send all messages with given topic to a peer. @@ -147,16 +168,40 @@ impl<B: BlockT> Future for GossipEngine<B> { this.state_machine.peer_disconnected(&mut *this.network, remote); }, Event::NotificationsReceived { remote, messages } => { - let engine_id = this.engine_id.clone(); - this.state_machine.on_incoming( + let messages = messages.into_iter().filter_map(|(engine, data)| { + if engine == this.engine_id { + Some(data.to_vec()) + } else { + None + } + }).collect(); + + let to_forward = this.state_machine.on_incoming( &mut *this.network, remote, - messages.into_iter() - .filter_map(|(engine, data)| if engine == engine_id { - Some(data.to_vec()) - } else { None }) - .collect() + messages, ); + + for (topic, notification) in to_forward.into_iter() { + if let Entry::Occupied(mut entry) = this.message_sinks.entry(topic) { + trace!( + target: "gossip", + "Pushing consensus message to sinks for {}.", topic, + ); + entry.get_mut().retain(move |sink| { + if let Err(e) = sink.unbounded_send(notification.clone()) { + trace!( + target: "gossip", + "Error broadcasting message notification: {:?}", e, + ); + } + !sink.is_closed() + }); + if entry.get().is_empty() { + entry.remove_entry(); + } + } + } }, Event::Dht(_) => {} } @@ -169,6 +214,11 @@ impl<B: BlockT> Future for GossipEngine<B> { while let Poll::Ready(()) = this.periodic_maintenance_interval.poll_unpin(cx) { this.periodic_maintenance_interval.reset(PERIODIC_MAINTENANCE_INTERVAL); this.state_machine.tick(&mut *this.network); + + this.message_sinks.retain(|_, sinks| { + sinks.retain(|sink| !sink.is_closed()); + !sinks.is_empty() + }); } Poll::Pending @@ -177,23 +227,34 @@ impl<B: BlockT> Future for GossipEngine<B> { #[cfg(test)] mod tests { - use super::*; + use async_std::task::spawn; use crate::{ValidationResult, ValidatorContext}; + use futures::{channel::mpsc::{channel, Sender}, executor::block_on_stream}; + use sc_network::ObservedRole; + use sp_runtime::{testing::H256, traits::{Block as BlockT}}; + use std::sync::{Arc, Mutex}; use substrate_test_runtime_client::runtime::Block; + use super::*; - struct TestNetwork {} + #[derive(Clone, Default)] + struct TestNetwork { + inner: Arc<Mutex<TestNetworkInner>>, + } + + #[derive(Clone, Default)] + struct TestNetworkInner { + event_senders: Vec<Sender<Event>>, + } - impl<B: BlockT> Network<B> for Arc<TestNetwork> { + impl<B: BlockT> Network<B> for TestNetwork { fn event_stream(&self) -> Pin<Box<dyn Stream<Item = Event> + Send>> { - let (_tx, rx) = futures::channel::mpsc::channel(0); + let (tx, rx) = channel(100); + self.inner.lock().unwrap().event_senders.push(tx); - // Return rx and drop tx. Thus the given channel will yield `Poll::Ready(None)` on first - // poll. Box::pin(rx) } fn report_peer(&self, _: PeerId, _: ReputationChange) { - unimplemented!(); } fn disconnect_peer(&self, _: PeerId) { @@ -211,16 +272,15 @@ mod tests { } } - struct TestValidator {} - - impl<B: BlockT> Validator<B> for TestValidator { + struct AllowAll; + impl Validator<Block> for AllowAll { fn validate( &self, - _: &mut dyn ValidatorContext<B>, - _: &PeerId, - _: &[u8] - ) -> ValidationResult<B::Hash> { - unimplemented!(); + _context: &mut dyn ValidatorContext<Block>, + _sender: &PeerId, + _data: &[u8], + ) -> ValidationResult<H256> { + ValidationResult::ProcessAndKeep(H256::default()) } } @@ -230,13 +290,17 @@ mod tests { /// See https://github.com/paritytech/substrate/issues/5000 for details. #[test] fn returns_when_network_event_stream_closes() { + let network = TestNetwork::default(); let mut gossip_engine = GossipEngine::<Block>::new( - Arc::new(TestNetwork{}), + network.clone(), [1, 2, 3, 4], "my_protocol".as_bytes(), - Arc::new(TestValidator{}), + Arc::new(AllowAll{}), ); + // Drop network event stream sender side. + drop(network.inner.lock().unwrap().event_senders.pop()); + futures::executor::block_on(futures::future::poll_fn(move |ctx| { if let Poll::Pending = gossip_engine.poll_unpin(ctx) { panic!( @@ -247,4 +311,72 @@ mod tests { Poll::Ready(()) })) } + + #[test] + fn keeps_multiple_subscribers_per_topic_updated_with_both_old_and_new_messages() { + let topic = H256::default(); + let engine_id = [1, 2, 3, 4]; + let remote_peer = PeerId::random(); + let network = TestNetwork::default(); + + let mut gossip_engine = GossipEngine::<Block>::new( + network.clone(), + engine_id.clone(), + "my_protocol".as_bytes(), + Arc::new(AllowAll{}), + ); + + let mut event_sender = network.inner.lock() + .unwrap() + .event_senders + .pop() + .unwrap(); + + // Register the remote peer. + event_sender.start_send( + Event::NotificationStreamOpened { + remote: remote_peer.clone(), + engine_id: engine_id.clone(), + role: ObservedRole::Authority, + } + ).unwrap(); + + let messages = vec![vec![1], vec![2]]; + let events = messages.iter().cloned().map(|m| { + Event::NotificationsReceived { + remote: remote_peer.clone(), + messages: vec![(engine_id, m.into())] + } + }).collect::<Vec<_>>(); + + // Send first event before subscribing. + event_sender.start_send(events[0].clone()).unwrap(); + + let mut subscribers = vec![]; + for _ in 0..2 { + subscribers.push(gossip_engine.messages_for(topic)); + } + + // Send second event after subscribing. + event_sender.start_send(events[1].clone()).unwrap(); + + spawn(gossip_engine); + + let mut subscribers = subscribers.into_iter() + .map(|s| block_on_stream(s)) + .collect::<Vec<_>>(); + + // Expect each subscriber to receive both events. + for message in messages { + for subscriber in subscribers.iter_mut() { + assert_eq!( + subscriber.next(), + Some(TopicNotification { + message: message.clone(), + sender: Some(remote_peer.clone()), + }), + ); + } + } + } } diff --git a/substrate/client/network-gossip/src/state_machine.rs b/substrate/client/network-gossip/src/state_machine.rs index d93003fcfb4..53b5b98245a 100644 --- a/substrate/client/network-gossip/src/state_machine.rs +++ b/substrate/client/network-gossip/src/state_machine.rs @@ -16,7 +16,7 @@ use crate::{Network, MessageIntent, Validator, ValidatorContext, ValidationResult}; -use std::collections::{HashMap, HashSet, hash_map::Entry}; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::iter; use std::time; @@ -24,7 +24,6 @@ use log::trace; use lru::LruCache; use libp2p::PeerId; use sp_runtime::traits::{Block as BlockT, Hash, HashFor}; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; use sp_runtime::ConsensusEngineId; use sc_network::ObservedRole; use wasm_timer::Instant; @@ -51,7 +50,7 @@ struct PeerConsensus<H> { } /// Topic stream message with sender. -#[derive(Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct TopicNotification { /// Message data. pub message: Vec<u8>, @@ -147,7 +146,6 @@ fn propagate<'a, B: BlockT, I>( /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip<B: BlockT> { peers: HashMap<PeerId, PeerConsensus<B::Hash>>, - live_message_sinks: HashMap<B::Hash, Vec<TracingUnboundedSender<TopicNotification>>>, messages: Vec<MessageEntry<B>>, known_messages: LruCache<B::Hash, ()>, engine_id: ConsensusEngineId, @@ -160,7 +158,6 @@ impl<B: BlockT> ConsensusGossip<B> { pub fn new(validator: Arc<dyn Validator<B>>, engine_id: ConsensusEngineId) -> Self { ConsensusGossip { peers: HashMap::new(), - live_message_sinks: HashMap::new(), messages: Default::default(), known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE), engine_id, @@ -256,11 +253,6 @@ impl<B: BlockT> ConsensusGossip<B> { /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. pub fn collect_garbage(&mut self) { - self.live_message_sinks.retain(|_, sinks| { - sinks.retain(|sink| !sink.is_closed()); - !sinks.is_empty() - }); - let known_messages = &mut self.known_messages; let before = self.messages.len(); @@ -278,33 +270,24 @@ impl<B: BlockT> ConsensusGossip<B> { } } - /// Get data of valid, incoming messages for a topic (but might have expired meanwhile) - pub fn messages_for(&mut self, topic: B::Hash) - -> TracingUnboundedReceiver<TopicNotification> - { - let (tx, rx) = tracing_unbounded("mpsc_gossip_messages_for"); - for entry in self.messages.iter_mut().filter(|e| e.topic == topic) { - tx.unbounded_send(TopicNotification { - message: entry.message.clone(), - sender: entry.sender.clone(), - }) - .expect("receiver known to be live; qed"); - } - - self.live_message_sinks.entry(topic).or_default().push(tx); - - rx + /// Get valid messages received in the past for a topic (might have expired meanwhile). + pub fn messages_for(&mut self, topic: B::Hash) -> impl Iterator<Item = TopicNotification> + '_ { + self.messages.iter().filter(move |e| e.topic == topic).map(|entry| TopicNotification { + message: entry.message.clone(), + sender: entry.sender.clone(), + }) } - /// Handle an incoming message for topic by who via protocol. Discard message if topic already - /// known, the message is old, its source peers isn't a registered peer or the connection to - /// them is broken. + /// Register incoming messages and return the ones that are new and valid (according to a gossip + /// validator) and should thus be forwarded to the upper layers. pub fn on_incoming( &mut self, network: &mut dyn Network<B>, who: PeerId, messages: Vec<Vec<u8>>, - ) { + ) -> Vec<(B::Hash, TopicNotification)> { + let mut to_forward = vec![]; + if !messages.is_empty() { trace!(target: "gossip", "Received {} messages from peer {}", messages.len(), who); } @@ -335,23 +318,19 @@ impl<B: BlockT> ConsensusGossip<B> { network.report_peer(who.clone(), rep::GOSSIP_SUCCESS); if let Some(ref mut peer) = self.peers.get_mut(&who) { peer.known_messages.insert(message_hash); - if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) { - trace!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); - entry.get_mut().retain(|sink| { - if let Err(e) = sink.unbounded_send(TopicNotification { - message: message.clone(), - sender: Some(who.clone()) - }) { - trace!(target: "gossip", "Error broadcasting message notification: {:?}", e); - } - !sink.is_closed() - }); - if entry.get().is_empty() { - entry.remove_entry(); - } - } + + to_forward.push((topic, TopicNotification { + message: message.clone(), + sender: Some(who.clone()) + })); + if keep { - self.register_message_hashed(message_hash, topic, message, Some(who.clone())); + self.register_message_hashed( + message_hash, + topic, + message, + Some(who.clone()), + ); } } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -361,6 +340,8 @@ impl<B: BlockT> ConsensusGossip<B> { trace!(target:"gossip", "Discard message from peer {}", who); } } + + to_forward } /// Send all messages with given topic to a peer. @@ -437,7 +418,6 @@ impl<B: BlockT> ConsensusGossip<B> { mod tests { use std::sync::Arc; use sp_runtime::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; - use futures::executor::block_on_stream; use super::*; @@ -518,16 +498,18 @@ mod tests { } #[test] - fn message_stream_include_those_sent_before_asking_for_stream() { + fn message_stream_include_those_sent_before_asking() { let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]); + // Register message. let message = vec![4, 5, 6]; let topic = HashFor::<Block>::hash(&[1,2,3]); - consensus.register_message(topic, message.clone()); - let mut stream = block_on_stream(consensus.messages_for(topic)); - assert_eq!(stream.next(), Some(TopicNotification { message: message, sender: None })); + assert_eq!( + consensus.messages_for(topic).next(), + Some(TopicNotification { message: message, sender: None }), + ); } #[test] @@ -544,22 +526,6 @@ mod tests { assert_eq!(consensus.messages.len(), 2); } - #[test] - fn can_keep_multiple_subscribers_per_topic() { - let mut consensus = ConsensusGossip::<Block>::new(Arc::new(AllowAll), [0, 0, 0, 0]); - - let message = vec![4, 5, 6]; - let topic = HashFor::<Block>::hash(&[1, 2, 3]); - - consensus.register_message(topic, message.clone()); - - let mut stream1 = block_on_stream(consensus.messages_for(topic)); - let mut stream2 = block_on_stream(consensus.messages_for(topic)); - - assert_eq!(stream1.next(), Some(TopicNotification { message: message.clone(), sender: None })); - assert_eq!(stream2.next(), Some(TopicNotification { message, sender: None })); - } - #[test] fn peer_is_removed_on_disconnect() { struct TestNetwork; -- GitLab