diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 6ff134014b288328cbccef42df0f33c8081a2a05..ab125cb005ee7926bbd998460a9d0df6442ac34b 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -4423,8 +4423,8 @@ dependencies = [ "maplit", "parity-scale-codec", "parking_lot 0.11.0", - "polkadot-network", "polkadot-network-bridge", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", @@ -4450,19 +4450,16 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.11.0", "polkadot-erasure-coding", - "polkadot-network", "polkadot-network-bridge", - "polkadot-node-primitives", + "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-primitives", "sc-keystore", - "sc-network", "smallvec 1.4.1", "smol-timeout", "sp-core", "sp-keyring", - "sp-staking", "streamunordered", ] @@ -4608,12 +4605,13 @@ dependencies = [ "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", - "polkadot-node-primitives", + "polkadot-node-network-protocol", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-primitives", "sc-network", "sp-core", + "sp-keyring", "sp-runtime", "streamunordered", ] @@ -4786,6 +4784,18 @@ dependencies = [ "sp-core", ] +[[package]] +name = "polkadot-node-network-protocol" +version = "0.1.0" +dependencies = [ + "parity-scale-codec", + "polkadot-node-primitives", + "polkadot-primitives", + "sc-network", + "sp-core", + "sp-runtime", +] + [[package]] name = "polkadot-node-primitives" version = "0.1.0" @@ -4810,6 +4820,7 @@ dependencies = [ "parity-scale-codec", "parking_lot 0.10.2", "pin-project", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem-test-helpers", "polkadot-primitives", @@ -4913,11 +4924,11 @@ dependencies = [ "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-primitives", - "sc-network", "sp-core", "sp-runtime", "streamunordered", @@ -5271,6 +5282,7 @@ dependencies = [ "log 0.4.11", "parity-scale-codec", "parking_lot 0.10.2", + "polkadot-node-network-protocol", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index be1bc74858b2e806cdc53e5f2d4907f8afb0dd16..0fa1ae7e2485c2cda4bb512d27594e712cf47602 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -55,6 +55,7 @@ members = [ "node/core/runtime-api", "node/network/bridge", "node/network/pov-distribution", + "node/network/protocol", "node/network/statement-distribution", "node/network/bitfield-distribution", "node/network/availability-distribution", diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml index c2d313c3cf1f66f48e5970783267c649e9b4c7b6..a966c769bddfd011c1838a18e0cd221cec0627d0 100644 --- a/polkadot/node/network/availability-distribution/Cargo.toml +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -9,16 +9,13 @@ futures = "0.3.5" log = "0.4.11" streamunordered = "0.5.1" codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] } -node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } polkadot-primitives = { path = "../../../primitives" } polkadot-erasure-coding = { path = "../../../erasure-coding" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-network-bridge = { path = "../../network/bridge" } -polkadot-network = { path = "../../../network" } -sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-node-network-protocol = { path = "../../network/protocol" } sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } derive_more = "0.99.9" -sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } [dev-dependencies] @@ -31,4 +28,4 @@ futures-timer = "3.0.2" smol-timeout = "0.1.0" env_logger = "0.7.1" assert_matches = "1.3.0" -smallvec = "1" \ No newline at end of file +smallvec = "1" diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index b3934ded6fe1970181c8fb343dffd9a4c56b266c..2a5a08606575911b415fe179d7a7cf5bb8d4aae0 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -32,24 +32,27 @@ use sp_core::{ }; use sc_keystore as keystore; -use node_primitives::{ProtocolId, View}; - use log::{trace, warn}; use polkadot_erasure_coding::branch_hash; use polkadot_primitives::v1::{ PARACHAIN_KEY_TYPE_ID, BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk, Hash as Hash, HashT, Id as ParaId, - ValidatorId, ValidatorIndex, + ValidatorId, ValidatorIndex, SessionIndex, +}; +use polkadot_subsystem::messages::{ + AllMessages, AvailabilityDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage, + RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage, }; -use polkadot_subsystem::messages::*; use polkadot_subsystem::{ errors::{ChainApiError, RuntimeApiError}, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, }; -use sc_network::ReputationChange as Rep; -use sp_staking::SessionIndex; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, View, ReputationChange as Rep, PeerId, + NetworkBridgeEvent, +}; use std::collections::{HashMap, HashSet}; use std::io; use std::iter; @@ -76,7 +79,6 @@ type Result<T> = std::result::Result<T, Error>; const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid"); const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live"); -const COST_MESSAGE_NOT_DECODABLE: Rep = Rep::new(-100, "Message is not decodable"); const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages"); const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information"); const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message"); @@ -284,17 +286,13 @@ impl ProtocolState { } } -fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { - AllMessages::AvailabilityDistribution(AvailabilityDistributionMessage::NetworkBridgeUpdate(n)) -} - /// Deal with network bridge updates and track what needs to be tracked /// which depends on the message type received. async fn handle_network_msg<Context>( ctx: &mut Context, keystore: KeyStorePtr, state: &mut ProtocolState, - bridge_message: NetworkBridgeEvent, + bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>, ) -> Result<()> where Context: SubsystemContext<Message = AvailabilityDistributionMessage>, @@ -314,19 +312,13 @@ where NetworkBridgeEvent::OurViewChange(view) => { handle_our_view_change(ctx, keystore, state, view).await?; } - NetworkBridgeEvent::PeerMessage(remote, bytes) => { - if let Ok(gossiped_availability) = - AvailabilityGossipMessage::decode(&mut (bytes.as_slice())) - { - trace!( - target: TARGET, - "Received availability gossip from peer {:?}", - &remote - ); - process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?; - } else { - modify_reputation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await?; - } + NetworkBridgeEvent::PeerMessage(remote, msg) => { + let gossiped_availability = match msg { + protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) => + AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk } + }; + + process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?; } } Ok(()) @@ -494,16 +486,19 @@ where .insert(message_id); } - let encoded = message.encode(); per_candidate .message_vault - .insert(message.erasure_chunk.index, message); + .insert(message.erasure_chunk.index, message.clone()); + + let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk( + message.candidate_hash, + message.erasure_chunk, + ); ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage( + NetworkBridgeMessage::SendValidationMessage( peers.clone(), - AvailabilityDistributionSubsystem::PROTOCOL_ID, - encoded, + protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message), ), )) .await @@ -709,9 +704,6 @@ pub struct AvailabilityDistributionSubsystem { } impl AvailabilityDistributionSubsystem { - /// The protocol identifier for bitfield distribution. - const PROTOCOL_ID: ProtocolId = *b"avad"; - /// Number of ancestors to keep around for the relay-chain heads. const K: usize = 3; @@ -725,20 +717,13 @@ impl AvailabilityDistributionSubsystem { where Context: SubsystemContext<Message = AvailabilityDistributionMessage>, { - // startup: register the network protocol with the bridge. - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::RegisterEventProducer(Self::PROTOCOL_ID, network_update_message), - )) - .await - .map_err::<Error, _>(Into::into)?; - // work: process incoming messages from the overseer. let mut state = ProtocolState::default(); loop { let message = ctx.recv().await.map_err::<Error, _>(Into::into)?; match message { FromOverseer::Communication { - msg: AvailabilityDistributionMessage::NetworkBridgeUpdate(event), + msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event), } => { if let Err(e) = handle_network_msg( &mut ctx, diff --git a/polkadot/node/network/availability-distribution/src/tests.rs b/polkadot/node/network/availability-distribution/src/tests.rs index 130c2700ca805b76a7db7da2f80b1af09439a67a..f4988fd8b77ed91b00a1ea8f67784276749a4f44 100644 --- a/polkadot/node/network/availability-distribution/src/tests.rs +++ b/polkadot/node/network/availability-distribution/src/tests.rs @@ -1,3 +1,19 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + use super::*; use assert_matches::assert_matches; use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; @@ -7,6 +23,7 @@ use polkadot_primitives::v1::{ OmittedValidationData, PoV, ScheduledCore, ValidatorPair, }; use polkadot_subsystem_testhelpers as test_helpers; +use polkadot_node_network_protocol::ObservedRole; use futures::{executor, future, Future}; use futures_timer::Delay; @@ -26,6 +43,15 @@ macro_rules! delay { }; } +fn chunk_protocol_message(message: AvailabilityGossipMessage) + -> protocol_v1::AvailabilityDistributionMessage +{ + protocol_v1::AvailabilityDistributionMessage::Chunk( + message.candidate_hash, + message.erasure_chunk, + ) +} + struct TestHarness { virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>, } @@ -437,12 +463,9 @@ fn reputation_verification() { ) .await; - // ignore event producer registration - let _ = overseer_recv(&mut virtual_overseer).await; - overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::OurViewChange(view![current,]), ), ) @@ -668,7 +691,7 @@ fn reputation_verification() { // setup peer a with interest in current overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), ), ) @@ -676,7 +699,7 @@ fn reputation_verification() { overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]), ), ) @@ -685,7 +708,7 @@ fn reputation_verification() { // setup peer b with interest in ancestor overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), ), ) @@ -693,7 +716,7 @@ fn reputation_verification() { overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![ancestors[0]]), ), ) @@ -701,35 +724,6 @@ fn reputation_verification() { delay!(100); - ///////////////////////////////////////////////////////// - // ready for action - - // check if garbage messages are detected and peer rep is changed as expected - let garbage = b"I am garbage"; - - overseer_send( - &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - // AvailabilityDistributionSubsystem::PROTOCOL_ID, - garbage.to_vec(), - )), - ) - .await; - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer( - peer, - rep - ) - ) => { - assert_eq!(peer, peer_b); - assert_eq!(rep, COST_MESSAGE_NOT_DECODABLE); - } - ); - let valid: AvailabilityGossipMessage = make_valid_availability_gossip( &test_state, candidates[0].hash(), @@ -741,8 +735,11 @@ fn reputation_verification() { // valid (first, from b) overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerMessage(peer_b.clone(), valid.encode()), + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + chunk_protocol_message(valid.clone()), + ), ), ) .await; @@ -765,8 +762,11 @@ fn reputation_verification() { // valid (duplicate, from b) overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerMessage(peer_b.clone(), valid.encode()), + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + chunk_protocol_message(valid.clone()), + ), ), ) .await; @@ -789,8 +789,11 @@ fn reputation_verification() { // valid (second, from a) overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid.encode()), + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + chunk_protocol_message(valid.clone()), + ), ), ) .await; @@ -812,7 +815,7 @@ fn reputation_verification() { // peer a is not interested in anything anymore overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![]), ), ) @@ -822,8 +825,11 @@ fn reputation_verification() { // send the a message again, so we should detect the duplicate overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid.encode()), + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + chunk_protocol_message(valid.clone()), + ), ), ) .await; @@ -846,7 +852,7 @@ fn reputation_verification() { // setup peer a with interest in parent x overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerDisconnected(peer_b.clone()), ), ) @@ -856,7 +862,7 @@ fn reputation_verification() { overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), ), ) @@ -874,8 +880,11 @@ fn reputation_verification() { // send the a message before we send a view update overseer_send( &mut virtual_overseer, - AvailabilityDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid2.encode()), + AvailabilityDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + chunk_protocol_message(valid2), + ), ), ) .await; diff --git a/polkadot/node/network/bitfield-distribution/Cargo.toml b/polkadot/node/network/bitfield-distribution/Cargo.toml index 0ae5ad5a3ddc5ed0c9ed498455eefdc2430be7e1..768d63e1f606c63ec81b9c1cd80ecda021cdf785 100644 --- a/polkadot/node/network/bitfield-distribution/Cargo.toml +++ b/polkadot/node/network/bitfield-distribution/Cargo.toml @@ -14,7 +14,7 @@ node-primitives = { package = "polkadot-node-primitives", path = "../../primitiv polkadot-primitives = { path = "../../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-network-bridge = { path = "../../network/bridge" } -polkadot-network = { path = "../../../network" } +polkadot-node-network-protocol = { path = "../../network/protocol" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } [dev-dependencies] diff --git a/polkadot/node/network/bitfield-distribution/src/lib.rs b/polkadot/node/network/bitfield-distribution/src/lib.rs index 939b6616df37637436839c3580480cad961fc380..8da071bddd1e1dc0ed78a78fb9edd759c65a4460 100644 --- a/polkadot/node/network/bitfield-distribution/src/lib.rs +++ b/polkadot/node/network/bitfield-distribution/src/lib.rs @@ -23,15 +23,13 @@ use codec::{Decode, Encode}; use futures::{channel::oneshot, FutureExt}; -use node_primitives::{ProtocolId, View}; - use log::{trace, warn}; use polkadot_subsystem::messages::*; use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId}; -use sc_network::ReputationChange; +use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange}; use std::collections::{HashMap, HashSet}; const COST_SIGNATURE_INVALID: ReputationChange = @@ -42,8 +40,6 @@ const COST_MISSING_PEER_SESSION_KEY: ReputationChange = ReputationChange::new(-133, "Missing peer session key"); const COST_NOT_IN_VIEW: ReputationChange = ReputationChange::new(-51, "Not interested in that parent hash"); -const COST_MESSAGE_NOT_DECODABLE: ReputationChange = - ReputationChange::new(-100, "Not interested in that parent hash"); const COST_PEER_DUPLICATE_MESSAGE: ReputationChange = ReputationChange::new(-500, "Peer sent the same message multiple times"); const BENEFIT_VALID_MESSAGE_FIRST: ReputationChange = @@ -54,11 +50,28 @@ const BENEFIT_VALID_MESSAGE: ReputationChange = /// Checked signed availability bitfield that is distributed /// to other peers. #[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)] -pub struct BitfieldGossipMessage { +struct BitfieldGossipMessage { /// The relay parent this message is relative to. - pub relay_parent: Hash, + relay_parent: Hash, /// The actual signed availability bitfield. - pub signed_availability: SignedAvailabilityBitfield, + signed_availability: SignedAvailabilityBitfield, +} + +impl BitfieldGossipMessage { + fn into_validation_protocol(self) -> protocol_v1::ValidationProtocol { + protocol_v1::ValidationProtocol::BitfieldDistribution( + self.into_network_message() + ) + } + + fn into_network_message(self) + -> protocol_v1::BitfieldDistributionMessage + { + protocol_v1::BitfieldDistributionMessage::Bitfield( + self.relay_parent, + self.signed_availability, + ) + } } /// Data used to track information of peers and relay parents the @@ -114,28 +127,15 @@ impl PerRelayParentData { } } -fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { - AllMessages::BitfieldDistribution(BitfieldDistributionMessage::NetworkBridgeUpdate(n)) -} - /// The bitfield distribution subsystem. pub struct BitfieldDistribution; impl BitfieldDistribution { - /// The protocol identifier for bitfield distribution. - const PROTOCOL_ID: ProtocolId = *b"bitd"; - /// Start processing work as passed on from the Overseer. async fn run<Context>(mut ctx: Context) -> SubsystemResult<()> where Context: SubsystemContext<Message = BitfieldDistributionMessage>, { - // startup: register the network protocol with the bridge. - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::RegisterEventProducer(Self::PROTOCOL_ID, network_update_message), - )) - .await?; - // work: process incoming messages from the overseer and process accordingly. let mut state = ProtocolState::default(); loop { @@ -149,7 +149,7 @@ impl BitfieldDistribution { .await?; } FromOverseer::Communication { - msg: BitfieldDistributionMessage::NetworkBridgeUpdate(event), + msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event), } => { trace!(target: "bitd", "Processing NetworkMessage"); // a network message was received @@ -314,10 +314,9 @@ where ); } else { ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage( + NetworkBridgeMessage::SendValidationMessage( interested_peers, - BitfieldDistribution::PROTOCOL_ID, - message.encode(), + message.into_validation_protocol(), ), )) .await?; @@ -413,7 +412,7 @@ where async fn handle_network_msg<Context>( ctx: &mut Context, state: &mut ProtocolState, - bridge_message: NetworkBridgeEvent, + bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>, ) -> SubsystemResult<()> where Context: SubsystemContext<Message = BitfieldDistributionMessage>, @@ -433,12 +432,16 @@ where NetworkBridgeEvent::OurViewChange(view) => { handle_our_view_change(state, view)?; } - NetworkBridgeEvent::PeerMessage(remote, bytes) => { - if let Ok(gossiped_bitfield) = BitfieldGossipMessage::decode(&mut (bytes.as_slice())) { - trace!(target: "bitd", "Received bitfield gossip from peer {:?}", &remote); - process_incoming_peer_message(ctx, state, remote, gossiped_bitfield).await?; - } else { - modify_reputation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await?; + NetworkBridgeEvent::PeerMessage(remote, message) => { + match message { + protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => { + trace!(target: "bitd", "Received bitfield gossip from peer {:?}", &remote); + let gossiped_bitfield = BitfieldGossipMessage { + relay_parent, + signed_availability: bitfield, + }; + process_incoming_peer_message(ctx, state, remote, gossiped_bitfield).await?; + } } } } @@ -541,10 +544,9 @@ where .insert(validator.clone()); ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage( + NetworkBridgeMessage::SendValidationMessage( vec![dest], - BitfieldDistribution::PROTOCOL_ID, - message.encode(), + message.into_validation_protocol(), ), )) .await?; @@ -612,6 +614,7 @@ mod test { use sp_core::crypto::Pair; use std::time::Duration; use assert_matches::assert_matches; + use polkadot_node_network_protocol::ObservedRole; macro_rules! view { ( $( $hash:expr ),* $(,)? ) => [ @@ -742,7 +745,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), )); // reputation change due to invalid validator index @@ -795,7 +798,7 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.into_network_message()), )); // reputation change due to invalid validator index @@ -848,7 +851,10 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + msg.clone().into_network_message(), + ), )); // none of our peers has any interest in any messages @@ -878,7 +884,10 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_a.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + msg.clone().into_network_message(), + ), )); assert_matches!( @@ -895,7 +904,10 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + msg.clone().into_network_message(), + ), )); assert_matches!( @@ -909,6 +921,7 @@ mod test { ); }); } + #[test] fn changing_view() { let _ = env_logger::builder() @@ -960,7 +973,10 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + msg.clone().into_network_message(), + ), )); // gossip to the overseer @@ -977,12 +993,11 @@ mod test { // gossip to the network assert_matches!( handle.recv().await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage ( - peers, proto, bytes + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage ( + peers, out_msg, )) => { assert_eq!(peers, peers![peer_b]); - assert_eq!(proto, BitfieldDistribution::PROTOCOL_ID); - assert_eq!(bytes, msg.encode()); + assert_eq!(out_msg, msg.clone().into_validation_protocol()); } ); @@ -1014,7 +1029,10 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_b.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + msg.clone().into_network_message(), + ), )); // reputation change for peer B @@ -1042,7 +1060,10 @@ mod test { launch!(handle_network_msg( &mut ctx, &mut state, - NetworkBridgeEvent::PeerMessage(peer_a.clone(), msg.encode()), + NetworkBridgeEvent::PeerMessage( + peer_a.clone(), + msg.clone().into_network_message(), + ), )); // reputation change for peer B @@ -1058,59 +1079,4 @@ mod test { }); } - - - #[test] - fn invalid_peer_message() { - let _ = env_logger::builder() - .filter(None, log::LevelFilter::Trace) - .is_test(true) - .try_init(); - - let hash_a: Hash = [0; 32].into(); - let peer_a = PeerId::random(); - - // validator 0 key pair - let (mut state, _signing_context, _validator_pair) = state_with_view(view![], hash_a.clone()); - - let pool = sp_core::testing::TaskExecutor::new(); - let (mut ctx, mut handle) = - make_subsystem_context::<BitfieldDistributionMessage, _>(pool); - - executor::block_on(async move { - launch!(handle_network_msg( - &mut ctx, - &mut state, - NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), - )); - - // make peer b interested - launch!(handle_network_msg( - &mut ctx, - &mut state, - NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a]), - )); - - assert!(state.peer_views.contains_key(&peer_a)); - - // recv a first message from the network - launch!(handle_network_msg( - &mut ctx, - &mut state, - NetworkBridgeEvent::PeerMessage(peer_a.clone(), b"00AaBbCcDdEeFf".to_vec()), - )); - - // reputation change for peer A - assert_matches!( - handle.recv().await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ReportPeer(peer, rep) - ) => { - assert_eq!(peer, peer_a); - assert_eq!(rep, COST_MESSAGE_NOT_DECODABLE); - } - ); - - }); - } } diff --git a/polkadot/node/network/bridge/Cargo.toml b/polkadot/node/network/bridge/Cargo.toml index ef8b605bb15c3da3443a9f7c14780fbac270bcd4..555f158b782e75f12438345e636d8c3a23f4f3d4 100644 --- a/polkadot/node/network/bridge/Cargo.toml +++ b/polkadot/node/network/bridge/Cargo.toml @@ -10,14 +10,15 @@ log = "0.4.8" futures-timer = "3.0.2" streamunordered = "0.5.1" polkadot-primitives = { path = "../../../primitives" } -node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } parity-scale-codec = "1.3.4" sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-network-protocol = { path = "../protocol" } [dev-dependencies] assert_matches = "1.3.0" parking_lot = "0.10.0" polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 85063b2b0c8b30b321ba8b974cc93e502ef7bd1b..9f347d39f19bb5ad034ed7290fe8e623653585a6 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -20,23 +20,27 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; use futures::future::BoxFuture; use futures::stream::BoxStream; +use futures::channel::oneshot; -use sc_network::{ - ObservedRole, ReputationChange, PeerId, - Event as NetworkEvent, -}; +use sc_network::Event as NetworkEvent; use sp_runtime::ConsensusEngineId; use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError, SubsystemResult, }; -use polkadot_subsystem::messages::{NetworkBridgeEvent, NetworkBridgeMessage, AllMessages}; -use node_primitives::{ProtocolId, View}; -use polkadot_primitives::v1::{Block, Hash}; +use polkadot_subsystem::messages::{ + NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage, + BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, + CollatorProtocolMessage, +}; +use polkadot_primitives::v1::{Block, Hash, ValidatorId}; +use polkadot_node_network_protocol::{ + ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1 +}; -use std::collections::btree_map::{BTreeMap, Entry as BEntry}; use std::collections::hash_map::{HashMap, Entry as HEntry}; +use std::iter::ExactSizeIterator; use std::pin::Pin; use std::sync::Arc; @@ -45,15 +49,19 @@ use std::sync::Arc; /// We use the same limit to compute the view sent to peers locally. const MAX_VIEW_HEADS: usize = 5; -/// The engine ID of the polkadot network protocol. -pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2"; -/// The protocol name. -pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/2"; +/// The engine ID of the validation protocol. +pub const VALIDATION_PROTOCOL_ID: ConsensusEngineId = *b"pvn1"; +/// The protocol name for the validation peer-set. +pub const VALIDATION_PROTOCOL_NAME: &[u8] = b"/polkadot/validation/1"; +/// The engine ID of the collation protocol. +pub const COLLATION_PROTOCOL_ID: ConsensusEngineId = *b"pcn1"; +/// The protocol name for the collation peer-set. +pub const COLLATION_PROTOCOL_NAME: &[u8] = b"/polkadot/collation/1"; const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message"); -const UNKNOWN_PROTO_COST: ReputationChange - = ReputationChange::new(-50, "Message sent to unknown protocol"); +const UNCONNECTED_PEERSET_COST: ReputationChange + = ReputationChange::new(-50, "Message sent to un-connected peer-set"); const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view"); @@ -62,10 +70,10 @@ const TARGET: &'static str = "network_bridge"; /// Messages received on the network. #[derive(Debug, Encode, Decode, Clone)] -pub enum WireMessage { +pub enum WireMessage<M> { /// A message from a peer on a specific protocol. #[codec(index = "1")] - ProtocolMessage(ProtocolId, Vec<u8>), + ProtocolMessage(M), /// A view update from a peer. #[codec(index = "2")] ViewUpdate(View), @@ -73,24 +81,28 @@ pub enum WireMessage { /// Information about the notifications protocol. Should be used during network configuration /// or shortly after startup to register the protocol with the network service. -pub fn notifications_protocol_info() -> (ConsensusEngineId, std::borrow::Cow<'static, [u8]>) { - (POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME.into()) +pub fn notifications_protocol_info() -> Vec<(ConsensusEngineId, std::borrow::Cow<'static, [u8]>)> { + vec![ + (VALIDATION_PROTOCOL_ID, VALIDATION_PROTOCOL_NAME.into()), + (COLLATION_PROTOCOL_ID, COLLATION_PROTOCOL_NAME.into()), + ] } /// An action to be carried out by the network. -#[derive(PartialEq)] +#[derive(Debug, PartialEq)] pub enum NetworkAction { /// Note a change in reputation for a peer. ReputationChange(PeerId, ReputationChange), - /// Write a notification to a given peer. - WriteNotification(PeerId, Vec<u8>), + /// Write a notification to a given peer on the given peer-set. + WriteNotification(PeerId, PeerSet, Vec<u8>), } /// An abstraction over networking for the purposes of this subsystem. pub trait Network: Send + 'static { /// Get a stream of all events occurring on the network. This may include events unrelated /// to the Polkadot protocol - the user of this function should filter only for events related - /// to the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID). + /// to the [`VALIDATION_PROTOCOL_ID`](VALIDATION_PROTOCOL_ID) + /// or [`COLLATION_PROTOCOL_ID`](COLLATION_PROTOCOL_ID) fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>; /// Get access to an underlying sink for all network actions. @@ -107,12 +119,12 @@ pub trait Network: Send + 'static { }.boxed() } - /// Write a notification to a peer on the [`POLKADOT_ENGINE_ID`](POLKADOT_ENGINE_ID) topic. - fn write_notification(&mut self, who: PeerId, message: Vec<u8>) + /// Write a notification to a peer on the given peer-set's protocol. + fn write_notification(&mut self, who: PeerId, peer_set: PeerSet, message: Vec<u8>) -> BoxFuture<SubsystemResult<()>> { async move { - self.action_sink().send(NetworkAction::WriteNotification(who, message)).await + self.action_sink().send(NetworkAction::WriteNotification(who, peer_set, message)).await }.boxed() } } @@ -143,11 +155,20 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> { peer, cost_benefit, ), - NetworkAction::WriteNotification(peer, message) => self.0.write_notification( - peer, - POLKADOT_ENGINE_ID, - message, - ), + NetworkAction::WriteNotification(peer, peer_set, message) => { + match peer_set { + PeerSet::Validation => self.0.write_notification( + peer, + VALIDATION_PROTOCOL_ID, + message, + ), + PeerSet::Collation => self.0.write_notification( + peer, + COLLATION_PROTOCOL_ID, + message, + ), + } + } } Ok(()) @@ -197,20 +218,24 @@ impl<Net, Context> Subsystem<Context> for NetworkBridge<Net> struct PeerData { /// Latest view sent by the peer. view: View, - /// The role of the peer. - role: ObservedRole, } #[derive(Debug)] enum Action { - RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), - SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>), + SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol), + SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol), + ConnectToValidators(PeerSet, Vec<ValidatorId>, oneshot::Sender<Vec<(ValidatorId, PeerId)>>), ReportPeer(PeerId, ReputationChange), + ActiveLeaves(ActiveLeavesUpdate), - PeerConnected(PeerId, ObservedRole), - PeerDisconnected(PeerId), - PeerMessages(PeerId, Vec<WireMessage>), + PeerConnected(PeerSet, PeerId, ObservedRole), + PeerDisconnected(PeerSet, PeerId), + PeerMessages( + PeerId, + Vec<WireMessage<protocol_v1::ValidationProtocol>>, + Vec<WireMessage<protocol_v1::CollationProtocol>>, + ), Abort, Nop, @@ -224,11 +249,13 @@ fn action_from_overseer_message( => Action::ActiveLeaves(active_leaves), Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, Ok(FromOverseer::Communication { msg }) => match msg { - NetworkBridgeMessage::RegisterEventProducer(protocol_id, message_producer) - => Action::RegisterEventProducer(protocol_id, message_producer), NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), - NetworkBridgeMessage::SendMessage(peers, protocol, message) - => Action::SendMessage(peers, protocol, message), + NetworkBridgeMessage::SendValidationMessage(peers, msg) + => Action::SendValidationMessage(peers, msg), + NetworkBridgeMessage::SendCollationMessage(peers, msg) + => Action::SendCollationMessage(peers, msg), + NetworkBridgeMessage::ConnectToValidators(peer_set, validators, res) + => Action::ConnectToValidators(peer_set, validators, res), }, Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_))) => Action::Nop, @@ -239,40 +266,55 @@ fn action_from_overseer_message( } } -fn action_from_network_message(event: Option<NetworkEvent>) -> Option<Action> { +fn action_from_network_message(event: Option<NetworkEvent>) -> Action { match event { None => { log::info!(target: TARGET, "Shutting down Network Bridge: underlying event stream concluded"); - Some(Action::Abort) + Action::Abort } - Some(NetworkEvent::Dht(_)) => None, + Some(NetworkEvent::Dht(_)) => Action::Nop, Some(NetworkEvent::NotificationStreamOpened { remote, engine_id, role }) => { - if engine_id == POLKADOT_ENGINE_ID { - Some(Action::PeerConnected(remote, role)) - } else { - None + let role = role.into(); + match engine_id { + x if x == VALIDATION_PROTOCOL_ID + => Action::PeerConnected(PeerSet::Validation, remote, role), + x if x == COLLATION_PROTOCOL_ID + => Action::PeerConnected(PeerSet::Collation, remote, role), + _ => Action::Nop, } } Some(NetworkEvent::NotificationStreamClosed { remote, engine_id }) => { - if engine_id == POLKADOT_ENGINE_ID { - Some(Action::PeerDisconnected(remote)) - } else { - None + match engine_id { + x if x == VALIDATION_PROTOCOL_ID + => Action::PeerDisconnected(PeerSet::Validation, remote), + x if x == COLLATION_PROTOCOL_ID + => Action::PeerDisconnected(PeerSet::Collation, remote), + _ => Action::Nop, } } Some(NetworkEvent::NotificationsReceived { remote, messages }) => { - let v: Result<Vec<_>, _> = messages.iter() - .filter(|(engine_id, _)| engine_id == &POLKADOT_ENGINE_ID) + let v_messages: Result<Vec<_>, _> = messages.iter() + .filter(|(engine_id, _)| engine_id == &VALIDATION_PROTOCOL_ID) + .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) + .collect(); + + let v_messages = match v_messages { + Err(_) => return Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), + Ok(v) => v, + }; + + let c_messages: Result<Vec<_>, _> = messages.iter() + .filter(|(engine_id, _)| engine_id == &COLLATION_PROTOCOL_ID) .map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref())) .collect(); - match v { - Err(_) => Some(Action::ReportPeer(remote, MALFORMED_MESSAGE_COST)), - Ok(v) => if v.is_empty() { - None + match c_messages { + Err(_) => Action::ReportPeer(remote, MALFORMED_MESSAGE_COST), + Ok(c_messages) => if v_messages.is_empty() && c_messages.is_empty() { + Action::Nop } else { - Some(Action::PeerMessages(remote, v)) - } + Action::PeerMessages(remote, v_messages, c_messages) + }, } } } @@ -282,37 +324,218 @@ fn construct_view(live_heads: &[Hash]) -> View { View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect()) } -async fn dispatch_update_to_all( - update: NetworkBridgeEvent, - event_producers: impl IntoIterator<Item=&fn(NetworkBridgeEvent) -> AllMessages>, - ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>, -) -> polkadot_subsystem::SubsystemResult<()> { - // collect messages here to avoid the borrow lasting across await boundary. - let messages: Vec<_> = event_producers.into_iter() - .map(|producer| producer(update.clone())) - .collect(); - - ctx.send_messages(messages).await -} - async fn update_view( - peers: &HashMap<PeerId, PeerData>, - live_heads: &[Hash], net: &mut impl Network, + ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>, + live_heads: &[Hash], local_view: &mut View, -) -> SubsystemResult<Option<NetworkBridgeEvent>> { + validation_peers: &HashMap<PeerId, PeerData>, + collation_peers: &HashMap<PeerId, PeerData>, +) -> SubsystemResult<()> { let new_view = construct_view(live_heads); - if *local_view == new_view { return Ok(None) } + if *local_view == new_view { return Ok(()) } + *local_view = new_view.clone(); - let message = WireMessage::ViewUpdate(new_view.clone()).encode(); + send_validation_message( + net, + validation_peers.keys().cloned(), + WireMessage::ViewUpdate(new_view.clone()), + ).await?; + + send_collation_message( + net, + collation_peers.keys().cloned(), + WireMessage::ViewUpdate(new_view.clone()), + ).await?; + + if let Err(e) = dispatch_validation_event_to_all( + NetworkBridgeEvent::OurViewChange(new_view.clone()), + ctx, + ).await { + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + + if let Err(e) = dispatch_collation_event_to_all( + NetworkBridgeEvent::OurViewChange(new_view.clone()), + ctx, + ).await { + log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); + return Err(e) + } + + Ok(()) +} + +// Handle messages on a specific peer-set. The peer is expected to be connected on that +// peer-set. +async fn handle_peer_messages<M>( + peer: PeerId, + peers: &mut HashMap<PeerId, PeerData>, + messages: Vec<WireMessage<M>>, + net: &mut impl Network, +) -> SubsystemResult<Vec<NetworkBridgeEvent<M>>> { + let peer_data = match peers.get_mut(&peer) { + None => { + net.report_peer(peer, UNCONNECTED_PEERSET_COST).await?; - let notifications = peers.keys().cloned() - .map(move |peer| Ok(NetworkAction::WriteNotification(peer, message.clone()))); + return Ok(Vec::new()); + }, + Some(d) => d, + }; + + let mut outgoing_messages = Vec::with_capacity(messages.len()); + for message in messages { + outgoing_messages.push(match message { + WireMessage::ViewUpdate(new_view) => { + if new_view.0.len() > MAX_VIEW_HEADS { + net.report_peer( + peer.clone(), + MALFORMED_VIEW_COST, + ).await?; + + continue + } else if new_view == peer_data.view { + continue + } else { + peer_data.view = new_view; - net.action_sink().send_all(&mut stream::iter(notifications)).await?; + NetworkBridgeEvent::PeerViewChange( + peer.clone(), + peer_data.view.clone(), + ) + } + } + WireMessage::ProtocolMessage(message) => { + NetworkBridgeEvent::PeerMessage(peer.clone(), message) + } + }) + } - Ok(Some(NetworkBridgeEvent::OurViewChange(local_view.clone()))) + Ok(outgoing_messages) +} + +async fn send_validation_message<I>( + net: &mut impl Network, + peers: I, + message: WireMessage<protocol_v1::ValidationProtocol>, +) -> SubsystemResult<()> + where + I: IntoIterator<Item=PeerId>, + I::IntoIter: ExactSizeIterator, +{ + send_message(net, peers, PeerSet::Validation, message).await +} + +async fn send_collation_message<I>( + net: &mut impl Network, + peers: I, + message: WireMessage<protocol_v1::CollationProtocol>, +) -> SubsystemResult<()> + where + I: IntoIterator<Item=PeerId>, + I::IntoIter: ExactSizeIterator, +{ + send_message(net, peers, PeerSet::Collation, message).await +} + +async fn send_message<M, I>( + net: &mut impl Network, + peers: I, + peer_set: PeerSet, + message: WireMessage<M>, +) -> SubsystemResult<()> + where + M: Encode + Clone, + I: IntoIterator<Item=PeerId>, + I::IntoIter: ExactSizeIterator, +{ + let mut message_producer = stream::iter({ + let peers = peers.into_iter(); + let n_peers = peers.len(); + let mut message = Some(message.encode()); + + peers.enumerate().map(move |(i, peer)| { + // optimization: avoid cloning the message for the last peer in the + // list. The message payload can be quite large. If the underlying + // network used `Bytes` this would not be necessary. + let message = if i == n_peers - 1 { + message.take() + .expect("Only taken in last iteration of loop, never afterwards; qed") + } else { + message.as_ref() + .expect("Only taken in last iteration of loop, we are not there yet; qed") + .clone() + }; + + Ok(NetworkAction::WriteNotification(peer, peer_set, message)) + }) + }); + + net.action_sink().send_all(&mut message_producer).await +} + +async fn dispatch_validation_event_to_all( + event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>, + ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>, +) -> SubsystemResult<()> { + dispatch_validation_events_to_all(std::iter::once(event), ctx).await +} + +async fn dispatch_collation_event_to_all( + event: NetworkBridgeEvent<protocol_v1::CollationProtocol>, + ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>, +) -> SubsystemResult<()> { + dispatch_collation_events_to_all(std::iter::once(event), ctx).await +} + +async fn dispatch_validation_events_to_all<I>( + events: I, + ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>, +) -> SubsystemResult<()> + where + I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::ValidationProtocol>>, + I::IntoIter: Send, +{ + let messages_for = |event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>| { + let a = std::iter::once(event.focus().ok().map(|m| AllMessages::AvailabilityDistribution( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1(m) + ))); + + let b = std::iter::once(event.focus().ok().map(|m| AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdateV1(m) + ))); + + let p = std::iter::once(event.focus().ok().map(|m| AllMessages::PoVDistribution( + PoVDistributionMessage::NetworkBridgeUpdateV1(m) + ))); + + let s = std::iter::once(event.focus().ok().map(|m| AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdateV1(m) + ))); + + a.chain(b).chain(p).chain(s).filter_map(|x| x) + }; + + ctx.send_messages(events.into_iter().flat_map(messages_for)).await +} + +async fn dispatch_collation_events_to_all<I>( + events: I, + ctx: &mut impl SubsystemContext<Message=NetworkBridgeMessage>, +) -> SubsystemResult<()> + where + I: IntoIterator<Item = NetworkBridgeEvent<protocol_v1::CollationProtocol>>, + I::IntoIter: Send, +{ + let messages_for = |event: NetworkBridgeEvent<protocol_v1::CollationProtocol>| { + event.focus().ok().map(|m| AllMessages::CollatorProtocol( + CollatorProtocolMessage::NetworkBridgeUpdateV1(m) + )) + }; + + ctx.send_messages(events.into_iter().flat_map(messages_for)).await } async fn run_network<N: Network>( @@ -322,11 +545,11 @@ async fn run_network<N: Network>( let mut event_stream = net.event_stream().fuse(); // Most recent heads are at the back. - let mut live_heads = Vec::with_capacity(MAX_VIEW_HEADS); + let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS); let mut local_view = View(Vec::new()); - let mut peers: HashMap<PeerId, PeerData> = HashMap::new(); - let mut event_producers = BTreeMap::new(); + let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new(); + let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new(); loop { let action = { @@ -334,178 +557,161 @@ async fn run_network<N: Network>( let mut net_event_next = event_stream.next().fuse(); futures::pin_mut!(subsystem_next); - let action = futures::select! { - subsystem_msg = subsystem_next => Some(action_from_overseer_message(subsystem_msg)), + futures::select! { + subsystem_msg = subsystem_next => action_from_overseer_message(subsystem_msg), net_event = net_event_next => action_from_network_message(net_event), - }; - - match action { - Some(a) => a, - None => continue, } }; match action { - Action::RegisterEventProducer(protocol_id, event_producer) => { - // insert only if none present. - if let BEntry::Vacant(entry) = event_producers.entry(protocol_id) { - let event_producer = entry.insert(event_producer); - - // send the event producer information on all connected peers. - let mut messages = Vec::with_capacity(peers.len() * 2); - for (peer, data) in &peers { - messages.push(event_producer( - NetworkBridgeEvent::PeerConnected(peer.clone(), data.role.clone()) - )); - - messages.push(event_producer( - NetworkBridgeEvent::PeerViewChange(peer.clone(), data.view.clone()) - )); - } + Action::Nop => {} + Action::Abort => return Ok(()), - ctx.send_messages(messages).await?; - } + Action::SendValidationMessage(peers, msg) => send_message( + &mut net, + peers, + PeerSet::Validation, + WireMessage::ProtocolMessage(msg), + ).await?, + + Action::SendCollationMessage(peers, msg) => send_message( + &mut net, + peers, + PeerSet::Collation, + WireMessage::ProtocolMessage(msg), + ).await?, + + Action::ConnectToValidators(_peer_set, _validators, _res) => { + // TODO: https://github.com/paritytech/polkadot/issues/1461 } - Action::SendMessage(peers, protocol, message) => { - let mut message_producer = stream::iter({ - let n_peers = peers.len(); - let mut message = Some( - WireMessage::ProtocolMessage(protocol, message).encode() - ); - - peers.iter().cloned().enumerate().map(move |(i, peer)| { - // optimization: avoid cloning the message for the last peer in the - // list. The message payload can be quite large. If the underlying - // network used `Bytes` this would not be necessary. - let message = if i == n_peers - 1 { - message.take() - .expect("Only taken in last iteration of loop, never afterwards; qed") - } else { - message.as_ref() - .expect("Only taken in last iteration of loop, we are not there yet; qed") - .clone() - }; - Ok(NetworkAction::WriteNotification(peer, message)) - }) - }); + Action::ReportPeer(peer, rep) => net.report_peer(peer, rep).await?, - net.action_sink().send_all(&mut message_producer).await?; - } - Action::ReportPeer(peer, rep) => { - net.report_peer(peer, rep).await?; - } Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { live_heads.extend(activated); live_heads.retain(|h| !deactivated.contains(h)); - if let Some(view_update) - = update_view(&peers, &live_heads, &mut net, &mut local_view).await? - { - if let Err(e) = dispatch_update_to_all( - view_update, - event_producers.values(), - &mut ctx, - ).await { - log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); - return Err(e) - } - } + update_view( + &mut net, + &mut ctx, + &live_heads, + &mut local_view, + &validation_peers, + &collation_peers, + ).await?; } - Action::PeerConnected(peer, role) => { - match peers.entry(peer.clone()) { + + Action::PeerConnected(peer_set, peer, role) => { + let peer_map = match peer_set { + PeerSet::Validation => &mut validation_peers, + PeerSet::Collation => &mut collation_peers, + }; + + match peer_map.entry(peer.clone()) { HEntry::Occupied(_) => continue, HEntry::Vacant(vacant) => { vacant.insert(PeerData { view: View(Vec::new()), - role: role.clone(), }); - if let Err(e) = dispatch_update_to_all( - NetworkBridgeEvent::PeerConnected(peer, role), - event_producers.values(), - &mut ctx, - ).await { + let res = match peer_set { + PeerSet::Validation => dispatch_validation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerViewChange( + peer, + View(Default::default()), + ), + ], + &mut ctx, + ).await, + PeerSet::Collation => dispatch_collation_events_to_all( + vec![ + NetworkBridgeEvent::PeerConnected(peer.clone(), role), + NetworkBridgeEvent::PeerViewChange( + peer, + View(Default::default()), + ), + ], + &mut ctx, + ).await, + }; + + if let Err(e) = res { log::warn!("Aborting - Failure to dispatch messages to overseer"); - return Err(e) + return Err(e); } } } } - Action::PeerDisconnected(peer) => { - if peers.remove(&peer).is_some() { - if let Err(e) = dispatch_update_to_all( - NetworkBridgeEvent::PeerDisconnected(peer), - event_producers.values(), - &mut ctx, - ).await { - log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); + Action::PeerDisconnected(peer_set, peer) => { + let peer_map = match peer_set { + PeerSet::Validation => &mut validation_peers, + PeerSet::Collation => &mut collation_peers, + }; + + if peer_map.remove(&peer).is_some() { + let res = match peer_set { + PeerSet::Validation => dispatch_validation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut ctx, + ).await, + PeerSet::Collation => dispatch_collation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut ctx, + ).await, + }; + + if let Err(e) = res { + log::warn!( + target: TARGET, + "Aborting - Failure to dispatch messages to overseer", + ); return Err(e) } } }, - Action::PeerMessages(peer, messages) => { - let peer_data = match peers.get_mut(&peer) { - None => continue, - Some(d) => d, - }; - - let mut outgoing_messages = Vec::with_capacity(messages.len()); - for message in messages { - match message { - WireMessage::ViewUpdate(new_view) => { - if new_view.0.len() > MAX_VIEW_HEADS { - net.report_peer( - peer.clone(), - MALFORMED_VIEW_COST, - ).await?; - - continue - } - - if new_view == peer_data.view { continue } - peer_data.view = new_view; - - let update = NetworkBridgeEvent::PeerViewChange( - peer.clone(), - peer_data.view.clone(), - ); - - outgoing_messages.extend( - event_producers.values().map(|producer| producer(update.clone())) - ); - } - WireMessage::ProtocolMessage(protocol, message) => { - let message = match event_producers.get(&protocol) { - Some(producer) => Some(producer( - NetworkBridgeEvent::PeerMessage(peer.clone(), message) - )), - None => { - net.report_peer( - peer.clone(), - UNKNOWN_PROTO_COST, - ).await?; - - None - } - }; - - if let Some(message) = message { - outgoing_messages.push(message); - } - } + Action::PeerMessages(peer, v_messages, c_messages) => { + if !v_messages.is_empty() { + let events = handle_peer_messages( + peer.clone(), + &mut validation_peers, + v_messages, + &mut net, + ).await?; + + if let Err(e) = dispatch_validation_events_to_all( + events, + &mut ctx, + ).await { + log::warn!( + target: TARGET, + "Aborting - Failure to dispatch messages to overseer", + ); + return Err(e) } } - let send_messages = ctx.send_messages(outgoing_messages); - if let Err(e) = send_messages.await { - log::warn!(target: TARGET, "Aborting - Failure to dispatch messages to overseer"); - return Err(e) + if !c_messages.is_empty() { + let events = handle_peer_messages( + peer.clone(), + &mut collation_peers, + c_messages, + &mut net, + ).await?; + + if let Err(e) = dispatch_collation_events_to_all( + events, + &mut ctx, + ).await { + log::warn!( + target: TARGET, + "Aborting - Failure to dispatch messages to overseer", + ); + return Err(e) + } } }, - - Action::Abort => return Ok(()), - Action::Nop => (), } } } @@ -521,7 +727,10 @@ mod tests { use assert_matches::assert_matches; use polkadot_subsystem::messages::{StatementDistributionMessage, BitfieldDistributionMessage}; - use polkadot_node_subsystem_test_helpers::{SingleItemSink, SingleItemStream}; + use polkadot_node_subsystem_test_helpers::{ + SingleItemSink, SingleItemStream, TestSubsystemContextHandle, + }; + use sp_keyring::Sr25519Keyring; // The subsystem's view of the network - only supports a single call to `event_stream`. struct TestNetwork { @@ -555,6 +764,13 @@ mod tests { ) } + fn peer_set_engine_id(peer_set: PeerSet) -> ConsensusEngineId { + match peer_set { + PeerSet::Validation => VALIDATION_PROTOCOL_ID, + PeerSet::Collation => COLLATION_PROTOCOL_ID, + } + } + impl Network for TestNetwork { fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> { self.net_events.lock() @@ -586,25 +802,25 @@ mod tests { v } - async fn connect_peer(&mut self, peer: PeerId, role: ObservedRole) { + async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) { self.send_network_event(NetworkEvent::NotificationStreamOpened { remote: peer, - engine_id: POLKADOT_ENGINE_ID, - role, + engine_id: peer_set_engine_id(peer_set), + role: role.into(), }).await; } - async fn disconnect_peer(&mut self, peer: PeerId) { + async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) { self.send_network_event(NetworkEvent::NotificationStreamClosed { remote: peer, - engine_id: POLKADOT_ENGINE_ID, + engine_id: peer_set_engine_id(peer_set), }).await; } - async fn peer_message(&mut self, peer: PeerId, message: Vec<u8>) { + async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec<u8>) { self.send_network_event(NetworkEvent::NotificationsReceived { remote: peer, - messages: vec![(POLKADOT_ENGINE_ID, message.into())], + messages: vec![(peer_set_engine_id(peer_set), message.into())], }).await; } @@ -621,7 +837,7 @@ mod tests { struct TestHarness { network_handle: TestNetworkHandle, - virtual_overseer: polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle<NetworkBridgeMessage>, + virtual_overseer: TestSubsystemContextHandle<NetworkBridgeMessage>, } fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) { @@ -647,6 +863,51 @@ mod tests { executor::block_on(future::select(test_fut, network_bridge)); } + async fn assert_sends_validation_event_to_all( + event: NetworkBridgeEvent<protocol_v1::ValidationProtocol>, + virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>, + ) { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::AvailabilityDistribution( + AvailabilityDistributionMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::BitfieldDistribution( + BitfieldDistributionMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::PoVDistribution( + PoVDistributionMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::StatementDistribution( + StatementDistributionMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ); + } + + async fn assert_sends_collation_event_to_all( + event: NetworkBridgeEvent<protocol_v1::CollationProtocol>, + virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeMessage>, + ) { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::CollatorProtocol( + CollatorProtocolMessage::NetworkBridgeUpdateV1(e) + ) if e == event.focus().expect("could not focus message") + ) + } + #[test] fn sends_view_updates_to_peers() { test_harness(|test_harness| async move { @@ -655,23 +916,44 @@ mod tests { let peer_a = PeerId::random(); let peer_b = PeerId::random(); - network_handle.connect_peer(peer_a.clone(), ObservedRole::Full).await; - network_handle.connect_peer(peer_b.clone(), ObservedRole::Full).await; + network_handle.connect_peer( + peer_a.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; + network_handle.connect_peer( + peer_b.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; let hash_a = Hash::from([1; 32]); - virtual_overseer.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a)))).await; + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a))) + ).await; let actions = network_handle.next_network_actions(2).await; - let wire_message = WireMessage::ViewUpdate(View(vec![hash_a])).encode(); + let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate( + View(vec![hash_a]) + ).encode(); + assert!(network_actions_contains( &actions, - &NetworkAction::WriteNotification(peer_a, wire_message.clone()), + &NetworkAction::WriteNotification( + peer_a, + PeerSet::Validation, + wire_message.clone(), + ), )); assert!(network_actions_contains( &actions, - &NetworkAction::WriteNotification(peer_b, wire_message.clone()), + &NetworkAction::WriteNotification( + peer_b, + PeerSet::Validation, + wire_message.clone(), + ), )); }); } @@ -686,106 +968,108 @@ mod tests { let peer = PeerId::random(); - let proto_statement = *b"abcd"; - let proto_bitfield = *b"wxyz"; + network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; - network_handle.connect_peer(peer.clone(), ObservedRole::Full).await; + let view = View(vec![Hash::from([1u8; 32])]); - virtual_overseer.send(FromOverseer::Communication { - msg: NetworkBridgeMessage::RegisterEventProducer( - proto_statement, - |event| AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate(event) - ) - ), - }).await; + // bridge will inform about all connected peers. + { + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } - virtual_overseer.send(FromOverseer::Communication { - msg: NetworkBridgeMessage::RegisterEventProducer( - proto_bitfield, - |event| AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate(event) - ) - ), - }).await; + network_handle.peer_message( + peer.clone(), + PeerSet::Validation, + WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate( + view.clone(), + ).encode(), + ).await; - let view = View(vec![Hash::from([1u8; 32])]); + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), view), + &mut virtual_overseer, + ).await; + }); + } - // bridge will inform about all previously-connected peers. - { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) - ) - ) if p == peer - ); + #[test] + fn peer_messages_sent_via_overseer() { + test_harness(|test_harness| async move { + let TestHarness { + mut network_handle, + mut virtual_overseer, + } = test_harness; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerViewChange(p, v) - ) - ) if p == peer && v == View(Default::default()) - ); + let peer = PeerId::random(); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) - ) - ) if p == peer - ); + network_handle.connect_peer( + peer.clone(), + PeerSet::Validation, + ObservedRole::Full, + ).await; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerViewChange(p, v) - ) - ) if p == peer && v == View(Default::default()) - ); + // bridge will inform about all connected peers. + { + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; } + let pov_distribution_message = protocol_v1::PoVDistributionMessage::Awaiting( + [0; 32].into(), + vec![[1; 32].into()], + ); + + let message = protocol_v1::ValidationProtocol::PoVDistribution( + pov_distribution_message.clone(), + ); + network_handle.peer_message( peer.clone(), - WireMessage::ViewUpdate(view.clone()).encode(), + PeerSet::Validation, + WireMessage::ProtocolMessage(message.clone()).encode(), ).await; - // statement distribution message comes first because handlers are ordered by - // protocol ID. + network_handle.disconnect_peer(peer.clone(), PeerSet::Validation).await; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerViewChange(p, v) - ) - ) => { - assert_eq!(p, peer); - assert_eq!(v, view); - } - ); + // PoV distribution message comes first, and the message is only sent to that subsystem. + // then a disconnection event arises that is sent to all validation networking subsystems. assert_matches!( virtual_overseer.recv().await, - AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerViewChange(p, v) + AllMessages::PoVDistribution( + PoVDistributionMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage(p, m) ) ) => { assert_eq!(p, peer); - assert_eq!(v, view); + assert_eq!(m, pov_distribution_message); } ); + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer), + &mut virtual_overseer, + ).await; }); } #[test] - fn peer_messages_sent_via_overseer() { + fn peer_disconnect_from_just_one_peerset() { test_harness(|test_harness| async move { let TestHarness { mut network_handle, @@ -794,103 +1078,310 @@ mod tests { let peer = PeerId::random(); - let proto_statement = *b"abcd"; - let proto_bitfield = *b"wxyz"; + network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; + network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; - network_handle.connect_peer(peer.clone(), ObservedRole::Full).await; + // bridge will inform about all connected peers. + { + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } - virtual_overseer.send(FromOverseer::Communication { - msg: NetworkBridgeMessage::RegisterEventProducer( - proto_statement, - |event| AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate(event) - ) - ), - }).await; + { + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } - virtual_overseer.send(FromOverseer::Communication { - msg: NetworkBridgeMessage::RegisterEventProducer( - proto_bitfield, - |event| AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate(event) - ) + network_handle.disconnect_peer(peer.clone(), PeerSet::Validation).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerDisconnected(peer.clone()), + &mut virtual_overseer, + ).await; + + // to show that we're still connected on the collation protocol, send a view update. + + let hash_a = Hash::from([1; 32]); + + virtual_overseer.send( + FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(hash_a))) + ).await; + + let actions = network_handle.next_network_actions(1).await; + let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate( + View(vec![hash_a]) + ).encode(); + + assert!(network_actions_contains( + &actions, + &NetworkAction::WriteNotification( + peer.clone(), + PeerSet::Collation, + wire_message.clone(), ), - }).await; + )); + }); + } - // bridge will inform about all previously-connected peers. - { - assert_matches!( - virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) - ) - ) if p == peer - ); + #[test] + fn relays_collation_protocol_messages() { + test_harness(|test_harness| async move { + let TestHarness { + mut network_handle, + mut virtual_overseer, + } = test_harness; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerViewChange(p, v) - ) - ) if p == peer && v == View(Default::default()) - ); + let peer_a = PeerId::random(); + let peer_b = PeerId::random(); - assert_matches!( - virtual_overseer.recv().await, - AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerConnected(p, ObservedRole::Full) - ) - ) if p == peer - ); + network_handle.connect_peer(peer_a.clone(), PeerSet::Validation, ObservedRole::Full).await; + network_handle.connect_peer(peer_b.clone(), PeerSet::Collation, ObservedRole::Full).await; - assert_matches!( - virtual_overseer.recv().await, - AllMessages::BitfieldDistribution( - BitfieldDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerViewChange(p, v) - ) - ) if p == peer && v == View(Default::default()) - ); + // bridge will inform about all connected peers. + { + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer_a.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; } - let payload = vec![1, 2, 3]; + { + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer_b.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } + + // peer A gets reported for sending a collation message. + + let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare( + Sr25519Keyring::Alice.public().into() + ); + + let message = protocol_v1::CollationProtocol::CollatorProtocol( + collator_protocol_message.clone() + ); network_handle.peer_message( - peer.clone(), - WireMessage::ProtocolMessage(proto_statement, payload.clone()).encode(), + peer_a.clone(), + PeerSet::Collation, + WireMessage::ProtocolMessage(message.clone()).encode(), ).await; - network_handle.disconnect_peer(peer.clone()).await; + let actions = network_handle.next_network_actions(1).await; + assert!(network_actions_contains( + &actions, + &NetworkAction::ReputationChange( + peer_a.clone(), + UNCONNECTED_PEERSET_COST, + ), + )); + + // peer B has the message relayed. - // statement distribution message comes first because handlers are ordered by - // protocol ID, and then a disconnection event comes - indicating that the message - // was only sent to the correct protocol. + network_handle.peer_message( + peer_b.clone(), + PeerSet::Collation, + WireMessage::ProtocolMessage(message.clone()).encode(), + ).await; assert_matches!( virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( + AllMessages::CollatorProtocol( + CollatorProtocolMessage::NetworkBridgeUpdateV1( NetworkBridgeEvent::PeerMessage(p, m) ) ) => { - assert_eq!(p, peer); - assert_eq!(m, payload); + assert_eq!(p, peer_b); + assert_eq!(m, collator_protocol_message); } ); + }); + } - assert_matches!( - virtual_overseer.recv().await, - AllMessages::StatementDistribution( - StatementDistributionMessage::NetworkBridgeUpdate( - NetworkBridgeEvent::PeerDisconnected(p) + #[test] + fn different_views_on_different_peer_sets() { + test_harness(|test_harness| async move { + let TestHarness { + mut network_handle, + mut virtual_overseer, + } = test_harness; + + let peer = PeerId::random(); + + network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; + network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; + + // bridge will inform about all connected peers. + { + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } + + { + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } + + let view_a = View(vec![[1; 32].into()]); + let view_b = View(vec![[2; 32].into()]); + + network_handle.peer_message( + peer.clone(), + PeerSet::Validation, + WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(view_a.clone()).encode(), + ).await; + + network_handle.peer_message( + peer.clone(), + PeerSet::Collation, + WireMessage::<protocol_v1::CollationProtocol>::ViewUpdate(view_b.clone()).encode(), + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), view_a.clone()), + &mut virtual_overseer, + ).await; + + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), view_b.clone()), + &mut virtual_overseer, + ).await; + }); + } + + #[test] + fn send_messages_to_peers() { + test_harness(|test_harness| async move { + let TestHarness { + mut network_handle, + mut virtual_overseer, + } = test_harness; + + let peer = PeerId::random(); + + network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await; + network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await; + + // bridge will inform about all connected peers. + { + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_validation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } + + { + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full), + &mut virtual_overseer, + ).await; + + assert_sends_collation_event_to_all( + NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())), + &mut virtual_overseer, + ).await; + } + + // send a validation protocol message. + + { + let pov_distribution_message = protocol_v1::PoVDistributionMessage::Awaiting( + [0; 32].into(), + vec![[1; 32].into()], + ); + + let message = protocol_v1::ValidationProtocol::PoVDistribution( + pov_distribution_message.clone(), + ); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::SendValidationMessage( + vec![peer.clone()], + message.clone(), ) - ) => { - assert_eq!(p, peer); - } - ); + }).await; + + assert_eq!( + network_handle.next_network_action().await, + NetworkAction::WriteNotification( + peer.clone(), + PeerSet::Validation, + WireMessage::ProtocolMessage(message).encode(), + ) + ); + } + + // send a collation protocol message. + + { + let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare( + Sr25519Keyring::Alice.public().into() + ); + + let message = protocol_v1::CollationProtocol::CollatorProtocol( + collator_protocol_message.clone() + ); + + virtual_overseer.send(FromOverseer::Communication { + msg: NetworkBridgeMessage::SendCollationMessage( + vec![peer.clone()], + message.clone(), + ) + }).await; + + assert_eq!( + network_handle.next_network_action().await, + NetworkAction::WriteNotification( + peer.clone(), + PeerSet::Collation, + WireMessage::ProtocolMessage(message).encode(), + ) + ); + } }); } } diff --git a/polkadot/node/network/pov-distribution/Cargo.toml b/polkadot/node/network/pov-distribution/Cargo.toml index 74a566f35dce5681d00269070f7b3052df1de77a..cedc94732cfb832cc7206bc977815a078f95d081 100644 --- a/polkadot/node/network/pov-distribution/Cargo.toml +++ b/polkadot/node/network/pov-distribution/Cargo.toml @@ -12,9 +12,9 @@ streamunordered = "0.5.1" polkadot-primitives = { path = "../../../primitives" } node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" } parity-scale-codec = "1.3.4" -sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-network-protocol = { path = "../../network/protocol" } [dev-dependencies] parking_lot = "0.10.0" diff --git a/polkadot/node/network/pov-distribution/src/lib.rs b/polkadot/node/network/pov-distribution/src/lib.rs index 9589e2fad95aa1e5a8de4ddcb1daa5e6e52ac74d..994090c0d588333b2d61129dc507192d7f73cbe2 100644 --- a/polkadot/node/network/pov-distribution/src/lib.rs +++ b/polkadot/node/network/pov-distribution/src/lib.rs @@ -24,21 +24,20 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, FromOverseer, SpawnedSubsystem, }; use polkadot_subsystem::messages::{ - PoVDistributionMessage, NetworkBridgeEvent, ReputationChange as Rep, PeerId, - RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, + PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage, +}; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View, }; -use node_primitives::{View, ProtocolId}; use futures::prelude::*; use futures::channel::oneshot; -use parity_scale_codec::{Encode, Decode}; use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::sync::Arc; const COST_APPARENT_FLOOD: Rep = Rep::new(-500, "Peer appears to be flooding us with PoV requests"); const COST_UNEXPECTED_POV: Rep = Rep::new(-500, "Peer sent us an unexpected PoV"); -const COST_MALFORMED_MESSAGE: Rep = Rep::new(-500, "Peer sent us a malformed message"); const COST_AWAITED_NOT_IN_VIEW: Rep = Rep::new(-100, "Peer claims to be awaiting something outside of its view"); @@ -46,20 +45,6 @@ const BENEFIT_FRESH_POV: Rep = Rep::new(25, "Peer supplied us with an awaited Po const BENEFIT_LATE_POV: Rep = Rep::new(10, "Peer supplied us with an awaited PoV, \ but was not the first to do so"); -const PROTOCOL_V1: ProtocolId = *b"pvd1"; - -#[derive(Encode, Decode)] -enum WireMessage { - /// Notification that we are awaiting the given PoVs (by hash) against a - /// specific relay-parent hash. - #[codec(index = "0")] - Awaiting(Hash, Vec<Hash>), - /// Notification of an awaited PoV, in a given relay-parent context. - /// (relay_parent, pov_hash, pov) - #[codec(index = "1")] - SendPoV(Hash, Hash, PoV), -} - /// The PoV Distribution Subsystem. pub struct PoVDistribution; @@ -98,6 +83,22 @@ struct PeerState { awaited: HashMap<Hash, HashSet<Hash>>, } +fn awaiting_message(relay_parent: Hash, awaiting: Vec<Hash>) + -> protocol_v1::ValidationProtocol +{ + protocol_v1::ValidationProtocol::PoVDistribution( + protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, awaiting) + ) +} + +fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV) + -> protocol_v1::ValidationProtocol +{ + protocol_v1::ValidationProtocol::PoVDistribution( + protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) + ) +} + /// Handles the signal. If successful, returns `true` if the subsystem should conclude, /// `false` otherwise. async fn handle_signal( @@ -169,11 +170,10 @@ async fn notify_all_we_are_awaiting( if peers_to_send.is_empty() { return Ok(()) } - let payload = WireMessage::Awaiting(relay_parent, vec![pov_hash]).encode(); + let payload = awaiting_message(relay_parent, vec![pov_hash]); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, - PROTOCOL_V1, payload, ))).await } @@ -194,11 +194,10 @@ async fn notify_one_we_are_awaiting_many( if awaiting_hashes.is_empty() { return Ok(()) } - let payload = WireMessage::Awaiting(relay_parent, awaiting_hashes).encode(); + let payload = awaiting_message(relay_parent, awaiting_hashes); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( vec![peer.clone()], - PROTOCOL_V1, payload, ))).await } @@ -226,11 +225,10 @@ async fn distribute_to_awaiting( if peers_to_send.is_empty() { return Ok(()) } - let payload = WireMessage::SendPoV(relay_parent, pov_hash, pov.clone()).encode(); + let payload = send_pov_message(relay_parent, pov_hash, pov.clone()); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send, - PROTOCOL_V1, payload, ))).await } @@ -361,11 +359,10 @@ async fn handle_awaiting( // For all requested PoV hashes, if we have it, we complete the request immediately. // Otherwise, we note that the peer is awaiting the PoV. if let Some(pov) = relay_parent_state.known.get(&pov_hash) { - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( - vec![peer.clone()], - PROTOCOL_V1, - WireMessage::SendPoV(relay_parent, pov_hash, (&**pov).clone()).encode(), - ))).await?; + let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone()); + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) + )).await?; } else { peer_awaiting.insert(pov_hash); } @@ -449,7 +446,7 @@ async fn handle_incoming_pov( async fn handle_network_update( state: &mut State, ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>, - update: NetworkBridgeEvent, + update: NetworkBridgeEvent<protocol_v1::PoVDistributionMessage>, ) -> SubsystemResult<()> { match update { NetworkBridgeEvent::PeerConnected(peer, _observed_role) => { @@ -483,17 +480,18 @@ async fn handle_network_update( Ok(()) } - NetworkBridgeEvent::PeerMessage(peer, bytes) => { - match WireMessage::decode(&mut &bytes[..]) { - Ok(msg) => match msg { - WireMessage::Awaiting(relay_parent, pov_hashes) => handle_awaiting( + NetworkBridgeEvent::PeerMessage(peer, message) => { + match message { + protocol_v1::PoVDistributionMessage::Awaiting(relay_parent, pov_hashes) + => handle_awaiting( state, ctx, peer, relay_parent, pov_hashes, ).await, - WireMessage::SendPoV(relay_parent, pov_hash, pov) => handle_incoming_pov( + protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov) + => handle_incoming_pov( state, ctx, peer, @@ -501,11 +499,6 @@ async fn handle_network_update( pov_hash, pov, ).await, - }, - Err(_) => { - report_peer(ctx, peer, COST_MALFORMED_MESSAGE).await?; - Ok(()) - } } } NetworkBridgeEvent::OurViewChange(view) => { @@ -515,19 +508,9 @@ async fn handle_network_update( } } -fn network_update_message(update: NetworkBridgeEvent) -> AllMessages { - AllMessages::PoVDistribution(PoVDistributionMessage::NetworkBridgeUpdate(update)) -} - async fn run( mut ctx: impl SubsystemContext<Message = PoVDistributionMessage>, ) -> SubsystemResult<()> { - // startup: register the network protocol with the bridge. - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer( - PROTOCOL_V1, - network_update_message, - ))).await?; - let mut state = State { relay_parent_state: HashMap::new(), peer_state: HashMap::new(), @@ -556,7 +539,7 @@ async fn run( descriptor, pov, ).await?, - PoVDistributionMessage::NetworkBridgeUpdate(event) => + PoVDistributionMessage::NetworkBridgeUpdateV1(event) => handle_network_update( &mut state, &mut ctx, @@ -661,13 +644,12 @@ mod tests { assert_matches!( handle.recv().await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage(peers, protocol, message) + NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_a.clone()]); - assert_eq!(protocol, PROTOCOL_V1); assert_eq!( message, - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + send_pov_message(hash_a, pov_hash, pov.clone()), ); } ) @@ -737,13 +719,12 @@ mod tests { assert_matches!( handle.recv().await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage(peers, protocol, message) + NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_a.clone()]); - assert_eq!(protocol, PROTOCOL_V1); assert_eq!( message, - WireMessage::Awaiting(hash_a, vec![pov_hash]).encode(), + awaiting_message(hash_a, vec![pov_hash]), ); } ) @@ -809,13 +790,12 @@ mod tests { assert_matches!( handle.recv().await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage(peers, protocol, message) + NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_a.clone()]); - assert_eq!(protocol, PROTOCOL_V1); assert_eq!( message, - WireMessage::Awaiting(hash_a, vec![pov_a_hash]).encode(), + awaiting_message(hash_a, vec![pov_a_hash]), ); } ) @@ -878,8 +858,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), - ), + send_pov_message(hash_a, pov_hash, pov.clone()), + ).focus().unwrap(), ).await.unwrap(); handle_network_update( @@ -887,8 +867,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_b.clone(), - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), - ), + send_pov_message(hash_a, pov_hash, pov.clone()), + ).focus().unwrap(), ).await.unwrap(); assert_eq!(&*pov_recv.await.unwrap(), &pov); @@ -966,8 +946,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::SendPoV(hash_a, pov_hash, bad_pov.clone()).encode(), - ), + send_pov_message(hash_a, pov_hash, bad_pov.clone()), + ).focus().unwrap(), ).await.unwrap(); // didn't complete our sender. @@ -1029,8 +1009,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), - ), + send_pov_message(hash_a, pov_hash, pov.clone()), + ).focus().unwrap(), ).await.unwrap(); assert_matches!( @@ -1090,8 +1070,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::SendPoV(hash_b, pov_hash, pov.clone()).encode(), - ), + send_pov_message(hash_b, pov_hash, pov.clone()), + ).focus().unwrap(), ).await.unwrap(); assert_matches!( @@ -1152,8 +1132,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::Awaiting(hash_a, vec![pov_hash]).encode(), - ), + awaiting_message(hash_a, vec![pov_hash]), + ).focus().unwrap(), ).await.unwrap(); } @@ -1166,8 +1146,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::Awaiting(hash_a, vec![last_pov_hash]).encode(), - ), + awaiting_message(hash_a, vec![last_pov_hash]), + ).focus().unwrap(), ).await.unwrap(); // No more bookkeeping for you! @@ -1235,8 +1215,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::Awaiting(hash_b, vec![pov_hash]).encode(), - ), + awaiting_message(hash_b, vec![pov_hash]), + ).focus().unwrap(), ).await.unwrap(); assert!(state.peer_state[&peer_a].awaited.get(&hash_b).is_none()); @@ -1297,8 +1277,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::Awaiting(hash_b, vec![pov_hash]).encode(), - ), + awaiting_message(hash_b, vec![pov_hash]), + ).focus().unwrap(), ).await.unwrap(); // Illegal `awaited` is ignored. @@ -1371,8 +1351,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), - ), + send_pov_message(hash_a, pov_hash, pov.clone()), + ).focus().unwrap(), ).await.unwrap(); assert_eq!(&*pov_recv.await.unwrap(), &pov); @@ -1390,13 +1370,12 @@ mod tests { assert_matches!( handle.recv().await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendMessage(peers, protocol, message) + NetworkBridgeMessage::SendValidationMessage(peers, message) ) => { assert_eq!(peers, vec![peer_b.clone()]); - assert_eq!(protocol, PROTOCOL_V1); assert_eq!( message, - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), + send_pov_message(hash_a, pov_hash, pov.clone()), ); } ); @@ -1454,8 +1433,8 @@ mod tests { &mut ctx, NetworkBridgeEvent::PeerMessage( peer_a.clone(), - WireMessage::SendPoV(hash_a, pov_hash, pov.clone()).encode(), - ), + send_pov_message(hash_a, pov_hash, pov.clone()), + ).focus().unwrap(), ).await.unwrap(); assert_eq!(&*pov_recv.await.unwrap(), &pov); diff --git a/polkadot/node/network/protocol/Cargo.toml b/polkadot/node/network/protocol/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..abcb6ae2adda370f3778d9c7ec476d53ae3a6337 --- /dev/null +++ b/polkadot/node/network/protocol/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "polkadot-node-network-protocol" +version = "0.1.0" +authors = ["Parity Technologies <admin@parity.io>"] +edition = "2018" +description = "Primitives types for the Node-side" + +[dependencies] +polkadot-primitives = { path = "../../../primitives" } +polkadot-node-primitives = { path = "../../primitives" } +parity-scale-codec = { version = "1.3.4", default-features = false, features = ["derive"] } +runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..7658048ca5c8dbf59e024676744c1a807b7f7a9e --- /dev/null +++ b/polkadot/node/network/protocol/src/lib.rs @@ -0,0 +1,269 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Network protocol types for parachains. + +use polkadot_primitives::v1::Hash; +use parity_scale_codec::{Encode, Decode}; +use std::convert::TryFrom; + +pub use sc_network::{ReputationChange, PeerId}; + +/// A unique identifier of a request. +pub type RequestId = u64; + +/// A version of the protocol. +pub type ProtocolVersion = u32; + +/// An error indicating that this the over-arching message type had the wrong variant +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct WrongVariant; + +/// The peer-sets that the network manages. Different subsystems will use different peer-sets. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum PeerSet { + /// The validation peer-set is responsible for all messages related to candidate validation and communication among validators. + Validation, + /// The collation peer-set is used for validator<>collator communication. + Collation, +} + +/// The advertised role of a node. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ObservedRole { + /// A light node. + Light, + /// A full node. + Full, + /// A node claiming to be an authority (unauthenticated) + Authority, +} + +impl From<sc_network::ObservedRole> for ObservedRole { + fn from(role: sc_network::ObservedRole) -> ObservedRole { + match role { + sc_network::ObservedRole::Light => ObservedRole::Light, + sc_network::ObservedRole::Authority => ObservedRole::Authority, + sc_network::ObservedRole::Full + | sc_network::ObservedRole::OurSentry + | sc_network::ObservedRole::OurGuardedAuthority + => ObservedRole::Full, + } + } +} + +impl Into<sc_network::ObservedRole> for ObservedRole { + fn into(self) -> sc_network::ObservedRole { + match self { + ObservedRole::Light => sc_network::ObservedRole::Light, + ObservedRole::Full => sc_network::ObservedRole::Full, + ObservedRole::Authority => sc_network::ObservedRole::Authority, + } + } +} + +/// Events from network. +#[derive(Debug, Clone, PartialEq)] +pub enum NetworkBridgeEvent<M> { + /// A peer has connected. + PeerConnected(PeerId, ObservedRole), + + /// A peer has disconnected. + PeerDisconnected(PeerId), + + /// Peer has sent a message. + PeerMessage(PeerId, M), + + /// Peer's `View` has changed. + PeerViewChange(PeerId, View), + + /// Our `View` has changed. + OurViewChange(View), +} + +macro_rules! impl_try_from { + ($m_ty:ident, $variant:ident, $out:ty) => { + impl TryFrom<$m_ty> for $out { + type Error = crate::WrongVariant; + + #[allow(unreachable_patterns)] // when there is only one variant + fn try_from(x: $m_ty) -> Result<$out, Self::Error> { + match x { + $m_ty::$variant(y) => Ok(y), + _ => Err(crate::WrongVariant), + } + } + } + + impl<'a> TryFrom<&'a $m_ty> for &'a $out { + type Error = crate::WrongVariant; + + fn try_from(x: &'a $m_ty) -> Result<&'a $out, Self::Error> { + #[allow(unreachable_patterns)] // when there is only one variant + match *x { + $m_ty::$variant(ref y) => Ok(y), + _ => Err(crate::WrongVariant), + } + } + } + } +} + +impl<M> NetworkBridgeEvent<M> { + /// Focus an overarching network-bridge event into some more specific variant. + /// + /// This acts as a call to `clone`, except in the case where the event is a message event, + /// in which case the clone can be expensive and it only clones if the message type can + /// be focused. + pub fn focus<'a, T>(&'a self) -> Result<NetworkBridgeEvent<T>, WrongVariant> + where T: 'a + Clone, &'a T: TryFrom<&'a M, Error = WrongVariant> + { + Ok(match *self { + NetworkBridgeEvent::PeerConnected(ref peer, ref role) + => NetworkBridgeEvent::PeerConnected(peer.clone(), role.clone()), + NetworkBridgeEvent::PeerDisconnected(ref peer) + => NetworkBridgeEvent::PeerDisconnected(peer.clone()), + NetworkBridgeEvent::PeerMessage(ref peer, ref msg) + => NetworkBridgeEvent::PeerMessage(peer.clone(), <&'a T>::try_from(msg)?.clone()), + NetworkBridgeEvent::PeerViewChange(ref peer, ref view) + => NetworkBridgeEvent::PeerViewChange(peer.clone(), view.clone()), + NetworkBridgeEvent::OurViewChange(ref view) + => NetworkBridgeEvent::OurViewChange(view.clone()), + }) + } +} + +/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads. +/// +/// Up to `N` (5?) chain heads. +#[derive(Default, Debug, Clone, PartialEq, Eq, Encode, Decode)] +pub struct View(pub Vec<Hash>); + +impl View { + /// Returns an iterator of the hashes present in `Self` but not in `other`. + pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a { + self.0.iter().filter(move |h| !other.contains(h)) + } + + /// An iterator containing hashes present in both `Self` and in `other`. + pub fn intersection<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a { + self.0.iter().filter(move |h| other.contains(h)) + } + + /// Whether the view contains a given hash. + pub fn contains(&self, hash: &Hash) -> bool { + self.0.contains(hash) + } +} + +/// v1 protocol types. +pub mod v1 { + use polkadot_primitives::v1::{ + Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt, + SignedAvailabilityBitfield, PoV, + }; + use polkadot_node_primitives::SignedFullStatement; + use parity_scale_codec::{Encode, Decode}; + use std::convert::TryFrom; + use super::RequestId; + + /// Network messages used by the availability distribution subsystem + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum AvailabilityDistributionMessage { + /// An erasure chunk for a given candidate hash. + #[codec(index = "0")] + Chunk(Hash, ErasureChunk), + } + + /// Network messages used by the bitfield distribution subsystem. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum BitfieldDistributionMessage { + /// A signed availability bitfield for a given relay-parent hash. + #[codec(index = "0")] + Bitfield(Hash, SignedAvailabilityBitfield), + } + + /// Network messages used by the PoV distribution subsystem. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum PoVDistributionMessage { + /// Notification that we are awaiting the given PoVs (by hash) against a + /// specific relay-parent hash. + #[codec(index = "0")] + Awaiting(Hash, Vec<Hash>), + /// Notification of an awaited PoV, in a given relay-parent context. + /// (relay_parent, pov_hash, pov) + #[codec(index = "1")] + SendPoV(Hash, Hash, PoV), + } + + /// Network messages used by the statement distribution subsystem. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum StatementDistributionMessage { + /// A signed full statement under a given relay-parent. + #[codec(index = "0")] + Statement(Hash, SignedFullStatement) + } + + /// Network messages used by the collator protocol subsystem + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum CollatorProtocolMessage { + /// Declare the intent to advertise collations under a collator ID. + #[codec(index = "0")] + Declare(CollatorId), + /// Advertise a collation to a validator. Can only be sent once the peer has declared + /// that they are a collator with given ID. + #[codec(index = "1")] + AdvertiseCollation(Hash, ParaId), + /// Request the advertised collation at that relay-parent. + #[codec(index = "2")] + RequestCollation(RequestId, Hash, ParaId), + /// A requested collation. + #[codec(index = "3")] + Collation(RequestId, CandidateReceipt, PoV), + } + + /// All network messages on the validation peer-set. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum ValidationProtocol { + /// Availability distribution messages + #[codec(index = "0")] + AvailabilityDistribution(AvailabilityDistributionMessage), + /// Bitfield distribution messages + #[codec(index = "1")] + BitfieldDistribution(BitfieldDistributionMessage), + /// PoV Distribution messages + #[codec(index = "2")] + PoVDistribution(PoVDistributionMessage), + /// Statement distribution messages + #[codec(index = "3")] + StatementDistribution(StatementDistributionMessage), + } + + impl_try_from!(ValidationProtocol, AvailabilityDistribution, AvailabilityDistributionMessage); + impl_try_from!(ValidationProtocol, BitfieldDistribution, BitfieldDistributionMessage); + impl_try_from!(ValidationProtocol, PoVDistribution, PoVDistributionMessage); + impl_try_from!(ValidationProtocol, StatementDistribution, StatementDistributionMessage); + + /// All network messages on the collation peer-set. + #[derive(Debug, Clone, Encode, Decode, PartialEq)] + pub enum CollationProtocol { + /// Collator protocol messages + #[codec(index = "0")] + CollatorProtocol(CollatorProtocolMessage), + } + + impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage); +} diff --git a/polkadot/node/network/statement-distribution/Cargo.toml b/polkadot/node/network/statement-distribution/Cargo.toml index 6dfaee78bbda2207ae9d1725db57a9cc3c022e28..c92fdcf6db902876b2fb271a92d88e197a6b11cd 100644 --- a/polkadot/node/network/statement-distribution/Cargo.toml +++ b/polkadot/node/network/statement-distribution/Cargo.toml @@ -16,6 +16,7 @@ parity-scale-codec = "1.3.4" sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } +polkadot-node-network-protocol = { path = "../../network/protocol" } arrayvec = "0.5.1" indexmap = "1.4.0" diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index f58e6da4d46e55330f4c5b392716781ee4c4c267..3f8e6842a873ece169f925082c25b1431d91f299 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -24,15 +24,16 @@ use polkadot_subsystem::{ ActiveLeavesUpdate, FromOverseer, OverseerSignal, }; use polkadot_subsystem::messages::{ - AllMessages, NetworkBridgeMessage, NetworkBridgeEvent, StatementDistributionMessage, - PeerId, ReputationChange as Rep, CandidateBackingMessage, RuntimeApiMessage, - RuntimeApiRequest, + AllMessages, NetworkBridgeMessage, StatementDistributionMessage, CandidateBackingMessage, + RuntimeApiMessage, RuntimeApiRequest, }; -use node_primitives::{ProtocolId, View, SignedFullStatement}; +use node_primitives::SignedFullStatement; use polkadot_primitives::v1::{ Hash, CompactStatement, ValidatorIndex, ValidatorId, SigningContext, ValidatorSignature, }; -use parity_scale_codec::{Encode, Decode}; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, View, PeerId, ReputationChange as Rep, NetworkBridgeEvent, +}; use futures::prelude::*; use futures::channel::oneshot; @@ -40,11 +41,8 @@ use indexmap::IndexSet; use std::collections::{HashMap, HashSet}; -const PROTOCOL_V1: ProtocolId = *b"sdn1"; - const COST_UNEXPECTED_STATEMENT: Rep = Rep::new(-100, "Unexpected Statement"); const COST_INVALID_SIGNATURE: Rep = Rep::new(-500, "Invalid Statement Signature"); -const COST_INVALID_MESSAGE: Rep = Rep::new(-500, "Invalid message"); const COST_DUPLICATE_STATEMENT: Rep = Rep::new(-250, "Statement sent more than once by peer"); const COST_APPARENT_FLOOD: Rep = Rep::new(-1000, "Peer appears to be flooding us with statements"); @@ -77,10 +75,6 @@ impl<C> Subsystem<C> for StatementDistribution } } -fn network_update_message(n: NetworkBridgeEvent) -> AllMessages { - AllMessages::StatementDistribution(StatementDistributionMessage::NetworkBridgeUpdate(n)) -} - /// Tracks our impression of a single peer's view of the candidates a validator has seconded /// for a given relay-parent. /// @@ -480,13 +474,6 @@ fn check_statement_signature( .and_then(|v| statement.check_signature(&signing_context, v)) } -#[derive(Encode, Decode)] -enum WireMessage { - /// relay-parent, full statement. - #[codec(index = "0")] - Statement(Hash, SignedFullStatement), -} - /// Places the statement in storage if it is new, and then /// circulates the statement to all peers who have not seen it yet, and /// sends all statements dependent on that statement to peers who could previously not receive @@ -534,6 +521,14 @@ async fn circulate_statement_and_dependents( Ok(()) } +fn statement_message(relay_parent: Hash, statement: SignedFullStatement) + -> protocol_v1::ValidationProtocol +{ + protocol_v1::ValidationProtocol::StatementDistribution( + protocol_v1::StatementDistributionMessage::Statement(relay_parent, statement) + ) +} + /// Circulates a statement to all peers who have not seen it yet, and returns /// an iterator over peers who need to have dependent statements sent. async fn circulate_statement( @@ -554,10 +549,9 @@ async fn circulate_statement( // Send all these peers the initial statement. if !peers_to_send.is_empty() { - let payload = WireMessage::Statement(relay_parent, stored.statement.clone()).encode(); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + let payload = statement_message(relay_parent, stored.statement.clone()); + ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( peers_to_send.keys().cloned().collect(), - PROTOCOL_V1, payload, ))).await?; } @@ -580,16 +574,14 @@ async fn send_statements_about( ) -> SubsystemResult<()> { for statement in active_head.statements_about(candidate_hash) { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { - let payload = WireMessage::Statement( + let payload = statement_message( relay_parent, statement.statement.clone(), - ).encode(); + ); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( - vec![peer.clone()], - PROTOCOL_V1, - payload, - ))).await?; + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) + )).await?; } } @@ -606,16 +598,14 @@ async fn send_statements( ) -> SubsystemResult<()> { for statement in active_head.statements() { if peer_data.send(&relay_parent, &statement.fingerprint()).is_some() { - let payload = WireMessage::Statement( + let payload = statement_message( relay_parent, statement.statement.clone(), - ).encode(); + ); - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( - vec![peer.clone()], - PROTOCOL_V1, - payload, - ))).await?; + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload) + )).await?; } } @@ -643,11 +633,10 @@ async fn handle_incoming_message<'a>( our_view: &View, active_heads: &'a mut HashMap<Hash, ActiveHeadData>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, - message: Vec<u8>, + message: protocol_v1::StatementDistributionMessage, ) -> SubsystemResult<Option<(Hash, &'a StoredStatement)>> { - let (relay_parent, statement) = match WireMessage::decode(&mut &message[..]) { - Err(_) => return report_peer(ctx, peer, COST_INVALID_MESSAGE).await.map(|_| None), - Ok(WireMessage::Statement(r, s)) => (r, s), + let (relay_parent, statement) = match message { + protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), }; if !our_view.contains(&relay_parent) { @@ -750,7 +739,7 @@ async fn handle_network_update( active_heads: &mut HashMap<Hash, ActiveHeadData>, ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, our_view: &mut View, - update: NetworkBridgeEvent, + update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>, ) -> SubsystemResult<()> { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { @@ -827,12 +816,6 @@ async fn handle_network_update( async fn run( mut ctx: impl SubsystemContext<Message = StatementDistributionMessage>, ) -> SubsystemResult<()> { - // startup: register the network protocol with the bridge. - ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::RegisterEventProducer( - PROTOCOL_V1, - network_update_message, - ))).await?; - let mut peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut our_view = View::default(); let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new(); @@ -897,13 +880,14 @@ async fn run( relay_parent, statement, ).await?, - StatementDistributionMessage::NetworkBridgeUpdate(event) => handle_network_update( - &mut peers, - &mut active_heads, - &mut ctx, - &mut our_view, - event, - ).await?, + StatementDistributionMessage::NetworkBridgeUpdateV1(event) => + handle_network_update( + &mut peers, + &mut active_heads, + &mut ctx, + &mut our_view, + event, + ).await?, } } } @@ -1272,19 +1256,16 @@ mod tests { for statement in active_head.statements_about(candidate_hash) { let message = handle.recv().await; let expected_to = vec![peer.clone()]; - let expected_protocol = PROTOCOL_V1; let expected_payload - = WireMessage::Statement(hash_c, statement.statement.clone()).encode(); + = statement_message(hash_c, statement.statement.clone()); assert_matches!( message, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( to, - protocol, payload, )) => { assert_eq!(to, expected_to); - assert_eq!(protocol, expected_protocol); assert_eq!(payload, expected_payload) } ) @@ -1383,19 +1364,17 @@ mod tests { let message = handle.recv().await; assert_matches!( message, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendMessage( + AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage( to, - protocol, payload, )) => { assert_eq!(to.len(), 2); assert!(to.contains(&peer_b)); assert!(to.contains(&peer_c)); - assert_eq!(protocol, PROTOCOL_V1); assert_eq!( payload, - WireMessage::Statement(hash_b, statement.statement.clone()).encode(), + statement_message(hash_b, statement.statement.clone()), ); } ) diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index c1eab3d8a83539f66223ca7bacdb4a25d1e06617..3b12da323ba98e4e8c20eec31c142a16529e807f 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -145,6 +145,7 @@ fn main() { candidate_validation: Subsystem2, candidate_backing: Subsystem1, candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, bitfield_signing: DummySubsystem, diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index dac25ab5224c357892544d5d71e42270cbbf0fa1..d4931baa3306955a9e42c0699d3807d2024c9e68 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -78,7 +78,7 @@ use polkadot_subsystem::messages::{ CandidateValidationMessage, CandidateBackingMessage, CandidateSelectionMessage, ChainApiMessage, StatementDistributionMessage, AvailabilityDistributionMessage, BitfieldSigningMessage, BitfieldDistributionMessage, - ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, + ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, CollatorProtocolMessage, AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; pub use polkadot_subsystem::{ @@ -333,6 +333,9 @@ pub struct Overseer<S: SpawnNamed> { /// A candidate selection subsystem. candidate_selection_subsystem: OverseenSubsystem<CandidateSelectionMessage>, + /// A collator protocol subsystem + collator_protocol_subsystem: OverseenSubsystem<CollatorProtocolMessage>, + /// A statement distribution subsystem. statement_distribution_subsystem: OverseenSubsystem<StatementDistributionMessage>, @@ -395,13 +398,15 @@ pub struct Overseer<S: SpawnNamed> { /// /// [`Subsystem`]: trait.Subsystem.html /// [`DummySubsystem`]: struct.DummySubsystem.html -pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> { +pub struct AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> { /// A candidate validation subsystem. pub candidate_validation: CV, /// A candidate backing subsystem. pub candidate_backing: CB, /// A candidate selection subsystem. pub candidate_selection: CS, + /// A collator protocol subsystem. + pub collator_protocol: CP, /// A statement distribution subsystem. pub statement_distribution: SD, /// An availability distribution subsystem. @@ -494,6 +499,7 @@ where /// candidate_validation: ValidationSubsystem, /// candidate_backing: DummySubsystem, /// candidate_selection: DummySubsystem, + /// collator_protocol: DummySubsystem, /// statement_distribution: DummySubsystem, /// availability_distribution: DummySubsystem, /// bitfield_signing: DummySubsystem, @@ -524,15 +530,16 @@ where /// # /// # }); } /// ``` - pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>( + pub fn new<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>( leaves: impl IntoIterator<Item = BlockInfo>, - all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>, + all_subsystems: AllSubsystems<CV, CB, CS, CP, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> where CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send, CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send, CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send, + CP: Subsystem<OverseerSubsystemContext<CollatorProtocolMessage>> + Send, SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send, AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send, BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send, @@ -574,6 +581,13 @@ where all_subsystems.candidate_selection, )?; + let collator_protocol_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.collator_protocol, + )?; + let statement_distribution_subsystem = spawn( &mut s, &mut running_subsystems, @@ -655,6 +669,7 @@ where candidate_validation_subsystem, candidate_backing_subsystem, candidate_selection_subsystem, + collator_protocol_subsystem, statement_distribution_subsystem, availability_distribution_subsystem, bitfield_signing_subsystem, @@ -690,6 +705,10 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } + if let Some(ref mut s) = self.collator_protocol_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -844,6 +863,10 @@ where s.tx.send(FromOverseer::Signal(signal.clone())).await?; } + if let Some(ref mut s) = self.collator_protocol_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { s.tx.send(FromOverseer::Signal(signal.clone())).await?; } @@ -900,6 +923,11 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } + AllMessages::CollatorProtocol(msg) => { + if let Some(ref mut s) = self.collator_protocol_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } AllMessages::StatementDistribution(msg) => { if let Some(ref mut s) = self.statement_distribution_subsystem.instance { let _ = s.tx.send(FromOverseer::Communication { msg }).await; @@ -1102,6 +1130,7 @@ mod tests { candidate_validation: TestSubsystem1(s1_tx), candidate_backing: TestSubsystem2(s2_tx), candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, bitfield_signing: DummySubsystem, @@ -1166,6 +1195,7 @@ mod tests { candidate_validation: TestSubsystem1(s1_tx), candidate_backing: TestSubsystem4, candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, bitfield_signing: DummySubsystem, @@ -1283,6 +1313,7 @@ mod tests { candidate_validation: TestSubsystem5(tx_5), candidate_backing: TestSubsystem6(tx_6), candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, bitfield_signing: DummySubsystem, @@ -1385,6 +1416,7 @@ mod tests { candidate_validation: TestSubsystem5(tx_5), candidate_backing: TestSubsystem6(tx_6), candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, bitfield_signing: DummySubsystem, diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index 2e5f993a7e770bc8002ec10c6dbb1cbf1ff00b31..2bcc7a392647da9ccdd024180077a84c83a806d6 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -258,29 +258,3 @@ impl std::convert::TryFrom<FromTableMisbehavior> for MisbehaviorReport { } } } - -/// A unique identifier for a network protocol. -pub type ProtocolId = [u8; 4]; - -/// A succinct representation of a peer's view. This consists of a bounded amount of chain heads. -/// -/// Up to `N` (5?) chain heads. -#[derive(Default, Debug, Clone, PartialEq, Eq, Encode, Decode)] -pub struct View(pub Vec<Hash>); - -impl View { - /// Returns an iterator of the hashes present in `Self` but not in `other`. - pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a { - self.0.iter().filter(move |h| !other.contains(h)) - } - - /// An iterator containing hashes present in both `Self` and in `other`. - pub fn intersection<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a { - self.0.iter().filter(move |h| other.contains(h)) - } - - /// Whether the view contains a given hash. - pub fn contains(&self, hash: &Hash) -> bool { - self.0.contains(hash) - } -} diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index df4bd64b8aa849cbf6f3f2657cc823cbcb6d8fed..ca350454f025544f8e5fa1e52f812226d955e9c9 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -281,6 +281,7 @@ fn real_overseer<S: SpawnNamed>( candidate_validation: DummySubsystem, candidate_backing: DummySubsystem, candidate_selection: DummySubsystem, + collator_protocol: DummySubsystem, statement_distribution: DummySubsystem, availability_distribution: DummySubsystem, bitfield_signing: DummySubsystem, diff --git a/polkadot/node/subsystem/Cargo.toml b/polkadot/node/subsystem/Cargo.toml index 3e676cb4f164141b4df5fb71f76bab1e36c1013e..6ad013c177f3c5751e4a5e33b4f03695ca479604 100644 --- a/polkadot/node/subsystem/Cargo.toml +++ b/polkadot/node/subsystem/Cargo.toml @@ -15,6 +15,7 @@ parity-scale-codec = "1.3.4" parking_lot = { version = "0.10.0", optional = true } pin-project = "0.4.22" polkadot-node-primitives = { path = "../primitives" } +polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-primitives = { path = "../../primitives" } polkadot-statement-table = { path = "../../statement-table" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index ae1d429d757c1b8ba4e6f477e904021a8ec7074d..60813124015ac937b32d367aa7290064f785ceed 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -25,7 +25,7 @@ use futures::channel::{mpsc, oneshot}; use polkadot_primitives::v1::{ - Hash, CommittedCandidateReceipt, + Hash, CommittedCandidateReceipt, CollatorId, CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId, SignedAvailabilityBitfield, ValidatorId, ValidationCode, ValidatorIndex, CoreAssignment, CoreOccupied, CandidateDescriptor, @@ -34,12 +34,13 @@ use polkadot_primitives::v1::{ CandidateEvent, SessionIndex, BlockNumber, }; use polkadot_node_primitives::{ - MisbehaviorReport, SignedFullStatement, View, ProtocolId, ValidationResult, + MisbehaviorReport, SignedFullStatement, ValidationResult, +}; +use polkadot_node_network_protocol::{ + v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet, }; use std::sync::Arc; -pub use sc_network::{ObservedRole, ReputationChange, PeerId}; - /// A notification of a new backed candidate. #[derive(Debug)] pub struct NewBackedCandidate(pub BackedCandidate); @@ -142,45 +143,71 @@ impl CandidateValidationMessage { } } -/// Events from network. -#[derive(Debug, Clone)] -pub enum NetworkBridgeEvent { - /// A peer has connected. - PeerConnected(PeerId, ObservedRole), - - /// A peer has disconnected. - PeerDisconnected(PeerId), - - /// Peer has sent a message. - PeerMessage(PeerId, Vec<u8>), - /// Peer's `View` has changed. - PeerViewChange(PeerId, View), +/// Messages received by the Collator Protocol subsystem. +#[derive(Debug)] +pub enum CollatorProtocolMessage { + /// Signal to the collator protocol that it should connect to validators with the expectation + /// of collating on the given para. This is only expected to be called once, early on, if at all, + /// and only by the Collation Generation subsystem. As such, it will overwrite the value of + /// the previous signal. + /// + /// This should be sent before any `DistributeCollation` message. + CollateOn(ParaId), + /// Provide a collation to distribute to validators. + DistributeCollation(CandidateReceipt, PoV), + /// Fetch a collation under the given relay-parent for the given ParaId. + FetchCollation(Hash, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>), + /// Report a collator as having provided an invalid collation. This should lead to disconnect + /// and blacklist of the collator. + ReportCollator(CollatorId), + /// Note a collator as having provided a good collation. + NoteGoodCollation(CollatorId), + /// Get a network bridge update. + NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>), +} - /// Our `View` has changed. - OurViewChange(View), +impl CollatorProtocolMessage { + /// If the current variant contains the relay parent hash, return it. + pub fn relay_parent(&self) -> Option<Hash> { + match self { + Self::CollateOn(_) => None, + Self::DistributeCollation(receipt, _) => Some(receipt.descriptor().relay_parent), + Self::FetchCollation(relay_parent, _, _) => Some(*relay_parent), + Self::ReportCollator(_) => None, + Self::NoteGoodCollation(_) => None, + Self::NetworkBridgeUpdateV1(_) => None, + } + } } /// Messages received by the network bridge subsystem. #[derive(Debug)] pub enum NetworkBridgeMessage { - /// Register an event producer on startup. - RegisterEventProducer(ProtocolId, fn(NetworkBridgeEvent) -> AllMessages), - /// Report a peer for their actions. ReportPeer(PeerId, ReputationChange), - /// Send a message to multiple peers. - SendMessage(Vec<PeerId>, ProtocolId, Vec<u8>), + /// Send a message to one or more peers on the validation peer-set. + SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol), + + /// Send a message to one or more peers on the collation peer-set. + SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol), + + /// Connect to peers who represent the given `ValidatorId`s at the given relay-parent. + /// + /// Also accepts a response channel by which the issuer can learn the `PeerId`s of those + /// validators. + ConnectToValidators(PeerSet, Vec<ValidatorId>, oneshot::Sender<Vec<(ValidatorId, PeerId)>>), } impl NetworkBridgeMessage { /// If the current variant contains the relay parent hash, return it. pub fn relay_parent(&self) -> Option<Hash> { match self { - Self::RegisterEventProducer(_, _) => None, Self::ReportPeer(_, _) => None, - Self::SendMessage(_, _, _) => None, + Self::SendValidationMessage(_, _) => None, + Self::SendCollationMessage(_, _) => None, + Self::ConnectToValidators(_, _, _) => None, } } } @@ -189,14 +216,14 @@ impl NetworkBridgeMessage { #[derive(Debug)] pub enum AvailabilityDistributionMessage { /// Event from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>), } impl AvailabilityDistributionMessage { /// If the current variant contains the relay parent hash, return it. pub fn relay_parent(&self) -> Option<Hash> { match self { - Self::NetworkBridgeUpdate(_) => None, + Self::NetworkBridgeUpdateV1(_) => None, } } } @@ -208,7 +235,7 @@ pub enum BitfieldDistributionMessage { DistributeBitfield(Hash, SignedAvailabilityBitfield), /// Event from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>), } impl BitfieldDistributionMessage { @@ -216,7 +243,7 @@ impl BitfieldDistributionMessage { pub fn relay_parent(&self) -> Option<Hash> { match self { Self::DistributeBitfield(hash, _) => Some(*hash), - Self::NetworkBridgeUpdate(_) => None, + Self::NetworkBridgeUpdateV1(_) => None, } } } @@ -391,7 +418,7 @@ pub enum StatementDistributionMessage { /// given relay-parent hash and it should be distributed to other validators. Share(Hash, SignedFullStatement), /// Event from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>), } impl StatementDistributionMessage { @@ -399,7 +426,7 @@ impl StatementDistributionMessage { pub fn relay_parent(&self) -> Option<Hash> { match self { Self::Share(hash, _) => Some(*hash), - Self::NetworkBridgeUpdate(_) => None, + Self::NetworkBridgeUpdateV1(_) => None, } } } @@ -464,7 +491,7 @@ pub enum PoVDistributionMessage { /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor DistributePoV(Hash, CandidateDescriptor, Arc<PoV>), /// An update from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::PoVDistributionMessage>), } impl PoVDistributionMessage { @@ -473,7 +500,7 @@ impl PoVDistributionMessage { match self { Self::FetchPoV(hash, _, _) => Some(*hash), Self::DistributePoV(hash, _, _) => Some(*hash), - Self::NetworkBridgeUpdate(_) => None, + Self::NetworkBridgeUpdateV1(_) => None, } } } @@ -487,6 +514,10 @@ pub enum AllMessages { CandidateBacking(CandidateBackingMessage), /// Message for the candidate selection subsystem. CandidateSelection(CandidateSelectionMessage), + /// Message for the Chain API subsystem. + ChainApi(ChainApiMessage), + /// Message for the Collator Protocol subsystem. + CollatorProtocol(CollatorProtocolMessage), /// Message for the statement distribution subsystem. StatementDistribution(StatementDistributionMessage), /// Message for the availability distribution subsystem. @@ -505,6 +536,4 @@ pub enum AllMessages { AvailabilityStore(AvailabilityStoreMessage), /// Message for the network bridge subsystem. NetworkBridge(NetworkBridgeMessage), - /// Message for the Chain API subsystem. - ChainApi(ChainApiMessage), }