diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index b078d3bfd5558e6e1a58c272834479e3bff8d042..e4ec0e94b1baea74fb3f6da155b5a3c88591d51f 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -27,6 +27,9 @@ use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; use protocol::Context; use service::Roles; +use specialization::Specialization; +use StatusMessage; +use generic_message; // TODO: Add additional spam/DoS attack protection. const MESSAGE_LIFETIME: Duration = Duration::from_secs(600); @@ -57,6 +60,7 @@ pub struct ConsensusGossip<B: BlockT> { live_message_sinks: HashMap<B::Hash, mpsc::UnboundedSender<ConsensusMessage<B>>>, messages: Vec<MessageEntry<B>>, message_hashes: HashSet<B::Hash>, + session_start: Option<B::Hash>, } impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { @@ -67,6 +71,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { live_message_sinks: HashMap::new(), messages: Default::default(), message_hashes: Default::default(), + session_start: None } } @@ -300,6 +305,55 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { self.register_message(hash, message); self.propagate(protocol, generic, hash); } + + /// Note new consensus session. + pub fn new_session(&mut self, parent_hash: B::Hash) { + let old_session = self.session_start.take(); + self.session_start = Some(parent_hash); + self.collect_garbage(|topic| old_session.as_ref().map_or(true, |h| topic != h)); + } +} + +impl<Block: BlockT> Specialization<Block> for ConsensusGossip<Block> where + Block::Header: HeaderT<Number=u64> +{ + fn status(&self) -> Vec<u8> { + Vec::new() + } + + fn on_connect(&mut self, ctx: &mut Context<Block>, who: NodeIndex, status: StatusMessage<Block>) { + self.new_peer(ctx, who, status.roles); + } + + fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: NodeIndex) { + self.peer_disconnected(ctx, who); + } + + fn on_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, message: &mut Option<message::Message<Block>>) { + match message.take() { + Some(generic_message::Message::BftMessage(msg)) => { + trace!(target: "gossip", "BFT message from {}: {:?}", who, msg); + // TODO: check signature here? what if relevant block is unknown? + self.on_bft_message(ctx, who, msg) + } + r => *message = r, + } + } + + fn on_abort(&mut self) { + self.abort(); + } + + fn maintain_peers(&mut self, _ctx: &mut Context<Block>) { + self.collect_garbage(|_| true); + } + + fn on_block_imported( + &mut self, + _ctx: &mut Context<Block>, + _hash: <Block as BlockT>::Hash, + _header: &<Block as BlockT>::Header) + {} } #[cfg(test)] diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 12d14b8e4739bf2c58065f8eee3674b0817bb577..1061d241a58d6f7f0cd69e5d754f3b1f2733360b 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -49,6 +49,7 @@ extern crate substrate_test_client as test_client; mod service; mod sync; +#[macro_use] mod protocol; mod io; mod config; @@ -74,3 +75,5 @@ pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBf pub use error::Error; pub use config::{Roles, ProtocolConfig}; pub use on_demand::{OnDemand, OnDemandService, RemoteResponse}; +#[doc(hidden)] +pub use runtime_primitives::traits::Block as BlockT; diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 1bf96c3744812e82f72e6faaf83954b068afc122..be62717618777c864b15d408b58499977b5167fa 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -277,7 +277,7 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> { GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(io, who, response), GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(io, who, request), GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(io, who, response), - other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other), + other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, &mut Some(other)), } } @@ -709,3 +709,102 @@ pub(crate) fn hash_message<B: BlockT>(message: &Message<B>) -> B::Hash { let data = message.encode(); HashFor::<B>::hash(&data) } + +/// Construct a simple protocol that is composed of several sub protocols. +/// Each "sub protocol" needs to implement `Specialization` and needs to provide a `new()` function. +/// For more fine grained implementations, this macro is not usable. +/// +/// # Example +/// +/// ```nocompile +/// construct_simple_protocol! { +/// pub struct MyProtocol where Block = MyBlock { +/// consensus_gossip: ConsensusGossip<MyBlock>, +/// other_protocol: MyCoolStuff, +/// } +/// } +/// ``` +/// +/// You can also provide an optional parameter after `where Block = MyBlock`, so it looks like +/// `where Block = MyBlock, Status = consensus_gossip`. This will instruct the implementation to +/// use the `status()` function from the `ConsensusGossip` protocol. By default, `status()` returns +/// an empty vector. +#[macro_export] +macro_rules! construct_simple_protocol { + ( + $( #[ $attr:meta ] )* + pub struct $protocol:ident where + Block = $block:ident + $( , Status = $status_protocol_name:ident )* + { + $( $sub_protocol_name:ident : $sub_protocol:ident $( <$protocol_block:ty> )*, )* + } + ) => { + $( #[$attr] )* + pub struct $protocol { + $( $sub_protocol_name: $sub_protocol $( <$protocol_block> )*, )* + } + + impl $protocol { + /// Instantiate a node protocol handler. + pub fn new() -> Self { + Self { + $( $sub_protocol_name: $sub_protocol::new(), )* + } + } + } + + impl $crate::specialization::Specialization<$block> for $protocol { + fn status(&self) -> Vec<u8> { + $( + let status = self.$status_protocol_name.status(); + + if !status.is_empty() { + return status; + } + )* + + Vec::new() + } + + fn on_connect( + &mut self, + ctx: &mut $crate::Context<$block>, + who: $crate::NodeIndex, + status: $crate::StatusMessage<$block> + ) { + $( self.$sub_protocol_name.on_connect(ctx, who, status); )* + } + + fn on_disconnect(&mut self, ctx: &mut $crate::Context<$block>, who: $crate::NodeIndex) { + $( self.$sub_protocol_name.on_disconnect(ctx, who); )* + } + + fn on_message( + &mut self, + ctx: &mut $crate::Context<$block>, + who: $crate::NodeIndex, + message: &mut Option<$crate::message::Message<$block>> + ) { + $( self.$sub_protocol_name.on_message(ctx, who, message); )* + } + + fn on_abort(&mut self) { + $( self.$sub_protocol_name.on_abort(); )* + } + + fn maintain_peers(&mut self, ctx: &mut $crate::Context<$block>) { + $( self.$sub_protocol_name.maintain_peers(ctx); )* + } + + fn on_block_imported( + &mut self, + ctx: &mut $crate::Context<$block>, + hash: <$block as $crate::BlockT>::Hash, + header: &<$block as $crate::BlockT>::Header + ) { + $( self.$sub_protocol_name.on_block_imported(ctx, hash, header); )* + } + } + } +} diff --git a/substrate/core/network/src/specialization.rs b/substrate/core/network/src/specialization.rs index 6fe3289330ae4b3fd9613c7feb514d93074fba16..ccd1071adb4cef806ca97a1d307a795fe7be8a4e 100644 --- a/substrate/core/network/src/specialization.rs +++ b/substrate/core/network/src/specialization.rs @@ -35,7 +35,7 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static { fn on_disconnect(&mut self, ctx: &mut Context<B>, who: NodeIndex); /// Called when a network-specific message arrives. - fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: ::message::Message<B>); + fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: &mut Option<::message::Message<B>>); /// Called on abort. fn on_abort(&mut self) { } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 6f4e1b1818fd5f745f016f6ab3b8f11880561fc9..606b82d8ff8f18a446c2d69e9f2bd8b89ab7a0f1 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -66,8 +66,8 @@ impl Specialization<Block> for DummySpecialization { self.gossip.peer_disconnected(ctx, peer_id); } - fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, message: ::message::Message<Block>) { - if let ::message::generic::Message::ChainSpecific(data) = message { + fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: NodeIndex, message: &mut Option<::message::Message<Block>>) { + if let Some(::message::generic::Message::ChainSpecific(data)) = message.take() { let gossip_message = GossipMessage::decode(&mut &data[..]) .expect("gossip messages all in known format; qed"); self.gossip.on_chain_specific(ctx, peer_id, data, gossip_message.topic) diff --git a/substrate/node/network/src/consensus.rs b/substrate/node/network/src/consensus.rs index e4c111d625f97a4fd428d8f9a9e9f4adc8597be7..44d338d3303982206179c3f7fc5e716125b0a53c 100644 --- a/substrate/node/network/src/consensus.rs +++ b/substrate/node/network/src/consensus.rs @@ -275,7 +275,7 @@ impl<P: AuthoringApi + Send + Sync + 'static> Network for ConsensusNetwork<P> { // spin up a task in the background that processes all incoming statements // TODO: propagate statements on a timer? let process_task = self.network.with_spec(|spec, _ctx| { - spec.new_consensus(parent_hash); + spec.consensus_gossip.new_session(parent_hash); MessageProcessTask { inner_stream: spec.consensus_gossip.messages_for(parent_hash), bft_messages: bft_send, diff --git a/substrate/node/network/src/lib.rs b/substrate/node/network/src/lib.rs index b205da06524ffc872ad87be1716f6075a55fc31e..109e53616d9a16a215646d4f0bfa924698b1e616 100644 --- a/substrate/node/network/src/lib.rs +++ b/substrate/node/network/src/lib.rs @@ -21,6 +21,7 @@ #![warn(unused_extern_crates)] extern crate substrate_bft as bft; +#[macro_use] extern crate substrate_network; extern crate substrate_primitives; @@ -36,79 +37,15 @@ extern crate log; pub mod consensus; -use node_primitives::{Block, Hash, Header}; -use substrate_network::{NodeIndex, Context, Severity}; +use node_primitives::{Block, Hash}; use substrate_network::consensus_gossip::ConsensusGossip; -use substrate_network::{message, generic_message}; -use substrate_network::specialization::Specialization; -use substrate_network::StatusMessage as GenericFullStatus; - -type FullStatus = GenericFullStatus<Block>; /// Specialization of the network service for the node protocol. pub type NetworkService = ::substrate_network::Service<Block, Protocol, Hash>; - -/// Demo protocol attachment for substrate. -pub struct Protocol { - consensus_gossip: ConsensusGossip<Block>, - live_consensus: Option<Hash>, -} - -impl Protocol { - /// Instantiate a node protocol handler. - pub fn new() -> Self { - Protocol { - consensus_gossip: ConsensusGossip::new(), - live_consensus: None, - } - } - - /// Note new consensus session. - fn new_consensus(&mut self, parent_hash: Hash) { - let old_consensus = self.live_consensus.take(); - self.live_consensus = Some(parent_hash); - self.consensus_gossip - .collect_garbage(|topic| old_consensus.as_ref().map_or(true, |h| topic != h)); - } -} - -impl Specialization<Block> for Protocol { - fn status(&self) -> Vec<u8> { - Vec::new() - } - - fn on_connect(&mut self, ctx: &mut Context<Block>, who: NodeIndex, status: FullStatus) { - self.consensus_gossip.new_peer(ctx, who, status.roles); - } - - fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: NodeIndex) { - self.consensus_gossip.peer_disconnected(ctx, who); - } - - fn on_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, message: message::Message<Block>) { - match message { - generic_message::Message::BftMessage(msg) => { - trace!(target: "node-network", "BFT message from {}: {:?}", who, msg); - // TODO: check signature here? what if relevant block is unknown? - self.consensus_gossip.on_bft_message(ctx, who, msg) - } - generic_message::Message::ChainSpecific(_) => { - trace!(target: "node-network", "Bad message from {}", who); - ctx.report_peer(who, Severity::Bad("Invalid node protocol message format")); - } - _ => {} - } - } - - fn on_abort(&mut self) { - self.consensus_gossip.abort(); - } - - fn maintain_peers(&mut self, _ctx: &mut Context<Block>) { - self.consensus_gossip.collect_garbage(|_| true); - } - - fn on_block_imported(&mut self, _ctx: &mut Context<Block>, _hash: Hash, _header: &Header) { +construct_simple_protocol! { + /// Demo protocol attachment for substrate. + pub struct Protocol where Block = Block { + consensus_gossip: ConsensusGossip<Block>, } }