From b3eae17f65c9f5b0259ea4138a78879459ff0bde Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Wed, 20 Feb 2019 12:40:03 +0100 Subject: [PATCH] Gossip refactoring (#1811) * First part of gossip protocol refactoring * Message validation in GRANDPA * Reverted to time-based expiration for future round messages * Removed collect_garbage_for_topic * Use consensus engine id instead of kind * Use try_init Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Comment Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Added expiration check on broadcast Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Apply suggestions from code review Co-Authored-By: arkpar <arkady.paronyan@gmail.com> * Style * Style --- .../finality-grandpa/src/communication.rs | 84 ++--- .../core/finality-grandpa/src/environment.rs | 2 - substrate/core/finality-grandpa/src/lib.rs | 199 ++++++++++- substrate/core/finality-grandpa/src/tests.rs | 44 ++- .../core/network/src/consensus_gossip.rs | 326 +++++++++--------- substrate/core/network/src/lib.rs | 2 +- substrate/core/network/src/message.rs | 19 +- substrate/core/network/src/protocol.rs | 29 +- substrate/core/network/src/service.rs | 6 +- substrate/core/network/src/test/mod.rs | 14 +- 10 files changed, 456 insertions(+), 269 deletions(-) diff --git a/substrate/core/finality-grandpa/src/communication.rs b/substrate/core/finality-grandpa/src/communication.rs index 4bfbe4c74e2..18fbcbfeb46 100644 --- a/substrate/core/finality-grandpa/src/communication.rs +++ b/substrate/core/finality-grandpa/src/communication.rs @@ -28,7 +28,8 @@ use parity_codec::{Encode, Decode}; use substrate_primitives::{ed25519, Ed25519AuthorityId}; use runtime_primitives::traits::Block as BlockT; use tokio::timer::Interval; -use crate::{Error, Network, Message, SignedMessage, Commit, CompactCommit}; +use crate::{Error, Network, Message, SignedMessage, Commit, + CompactCommit, GossipMessage, FullCommitMessage, VoteOrPrecommitMessage}; fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8> { (message, round, set_id).encode() @@ -259,8 +260,6 @@ pub(crate) fn check_message_sig<Block: BlockT>( /// converts a message stream into a stream of signed messages. /// the output stream checks signatures also. pub(crate) fn checked_message_stream<Block: BlockT, S>( - round: u64, - set_id: u64, inner: S, voters: Arc<VoterSet<Ed25519AuthorityId>>, ) @@ -269,28 +268,27 @@ pub(crate) fn checked_message_stream<Block: BlockT, S>( { inner .filter_map(|raw| { - let decoded = SignedMessage::<Block>::decode(&mut &raw[..]); + let decoded = GossipMessage::<Block>::decode(&mut &raw[..]); if decoded.is_none() { debug!(target: "afg", "Skipping malformed message {:?}", raw); } decoded }) .and_then(move |msg| { - // check signature. - if !voters.contains_key(&msg.id) { - debug!(target: "afg", "Skipping message from unknown voter {}", msg.id); - return Ok(None); + match msg { + GossipMessage::VoteOrPrecommit(msg) => { + // check signature. + if !voters.contains_key(&msg.message.id) { + debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); + return Ok(None); + } + Ok(Some(msg.message)) + } + _ => { + debug!(target: "afg", "Skipping unknown message type"); + return Ok(None); + } } - - // we ignore messages where the signature doesn't check out. - let res = check_message_sig::<Block>( - &msg.message, - &msg.id, - &msg.signature, - round, - set_id - ); - Ok(res.map(move |()| msg).ok()) }) .filter_map(|x| x) .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) @@ -322,10 +320,16 @@ impl<Block: BlockT, N: Network<Block>> Sink for OutgoingMessages<Block, N> id: local_id, }; + let message = GossipMessage::VoteOrPrecommit(VoteOrPrecommitMessage::<Block> { + message: signed.clone(), + round: self.round, + set_id: self.set_id, + }); + // announce our block hash to peers and propagate the // message. self.network.announce(self.round, self.set_id, target_hash); - self.network.send_message(self.round, self.set_id, signed.encode()); + self.network.send_message(self.round, self.set_id, message.encode()); // forward the message to the inner sender. let _ = self.sender.unbounded_send(signed); @@ -392,34 +396,18 @@ pub(crate) fn outgoing_messages<Block: BlockT, N: Network<Block>>( fn check_compact_commit<Block: BlockT>( msg: CompactCommit<Block>, voters: &VoterSet<Ed25519AuthorityId>, - round: u64, - set_id: u64, ) -> Option<CompactCommit<Block>> { - use grandpa::Message as GrandpaMessage; if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() { debug!(target: "afg", "Skipping malformed compact commit"); return None; } // check signatures on all contained precommits. - for (precommit, &(ref sig, ref id)) in msg.precommits.iter().zip(&msg.auth_data) { + for (_, ref id) in &msg.auth_data { if !voters.contains_key(id) { debug!(target: "afg", "Skipping commit containing unknown voter {}", id); return None; } - - let res = check_message_sig::<Block>( - &GrandpaMessage::Precommit(precommit.clone()), - id, - sig, - round, - set_id, - ); - - if let Err(()) = res { - debug!(target: "afg", "Skipping commit containing bad message"); - return None; - } } Some(msg) @@ -428,7 +416,6 @@ fn check_compact_commit<Block: BlockT>( /// A stream for incoming commit messages. This checks all the signatures on the /// messages. pub(crate) fn checked_commit_stream<Block: BlockT, S>( - set_id: u64, inner: S, voters: Arc<VoterSet<Ed25519AuthorityId>>, ) @@ -438,14 +425,23 @@ pub(crate) fn checked_commit_stream<Block: BlockT, S>( inner .filter_map(|raw| { // this could be optimized by decoding piecewise. - let decoded = <(u64, CompactCommit<Block>)>::decode(&mut &raw[..]); + let decoded = GossipMessage::<Block>::decode(&mut &raw[..]); if decoded.is_none() { trace!(target: "afg", "Skipping malformed commit message {:?}", raw); } decoded }) - .filter_map(move |(round, msg)| { - check_compact_commit::<Block>(msg, &*voters, round, set_id).map(move |c| (round, c)) + .filter_map(move |msg| { + match msg { + GossipMessage::Commit(msg) => { + let round = msg.round; + check_compact_commit::<Block>(msg.message, &*voters).map(move |c| (round, c)) + }, + _ => { + debug!(target: "afg", "Skipping unknown message type"); + return None; + } + } }) .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } @@ -491,7 +487,13 @@ impl<Block: BlockT, N: Network<Block>> Sink for CommitsOut<Block, N> { auth_data }; - self.network.send_commit(round, self.set_id, Encode::encode(&(round, compact_commit))); + let message = FullCommitMessage::<Block> { + round: round, + set_id: self.set_id, + message: compact_commit, + }; + + self.network.send_commit(round, self.set_id, Encode::encode(&message)); Ok(AsyncSink::Ready) } diff --git a/substrate/core/finality-grandpa/src/environment.rs b/substrate/core/finality-grandpa/src/environment.rs index d210f0fa8b0..71424e8be91 100644 --- a/substrate/core/finality-grandpa/src/environment.rs +++ b/substrate/core/finality-grandpa/src/environment.rs @@ -248,8 +248,6 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb let precommit_timer = Delay::new(now + self.config.gossip_duration * 4); let incoming = crate::communication::checked_message_stream::<Block, _>( - round, - self.set_id, self.network.messages_for(round, self.set_id), self.voters.clone(), ); diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 5400ae0dd01..ad00980d3c4 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -53,7 +53,7 @@ //! included in the newly-finalized chain. use futures::prelude::*; -use log::{debug, info, warn}; +use log::{debug, info, warn, trace}; use futures::sync::{self, mpsc, oneshot}; use client::{ BlockchainEvents, CallExecutor, Client, backend::Backend, @@ -61,6 +61,7 @@ use client::{ }; use client::blockchain::HeaderBackend; use parity_codec::{Encode, Decode}; +use parity_codec_derive::{Encode, Decode}; use runtime_primitives::traits::{ NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, DigestItemFor, DigestItem, @@ -73,6 +74,7 @@ use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet}; use network::Service as NetworkService; +use network::consensus_gossip as network_gossip; use std::sync::Arc; use std::time::Duration; @@ -106,6 +108,10 @@ const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round"; const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters"; const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes"; +const GRANDPA_ENGINE_ID: network::ConsensusEngineId = [b'a', b'f', b'g', b'1']; + +const MESSAGE_ROUND_TOLERANCE: u64 = 2; + /// round-number, round-state type LastCompleted<H, N> = (u64, RoundState<H, N>); @@ -118,6 +124,25 @@ pub type SignedMessage<Block> = grandpa::SignedMessage< ed25519::Signature, Ed25519AuthorityId, >; + +/// Grandpa gossip message type. +/// This is the root type that gets encoded and sent on the network. +#[derive(Debug, Encode, Decode)] +pub enum GossipMessage<Block: BlockT> { + /// Grandpa message with round and set info. + VoteOrPrecommit(VoteOrPrecommitMessage<Block>), + /// Grandpa commit message with round and set info. + Commit(FullCommitMessage<Block>), +} + +/// Network level message with topic information. +#[derive(Debug, Encode, Decode)] +pub struct VoteOrPrecommitMessage<Block: BlockT> { + pub round: u64, + pub set_id: u64, + pub message: SignedMessage<Block>, +} + /// A prevote message for this chain's block type. pub type Prevote<Block> = grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Block>>; /// A precommit message for this chain's block type. @@ -137,6 +162,14 @@ pub type CompactCommit<Block> = grandpa::CompactCommit< Ed25519AuthorityId >; +/// Network level commit message with topic information. +#[derive(Debug, Encode, Decode)] +pub struct FullCommitMessage<Block: BlockT> { + pub round: u64, + pub set_id: u64, + pub message: CompactCommit<Block>, +} + /// Configuration for the GRANDPA service. #[derive(Clone)] pub struct Config { @@ -213,6 +246,142 @@ impl Stream for NetworkStream { } } +struct TopicTracker { + min_live_round: u64, + max_round: u64, + set_id: u64, +} + +struct GossipValidator<Block: BlockT> { + rounds: parking_lot::RwLock<TopicTracker>, + _marker: ::std::marker::PhantomData<Block>, +} + +impl<Block: BlockT> GossipValidator<Block> { + fn new() -> GossipValidator<Block> { + GossipValidator { + rounds: parking_lot::RwLock::new(TopicTracker { + min_live_round: 0, + max_round: 0, + set_id: 0, + }), + _marker: Default::default(), + } + } + + fn note_round(&self, round: u64, set_id: u64) { + let mut rounds = self.rounds.write(); + if set_id > rounds.set_id { + rounds.set_id = set_id; + rounds.max_round = 0; + rounds.min_live_round = 0; + } + rounds.max_round = rounds.max_round.max(round); + } + + fn note_set(&self, _set_id: u64) { + } + + fn drop_round(&self, round: u64, set_id: u64) { + let mut rounds = self.rounds.write(); + if set_id == rounds.set_id && round >= rounds.min_live_round { + rounds.min_live_round = round + 1; + } + } + + fn drop_set(&self, _set_id: u64) { + } + + fn is_expired(&self, round: u64, set_id: u64) -> bool { + let rounds = self.rounds.read(); + if set_id < rounds.set_id { + trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, rounds.set_id); + return true; + } else if set_id == rounds.set_id + 1 { + // allow a few first rounds of future set. + if round > MESSAGE_ROUND_TOLERANCE { + trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, rounds.set_id); + return true; + } + } else if set_id == rounds.set_id { + if round < rounds.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { + trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, rounds.min_live_round, rounds.max_round); + return true; + } + } else { + trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, rounds.set_id); + return true; + } + false + } + + fn validate_round_message(&self, full: VoteOrPrecommitMessage<Block>) + -> network_gossip::ValidationResult<Block::Hash> + { + if self.is_expired(full.round, full.set_id) { + return network_gossip::ValidationResult::Expired; + } + + if let Err(()) = communication::check_message_sig::<Block>( + &full.message.message, + &full.message.id, + &full.message.signature, + full.round, + full.set_id + ) { + debug!(target: "afg", "Bad message signature {}", full.message.id); + return network_gossip::ValidationResult::Invalid; + } + + let topic = message_topic::<Block>(full.round, full.set_id); + network_gossip::ValidationResult::Valid(topic) + } + + fn validate_commit_message(&self, full: FullCommitMessage<Block>) + -> network_gossip::ValidationResult<Block::Hash> + { + use grandpa::Message as GrandpaMessage; + if self.is_expired(full.round, full.set_id) { + return network_gossip::ValidationResult::Expired; + } + + if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { + debug!(target: "afg", "Malformed compact commit"); + return network_gossip::ValidationResult::Invalid; + } + + // check signatures on all contained precommits. + for (precommit, &(ref sig, ref id)) in full.message.precommits.iter().zip(&full.message.auth_data) { + if let Err(()) = communication::check_message_sig::<Block>( + &GrandpaMessage::Precommit(precommit.clone()), + id, + sig, + full.round, + full.set_id, + ) { + debug!(target: "afg", "Bad commit message signature {}", id); + return network_gossip::ValidationResult::Invalid; + } + } + + let topic = commit_topic::<Block>(full.set_id); + network_gossip::ValidationResult::Valid(topic) + } +} + +impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<Block> { + fn validate(&self, mut data: &[u8]) -> network_gossip::ValidationResult<Block::Hash> { + match GossipMessage::<Block>::decode(&mut data) { + Some(GossipMessage::VoteOrPrecommit(message)) => self.validate_round_message(message), + Some(GossipMessage::Commit(message)) => self.validate_commit_message(message), + None => { + debug!(target: "afg", "Error decoding message"); + network_gossip::ValidationResult::Invalid + } + } + } +} + /// A handle to the network. This is generally implemented by providing some /// handle to a gossip service or similar. /// @@ -247,20 +416,27 @@ pub trait Network<Block: BlockT>: Clone { /// Bridge between NetworkService, gossiping consensus messages and Grandpa pub struct NetworkBridge<B: BlockT, S: network::specialization::NetworkSpecialization<B>> { - service: Arc<NetworkService<B, S>> + service: Arc<NetworkService<B, S>>, + validator: Arc<GossipValidator<B>>, } impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>> NetworkBridge<B, S> { /// Create a new NetworkBridge to the given NetworkService pub fn new(service: Arc<NetworkService<B, S>>) -> Self { - NetworkBridge { service } + let validator = Arc::new(GossipValidator::new()); + let v = validator.clone(); + service.with_gossip(move |gossip, _| { + gossip.register_validator(GRANDPA_ENGINE_ID, v); + }); + NetworkBridge { service, validator: validator } } } impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Clone for NetworkBridge<B, S> { fn clone(&self) -> Self { NetworkBridge { - service: Arc::clone(&self.service) + service: Arc::clone(&self.service), + validator: Arc::clone(&self.validator), } } } @@ -276,6 +452,7 @@ fn commit_topic<B: BlockT>(set_id: u64) -> B::Hash { impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B> for NetworkBridge<B, S> { type In = NetworkStream; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { + self.validator.note_round(round, set_id); let (tx, rx) = sync::oneshot::channel(); self.service.with_gossip(move |gossip, _| { let inner_rx = gossip.messages_for(message_topic::<B>(round, set_id)); @@ -286,20 +463,21 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { let topic = message_topic::<B>(round, set_id); - self.service.gossip_consensus_message(topic, message, false); + self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message); } fn drop_round_messages(&self, round: u64, set_id: u64) { - let topic = message_topic::<B>(round, set_id); - self.service.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic)); + self.validator.drop_round(round, set_id); + self.service.with_gossip(move |gossip, _| gossip.collect_garbage()); } fn drop_set_messages(&self, set_id: u64) { - let topic = commit_topic::<B>(set_id); - self.service.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic)); + self.validator.drop_set(set_id); + self.service.with_gossip(move |gossip, _| gossip.collect_garbage()); } fn commit_messages(&self, set_id: u64) -> Self::In { + self.validator.note_set(set_id); let (tx, rx) = sync::oneshot::channel(); self.service.with_gossip(move |gossip, _| { let inner_rx = gossip.messages_for(commit_topic::<B>(set_id)); @@ -310,7 +488,7 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) { let topic = commit_topic::<B>(set_id); - self.service.gossip_consensus_message(topic, message, false); + self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message); } fn announce(&self, round: u64, _set_id: u64, block: B::Hash) { @@ -439,7 +617,6 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>( { // verification stream let commit_in = crate::communication::checked_commit_stream::<Block, _>( - set_id, network.commit_messages(set_id), voters.clone(), ); diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index ac64b7d2489..029e517943a 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -139,13 +139,24 @@ impl TestNetFactory for GrandpaTestNet { struct MessageRouting { inner: Arc<Mutex<GrandpaTestNet>>, peer_id: usize, + validator: Arc<GossipValidator<Block>>, } impl MessageRouting { fn new(inner: Arc<Mutex<GrandpaTestNet>>, peer_id: usize,) -> Self { + let validator = Arc::new(GossipValidator::new()); + let v = validator.clone(); + { + let inner = inner.lock(); + let peer = inner.peer(peer_id); + peer.with_gossip(move |gossip, _| { + gossip.register_validator(GRANDPA_ENGINE_ID, v); + }); + } MessageRouting { inner, peer_id, + validator, } } @@ -157,37 +168,18 @@ impl MessageRouting { } fn make_topic(round: u64, set_id: u64) -> Hash { - let mut hash = Hash::default(); - round.using_encoded(|s| { - let raw = hash.as_mut(); - raw[..8].copy_from_slice(s); - }); - set_id.using_encoded(|s| { - let raw = hash.as_mut(); - raw[8..16].copy_from_slice(s); - }); - hash + message_topic::<Block>(round, set_id) } fn make_commit_topic(set_id: u64) -> Hash { - let mut hash = Hash::default(); - - { - let raw = hash.as_mut(); - raw[16..22].copy_from_slice(b"commit"); - } - set_id.using_encoded(|s| { - let raw = hash.as_mut(); - raw[24..].copy_from_slice(s); - }); - - hash + commit_topic::<Block>(set_id) } impl Network<Block> for MessageRouting { type In = Box<Stream<Item=Vec<u8>,Error=()> + Send>; fn messages_for(&self, round: u64, set_id: u64) -> Self::In { + self.validator.note_round(round, set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); let messages = peer.consensus_gossip_messages_for(make_topic(round, set_id)); @@ -201,20 +193,23 @@ impl Network<Block> for MessageRouting { fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { let inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message, false); + inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), GRANDPA_ENGINE_ID, message); } fn drop_round_messages(&self, round: u64, set_id: u64) { + self.validator.drop_round(round, set_id); let topic = make_topic(round, set_id); self.drop_messages(topic); } fn drop_set_messages(&self, set_id: u64) { + self.validator.drop_set(set_id); let topic = make_commit_topic(set_id); self.drop_messages(topic); } fn commit_messages(&self, set_id: u64) -> Self::In { + self.validator.note_set(set_id); let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); let messages = peer.consensus_gossip_messages_for(make_commit_topic(set_id)); @@ -228,7 +223,7 @@ impl Network<Block> for MessageRouting { fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>) { let inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), message, false); + inner.peer(self.peer_id).gossip_message(make_commit_topic(set_id), GRANDPA_ENGINE_ID, message); } fn announce(&self, _round: u64, _set_id: u64, _block: H256) { @@ -516,6 +511,7 @@ fn finalize_3_voters_1_observer() { #[test] fn transition_3_voters_twice_1_observer() { + let _ = env_logger::try_init(); let peers_a = &[ Keyring::Alice, Keyring::Bob, diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 68f5db835fb..720cf55a67b 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -18,20 +18,22 @@ //! Handles chain-specific and standard BFT messages. use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use std::time::{Instant, Duration}; use log::{trace, debug}; use futures::sync::mpsc; use rand::{self, seq::SliceRandom}; use lru_cache::LruCache; -use network_libp2p::NodeIndex; +use network_libp2p::{Severity, NodeIndex}; use runtime_primitives::traits::{Block as BlockT, Hash, HashFor}; pub use crate::message::generic::{Message, ConsensusMessage}; use crate::protocol::Context; use crate::config::Roles; +use crate::ConsensusEngineId; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 const MESSAGE_LIFETIME: Duration = Duration::from_secs(120); -const DEAD_TOPICS_CACHE_SIZE: usize = 4096; +const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; struct PeerConsensus<H> { known_messages: HashSet<H>, @@ -39,21 +41,35 @@ struct PeerConsensus<H> { } struct MessageEntry<B: BlockT> { - topic: B::Hash, message_hash: B::Hash, + topic: B::Hash, message: ConsensusMessage, - broadcast: bool, + timestamp: Instant, +} + +/// Message validation result. +pub enum ValidationResult<H> { + /// Message is valid with this topic. + Valid(H), + /// Invalid message. + Invalid, + /// Obsolete message. + Expired, +} + +/// Validates consensus messages. +pub trait Validator<H> { + /// Validate consensus message. + fn validate(&self, data: &[u8]) -> ValidationResult<H>; } /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip<B: BlockT> { - peers: HashMap<NodeIndex, PeerConsensus<(B::Hash, B::Hash)>>, - live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<ConsensusMessage>>>, + peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>, + live_message_sinks: HashMap<B::Hash, Vec<mpsc::UnboundedSender<Vec<u8>>>>, messages: Vec<MessageEntry<B>>, - known_messages: HashSet<(B::Hash, B::Hash)>, - known_dead_topics: LruCache<B::Hash, ()>, - message_times: HashMap<(B::Hash, B::Hash), Instant>, - session_start: Option<B::Hash>, + known_messages: LruCache<B::Hash, ()>, + validators: HashMap<ConsensusEngineId, Arc<Validator<B::Hash>>>, } impl<B: BlockT> ConsensusGossip<B> { @@ -63,10 +79,8 @@ impl<B: BlockT> ConsensusGossip<B> { peers: HashMap::new(), live_message_sinks: HashMap::new(), messages: Default::default(), - known_messages: Default::default(), - known_dead_topics: LruCache::new(DEAD_TOPICS_CACHE_SIZE), - message_times: Default::default(), - session_start: None + known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE), + validators: Default::default(), } } @@ -75,15 +89,22 @@ impl<B: BlockT> ConsensusGossip<B> { self.live_message_sinks.clear(); } + /// Register message validator for a message type. + pub fn register_validator(&mut self, engine_id: ConsensusEngineId, validator: Arc<Validator<B::Hash>>) { + self.validators.insert(engine_id, validator); + } + /// Handle new connected peer. pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) { if roles.intersects(Roles::AUTHORITY) { trace!(target:"gossip", "Registering {:?} {}", roles, who); + let now = Instant::now(); // Send out all known messages to authorities. let mut known_messages = HashSet::new(); for entry in self.messages.iter() { - known_messages.insert((entry.topic, entry.message_hash)); - protocol.send_message(who, Message::Consensus(entry.topic.clone(), entry.message.clone(), entry.broadcast)); + if entry.timestamp + MESSAGE_LIFETIME < now { continue }; + known_messages.insert(entry.message_hash); + protocol.send_message(who, Message::Consensus(entry.message.clone())); } self.peers.insert(who, PeerConsensus { known_messages, @@ -102,30 +123,18 @@ impl<B: BlockT> ConsensusGossip<B> { &mut self, protocol: &mut Context<B>, message_hash: B::Hash, - topic: B::Hash, - broadcast: bool, get_message: F, ) where F: Fn() -> ConsensusMessage, { - if broadcast { - for (id, ref mut peer) in self.peers.iter_mut() { - if peer.known_messages.insert((topic.clone(), message_hash.clone())) { - let message = get_message(); - if peer.is_authority { - trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); - } else { - trace!(target:"gossip", "Propagating to {}: {:?}", id, message); - } - protocol.send_message(*id, Message::Consensus(topic, message, broadcast)); - } - } - - return; - } - let mut non_authorities: Vec<_> = self.peers.iter() - .filter_map(|(id, ref peer)| if !peer.is_authority && !peer.known_messages.contains(&(topic, message_hash)) { Some(*id) } else { None }) + .filter_map(|(id, ref peer)| + if !peer.is_authority && !peer.known_messages.contains(&message_hash) { + Some(*id) + } else { + None + } + ) .collect(); non_authorities.shuffle(&mut rand::thread_rng()); @@ -137,34 +146,33 @@ impl<B: BlockT> ConsensusGossip<B> { for (id, ref mut peer) in self.peers.iter_mut() { if peer.is_authority { - if peer.known_messages.insert((topic.clone(), message_hash.clone())) { + if peer.known_messages.insert(message_hash.clone()) { let message = get_message(); trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); - protocol.send_message(*id, Message::Consensus(topic, message, broadcast)); + protocol.send_message(*id, Message::Consensus(message)); } } else if non_authorities.contains(&id) { - let message = get_message(); - trace!(target:"gossip", "Propagating to {}: {:?}", id, message); - peer.known_messages.insert((topic.clone(), message_hash.clone())); - protocol.send_message(*id, Message::Consensus(topic, message, broadcast)); + if peer.known_messages.insert(message_hash.clone()) { + let message = get_message(); + trace!(target:"gossip", "Propagating to {}: {:?}", id, message); + protocol.send_message(*id, Message::Consensus(message)); + } } } } - fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, broadcast: bool, get_message: F) + fn register_message<F>(&mut self, message_hash: B::Hash, topic: B::Hash, get_message: F) where F: Fn() -> ConsensusMessage { - if !self.known_dead_topics.contains_key(&topic) && - self.known_messages.insert((topic, message_hash)) + if self.known_messages.insert(message_hash, ()).is_none() { self.messages.push(MessageEntry { topic, message_hash, - broadcast, message: get_message(), + timestamp: Instant::now(), }); - self.message_times.insert((topic, message_hash), Instant::now()); } } @@ -173,38 +181,27 @@ impl<B: BlockT> ConsensusGossip<B> { self.peers.remove(&who); } - /// Prune all existing messages for the given topic and mark it as dead, all - /// new messages for the given topic are ignored. - pub fn collect_garbage_for_topic(&mut self, topic: B::Hash) { - self.known_dead_topics.insert(topic, ()); - self.collect_garbage(|_| true); - } - /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. - pub fn collect_garbage<P: Fn(&B::Hash) -> bool>(&mut self, predicate: P) { + pub fn collect_garbage(&mut self) { self.live_message_sinks.retain(|_, sinks| { sinks.retain(|sink| !sink.is_closed()); !sinks.is_empty() }); - let message_times = &mut self.message_times; let known_messages = &mut self.known_messages; - let known_dead_topics = &mut self.known_dead_topics; let before = self.messages.len(); + let validators = &self.validators; let now = Instant::now(); self.messages.retain(|entry| { - !known_dead_topics.contains_key(&entry.topic) && - message_times.get(&(entry.topic, entry.message_hash)) - .map(|instant| *instant + MESSAGE_LIFETIME >= now && predicate(&entry.topic)) - .unwrap_or(false) - }); - - known_messages.retain(|(topic, message_hash)| { - message_times.get(&(*topic, *message_hash)) - .map(|instant| *instant + (5 * MESSAGE_LIFETIME) >= now) - .unwrap_or(false) + entry.timestamp + MESSAGE_LIFETIME >= now + && match validators.get(&entry.message.engine_id) + .map(|v| v.validate(&entry.message.data)) + { + Some(ValidationResult::Valid(_)) => true, + _ => false, + } }); trace!(target: "gossip", "Cleaned up {} stale messages, {} left ({} known)", @@ -213,18 +210,16 @@ impl<B: BlockT> ConsensusGossip<B> { known_messages.len(), ); - message_times.retain(|h, _| known_messages.contains(h)); - for (_, ref mut peer) in self.peers.iter_mut() { - peer.known_messages.retain(|h| known_messages.contains(h)); + peer.known_messages.retain(|h| known_messages.contains_key(h)); } } - /// Get all incoming messages for a topic. - pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> { + /// Get data of valid, incoming messages for a topic (but might have expired meanwhile) + pub fn messages_for(&mut self, topic: B::Hash) -> mpsc::UnboundedReceiver<Vec<u8>> { let (tx, rx) = mpsc::unbounded(); for entry in self.messages.iter().filter(|e| e.topic == topic) { - tx.unbounded_send(entry.message.clone()).expect("receiver known to be live; qed"); + tx.unbounded_send(entry.message.data.clone()).expect("receiver known to be live; qed"); } self.live_message_sinks.entry(topic).or_default().push(tx); @@ -239,29 +234,57 @@ impl<B: BlockT> ConsensusGossip<B> { &mut self, protocol: &mut Context<B>, who: NodeIndex, - topic: B::Hash, message: ConsensusMessage, - broadcast: bool, + is_syncing: bool, ) -> Option<(B::Hash, ConsensusMessage)> { - let message_hash = HashFor::<B>::hash(&message[..]); - - if self.known_dead_topics.contains_key(&topic) { - trace!(target:"gossip", "Ignored message from {} in dead topic {}", who, topic); - return None; - } + let message_hash = HashFor::<B>::hash(&message.data[..]); - if self.known_messages.contains(&(topic, message_hash)) { - trace!(target:"gossip", "Ignored already known message from {} in {}", who, topic); + if self.known_messages.contains_key(&message_hash) { + trace!(target:"gossip", "Ignored already known message from {}", who); return None; } if let Some(ref mut peer) = self.peers.get_mut(&who) { use std::collections::hash_map::Entry; - peer.known_messages.insert((topic, message_hash)); + + //validate the message + let topic = match self.validators.get(&message.engine_id) + .map(|v| v.validate(&message.data)) + { + Some(ValidationResult::Valid(topic)) => topic, + Some(ValidationResult::Invalid) => { + trace!(target:"gossip", "Invalid message from {}", who); + protocol.report_peer( + who, + Severity::Bad(format!("Sent invalid consensus message")), + ); + return None; + }, + Some(ValidationResult::Expired) => { + trace!(target:"gossip", "Ignored expired message from {}", who); + if !is_syncing { + protocol.report_peer( + who, + Severity::Useless(format!("Sent expired consensus message")), + ); + } + return None; + } + None => { + protocol.report_peer( + who, + Severity::Useless(format!("Sent unknown consensus engine id")), + ); + trace!(target:"gossip", "Unknown message engine id {:?} from {}", message.engine_id, who); + return None; + } + }; + + peer.known_messages.insert(message_hash); if let Entry::Occupied(mut entry) = self.live_message_sinks.entry(topic) { debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); entry.get_mut().retain(|sink| { - if let Err(e) = sink.unbounded_send(message.clone()) { + if let Err(e) = sink.unbounded_send(message.data.clone()) { trace!(target:"gossip", "Error broadcasting message notification: {:?}", e); } !sink.is_closed() @@ -270,13 +293,12 @@ impl<B: BlockT> ConsensusGossip<B> { entry.remove_entry(); } } + self.multicast_inner(protocol, message_hash, topic, || message.clone()); + Some((topic, message)) } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); - return None; + None } - - self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone()); - Some((topic, message)) } /// Multicast a message to all peers. @@ -285,10 +307,9 @@ impl<B: BlockT> ConsensusGossip<B> { protocol: &mut Context<B>, topic: B::Hash, message: ConsensusMessage, - broadcast: bool, ) { - let message_hash = HashFor::<B>::hash(&message); - self.multicast_inner(protocol, message_hash, topic, broadcast, || message.clone()); + let message_hash = HashFor::<B>::hash(&message.data); + self.multicast_inner(protocol, message_hash, topic, || message.clone()); } fn multicast_inner<F>( @@ -296,20 +317,17 @@ impl<B: BlockT> ConsensusGossip<B> { protocol: &mut Context<B>, message_hash: B::Hash, topic: B::Hash, - broadcast: bool, get_message: F, ) where F: Fn() -> ConsensusMessage { - self.register_message(message_hash, topic, broadcast, &get_message); - self.propagate(protocol, message_hash, topic, broadcast, get_message); + self.register_message(message_hash, topic, &get_message); + self.propagate(protocol, message_hash, get_message); } /// Note new consensus session. - pub fn new_session(&mut self, parent_hash: B::Hash) { - let old_session = self.session_start.take(); - self.session_start = Some(parent_hash); - self.collect_garbage(|topic| old_session.as_ref().map_or(true, |h| topic != h)); + pub fn new_session(&mut self, _parent_hash: B::Hash) { + self.collect_garbage(); } } @@ -323,91 +341,75 @@ mod tests { macro_rules! push_msg { ($consensus:expr, $topic:expr, $hash: expr, $now: expr, $m:expr) => { - if $consensus.known_messages.insert(($topic, $hash)) { + if $consensus.known_messages.insert($hash, ()).is_none() { $consensus.messages.push(MessageEntry { topic: $topic, message_hash: $hash, - message: $m, - broadcast: false, + message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, + timestamp: $now, }); - $consensus.message_times.insert(($topic, $hash), $now); } } } #[test] fn collects_garbage() { + + struct AllowAll; + impl Validator<H256> for AllowAll { + fn validate(&self, _data: &[u8]) -> ValidationResult<H256> { + ValidationResult::Valid(H256::default()) + } + } + + struct AllowOne; + impl Validator<H256> for AllowOne { + fn validate(&self, data: &[u8]) -> ValidationResult<H256> { + if data[0] == 1 { + ValidationResult::Valid(H256::default()) + } else { + ValidationResult::Expired + } + } + } + let prev_hash = H256::random(); let best_hash = H256::random(); let mut consensus = ConsensusGossip::<Block>::new(); - let now = Instant::now(); let m1_hash = H256::random(); let m2_hash = H256::random(); let m1 = vec![1, 2, 3]; let m2 = vec![4, 5, 6]; + let now = Instant::now(); push_msg!(consensus, prev_hash, m1_hash, now, m1); push_msg!(consensus, best_hash, m2_hash, now, m2.clone()); - consensus.known_messages.insert((prev_hash, m1_hash)); - consensus.known_messages.insert((best_hash, m2_hash)); + consensus.known_messages.insert(m1_hash, ()); + consensus.known_messages.insert(m2_hash, ()); - // nothing to collect - consensus.collect_garbage(|_t| true); + let test_engine_id = Default::default(); + consensus.register_validator(test_engine_id, Arc::new(AllowAll)); + consensus.collect_garbage(); assert_eq!(consensus.messages.len(), 2); assert_eq!(consensus.known_messages.len(), 2); - // nothing to collect with default. - consensus.collect_garbage(|&topic| topic != Default::default()); - assert_eq!(consensus.messages.len(), 2); - assert_eq!(consensus.known_messages.len(), 2); + consensus.register_validator(test_engine_id, Arc::new(AllowOne)); - // topic that was used in one message. - consensus.collect_garbage(|topic| topic != &prev_hash); + // m2 is expired + consensus.collect_garbage(); assert_eq!(consensus.messages.len(), 1); - // known messages are only pruned based on expiration time + // known messages are only pruned based on size. assert_eq!(consensus.known_messages.len(), 2); - assert!(consensus.known_messages.contains(&(best_hash, m2_hash))); + assert!(consensus.known_messages.contains_key(&m2_hash)); // make timestamp expired, but the message is still kept as known consensus.messages.clear(); consensus.known_messages.clear(); + consensus.register_validator(test_engine_id, Arc::new(AllowAll)); push_msg!(consensus, best_hash, m2_hash, now - MESSAGE_LIFETIME, m2.clone()); - consensus.collect_garbage(|_topic| true); + consensus.collect_garbage(); assert!(consensus.messages.is_empty()); assert_eq!(consensus.known_messages.len(), 1); - - // make timestamp expired past the known message lifetime - consensus.known_messages.clear(); - push_msg!(consensus, best_hash, m2_hash, now - (5 * MESSAGE_LIFETIME), m2); - consensus.collect_garbage(|_topic| true); - assert!(consensus.messages.is_empty()); - assert!(consensus.known_messages.is_empty()); - } - - #[test] - fn collects_garbage_for_topic() { - let topic = H256::random(); - let dead_topic = H256::random(); - let message = Vec::new(); - let now = Instant::now(); - let mut consensus = ConsensusGossip::<Block>::new(); - - let message_hash = H256::random(); - push_msg!(consensus, topic, message_hash, now, message.clone()); - push_msg!(consensus, dead_topic, message_hash, now, message.clone()); - assert_eq!(consensus.messages.len(), 2); - - consensus.collect_garbage_for_topic(topic); - - // removes all messages for the topic and marks the topic as dead - assert_eq!(consensus.messages.len(), 1); - assert_eq!(consensus.known_messages.len(), 2); - assert_eq!(consensus.known_dead_topics.len(), 1); - - // new messages for dead topics are ignored - consensus.register_message(HashFor::<Block>::hash(&message), topic, false, || message.clone()); - assert_eq!(consensus.messages.len(), 1); - assert_eq!(consensus.known_messages.len(), 2); } #[test] @@ -416,15 +418,15 @@ mod tests { let mut consensus = ConsensusGossip::<Block>::new(); - let message = vec![1, 2, 3]; + let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - let message_hash = HashFor::<Block>::hash(&message); + let message_hash = HashFor::<Block>::hash(&message.data); let topic = HashFor::<Block>::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, false, || message.clone()); + consensus.register_message(message_hash, topic, || message.clone()); let stream = consensus.messages_for(topic); - assert_eq!(stream.wait().next(), Some(Ok(message))); + assert_eq!(stream.wait().next(), Some(Ok(message.data))); } #[test] @@ -432,11 +434,11 @@ mod tests { let mut consensus = ConsensusGossip::<Block>::new(); let topic = [1; 32].into(); - let msg_a = vec![1, 2, 3]; - let msg_b = vec![4, 5, 6]; + let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; + let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - consensus.register_message(HashFor::<Block>::hash(&msg_a), topic, false, || msg_a.clone()); - consensus.register_message(HashFor::<Block>::hash(&msg_b), topic, false, || msg_b.clone()); + consensus.register_message(HashFor::<Block>::hash(&msg_a.data), topic, || msg_a.clone()); + consensus.register_message(HashFor::<Block>::hash(&msg_b.data), topic, || msg_b.clone()); assert_eq!(consensus.messages.len(), 2); } @@ -447,17 +449,17 @@ mod tests { let mut consensus = ConsensusGossip::<Block>::new(); - let message = vec![1, 2, 3]; + let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - let message_hash = HashFor::<Block>::hash(&message); + let message_hash = HashFor::<Block>::hash(&message.data); let topic = HashFor::<Block>::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, false, || message.clone()); + consensus.register_message(message_hash, topic, || message.clone()); let stream1 = consensus.messages_for(topic); let stream2 = consensus.messages_for(topic); - assert_eq!(stream1.wait().next(), Some(Ok(message.clone()))); - assert_eq!(stream2.wait().next(), Some(Ok(message))); + assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone()))); + assert_eq!(stream2.wait().next(), Some(Ok(message.data))); } } diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index dc3a2dfb2f0..7a86c9e86ef 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -44,7 +44,7 @@ pub use network_libp2p::{ NodeIndex, ProtocolId, Severity, Protocol, Multiaddr, obtain_private_key, multiaddr, PeerId, PublicKey }; -pub use message::{generic as generic_message, RequestId, Status as StatusMessage}; +pub use message::{generic as generic_message, RequestId, Status as StatusMessage, ConsensusEngineId}; pub use error::Error; pub use on_demand::{OnDemand, OnDemandService, RemoteResponse}; #[doc(hidden)] diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs index 0aff9166b64..b54032d317f 100644 --- a/substrate/core/network/src/message.rs +++ b/substrate/core/network/src/message.rs @@ -30,6 +30,9 @@ pub use self::generic::{ /// A unique ID of a request. pub type RequestId = u64; +/// Consensus engine unique ID. +pub type ConsensusEngineId = [u8; 4]; + /// Type alias for using the message type using block type parameters. pub type Message<B> = generic::Message< <B as BlockT>::Header, @@ -132,10 +135,16 @@ pub mod generic { use crate::config::Roles; use super::{ BlockAttributes, RemoteCallResponse, RemoteReadResponse, - RequestId, Transactions, Direction + RequestId, Transactions, Direction, ConsensusEngineId, }; - /// Consensus is opaque to us - pub type ConsensusMessage = Vec<u8>; + /// Consensus is mostly opaque to us + #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] + pub struct ConsensusMessage { + /// Identifies consensus engine. + pub engine_id: ConsensusEngineId, + /// Message payload. + pub data: Vec<u8>, + } /// Block data sent in the response. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] @@ -177,7 +186,7 @@ pub mod generic { /// Transactions. Transactions(Transactions<Extrinsic>), /// Consensus protocol message. - Consensus(Hash, ConsensusMessage, bool), // topic, opaque Vec<u8>, broadcast + Consensus(ConsensusMessage), /// Remote method call request. RemoteCallRequest(RemoteCallRequest<Hash>), /// Remote method call response. @@ -216,6 +225,8 @@ pub mod generic { pub struct Status<Hash, Number> { /// Protocol version. pub version: u32, + /// Minimum supported version. + pub min_supported_version: u32, /// Supported roles. pub roles: Roles, /// Best block number. diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index b8c88c86244..5840371e32a 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -20,8 +20,8 @@ use primitives::storage::StorageKey; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero}; use consensus::import_queue::ImportQueue; -use crate::message::{self, Message}; -use crate::message::generic::Message as GenericMessage; +use crate::message::{self, Message, ConsensusEngineId}; +use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; use crate::consensus_gossip::ConsensusGossip; use crate::on_demand::OnDemandService; use crate::specialization::NetworkSpecialization; @@ -44,7 +44,9 @@ const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1000); const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(5000); /// Current protocol version. -pub(crate) const CURRENT_VERSION: u32 = 1; +pub(crate) const CURRENT_VERSION: u32 = 2; +/// Lowest version we support +const MIN_VERSION: u32 = 2; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; @@ -199,7 +201,7 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>,> { /// Execute a closure with the consensus gossip. ExecuteWithGossip(Box<GossipTask<B> + Send + 'static>), /// Incoming gossip consensus message. - GossipConsensusMessage(B::Hash, Vec<u8>, bool), + GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec<u8>), /// Is protocol currently major-syncing? IsMajorSyncing(Sender<bool>), /// Is protocol currently offline? @@ -327,8 +329,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { ProtocolContext::new(&mut self.context_data, &self.network_chan); task.call_box(&mut self.consensus_gossip, &mut context); } - ProtocolMsg::GossipConsensusMessage(topic, message, broadcast) => { - self.gossip_consensus_message(topic, message, broadcast) + ProtocolMsg::GossipConsensusMessage(topic, engine_id, message) => { + self.gossip_consensus_message(topic, engine_id, message) } ProtocolMsg::IsMajorSyncing(sender) => { let is_syncing = self.sync.status().is_major_syncing(); @@ -420,13 +422,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response), GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(who, request), GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response), - GenericMessage::Consensus(topic, msg, broadcast) => { + GenericMessage::Consensus(msg) => { self.consensus_gossip.on_incoming( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), who, - topic, msg, - broadcast, + self.sync.status().is_major_syncing(), ); } other => self.specialization.on_message( @@ -446,12 +447,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { ); } - fn gossip_consensus_message(&mut self, topic: B::Hash, message: Vec<u8>, broadcast: bool) { + fn gossip_consensus_message(&mut self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) { self.consensus_gossip.multicast( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), topic, - message, - broadcast, + ConsensusMessage{ data: message, engine_id }, ); } @@ -599,7 +599,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Perform time based maintenance. fn tick(&mut self) { - self.consensus_gossip.collect_garbage(|_| true); + self.consensus_gossip.collect_garbage(); self.maintain_peers(); self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); self.on_demand @@ -653,7 +653,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { )); return; } - if status.version != CURRENT_VERSION { + if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { let reason = format!("Peer using unsupported protocol version {}", status.version); self.network_chan.send(NetworkMsg::ReportPeer( who, @@ -803,6 +803,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { if let Ok(info) = self.context_data.chain.info() { let status = message::generic::Status { version: CURRENT_VERSION, + min_supported_version: MIN_VERSION, genesis_hash: info.chain.genesis_hash, roles: self.config.roles.into(), best_number: info.chain.best_number, diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index f857e712d34..81e79dbad9f 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -25,7 +25,7 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S use network_libp2p::{Protocol as Libp2pProtocol, RegisteredProtocol}; use consensus::import_queue::{ImportQueue, Link}; use crate::consensus_gossip::ConsensusGossip; -use crate::message::Message; +use crate::message::{Message, ConsensusEngineId}; use crate::protocol::{self, Context, Protocol, ProtocolMsg, ProtocolStatus, PeerInfo}; use crate::config::Params; use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; @@ -208,11 +208,11 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { } /// Send a consensus message through the gossip - pub fn gossip_consensus_message(&self, topic: B::Hash, message: Vec<u8>, broadcast: bool) { + pub fn gossip_consensus_message(&self, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec<u8>) { let _ = self .protocol_sender .send(ProtocolMsg::GossipConsensusMessage( - topic, message, broadcast, + topic, engine_id, message, )); } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 71cead8da9b..c16b2b8fd57 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -34,12 +34,12 @@ use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock}; use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier}; use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; -use crate::consensus_gossip::{ConsensusGossip, ConsensusMessage}; +use crate::consensus_gossip::ConsensusGossip; use crossbeam_channel::{self as channel, Sender, select}; use futures::Future; use futures::sync::{mpsc, oneshot}; use keyring::Keyring; -use crate::message::Message; +use crate::message::{Message, ConsensusEngineId}; use network_libp2p::{NodeIndex, ProtocolId}; use parity_codec::Encode; use parking_lot::Mutex; @@ -274,21 +274,21 @@ impl<D> Peer<D> { /// Push a message into the gossip network and relay to peers. /// `TestNet::sync_step` needs to be called to ensure it's propagated. - pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, data: Vec<u8>, broadcast: bool) { + pub fn gossip_message(&self, topic: <Block as BlockT>::Hash, engine_id: ConsensusEngineId, data: Vec<u8>) { let _ = self .protocol_sender - .send(ProtocolMsg::GossipConsensusMessage(topic, data, broadcast)); + .send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data)); } - pub fn consensus_gossip_collect_garbage_for_topic(&self, topic: <Block as BlockT>::Hash) { - self.with_gossip(move |gossip, _| gossip.collect_garbage_for_topic(topic)) + pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: <Block as BlockT>::Hash) { + self.with_gossip(move |gossip, _| gossip.collect_garbage()) } /// access the underlying consensus gossip handler pub fn consensus_gossip_messages_for( &self, topic: <Block as BlockT>::Hash, - ) -> mpsc::UnboundedReceiver<ConsensusMessage> { + ) -> mpsc::UnboundedReceiver<Vec<u8>> { let (tx, rx) = oneshot::channel(); self.with_gossip(move |gossip, _| { let inner_rx = gossip.messages_for(topic); -- GitLab