From 164943b9614d2406ed61abf4178074ef06d587cb Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Fri, 17 May 2019 14:30:10 -0400
Subject: [PATCH] strip out all ICMP network code and begin gossip refactor for
 attestations (#256)

* strip out all ICMP code and begin gossip refactor

* validate incoming statements

* message_allowed logic

* compiles

* do reporting and neighbor packet validation

* tests compile

* propagate gossip messages

* test message_allowed

* some more tests

* address grumbles
---
 polkadot/Cargo.lock                      |   7 -
 polkadot/collator/src/lib.rs             |  19 +-
 polkadot/network/Cargo.toml              |   1 -
 polkadot/network/src/gossip.rs           | 713 +++++++++++++++++++++--
 polkadot/network/src/lib.rs              |   2 -
 polkadot/network/src/router.rs           |  53 +-
 polkadot/network/src/tests/mod.rs        |   6 +-
 polkadot/network/src/tests/validation.rs | 103 +---
 polkadot/network/src/validation.rs       | 311 ++--------
 polkadot/service/src/lib.rs              |   2 +-
 polkadot/statement-table/src/generic.rs  |   2 +-
 polkadot/validation/src/lib.rs           |   8 +-
 12 files changed, 737 insertions(+), 490 deletions(-)

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index c2b8ca8cf34..fce53ea88cd 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -2314,7 +2314,6 @@ dependencies = [
  "polkadot-availability-store 0.1.0",
  "polkadot-primitives 0.1.0",
  "polkadot-validation 0.1.0",
- "slice-group-by 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "sr-primitives 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
  "substrate-client 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
  "substrate-keyring 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
@@ -3017,11 +3016,6 @@ name = "slab"
 version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
-[[package]]
-name = "slice-group-by"
-version = "0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-
 [[package]]
 name = "slog"
 version = "2.4.1"
@@ -5233,7 +5227,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 "checksum sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "34a5e54083ce2b934bf059fdf38e7330a154177e029ab6c4e18638f2f624053a"
 "checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c"
 "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
-"checksum slice-group-by 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "049599674ed27c9b78b93265482068999c0fc71116e186ea4a408e9fc47723b0"
 "checksum slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e1a2eec401952cd7b12a84ea120e2d57281329940c3f93c2bf04f462539508e"
 "checksum slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e544d16c6b230d84c866662fe55e31aacfca6ae71e6fc49ae9a311cb379bfc2f"
 "checksum slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc0d2aff1f8f325ef660d9a0eb6e6dcd20b30b3f581a5897f58bf42d061c37a"
diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs
index 4afb9c6c9e2..2f5e63a293a 100644
--- a/polkadot/collator/src/lib.rs
+++ b/polkadot/collator/src/lib.rs
@@ -136,7 +136,7 @@ pub trait RelayChainContext {
 	type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
 
 	/// Get un-routed egress queues from a parachain to the local parachain.
-	fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
+	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
 }
 
 /// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
@@ -202,20 +202,17 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
 	type Error = String;
 	type FutureEgress = Box<Future<Item=ConsolidatedIngress, Error=String> + Send>;
 
-	fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress {
-		let session = self.network.instantiate_session(SessionParams {
+	fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
+		// TODO: https://github.com/paritytech/polkadot/issues/253
+		//
+		// Fetch ingress and accumulate all unrounted egress
+		let _session = self.network.instantiate_session(SessionParams {
 			local_session_key: None,
 			parent_hash: self.parent_hash,
 			authorities: self.authorities.clone(),
 		}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));
 
-		let fetch_incoming = session
-			.and_then(move |session| session.fetch_incoming(id).map_err(|e|
-				format!("unable to fetch incoming data: {:?}", e)
-			))
-			.map(ConsolidatedIngress);
-
-		Box::new(fetch_incoming)
+		Box::new(future::ok(ConsolidatedIngress(Vec::new())))
 	}
 }
 
@@ -266,7 +263,7 @@ impl<P, E> Worker for CollationNode<P, E> where
 		};
 
 		let message_validator = polkadot_network::gossip::register_validator(
-			&*network,
+			network.clone(),
 			move |block_hash: &Hash| {
 				use client::BlockStatus;
 				use polkadot_network::gossip::Known;
diff --git a/polkadot/network/Cargo.toml b/polkadot/network/Cargo.toml
index 2a8b58154ef..32f68a7f937 100644
--- a/polkadot/network/Cargo.toml
+++ b/polkadot/network/Cargo.toml
@@ -18,7 +18,6 @@ sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "pol
 futures = "0.1"
 tokio = "0.1.7"
 log = "0.4"
-slice-group-by = "0.2.2"
 exit-future = "0.1.4"
 
 [dev-dependencies]
diff --git a/polkadot/network/src/gossip.rs b/polkadot/network/src/gossip.rs
index 90ca08bd014..35b978ac5ae 100644
--- a/polkadot/network/src/gossip.rs
+++ b/polkadot/network/src/gossip.rs
@@ -16,35 +16,90 @@
 
 //! Gossip messages and the message validator
 
-use substrate_network::PeerId;
+use substrate_network::{config::Roles, PeerId};
 use substrate_network::consensus_gossip::{
 	self as network_gossip, ValidationResult as GossipValidationResult,
-	ValidatorContext,
+	ValidatorContext, MessageIntent, ConsensusMessage,
 };
-use polkadot_validation::SignedStatement;
+use polkadot_validation::{GenericStatement, SignedStatement};
 use polkadot_primitives::{Block, Hash, SessionKey, parachain::ValidatorIndex};
-use codec::Decode;
+use codec::{Decode, Encode};
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use parking_lot::RwLock;
 
 use super::NetworkService;
+use router::attestation_topic;
 
 /// The engine ID of the polkadot attestation system.
 pub const POLKADOT_ENGINE_ID: sr_primitives::ConsensusEngineId = [b'd', b'o', b't', b'1'];
 
+// arbitrary; in practice this should not be more than 2.
+const MAX_CHAIN_HEADS: usize = 5;
+
+mod benefit {
+	/// When a peer sends us a previously-unknown candidate statement.
+	pub const NEW_CANDIDATE: i32 = 100;
+	/// When a peer sends us a previously-unknown attestation.
+	pub const NEW_ATTESTATION: i32 = 50;
+}
+
+mod cost {
+	/// A peer sent us an attestation and we don't know the candidate.
+	pub const ATTESTATION_NO_CANDIDATE: i32 = -100;
+	/// A peer sent us a statement we consider in the future.
+	pub const FUTURE_MESSAGE: i32 = -100;
+	/// A peer sent us a statement from the past.
+	pub const PAST_MESSAGE: i32 = -30;
+	/// A peer sent us a malformed message.
+	pub const MALFORMED_MESSAGE: i32 = -500;
+	/// A peer sent us a wrongly signed message.
+	pub const BAD_SIGNATURE: i32 = -500;
+	/// A peer sent us a bad neighbor packet.
+	pub const BAD_NEIGHBOR_PACKET: i32 = -300;
+}
+
 /// A gossip message.
 #[derive(Encode, Decode, Clone)]
-pub(crate) struct GossipMessage {
+pub(crate) enum GossipMessage {
+	/// A packet sent to a neighbor but not relayed.
+	#[codec(index = "1")]
+	Neighbor(VersionedNeighborPacket),
+	/// An attestation-statement about the candidate.
+	/// Non-candidate statements should only be sent to peers who are aware of the candidate.
+	#[codec(index = "2")]
+	Statement(GossipStatement),
+	// TODO: https://github.com/paritytech/polkadot/issues/253
+	// erasure-coded chunks.
+}
+
+/// A gossip message containing a statement.
+#[derive(Encode, Decode, Clone)]
+pub(crate) struct GossipStatement {
 	/// The relay chain parent hash.
 	pub(crate) relay_parent: Hash,
 	/// The signed statement being gossipped.
-	pub(crate) statement: SignedStatement,
+	pub(crate) signed_statement: SignedStatement,
+}
+
+/// A versioned neighbor message.
+#[derive(Encode, Decode, Clone)]
+pub enum VersionedNeighborPacket {
+	#[codec(index = "1")]
+	V1(NeighborPacket),
+}
+
+/// Contains information on which chain heads the peer is
+/// accepting messages for.
+#[derive(Encode, Decode, Clone)]
+pub struct NeighborPacket {
+	chain_heads: Vec<Hash>,
 }
 
 /// whether a block is known.
+#[derive(Clone, Copy)]
 pub enum Known {
 	/// The block is a known leaf.
 	Leaf,
@@ -73,12 +128,20 @@ impl<F> KnownOracle for F where F: Fn(&Hash) -> Option<Known> + Send + Sync {
 // that we've actually done the registration, this should be the only way
 // to construct it outside of tests.
 pub fn register_validator<O: KnownOracle + 'static>(
-	service: &NetworkService,
+	service: Arc<NetworkService>,
 	oracle: O,
 ) -> RegisteredMessageValidator {
+	let s = service.clone();
+	let report_handle = Box::new(move |peer: &PeerId, cost_benefit| {
+		s.report_peer(peer.clone(), cost_benefit);
+	});
 	let validator = Arc::new(MessageValidator {
-		live_session: RwLock::new(HashMap::new()),
-		oracle,
+		report_handle,
+		inner: RwLock::new(Inner {
+			peers: HashMap::new(),
+			our_view: Default::default(),
+			oracle,
+		})
 	});
 
 	let gossip_side = validator.clone();
@@ -99,28 +162,43 @@ pub struct RegisteredMessageValidator {
 
 impl RegisteredMessageValidator {
 	#[cfg(test)]
-	pub(crate) fn new_test<O: KnownOracle + 'static>(oracle: O) -> Self {
-		let validator = Arc::new(MessageValidator {
-			live_session: RwLock::new(HashMap::new()),
-			oracle,
-		});
+	pub(crate) fn new_test<O: KnownOracle + 'static>(
+		oracle: O,
+		report_handle: Box<Fn(&PeerId, i32) + Send + Sync>,
+	) -> Self {
+		let validator = Arc::new(MessageValidator::new_test(oracle, report_handle));
 
 		RegisteredMessageValidator { inner: validator as _ }
 	}
 
 	/// Note a live attestation session. This must be removed later with
 	/// `remove_session`.
-	pub(crate) fn note_session(&self, relay_parent: Hash, validation: MessageValidationData) {
-		self.inner.live_session.write().insert(relay_parent, validation);
-	}
+	pub(crate) fn note_session<F: FnMut(&PeerId, ConsensusMessage)>(
+		&self,
+		relay_parent: Hash,
+		validation: MessageValidationData,
+		send_neighbor_packet: F,
+	) {
+		// add an entry in our_view
+		// prune any entries from our_view which are no longer leaves
+		let mut inner = self.inner.inner.write();
+		inner.our_view.add_session(relay_parent, validation);
+		{
+
+			let &mut Inner { ref oracle, ref mut our_view, .. } = &mut *inner;
+			our_view.prune_old_sessions(|parent| match oracle.is_known(parent) {
+				Some(Known::Leaf) => true,
+				_ => false,
+			});
+		}
 
-	/// Remove a live attestation session when it is no longer live.
-	pub(crate) fn remove_session(&self, relay_parent: &Hash) {
-		self.inner.live_session.write().remove(relay_parent);
+		// send neighbor packets to peers
+		inner.multicast_neighbor_packet(send_neighbor_packet);
 	}
 }
 
 /// The data needed for validating gossip.
+#[derive(Default)]
 pub(crate) struct MessageValidationData {
 	/// The authorities at a block.
 	pub(crate) authorities: Vec<SessionKey>,
@@ -129,57 +207,594 @@ pub(crate) struct MessageValidationData {
 }
 
 impl MessageValidationData {
-	fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> bool {
+	fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> Result<(), ()> {
 		let sender = match self.index_mapping.get(&statement.sender) {
 			Some(val) => val,
-			None => return false,
+			None => return Err(()),
 		};
 
-		self.authorities.contains(&sender) &&
+		let good = self.authorities.contains(&sender) &&
 			::polkadot_validation::check_statement(
 				&statement.statement,
 				&statement.signature,
 				sender.clone(),
 				relay_parent,
-			)
+			);
+
+		if good {
+			Ok(())
+		} else {
+			Err(())
+		}
+	}
+}
+
+// knowledge about attestations on a single parent-hash.
+#[derive(Default)]
+struct Knowledge {
+	candidates: HashSet<Hash>,
+}
+
+impl Knowledge {
+	// whether the peer is aware of a candidate with given hash.
+	fn is_aware_of(&self, candidate_hash: &Hash) -> bool {
+		self.candidates.contains(candidate_hash)
+	}
+
+	// note that the peer is aware of a candidate with given hash.
+	fn note_aware(&mut self, candidate_hash: Hash) {
+		self.candidates.insert(candidate_hash);
+	}
+}
+
+struct PeerData {
+	live: HashMap<Hash, Knowledge>,
+}
+
+impl PeerData {
+	fn knowledge_at_mut(&mut self, parent_hash: &Hash) -> Option<&mut Knowledge> {
+		self.live.get_mut(parent_hash)
+	}
+}
+
+struct OurView {
+	live_sessions: Vec<(Hash, SessionView)>,
+	topics: HashMap<Hash, Hash>, // maps topic hashes to block hashes.
+}
+
+impl Default for OurView {
+	fn default() -> Self {
+		OurView {
+			live_sessions: Vec::with_capacity(MAX_CHAIN_HEADS),
+			topics: Default::default(),
+		}
+	}
+}
+
+impl OurView {
+	fn session_view(&self, relay_parent: &Hash) -> Option<&SessionView> {
+		self.live_sessions.iter()
+			.find_map(|&(ref h, ref sesh)| if h == relay_parent { Some(sesh) } else { None } )
+	}
+
+	fn session_view_mut(&mut self, relay_parent: &Hash) -> Option<&mut SessionView> {
+		self.live_sessions.iter_mut()
+			.find_map(|&mut (ref h, ref mut sesh)| if h == relay_parent { Some(sesh) } else { None } )
+	}
+
+	fn add_session(&mut self, relay_parent: Hash, validation_data: MessageValidationData) {
+		self.live_sessions.push((
+			relay_parent,
+			SessionView {
+				validation_data,
+				knowledge: Default::default(),
+			},
+		));
+		self.topics.insert(attestation_topic(relay_parent), relay_parent);
+	}
+
+	fn prune_old_sessions<F: Fn(&Hash) -> bool>(&mut self, is_leaf: F) {
+		let live_sessions = &mut self.live_sessions;
+		live_sessions.retain(|&(ref relay_parent, _)| is_leaf(relay_parent));
+		self.topics.retain(|_, v| live_sessions.iter().find(|(p, _)| p == v).is_some());
+	}
+
+	fn knows_topic(&self, topic: &Hash) -> bool {
+		self.topics.contains_key(topic)
+	}
+
+	fn topic_block(&self, topic: &Hash) -> Option<&Hash> {
+		self.topics.get(topic)
+	}
+
+	fn neighbor_info(&self) -> Vec<Hash> {
+		self.live_sessions.iter().take(MAX_CHAIN_HEADS).map(|(p, _)| p.clone()).collect()
+	}
+}
+
+struct SessionView {
+	validation_data: MessageValidationData,
+	knowledge: Knowledge,
+}
+
+struct Inner<O: ?Sized> {
+	peers: HashMap<PeerId, PeerData>,
+	our_view: OurView,
+	oracle: O,
+}
+
+impl<O: ?Sized + KnownOracle> Inner<O> {
+	fn validate_statement(&mut self, message: GossipStatement)
+		-> (GossipValidationResult<Hash>, i32)
+	{
+		// message must reference one of our chain heads and one
+		// if message is not a `Candidate` we should have the candidate available
+		// in `our_view`.
+		match self.our_view.session_view(&message.relay_parent) {
+			None => {
+				let cost = match self.oracle.is_known(&message.relay_parent) {
+					Some(Known::Leaf) => {
+						warn!(
+							target: "network",
+							"Leaf block {} not considered live for attestation",
+							message.relay_parent,
+						);
+
+						0
+					}
+					Some(Known::Old) => cost::PAST_MESSAGE,
+					_ => cost::FUTURE_MESSAGE,
+				};
+
+				(GossipValidationResult::Discard, cost)
+			}
+			Some(view) => {
+				// first check that we are capable of receiving this message
+				// in a DoS-proof manner.
+				let benefit = match message.signed_statement.statement {
+					GenericStatement::Candidate(_) => benefit::NEW_CANDIDATE,
+					GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
+						if !view.knowledge.is_aware_of(h) {
+							let cost = cost::ATTESTATION_NO_CANDIDATE;
+							return (GossipValidationResult::Discard, cost);
+						}
+
+						benefit::NEW_ATTESTATION
+					}
+				};
+
+				// validate signature.
+				let res = view.validation_data.check_statement(
+					&message.relay_parent,
+					&message.signed_statement,
+				);
+
+				match res {
+					Ok(()) => {
+						let topic = attestation_topic(message.relay_parent);
+						(GossipValidationResult::ProcessAndKeep(topic), benefit)
+					}
+					Err(()) => (GossipValidationResult::Discard, cost::BAD_SIGNATURE),
+				}
+			}
+		}
+	}
+
+	fn validate_neighbor_packet(&mut self, sender: &PeerId, packet: NeighborPacket)
+		-> (GossipValidationResult<Hash>, i32, Vec<Hash>)
+	{
+		let chain_heads = packet.chain_heads;
+		if chain_heads.len() > MAX_CHAIN_HEADS {
+			(GossipValidationResult::Discard, cost::BAD_NEIGHBOR_PACKET, Vec::new())
+		} else {
+			let mut new_topics = Vec::new();
+			if let Some(ref mut peer) = self.peers.get_mut(sender) {
+				peer.live.retain(|k, _| chain_heads.contains(k));
+				for head in chain_heads {
+					peer.live.entry(head).or_insert_with(|| {
+						new_topics.push(attestation_topic(head));
+						Default::default()
+					});
+				}
+			}
+			(GossipValidationResult::Discard, 0, new_topics)
+		}
+	}
+
+	fn multicast_neighbor_packet<F: FnMut(&PeerId, ConsensusMessage)>(
+		&self,
+		mut send_neighbor_packet: F,
+	) {
+		let neighbor_packet = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket {
+			chain_heads: self.our_view.neighbor_info()
+		}));
+
+		let message = ConsensusMessage {
+			data: neighbor_packet.encode(),
+			engine_id: POLKADOT_ENGINE_ID,
+		};
+
+		for peer in self.peers.keys() {
+			send_neighbor_packet(peer, message.clone())
+		}
 	}
 }
 
 /// An unregistered message validator. Register this with `register_validator`.
 pub struct MessageValidator<O: ?Sized> {
-	live_session: RwLock<HashMap<Hash, MessageValidationData>>,
-	oracle: O,
+	report_handle: Box<Fn(&PeerId, i32) + Send + Sync>,
+	inner: RwLock<Inner<O>>,
+}
+
+impl<O: KnownOracle + ?Sized> MessageValidator<O> {
+	#[cfg(test)]
+	fn new_test(
+		oracle: O,
+		report_handle: Box<Fn(&PeerId, i32) + Send + Sync>,
+	) -> Self where O: Sized{
+		MessageValidator {
+			report_handle,
+			inner: RwLock::new(Inner {
+				peers: HashMap::new(),
+				our_view: Default::default(),
+				oracle,
+			})
+		}
+	}
+
+	fn report(&self, who: &PeerId, cost_benefit: i32) {
+		(self.report_handle)(who, cost_benefit)
+	}
 }
 
 impl<O: KnownOracle + ?Sized> network_gossip::Validator<Block> for MessageValidator<O> {
-	fn validate(&self, context: &mut ValidatorContext<Block>, _sender: &PeerId, mut data: &[u8])
+	fn new_peer(&self, _context: &mut ValidatorContext<Block>, who: &PeerId, _roles: Roles) {
+		let mut inner = self.inner.write();
+		inner.peers.insert(who.clone(), PeerData {
+			live: HashMap::new(),
+		});
+	}
+
+	fn peer_disconnected(&self, _context: &mut ValidatorContext<Block>, who: &PeerId) {
+		let mut inner = self.inner.write();
+		inner.peers.remove(who);
+	}
+
+	fn validate(&self, context: &mut ValidatorContext<Block>, sender: &PeerId, mut data: &[u8])
 		-> GossipValidationResult<Hash>
 	{
-		let orig_data = data;
-		match GossipMessage::decode(&mut data) {
-			Some(GossipMessage { relay_parent, statement }) => {
-				let live = self.live_session.read();
-				let topic = || ::router::attestation_topic(relay_parent.clone());
-				if let Some(validation) = live.get(&relay_parent) {
-					if validation.check_statement(&relay_parent, &statement) {
-						// repropagate
-						let topic = topic();
-						context.broadcast_message(topic, orig_data.to_owned(), false);
-						GossipValidationResult::ProcessAndKeep(topic)
-					} else {
-						GossipValidationResult::Discard
-					}
-				} else {
-					match self.oracle.is_known(&relay_parent) {
-						None | Some(Known::Leaf) => GossipValidationResult::ProcessAndKeep(topic()),
-						Some(Known::Old) | Some(Known::Bad) => GossipValidationResult::Discard,
+		let (res, cost_benefit) = match GossipMessage::decode(&mut data) {
+			None => (GossipValidationResult::Discard, cost::MALFORMED_MESSAGE),
+			Some(GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet))) => {
+				let (res, cb, topics) = self.inner.write().validate_neighbor_packet(sender, packet);
+				for new_topic in topics {
+					context.send_topic(sender, new_topic, false);
+				}
+				(res, cb)
+			}
+			Some(GossipMessage::Statement(statement)) => {
+				let (res, cb) = self.inner.write().validate_statement(statement);
+				if let GossipValidationResult::ProcessAndKeep(ref topic) = res {
+					context.broadcast_message(topic.clone(), data.to_vec(), false);
+				}
+				(res, cb)
+			}
+		};
+
+		self.report(sender, cost_benefit);
+		res
+	}
+
+	fn message_expired<'a>(&'a self) -> Box<FnMut(Hash, &[u8]) -> bool + 'a> {
+		let inner = self.inner.read();
+
+		Box::new(move |topic, _data| {
+			// check that topic is one of our live sessions. everything else is expired
+			!inner.our_view.knows_topic(&topic)
+		})
+	}
+
+	fn message_allowed<'a>(&'a self) -> Box<FnMut(&PeerId, MessageIntent, &Hash, &[u8]) -> bool + 'a> {
+		let mut inner = self.inner.write();
+		Box::new(move |who, intent, topic, data| {
+			let &mut Inner { ref mut peers, ref mut our_view, .. } = &mut *inner;
+
+			match intent {
+				MessageIntent::PeriodicRebroadcast => return false,
+				_ => {},
+			}
+
+			let relay_parent = match our_view.topic_block(topic) {
+				None => return false,
+				Some(hash) => hash.clone(),
+			};
+
+			// check that topic is one of our peers' live sessions.
+			let peer_knowledge = match peers.get_mut(who)
+				.and_then(|p| p.knowledge_at_mut(&relay_parent))
+			{
+				Some(p) => p,
+				None => return false,
+			};
+
+			match GossipMessage::decode(&mut &data[..]) {
+				Some(GossipMessage::Statement(statement)) => {
+					let signed = statement.signed_statement;
+
+					match signed.statement {
+						GenericStatement::Valid(ref h) | GenericStatement::Invalid(ref h) => {
+							// `valid` and `invalid` statements can only be propagated after
+							// a candidate message is known by that peer.
+							if !peer_knowledge.is_aware_of(h) {
+								return false;
+							}
+						}
+						GenericStatement::Candidate(ref c) => {
+							// if we are sending a `Candidate` message we should make sure that
+							// our_view and their_view reflects that we know about the candidate.
+							let hash = c.hash();
+							peer_knowledge.note_aware(hash);
+							if let Some(our_view) = our_view.session_view_mut(&relay_parent) {
+								our_view.knowledge.note_aware(hash);
+							}
+						}
 					}
 				}
+				_ => return false,
 			}
-			None => {
-				debug!(target: "validation", "Error decoding gossip message");
-				GossipValidationResult::Discard
+
+			true
+		})
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use substrate_network::consensus_gossip::Validator as ValidatorT;
+	use std::sync::mpsc;
+	use parking_lot::Mutex;
+	use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
+	use substrate_primitives::crypto::UncheckedInto;
+	use substrate_primitives::ed25519::Signature as Ed25519Signature;
+
+	#[derive(PartialEq, Clone, Debug)]
+	enum ContextEvent {
+		BroadcastTopic(Hash, bool),
+		BroadcastMessage(Hash, Vec<u8>, bool),
+		SendMessage(PeerId, Vec<u8>),
+		SendTopic(PeerId, Hash, bool),
+	}
+
+	#[derive(Default)]
+	struct MockValidatorContext {
+		events: Vec<ContextEvent>,
+	}
+
+	impl MockValidatorContext {
+		fn clear(&mut self) {
+			self.events.clear()
+		}
+	}
+
+	impl network_gossip::ValidatorContext<Block> for MockValidatorContext {
+		fn broadcast_topic(&mut self, topic: Hash, force: bool) {
+			self.events.push(ContextEvent::BroadcastTopic(topic, force));
+		}
+		fn broadcast_message(&mut self, topic: Hash, message: Vec<u8>, force: bool) {
+			self.events.push(ContextEvent::BroadcastMessage(topic, message, force));
+		}
+		fn send_message(&mut self, who: &PeerId, message: Vec<u8>) {
+			self.events.push(ContextEvent::SendMessage(who.clone(), message));
+		}
+		fn send_topic(&mut self, who: &PeerId, topic: Hash, force: bool) {
+			self.events.push(ContextEvent::SendTopic(who.clone(), topic, force));
+		}
+	}
+
+	#[test]
+	fn message_allowed() {
+		let (tx, _rx) = mpsc::channel();
+		let tx = Mutex::new(tx);
+		let known_map = HashMap::<Hash, Known>::new();
+		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
+		let validator = MessageValidator::new_test(
+			move |hash: &Hash| known_map.get(hash).map(|x| x.clone()),
+			report_handle,
+		);
+
+		let peer_a = PeerId::random();
+
+		let mut validator_context = MockValidatorContext::default();
+		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
+		assert!(validator_context.events.is_empty());
+		validator_context.clear();
+
+		let hash_a = [1u8; 32].into();
+		let hash_b = [2u8; 32].into();
+		let hash_c = [3u8; 32].into();
+
+		let message = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket {
+				chain_heads: vec![hash_a, hash_b],
+			})).encode();
+		let res = validator.validate(
+			&mut validator_context,
+			&peer_a,
+			&message[..],
+		);
+
+		match res {
+			GossipValidationResult::Discard => {},
+			_ => panic!("wrong result"),
+		}
+		assert_eq!(
+			validator_context.events,
+			vec![
+				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
+				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
+			],
+		);
+
+		validator_context.clear();
+
+		let candidate_receipt = CandidateReceipt {
+			parachain_index: 5.into(),
+			collator: [255; 32].unchecked_into(),
+			head_data: HeadData(vec![9, 9, 9]),
+			signature: Default::default(),
+			balance_uploads: Vec::new(),
+			egress_queue_roots: Vec::new(),
+			fees: 1_000_000,
+			block_data_hash: [20u8; 32].into(),
+		};
+
+		let statement = GossipMessage::Statement(GossipStatement {
+			relay_parent: hash_a,
+			signed_statement: SignedStatement {
+				statement: GenericStatement::Candidate(candidate_receipt),
+				signature: Ed25519Signature([255u8; 64]),
+				sender: 1,
 			}
+		});
+		let encoded = statement.encode();
+
+		let topic_a = attestation_topic(hash_a);
+		let topic_b = attestation_topic(hash_b);
+		let topic_c = attestation_topic(hash_c);
+
+		// topic_a is in all 3 views -> succeed
+		validator.inner.write().our_view.add_session(hash_a, MessageValidationData::default());
+		// topic_b is in the neighbor's view but not ours -> fail
+		// topic_c is not in either -> fail
+
+		{
+			let mut message_allowed = validator.message_allowed();
+			assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded));
+			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_b, &encoded));
+			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_c, &encoded));
+		}
+	}
+
+	#[test]
+	fn too_many_chain_heads_is_report() {
+		let (tx, rx) = mpsc::channel();
+		let tx = Mutex::new(tx);
+		let known_map = HashMap::<Hash, Known>::new();
+		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
+		let validator = MessageValidator::new_test(
+			move |hash: &Hash| known_map.get(hash).map(|x| x.clone()),
+			report_handle,
+		);
+
+		let peer_a = PeerId::random();
+
+		let mut validator_context = MockValidatorContext::default();
+		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
+		assert!(validator_context.events.is_empty());
+		validator_context.clear();
+
+		let chain_heads = (0..MAX_CHAIN_HEADS+1).map(|i| [i as u8; 32].into()).collect();
+
+		let message = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket {
+				chain_heads,
+			})).encode();
+		let res = validator.validate(
+			&mut validator_context,
+			&peer_a,
+			&message[..],
+		);
+
+		match res {
+			GossipValidationResult::Discard => {},
+			_ => panic!("wrong result"),
+		}
+		assert_eq!(
+			validator_context.events,
+			Vec::new(),
+		);
+
+		drop(validator);
+
+		assert_eq!(rx.iter().collect::<Vec<_>>(), vec![(peer_a, cost::BAD_NEIGHBOR_PACKET)]);
+	}
+
+	#[test]
+	fn statement_only_sent_when_candidate_known() {
+		let (tx, _rx) = mpsc::channel();
+		let tx = Mutex::new(tx);
+		let known_map = HashMap::<Hash, Known>::new();
+		let report_handle = Box::new(move |peer: &PeerId, cb: i32| tx.lock().send((peer.clone(), cb)).unwrap());
+		let validator = MessageValidator::new_test(
+			move |hash: &Hash| known_map.get(hash).map(|x| x.clone()),
+			report_handle,
+		);
+
+		let peer_a = PeerId::random();
+
+		let mut validator_context = MockValidatorContext::default();
+		validator.new_peer(&mut validator_context, &peer_a, Roles::FULL);
+		assert!(validator_context.events.is_empty());
+		validator_context.clear();
+
+		let hash_a = [1u8; 32].into();
+		let hash_b = [2u8; 32].into();
+
+		let message = GossipMessage::Neighbor(VersionedNeighborPacket::V1(NeighborPacket {
+				chain_heads: vec![hash_a, hash_b],
+			})).encode();
+		let res = validator.validate(
+			&mut validator_context,
+			&peer_a,
+			&message[..],
+		);
+
+		match res {
+			GossipValidationResult::Discard => {},
+			_ => panic!("wrong result"),
+		}
+		assert_eq!(
+			validator_context.events,
+			vec![
+				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false),
+				ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_b), false),
+			],
+		);
+
+		validator_context.clear();
+
+		let topic_a = attestation_topic(hash_a);
+		let c_hash = [99u8; 32].into();
+
+		let statement = GossipMessage::Statement(GossipStatement {
+			relay_parent: hash_a,
+			signed_statement: SignedStatement {
+				statement: GenericStatement::Valid(c_hash),
+				signature: Ed25519Signature([255u8; 64]),
+				sender: 1,
+			}
+		});
+		let encoded = statement.encode();
+		validator.inner.write().our_view.add_session(hash_a, MessageValidationData::default());
+
+		{
+			let mut message_allowed = validator.message_allowed();
+			assert!(!message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
+		}
+
+		validator
+			.inner
+			.write()
+			.peers
+			.get_mut(&peer_a)
+			.unwrap()
+			.live
+			.get_mut(&hash_a)
+			.unwrap()
+			.note_aware(c_hash);
+
+		{
+			let mut message_allowed = validator.message_allowed();
+			assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..]));
 		}
 	}
 }
diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs
index ac98ec32614..bd61ed1ff8a 100644
--- a/polkadot/network/src/lib.rs
+++ b/polkadot/network/src/lib.rs
@@ -31,10 +31,8 @@ extern crate polkadot_primitives;
 extern crate arrayvec;
 extern crate parking_lot;
 extern crate tokio;
-extern crate slice_group_by;
 extern crate exit_future;
 
-#[macro_use]
 extern crate futures;
 #[macro_use]
 extern crate log;
diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs
index dad32c8a911..f181d42c942 100644
--- a/polkadot/network/src/router.rs
+++ b/polkadot/network/src/router.rs
@@ -25,10 +25,10 @@
 
 use sr_primitives::traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT};
 use polkadot_validation::{
-	SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Outgoing, Validated
+	SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
 };
 use polkadot_primitives::{Block, Hash};
-use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message,
+use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost,
 	ValidatorIndex, Collation, PoVBlock,
 };
 use gossip::RegisteredMessageValidator;
@@ -43,8 +43,6 @@ use std::sync::Arc;
 
 use validation::{self, SessionDataFetcher, NetworkService, Executor};
 
-type IngressPairRef<'a> = (ParaId, &'a [Message]);
-
 /// Compute the gossip topic for attestations on the given parent hash.
 pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
 	let mut v = parent_hash.as_ref().to_vec();
@@ -86,16 +84,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
 		// this will block internally until the gossip messages stream is obtained.
 		self.network().gossip_messages_for(self.attestation_topic)
 			.filter_map(|msg| {
+				use crate::gossip::GossipMessage;
+
 				debug!(target: "validation", "Processing statement for live validation session");
-				crate::gossip::GossipMessage::decode(&mut &msg.message[..])
+				match GossipMessage::decode(&mut &msg.message[..]) {
+					Some(GossipMessage::Statement(s)) => Some(s.signed_statement),
+					_ => None,
+				}
 			})
-			.map(|msg| msg.statement)
-	}
-
-	/// Get access to the session data fetcher.
-	#[cfg(test)]
-	pub(crate) fn fetcher(&self) -> &SessionDataFetcher<P, E, N, T> {
-		&self.fetcher
 	}
 
 	fn parent_hash(&self) -> Hash {
@@ -174,38 +170,6 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
 		}
 	}
 
-	/// Broadcast outgoing messages to peers.
-	pub(crate) fn broadcast_egress(&self, outgoing: Outgoing) {
-		use slice_group_by::LinearGroupBy;
-
-		let mut group_messages = Vec::new();
-		for egress in outgoing {
-			let source = egress.from;
-			let messages = egress.messages.outgoing_messages;
-
-			let groups = LinearGroupBy::new(&messages, |a, b| a.target == b.target);
-			for group in groups {
-				let target = match group.get(0) {
-					Some(msg) => msg.target,
-					None => continue, // skip empty.
-				};
-
-				group_messages.clear(); // reuse allocation from previous iterations.
-				group_messages.extend(group.iter().map(|msg| msg.data.clone()).map(Message));
-
-				debug!(target: "valdidation", "Circulating messages from {:?} to {:?} at {}",
-					source, target, self.parent_hash());
-
-				// this is the ingress from source to target, with given messages.
-				let target_incoming =
-					validation::incoming_message_topic(self.parent_hash(), target);
-				let ingress_for: IngressPairRef = (source, &group_messages[..]);
-
-				self.network().gossip_message(target_incoming, ingress_for.encode());
-			}
-		}
-	}
-
 	fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
 		-> impl Future<Item=(),Error=()> + Send + 'static
 		where
@@ -263,7 +227,6 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
 impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
 	fn drop(&mut self) {
 		let parent_hash = self.parent_hash().clone();
-		self.message_validator.remove_session(&parent_hash);
 		self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
 	}
 }
diff --git a/polkadot/network/src/tests/mod.rs b/polkadot/network/src/tests/mod.rs
index a9d7be3423a..7ad237d2148 100644
--- a/polkadot/network/src/tests/mod.rs
+++ b/polkadot/network/src/tests/mod.rs
@@ -27,11 +27,10 @@ use polkadot_primitives::parachain::{
 	ConsolidatedIngressRoots,
 };
 use substrate_primitives::crypto::UncheckedInto;
-use sr_primitives::traits::Block as BlockT;
 use codec::Encode;
 use substrate_network::{
-	PeerId, PeerInfo, ClientHandle, Context, config::Roles,
-	message::{BlockRequest, generic::{ConsensusMessage, FinalityProofRequest}},
+	PeerId, Context, config::Roles,
+	message::generic::ConsensusMessage,
 	specialization::NetworkSpecialization, generic_message::Message as GenericMessage
 };
 
@@ -79,7 +78,6 @@ impl TestContext {
 	}
 }
 
-
 fn make_pov(block_data: Vec<u8>) -> PoVBlock {
 	PoVBlock {
 		block_data: BlockData(block_data),
diff --git a/polkadot/network/src/tests/validation.rs b/polkadot/network/src/tests/validation.rs
index 73762763c09..428d86e0195 100644
--- a/polkadot/network/src/tests/validation.rs
+++ b/polkadot/network/src/tests/validation.rs
@@ -16,7 +16,9 @@
 
 //! Tests and helpers for validation networking.
 
-use validation::NetworkService;
+#![allow(unused)]
+
+use validation::{NetworkService, GossipService};
 use substrate_network::Context as NetContext;
 use substrate_network::consensus_gossip::TopicNotification;
 use substrate_primitives::{NativeOrEncoded, ExecutionContext};
@@ -151,7 +153,11 @@ impl NetworkService for TestNetwork {
 		let _ = self.gossip.send_message.unbounded_send((topic, notification));
 	}
 
-	fn drop_gossip(&self, _topic: Hash) {}
+	fn with_gossip<F: Send + 'static>(&self, with: F)
+		where F: FnOnce(&mut GossipService, &mut NetContext<Block>)
+	{
+		unimplemented!()
+	}
 
 	fn with_spec<F: Send + 'static>(&self, with: F)
 		where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
@@ -342,6 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
 
 		let message_val = crate::gossip::RegisteredMessageValidator::new_test(
 			|_hash: &_| Some(crate::gossip::Known::Leaf),
+			Box::new(|_, _| {}),
 		);
 
 		TestValidationNetwork::new(
@@ -408,95 +415,3 @@ fn make_table(data: &ApiData, local_key: &AuthorityKeyring, parent_hash: Hash) -
 		store,
 	))
 }
