diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs index 1a8ff9ebab8f74538180a24993b37f21f2491a62..f498b51460ace8cb46e6276a178e40262680896c 100644 --- a/substrate/core/finality-grandpa/src/communication.rs +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -130,12 +130,12 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { if rebroadcast { let SetId(set_id) = self.set_id; if let Some((Round(c_round), ref c_commit)) = self.last_commit { - self.network.send_commit(c_round, set_id, c_commit.clone()); + self.network.send_commit(c_round, set_id, c_commit.clone(), true); } let Round(round) = self.round_messages.0; for message in self.round_messages.1.iter().cloned() { - self.network.send_message(round, set_id, message); + self.network.send_message(round, set_id, message, true); } for (&announce_hash, &Round(round)) in &self.announcements { @@ -143,6 +143,7 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { } } } + loop { match self.incoming_broadcast.poll().expect("UnboundedReceiver does not yield errors; qed") { Async::NotReady => return Ok(Async::NotReady), @@ -168,7 +169,7 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { } // always send out to network. - self.network.send_commit(round.0, self.set_id.0, commit); + self.network.send_commit(round.0, self.set_id.0, commit, false); } Broadcast::Message(round, set_id, message) => { if self.set_id == set_id { @@ -182,7 +183,7 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { } // always send out to network. - self.network.send_message(round.0, set_id.0, message); + self.network.send_message(round.0, set_id.0, message, false); } Broadcast::Announcement(round, set_id, hash) => { if self.set_id == set_id { @@ -215,7 +216,7 @@ impl<B: BlockT, N: Network<B>> Network<B> for BroadcastHandle<B, N> { self.network.messages_for(round, set_id) } - fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { + fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>, _force: bool) { let _ = self.relay.unbounded_send(Broadcast::Message(Round(round), SetId(set_id), message)); } @@ -231,7 +232,7 @@ impl<B: BlockT, N: Network<B>> Network<B> for BroadcastHandle<B, N> { self.network.commit_messages(set_id) } - fn send_commit(&self, round: u64, set_id: u64, message: Vec<u8>) { + fn send_commit(&self, round: u64, set_id: u64, message: Vec<u8>, _force: bool) { let _ = self.relay.unbounded_send(Broadcast::Commit(Round(round), SetId(set_id), message)); } @@ -350,7 +351,7 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N> // announce our block hash to peers and propagate the // message. self.network.announce(self.round, self.set_id, target_hash); - self.network.send_message(self.round, self.set_id, message.encode()); + self.network.send_message(self.round, self.set_id, message.encode(), false); // forward the message to the inner sender. let _ = self.sender.unbounded_send(signed); @@ -526,7 +527,7 @@ impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> { message: compact_commit, }); - self.network.send_commit(round, self.set_id, Encode::encode(&message)); + self.network.send_commit(round, self.set_id, Encode::encode(&message), false); Ok(AsyncSink::Ready) } diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index a62cceb6553625ffefd28023648ac2f823316ab3..d6eda5a0916b8783e7933d1daf27e3fb244ea1fe 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -283,6 +283,7 @@ impl TopicTracker { ); return true; } + false } } @@ -358,6 +359,7 @@ impl<Block: BlockT> GossipValidator<Block> { -> network_gossip::ValidationResult<Block::Hash> { use grandpa::Message as GrandpaMessage; + if self.is_expired(full.round, full.set_id) { return network_gossip::ValidationResult::Expired; } @@ -392,6 +394,7 @@ impl<Block: BlockT> GossipValidator<Block> { let precommits_signed_by: Vec<String> = full.message.auth_data.iter().map(move |(_, a)| { format!("{}", a) }).collect(); + telemetry!(CONSENSUS_INFO; "afg.received_commit_msg"; "contains_precommits_signed_by" => ?precommits_signed_by, "round" => ?full.round, @@ -442,7 +445,7 @@ pub trait Network<Block: BlockT>: Clone { fn messages_for(&self, round: u64, set_id: u64) -> Self::In; /// Send a message at a specific round out. - fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>); + fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>, force: bool); /// Clean up messages for a round. fn drop_round_messages(&self, round: u64, set_id: u64); @@ -455,7 +458,7 @@ pub trait Network<Block: BlockT>: Clone { fn commit_messages(&self, set_id: u64) -> Self::In; /// Send message over the commit channel. - fn send_commit(&self, round: u64, set_id: u64, message: Vec<u8>); + fn send_commit(&self, round: u64, set_id: u64, message: Vec<u8>, force: bool); /// Inform peers that a block with given hash should be downloaded. fn announce(&self, round: u64, set_id: u64, block: Block::Hash); @@ -508,9 +511,9 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B NetworkStream { outer: rx, inner: None } } - fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { + fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>, force: bool) { let topic = message_topic::<B>(round, set_id); - self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message); + self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, force); } fn drop_round_messages(&self, round: u64, set_id: u64) { @@ -533,9 +536,9 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B NetworkStream { outer: rx, inner: None } } - fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) { + fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>, force: bool) { let topic = commit_topic::<B>(set_id); - self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message); + self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, force); } fn announce(&self, round: u64, _set_id: u64, block: B::Hash) { diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index a6a9f32a9d4994f751b3f3e07703eac0c12e5180..6daddea562450822a82c7bb512d84c947432018f 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -193,9 +193,10 @@ impl Network<Block> for MessageRouting { Box::new(messages) } - fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { + fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>, force: bool) { let inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), GRANDPA_ENGINE_ID, message); + inner.peer(self.peer_id) + .gossip_message(make_topic(round, set_id), GRANDPA_ENGINE_ID, message, force); } fn drop_round_messages(&self, round: u64, set_id: u64) { @@ -214,7 +215,7 @@ impl Network<Block> for MessageRouting { self.validator.note_set(set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for( + let messages = peer.consensus_gossip_messages_for( GRANDPA_ENGINE_ID, make_commit_topic(set_id), ); @@ -226,9 +227,10 @@ impl Network<Block> for MessageRouting { Box::new(messages) } - fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) { + fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>, force: bool) { let inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), GRANDPA_ENGINE_ID, message); + inner.peer(self.peer_id) + .gossip_message(make_commit_topic(set_id), GRANDPA_ENGINE_ID, message, force); } fn announce(&self, _round: u64, _set_id: u64, _block: H256) { diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 72a384e97a6fd69c19fabe8a32d542f9fc07286c..4d45f261439d7edd398aa30319003ce894271479 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -19,7 +19,6 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use std::time::{Instant, Duration}; use log::{trace, debug}; use futures::sync::mpsc; use rand::{self, seq::SliceRandom}; @@ -32,7 +31,6 @@ use crate::config::Roles; use crate::ConsensusEngineId; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 -const MESSAGE_LIFETIME: Duration = Duration::from_secs(120); const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; struct PeerConsensus<H> { @@ -50,7 +48,6 @@ struct MessageEntry<B: BlockT> { message_hash: B::Hash, topic: B::Hash, message: ConsensusMessage, - timestamp: Instant, status: Status, } @@ -115,11 +112,9 @@ impl<B: BlockT> ConsensusGossip<B> { pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) { if roles.intersects(Roles::AUTHORITY) { trace!(target:"gossip", "Registering {:?} {}", roles, who); - let now = Instant::now(); // Send out all known messages to authorities. let mut known_messages = HashSet::new(); for entry in self.messages.iter() { - if entry.timestamp + MESSAGE_LIFETIME < now { continue } if let Status::Future = entry.status { continue } known_messages.insert(entry.message_hash); @@ -143,12 +138,13 @@ impl<B: BlockT> ConsensusGossip<B> { protocol: &mut Context<B>, message_hash: B::Hash, get_message: F, + force: bool, ) where F: Fn() -> ConsensusMessage, { let mut non_authorities: Vec<_> = self.peers.iter() .filter_map(|(id, ref peer)| - if !peer.is_authority && !peer.known_messages.contains(&message_hash) { + if !peer.is_authority && (!peer.known_messages.contains(&message_hash) || force) { Some(*id) } else { None @@ -165,17 +161,15 @@ impl<B: BlockT> ConsensusGossip<B> { for (id, ref mut peer) in self.peers.iter_mut() { if peer.is_authority { - if peer.known_messages.insert(message_hash.clone()) { + if peer.known_messages.insert(message_hash.clone()) || force { let message = get_message(); trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); protocol.send_message(*id, Message::Consensus(message)); } } else if non_authorities.contains(&id) { - if peer.known_messages.insert(message_hash.clone()) { - let message = get_message(); - trace!(target:"gossip", "Propagating to {}: {:?}", id, message); - protocol.send_message(*id, Message::Consensus(message)); - } + let message = get_message(); + trace!(target:"gossip", "Propagating to {}: {:?}", id, message); + protocol.send_message(*id, Message::Consensus(message)); } } } @@ -194,7 +188,6 @@ impl<B: BlockT> ConsensusGossip<B> { topic, message_hash, message: get_message(), - timestamp: Instant::now(), status, }); } @@ -218,7 +211,6 @@ impl<B: BlockT> ConsensusGossip<B> { let known_messages = &mut self.known_messages; let before = self.messages.len(); let validators = &self.validators; - let now = Instant::now(); let mut check_fns = HashMap::new(); let mut message_expired = move |entry: &MessageEntry<B>| { @@ -234,9 +226,7 @@ impl<B: BlockT> ConsensusGossip<B> { (check_fn)(entry.topic, &entry.message.data) }; - self.messages.retain(|entry| - entry.timestamp + MESSAGE_LIFETIME >= now && !message_expired(entry) - ); + self.messages.retain(|entry| !message_expired(entry)); trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", before - self.messages.len(), @@ -332,7 +322,7 @@ impl<B: BlockT> ConsensusGossip<B> { Some(ValidationResult::Expired) => { trace!(target:"gossip", "Ignored expired message from {}", who); return None; - } + }, None => { protocol.report_peer( who, @@ -341,7 +331,7 @@ impl<B: BlockT> ConsensusGossip<B> { trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who); return None; - } + }, }; peer.known_messages.insert(message_hash); @@ -357,7 +347,7 @@ impl<B: BlockT> ConsensusGossip<B> { entry.remove_entry(); } } - self.multicast_inner(protocol, message_hash, topic, status, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, status, || message.clone(), false); Some((topic, message)) } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); @@ -371,9 +361,10 @@ impl<B: BlockT> ConsensusGossip<B> { protocol: &mut Context<B>, topic: B::Hash, message: ConsensusMessage, + force: bool, ) { let message_hash = HashFor::<B>::hash(&message.data); - self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone()); + self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone(), force); } fn multicast_inner<F>( @@ -383,25 +374,20 @@ impl<B: BlockT> ConsensusGossip<B> { topic: B::Hash, status: Status, get_message: F, + force: bool, ) where F: Fn() -> ConsensusMessage { self.register_message(message_hash, topic, status, &get_message); if let Status::Live = status { - self.propagate(protocol, message_hash, get_message); + self.propagate(protocol, message_hash, get_message, force); } } - - /// Note new consensus session. - pub fn new_session(&mut self, _parent_hash: B::Hash) { - self.collect_garbage(); - } } #[cfg(test)] mod tests { use runtime_primitives::testing::{H256, Block as RawBlock, ExtrinsicWrapper}; - use std::time::Instant; use futures::Stream; use super::*; @@ -409,13 +395,12 @@ mod tests { type Block = RawBlock<ExtrinsicWrapper<u64>>; macro_rules! push_msg { - ($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => { + ($consensus:expr, $topic:expr, $hash: expr, $m:expr) => { if $consensus.known_messages.insert($hash, ()).is_none() { $consensus.messages.push(MessageEntry { topic: $topic, message_hash: $hash, - message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, - timestamp: $now, + message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0] }, status: Status::Live, }); } @@ -450,9 +435,8 @@ mod tests { let m1 = vec![1, 2, 3]; let m2 = vec![4, 5, 6]; - let now = Instant::now(); - push_msg!(consensus, prev_hash, m1_hash, now, m1); - push_msg!(consensus, best_hash, m2_hash, now, m2.clone()); + push_msg!(consensus, prev_hash, m1_hash, m1); + push_msg!(consensus, best_hash, m2_hash, m2.clone()); consensus.known_messages.insert(m1_hash, ()); consensus.known_messages.insert(m2_hash, ()); @@ -470,15 +454,6 @@ mod tests { // known messages are only pruned based on size. assert_eq!(consensus.known_messages.len(), 2); assert!(consensus.known_messages.contains_key(&m2_hash)); - - // make timestamp expired, but the message is still kept as known - consensus.messages.clear(); - consensus.known_messages.clear(); - consensus.register_validator(test_engine_id, Arc::new(AllowAll)); - push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone()); - consensus.collect_garbage(); - assert!(consensus.messages.is_empty()); - assert_eq!(consensus.known_messages.len(), 1); } #[test] diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 9636d1bb14ff48f823fc27c6d1a74ac4b5742b1c..bcfd1d07e53e8ca3e08b0386e1f748a3aa3b89d1 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -240,7 +240,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> { /// Execute a closure with the consensus gossip. ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>), /// Incoming gossip consensus message. - GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>), + GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>, bool), /// Tell protocol to abort sync (does not stop protocol). /// Only used in tests. #[cfg(any(test, feature = "test-helpers"))] @@ -380,8 +380,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { ProtocolContext::new(&mut self.context_data, &self.network_chan); task.call_box(&mut self.consensus_gossip, &mut context); } - ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => { - self.gossip_consensus_message(topic, engine_id, message) + ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, force) => { + self.gossip_consensus_message(topic, engine_id, message, force) } ProtocolMsg::BlocksProcessed(hashes, has_error) => { self.sync.blocks_processed(hashes, has_error); @@ -503,11 +503,18 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { ); } - fn gossip_consensus_message(&mut self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) { + fn gossip_consensus_message( + &mut self, + topic: B::Hash, + engine_id: ConsensusEngineId, + message: Vec<u8>, + force: bool, + ) { self.consensus_gossip.multicast( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), topic, ConsensusMessage{ data: message, engine_id }, + force, ); } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 6a6291af3f5bd5626cf5e104ccc5c7a0e0846529..714192b7f06aadff8bb4c2f754d96169228f4bba 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -247,11 +247,17 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { } /// Send a consensus message through the gossip - pub fn gossip_consensus_message(&self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) { + pub fn gossip_consensus_message( + &self, + topic: B::Hash, + engine_id: ConsensusEngineId, + message: Vec<u8>, + force: bool, + ) { let _ = self .protocol_sender .send(ProtocolMsg::GossipConsensusMessage( - topic, engine_id, message, + topic, engine_id, message, force, )); } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 5fa13b58a8a434d89394c7fdd2c6a5923178ed80..afdac863ab9a371b6dcd78a168ee2021d6e40a43 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -355,10 +355,16 @@ impl<D, S: NetworkSpecialization<Block> + Clone> Peer<D, S> { /// Push a message into the gossip network and relay to peers. /// `TestNet::sync_step` needs to be called to ensure it's propagated. - pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, engine_id: ConsensusEngineId, data: Vec<u8>) { + pub fn gossip_message( + &self, + topic: <Block as BlockT>::Hash, + engine_id: ConsensusEngineId, + data: Vec<u8>, + force: bool, + ) { let _ = self .protocol_sender - .send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data)); + .send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, force)); } pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) {