From 7d881a2ec5da846809451ac229b6f3943e433e24 Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Thu, 12 Jul 2018 09:47:10 +0100
Subject: [PATCH] Validator side of the collation protocol. (#295)

* skeleton of collators object

* awaiting and handling collations. rename `collators` to CollationPool

* add some tests

* add tests

* implement Collators trait for ConsensusNetwork

* plug collators into main polkadot-network

* ignore collator role message

* add a couple more tests

* garbage collection for collations

* ensure disconnected backup collator is removed from pool

* address other grumbles
---
 substrate/polkadot/collator/src/lib.rs        |   5 +-
 substrate/polkadot/consensus/src/collation.rs |   6 +
 .../polkadot/network/src/collator_pool.rs     | 320 ++++++++++++++++++
 substrate/polkadot/network/src/consensus.rs   |  37 +-
 substrate/polkadot/network/src/lib.rs         | 137 ++++++--
 substrate/polkadot/network/src/tests.rs       |  28 +-
 .../polkadot/primitives/src/parachain.rs      |  13 +-
 substrate/substrate/network/src/protocol.rs   |   5 +
 .../substrate/network/src/specialization.rs   |   3 +
 9 files changed, 521 insertions(+), 33 deletions(-)
 create mode 100644 substrate/polkadot/network/src/collator_pool.rs

diff --git a/substrate/polkadot/collator/src/lib.rs b/substrate/polkadot/collator/src/lib.rs
index f7557f353e1..a4629c57938 100644
--- a/substrate/polkadot/collator/src/lib.rs
+++ b/substrate/polkadot/collator/src/lib.rs
@@ -157,7 +157,8 @@ pub fn collate<'a, R, P>(
 			ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
 		);
 
-		let signature = key.sign(&block_data.0[..]).into();
+		let block_data_hash = block_data.hash();
+		let signature = key.sign(&block_data_hash.0[..]).into();
 		let pubkey_bytes: [u8; 32] = key.public().into();
 
 		let receipt = parachain::CandidateReceipt {
@@ -168,7 +169,7 @@ pub fn collate<'a, R, P>(
 			balance_uploads: Vec::new(),
 			egress_queue_roots: Vec::new(),
 			fees: 0,
-			block_data_hash: block_data.hash(),
+			block_data_hash,
 		};
 
 		parachain::Collation {
diff --git a/substrate/polkadot/consensus/src/collation.rs b/substrate/polkadot/consensus/src/collation.rs
index db490a0eb17..f7db48db619 100644
--- a/substrate/polkadot/consensus/src/collation.rs
+++ b/substrate/polkadot/consensus/src/collation.rs
@@ -37,6 +37,12 @@ pub trait Collators: Clone {
 	type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
 
 	/// Collate on a specific parachain, building on a given relay chain parent hash.
+	///
+	/// The returned collation should be checked for basic validity in the signature
+	/// and will be checked for state-transition validity by the consumer of this trait.
+	///
+	/// This does not have to guarantee local availability, as a valid collation
+	/// will be passed to the `TableRouter` instance.
 	fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation;
 
 	/// Note a bad collator. TODO: take proof
diff --git a/substrate/polkadot/network/src/collator_pool.rs b/substrate/polkadot/network/src/collator_pool.rs
new file mode 100644
index 00000000000..12ddade1de1
--- /dev/null
+++ b/substrate/polkadot/network/src/collator_pool.rs
@@ -0,0 +1,320 @@
+// Copyright 2018 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Bridge between the network and consensus service for getting collations to it.
+
+use polkadot_primitives::{AccountId, Hash};
+use polkadot_primitives::parachain::{Id as ParaId, Collation};
+
+use futures::sync::oneshot;
+
+use std::collections::hash_map::{HashMap, Entry};
+use std::time::{Duration, Instant};
+
+const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);
+
+/// The role of the collator. Whether they're the primary or backup for this parachain.
+#[derive(PartialEq, Debug, Serialize, Deserialize)]
+pub enum Role {
+	/// Primary collators should send collations whenever it's time.
+	Primary,
+	/// Backup collators should not.
+	Backup,
+}
+
+/// A maintenance action for the collator set.
+#[derive(PartialEq, Debug)]
+#[allow(dead_code)]
+pub enum Action {
+	/// Disconnect the given collator.
+	Disconnect(AccountId),
+	/// Give the collator a new role.
+	NewRole(AccountId, Role),
+}
+
+struct CollationSlot {
+	live_at: Instant,
+	entries: SlotEntries,
+}
+
+impl CollationSlot {
+	fn blank_now() -> Self {
+		CollationSlot {
+			live_at: Instant::now(),
+			entries: SlotEntries::Blank,
+		}
+	}
+
+	fn stay_alive(&self, now: Instant) -> bool {
+		self.live_at + COLLATION_LIFETIME > now
+	}
+}
+
+enum SlotEntries {
+	Blank,
+	// not queried yet
+	Pending(Vec<Collation>),
+	// waiting for next to arrive.
+	Awaiting(Vec<oneshot::Sender<Collation>>),
+}
+
+impl SlotEntries {
+	fn received_collation(&mut self, collation: Collation) {
+		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
+			SlotEntries::Blank => SlotEntries::Pending(vec![collation]),
+			SlotEntries::Pending(mut cs) => {
+				cs.push(collation);
+				SlotEntries::Pending(cs)
+			}
+			SlotEntries::Awaiting(senders) => {
+				for sender in senders {
+					let _ = sender.send(collation.clone());
+				}
+
+				SlotEntries::Blank
+			}
+		};
+	}
+
+	fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
+		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
+			SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]),
+			SlotEntries::Awaiting(mut senders) => {
+				senders.push(sender);
+				SlotEntries::Awaiting(senders)
+			}
+			SlotEntries::Pending(mut cs) => {
+				let next_collation = cs.pop().expect("empty variant is always `Blank`; qed");
+				let _ = sender.send(next_collation);
+
+				if cs.is_empty() {
+					SlotEntries::Blank
+				} else {
+					SlotEntries::Pending(cs)
+				}
+			}
+		};
+	}
+}
+
+struct ParachainCollators {
+	primary: AccountId,
+	backup: Vec<AccountId>,
+}
+
+/// Manages connected collators and role assignments from the perspective of a validator.
+pub struct CollatorPool {
+	collators: HashMap<AccountId, ParaId>,
+	parachain_collators: HashMap<ParaId, ParachainCollators>,
+	collations: HashMap<(Hash, ParaId), CollationSlot>,
+}
+
+impl CollatorPool {
+	/// Create a new `CollatorPool` object.
+	pub fn new() -> Self {
+		CollatorPool {
+			collators: HashMap::new(),
+			parachain_collators: HashMap::new(),
+			collations: HashMap::new(),
+		}
+	}
+
+	/// Call when a new collator is authenticated. Returns the role.
+	pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role {
+		self.collators.insert(account_id.clone(), para_id);
+		match self.parachain_collators.entry(para_id) {
+			Entry::Vacant(vacant) => {
+				vacant.insert(ParachainCollators {
+					primary: account_id,
+					backup: Vec::new(),
+				});
+
+				Role::Primary
+			},
+			Entry::Occupied(mut occupied) => {
+				occupied.get_mut().backup.push(account_id);
+
+				Role::Backup
+			}
+		}
+	}
+
+	/// Called when a collator disconnects. If it was the primary, returns a new primary for that
+	/// parachain.
+	pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<AccountId> {
+		self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
+			Entry::Vacant(_) => None,
+			Entry::Occupied(mut occ) => {
+				if occ.get().primary == account_id {
+					if occ.get().backup.is_empty() {
+						occ.remove();
+						None
+					} else {
+						let mut collators = occ.get_mut();
+						collators.primary = collators.backup.pop().expect("backup non-empty; qed");
+						Some(collators.primary)
+					}
+				} else {
+					let pos = occ.get().backup.iter().position(|a| a == &account_id)
+						.expect("registered collator always present in backup if not primary; qed");
+
+					occ.get_mut().backup.remove(pos);
+					None
+				}
+			}
+		})
+	}
+
+	/// Called when a collation is received.
+	/// The collator should be registered for the parachain of the collation as a precondition of this function.
+	/// The collation should have been checked for integrity of signature before passing to this function.
+	pub fn on_collation(&mut self, account_id: AccountId, relay_parent: Hash, collation: Collation) {
+		if let Some(para_id) = self.collators.get(&account_id) {
+			debug_assert_eq!(para_id, &collation.receipt.parachain_index);
+
+			// TODO: punish if not primary?
+
+			self.collations.entry((relay_parent, para_id.clone()))
+				.or_insert_with(CollationSlot::blank_now)
+				.entries
+				.received_collation(collation);
+		}
+	}
+
+	/// Wait for a collation from a parachain.
+	pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
+		self.collations.entry((relay_parent, para_id))
+			.or_insert_with(CollationSlot::blank_now)
+			.entries
+			.await_with(sender);
+	}
+
+	/// Call periodically to perform collator set maintenance.
+	/// Returns a set of actions to perform on the network level.
+	pub fn maintain_peers(&mut self) -> Vec<Action> {
+		// TODO: rearrange periodically to new primary, evaluate based on latency etc.
+		Vec::new()
+	}
+
+	/// called when a block with given hash has been imported.
+	pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
+		let now = Instant::now();
+		self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData};
+	use substrate_primitives::H512;
+	use futures::Future;
+
+	#[test]
+	fn disconnect_primary_gives_new_primary() {
+		let mut pool = CollatorPool::new();
+		let para_id: ParaId = 5.into();
+		let bad_primary = [0; 32].into();
+		let good_backup = [1; 32].into();
+
+		assert_eq!(pool.on_new_collator(bad_primary, para_id.clone()), Role::Primary);
+		assert_eq!(pool.on_new_collator(good_backup, para_id.clone()), Role::Backup);
+		assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup));
+		assert_eq!(pool.on_disconnect(good_backup), None);
+	}
+
+	#[test]
+	fn disconnect_backup_removes_from_pool() {
+		let mut pool = CollatorPool::new();
+		let para_id: ParaId = 5.into();
+		let primary = [0; 32].into();
+		let backup = [1; 32].into();
+
+		assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
+		assert_eq!(pool.on_new_collator(backup, para_id.clone()), Role::Backup);
+		assert_eq!(pool.on_disconnect(backup), None);
+		assert!(pool.parachain_collators.get(&para_id).unwrap().backup.is_empty());
+	}
+
+	#[test]
+	fn await_before_collation() {
+		let mut pool = CollatorPool::new();
+		let para_id: ParaId = 5.into();
+		let primary = [0; 32].into();
+		let relay_parent = [1; 32].into();
+
+		assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
+		let (tx1, rx1) = oneshot::channel();
+		let (tx2, rx2) = oneshot::channel();
+		pool.await_collation(relay_parent, para_id, tx1);
+		pool.await_collation(relay_parent, para_id, tx2);
+		pool.on_collation(primary, relay_parent, Collation {
+			receipt: CandidateReceipt {
+				parachain_index: para_id,
+				collator: primary.into(),
+				signature: H512::from([2; 64]).into(),
+				head_data: HeadData(vec![1, 2, 3]),
+				balance_uploads: vec![],
+				egress_queue_roots: vec![],
+				fees: 0,
+				block_data_hash: [3; 32].into(),
+			},
+			block_data: BlockData(vec![4, 5, 6]),
+		});
+
+		rx1.wait().unwrap();
+		rx2.wait().unwrap();
+	}
+
+	#[test]
+	fn collate_before_await() {
+		let mut pool = CollatorPool::new();
+		let para_id: ParaId = 5.into();
+		let primary = [0; 32].into();
+		let relay_parent = [1; 32].into();
+
+		assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
+
+		pool.on_collation(primary, relay_parent, Collation {
+			receipt: CandidateReceipt {
+				parachain_index: para_id,
+				collator: primary.into(),
+				signature: H512::from([2; 64]).into(),
+				head_data: HeadData(vec![1, 2, 3]),
+				balance_uploads: vec![],
+				egress_queue_roots: vec![],
+				fees: 0,
+				block_data_hash: [3; 32].into(),
+			},
+			block_data: BlockData(vec![4, 5, 6]),
+		});
+
+		let (tx, rx) = oneshot::channel();
+		pool.await_collation(relay_parent, para_id, tx);
+		rx.wait().unwrap();
+	}
+
+	#[test]
+	fn slot_stay_alive() {
+		let slot = CollationSlot::blank_now();
+		let now = slot.live_at;
+
+		assert!(slot.stay_alive(now));
+		assert!(slot.stay_alive(now + Duration::from_secs(10)));
+		assert!(!slot.stay_alive(now + COLLATION_LIFETIME));
+		assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10)));
+	}
+}
diff --git a/substrate/polkadot/network/src/consensus.rs b/substrate/polkadot/network/src/consensus.rs
index 0eb14d9381a..3fe22acd5aa 100644
--- a/substrate/polkadot/network/src/consensus.rs
+++ b/substrate/polkadot/network/src/consensus.rs
@@ -27,7 +27,7 @@ use polkadot_consensus::{Network, SharedTable, Collators};
 use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
 use polkadot_primitives::parachain::{Id as ParaId, Collation};
 
-use futures::{future, prelude::*};
+use futures::prelude::*;
 use futures::sync::mpsc;
 
 use std::sync::Arc;
@@ -304,13 +304,38 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Network for ConsensusNetwork<P
 	}
 }
 
+/// Error when the network appears to be down.
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub struct NetworkDown;
+
+/// A future that resolves when a collation is received.
+pub struct AwaitingCollation(Option<::futures::sync::oneshot::Receiver<Collation>>);
+
+impl Future for AwaitingCollation {
+	type Item = Collation;
+	type Error = NetworkDown;
+
+	fn poll(&mut self) -> Poll<Collation, NetworkDown> {
+		match self.0.poll().map_err(|_| NetworkDown)? {
+			Async::Ready(None) => Err(NetworkDown),
+			Async::Ready(Some(x)) => Ok(Async::Ready(x)),
+			Async::NotReady => Ok(Async::NotReady),
+		}
+	}
+}
+
+
 impl<P: LocalPolkadotApi + Send + Sync + 'static> Collators for ConsensusNetwork<P> {
-	type Error = ();
-	type Collation = future::Empty<Collation, ()>;
+	type Error = NetworkDown;
+	type Collation = AwaitingCollation;
 
-	fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation {
-		future::empty()
+	fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
+		AwaitingCollation(
+			self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain))
+		)
 	}
 
-	fn note_bad_collator(&self, _collator: AccountId) { }
+	fn note_bad_collator(&self, collator: AccountId) {
+		self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator));
+	}
 }
diff --git a/substrate/polkadot/network/src/lib.rs b/substrate/polkadot/network/src/lib.rs
index eef8c8192e2..1a9b995e5f3 100644
--- a/substrate/polkadot/network/src/lib.rs
+++ b/substrate/polkadot/network/src/lib.rs
@@ -43,6 +43,7 @@ extern crate rhododendron;
 #[macro_use]
 extern crate log;
 
+mod collator_pool;
 mod router;
 pub mod consensus;
 
@@ -50,17 +51,19 @@ use codec::Slicable;
 use futures::sync::oneshot;
 use parking_lot::Mutex;
 use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
-use polkadot_primitives::{Block, SessionKey, Hash};
-use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
+use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header};
+use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation};
 use substrate_network::{PeerId, RequestId, Context};
 use substrate_network::consensus_gossip::ConsensusGossip;
 use substrate_network::{message, generic_message};
 use substrate_network::specialization::Specialization;
 use substrate_network::StatusMessage as GenericFullStatus;
+use self::collator_pool::{CollatorPool, Role, Action};
 
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+
 #[cfg(test)]
 mod tests;
 
@@ -75,16 +78,16 @@ pub type NetworkService = ::substrate_network::Service<Block, PolkadotProtocol>;
 /// Status of a Polkadot node.
 #[derive(Debug, PartialEq, Eq, Clone)]
 pub struct Status {
-	collating_for: Option<ParaId>,
+	collating_for: Option<(AccountId, ParaId)>,
 }
 
 impl Slicable for Status {
 	fn encode(&self) -> Vec<u8> {
 		let mut v = Vec::new();
 		match self.collating_for {
-			Some(ref id) => {
+			Some(ref details) => {
 				v.push(1);
-				id.using_encoded(|s| v.extend(s));
+				details.using_encoded(|s| v.extend(s));
 			}
 			None => {
 				v.push(0);
@@ -96,7 +99,7 @@ impl Slicable for Status {
 	fn decode<I: ::codec::Input>(input: &mut I) -> Option<Self> {
 		let collating_for = match input.read_byte()? {
 			0 => None,
-			1 => Some(ParaId::decode(input)?),
+			1 => Some(Slicable::decode(input)?),
 			_ => return None,
 		};
 		Some(Status { collating_for })
@@ -196,6 +199,10 @@ pub enum Message {
 	RequestBlockData(RequestId, Hash),
 	/// Provide block data by candidate hash or nothing if unknown.
 	BlockData(RequestId, Option<BlockData>),
+	/// Tell a collator their role.
+	CollatorRole(Role),
+	/// A collation provided by a peer. Relay parent and collation.
+	Collation(Hash, Collation),
 }
 
 fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
@@ -207,8 +214,7 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message)
 pub struct PolkadotProtocol {
 	peers: HashMap<PeerId, PeerInfo>,
 	consensus_gossip: ConsensusGossip<Block>,
-	collators: HashMap<ParaId, Vec<PeerId>>,
-	collating_for: Option<ParaId>,
+	collators: CollatorPool,
 	live_consensus: Option<CurrentConsensus>,
 	in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
 	pending: Vec<BlockDataRequest>,
@@ -221,8 +227,7 @@ impl PolkadotProtocol {
 		PolkadotProtocol {
 			peers: HashMap::new(),
 			consensus_gossip: ConsensusGossip::new(),
-			collators: HashMap::new(),
-			collating_for: None,
+			collators: CollatorPool::new(),
 			live_consensus: None,
 			in_flight: HashMap::new(),
 			pending: Vec::new(),
@@ -260,7 +265,10 @@ impl PolkadotProtocol {
 		let parent_hash = consensus.parent_hash;
 		let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash);
 
-		for (id, info) in self.peers.iter_mut().filter(|&(_, ref info)| info.validator) {
+		// TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks.
+		for (id, info) in self.peers.iter_mut()
+			.filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some())
+		{
 			send_polkadot_message(
 				ctx,
 				*id,
@@ -363,6 +371,8 @@ impl PolkadotProtocol {
 				send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data));
 			}
 			Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
+			Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation),
+			Message::CollatorRole(_) => {},
 		}
 	}
 
@@ -386,7 +396,7 @@ impl PolkadotProtocol {
 
 impl Specialization<Block> for PolkadotProtocol {
 	fn status(&self) -> Vec<u8> {
-		Status { collating_for: self.collating_for.clone() }.encode()
+		Status { collating_for: None }.encode()
 	}
 
 	fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) {
@@ -397,13 +407,23 @@ impl Specialization<Block> for PolkadotProtocol {
 			}
 		};
 
-		if let Some(ref para_id) = local_status.collating_for {
-			self.collators.entry(para_id.clone())
-				.or_insert_with(Vec::new)
-				.push(peer_id);
+		if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
+			if self.collator_peer_id(acc_id.clone()).is_some() {
+				ctx.disable_peer(peer_id);
+				return
+			}
+
+			let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
+			send_polkadot_message(
+				ctx,
+				peer_id,
+				Message::CollatorRole(collator_role),
+			);
 		}
 
 		let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
+		let send_key = validator || local_status.collating_for.is_some();
+
 		self.peers.insert(peer_id, PeerInfo {
 			status: local_status,
 			session_keys: Default::default(),
@@ -411,8 +431,7 @@ impl Specialization<Block> for PolkadotProtocol {
 		});
 
 		self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
-
-		if let (true, &Some(ref consensus)) = (validator, &self.live_consensus) {
+		if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
 			send_polkadot_message(
 				ctx,
 				peer_id,
@@ -425,9 +444,16 @@ impl Specialization<Block> for PolkadotProtocol {
 
 	fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {
 		if let Some(info) = self.peers.remove(&peer_id) {
-			if let Some(collators) = info.status.collating_for.and_then(|id| self.collators.get_mut(&id)) {
-				if let Some(pos) = collators.iter().position(|x| x == &peer_id) {
-					collators.swap_remove(pos);
+			if let Some((acc_id, _)) = info.status.collating_for {
+				let new_primary = self.collators.on_disconnect(acc_id)
+					.and_then(|new_primary| self.collator_peer_id(new_primary));
+
+				if let Some(new_primary) = new_primary {
+					send_polkadot_message(
+						ctx,
+						new_primary,
+						Message::CollatorRole(Role::Primary),
+					)
 				}
 			}
 
@@ -483,6 +509,75 @@ impl Specialization<Block> for PolkadotProtocol {
 
 	fn maintain_peers(&mut self, ctx: &mut Context<Block>) {
 		self.consensus_gossip.collect_garbage(None);
+		self.collators.collect_garbage(None);
 		self.dispatch_pending_requests(ctx);
+
+		for collator_action in self.collators.maintain_peers() {
+			match collator_action {
+				Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
+				Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) {
+					send_polkadot_message(
+						ctx,
+						collator,
+						Message::CollatorRole(role),
+					)
+				},
+			}
+		}
+	}
+
+	fn on_block_imported(&mut self, _ctx: &mut Context<Block>, hash: Hash, _header: &Header) {
+		self.collators.collect_garbage(Some(&hash));
+	}
+}
+
+impl PolkadotProtocol {
+	// we received a collation from a peer
+	fn on_collation(&mut self, ctx: &mut Context<Block>, from: PeerId, relay_parent: Hash, collation: Collation) {
+		let collation_para = collation.receipt.parachain_index;
+		let collated_acc = collation.receipt.collator;
+
+		match self.peers.get(&from) {
+			None => ctx.disconnect_peer(from),
+			Some(peer_info) => match peer_info.status.collating_for {
+				None => ctx.disable_peer(from),
+				Some((ref acc_id, ref para_id)) => {
+					let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
+					if structurally_valid && collation.receipt.check_signature().is_ok() {
+						self.collators.on_collation(acc_id.clone(), relay_parent, collation)
+					} else {
+						ctx.disable_peer(from)
+					};
+				}
+			},
+		}
+	}
+
+	fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
+		let (tx, rx) = oneshot::channel();
+		self.collators.await_collation(relay_parent, para_id, tx);
+		rx
+	}
+
+	// get connected peer with given account ID for collation.
+	fn collator_peer_id(&self, account_id: AccountId) -> Option<PeerId> {
+		let check_info = |info: &PeerInfo| info
+			.status
+			.collating_for
+			.as_ref()
+			.map_or(false, |&(ref acc_id, _)| acc_id == &account_id);
+
+		self.peers
+			.iter()
+			.filter(|&(_, info)| check_info(info))
+			.map(|(peer_id, _)| *peer_id)
+			.next()
+	}
+
+	// disconnect a collator by account-id.
+	fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) {
+		if let Some(peer_id) = self.collator_peer_id(account_id) {
+			ctx.disable_peer(peer_id)
+		}
 	}
 }
diff --git a/substrate/polkadot/network/src/tests.rs b/substrate/polkadot/network/src/tests.rs
index 19db9890ccb..5e3eca46516 100644
--- a/substrate/polkadot/network/src/tests.rs
+++ b/substrate/polkadot/network/src/tests.rs
@@ -110,11 +110,12 @@ fn sends_session_key() {
 	let parent_hash = [0; 32].into();
 	let local_key = [1; 32].into();
 
-	let status = Status { collating_for: None };
+	let validator_status = Status { collating_for: None };
+	let collator_status = Status { collating_for: Some(([2; 32].into(), 5.into())) };
 
 	{
 		let mut ctx = TestContext::default();
-		protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority]));
+		protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority]));
 		assert!(ctx.messages.is_empty());
 	}
 
@@ -128,7 +129,7 @@ fn sends_session_key() {
 
 	{
 		let mut ctx = TestContext::default();
-		protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority]));
+		protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![]));
 		assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
 	}
 }
@@ -207,3 +208,24 @@ fn fetches_from_those_with_knowledge() {
 		assert_eq!(recv.wait().unwrap(), block_data);
 	}
 }
+
+#[test]
+fn remove_bad_collator() {
+	let mut protocol = PolkadotProtocol::new();
+
+	let peer_id = 1;
+	let account_id = [2; 32].into();
+
+	let status = Status { collating_for: Some((account_id, 5.into())) };
+
+	{
+		let mut ctx = TestContext::default();
+		protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![]));
+	}
+
+	{
+		let mut ctx = TestContext::default();
+		protocol.disconnect_bad_collator(&mut ctx, account_id);
+		assert!(ctx.disabled.contains(&peer_id));
+	}
+}
diff --git a/substrate/polkadot/primitives/src/parachain.rs b/substrate/polkadot/primitives/src/parachain.rs
index ba528bb148d..bbf9591ee0b 100644
--- a/substrate/polkadot/primitives/src/parachain.rs
+++ b/substrate/polkadot/primitives/src/parachain.rs
@@ -144,7 +144,7 @@ pub struct CandidateReceipt {
 	pub parachain_index: Id,
 	/// The collator's relay-chain account ID
 	pub collator: super::AccountId,
-	/// Signature on block data by collator.
+	/// Signature on blake2-256 of the block data by collator.
 	pub signature: CandidateSignature,
 	/// The head-data
 	pub head_data: HeadData,
@@ -195,6 +195,17 @@ impl CandidateReceipt {
 		use runtime_primitives::traits::{BlakeTwo256, Hash};
 		BlakeTwo256::hash_of(self)
 	}
+
+	/// Check integrity vs. provided block data.
+	pub fn check_signature(&self) -> Result<(), ()> {
+		use runtime_primitives::traits::Verify;
+
+		if self.signature.verify(&self.block_data_hash.0[..], &self.collator) {
+			Ok(())
+		} else {
+			Err(())
+		}
+	}
 }
 
 impl PartialOrd for CandidateReceipt {
diff --git a/substrate/substrate/network/src/protocol.rs b/substrate/substrate/network/src/protocol.rs
index 1b7b1db9cad..93985c85fe2 100644
--- a/substrate/substrate/network/src/protocol.rs
+++ b/substrate/substrate/network/src/protocol.rs
@@ -560,6 +560,11 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> where B::Header: HeaderT<Nu
 
 	pub fn on_block_imported(&self, io: &mut SyncIo, hash: B::Hash, header: &B::Header) {
 		self.sync.write().update_chain_info(&header);
+		self.specialization.write().on_block_imported(
+			&mut ProtocolContext::new(&self.context_data, io),
+			hash.clone(),
+			header
+		);
 
 		// blocks are not announced by light clients
 		if self.config.roles & Role::LIGHT == Role::LIGHT {
diff --git a/substrate/substrate/network/src/specialization.rs b/substrate/substrate/network/src/specialization.rs
index d3fa760317e..999c545291d 100644
--- a/substrate/substrate/network/src/specialization.rs
+++ b/substrate/substrate/network/src/specialization.rs
@@ -42,4 +42,7 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static {
 
 	/// Called periodically to maintain peers and handle timeouts.
 	fn maintain_peers(&mut self, _ctx: &mut Context<B>) { }
+
+	/// Called when a block is _imported_ at the head of the chain (not during major sync).
+	fn on_block_imported(&mut self, _ctx: &mut Context<B>, _hash: B::Hash, _header: &B::Header) { }
 }
-- 
GitLab