Commit 2d688b13 authored by Ashley's avatar Ashley Committed by Bastian Köcher
Browse files

Supercede 'Propagate Substrate#4284 to Polkadot' (#695)

* Propagate Substrate#4284 to Polkadot

* Fix tests

* Fixes

* Use hash part of fund id as child unique id.

* Add comma

* Switch branch

* run cargo update

* Update polkadot-master only

* Fix collator
parent b0535e68
Pipeline #72199 passed with stages
in 37 minutes and 19 seconds
This diff is collapsed.
......@@ -65,7 +65,7 @@ use polkadot_cli::{
Worker, IntoExit, ProvideRuntimeApi, AbstractService, CustomConfiguration, ParachainHost,
};
use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::{PolkadotNetworkService, PolkadotProtocol};
use polkadot_network::PolkadotProtocol;
use polkadot_runtime::RuntimeApi;
pub use polkadot_cli::VersionInfo;
......@@ -90,7 +90,7 @@ pub trait Network: Send + Sync {
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
}
impl<P, E, SP> Network for ValidationNetwork<P, E, PolkadotNetworkService, SP> where
impl<P, E, SP> Network for ValidationNetwork<P, E, SP> where
P: 'static + Send + Sync,
E: 'static + Send + Sync,
SP: 'static + Spawn + Clone + Send + Sync,
......@@ -231,7 +231,7 @@ pub async fn collate<R, P>(
/// Polkadot-api context.
struct ApiContext<P, E, SP> {
network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, SP>>,
network: Arc<ValidationNetwork<P, E, SP>>,
parent_hash: Hash,
validators: Vec<ValidatorId>,
}
......@@ -347,12 +347,12 @@ impl<P, E> Worker for CollationNode<P, E> where
let message_validator = polkadot_network::gossip::register_validator(
network.clone(),
(is_known, client.clone()),
&spawner
);
let validation_network = Arc::new(ValidationNetwork::new(
network.clone(),
exit.clone(),
message_validator,
exit.clone(),
client.clone(),
spawner.clone(),
));
......
......@@ -14,6 +14,7 @@ polkadot-primitives = { path = "../primitives" }
polkadot-erasure-coding = { path = "../erasure-coding" }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sc-network-gossip = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
futures = "0.3.1"
......
......@@ -51,10 +51,10 @@
use sp_runtime::{generic::BlockId, traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT}};
use sp_blockchain::Error as ClientError;
use sc_network::{config::Roles, PeerId, ReputationChange};
use sc_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
ValidatorContext, MessageIntent, ConsensusMessage,
use sc_network::{config::Roles, Context, PeerId, ReputationChange};
use sc_network_gossip::{
ValidationResult as GossipValidationResult,
ValidatorContext, MessageIntent,
};
use polkadot_validation::{SignedStatement};
use polkadot_primitives::{Block, Hash};
......@@ -68,11 +68,12 @@ use std::collections::HashMap;
use std::sync::Arc;
use arrayvec::ArrayVec;
use futures::prelude::*;
use parking_lot::RwLock;
use log::warn;
use super::PolkadotNetworkService;
use crate::router::attestation_topic;
use crate::{GossipMessageStream, NetworkService, PolkadotProtocol, router::attestation_topic};
use attestation::{View as AttestationView, PeerData as AttestationPeerData};
use message_routing::{View as MessageRoutingView};
......@@ -133,7 +134,7 @@ mod cost {
}
/// A gossip message.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub enum GossipMessage {
/// A packet sent to a neighbor but not relayed.
#[codec(index = "1")]
......@@ -151,15 +152,6 @@ pub enum GossipMessage {
ErasureChunk(ErasureChunkMessage),
}
impl GossipMessage {
fn to_consensus_message(&self) -> ConsensusMessage {
ConsensusMessage {
data: self.encode(),
engine_id: POLKADOT_ENGINE_ID,
}
}
}
impl From<NeighborPacket> for GossipMessage {
fn from(packet: NeighborPacket) -> Self {
GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))
......@@ -179,7 +171,7 @@ impl From<GossipParachainMessages> for GossipMessage {
}
/// A gossip message containing a statement.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipStatement {
/// The block hash of the relay chain being referred to. In context, this should
/// be a leaf.
......@@ -200,7 +192,7 @@ impl GossipStatement {
/// A gossip message containing one erasure chunk of a candidate block.
/// For each chunk of block erasure encoding one of this messages is constructed.
#[derive(Encode, Decode, Clone, Debug)]
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
pub struct ErasureChunkMessage {
/// The chunk itself.
pub chunk: PrimitiveChunk,
......@@ -221,7 +213,7 @@ impl From<ErasureChunkMessage> for GossipMessage {
/// These are all the messages posted from one parachain to another during the
/// execution of a single parachain block. Since this parachain block may have been
/// included in many forks of the relay chain, there is no relay-chain leaf parameter.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct GossipParachainMessages {
/// The root of the message queue.
pub queue_root: Hash,
......@@ -241,7 +233,7 @@ impl GossipParachainMessages {
}
/// A versioned neighbor message.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub enum VersionedNeighborPacket {
#[codec(index = "1")]
V1(NeighborPacket),
......@@ -249,13 +241,13 @@ pub enum VersionedNeighborPacket {
/// Contains information on which chain heads the peer is
/// accepting messages for.
#[derive(Encode, Decode, Clone)]
#[derive(Encode, Decode, Clone, PartialEq)]
pub struct NeighborPacket {
chain_heads: Vec<Hash>,
}
/// whether a block is known.
#[derive(Clone, Copy)]
#[derive(Clone, Copy, PartialEq)]
pub enum Known {
/// The block is a known leaf.
Leaf,
......@@ -318,6 +310,7 @@ impl<F, P> ChainContext for (F, P) where
pub fn register_validator<C: ChainContext + 'static>(
service: Arc<PolkadotNetworkService>,
chain: C,
executor: &impl futures::task::Spawn,
) -> RegisteredMessageValidator
{
let s = service.clone();
......@@ -338,19 +331,26 @@ pub fn register_validator<C: ChainContext + 'static>(
});
let gossip_side = validator.clone();
service.with_gossip(|gossip, ctx|
gossip.register_validator(ctx, POLKADOT_ENGINE_ID, gossip_side)
let gossip_engine = sc_network_gossip::GossipEngine::new(
service.clone(),
executor,
POLKADOT_ENGINE_ID,
gossip_side,
);
RegisteredMessageValidator { inner: validator as _ }
RegisteredMessageValidator {
inner: validator as _,
service: Some(service),
gossip_engine: Some(gossip_engine),
}
}
#[derive(PartialEq)]
enum NewLeafAction {
// (who, message)
TargetedMessage(PeerId, ConsensusMessage),
TargetedMessage(PeerId, GossipMessage),
// (topic, message)
Multicast(Hash, ConsensusMessage),
Multicast(Hash, GossipMessage),
}
/// Actions to take after noting a new block-DAG leaf.
......@@ -365,15 +365,14 @@ impl NewLeafActions {
/// Perform the queued actions, feeding into gossip.
pub fn perform(
self,
gossip: &mut dyn crate::GossipService,
ctx: &mut dyn sc_network::Context<Block>,
gossip: &dyn crate::NetworkService,
) {
for action in self.actions {
match action {
NewLeafAction::TargetedMessage(who, message)
=> gossip.send_message(ctx, &who, message),
=> gossip.send_message(who, message),
NewLeafAction::Multicast(topic, message)
=> gossip.multicast(ctx, &topic, message),
=> gossip.gossip_message(topic, message),
}
}
}
......@@ -385,6 +384,10 @@ impl NewLeafActions {
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<dyn ChainContext>>,
// Note: this is always `Some` in real code and `None` in tests.
service: Option<Arc<PolkadotNetworkService>>,
// Note: this is always `Some` in real code and `None` in tests.
gossip_engine: Option<sc_network_gossip::GossipEngine<Block>>,
}
impl RegisteredMessageValidator {
......@@ -395,7 +398,11 @@ impl RegisteredMessageValidator {
) -> Self {
let validator = Arc::new(MessageValidator::new_test(chain, report_handle));
RegisteredMessageValidator { inner: validator as _ }
RegisteredMessageValidator {
inner: validator as _,
service: None,
gossip_engine: None,
}
}
pub fn register_availability_store(&mut self, availability_store: av_store::Store) {
......@@ -449,7 +456,7 @@ impl RegisteredMessageValidator {
let message = GossipMessage::from(GossipParachainMessages {
queue_root: *queue_root,
messages,
}).to_consensus_message();
});
actions.push(NewLeafAction::Multicast(*topic, message));
......@@ -463,6 +470,49 @@ impl RegisteredMessageValidator {
}
}
impl NetworkService for RegisteredMessageValidator {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.messages_for(topic)
} else {
log::error!("Called gossip_messages_for on a test engine");
futures::channel::mpsc::unbounded().1
};
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.gossip_message(
topic,
message.encode(),
false,
);
} else {
log::error!("Called gossip_message on a test engine");
}
}
fn send_message(&self, who: PeerId, message: GossipMessage) {
if let Some(gossip_engine) = self.gossip_engine.as_ref() {
gossip_engine.send_message(vec![who], message.encode());
} else {
log::error!("Called send_message on a test engine");
}
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
{
if let Some(service) = self.service.as_ref() {
service.with_spec(with)
} else {
log::error!("Called with_spec on a test engine");
}
}
}
/// The data needed for validating gossip messages.
#[derive(Default)]
pub(crate) struct MessageValidationData {
......@@ -585,13 +635,13 @@ impl<C: ?Sized + ChainContext> Inner<C> {
}
}
fn multicast_neighbor_packet<F: FnMut(&PeerId, ConsensusMessage)>(
fn multicast_neighbor_packet<F: FnMut(&PeerId, GossipMessage)>(
&self,
mut send_neighbor_packet: F,
) {
let neighbor_packet = GossipMessage::from(NeighborPacket {
chain_heads: self.attestation_view.neighbor_info().collect(),
}).to_consensus_message();
});
for peer in self.peers.keys() {
send_neighbor_packet(peer, neighbor_packet.clone())
......@@ -628,7 +678,7 @@ impl<C: ChainContext + ?Sized> MessageValidator<C> {
}
}
impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValidator<C> {
impl<C: ChainContext + ?Sized> sc_network_gossip::Validator<Block> for MessageValidator<C> {
fn new_peer(&self, _context: &mut dyn ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
let mut inner = self.inner.write();
inner.peers.insert(who.clone(), PeerData::default());
......@@ -746,7 +796,7 @@ impl<C: ChainContext + ?Sized> network_gossip::Validator<Block> for MessageValid
#[cfg(test)]
mod tests {
use super::*;
use sc_network::consensus_gossip::Validator as ValidatorT;
use sc_network_gossip::Validator as ValidatorT;
use std::sync::mpsc;
use parking_lot::Mutex;
use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
......@@ -776,7 +826,7 @@ mod tests {
}
}
impl network_gossip::ValidatorContext<Block> for MockValidatorContext {
impl sc_network_gossip::ValidatorContext<Block> for MockValidatorContext {
fn broadcast_topic(&mut self, topic: Hash, force: bool) {
self.events.push(ContextEvent::BroadcastTopic(topic, force));
}
......@@ -792,12 +842,12 @@ mod tests {
}
impl NewLeafActions {
fn has_message(&self, who: PeerId, message: ConsensusMessage) -> bool {
fn has_message(&self, who: PeerId, message: GossipMessage) -> bool {
let x = NewLeafAction::TargetedMessage(who, message);
self.actions.iter().find(|&m| m == &x).is_some()
}
fn has_multicast(&self, topic: Hash, message: ConsensusMessage) -> bool {
fn has_multicast(&self, topic: Hash, message: GossipMessage) -> bool {
let x = NewLeafAction::Multicast(topic, message);
self.actions.iter().find(|&m| m == &x).is_some()
}
......@@ -1082,12 +1132,12 @@ mod tests {
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
})));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message()));
})));
}
// ensure that we are allowed to multicast to a peer with same chain head,
......@@ -1154,12 +1204,12 @@ mod tests {
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
})));
assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message()));
})));
}
// ensure that we are not allowed to multicast to either peer, as they
......@@ -1168,12 +1218,12 @@ mod tests {
let message = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).encode();
});
let mut allowed = validator.inner.message_allowed();
let intent = MessageIntent::Broadcast;
assert!(!allowed(&peer_a, intent, &root_a_topic, &message[..]));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message[..]));
assert!(!allowed(&peer_a, intent, &root_a_topic, &message.encode()));
assert!(!allowed(&peer_b, intent, &root_a_topic, &message.encode()));
}
// peer A gets updated to the chain head. now we'll attempt to broadcast
......@@ -1259,17 +1309,17 @@ mod tests {
let queue_messages = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: root_a_messages.clone(),
}).to_consensus_message();
});
let not_queue_messages = GossipMessage::from(GossipParachainMessages {
queue_root: root_a,
messages: not_root_a_messages.clone(),
}).encode();
});
let queue_messages_wrong_root = GossipMessage::from(GossipParachainMessages {
queue_root: not_root_a,
messages: root_a_messages.clone(),
}).encode();
});
// ensure that we attempt to multicast all relevant queues after noting a leaf.
{
......@@ -1281,7 +1331,7 @@ mod tests {
assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket {
chain_heads: vec![hash_a],
}).to_consensus_message()));
})));
// we don't know this queue! no broadcast :(
assert!(!actions.has_multicast(root_a_topic, queue_messages.clone()));
......@@ -1292,7 +1342,7 @@ mod tests {
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&queue_messages_wrong_root[..],
&queue_messages_wrong_root.encode(),
);
match res {
......@@ -1308,7 +1358,7 @@ mod tests {
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&not_queue_messages[..],
&not_queue_messages.encode(),
);
match res {
......@@ -1324,7 +1374,7 @@ mod tests {
let res = validator.inner.validate(
&mut validator_context,
&peer_a,
&queue_messages.data[..],
&queue_messages.encode(),
);
match res {
......@@ -1333,7 +1383,7 @@ mod tests {
}
assert_eq!(validator_context.events, vec![
ContextEvent::BroadcastMessage(root_a_topic, queue_messages.data.clone(), false),
ContextEvent::BroadcastMessage(root_a_topic, queue_messages.encode(), false),
]);
}
}
......
......@@ -30,7 +30,7 @@
//! a `Candidate` we are aware of. Otherwise, it is possible we could be forced to
//! consider an infinite amount of attestations produced by a misbehaving validator.
use sc_network::consensus_gossip::{ValidationResult as GossipValidationResult};
use sc_network_gossip::{ValidationResult as GossipValidationResult};
use sc_network::ReputationChange;
use polkadot_validation::GenericStatement;
use polkadot_primitives::Hash;
......
......@@ -26,7 +26,7 @@ pub mod validation;
pub mod gossip;
use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc};
use futures::channel::oneshot;
use futures::prelude::*;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
......@@ -37,9 +37,7 @@ use sc_network::{
PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
specialization::NetworkSpecialization as Specialization,
};
use sc_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use sc_network_gossip::TopicNotification;
use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey};
use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations;
......@@ -49,7 +47,7 @@ use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};
use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage};
use crate::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator};
#[cfg(test)]
mod tests;
......@@ -90,13 +88,12 @@ pub trait NetworkService: Send + Sync + 'static {
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: GossipMessage);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>);
/// Send a message to a specific peer we're connected to.
fn send_message(&self, who: PeerId, message: GossipMessage);
/// Execute a closure with the polkadot protocol.
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
}
/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait.
......@@ -106,11 +103,10 @@ pub trait NetworkService: Send + Sync + 'static {
///
/// [`NetworkService`]: ./trait.NetworkService.html
/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html
pub struct AvailabilityNetworkShim<T>(pub std::sync::Arc<T>);
#[derive(Clone)]
pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator);
impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
where T: NetworkService
{
impl av_store::ProvideGossipMessages for AvailabilityNetworkShim {
fn gossip_messages_for(&self, topic: Hash)
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
{
......@@ -145,67 +141,6 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
}
}
impl<T> Clone for AvailabilityNetworkShim<T> {
fn clone(&self) -> Self {
AvailabilityNetworkShim(self.0.clone())
}
}
impl NetworkService for PolkadotNetworkService {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = std::sync::mpsc::channel();
PolkadotNetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
let topic_stream = match rx.recv() {
Ok(rx) => rx,
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
self.gossip_consensus_message(
topic,
POLKADOT_ENGINE_ID,
message.encode(),
GossipMessageRecipient::BroadcastToAll,
);
}
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>)
{
PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
{
PolkadotNetworkService::with_spec(self, with)
}
}
/// A gossip network subservice.
pub trait GossipService {
fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage);
fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip<Block> {
fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false)
}
}
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>,
......
......@@ -72,18 +72,18 @@ pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
}
/// Table routing implementation.
pub struct Router<P, E, N: NetworkService, T> {
pub struct Router<P, E, T> {
table: Arc<SharedTable>,
attestation_topic: Hash,
fetcher: LeafWorkDataFetcher<P, E, N, T>,
fetcher: LeafWorkDataFetcher<P, E, T>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator,
}
impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
impl<P, E, T> Router<P, E, T> {
pub(crate) fn new(
table: Arc<SharedTable>,
fetcher: LeafWorkDataFetcher<P, E, N, T>,
fetcher: LeafWorkDataFetcher<P, E, T>,
message_validator: RegisteredMessageValidator,
) -> Self {
let parent_hash = fetcher.parent_hash();
......@@ -103,19 +103,19 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {