Commit 1e9c0540 (unverified), authored by asynchronous rob, committed by GitHub
Browse files

Network bridge refactoring impl (#1537)

* update networking types

* port over overseer-protocol message types

* Add the collation protocol to network bridge

* message sending

* stub for ConnectToValidators

* add some helper traits and methods to protocol types

* add collator protocol message

* leaves-updating

* peer connection and disconnection

* add utilities for dispatching multiple events

* implement message handling

* add an observedrole enum with equality and no sentry nodes

* derive partial-eq on network bridge event

* add PartialEq impls for network message types

* add Into implementation for observedrole

* port over existing network bridge tests

* add some more tests

* port bitfield distribution

* port over bitfield distribution tests

* add codec indices

* port PoV distribution

* port over PoV distribution tests

* port over statement distribution

* port over statement distribution tests

* update overseer and service-new

* address review comments

* port availability distribution

* port over availability distribution tests
parent fb831768
Pipeline #103670 passed with stages
in 25 minutes and 26 seconds
......@@ -4423,8 +4423,8 @@ dependencies = [
"maplit",
"parity-scale-codec",
"parking_lot 0.11.0",
"polkadot-network",
"polkadot-network-bridge",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......@@ -4450,19 +4450,16 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.11.0",
"polkadot-erasure-coding",
"polkadot-network",
"polkadot-network-bridge",
"polkadot-node-primitives",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
"sc-keystore",
"sc-network",
"smallvec 1.4.1",
"smol-timeout",
"sp-core",
"sp-keyring",
"sp-staking",
"streamunordered",
]
......@@ -4608,12 +4605,13 @@ dependencies = [
"log 0.4.11",
"parity-scale-codec",
"parking_lot 0.10.2",
"polkadot-node-primitives",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
"sc-network",
"sp-core",
"sp-keyring",
"sp-runtime",
"streamunordered",
]
......@@ -4786,6 +4784,18 @@ dependencies = [
"sp-core",
]
[[package]]
name = "polkadot-node-network-protocol"
version = "0.1.0"
dependencies = [
"parity-scale-codec",
"polkadot-node-primitives",
"polkadot-primitives",
"sc-network",
"sp-core",
"sp-runtime",
]
[[package]]
name = "polkadot-node-primitives"
version = "0.1.0"
......@@ -4810,6 +4820,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.10.2",
"pin-project",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
......@@ -4913,11 +4924,11 @@ dependencies = [
"log 0.4.11",
"parity-scale-codec",
"parking_lot 0.10.2",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-primitives",
"sc-network",
"sp-core",
"sp-runtime",
"streamunordered",
......@@ -5271,6 +5282,7 @@ dependencies = [
"log 0.4.11",
"parity-scale-codec",
"parking_lot 0.10.2",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......
......@@ -55,6 +55,7 @@ members = [
"node/core/runtime-api",
"node/network/bridge",
"node/network/pov-distribution",
"node/network/protocol",
"node/network/statement-distribution",
"node/network/bitfield-distribution",
"node/network/availability-distribution",
......
......@@ -9,16 +9,13 @@ futures = "0.3.5"
log = "0.4.11"
streamunordered = "0.5.1"
codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-network = { path = "../../../network" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
derive_more = "0.99.9"
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
[dev-dependencies]
......@@ -31,4 +28,4 @@ futures-timer = "3.0.2"
smol-timeout = "0.1.0"
env_logger = "0.7.1"
assert_matches = "1.3.0"
smallvec = "1"
\ No newline at end of file
smallvec = "1"
......@@ -32,24 +32,27 @@ use sp_core::{
};
use sc_keystore as keystore;
use node_primitives::{ProtocolId, View};
use log::{trace, warn};
use polkadot_erasure_coding::branch_hash;
use polkadot_primitives::v1::{
PARACHAIN_KEY_TYPE_ID,
BlakeTwo256, CommittedCandidateReceipt, CoreState, ErasureChunk,
Hash as Hash, HashT, Id as ParaId,
ValidatorId, ValidatorIndex,
ValidatorId, ValidatorIndex, SessionIndex,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, NetworkBridgeMessage, RuntimeApiMessage,
RuntimeApiRequest, AvailabilityStoreMessage, ChainApiMessage,
};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
};
use sc_network::ReputationChange as Rep;
use sp_staking::SessionIndex;
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, ReputationChange as Rep, PeerId,
NetworkBridgeEvent,
};
use std::collections::{HashMap, HashSet};
use std::io;
use std::iter;
......@@ -76,7 +79,6 @@ type Result<T> = std::result::Result<T, Error>;
// Reputation changes defined by this subsystem. `COST_*` constants carry a
// negative value and `BENEFIT_*` constants a positive one; the string passed
// alongside the value names the reason for the change.
const COST_MERKLE_PROOF_INVALID: Rep = Rep::new(-100, "Merkle proof was invalid");
const COST_NOT_A_LIVE_CANDIDATE: Rep = Rep::new(-51, "Candidate is not live");
const COST_MESSAGE_NOT_DECODABLE: Rep = Rep::new(-100, "Message is not decodable");
const COST_PEER_DUPLICATE_MESSAGE: Rep = Rep::new(-500, "Peer sent identical messages");
const BENEFIT_VALID_MESSAGE_FIRST: Rep = Rep::new(15, "Valid message with new information");
const BENEFIT_VALID_MESSAGE: Rep = Rep::new(10, "Valid message");
......@@ -284,17 +286,13 @@ impl ProtocolState {
}
}
/// Wraps a network bridge event in
/// `AvailabilityDistributionMessage::NetworkBridgeUpdate` and lifts the result
/// into the `AllMessages::AvailabilityDistribution` variant.
fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
    let update = AvailabilityDistributionMessage::NetworkBridgeUpdate(n);
    AllMessages::AvailabilityDistribution(update)
}
/// Deal with network bridge updates and track what needs to be tracked
/// which depends on the message type received.
async fn handle_network_msg<Context>(
ctx: &mut Context,
keystore: KeyStorePtr,
state: &mut ProtocolState,
bridge_message: NetworkBridgeEvent,
bridge_message: NetworkBridgeEvent<protocol_v1::AvailabilityDistributionMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
......@@ -314,19 +312,13 @@ where
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(ctx, keystore, state, view).await?;
}
NetworkBridgeEvent::PeerMessage(remote, bytes) => {
if let Ok(gossiped_availability) =
AvailabilityGossipMessage::decode(&mut (bytes.as_slice()))
{
trace!(
target: TARGET,
"Received availability gossip from peer {:?}",
&remote
);
process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?;
} else {
modify_reputation(ctx, remote, COST_MESSAGE_NOT_DECODABLE).await?;
}
NetworkBridgeEvent::PeerMessage(remote, msg) => {
let gossiped_availability = match msg {
protocol_v1::AvailabilityDistributionMessage::Chunk(candidate_hash, chunk) =>
AvailabilityGossipMessage { candidate_hash, erasure_chunk: chunk }
};
process_incoming_peer_message(ctx, state, remote, gossiped_availability).await?;
}
}
Ok(())
......@@ -494,16 +486,19 @@ where
.insert(message_id);
}
let encoded = message.encode();
per_candidate
.message_vault
.insert(message.erasure_chunk.index, message);
.insert(message.erasure_chunk.index, message.clone());
let wire_message = protocol_v1::AvailabilityDistributionMessage::Chunk(
message.candidate_hash,
message.erasure_chunk,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendMessage(
NetworkBridgeMessage::SendValidationMessage(
peers.clone(),
AvailabilityDistributionSubsystem::PROTOCOL_ID,
encoded,
protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message),
),
))
.await
......@@ -709,9 +704,6 @@ pub struct AvailabilityDistributionSubsystem {
}
impl AvailabilityDistributionSubsystem {
/// The protocol identifier for availability distribution.
const PROTOCOL_ID: ProtocolId = *b"avad";
/// Number of ancestors to keep around for the relay-chain heads.
const K: usize = 3;
......@@ -725,20 +717,13 @@ impl AvailabilityDistributionSubsystem {
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
// startup: register the network protocol with the bridge.
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::RegisterEventProducer(Self::PROTOCOL_ID, network_update_message),
))
.await
.map_err::<Error, _>(Into::into)?;
// work: process incoming messages from the overseer.
let mut state = ProtocolState::default();
loop {
let message = ctx.recv().await.map_err::<Error, _>(Into::into)?;
match message {
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::NetworkBridgeUpdate(event),
msg: AvailabilityDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
if let Err(e) = handle_network_msg(
&mut ctx,
......
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*;
use assert_matches::assert_matches;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
......@@ -7,6 +23,7 @@ use polkadot_primitives::v1::{
OmittedValidationData, PoV, ScheduledCore, ValidatorPair,
};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::ObservedRole;
use futures::{executor, future, Future};
use futures_timer::Delay;
......@@ -26,6 +43,15 @@ macro_rules! delay {
};
}
fn chunk_protocol_message(message: AvailabilityGossipMessage)
-> protocol_v1::AvailabilityDistributionMessage
{
protocol_v1::AvailabilityDistributionMessage::Chunk(
message.candidate_hash,
message.erasure_chunk,
)
}
/// Bundle of handles made available to a test body.
struct TestHarness {
// Test-side handle of the subsystem context, typed over
// `AvailabilityDistributionMessage`.
// NOTE(review): presumably this is the handle the tests pass to
// `overseer_send` / `overseer_recv` — confirm against the harness setup.
virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
}
......@@ -437,12 +463,9 @@ fn reputation_verification() {
)
.await;
// ignore event producer registration
let _ = overseer_recv(&mut virtual_overseer).await;
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![current,]),
),
)
......@@ -668,7 +691,7 @@ fn reputation_verification() {
// setup peer a with interest in current
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full),
),
)
......@@ -676,7 +699,7 @@ fn reputation_verification() {
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![current]),
),
)
......@@ -685,7 +708,7 @@ fn reputation_verification() {
// setup peer b with interest in ancestor
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
),
)
......@@ -693,7 +716,7 @@ fn reputation_verification() {
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![ancestors[0]]),
),
)
......@@ -701,35 +724,6 @@ fn reputation_verification() {
delay!(100);
/////////////////////////////////////////////////////////
// ready for action
// check if garbage messages are detected and peer rep is changed as expected
let garbage = b"I am garbage";
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
// AvailabilityDistributionSubsystem::PROTOCOL_ID,
garbage.to_vec(),
)),
)
.await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(
peer,
rep
)
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_MESSAGE_NOT_DECODABLE);
}
);
let valid: AvailabilityGossipMessage = make_valid_availability_gossip(
&test_state,
candidates[0].hash(),
......@@ -741,8 +735,11 @@ fn reputation_verification() {
// valid (first, from b)
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(peer_b.clone(), valid.encode()),
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
chunk_protocol_message(valid.clone()),
),
),
)
.await;
......@@ -765,8 +762,11 @@ fn reputation_verification() {
// valid (duplicate, from b)
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(peer_b.clone(), valid.encode()),
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
chunk_protocol_message(valid.clone()),
),
),
)
.await;
......@@ -789,8 +789,11 @@ fn reputation_verification() {
// valid (second, from a)
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid.encode()),
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
chunk_protocol_message(valid.clone()),
),
),
)
.await;
......@@ -812,7 +815,7 @@ fn reputation_verification() {
// peer a is not interested in anything anymore
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![]),
),
)
......@@ -822,8 +825,11 @@ fn reputation_verification() {
// send the a message again, so we should detect the duplicate
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid.encode()),
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
chunk_protocol_message(valid.clone()),
),
),
)
.await;
......@@ -846,7 +852,7 @@ fn reputation_verification() {
// setup peer a with interest in parent x
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerDisconnected(peer_b.clone()),
),
)
......@@ -856,7 +862,7 @@ fn reputation_verification() {
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
),
)
......@@ -874,8 +880,11 @@ fn reputation_verification() {
// send the a message before we send a view update
overseer_send(
&mut virtual_overseer,
AvailabilityDistributionMessage::NetworkBridgeUpdate(
NetworkBridgeEvent::PeerMessage(peer_a.clone(), valid2.encode()),
AvailabilityDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
chunk_protocol_message(valid2),
),
),
)
.await;
......
......@@ -14,7 +14,7 @@ node-primitives = { package = "polkadot-node-primitives", path = "../../primitiv
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-network = { path = "../../../network" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
[dev-dependencies]
......
......@@ -23,15 +23,13 @@
use codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
use node_primitives::{ProtocolId, View};
use log::{trace, warn};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use sc_network::ReputationChange;
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange};
use std::collections::{HashMap, HashSet};
const COST_SIGNATURE_INVALID: ReputationChange =
......@@ -42,8 +40,6 @@ const COST_MISSING_PEER_SESSION_KEY: ReputationChange =
ReputationChange::new(-133, "Missing peer session key");
const COST_NOT_IN_VIEW: ReputationChange =
ReputationChange::new(-51, "Not interested in that parent hash");
/// Reputation cost for a peer message that could not be decoded (per the
/// constant's name). The reason string is kept distinct from
/// `COST_NOT_IN_VIEW`'s so the two penalties can be told apart.
const COST_MESSAGE_NOT_DECODABLE: ReputationChange =
    ReputationChange::new(-100, "Message is not decodable");
const COST_PEER_DUPLICATE_MESSAGE: ReputationChange =
ReputationChange::new(-500, "Peer sent the same message multiple times");
const BENEFIT_VALID_MESSAGE_FIRST: ReputationChange =
......@@ -54,11 +50,28 @@ const BENEFIT_VALID_MESSAGE: ReputationChange =
/// Checked signed availability bitfield that is distributed
/// to other peers.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
pub struct BitfieldGossipMessage {
struct BitfieldGossipMessage {
/// The relay parent this message is relative to.
pub relay_parent: Hash,
relay_parent: Hash,
/// The actual signed availability bitfield.
pub signed_availability: SignedAvailabilityBitfield,
signed_availability: SignedAvailabilityBitfield,
}
impl BitfieldGossipMessage {
fn into_validation_protocol(self) -> protocol_v1::ValidationProtocol {
protocol_v1::ValidationProtocol::BitfieldDistribution(
self.into_network_message()
)
}
fn into_network_message(self)
-> protocol_v1::BitfieldDistributionMessage
{
protocol_v1::BitfieldDistributionMessage::Bitfield(
self.relay_parent,
self.signed_availability,
)
}
}
/// Data used to track information of peers and relay parents the
......@@ -114,28 +127,15 @@ impl PerRelayParentData {
}
}
/// Wraps a network bridge event in
/// `BitfieldDistributionMessage::NetworkBridgeUpdate` and lifts the result
/// into the `AllMessages::BitfieldDistribution` variant.
fn network_update_message(n: NetworkBridgeEvent) -> AllMessages {
    let update = BitfieldDistributionMessage::NetworkBridgeUpdate(n);
    AllMessages::BitfieldDistribution(update)
}
/// The bitfield distribution subsystem.
pub struct BitfieldDistribution;
impl BitfieldDistribution {
/// The protocol identifier for bitfield distribution.
const PROTOCOL_ID: ProtocolId = *b"bitd";
/// Start processing work as passed on from the Overseer.
async fn run<Context>(mut ctx: Context) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
// startup: register the network protocol with the bridge.
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::RegisterEventProducer(Self::PROTOCOL_ID, network_update_message),
))
.await?;
// work: process incoming messages from the overseer and process accordingly.
let mut state = ProtocolState::default();
loop {
......@@ -149,7 +149,7 @@ impl BitfieldDistribution {
.await?;
}
FromOverseer::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdate(event),
msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
trace!(target: "bitd", "Processing NetworkMessage");
// a network message was received
......@@ -314,10 +314,9 @@ where
);
} else {
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendMessage(
NetworkBridgeMessage::SendValidationMessage(
interested_peers,
BitfieldDistribution::PROTOCOL_ID,
message.encode(),
message.into_validation_protocol(),
),
))
.await?;
......@@ -413,7 +412,7 @@ where
async fn handle_network_msg<Context>(
ctx: &mut Context,
state: &mut ProtocolState,
bridge_message: NetworkBridgeEvent,
bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>,
) -> SubsystemResult<()>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
......@@ -433,12 +432,16 @@ where
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(state, view)?;
}