diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index c1bfeb0d08e901a940bc5d4775403378b3287a68..08e11146bb525172f783baad01bad3130eb529e6 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -253,6 +253,42 @@ struct TopicTracker { set_id: u64, } +impl TopicTracker { + fn is_expired(&self, round: u64, set_id: u64) -> bool { + if set_id < self.set_id { + trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, self.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_set_id"; + "set_id" => ?set_id, "ours" => ?self.set_id + ); + return true; + } else if set_id == self.set_id + 1 { + // allow a few first rounds of future set. + if round > MESSAGE_ROUND_TOLERANCE { + trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, self.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set"; + "round" => ?round, "ours" => ?self.set_id + ); + return true; + } + } else if set_id == self.set_id { + if round < self.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { + trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, self.min_live_round, self.max_round); + telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob"; + "round" => ?round, "our_min_live_round" => ?self.min_live_round, "our_max_round" => ?self.max_round + ); + return true; + } + } else { + trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, self.set_id); + telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set"; + "set_id" => ?set_id, "ours" => ?self.set_id + ); + return true; + } + false + } +} + struct GossipValidator<Block: BlockT> { rounds: parking_lot::RwLock<TopicTracker>, _marker: ::std::marker::PhantomData<Block>, @@ -294,38 +330,7 @@ impl<Block: BlockT> GossipValidator<Block> { } fn is_expired(&self, round: u64, set_id: u64) -> bool { - let rounds = self.rounds.read(); - if set_id < rounds.set_id { - trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id); - telemetry!(CONSENSUS_TRACE; "afg.expired_set_id"; - "set_id" => ?set_id, "ours" => ?rounds.set_id - ); - return true; - } else if set_id == rounds.set_id + 1 { - // allow a few first rounds of future set. - if round > MESSAGE_ROUND_TOLERANCE { - trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id); - telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set"; - "round" => ?round, "ours" => ?rounds.set_id - ); - return true; - } - } else if set_id == rounds.set_id { - if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { - trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round); - telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob"; - "round" => ?round, "our_min_live_round" => ?rounds.min_live_round, "our_max_round" => ?rounds.max_round - ); - return true; - } - } else { - trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id); - telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set"; - "set_id" => ?set_id, "ours" => ?rounds.set_id - ); - return true; - } - false + self.rounds.read().is_expired(round, set_id) } fn validate_round_message(&self, full: VoteOrPrecommitMessage<Block>) @@ -401,6 +406,18 @@ impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<B } } } + + fn message_expired<'a>(&'a self) -> Box<FnMut(Block::Hash, &[u8]) -> bool + 'a> { + let rounds = self.rounds.read(); + Box::new(move |_topic, mut data| { + match GossipMessage::<Block>::decode(&mut data) { + None => true, + Some(GossipMessage::Commit(full)) => rounds.is_expired(full.round, full.set_id), + Some(GossipMessage::VoteOrPrecommit(full)) => + rounds.is_expired(full.round, full.set_id), + } + }) + } } /// A handle to the network. This is generally implemented by providing some @@ -476,7 +493,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B self.validator.note_round(round, set_id); let (tx, rx) = sync::oneshot::channel(); self.service.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id)); + let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, message_topic::<B>(round, set_id)); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } @@ -501,7 +518,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B self.validator.note_set(set_id); let (tx, rx) = sync::oneshot::channel(); self.service.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(commit_topic::<B>(set_id)); + let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, commit_topic::<B>(set_id)); let _ = tx.send(inner_rx); }); NetworkStream { outer: rx, inner: None } diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 733421c4df61247b44ef12f58c02b48cd90cb252..4821686f38bea64a0b4f967df89dcd381a7cb947 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -181,7 +181,10 @@ impl Network<Block> for MessageRouting { self.validator.note_round(round, set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id)); + let messages = peer.consensus_gossip_messages_for( + GRANDPA_ENGINE_ID, + make_topic(round, set_id), + ); let messages = messages.map_err( move |_| panic!("Messages for round {} dropped too early", round) @@ -211,7 +214,10 @@ impl Network<Block> for MessageRouting { self.validator.note_set(set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id)); + let messages = peer.consensus_gossip_messages_for( + GRANDPA_ENGINE_ID, + make_commit_topic(set_id), + ); let messages = messages.map_err( move |_| panic!("Commit messages for set {} dropped too early", set_id) diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 720cf55a67b75e3697bcd96b80b6274cc7b6c9bc..8ed6f8ba58f75ac526081542b77a152555e9744c 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -40,17 +40,26 @@ struct PeerConsensus<H> { is_authority: bool, } +#[derive(Clone, Copy)] +enum Status { + Live, + Future, +} + struct MessageEntry<B: BlockT> { message_hash: B::Hash, topic: B::Hash, message: ConsensusMessage, timestamp: Instant, + status: Status, } /// Message validation result. pub enum ValidationResult<H> { /// Message is valid with this topic. Valid(H), + /// Message is future with this topic. + Future(H), /// Invalid message. Invalid, /// Obsolete message. @@ -61,12 +70,20 @@ pub enum ValidationResult<H> { pub trait Validator<H> { /// Validate consensus message. fn validate(&self, data: &[u8]) -> ValidationResult<H>; + + /// Produce a closure for validating messages on a given topic. + fn message_expired<'a>(&'a self) -> Box<FnMut(H, &[u8]) -> bool + 'a> { + Box::new(move |_topic, data| match self.validate(data) { + ValidationResult::Valid(_) | ValidationResult::Future(_) => false, + ValidationResult::Invalid | ValidationResult::Expired => true, + }) + } } /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip<B: BlockT> { peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>, - live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<Vec<u8>>>>, + live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec<mpsc::UnboundedSender<Vec<u8>>>>, messages: Vec<MessageEntry<B>>, known_messages: LruCache<B::Hash, ()>, validators: HashMap<ConsensusEngineId, Arc<Validator<B::Hash>>>, @@ -102,7 +119,9 @@ impl<B: BlockT> ConsensusGossip<B> { // Send out all known messages to authorities. let mut known_messages = HashSet::new(); for entry in self.messages.iter() { - if entry.timestamp + MESSAGE_LIFETIME < now { continue }; + if entry.timestamp + MESSAGE_LIFETIME < now { continue } + if let Status::Future = entry.status { continue } + known_messages.insert(entry.message_hash); protocol.send_message(who, Message::Consensus(entry.message.clone())); } @@ -161,18 +180,23 @@ impl<B: BlockT> ConsensusGossip<B> { } } - fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F) + fn register_message<F>( + &mut self, + message_hash: B::Hash, + topic: B::Hash, + status: Status, + get_message: F, + ) where F: Fn() -> ConsensusMessage { - if self.known_messages.insert(message_hash, ()).is_none() - { + if self.known_messages.insert(message_hash, ()).is_none() { self.messages.push(MessageEntry { topic, message_hash, message: get_message(), timestamp: Instant::now(), + status, }); - } } @@ -184,6 +208,8 @@ impl<B: BlockT> ConsensusGossip<B> { /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. pub fn collect_garbage(&mut self) { + use std::collections::hash_map::Entry; + self.live_message_sinks.retain(|_, sinks| { sinks.retain(|sink| !sink.is_closed()); !sinks.is_empty() @@ -194,15 +220,23 @@ impl<B: BlockT> ConsensusGossip<B> { let validators = &self.validators; let now = Instant::now(); - self.messages.retain(|entry| { - entry.timestamp + MESSAGE_LIFETIME >= now - && match validators.get(&entry.message.engine_id) - .map(|v| v.validate(&entry.message.data)) - { - Some(ValidationResult::Valid(_)) => true, - _ => false, - } - }); + let mut check_fns = HashMap::new(); + let mut message_expired = move |entry: &MessageEntry<B>| { + let engine_id = entry.message.engine_id; + let check_fn = match check_fns.entry(engine_id) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(vacant) => match validators.get(&engine_id) { + None => return true, // treat all messages with no validator as expired + Some(validator) => vacant.insert(validator.message_expired()), + } + }; + + (check_fn)(entry.topic, &entry.message.data) + }; + + self.messages.retain(|entry| + entry.timestamp + MESSAGE_LIFETIME >= now && !message_expired(entry) + ); trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", before - self.messages.len(), @@ -216,12 +250,46 @@ impl<B: BlockT> ConsensusGossip<B> { } /// Get data of valid, incoming messages for a topic (but might have expired meanwhile) - pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<Vec<u8>> { + pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash) + -> mpsc::UnboundedReceiver<Vec<u8>> + { let (tx, rx) = mpsc::unbounded(); - for entry in self.messages.iter().filter(|e| e.topic == topic) { - tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed"); + + let validator = match self.validators.get(&engine_id) { + None => { + self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); + return rx; + } + Some(v) => v, + }; + + for entry in self.messages.iter_mut() + .filter(|e| e.topic == topic && e.message.engine_id == engine_id) + { + let live = match entry.status { + Status::Live => true, + Status::Future => match validator.validate(&entry.message.data) { + ValidationResult::Valid(_) => { + entry.status = Status::Live; + true + } + _ => { + // don't send messages considered to be future still. + // if messages are considered expired they'll be cleaned up when we + // collect garbage. + false + } + } + }; + + if live { + entry.status = Status::Live; + tx.unbounded_send(entry.message.data.clone()) + .expect("receiver known to be live; qed"); + } } - self.live_message_sinks.entry(topic).or_default().push(tx); + + self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); rx } @@ -247,11 +315,13 @@ impl<B: BlockT> ConsensusGossip<B> { if let Some(ref mut peer) = self.peers.get_mut(&who) { use std::collections::hash_map::Entry; + let engine_id = message.engine_id; //validate the message - let topic = match self.validators.get(&message.engine_id) + let (topic, status) = match self.validators.get(&engine_id) .map(|v| v.validate(&message.data)) { - Some(ValidationResult::Valid(topic)) => topic, + Some(ValidationResult::Valid(topic)) => (topic, Status::Live), + Some(ValidationResult::Future(topic)) => (topic, Status::Future), Some(ValidationResult::Invalid) => { trace!(target:"gossip", "Invalid message from {}", who); protocol.report_peer( @@ -275,13 +345,14 @@ impl<B: BlockT> ConsensusGossip<B> { who, Severity::Useless(format!("Sent unknown consensus engine id")), ); - trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who); + trace!(target:"gossip", "Unknown message engine id {:?} from {}", + engine_id, who); return None; } }; peer.known_messages.insert(message_hash); - if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) { + if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); entry.get_mut().retain(|sink| { if let Err(e) = sink.unbounded_send(message.data.clone()) { @@ -293,7 +364,7 @@ impl<B: BlockT> ConsensusGossip<B> { entry.remove_entry(); } } - self.multicast_inner(protocol, message_hash, topic, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, status, || message.clone()); Some((topic, message)) } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -309,7 +380,7 @@ impl<B: BlockT> ConsensusGossip<B> { message: ConsensusMessage, ) { let message_hash = HashFor::<B>::hash(&message.data); - self.multicast_inner(protocol, message_hash, topic, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone()); } fn multicast_inner<F>( @@ -317,12 +388,15 @@ impl<B: BlockT> ConsensusGossip<B> { protocol: &mut Context<B>, message_hash: B::Hash, topic: B::Hash, + status: Status, get_message: F, ) where F: Fn() -> ConsensusMessage { - self.register_message(message_hash, topic, &get_message); - self.propagate(protocol, message_hash, get_message); + self.register_message(message_hash, topic, status, &get_message); + if let Status::Live = status { + self.propagate(protocol, message_hash, get_message); + } } /// Note new consensus session. @@ -335,6 +409,8 @@ impl<B: BlockT> ConsensusGossip<B> { mod tests { use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; use std::time::Instant; + use futures::Stream; + use super::*; type Block = RawBlock<ExtrinsicWrapper<u64>>; @@ -347,21 +423,21 @@ mod tests { message_hash: $hash, message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, timestamp: $now, + status: Status::Live, }); } } } - #[test] - fn collects_garbage() { - - struct AllowAll; - impl Validator<H256> for AllowAll { - fn validate(&self, _data: &[u8]) -> ValidationResult<H256> { - ValidationResult::Valid(H256::default()) - } + struct AllowAll; + impl Validator<H256> for AllowAll { + fn validate(&self, _data: &[u8]) -> ValidationResult<H256> { + ValidationResult::Valid(H256::default()) } + } + #[test] + fn collects_garbage() { struct AllowOne; impl Validator<H256> for AllowOne { fn validate(&self, data: &[u8]) -> ValidationResult<H256> { @@ -417,14 +493,15 @@ mod tests { use futures::Stream; let mut consensus = ConsensusGossip::<Block>::new(); + consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::<Block>::hash(&message.data); let topic = HashFor::<Block>::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, || message.clone()); - let stream = consensus.messages_for(topic); + consensus.register_message(message_hash, topic, Status::Live, || message.clone()); + let stream = consensus.messages_for([0, 0, 0, 0], topic); assert_eq!(stream.wait().next(), Some(Ok(message.data))); } @@ -437,29 +514,47 @@ mod tests { let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, || msg_a.clone()); - consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, || msg_b.clone()); + consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); + consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); assert_eq!(consensus.messages.len(), 2); } #[test] fn can_keep_multiple_subscribers_per_topic() { - use futures::Stream; - let mut consensus = ConsensusGossip::<Block>::new(); + consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::<Block>::hash(&message.data); let topic = HashFor::<Block>::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, || message.clone()); + consensus.register_message(message_hash, topic, Status::Live, || message.clone()); - let stream1 = consensus.messages_for(topic); - let stream2 = consensus.messages_for(topic); + let stream1 = consensus.messages_for([0, 0, 0, 0], topic); + let stream2 = consensus.messages_for([0, 0, 0, 0], topic); assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone()))); assert_eq!(stream2.wait().next(), Some(Ok(message.data))); } + + #[test] + fn topics_are_localized_to_engine_id() { + let mut consensus = ConsensusGossip::<Block>::new(); + consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); + + let topic = [1; 32].into(); + let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; + let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; + + consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); + consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); + + let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait(); + + assert_eq!(stream.next(), Some(Ok(vec![1, 2, 3]))); + let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); + assert_eq!(stream.next(), None); + } } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 47750080a6bec40f30350f52dd89af7340794c60..3adf917a5cf21370a331ee91a8e7817118616775 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -369,11 +369,12 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> { /// access the underlying consensus gossip handler pub fn consensus_gossip_messages_for( &self, + engine_id: ConsensusEngineId, topic: <Block as BlockT>::Hash, ) -> mpsc::UnboundedReceiver<Vec<u8>> { let (tx, rx) = oneshot::channel(); self.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(topic); + let inner_rx = gossip.messages_for(engine_id, topic); let _ = tx.send(inner_rx); }); rx.wait().ok().expect("1. Network is running, 2. it should handle the above closure successfully")