-
-#[test]
-fn ingress_fetch_works() {
-	let mut runtime = Runtime::new().unwrap();
-	let built = build_network(3, runtime.executor());
-
-	let id_a: ParaId = 1.into();
-	let id_b: ParaId = 2.into();
-	let id_c: ParaId = 3.into();
-
-	let key_a = AuthorityKeyring::Alice;
-	let key_b = AuthorityKeyring::Bob;
-	let key_c = AuthorityKeyring::Charlie;
-
-	let messages_from_a = vec![
-		OutgoingMessage { target: id_b, data: vec![1, 2, 3] },
-		OutgoingMessage { target: id_b, data: vec![3, 4, 5] },
-		OutgoingMessage { target: id_c, data: vec![9, 9, 9] },
-	];
-
-	let messages_from_b = vec![
-		OutgoingMessage { target: id_a, data: vec![1, 1, 1, 1, 1,] },
-		OutgoingMessage { target: id_c, data: b"hello world".to_vec() },
-	];
-
-	let messages_from_c = vec![
-		OutgoingMessage { target: id_a, data: b"dog42".to_vec() },
-		OutgoingMessage { target: id_b, data: b"dogglesworth".to_vec() },
-	];
-
-	let ingress = {
-		let mut builder = IngressBuilder::default();
-		builder.add_messages(id_a, &messages_from_a);
-		builder.add_messages(id_b, &messages_from_b);
-		builder.add_messages(id_c, &messages_from_c);
-
-		builder.build()
-	};
-
-	let parent_hash = [1; 32].into();
-
-	let (router_a, router_b, router_c) = {
-		let validators: Vec<ValidatorId> = vec![
-			key_a.into(),
-			key_b.into(),
-			key_c.into(),
-		];
-
-		// NOTE: this is possible only because we are currently asserting that parachain validators
-		// share their crypto with the (Aura) authority set. Once that assumption breaks, so will this
-		// code.
-		let authorities = validators.clone();
-
-		let mut api_handle = built.api_handle.lock();
-		*api_handle = ApiData {
-			active_parachains: vec![id_a, id_b, id_c],
-			duties: vec![Chain::Parachain(id_a), Chain::Parachain(id_b), Chain::Parachain(id_c)],
-			validators,
-			ingress,
-		};
-
-		(
-			built.networks[0].communication_for(
-				make_table(&*api_handle, &key_a, parent_hash),
-				vec![MessagesFrom::from_messages(id_a, messages_from_a)],
-				&authorities,
-			),
-			built.networks[1].communication_for(
-				make_table(&*api_handle, &key_b, parent_hash),
-				vec![MessagesFrom::from_messages(id_b, messages_from_b)],
-				&authorities,
-			),
-			built.networks[2].communication_for(
-				make_table(&*api_handle, &key_c, parent_hash),
-				vec![MessagesFrom::from_messages(id_c, messages_from_c)],
-				&authorities,
-			),
-		)
-	};
-
-	// make sure everyone can get ingress for their own parachain.
-	let fetch_a = router_a.then(move |r| r.unwrap().fetcher()
-		.fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a")));
-	let fetch_b = router_b.then(move |r| r.unwrap().fetcher()
-		.fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b")));
-	let fetch_c = router_c.then(move |r| r.unwrap().fetcher()
-		.fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c")));
-
-	let work = fetch_a.join3(fetch_b, fetch_c);
-	runtime.spawn(built.gossip.then(|_| Ok(()))); // in background.
-	runtime.block_on(work).unwrap();
-}
diff --git a/polkadot/network/src/validation.rs b/polkadot/network/src/validation.rs
index fffbf88c1ac..ee752388119 100644
--- a/polkadot/network/src/validation.rs
+++ b/polkadot/network/src/validation.rs
@@ -19,13 +19,14 @@
 //! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
 //! each time a validation session begins on a new chain head.
 
-use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
-use substrate_network::Context as NetContext;
-use substrate_network::consensus_gossip::{TopicNotification, MessageRecipient as GossipMessageRecipient};
+use sr_primitives::traits::ProvideRuntimeApi;
+use substrate_network::{PeerId, Context as NetContext};
+use substrate_network::consensus_gossip::{
+	self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
+};
 use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
 use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
-use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt, CollatorId, ValidatorId, PoVBlock, ValidatorIndex};
-use codec::{Encode, Decode};
+use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, ValidatorId, PoVBlock, ValidatorIndex};
 
 use futures::prelude::*;
 use futures::future::{self, Executor as FutureExecutor};
@@ -71,6 +72,17 @@ impl Executor for TaskExecutor {
 	}
 }
 
+/// A gossip network subservice.
+pub trait GossipService {
+	fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage);
+}
+
+impl GossipService for consensus_gossip::ConsensusGossip<Block> {
+	fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage) {
+		consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
+	}
+}
+
 /// Basic functionality that a network has to fulfill.
 pub trait NetworkService: Send + Sync + 'static {
 	/// Get a stream of gossip messages for a given hash.
@@ -79,8 +91,9 @@ pub trait NetworkService: Send + Sync + 'static {
 	/// Gossip a message on given topic.
 	fn gossip_message(&self, topic: Hash, message: Vec<u8>);
 
-	/// Drop a gossip topic.
-	fn drop_gossip(&self, topic: Hash);
+	/// Execute a closure with the gossip service.
+	fn with_gossip<F: Send + 'static>(&self, with: F)
+		where F: FnOnce(&mut GossipService, &mut NetContext<Block>);
 
 	/// Execute a closure with the polkadot protocol.
 	fn with_spec<F: Send + 'static>(&self, with: F)
@@ -91,7 +104,7 @@ impl NetworkService for super::NetworkService {
 	fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
 		let (tx, rx) = std::sync::mpsc::channel();
 
-		self.with_gossip(move |gossip, _| {
+		super::NetworkService::with_gossip(self, move |gossip, _| {
 			let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
 			let _ = tx.send(inner_rx);
 		});
@@ -111,7 +124,11 @@ impl NetworkService for super::NetworkService {
 		);
 	}
 
-	fn drop_gossip(&self, _topic: Hash) { }
+	fn with_gossip<F: Send + 'static>(&self, with: F)
+		where F: FnOnce(&mut GossipService, &mut NetContext<Block>)
+	{
+		super::NetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
+	}
 
 	fn with_spec<F: Send + 'static>(&self, with: F)
 		where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
@@ -199,16 +216,20 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
 			.collect();
 
 		let (tx, rx) = oneshot::channel();
-		self.network.with_spec(move |spec, ctx| {
-			// before requesting messages, note live consensus session.
-			message_validator.note_session(
-				parent_hash,
-				MessageValidationData {
-					authorities: params.authorities.clone(),
-					index_mapping,
-				},
-			);
 
+		{
+			let message_validator = self.message_validator.clone();
+			let authorities = params.authorities.clone();
+			self.network.with_gossip(move |gossip, ctx| {
+				message_validator.note_session(
+					parent_hash,
+					MessageValidationData { authorities, index_mapping },
+					|peer_id, message| gossip.send_message(ctx, peer_id, message),
+				);
+			});
+		}
+
+		self.network.with_spec(move |spec, ctx| {
 			let session = spec.new_validation_session(ctx, params);
 			let _ = tx.send(SessionDataFetcher {
 				network,
@@ -217,7 +238,6 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
 				parent_hash,
 				knowledge: session.knowledge().clone(),
 				exit,
-				fetch_incoming: session.fetched_incoming().clone(),
 				message_validator,
 			});
 		});
@@ -241,7 +261,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
 	fn communication_for(
 		&self,
 		table: Arc<SharedTable>,
-		outgoing: polkadot_validation::Outgoing,
 		authorities: &[ValidatorId],
 	) -> Self::BuildTableRouter {
 		let parent_hash = table.consensus_parent_hash().clone();
@@ -264,8 +283,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
 					message_validator,
 				);
 
-				table_router.broadcast_egress(outgoing);
-
 				let table_router_clone = table_router.clone();
 				let work = table_router.checked_statements()
 					.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
@@ -406,22 +423,12 @@ impl Future for IncomingReceiver {
 	}
 }
 
-/// Incoming message gossip topic for a parachain at a given block hash.
-pub(crate) fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash {
-	let mut v = parent_hash.as_ref().to_vec();
-	parachain.using_encoded(|s| v.extend(s));
-	v.extend(b"incoming");
-
-	BlakeTwo256::hash(&v[..])
-}
-
 /// A current validation session instance.
 #[derive(Clone)]
 pub(crate) struct ValidationSession {
 	parent_hash: Hash,
 	knowledge: Arc<Mutex<Knowledge>>,
 	local_session_key: Option<ValidatorId>,
-	fetch_incoming: Arc<Mutex<FetchIncoming>>,
 }
 
 impl ValidationSession {
@@ -432,7 +439,6 @@ impl ValidationSession {
 			parent_hash: params.parent_hash,
 			knowledge: Arc::new(Mutex::new(Knowledge::new())),
 			local_session_key: params.local_session_key,
-			fetch_incoming: Arc::new(Mutex::new(FetchIncoming::new())),
 		}
 	}
 
@@ -442,11 +448,6 @@ impl ValidationSession {
 		&self.knowledge
 	}
 
-	/// Get a handle to the shared list of parachains' incoming data fetch.
-	pub(crate) fn fetched_incoming(&self) -> &Arc<Mutex<FetchIncoming>> {
-		&self.fetch_incoming
-	}
-
 	// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
 	// we believe should have the data.
 	fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U
@@ -646,58 +647,10 @@ impl Future for PoVReceiver {
 	}
 }
 
-/// Wrapper around bookkeeping for tracking which parachains we're fetching incoming messages
-/// for.
-pub(crate) struct FetchIncoming {
-	exit_signal: ::exit_future::Signal,
-	parachains_fetching: HashMap<ParaId, IncomingReceiver>,
-}
-
-impl FetchIncoming {
-	fn new() -> Self {
-		FetchIncoming {
-			exit_signal: ::exit_future::signal_only(),
-			parachains_fetching: HashMap::new(),
-		}
-	}
-
-	// registers intent to fetch incoming. returns an optional piece of work
-	// that, if some, is needed to be run to completion in order for the future to
-	// resolve.
-	//
-	// impl Future has a bug here where it wrongly assigns a `'static` bound to `M`.
-	fn fetch_with_work<M, W>(&mut self, para_id: ParaId, make_work: M)
-		-> (IncomingReceiver, Option<Box<Future<Item=(),Error=()> + Send>>) where
-		M: FnOnce() -> W,
-		W: Future<Item=Option<Incoming>> + Send + 'static,
-	{
-		let (tx, rx) = match self.parachains_fetching.entry(para_id) {
-			Entry::Occupied(entry) => return (entry.get().clone(), None),
-			Entry::Vacant(entry) => {
-				// has not been requested yet.
-				let (tx, rx) = oneshot::channel();
-				let rx = IncomingReceiver { inner: rx.shared() };
-				entry.insert(rx.clone());
-
-				(tx, rx)
-			}
-		};
-
-		let exit = self.exit_signal.make_exit();
-		let work = make_work()
-			.map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); })
-			.select2(exit)
-			.then(|_| Ok(()));
-
-		(rx, Some(Box::new(work)))
-	}
-}
-
 /// Can fetch data for a given validation session
 pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
 	network: Arc<N>,
 	api: Arc<P>,
-	fetch_incoming: Arc<Mutex<FetchIncoming>>,
 	exit: E,
 	task_executor: T,
 	knowledge: Arc<Mutex<Knowledge>>,
@@ -744,7 +697,6 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
 			api: self.api.clone(),
 			task_executor: self.task_executor.clone(),
 			parent_hash: self.parent_hash.clone(),
-			fetch_incoming: self.fetch_incoming.clone(),
 			knowledge: self.knowledge.clone(),
 			exit: self.exit.clone(),
 			message_validator: self.message_validator.clone(),
@@ -783,130 +735,11 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
 		});
 		PoVReceiver { outer: rx, inner: None }
 	}
-
-	/// Fetch incoming messages for a parachain.
-	pub fn fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver {
-		let (rx, work) = self.fetch_incoming.lock().fetch_with_work(parachain.clone(), move || {
-			let parent_hash: Hash = self.parent_hash();
-			let topic = incoming_message_topic(parent_hash, parachain);
-
-			let gossip_messages = self.network().gossip_messages_for(topic)
-				.map_err(|()| panic!("unbounded receivers do not throw errors; qed"))
-				.filter_map(|msg| IngressPair::decode(&mut msg.message.as_slice()));
-
-			let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
-				.map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
-					parachain, parent_hash, e)
-				);
-
-			canon_roots.into_future()
-				.and_then(move |ingress_roots| match ingress_roots {
-					None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)),
-					Some(roots) => Ok(roots.0.into_iter().collect())
-				})
-				.and_then(move |ingress_roots| ComputeIngress {
-					inner: gossip_messages,
-					ingress_roots,
-					incoming: Vec::new(),
-				})
-				.select2(self.exit.clone())
-				.map(|res| match res {
-					future::Either::A((incoming, _)) => incoming,
-					future::Either::B(_) => None,
-				})
-		});
-
-		if let Some(work) = work {
-			self.task_executor.spawn(work);
-		}
-
-		rx
-	}
-}
-
-impl<P, E, N: NetworkService, T> Drop for SessionDataFetcher<P, E, N, T> {
-	fn drop(&mut self) {
-		// a bit of a hack...
-		let network = self.network.clone();
-		let fetch_incoming = self.fetch_incoming.clone();
-		let message_validator = self.message_validator.clone();
-
-		let parent_hash = self.parent_hash();
-
-		self.network.with_spec(move |spec, _| {
-			if !spec.remove_validation_session(parent_hash) { return }
-
-			let mut incoming_fetched = fetch_incoming.lock();
-			for (para_id, _) in incoming_fetched.parachains_fetching.drain() {
-				network.drop_gossip(incoming_message_topic(
-					parent_hash,
-					para_id,
-				));
-			}
-
-			message_validator.remove_session(&parent_hash);
-		});
-	}
-}
-
-type IngressPair = (ParaId, Vec<Message>);
-
-// computes ingress from incoming stream of messages.
-// returns `None` if the stream concludes too early.
-#[must_use = "futures do nothing unless polled"]
-struct ComputeIngress<S> {
-	ingress_roots: HashMap<ParaId, Hash>,
-	incoming: Vec<IngressPair>,
-	inner: S,
-}
-
-impl<S> Future for ComputeIngress<S> where S: Stream<Item=IngressPair> {
-	type Item = Option<Incoming>;
-	type Error = S::Error;
-
-	fn poll(&mut self) -> Poll<Option<Incoming>, Self::Error> {
-		loop {
-			if self.ingress_roots.is_empty() {
-				return Ok(Async::Ready(
-					Some(::std::mem::replace(&mut self.incoming, Vec::new()))
-				))
-			}
-
-			let (para_id, messages) = match try_ready!(self.inner.poll()) {
-				None => return Ok(Async::Ready(None)),
-				Some(next) => next,
-			};
-
-			match self.ingress_roots.entry(para_id) {
-				Entry::Vacant(_) => continue,
-				Entry::Occupied(occupied) => {
-					let canon_root = occupied.get().clone();
-					let messages = messages.iter().map(|m| &m.0[..]);
-					if ::polkadot_validation::message_queue_root(messages) != canon_root {
-						continue;
-					}
-
-					occupied.remove();
-				}
-			}
-
-			let pos = self.incoming.binary_search_by_key(
-				&para_id,
-				|&(id, _)| id,
-			)
-				.err()
-				.expect("incoming starts empty and only inserted when \
-					para_id not inserted before; qed");
-
-			self.incoming.insert(pos, (para_id, messages));
-		}
-	}
 }
 
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use futures::stream;
 	use substrate_primitives::crypto::UncheckedInto;
 
 	#[test]
@@ -959,72 +792,6 @@ mod tests {
 		}
 	}
 
-	#[test]
-	fn compute_ingress_works() {
-		let actual_messages = [
-			(
-				ParaId::from(1),
-				vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])],
-			),
-			(
-				ParaId::from(2),
-				vec![
-					Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]),
-					Message(b"hello world".to_vec()),
-				],
-			),
-			(
-				ParaId::from(5),
-				vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])],
-			),
-		];
-
-		let roots: HashMap<_, _> = actual_messages.iter()
-			.map(|&(para_id, ref messages)| (
-				para_id,
-				::polkadot_validation::message_queue_root(messages.iter().map(|m| &m.0)),
-			))
-			.collect();
-
-		let inputs = [
-			(
-				ParaId::from(1), // wrong message.
-				vec![Message(vec![1, 1, 2, 2]), Message(vec![3, 3, 4, 4])],
-			),
-			(
-				ParaId::from(1),
-				vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])],
-			),
-			(
-				ParaId::from(1), // duplicate
-				vec![Message(vec![1, 3, 5, 6]), Message(vec![4, 4, 4, 4])],
-			),
-
-			(
-				ParaId::from(5), // out of order
-				vec![Message(vec![1, 2, 3, 4, 5]), Message(vec![6, 9, 6, 9])],
-			),
-			(
-				ParaId::from(1234), // un-routed parachain.
-				vec![Message(vec![9, 9, 9, 9])],
-			),
-			(
-				ParaId::from(2),
-				vec![
-					Message(vec![1, 3, 7, 9, 1, 2, 3, 4, 5, 6]),
-					Message(b"hello world".to_vec()),
-				],
-			),
-		];
-		let ingress = ComputeIngress {
-			ingress_roots: roots,
-			incoming: Vec::new(),
-			inner: stream::iter_ok::<_, ()>(inputs.iter().cloned()),
-		};
-
-		assert_eq!(ingress.wait().unwrap().unwrap(), actual_messages);
-	}
-
 	#[test]
 	fn add_new_sessions_works() {
 		let mut live_sessions = LiveValidationSessions::new();
diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs
index 84d4f31e931..d568b908bc2 100644
--- a/polkadot/service/src/lib.rs
+++ b/polkadot/service/src/lib.rs
@@ -272,7 +272,7 @@ construct_service_factory! {
 
 				let gossip_validator_select_chain = select_chain.clone();
 				let gossip_validator = network_gossip::register_validator(
-					&*service.network(),
+					service.network(),
 					move |block_hash: &Hash| {
 						use client::BlockStatus;
 
diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs
index c7aa9bf93d3..ce44e8af41c 100644
--- a/polkadot/statement-table/src/generic.rs
+++ b/polkadot/statement-table/src/generic.rs
@@ -58,7 +58,7 @@ pub trait Context {
 /// Statements circulated among peers.
 #[derive(PartialEq, Eq, Debug, Clone, Encode, Decode)]
 pub enum Statement<C, D> {
-	/// Broadcast by a authority to indicate that this is his candidate for
+	/// Broadcast by an authority to indicate that this is his candidate for
 	/// inclusion.
 	///
 	/// Broadcasting two different candidate messages per round is not allowed.
diff --git a/polkadot/validation/src/lib.rs b/polkadot/validation/src/lib.rs
index e24f226728d..b7c5e93dcfe 100644
--- a/polkadot/validation/src/lib.rs
+++ b/polkadot/validation/src/lib.rs
@@ -179,7 +179,6 @@ pub trait Network {
 	fn communication_for(
 		&self,
 		table: Arc<SharedTable>,
-		outgoing: Outgoing,
 		authorities: &[SessionKey],
 	) -> Self::BuildTableRouter;
 }
@@ -319,7 +318,11 @@ impl<C, N, P> ParachainValidation<C, N, P> where
 			.map(|x| x.collect())
 			.unwrap_or_default();
 
-		let outgoing: Vec<_> = {
+		// TODO: https://github.com/paritytech/polkadot/issues/253
+		//
+		// We probably don't only want active validators to do this, or messages
+		// will disappear when validators exit the set.
+		let _outgoing: Vec<_> = {
 			// extract all extrinsic data that we have and propagate to peers.
 			live_instances.get(&grandparent_hash).map(|parent_validation| {
 				parent_candidates.iter().filter_map(|c| {
@@ -351,7 +354,6 @@ impl<C, N, P> ParachainValidation<C, N, P> where
 		let table = Arc::new(SharedTable::new(authorities, group_info, sign_with.clone(), parent_hash, self.extrinsic_store.clone()));
 		let router = self.network.communication_for(
 			table.clone(),
-			outgoing,
 			authorities,
 		);
 
-- 
GitLab