Unverified Commit 7545a340 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Make `validation::NetworkService` strongly typed (#295)

By using a strongly typed network service, we make sure that we send and
receive the correct messages. Before there was a bug, a `SignedStatement`
was sent and a `GossipMessage` was decoded, but this could never work.
parent cf7b4564
Pipeline #41145 passed with stages
in 21 minutes and 25 seconds
...@@ -11,7 +11,7 @@ parking_lot = "0.7.1" ...@@ -11,7 +11,7 @@ parking_lot = "0.7.1"
av_store = { package = "polkadot-availability-store", path = "../availability-store" } av_store = { package = "polkadot-availability-store", path = "../availability-store" }
polkadot-validation = { path = "../validation" } polkadot-validation = { path = "../validation" }
polkadot-primitives = { path = "../primitives" } polkadot-primitives = { path = "../primitives" }
parity-codec = { version = "3.0", features = ["derive"] } parity-codec = { version = "3.5.1", features = ["derive"] }
substrate-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } substrate-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
substrate-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } substrate-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......
...@@ -64,7 +64,7 @@ mod cost { ...@@ -64,7 +64,7 @@ mod cost {
/// A gossip message. /// A gossip message.
#[derive(Encode, Decode, Clone)] #[derive(Encode, Decode, Clone)]
pub(crate) enum GossipMessage { pub enum GossipMessage {
/// A packet sent to a neighbor but not relayed. /// A packet sent to a neighbor but not relayed.
#[codec(index = "1")] #[codec(index = "1")]
Neighbor(VersionedNeighborPacket), Neighbor(VersionedNeighborPacket),
...@@ -76,13 +76,29 @@ pub(crate) enum GossipMessage { ...@@ -76,13 +76,29 @@ pub(crate) enum GossipMessage {
// erasure-coded chunks. // erasure-coded chunks.
} }
impl From<GossipStatement> for GossipMessage {
fn from(stmt: GossipStatement) -> Self {
GossipMessage::Statement(stmt)
}
}
/// A gossip message containing a statement. /// A gossip message containing a statement.
#[derive(Encode, Decode, Clone)] #[derive(Encode, Decode, Clone)]
pub(crate) struct GossipStatement { pub struct GossipStatement {
/// The relay chain parent hash. /// The relay chain parent hash.
pub(crate) relay_parent: Hash, pub relay_parent: Hash,
/// The signed statement being gossipped. /// The signed statement being gossipped.
pub(crate) signed_statement: SignedStatement, pub signed_statement: SignedStatement,
}
impl GossipStatement {
/// Create a new instance.
pub fn new(relay_parent: Hash, signed_statement: SignedStatement) -> Self {
Self {
relay_parent,
signed_statement,
}
}
} }
/// A versioned neighbor message. /// A versioned neighbor message.
......
...@@ -69,7 +69,7 @@ mod benefit { ...@@ -69,7 +69,7 @@ mod benefit {
type FullStatus = GenericFullStatus<Block>; type FullStatus = GenericFullStatus<Block>;
/// Specialization of the network service for the polkadot protocol. /// Specialization of the network service for the polkadot protocol.
pub type NetworkService = ::substrate_network::NetworkService<Block, PolkadotProtocol>; pub type NetworkService = substrate_network::NetworkService<Block, PolkadotProtocol>;
/// Status of a Polkadot node. /// Status of a Polkadot node.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
......
...@@ -31,9 +31,8 @@ use polkadot_primitives::{Block, Hash}; ...@@ -31,9 +31,8 @@ use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost, use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost,
ValidatorIndex, Collation, PoVBlock, ValidatorIndex, Collation, PoVBlock,
}; };
use crate::gossip::RegisteredMessageValidator; use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};
use parity_codec::{Encode, Decode};
use futures::prelude::*; use futures::prelude::*;
use parking_lot::Mutex; use parking_lot::Mutex;
use log::{debug, trace}; use log::{debug, trace};
...@@ -79,19 +78,18 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> { ...@@ -79,19 +78,18 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
/// Return a future of checked messages. These should be imported into the router /// Return a future of checked messages. These should be imported into the router
/// with `import_statement`. /// with `import_statement`.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement,Error=()> { ///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
// spin up a task in the background that processes all incoming statements // spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator. // validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained. // this will block internally until the gossip messages stream is obtained.
self.network().gossip_messages_for(self.attestation_topic) self.network().gossip_messages_for(self.attestation_topic)
.filter_map(|msg| { .filter_map(|msg| match msg.0 {
use crate::gossip::GossipMessage; GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
debug!(target: "validation", "Processing statement for live validation session");
match GossipMessage::decode(&mut &msg.message[..]) {
Some(GossipMessage::Statement(s)) => Some(s.signed_statement),
_ => None,
}
}) })
} }
...@@ -180,6 +178,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -180,6 +178,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
let network = self.network().clone(); let network = self.network().clone();
let knowledge = self.fetcher.knowledge().clone(); let knowledge = self.fetcher.knowledge().clone();
let attestation_topic = self.attestation_topic.clone(); let attestation_topic = self.attestation_topic.clone();
let parent_hash = self.parent_hash();
producer.prime(self.fetcher.api().clone()) producer.prime(self.fetcher.api().clone())
.map(move |validated| { .map(move |validated| {
...@@ -193,8 +192,11 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -193,8 +192,11 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
// propagate the statement. // propagate the statement.
// consider something more targeted than gossip in the future. // consider something more targeted than gossip in the future.
let signed = table.import_validated(validated); let statement = GossipStatement::new(
network.gossip_message(attestation_topic, signed.encode()); parent_hash,
table.import_validated(validated),
);
network.gossip_message(attestation_topic, statement.into());
}) })
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e)) .map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
} }
...@@ -213,11 +215,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh ...@@ -213,11 +215,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
// produce a signed statement // produce a signed statement
let hash = collation.receipt.hash(); let hash = collation.receipt.hash();
let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone()); let validated = Validated::collated_local(collation.receipt, collation.pov.clone(), extrinsic.clone());
let statement = self.table.import_validated(validated); let statement = GossipStatement::new(
self.parent_hash(),
self.table.import_validated(validated),
);
// give to network to make available. // give to network to make available.
self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic)); self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(extrinsic));
self.network().gossip_message(self.attestation_topic, statement.encode()); self.network().gossip_message(self.attestation_topic, statement.into());
} }
fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof { fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof {
......
...@@ -18,7 +18,8 @@ ...@@ -18,7 +18,8 @@
#![allow(unused)] #![allow(unused)]
use crate::validation::{NetworkService, GossipService}; use crate::validation::{NetworkService, GossipService, GossipMessageStream};
use crate::gossip::GossipMessage;
use substrate_network::Context as NetContext; use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::TopicNotification; use substrate_network::consensus_gossip::TopicNotification;
use substrate_primitives::{NativeOrEncoded, ExecutionContext}; use substrate_primitives::{NativeOrEncoded, ExecutionContext};
...@@ -40,6 +41,7 @@ use std::collections::HashMap; ...@@ -40,6 +41,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use futures::{prelude::*, sync::mpsc}; use futures::{prelude::*, sync::mpsc};
use tokio::runtime::{Runtime, TaskExecutor}; use tokio::runtime::{Runtime, TaskExecutor};
use parity_codec::Encode;
use super::TestContext; use super::TestContext;
...@@ -142,14 +144,14 @@ struct TestNetwork { ...@@ -142,14 +144,14 @@ struct TestNetwork {
} }
impl NetworkService for TestNetwork { impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx)); let _ = self.gossip.send_listener.unbounded_send((topic, tx));
rx GossipMessageStream::new(rx)
} }
fn gossip_message(&self, topic: Hash, message: Vec<u8>) { fn gossip_message(&self, topic: Hash, message: GossipMessage) {
let notification = TopicNotification { message, sender: None }; let notification = TopicNotification { message: message.encode(), sender: None };
let _ = self.gossip.send_message.unbounded_send((topic, notification)); let _ = self.gossip.send_message.unbounded_send((topic, notification));
} }
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called //! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head. //! each time a validation session begins on a new chain head.
use crate::gossip::GossipMessage;
use sr_primitives::traits::ProvideRuntimeApi; use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{PeerId, Context as NetContext}; use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{ use substrate_network::consensus_gossip::{
...@@ -43,7 +44,7 @@ use std::sync::Arc; ...@@ -43,7 +44,7 @@ use std::sync::Arc;
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use tokio::runtime::TaskExecutor; use tokio::runtime::TaskExecutor;
use parking_lot::Mutex; use parking_lot::Mutex;
use log::warn; use log::{debug, warn};
use crate::router::Router; use crate::router::Router;
use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData}; use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData};
...@@ -52,6 +53,8 @@ use super::PolkadotProtocol; ...@@ -52,6 +53,8 @@ use super::PolkadotProtocol;
pub use polkadot_validation::Incoming; pub use polkadot_validation::Incoming;
use parity_codec::{Encode, Decode};
/// An executor suitable for dispatching async consensus tasks. /// An executor suitable for dispatching async consensus tasks.
pub trait Executor { pub trait Executor {
fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F); fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F);
...@@ -87,13 +90,46 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> { ...@@ -87,13 +90,46 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> {
} }
} }
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: mpsc::UnboundedReceiver<TopicNotification>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: mpsc::UnboundedReceiver<TopicNotification>) -> Self {
Self {
topic_stream
}
}
}
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
let msg = match futures::try_ready!(self.topic_stream.poll()) {
Some(msg) => msg,
None => return Ok(Async::Ready(None)),
};
debug!(target: "validation", "Processing statement for live validation session");
if let Some(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
return Ok(Async::Ready(Some((gmsg, msg.sender))))
}
}
}
}
/// Basic functionality that a network has to fulfill. /// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static { pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash. /// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification>; fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;
/// Gossip a message on given topic. /// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>); fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service. /// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F) fn with_gossip<F: Send + 'static>(&self, with: F)
...@@ -105,7 +141,7 @@ pub trait NetworkService: Send + Sync + 'static { ...@@ -105,7 +141,7 @@ pub trait NetworkService: Send + Sync + 'static {
} }
impl NetworkService for super::NetworkService { impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel(); let (tx, rx) = std::sync::mpsc::channel();
super::NetworkService::with_gossip(self, move |gossip, _| { super::NetworkService::with_gossip(self, move |gossip, _| {
...@@ -113,17 +149,19 @@ impl NetworkService for super::NetworkService { ...@@ -113,17 +149,19 @@ impl NetworkService for super::NetworkService {
let _ = tx.send(inner_rx); let _ = tx.send(inner_rx);
}); });
match rx.recv() { let topic_stream = match rx.recv() {
Ok(rx) => rx, Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel. Err(_) => mpsc::unbounded().1, // return empty channel.
} };
GossipMessageStream::new(topic_stream)
} }
fn gossip_message(&self, topic: Hash, message: Vec<u8>) { fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message( self.gossip_consensus_message(
topic, topic,
POLKADOT_ENGINE_ID, POLKADOT_ENGINE_ID,
message, message.encode(),
GossipMessageRecipient::BroadcastToAll, GossipMessageRecipient::BroadcastToAll,
); );
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment