diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock
index 72a56877997832be26fde13a32b46bbd1564f256..a05893e615489b72268efba7a528893636716f5b 100644
--- a/substrate/Cargo.lock
+++ b/substrate/Cargo.lock
@@ -684,7 +684,8 @@ name = "polkadot-candidate-agreement"
 version = "0.1.0"
 dependencies = [
  "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
- "polkadot-primitives 0.1.0",
+ "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -1162,6 +1163,15 @@ dependencies = [
  "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "tokio-timer"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)",
+ "slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "triehash"
 version = "0.1.0"
@@ -1382,6 +1392,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 "checksum tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "514aae203178929dbf03318ad7c683126672d4d96eccb77b29603d33c9e25743"
 "checksum tokio-proto 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8fbb47ae81353c63c487030659494b295f6cb6576242f907f203473b191b0389"
 "checksum tokio-service 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "24da22d077e0f15f55162bdbdc661228c1581892f52074fb242678d015b45162"
+"checksum tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6131e780037787ff1b3f8aad9da83bca02438b72277850dd6ad0d455e0e20efc"
 "checksum triehash 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9291c7f0fae44858b5e087dd462afb382354120003778f1695b44aab98c7abd7"
 "checksum twox-hash 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "475352206e7a290c5fccc27624a163e8d0d115f7bb60ca18a64fc9ce056d7435"
 "checksum uint 0.1.0 (git+https://github.com/paritytech/primitives.git)" = "<none>"
diff --git a/substrate/candidate-agreement/Cargo.toml b/substrate/candidate-agreement/Cargo.toml
index 9a2dc0ffb77dbf99d652ffb079099b627df12b82..8aa2d0001b5e83aeec4dcaf8be36059367a2d310 100644
--- a/substrate/candidate-agreement/Cargo.toml
+++ b/substrate/candidate-agreement/Cargo.toml
@@ -4,5 +4,6 @@ version = "0.1.0"
 authors = ["Parity Technologies <admin@parity.io>"]
 
 [dependencies]
-futures = "0.1"
-polkadot-primitives = { path = "../primitives" }
+futures = "0.1.17"
+parking_lot = "0.4"
+tokio-timer = "0.1.2"
diff --git a/substrate/candidate-agreement/src/bft/accumulator.rs b/substrate/candidate-agreement/src/bft/accumulator.rs
index 8999a9f29bb0075894bbf85f987e3796196c9d63..ab035737fb84eb0e83f983330679d658bc2a670e 100644
--- a/substrate/candidate-agreement/src/bft/accumulator.rs
+++ b/substrate/candidate-agreement/src/bft/accumulator.rs
@@ -117,37 +117,37 @@ struct VoteCounts {
 
 /// Accumulates messages for a given round of BFT consensus.
 ///
-/// This isn't tied to the "view" of a single validator. It
+/// This isn't tied to the "view" of a single authority. It
 /// keeps accurate track of the state of the BFT consensus based
 /// on all messages imported.
 #[derive(Debug)]
-pub struct Accumulator<Candidate, Digest, ValidatorId, Signature>
+pub struct Accumulator<Candidate, Digest, AuthorityId, Signature>
 	where
 	Candidate: Eq + Clone,
 	Digest: Hash + Eq + Clone,
-	ValidatorId: Hash + Eq,
+	AuthorityId: Hash + Eq,
 	Signature: Eq + Clone,
 {
 	round_number: usize,
 	threshold: usize,
-	round_proposer: ValidatorId,
+	round_proposer: AuthorityId,
 	proposal: Option<Candidate>,
-	prepares: HashMap<ValidatorId, (Digest, Signature)>,
-	commits: HashMap<ValidatorId, (Digest, Signature)>,
+	prepares: HashMap<AuthorityId, (Digest, Signature)>,
+	commits: HashMap<AuthorityId, (Digest, Signature)>,
 	vote_counts: HashMap<Digest, VoteCounts>,
-	advance_round: HashSet<ValidatorId>,
+	advance_round: HashSet<AuthorityId>,
 	state: State<Candidate, Digest, Signature>,
 }
 
-impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, ValidatorId, Signature>
+impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, AuthorityId, Signature>
 	where
 	Candidate: Eq + Clone,
 	Digest: Hash + Eq + Clone,
-	ValidatorId: Hash + Eq,
+	AuthorityId: Hash + Eq,
 	Signature: Eq + Clone,
 {
 	/// Create a new state accumulator.
-	pub fn new(round_number: usize, threshold: usize, round_proposer: ValidatorId) -> Self {
+	pub fn new(round_number: usize, threshold: usize, round_proposer: AuthorityId) -> Self {
 		Accumulator {
 			round_number,
 			threshold,
@@ -171,11 +171,6 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
 		self.round_number.clone()
 	}
 
-	/// Get the round proposer.
-	pub fn round_proposer(&self) -> &ValidatorId {
-		&self.round_proposer
-	}
-
 	pub fn proposal(&self) -> Option<&Candidate> {
 		self.proposal.as_ref()
 	}
@@ -189,7 +184,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
 	/// and authorization should have already been checked.
 	pub fn import_message(
 		&mut self,
-		message: LocalizedMessage<Candidate, Digest, ValidatorId, Signature>,
+		message: LocalizedMessage<Candidate, Digest, AuthorityId, Signature>,
 	)
 	{
 		// message from different round.
@@ -210,7 +205,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
 	fn import_proposal(
 		&mut self,
 		proposal: Candidate,
-		sender: ValidatorId,
+		sender: AuthorityId,
 	) {
 		if sender != self.round_proposer || self.proposal.is_some() { return }
 
@@ -221,7 +216,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
 	fn import_prepare(
 		&mut self,
 		digest: Digest,
-		sender: ValidatorId,
+		sender: AuthorityId,
 		signature: Signature,
 	) {
 		// ignore any subsequent prepares by the same sender.
@@ -264,7 +259,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
 	fn import_commit(
 		&mut self,
 		digest: Digest,
-		sender: ValidatorId,
+		sender: AuthorityId,
 		signature: Signature,
 	) {
 		// ignore any subsequent commits by the same sender.
@@ -304,7 +299,7 @@ impl<Candidate, Digest, ValidatorId, Signature> Accumulator<Candidate, Digest, V
 
 	fn import_advance_round(
 		&mut self,
-		sender: ValidatorId,
+		sender: AuthorityId,
 	) {
 		self.advance_round.insert(sender);
 
@@ -332,7 +327,7 @@ mod tests {
 	pub struct Digest(usize);
 
 	#[derive(Hash, PartialEq, Eq, Debug)]
-	pub struct ValidatorId(usize);
+	pub struct AuthorityId(usize);
 
 	#[derive(PartialEq, Eq, Clone, Debug)]
 	pub struct Signature(usize, usize);
@@ -347,7 +342,7 @@ mod tests {
 
 		let check_message = |r, d: &Digest, s: &Signature| {
 			if r == 2 && d.0 == 600 && s.0 == 600 {
-				Some(ValidatorId(s.1))
+				Some(AuthorityId(s.1))
 			} else {
 				None
 			}
@@ -370,11 +365,11 @@ mod tests {
 
 	#[test]
 	fn accepts_proposal_from_proposer_only() {
-		let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::<_, Digest, _, _>::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(5),
+			sender: AuthorityId(5),
 			signature: Signature(999, 5),
 			message: Message::Propose(1, Candidate(999)),
 		});
@@ -382,7 +377,7 @@ mod tests {
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(8),
+			sender: AuthorityId(8),
 			signature: Signature(999, 8),
 			message: Message::Propose(1, Candidate(999)),
 		});
@@ -392,11 +387,11 @@ mod tests {
 
 	#[test]
 	fn reaches_prepare_phase() {
-		let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(8),
+			sender: AuthorityId(8),
 			signature: Signature(999, 8),
 			message: Message::Propose(1, Candidate(999)),
 		});
@@ -405,7 +400,7 @@ mod tests {
 
 		for i in 0..6 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Prepare(1, Digest(999)),
 			});
@@ -414,7 +409,7 @@ mod tests {
 		}
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(7),
+			sender: AuthorityId(7),
 			signature: Signature(999, 7),
 			message: Message::Prepare(1, Digest(999)),
 		});
@@ -427,11 +422,11 @@ mod tests {
 
 	#[test]
 	fn prepare_to_commit() {
-		let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(8),
+			sender: AuthorityId(8),
 			signature: Signature(999, 8),
 			message: Message::Propose(1, Candidate(999)),
 		});
@@ -440,7 +435,7 @@ mod tests {
 
 		for i in 0..6 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Prepare(1, Digest(999)),
 			});
@@ -449,7 +444,7 @@ mod tests {
 		}
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(7),
+			sender: AuthorityId(7),
 			signature: Signature(999, 7),
 			message: Message::Prepare(1, Digest(999)),
 		});
@@ -461,7 +456,7 @@ mod tests {
 
 		for i in 0..6 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Commit(1, Digest(999)),
 			});
@@ -473,7 +468,7 @@ mod tests {
 		}
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(7),
+			sender: AuthorityId(7),
 			signature: Signature(999, 7),
 			message: Message::Commit(1, Digest(999)),
 		});
@@ -486,11 +481,11 @@ mod tests {
 
 	#[test]
 	fn prepare_to_advance() {
-		let mut accumulator = Accumulator::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(8),
+			sender: AuthorityId(8),
 			signature: Signature(999, 8),
 			message: Message::Propose(1, Candidate(999)),
 		});
@@ -499,7 +494,7 @@ mod tests {
 
 		for i in 0..7 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Prepare(1, Digest(999)),
 			});
@@ -512,7 +507,7 @@ mod tests {
 
 		for i in 0..6 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::AdvanceRound(1),
 			});
@@ -524,7 +519,7 @@ mod tests {
 		}
 
 		accumulator.import_message(LocalizedMessage {
-			sender: ValidatorId(7),
+			sender: AuthorityId(7),
 			signature: Signature(999, 7),
 			message: Message::AdvanceRound(1),
 		});
@@ -537,12 +532,12 @@ mod tests {
 
 	#[test]
 	fn conclude_different_than_proposed() {
-		let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		for i in 0..7 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Prepare(1, Digest(999)),
 			});
@@ -555,7 +550,7 @@ mod tests {
 
 		for i in 0..7 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Commit(1, Digest(999)),
 			});
@@ -569,12 +564,12 @@ mod tests {
 
 	#[test]
 	fn begin_to_advance() {
-		let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		for i in 0..7 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(1, i),
 				message: Message::AdvanceRound(1),
 			});
@@ -588,12 +583,12 @@ mod tests {
 
 	#[test]
 	fn conclude_without_prepare() {
-		let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, ValidatorId(8));
+		let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
 		assert_eq!(accumulator.state(), &State::Begin);
 
 		for i in 0..7 {
 			accumulator.import_message(LocalizedMessage {
-				sender: ValidatorId(i),
+				sender: AuthorityId(i),
 				signature: Signature(999, i),
 				message: Message::Commit(1, Digest(999)),
 			});
diff --git a/substrate/candidate-agreement/src/bft/mod.rs b/substrate/candidate-agreement/src/bft/mod.rs
index b17092c4514124145b70b664c6b851ab1168463a..f131e44e1f8b93d48fbba3c8a8586d08b70f1204 100644
--- a/substrate/candidate-agreement/src/bft/mod.rs
+++ b/substrate/candidate-agreement/src/bft/mod.rs
@@ -76,30 +76,30 @@ pub trait Context {
 	type Candidate: Debug + Eq + Clone;
 	/// Candidate digest.
 	type Digest: Debug + Hash + Eq + Clone;
-	/// Validator ID.
-	type ValidatorId: Debug + Hash + Eq + Clone;
+	/// Authority ID.
+	type AuthorityId: Debug + Hash + Eq + Clone;
 	/// Signature.
 	type Signature: Debug + Eq + Clone;
 	/// A future that resolves when a round timeout is concluded.
 	type RoundTimeout: Future<Item=()>;
 	/// A future that resolves when a proposal is ready.
-	type Proposal: Future<Item=Self::Candidate>;
+	type CreateProposal: Future<Item=Self::Candidate>;
 
-	/// Get the local validator ID.
-	fn local_id(&self) -> Self::ValidatorId;
+	/// Get the local authority ID.
+	fn local_id(&self) -> Self::AuthorityId;
 
 	/// Get the best proposal.
-	fn proposal(&self) -> Self::Proposal;
+	fn proposal(&self) -> Self::CreateProposal;
 
 	/// Get the digest of a candidate.
 	fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
 
-	/// Sign a message using the local validator ID.
+	/// Sign a message using the local authority ID.
 	fn sign_local(&self, message: Message<Self::Candidate, Self::Digest>)
-		-> LocalizedMessage<Self::Candidate, Self::Digest, Self::ValidatorId, Self::Signature>;
+		-> LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>;
 
 	/// Get the proposer for a given round of consensus.
-	fn round_proposer(&self, round: usize) -> Self::ValidatorId;
+	fn round_proposer(&self, round: usize) -> Self::AuthorityId;
 
 	/// Whether the candidate is valid.
 	fn candidate_valid(&self, candidate: &Self::Candidate) -> bool;
@@ -121,11 +121,11 @@ pub enum Communication<C, D, V, S> {
 
 /// Type alias for a localized message using only type parameters from `Context`.
 // TODO: actual type alias when it's no longer a warning.
-pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::ValidatorId, C::Signature>);
+pub struct ContextCommunication<C: Context + ?Sized>(pub Communication<C::Candidate, C::Digest, C::AuthorityId, C::Signature>);
 
 impl<C: Context + ?Sized> Clone for ContextCommunication<C>
 	where
-		LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>: Clone,
+		LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>: Clone,
 		PrepareJustification<C::Digest, C::Signature>: Clone,
 {
 	fn clone(&self) -> Self {
@@ -242,19 +242,19 @@ enum LocalState {
 //   - a higher threshold-prepare is broadcast to us. in this case we can
 //     advance to the round of the threshold-prepare. this is an indication
 //     that we have experienced severe asynchrony/clock drift with the remainder
-//     of the other validators, and it is unlikely that we can assist in
+//     of the other authorities, and it is unlikely that we can assist in
 //     consensus meaningfully. nevertheless we make an attempt.
 struct Strategy<C: Context> {
 	nodes: usize,
 	max_faulty: usize,
-	fetching_proposal: Option<C::Proposal>,
+	fetching_proposal: Option<C::CreateProposal>,
 	round_timeout: future::Fuse<C::RoundTimeout>,
 	local_state: LocalState,
 	locked: Option<Locked<C::Digest, C::Signature>>,
 	notable_candidates: HashMap<C::Digest, C::Candidate>,
-	current_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
-	future_accumulator: Accumulator<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
-	local_id: C::ValidatorId,
+	current_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
+	future_accumulator: Accumulator<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
+	local_id: C::AuthorityId,
 }
 
 impl<C: Context> Strategy<C> {
@@ -290,7 +290,7 @@ impl<C: Context> Strategy<C> {
 
 	fn import_message(
 		&mut self,
-		msg: LocalizedMessage<C::Candidate, C::Digest, C::ValidatorId, C::Signature>
+		msg: LocalizedMessage<C::Candidate, C::Digest, C::AuthorityId, C::Signature>
 	) {
 		let round_number = msg.message.round_number();
 
@@ -330,7 +330,7 @@ impl<C: Context> Strategy<C> {
 		-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
 		where
 			C::RoundTimeout: Future<Error=E>,
-			C::Proposal: Future<Error=E>,
+			C::CreateProposal: Future<Error=E>,
 	{
 		let mut last_watermark = (
 			self.current_accumulator.round_number(),
@@ -363,7 +363,7 @@ impl<C: Context> Strategy<C> {
 		-> Poll<Committed<C::Candidate, C::Digest, C::Signature>, E>
 		where
 			C::RoundTimeout: Future<Error=E>,
-			C::Proposal: Future<Error=E>,
+			C::CreateProposal: Future<Error=E>,
 	{
 		self.propose(context, sending)?;
 		self.prepare(context, sending);
@@ -413,7 +413,7 @@ impl<C: Context> Strategy<C> {
 	}
 
 	fn propose(&mut self, context: &C, sending: &mut Sending<ContextCommunication<C>>)
-		-> Result<(), <C::Proposal as Future>::Error>
+		-> Result<(), <C::CreateProposal as Future>::Error>
 	{
 		if let LocalState::Start = self.local_state {
 			let mut propose = false;
@@ -629,7 +629,7 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
 	where
 		C: Context,
 		C::RoundTimeout: Future<Error=E>,
-		C::Proposal: Future<Error=E>,
+		C::CreateProposal: Future<Error=E>,
 		I: Stream<Item=ContextCommunication<C>,Error=E>,
 		O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
 		E: From<InputStreamConcluded>,
@@ -699,8 +699,15 @@ impl<C, I, O, E> Future for Agreement<C, I, O>
 /// conclude without having witnessed the conclusion.
 /// In general, this future should be pre-empted by the import of a justification
 /// set for this block height.
-pub fn agree<C: Context, I, O>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
+pub fn agree<C: Context, I, O, E>(context: C, nodes: usize, max_faulty: usize, input: I, output: O)
 	-> Agreement<C, I, O>
+	where
+		C: Context,
+		C::RoundTimeout: Future<Error=E>,
+		C::CreateProposal: Future<Error=E>,
+		I: Stream<Item=ContextCommunication<C>,Error=E>,
+		O: Sink<SinkItem=ContextCommunication<C>,SinkError=E>,
+		E: From<InputStreamConcluded>,
 {
 	let strategy = Strategy::create(&context, nodes, max_faulty);
 	Agreement {
diff --git a/substrate/candidate-agreement/src/bft/tests.rs b/substrate/candidate-agreement/src/bft/tests.rs
index ff66ff047658b462ba69e21a4d22e196278eca38..10ef9321242b195a7887378901fc163e63842179 100644
--- a/substrate/candidate-agreement/src/bft/tests.rs
+++ b/substrate/candidate-agreement/src/bft/tests.rs
@@ -18,11 +18,13 @@
 
 use super::*;
 
+use tests::Network;
+
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use futures::prelude::*;
-use futures::sync::{oneshot, mpsc};
+use futures::sync::oneshot;
 use futures::future::FutureResult;
 
 #[derive(Debug, PartialEq, Eq, Clone, Hash)]
@@ -32,10 +34,10 @@ struct Candidate(usize);
 struct Digest(usize);
 
 #[derive(Debug, PartialEq, Eq, Clone, Hash)]
-struct ValidatorId(usize);
+struct AuthorityId(usize);
 
 #[derive(Debug, PartialEq, Eq, Clone)]
-struct Signature(Message<Candidate, Digest>, ValidatorId);
+struct Signature(Message<Candidate, Digest>, AuthorityId);
 
 struct SharedContext {
 	node_count: usize,
@@ -87,13 +89,13 @@ impl SharedContext {
 		self.current_round += 1;
 	}
 
-	fn round_proposer(&self, round: usize) -> ValidatorId {
-		ValidatorId(round % self.node_count)
+	fn round_proposer(&self, round: usize) -> AuthorityId {
+		AuthorityId(round % self.node_count)
 	}
 }
 
 struct TestContext {
-	local_id: ValidatorId,
+	local_id: AuthorityId,
 	proposal: Mutex<usize>,
 	shared: Arc<Mutex<SharedContext>>,
 }
@@ -101,16 +103,16 @@ struct TestContext {
 impl Context for TestContext {
 	type Candidate = Candidate;
 	type Digest = Digest;
-	type ValidatorId = ValidatorId;
+	type AuthorityId = AuthorityId;
 	type Signature = Signature;
 	type RoundTimeout = Box<Future<Item=(), Error=Error>>;
-	type Proposal = FutureResult<Candidate, Error>;
+	type CreateProposal = FutureResult<Candidate, Error>;
 
-	fn local_id(&self) -> ValidatorId {
+	fn local_id(&self) -> AuthorityId {
 		self.local_id.clone()
 	}
 
-	fn proposal(&self) -> Self::Proposal {
+	fn proposal(&self) -> Self::CreateProposal {
 		let proposal = {
 			let mut p = self.proposal.lock().unwrap();
 			let x = *p;
@@ -126,7 +128,7 @@ impl Context for TestContext {
 	}
 
 	fn sign_local(&self, message: Message<Candidate, Digest>)
-		-> LocalizedMessage<Candidate, Digest, ValidatorId, Signature>
+		-> LocalizedMessage<Candidate, Digest, AuthorityId, Signature>
 	{
 		let signature = Signature(message.clone(), self.local_id.clone());
 		LocalizedMessage {
@@ -136,7 +138,7 @@ impl Context for TestContext {
 		}
 	}
 
-	fn round_proposer(&self, round: usize) -> ValidatorId {
+	fn round_proposer(&self, round: usize) -> AuthorityId {
 		self.shared.lock().unwrap().round_proposer(round)
 	}
 
@@ -149,70 +151,6 @@ impl Context for TestContext {
 	}
 }
 
-type Comm = ContextCommunication<TestContext>;
-
-struct Network {
-	endpoints: Vec<mpsc::UnboundedSender<Comm>>,
-	input: mpsc::UnboundedReceiver<(usize, Comm)>,
-}
-
-impl Network {
-	fn new(nodes: usize)
-		-> (Network, Vec<mpsc::UnboundedSender<(usize, Comm)>>, Vec<mpsc::UnboundedReceiver<Comm>>)
-	{
-		let mut inputs = Vec::with_capacity(nodes);
-		let mut outputs = Vec::with_capacity(nodes);
-		let mut endpoints = Vec::with_capacity(nodes);
-
-		let (in_tx, in_rx) = mpsc::unbounded();
-		for _ in 0..nodes {
-			let (out_tx, out_rx) = mpsc::unbounded();
-			inputs.push(in_tx.clone());
-			outputs.push(out_rx);
-			endpoints.push(out_tx);
-		}
-
-		let network = Network {
-			endpoints,
-			input: in_rx,
-		};
-
-		(network, inputs, outputs)
-	}
-
-	fn route_on_thread(self) {
-		::std::thread::spawn(move || { let _ = self.wait(); });
-	}
-}
-
-impl Future for Network {
-	type Item = ();
-	type Error = Error;
-
-	fn poll(&mut self) -> Poll<(), Error> {
-		match self.input.poll() {
-			Err(_) => Err(Error),
-			Ok(Async::NotReady) => Ok(Async::NotReady),
-			Ok(Async::Ready(None)) => Ok(Async::Ready(())),
-			Ok(Async::Ready(Some((sender, item)))) => {
-				{
-					let receiving_endpoints = self.endpoints
-						.iter()
-						.enumerate()
-						.filter(|&(i, _)| i != sender)
-						.map(|(_, x)| x);
-
-					for endpoint in receiving_endpoints {
-						let _ = endpoint.unbounded_send(item.clone());
-					}
-				}
-
-				self.poll()
-			}
-		}
-	}
-}
-
 fn timeout_in(t: Duration) -> oneshot::Receiver<()> {
 	let (tx, rx) = oneshot::channel();
 	::std::thread::spawn(move || {
@@ -240,7 +178,7 @@ fn consensus_completes_with_minimum_good() {
 		.enumerate()
 		.map(|(i, (tx, rx))| {
 			let ctx = TestContext {
-				local_id: ValidatorId(i),
+				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
 				shared: shared_context.clone(),
 			};
@@ -296,7 +234,7 @@ fn consensus_does_not_complete_without_enough_nodes() {
 		.enumerate()
 		.map(|(i, (tx, rx))| {
 			let ctx = TestContext {
-				local_id: ValidatorId(i),
+				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
 				shared: shared_context.clone(),
 			};
@@ -335,7 +273,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 		round_number: locked_round,
 		digest: locked_digest.clone(),
 		signatures: (0..7)
-			.map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), ValidatorId(i)))
+			.map(|i| Signature(Message::Prepare(locked_round, locked_digest.clone()), AuthorityId(i)))
 			.collect()
 	}.check(7, |_, _, s| Some(s.1.clone())).unwrap();
 
@@ -352,7 +290,7 @@ fn threshold_plus_one_locked_on_proposal_only_one_with_candidate() {
 		.enumerate()
 		.map(|(i, (tx, rx))| {
 			let ctx = TestContext {
-				local_id: ValidatorId(i),
+				local_id: AuthorityId(i),
 				proposal: Mutex::new(i),
 				shared: shared_context.clone(),
 			};
diff --git a/substrate/candidate-agreement/src/handle_incoming.rs b/substrate/candidate-agreement/src/handle_incoming.rs
new file mode 100644
index 0000000000000000000000000000000000000000..625c950784106b7448c58294e89ce0b861ef4d5a
--- /dev/null
+++ b/substrate/candidate-agreement/src/handle_incoming.rs
@@ -0,0 +1,214 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! A stream that handles incoming messages to the BFT agreement module and statement
+//! table. It forwards as necessary, and dispatches requests for determining availability
+//! and validity of candidates as necessary.
+
+use std::collections::HashSet;
+
+use futures::prelude::*;
+use futures::stream::{Fuse, FuturesUnordered};
+use futures::sync::mpsc;
+
+use table::{self, Statement, Context as TableContext};
+
+use super::{Context, CheckedMessage, SharedTable, TypeResolve};
+
+enum CheckResult {
+	Available,
+	Unavailable,
+	Valid,
+	Invalid,
+}
+
+enum Checking<D, A, V> {
+	Availability(D, A),
+	Validity(D, V),
+}
+
+impl<D, A, V, E> Future for Checking<D, A, V>
+	where
+		D: Clone,
+		A: Future<Item=bool,Error=E>,
+		V: Future<Item=bool,Error=E>,
+{
+	type Item = (D, CheckResult);
+	type Error = E;
+
+	fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+		Ok(Async::Ready(match *self {
+			Checking::Availability(ref digest, ref mut f) => {
+				match try_ready!(f.poll()) {
+					true => (digest.clone(), CheckResult::Available),
+					false => (digest.clone(), CheckResult::Unavailable),
+				}
+			}
+			Checking::Validity(ref digest, ref mut f) => {
+				match try_ready!(f.poll()) {
+					true => (digest.clone(), CheckResult::Valid),
+					false => (digest.clone(), CheckResult::Invalid),
+				}
+			}
+		}))
+	}
+}
+
+/// Handles incoming messages to the BFT service and statement table.
+///
+/// Also triggers requests for determining validity and availability of other
+/// parachain candidates.
+pub struct HandleIncoming<C: Context, I> {
+	table: SharedTable<C>,
+	messages_in: Fuse<I>,
+	bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
+	local_id: C::AuthorityId,
+	requesting_about: FuturesUnordered<Checking<
+		C::Digest,
+		<C::CheckAvailability as IntoFuture>::Future,
+		<C::CheckCandidate as IntoFuture>::Future,
+	>>,
+	checked_validity: HashSet<C::Digest>,
+	checked_availability: HashSet<C::Digest>,
+}
+
+impl<C: Context, I> HandleIncoming<C, I> {
+	fn sign_and_import_statement(&self, digest: C::Digest, result: CheckResult) {
+		let statement = match result {
+			CheckResult::Valid => Statement::Valid(digest),
+			CheckResult::Invalid => Statement::Invalid(digest),
+			CheckResult::Available => Statement::Available(digest),
+			CheckResult::Unavailable => return, // no such statement and not provable.
+		};
+
+		// TODO: trigger broadcast to peers immediately?
+		self.table.sign_and_import(statement);
+	}
+
+	fn import_message(&mut self, origin: C::AuthorityId, message: CheckedMessage<C>) {
+		match message {
+			CheckedMessage::Bft(msg) => { let _ = self.bft_out.unbounded_send(msg); }
+			CheckedMessage::Table(table_messages) => {
+				// import all table messages and check for any that we
+				// need to produce statements for.
+				let msg_iter = table_messages
+					.into_iter()
+					.map(|m| (m, Some(origin.clone())));
+				let summaries: Vec<_> = self.table.import_statements(msg_iter);
+
+				for summary in summaries {
+					self.dispatch_on_summary(summary)
+				}
+			}
+		}
+	}
+
+	// on new candidates in our group, begin checking validity.
+	// on new candidates in our availability sphere, begin checking availability.
+	fn dispatch_on_summary(&mut self, summary: table::Summary<C::Digest, C::GroupId>) {
+		let is_validity_member =
+			self.table.context().is_member_of(&self.local_id, &summary.group_id);
+
+		let is_availability_member =
+			self.table.context().is_availability_guarantor_of(&self.local_id, &summary.group_id);
+
+		let digest = &summary.candidate;
+
+		// TODO: consider a strategy based on the number of candidate votes as well.
+		let checking_validity =
+			is_validity_member &&
+			self.checked_validity.insert(digest.clone()) &&
+			self.table.proposed_digest() != Some(digest.clone());
+
+		let checking_availability = is_availability_member && self.checked_availability.insert(digest.clone());
+
+		if checking_validity || checking_availability {
+			let context = &*self.table.context();
+			let requesting_about = &mut self.requesting_about;
+			self.table.with_candidate(digest, |c| match c {
+				None => {} // TODO: handle table inconsistency somehow?
+				Some(candidate) => {
+					if checking_validity {
+						let future = context.check_validity(candidate).into_future();
+						let checking = Checking::Validity(digest.clone(), future);
+						requesting_about.push(checking);
+					}
+
+					if checking_availability {
+						let future = context.check_availability(candidate).into_future();
+						let checking = Checking::Availability(digest.clone(), future);
+						requesting_about.push(checking);
+					}
+				}
+			})
+		}
+	}
+}
+
+impl<C, I, E> HandleIncoming<C, I>
+	where
+		C: Context,
+		I: Stream<Item=(C::AuthorityId, CheckedMessage<C>),Error=E>,
+		C::CheckAvailability: IntoFuture<Error=E>,
+		C::CheckCandidate: IntoFuture<Error=E>,
+{
+	pub fn new(
+		table: SharedTable<C>,
+		messages_in: I,
+		bft_out: mpsc::UnboundedSender<<C as TypeResolve>::BftCommunication>,
+	) -> Self {
+		let local_id = table.context().local_id();
+
+		HandleIncoming {
+			table,
+			bft_out,
+			local_id,
+			messages_in: messages_in.fuse(),
+			requesting_about: FuturesUnordered::new(),
+			checked_validity: HashSet::new(),
+			checked_availability: HashSet::new(),
+		}
+	}
+}
+
+impl<C, I, E> Future for HandleIncoming<C, I>
+	where
+		C: Context,
+		I: Stream<Item=(C::AuthorityId, CheckedMessage<C>),Error=E>,
+		C::CheckAvailability: IntoFuture<Error=E>,
+		C::CheckCandidate: IntoFuture<Error=E>,
+{
+	type Item = ();
+	type Error = E;
+
+	fn poll(&mut self) -> Poll<(), E> {
+		loop {
+			// FuturesUnordered is safe to poll after it has completed.
+			while let Async::Ready(Some((d, r))) = self.requesting_about.poll()? {
+				self.sign_and_import_statement(d, r);
+			}
+
+			match try_ready!(self.messages_in.poll()) {
+				None => if self.requesting_about.is_empty() {
+					return Ok(Async::Ready(()))
+				} else {
+					return Ok(Async::NotReady)
+				},
+				Some((origin, msg)) => self.import_message(origin, msg),
+			}
+		}
+	}
+}
diff --git a/substrate/candidate-agreement/src/lib.rs b/substrate/candidate-agreement/src/lib.rs
index 09dd56f5f0874fdbf4642a094068fa6a49ce9711..2cf4be5c547157d18e47c621ec63c014d381c142 100644
--- a/substrate/candidate-agreement/src/lib.rs
+++ b/substrate/candidate-agreement/src/lib.rs
@@ -16,21 +16,610 @@
 
 //! Propagation and agreement of candidates.
 //!
-//! Validators are split into groups by parachain, and each validator might come
-//! up its own candidate for their parachain. Within groups, validators pass around
+//! Authorities are split into groups by parachain, and each authority might come
+//! up its own candidate for their parachain. Within groups, authorities pass around
 //! their candidates and produce statements of validity.
 //!
-//! Any candidate that receives majority approval by the validators in a group
-//! may be subject to inclusion, unless any validators flag that candidate as invalid.
+//! Any candidate that receives majority approval by the authorities in a group
+//! may be subject to inclusion, unless any authorities flag that candidate as invalid.
 //!
 //! Wrongly flagging as invalid should be strongly disincentivized, so that in the
 //! equilibrium state it is not expected to happen. Likewise with the submission
 //! of invalid blocks.
 //!
-//! Groups themselves may be compromised by malicious validators.
+//! Groups themselves may be compromised by malicious authorities.
 
+#[macro_use]
 extern crate futures;
-extern crate polkadot_primitives as primitives;
+extern crate parking_lot;
+extern crate tokio_timer;
 
-pub mod bft;
-pub mod table;
+use std::collections::{HashMap, HashSet};
+use std::fmt::Debug;
+use std::hash::Hash;
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::prelude::*;
+use futures::sync::{mpsc, oneshot};
+use parking_lot::Mutex;
+use tokio_timer::Timer;
+
+use table::Table;
+
+mod bft;
+mod handle_incoming;
+mod round_robin;
+mod table;
+
+#[cfg(test)]
+pub mod tests;
+
+/// Context necessary for agreement.
+pub trait Context: Send + Clone {
+	/// A authority ID
+	type AuthorityId: Debug + Hash + Eq + Clone + Ord;
+	/// The digest (hash or other unique attribute) of a candidate.
+	type Digest: Debug + Hash + Eq + Clone;
+	/// The group ID type
+	type GroupId: Debug + Hash + Ord + Eq + Clone;
+	/// A signature type.
+	type Signature: Debug + Eq + Clone;
+	/// Candidate type. In practice this will be a candidate receipt.
+	type ParachainCandidate: Debug + Ord + Eq + Clone;
+	/// The actual block proposal type. This is what is agreed upon, and
+	/// is composed of multiple candidates.
+	type Proposal: Debug + Eq + Clone;
+
+	/// A future that resolves when a candidate is checked for validity.
+	///
+	/// In Polkadot, this will involve fetching the corresponding block data,
+	/// producing the necessary ingress, and running the parachain validity function.
+	type CheckCandidate: IntoFuture<Item=bool>;
+
+	/// A future that resolves when availability of a candidate's external
+	/// data is checked.
+	type CheckAvailability: IntoFuture<Item=bool>;
+
+	/// The statement batch type.
+	type StatementBatch: StatementBatch<
+		Self::AuthorityId,
+		table::SignedStatement<Self::ParachainCandidate, Self::Digest, Self::AuthorityId, Self::Signature>,
+	>;
+
+	/// Get the digest of a candidate.
+	fn candidate_digest(candidate: &Self::ParachainCandidate) -> Self::Digest;
+
+	/// Get the digest of a proposal.
+	fn proposal_digest(proposal: &Self::Proposal) -> Self::Digest;
+
+	/// Get the group of a candidate.
+	fn candidate_group(candidate: &Self::ParachainCandidate) -> Self::GroupId;
+
+	/// Get the primary for a given round.
+	fn round_proposer(&self, round: usize) -> Self::AuthorityId;
+
+	/// Check a candidate for validity.
+	fn check_validity(&self, candidate: &Self::ParachainCandidate) -> Self::CheckCandidate;
+
+	/// Check availability of candidate data.
+	fn check_availability(&self, candidate: &Self::ParachainCandidate) -> Self::CheckAvailability;
+
+	/// Attempt to combine a set of parachain candidates into a proposal.
+	///
+	/// This may arbitrarily return `None`, but the intent is for `Some`
+	/// to only be returned when candidates from enough groups are known.
+	///
+	/// "enough" may be subjective as well.
+	fn create_proposal(&self, candidates: Vec<&Self::ParachainCandidate>)
+		-> Option<Self::Proposal>;
+
+	/// Check validity of a proposal. This should call out to the `check_candidate`
+	/// function for all parachain candidates contained within it, as well as
+	/// checking other validity constraints of the proposal.
+	fn proposal_valid<F>(&self, proposal: &Self::Proposal, check_candidate: F) -> bool
+		where F: FnMut(&Self::ParachainCandidate) -> bool;
+
+	/// Get the local authority ID.
+	fn local_id(&self) -> Self::AuthorityId;
+
+	/// Sign a table validity statement with the local key.
+	fn sign_table_statement(
+		&self,
+		statement: &table::Statement<Self::ParachainCandidate, Self::Digest>
+	) -> Self::Signature;
+
+	/// Sign a BFT agreement message.
+	fn sign_bft_message(&self, &bft::Message<Self::Proposal, Self::Digest>) -> Self::Signature;
+}
+
+/// Helper for type resolution for contexts until type aliases apply bounds.
+pub trait TypeResolve {
+	type SignedTableStatement;
+	type BftCommunication;
+	type BftCommitted;
+	type Misbehavior;
+}
+
+impl<C: Context> TypeResolve for C {
+	type SignedTableStatement = table::SignedStatement<C::ParachainCandidate, C::Digest, C::AuthorityId, C::Signature>;
+	type BftCommunication = bft::Communication<C::Proposal, C::Digest, C::AuthorityId, C::Signature>;
+	type BftCommitted = bft::Committed<C::Proposal,C::Digest,C::Signature>;
+	type Misbehavior = table::Misbehavior<C::ParachainCandidate, C::Digest, C::AuthorityId, C::Signature>;
+}
+
+/// Information about a specific group.
+#[derive(Debug, Clone)]
+pub struct GroupInfo<V: Hash + Eq> {
+	/// Authorities meant to check validity of candidates.
+	pub validity_guarantors: HashSet<V>,
+	/// Authorities meant to check availability of candidate data.
+	pub availability_guarantors: HashSet<V>,
+	/// Number of votes needed for validity.
+	pub needed_validity: usize,
+	/// Number of votes needed for availability.
+	pub needed_availability: usize,
+}
+
+struct TableContext<C: Context> {
+	context: C,
+	groups: HashMap<C::GroupId, GroupInfo<C::AuthorityId>>,
+}
+
+impl<C: Context> ::std::ops::Deref for TableContext<C> {
+	type Target = C;
+
+	fn deref(&self) -> &C {
+		&self.context
+	}
+}
+
+impl<C: Context> table::Context for TableContext<C> {
+	type AuthorityId = C::AuthorityId;
+	type Digest = C::Digest;
+	type GroupId = C::GroupId;
+	type Signature = C::Signature;
+	type Candidate = C::ParachainCandidate;
+
+	fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest {
+		C::candidate_digest(candidate)
+	}
+
+	fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId {
+		C::candidate_group(candidate)
+	}
+
+	fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool {
+		self.groups.get(group).map_or(false, |g| g.validity_guarantors.contains(authority))
+	}
+
+	fn is_availability_guarantor_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool {
+		self.groups.get(group).map_or(false, |g| g.availability_guarantors.contains(authority))
+	}
+
+	fn requisite_votes(&self, group: &Self::GroupId) -> (usize, usize) {
+		self.groups.get(group).map_or(
+			(usize::max_value(), usize::max_value()),
+			|g| (g.needed_validity, g.needed_availability),
+		)
+	}
+}
+
+// A shared table object.
+struct SharedTableInner<C: Context> {
+	table: Table<TableContext<C>>,
+	proposed_digest: Option<C::Digest>,
+	awaiting_proposal: Vec<oneshot::Sender<C::Proposal>>,
+}
+
+impl<C: Context> SharedTableInner<C> {
+	fn import_statement(
+		&mut self,
+		context: &TableContext<C>,
+		statement: <C as TypeResolve>::SignedTableStatement,
+		received_from: Option<C::AuthorityId>
+	) -> Option<table::Summary<C::Digest, C::GroupId>> {
+		self.table.import_statement(context, statement, received_from)
+	}
+
+	fn update_proposal(&mut self, context: &TableContext<C>) {
+		if self.awaiting_proposal.is_empty() { return }
+		let proposal_candidates = self.table.proposed_candidates(context);
+		if let Some(proposal) = context.context.create_proposal(proposal_candidates) {
+			for sender in self.awaiting_proposal.drain(..) {
+				let _ = sender.send(proposal.clone());
+			}
+		}
+	}
+
+	fn get_proposal(&mut self, context: &TableContext<C>) -> oneshot::Receiver<C::Proposal> {
+		let (tx, rx) = oneshot::channel();
+		self.awaiting_proposal.push(tx);
+		self.update_proposal(context);
+		rx
+	}
+
+	fn proposal_valid(&mut self, context: &TableContext<C>, proposal: &C::Proposal) -> bool {
+		context.context.proposal_valid(proposal, |contained_candidate| {
+			// check that the candidate is valid (has enough votes)
+			let digest = C::candidate_digest(contained_candidate);
+			self.table.candidate_includable(&digest, context)
+		})
+	}
+}
+
+/// A shared table object.
+pub struct SharedTable<C: Context> {
+	context: Arc<TableContext<C>>,
+	inner: Arc<Mutex<SharedTableInner<C>>>,
+}
+
+impl<C: Context> Clone for SharedTable<C> {
+	fn clone(&self) -> Self {
+		SharedTable {
+			context: self.context.clone(),
+			inner: self.inner.clone()
+		}
+	}
+}
+
+impl<C: Context> SharedTable<C> {
+	/// Create a new shared table.
+	pub fn new(context: C, groups: HashMap<C::GroupId, GroupInfo<C::AuthorityId>>) -> Self {
+		SharedTable {
+			context: Arc::new(TableContext { context, groups }),
+			inner: Arc::new(Mutex::new(SharedTableInner {
+				table: Table::default(),
+				awaiting_proposal: Vec::new(),
+				proposed_digest: None,
+			}))
+		}
+	}
+
+	/// Import a single statement.
+	pub fn import_statement(
+		&self,
+		statement: <C as TypeResolve>::SignedTableStatement,
+		received_from: Option<C::AuthorityId>,
+	) -> Option<table::Summary<C::Digest, C::GroupId>> {
+		self.inner.lock().import_statement(&*self.context, statement, received_from)
+	}
+
+	/// Sign and import a local statement.
+	pub fn sign_and_import(
+		&self,
+		statement: table::Statement<C::ParachainCandidate, C::Digest>,
+	) -> Option<table::Summary<C::Digest, C::GroupId>> {
+		let proposed_digest = match statement {
+			table::Statement::Candidate(ref c) => Some(C::candidate_digest(c)),
+			_ => None,
+		};
+
+		let signed_statement = table::SignedStatement {
+			signature: self.context.sign_table_statement(&statement),
+			sender: self.context.local_id(),
+			statement,
+		};
+
+		let mut inner = self.inner.lock();
+		if proposed_digest.is_some() {
+			inner.proposed_digest = proposed_digest;
+		}
+
+		inner.import_statement(&*self.context, signed_statement, None)
+	}
+
+	/// Import many statements at once.
+	///
+	/// Provide an iterator yielding pairs of (statement, received_from).
+	pub fn import_statements<I, U>(&self, iterable: I) -> U
+		where
+			I: IntoIterator<Item=(<C as TypeResolve>::SignedTableStatement, Option<C::AuthorityId>)>,
+			U: ::std::iter::FromIterator<table::Summary<C::Digest, C::GroupId>>,
+	{
+		let mut inner = self.inner.lock();
+
+		iterable.into_iter().filter_map(move |(statement, received_from)| {
+			inner.import_statement(&*self.context, statement, received_from)
+		}).collect()
+	}
+
+	/// Update the proposal sealing.
+	pub fn update_proposal(&self) {
+		self.inner.lock().update_proposal(&*self.context)
+	}
+
+	/// Register interest in receiving a proposal when ready.
+	/// If one is ready immediately, it will be provided.
+	pub fn get_proposal(&self) -> oneshot::Receiver<C::Proposal> {
+		self.inner.lock().get_proposal(&*self.context)
+	}
+
+	/// Check if a proposal is valid.
+	pub fn proposal_valid(&self, proposal: &C::Proposal) -> bool {
+		self.inner.lock().proposal_valid(&*self.context, proposal)
+	}
+
+	/// Execute a closure using a specific candidate.
+	///
+	/// Deadlocks if called recursively.
+	pub fn with_candidate<F, U>(&self, digest: &C::Digest, f: F) -> U
+		where F: FnOnce(Option<&C::ParachainCandidate>) -> U
+	{
+		let inner = self.inner.lock();
+		f(inner.table.get_candidate(digest))
+	}
+
+	/// Get all witnessed misbehavior.
+	pub fn get_misbehavior(&self) -> HashMap<C::AuthorityId, <C as TypeResolve>::Misbehavior> {
+		self.inner.lock().table.get_misbehavior().clone()
+	}
+
+	/// Fill a statement batch.
+	pub fn fill_batch(&self, batch: &mut C::StatementBatch) {
+		self.inner.lock().table.fill_batch(batch);
+	}
+
+	/// Get the local proposed candidate digest.
+	pub fn proposed_digest(&self) -> Option<C::Digest> {
+		self.inner.lock().proposed_digest.clone()
+	}
+
+	// Get a handle to the table context.
+	fn context(&self) -> &TableContext<C> {
+		&*self.context
+	}
+}
+
+/// Errors that can occur during agreement.
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum Error {
+	IoTerminated,
+	FaultyTimer,
+	CannotPropose,
+}
+
+impl From<bft::InputStreamConcluded> for Error {
+	fn from(_: bft::InputStreamConcluded) -> Error {
+		Error::IoTerminated
+	}
+}
+
+/// Context owned by the BFT future necessary to execute the logic.
+pub struct BftContext<C: Context> {
+	context: C,
+	table: SharedTable<C>,
+	timer: Timer,
+	round_timeout_multiplier: u64,
+}
+
+impl<C: Context> bft::Context for BftContext<C>
+	where C::Proposal: 'static,
+{
+	type AuthorityId = C::AuthorityId;
+	type Digest = C::Digest;
+	type Signature = C::Signature;
+	type Candidate = C::Proposal;
+	type RoundTimeout = Box<Future<Item=(),Error=Error>>;
+	type CreateProposal = Box<Future<Item=Self::Candidate,Error=Error>>;
+
+	fn local_id(&self) -> Self::AuthorityId {
+		self.context.local_id()
+	}
+
+	fn proposal(&self) -> Self::CreateProposal {
+		Box::new(self.table.get_proposal().map_err(|_| Error::CannotPropose))
+	}
+
+	fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest {
+		C::proposal_digest(candidate)
+	}
+
+	fn sign_local(&self, message: bft::Message<Self::Candidate, Self::Digest>)
+		-> bft::LocalizedMessage<Self::Candidate, Self::Digest, Self::AuthorityId, Self::Signature>
+	{
+		let sender = self.local_id();
+		let signature = self.context.sign_bft_message(&message);
+		bft::LocalizedMessage {
+			message,
+			sender,
+			signature,
+		}
+	}
+
+	fn round_proposer(&self, round: usize) -> Self::AuthorityId {
+		self.context.round_proposer(round)
+	}
+
+	fn candidate_valid(&self, proposal: &Self::Candidate) -> bool {
+		self.table.proposal_valid(proposal)
+	}
+
+	fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout {
+		let round = ::std::cmp::min(63, round) as u32;
+		let timeout = 1u64.checked_shl(round)
+			.unwrap_or_else(u64::max_value)
+			.saturating_mul(self.round_timeout_multiplier);
+
+		Box::new(self.timer.sleep(Duration::from_secs(timeout))
+			.map_err(|_| Error::FaultyTimer))
+	}
+}
+
+
+/// Parameters necessary for agreement.
+pub struct AgreementParams<C: Context> {
+	/// The context itself.
+	pub context: C,
+	/// For scheduling timeouts.
+	pub timer: Timer,
+	/// The statement table.
+	pub table: SharedTable<C>,
+	/// The number of nodes.
+	pub nodes: usize,
+	/// The maximum number of faulty nodes.
+	pub max_faulty: usize,
+	/// The round timeout multiplier: 2^round_number is multiplied by this.
+	pub round_timeout_multiplier: u64,
+	/// The maximum amount of messages to queue.
+	pub message_buffer_size: usize,
+	/// Interval to attempt forming proposals over.
+	pub form_proposal_interval: Duration,
+}
+
+/// Recovery for messages
+pub trait MessageRecovery<C: Context> {
+	/// The unchecked message type. This implies that work hasn't been done
+	/// to decode the payload and check and authenticate a signature.
+	type UncheckedMessage;
+
+	/// Attempt to transform a checked message into an unchecked.
+	fn check_message(&self, Self::UncheckedMessage) -> Option<CheckedMessage<C>>;
+}
+
+/// A batch of statements to send out.
+pub trait StatementBatch<V, T> {
+	/// Get the target authorities of these statements.
+	fn targets(&self) -> &[V];
+
+	/// If the batch is empty.
+	fn is_empty(&self) -> bool;
+
+	/// Push a statement onto the batch. Returns false when the batch is full.
+	///
+	/// This is meant to do work like incrementally serializing the statements
+	/// into a vector of bytes while making sure the length is below a certain
+	/// amount.
+	fn push(&mut self, statement: T) -> bool;
+}
+
+/// Recovered and fully checked messages.
+pub enum CheckedMessage<C: Context> {
+	/// Messages meant for the BFT agreement logic.
+	Bft(<C as TypeResolve>::BftCommunication),
+	/// Statements circulating about the table.
+	Table(Vec<<C as TypeResolve>::SignedTableStatement>),
+}
+
+/// Outgoing messages to the network.
+#[derive(Debug, Clone)]
+pub enum OutgoingMessage<C: Context> {
+	/// Messages meant for BFT agreement peers.
+	Bft(<C as TypeResolve>::BftCommunication),
+	/// Batches of table statements.
+	Table(C::StatementBatch),
+}
+
+/// Create an agreement future, and I/O streams.
+// TODO: kill 'static bounds and use impl Future.
+pub fn agree<
+	Context,
+	NetIn,
+	NetOut,
+	Recovery,
+	PropagateStatements,
+	LocalCandidate,
+	Err,
+>(
+	params: AgreementParams<Context>,
+	net_in: NetIn,
+	net_out: NetOut,
+	recovery: Recovery,
+	propagate_statements: PropagateStatements,
+	local_candidate: LocalCandidate,
+)
+	-> Box<Future<Item=<Context as TypeResolve>::BftCommitted,Error=Error>>
+	where
+		Context: ::Context + 'static,
+		Context::CheckCandidate: IntoFuture<Error=Err>,
+		Context::CheckAvailability: IntoFuture<Error=Err>,
+		NetIn: Stream<Item=(Context::AuthorityId, Vec<Recovery::UncheckedMessage>),Error=Err> + 'static,
+		NetOut: Sink<SinkItem=OutgoingMessage<Context>> + 'static,
+		Recovery: MessageRecovery<Context> + 'static,
+		PropagateStatements: Stream<Item=Context::StatementBatch,Error=Err> + 'static,
+		LocalCandidate: IntoFuture<Item=Context::ParachainCandidate> + 'static
+{
+	let (bft_in_in, bft_in_out) = mpsc::unbounded();
+	let (bft_out_in, bft_out_out) = mpsc::unbounded();
+
+	let agreement = {
+		let bft_context = BftContext {
+			context: params.context,
+			table: params.table.clone(),
+			timer: params.timer.clone(),
+			round_timeout_multiplier: params.round_timeout_multiplier,
+		};
+
+		bft::agree(
+			bft_context,
+			params.nodes,
+			params.max_faulty,
+			bft_in_out.map(bft::ContextCommunication).map_err(|_| Error::IoTerminated),
+			bft_out_in.sink_map_err(|_| Error::IoTerminated),
+		)
+	};
+
+	let route_messages_in = {
+		let round_robin = round_robin::RoundRobinBuffer::new(net_in, params.message_buffer_size);
+
+		let round_robin_recovered = round_robin
+			.filter_map(move |(sender, msg)| recovery.check_message(msg).map(move |x| (sender, x)));
+
+		handle_incoming::HandleIncoming::new(
+			params.table.clone(),
+			round_robin_recovered,
+			bft_in_in,
+		).map_err(|_| Error::IoTerminated)
+	};
+
+	let route_messages_out = {
+		let table = params.table.clone();
+		let periodic_table_statements = propagate_statements
+			.or_else(|_| ::futures::future::empty()) // halt the stream instead of error.
+			.map(move |mut batch| { table.fill_batch(&mut batch); batch })
+			.filter(|b| !b.is_empty())
+			.map(OutgoingMessage::Table);
+
+		let complete_out_stream = bft_out_out
+			.map_err(|_| Error::IoTerminated)
+			.map(|bft::ContextCommunication(x)| x)
+			.map(OutgoingMessage::Bft)
+			.select(periodic_table_statements);
+
+		net_out.sink_map_err(|_| Error::IoTerminated).send_all(complete_out_stream)
+	};
+
+	let import_local_candidate = {
+		let table = params.table.clone();
+		local_candidate
+			.into_future()
+			.map(table::Statement::Candidate)
+			.map(Some)
+			.or_else(|_| Ok(None))
+			.map(move |s| if let Some(s) = s {
+				table.sign_and_import(s);
+			})
+	};
+
+	let create_proposal_on_interval = {
+		let table = params.table;
+		params.timer.interval(params.form_proposal_interval)
+			.map_err(|_| Error::FaultyTimer)
+			.for_each(move |_| { table.update_proposal(); Ok(()) })
+	};
+
+	// if these auxiliary futures terminate before the agreement, then
+	// that is an error.
+	let auxiliary_futures = route_messages_in.join4(
+		create_proposal_on_interval,
+		route_messages_out,
+		import_local_candidate,
+	).and_then(|_| Err(Error::IoTerminated));
+
+	let future = agreement
+		.select(auxiliary_futures)
+		.map(|(committed, _)| committed)
+		.map_err(|(e, _)| e);
+
+	Box::new(future)
+}
diff --git a/substrate/candidate-agreement/src/round_robin.rs b/substrate/candidate-agreement/src/round_robin.rs
new file mode 100644
index 0000000000000000000000000000000000000000..3f98507cab89d2a38a8a1114bd2efe9c21191840
--- /dev/null
+++ b/substrate/candidate-agreement/src/round_robin.rs
@@ -0,0 +1,164 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Round-robin buffer for incoming messages.
+//!
+//! This takes batches of messages associated with a sender as input,
+//! and yields messages in a fair order by sender.
+
+use std::collections::{Bound, BTreeMap, VecDeque};
+
+use futures::prelude::*;
+use futures::stream::Fuse;
+
+/// Implementation of the round-robin buffer for incoming messages.
+#[derive(Debug)]
+pub struct RoundRobinBuffer<V: Ord + Eq, S, M> {
+	buffer: BTreeMap<V, VecDeque<M>>,
+	last_processed_from: Option<V>,
+	stored_messages: usize,
+	max_messages: usize,
+	inner: Fuse<S>,
+}
+
+impl<V: Ord + Eq + Clone, S: Stream, M> RoundRobinBuffer<V, S, M> {
+	/// Create a new round-robin buffer which holds up to a maximum
+	/// amount of messages.
+	pub fn new(stream: S, buffer_size: usize) -> Self {
+		RoundRobinBuffer {
+			buffer: BTreeMap::new(),
+			last_processed_from: None,
+			stored_messages: 0,
+			max_messages: buffer_size,
+			inner: stream.fuse(),
+		}
+	}
+}
+
+impl<V: Ord + Eq + Clone, S, M> RoundRobinBuffer<V, S, M> {
+	fn next_message(&mut self) -> Option<(V, M)> {
+		if self.stored_messages == 0 {
+			return None
+		}
+
+		// first pick up from the last authority we processed a message from
+		let mut next = {
+			let lower_bound = match self.last_processed_from {
+				None => Bound::Unbounded,
+				Some(ref x) => Bound::Excluded(x.clone()),
+			};
+
+			self.buffer.range_mut((lower_bound, Bound::Unbounded))
+				.filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v)))
+				.next()
+		};
+
+		// but wrap around to the beginning again if we got nothing.
+		if next.is_none() {
+			next = self.buffer.iter_mut()
+				.filter_map(|(k, v)| v.pop_front().map(|v| (k.clone(), v)))
+				.next();
+		}
+
+		if let Some((ref authority, _)) = next {
+			self.stored_messages -= 1;
+			self.last_processed_from = Some(authority.clone());
+		}
+
+		next
+	}
+
+	// import messages, discarding when the buffer is full.
+	fn import_messages(&mut self, sender: V, messages: Vec<M>) {
+		let space_remaining = self.max_messages - self.stored_messages;
+		self.stored_messages += ::std::cmp::min(space_remaining, messages.len());
+
+		let v = self.buffer.entry(sender).or_insert_with(VecDeque::new);
+		v.extend(messages.into_iter().take(space_remaining));
+	}
+}
+
+impl<V: Ord + Eq + Clone, S, M> Stream for RoundRobinBuffer<V, S, M>
+	where S: Stream<Item=(V, Vec<M>)>
+{
+	type Item = (V, M);
+	type Error = S::Error;
+
+	fn poll(&mut self) -> Poll<Option<Self::Item>, S::Error> {
+		loop {
+			match self.inner.poll()? {
+				Async::NotReady | Async::Ready(None) => break,
+				Async::Ready(Some((sender, msgs))) => self.import_messages(sender, msgs),
+			}
+		}
+
+		let done = self.inner.is_done();
+		Ok(match self.next_message() {
+			Some(msg) => Async::Ready(Some(msg)),
+			None => if done { Async::Ready(None) } else { Async::NotReady },
+		})
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use futures::stream::{self, Stream};
+
+	#[derive(Debug, PartialEq, Eq)]
+	struct UncheckedMessage { data: Vec<u8> }
+
+	#[test]
+	fn is_fair_and_wraps_around() {
+		let stream = stream::iter_ok(vec![
+			(1, vec![
+				UncheckedMessage { data: vec![1, 3, 5] },
+				UncheckedMessage { data: vec![3, 5, 7] },
+				UncheckedMessage { data: vec![5, 7, 9] },
+			]),
+			(2, vec![
+				UncheckedMessage { data: vec![2, 4, 6] },
+				UncheckedMessage { data: vec![4, 6, 8] },
+				UncheckedMessage { data: vec![6, 8, 10] },
+			]),
+		]);
+
+		let round_robin = RoundRobinBuffer::new(stream, 100);
+		let output = round_robin.wait().collect::<Result<Vec<_>, ()>>().unwrap();
+
+		assert_eq!(output, vec![
+			(1, UncheckedMessage { data: vec![1, 3, 5] }),
+			(2, UncheckedMessage { data: vec![2, 4, 6] }),
+			(1, UncheckedMessage { data: vec![3, 5, 7] }),
+
+			(2, UncheckedMessage { data: vec![4, 6, 8] }),
+			(1, UncheckedMessage { data: vec![5, 7, 9] }),
+			(2, UncheckedMessage { data: vec![6, 8, 10] }),
+		]);
+	}
+
+	#[test]
+	fn discards_when_full() {
+		let stream = stream::iter_ok(vec![
+			(1, (0..200).map(|i| UncheckedMessage { data: vec![i] }).collect())
+		]);
+
+		let round_robin = RoundRobinBuffer::new(stream, 100);
+		let output = round_robin.wait().collect::<Result<Vec<_>, ()>>().unwrap();
+
+		assert_eq!(output.len(), 100);
+	}
+}
diff --git a/substrate/candidate-agreement/src/table.rs b/substrate/candidate-agreement/src/table.rs
index 381244e58b6d6a51c7d504d789eb95f0616a3664..2909d219c6fbbb1f6b35ca148e99318e2c51ce8e 100644
--- a/substrate/candidate-agreement/src/table.rs
+++ b/substrate/candidate-agreement/src/table.rs
@@ -16,14 +16,14 @@
 
 //! The statement table.
 //!
-//! This stores messages other validators issue about candidates.
+//! This stores messages other authorities issue about candidates.
 //!
 //! These messages are used to create a proposal submitted to a BFT consensus process.
 //!
 //! Proposals are formed of sets of candidates which have the requisite number of
 //! validity and availability votes.
 //!
-//! Each parachain is associated with two sets of validators: those which can
+//! Each parachain is associated with two sets of authorities: those which can
 //! propose and attest to validity of candidates, and those who can only attest
 //! to availability.
 
@@ -32,35 +32,37 @@ use std::collections::hash_map::{HashMap, Entry};
 use std::hash::Hash;
 use std::fmt::Debug;
 
+use super::StatementBatch;
+
 /// Context for the statement table.
 pub trait Context {
-	/// A validator ID
-	type ValidatorId: Hash + Eq + Clone + Debug;
+	/// A authority ID
+	type AuthorityId: Debug + Hash + Eq + Clone;
 	/// The digest (hash or other unique attribute) of a candidate.
-	type Digest: Hash + Eq + Clone + Debug;
-	/// Candidate type.
-	type Candidate: Ord + Eq + Clone + Debug;
+	type Digest: Debug + Hash + Eq + Clone;
 	/// The group ID type
-	type GroupId: Hash + Ord + Eq + Clone + Debug;
+	type GroupId: Debug + Hash + Ord + Eq + Clone;
 	/// A signature type.
-	type Signature: Eq + Clone +  Debug;
+	type Signature: Debug + Eq + Clone;
+	/// Candidate type. In practice this will be a candidate receipt.
+	type Candidate: Debug + Ord + Eq + Clone;
 
 	/// get the digest of a candidate.
-	fn candidate_digest(&self, candidate: &Self::Candidate) -> Self::Digest;
+	fn candidate_digest(candidate: &Self::Candidate) -> Self::Digest;
 
 	/// get the group of a candidate.
-	fn candidate_group(&self, candidate: &Self::Candidate) -> Self::GroupId;
+	fn candidate_group(candidate: &Self::Candidate) -> Self::GroupId;
 
-	/// Whether a validator is a member of a group.
+	/// Whether a authority is a member of a group.
 	/// Members are meant to submit candidates and vote on validity.
-	fn is_member_of(&self, validator: &Self::ValidatorId, group: &Self::GroupId) -> bool;
+	fn is_member_of(&self, authority: &Self::AuthorityId, group: &Self::GroupId) -> bool;
 
-	/// Whether a validator is an availability guarantor of a group.
+	/// Whether a authority is an availability guarantor of a group.
 	/// Guarantors are meant to vote on availability for candidates submitted
 	/// in a group.
 	fn is_availability_guarantor_of(
 		&self,
-		validator: &Self::ValidatorId,
+		authority: &Self::AuthorityId,
 		group: &Self::GroupId,
 	) -> bool;
 
@@ -69,26 +71,26 @@ pub trait Context {
 }
 
 /// Statements circulated among peers.
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub enum Statement<C, D> {
-	/// Broadcast by a validator to indicate that this is his candidate for
+	/// Broadcast by a authority to indicate that this is his candidate for
 	/// inclusion.
 	///
 	/// Broadcasting two different candidate messages per round is not allowed.
 	Candidate(C),
-	/// Broadcast by a validator to attest that the candidate with given digest
+	/// Broadcast by a authority to attest that the candidate with given digest
 	/// is valid.
 	Valid(D),
-	/// Broadcast by a validator to attest that the auxiliary data for a candidate
+	/// Broadcast by a authority to attest that the auxiliary data for a candidate
 	/// with given digest is available.
 	Available(D),
-	/// Broadcast by a validator to attest that the candidate with given digest
+	/// Broadcast by a authority to attest that the candidate with given digest
 	/// is invalid.
 	Invalid(D),
 }
 
 /// A signed statement.
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub struct SignedStatement<C, D, V, S> {
 	/// The statement.
 	pub statement: Statement<C, D>,
@@ -98,23 +100,23 @@ pub struct SignedStatement<C, D, V, S> {
 	pub sender: V,
 }
 
-// A unique trace for a class of valid statements issued by a validator.
+// A unique trace for a class of valid statements issued by a authority.
 //
-// We keep track of which statements we have received or sent to other validators
+// We keep track of which statements we have received or sent to other authorities
 // in order to prevent relaying the same data multiple times.
 //
-// The signature of the statement is replaced by the validator because the validator
+// The signature of the statement is replaced by the authority because the authority
 // is unique while signatures are not (at least under common schemes like
 // Schnorr or ECDSA).
 #[derive(Hash, PartialEq, Eq, Clone)]
 enum StatementTrace<V, D> {
-	/// The candidate proposed by the validator.
+	/// The candidate proposed by the authority.
 	Candidate(V),
-	/// A validity statement from that validator about the given digest.
+	/// A validity statement from that authority about the given digest.
 	Valid(V, D),
-	/// An invalidity statement from that validator about the given digest.
+	/// An invalidity statement from that authority about the given digest.
 	Invalid(V, D),
-	/// An availability statement from that validator about the given digest.
+	/// An availability statement from that authority about the given digest.
 	Available(V, D),
 }
 
@@ -122,7 +124,7 @@ enum StatementTrace<V, D> {
 ///
 /// Since there are three possible ways to vote, a double vote is possible in
 /// three possible combinations (unordered)
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub enum ValidityDoubleVote<C, D, S> {
 	/// Implicit vote by issuing and explicity voting validity.
 	IssuedAndValidity((C, S), (D, S)),
@@ -133,7 +135,7 @@ pub enum ValidityDoubleVote<C, D, S> {
 }
 
 /// Misbehavior: declaring multiple candidates.
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub struct MultipleCandidates<C, S> {
 	/// The first candidate seen.
 	pub first: (C, S),
@@ -142,7 +144,7 @@ pub struct MultipleCandidates<C, S> {
 }
 
 /// Misbehavior: submitted statement for wrong group.
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub struct UnauthorizedStatement<C, D, V, S> {
 	/// A signed statement which was submitted without proper authority.
 	pub statement: SignedStatement<C, D, V, S>,
@@ -150,7 +152,7 @@ pub struct UnauthorizedStatement<C, D, V, S> {
 
 /// Different kinds of misbehavior. All of these kinds of malicious misbehavior
 /// are easily provable and extremely disincentivized.
-#[derive(PartialEq, Eq, Debug)]
+#[derive(PartialEq, Eq, Debug, Clone)]
 pub enum Misbehavior<C, D, V, S> {
 	/// Voted invalid and valid on validity.
 	ValidityDoubleVote(ValidityDoubleVote<C, D, S>),
@@ -168,7 +170,7 @@ pub trait ResolveMisbehavior {
 }
 
 impl<C: Context + ?Sized> ResolveMisbehavior for C {
-	type Misbehavior = Misbehavior<C::Candidate, C::Digest, C::ValidatorId, C::Signature>;
+	type Misbehavior = Misbehavior<C::Candidate, C::Digest, C::AuthorityId, C::Signature>;
 }
 
 // kinds of votes for validity
@@ -201,9 +203,9 @@ pub struct Summary<D, G> {
 pub struct CandidateData<C: Context> {
 	group_id: C::GroupId,
 	candidate: C::Candidate,
-	validity_votes: HashMap<C::ValidatorId, ValidityVote<C::Signature>>,
-	availability_votes: HashMap<C::ValidatorId, C::Signature>,
-	indicated_bad_by: Vec<C::ValidatorId>,
+	validity_votes: HashMap<C::AuthorityId, ValidityVote<C::Signature>>,
+	availability_votes: HashMap<C::AuthorityId, C::Signature>,
+	indicated_bad_by: Vec<C::AuthorityId>,
 }
 
 impl<C: Context> CandidateData<C> {
@@ -212,20 +214,9 @@ impl<C: Context> CandidateData<C> {
 		!self.indicated_bad_by.is_empty()
 	}
 
-	/// Get an iterator over those who have indicated this candidate valid.
-	// TODO: impl trait
-	pub fn voted_valid_by<'a>(&'a self) -> Box<Iterator<Item=C::ValidatorId> + 'a> {
-		Box::new(self.validity_votes.iter().filter_map(|(v, vote)| {
-			match *vote {
-				ValidityVote::Issued(_) | ValidityVote::Valid(_) => Some(v.clone()),
-				ValidityVote::Invalid(_) => None,
-			}
-		}))
-	}
-
 	// Candidate data can be included in a proposal
 	// if it has enough validity and availability votes
-	// and no validators have called it bad.
+	// and no authorities have called it bad.
 	fn can_be_included(&self, validity_threshold: usize, availability_threshold: usize) -> bool {
 		self.indicated_bad_by.is_empty()
 			&& self.validity_votes.len() >= validity_threshold
@@ -243,35 +234,46 @@ impl<C: Context> CandidateData<C> {
 	}
 }
 
-// validator metadata
-struct ValidatorData<C: Context> {
+// authority metadata
+struct AuthorityData<C: Context> {
 	proposal: Option<(C::Digest, C::Signature)>,
-	known_statements: HashSet<StatementTrace<C::ValidatorId, C::Digest>>,
+	known_statements: HashSet<StatementTrace<C::AuthorityId, C::Digest>>,
 }
 
-/// Create a new, empty statement table.
-pub fn create<C: Context>() -> Table<C> {
-	Table {
-		validator_data: HashMap::default(),
-		detected_misbehavior: HashMap::default(),
-		candidate_votes: HashMap::default(),
+impl<C: Context> Default for AuthorityData<C> {
+	fn default() -> Self {
+		AuthorityData {
+			proposal: None,
+			known_statements: HashSet::default(),
+		}
 	}
 }
 
 /// Stores votes
-#[derive(Default)]
 pub struct Table<C: Context> {
-	validator_data: HashMap<C::ValidatorId, ValidatorData<C>>,
-	detected_misbehavior: HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior>,
+	authority_data: HashMap<C::AuthorityId, AuthorityData<C>>,
+	detected_misbehavior: HashMap<C::AuthorityId, <C as ResolveMisbehavior>::Misbehavior>,
 	candidate_votes: HashMap<C::Digest, CandidateData<C>>,
 }
 
+impl<C: Context> Default for Table<C> {
+	fn default() -> Self {
+		Table {
+			authority_data: HashMap::new(),
+			detected_misbehavior: HashMap::new(),
+			candidate_votes: HashMap::new(),
+		}
+	}
+}
+
 impl<C: Context> Table<C> {
 	/// Produce a set of proposed candidates.
 	///
 	/// This will be at most one per group, consisting of the
 	/// best candidate for each group with requisite votes for inclusion.
-	pub fn proposed_candidates(&self, context: &C) -> Vec<C::Candidate> {
+	///
+	/// The vector is sorted in ascending order by group id.
+	pub fn proposed_candidates<'a>(&'a self, context: &C) -> Vec<&'a C::Candidate> {
 		use std::collections::BTreeMap;
 		use std::collections::btree_map::Entry as BTreeEntry;
 
@@ -285,7 +287,7 @@ impl<C: Context> Table<C> {
 			match best_candidates.entry(group_id.clone()) {
 				BTreeEntry::Occupied(mut occ) => {
 					let candidate_ref = occ.get_mut();
-					if *candidate_ref < candidate {
+					if *candidate_ref > candidate {
 						*candidate_ref = candidate;
 					}
 				}
@@ -293,32 +295,27 @@ impl<C: Context> Table<C> {
 			}
 		}
 
-		best_candidates.values().map(|v| C::Candidate::clone(v)).collect::<Vec<_>>()
+		best_candidates.values().cloned().collect::<Vec<_>>()
 	}
 
-	/// Get an iterator of all candidates with a given group.
-	// TODO: impl iterator
-	pub fn candidates_in_group<'a>(&'a self, group_id: C::GroupId)
-		-> Box<Iterator<Item=&'a CandidateData<C>> + 'a>
-	{
-		Box::new(self.candidate_votes.values().filter(move |c| c.group_id == group_id))
-	}
-
-	/// Drain all misbehavior observed up to this point.
-	pub fn drain_misbehavior(&mut self) -> HashMap<C::ValidatorId, <C as ResolveMisbehavior>::Misbehavior> {
-		::std::mem::replace(&mut self.detected_misbehavior, HashMap::new())
+	/// Whether a candidate can be included.
+	pub fn candidate_includable(&self, digest: &C::Digest, context: &C) -> bool {
+		self.candidate_votes.get(digest).map_or(false, |data| {
+			let (v_threshold, a_threshold) = context.requisite_votes(&data.group_id);
+			data.can_be_included(v_threshold, a_threshold)
+		})
 	}
 
 	/// Import a signed statement. Signatures should be checked for validity, and the
-	/// sender should be checked to actually be a validator.
+	/// sender should be checked to actually be a authority.
 	///
 	/// This can note the origin of the statement to indicate that he has
 	/// seen it already.
 	pub fn import_statement(
 		&mut self,
 		context: &C,
-		statement: SignedStatement<C::Candidate, C::Digest, C::ValidatorId, C::Signature>,
-		from: Option<C::ValidatorId>
+		statement: SignedStatement<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
+		from: Option<C::AuthorityId>
 	) -> Option<Summary<C::Digest, C::GroupId>> {
 		let SignedStatement { statement, signature, sender: signer } = statement;
 
@@ -371,8 +368,143 @@ impl<C: Context> Table<C> {
 		maybe_summary
 	}
 
-	fn note_trace_seen(&mut self, trace: StatementTrace<C::ValidatorId, C::Digest>, known_by: C::ValidatorId) {
-		self.validator_data.entry(known_by).or_insert_with(|| ValidatorData {
+	/// Get a candidate by digest.
+	pub fn get_candidate(&self, digest: &C::Digest) -> Option<&C::Candidate> {
+		self.candidate_votes.get(digest).map(|d| &d.candidate)
+	}
+
+	/// Access all witnessed misbehavior.
+	pub fn get_misbehavior(&self)
+		-> &HashMap<C::AuthorityId, <C as ResolveMisbehavior>::Misbehavior>
+	{
+		&self.detected_misbehavior
+	}
+
+	/// Fill a statement batch and note messages seen by the targets.
+	pub fn fill_batch<B>(&mut self, batch: &mut B)
+		where B: StatementBatch<
+			C::AuthorityId,
+			SignedStatement<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
+		>
+	{
+		// naively iterate all statements so far, taking any that
+		// at least one of the targets has not seen.
+
+		// workaround for the fact that it's inconvenient to borrow multiple
+		// entries out of a hashmap mutably -- we just move them out and
+		// replace them when we're done.
+		struct SwappedTargetData<'a, C: 'a + Context> {
+			authority_data: &'a mut HashMap<C::AuthorityId, AuthorityData<C>>,
+			target_data: Vec<(C::AuthorityId, AuthorityData<C>)>,
+		}
+
+		impl<'a, C: 'a + Context> Drop for SwappedTargetData<'a, C> {
+			fn drop(&mut self) {
+				for (id, data) in self.target_data.drain(..) {
+					self.authority_data.insert(id, data);
+				}
+			}
+		}
+
+		// pre-fetch authority data for all the targets.
+		let mut target_data = {
+			let authority_data = &mut self.authority_data;
+			let mut target_data = Vec::with_capacity(batch.targets().len());
+			for target in batch.targets() {
+				let active_data = match authority_data.get_mut(target) {
+					None => Default::default(),
+					Some(x) => ::std::mem::replace(x, Default::default()),
+				};
+
+				target_data.push((target.clone(), active_data));
+			}
+
+			SwappedTargetData {
+				authority_data,
+				target_data
+			}
+		};
+
+		let target_data = &mut target_data.target_data;
+
+		macro_rules! attempt_send {
+			($trace:expr, sender=$sender:expr, sig=$sig:expr, statement=$statement:expr) => {{
+				let trace = $trace;
+				let can_send = target_data.iter()
+					.any(|t| !t.1.known_statements.contains(&trace));
+
+				if can_send {
+					let statement = SignedStatement {
+						statement: $statement,
+						signature: $sig,
+						sender: $sender,
+					};
+
+					if batch.push(statement) {
+						for target in target_data.iter_mut() {
+							target.1.known_statements.insert(trace.clone());
+						}
+					} else {
+						return;
+					}
+				}
+			}}
+		}
+
+		// reconstruct statements for anything whose trace passes the filter.
+		for (digest, candidate) in self.candidate_votes.iter() {
+			let issuance_iter = candidate.validity_votes.iter()
+				.filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { true } else { false });
+
+			let validity_iter = candidate.validity_votes.iter()
+				.filter(|&(_, x)| if let ValidityVote::Issued(_) = *x { false } else { true });
+
+			// send issuance statements before votes.
+			for (sender, vote) in issuance_iter.chain(validity_iter) {
+				match *vote {
+					ValidityVote::Issued(ref sig) => {
+						attempt_send!(
+							StatementTrace::Candidate(sender.clone()),
+							sender = sender.clone(),
+							sig = sig.clone(),
+							statement = Statement::Candidate(candidate.candidate.clone())
+						)
+					}
+					ValidityVote::Valid(ref sig) => {
+						attempt_send!(
+							StatementTrace::Valid(sender.clone(), digest.clone()),
+							sender = sender.clone(),
+							sig = sig.clone(),
+							statement = Statement::Valid(digest.clone())
+						)
+					}
+					ValidityVote::Invalid(ref sig) => {
+						attempt_send!(
+							StatementTrace::Invalid(sender.clone(), digest.clone()),
+							sender = sender.clone(),
+							sig = sig.clone(),
+							statement = Statement::Invalid(digest.clone())
+						)
+					}
+				}
+			};
+
+
+			// and lastly send availability.
+			for (sender, sig) in candidate.availability_votes.iter() {
+				attempt_send!(
+					StatementTrace::Available(sender.clone(), digest.clone()),
+					sender = sender.clone(),
+					sig = sig.clone(),
+					statement = Statement::Available(digest.clone())
+				)
+			}
+		}
+
+	}
+
+	fn note_trace_seen(&mut self, trace: StatementTrace<C::AuthorityId, C::Digest>, known_by: C::AuthorityId) {
+		self.authority_data.entry(known_by).or_insert_with(|| AuthorityData {
 			proposal: None,
 			known_statements: HashSet::default(),
 		}).known_statements.insert(trace);
@@ -381,11 +513,11 @@ impl<C: Context> Table<C> {
 	fn import_candidate(
 		&mut self,
 		context: &C,
-		from: C::ValidatorId,
+		from: C::AuthorityId,
 		candidate: C::Candidate,
 		signature: C::Signature,
 	) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
-		let group = context.candidate_group(&candidate);
+		let group = C::candidate_group(&candidate);
 		if !context.is_member_of(&from, &group) {
 			return (
 				Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
@@ -399,10 +531,10 @@ impl<C: Context> Table<C> {
 			);
 		}
 
-		// check that validator hasn't already specified another candidate.
-		let digest = context.candidate_digest(&candidate);
+		// check that authority hasn't already specified another candidate.
+		let digest = C::candidate_digest(&candidate);
 
-		let new_proposal = match self.validator_data.entry(from.clone()) {
+		let new_proposal = match self.authority_data.entry(from.clone()) {
 			Entry::Occupied(mut occ) => {
 				// if digest is different, fetch candidate and
 				// note misbehavior.
@@ -411,7 +543,7 @@ impl<C: Context> Table<C> {
 				if let Some((ref old_digest, ref old_sig)) = existing.proposal {
 					if old_digest != &digest {
 						const EXISTENCE_PROOF: &str =
-							"when proposal first received from validator, candidate \
+							"when proposal first received from authority, candidate \
 							votes entry is created. proposal here is `Some`, therefore \
 							candidate votes entry exists; qed";
 
@@ -436,7 +568,7 @@ impl<C: Context> Table<C> {
 				}
 			}
 			Entry::Vacant(vacant) => {
-				vacant.insert(ValidatorData {
+				vacant.insert(AuthorityData {
 					proposal: Some((digest.clone(), signature.clone())),
 					known_statements: HashSet::new(),
 				});
@@ -467,7 +599,7 @@ impl<C: Context> Table<C> {
 	fn validity_vote(
 		&mut self,
 		context: &C,
-		from: C::ValidatorId,
+		from: C::AuthorityId,
 		digest: C::Digest,
 		vote: ValidityVote<C::Signature>,
 	) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
@@ -476,7 +608,7 @@ impl<C: Context> Table<C> {
 			Some(votes) => votes,
 		};
 
-		// check that this validator actually can vote in this group.
+		// check that this authority actually can vote in this group.
 		if !context.is_member_of(&from, &votes.group_id) {
 			let (sig, valid) = match vote {
 				ValidityVote::Valid(s) => (s, true),
@@ -546,7 +678,7 @@ impl<C: Context> Table<C> {
 	fn availability_vote(
 		&mut self,
 		context: &C,
-		from: C::ValidatorId,
+		from: C::AuthorityId,
 		digest: C::Digest,
 		signature: C::Signature,
 	) -> (Option<<C as ResolveMisbehavior>::Misbehavior>, Option<Summary<C::Digest, C::GroupId>>) {
@@ -555,7 +687,7 @@ impl<C: Context> Table<C> {
 			Some(votes) => votes,
 		};
 
-		// check that this validator actually can vote in this group.
+		// check that this authority actually can vote in this group.
 		if !context.is_availability_guarantor_of(&from, &votes.group_id) {
 			return (
 				Some(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
@@ -577,10 +709,19 @@ impl<C: Context> Table<C> {
 #[cfg(test)]
 mod tests {
 	use super::*;
+	use ::tests::VecBatch;
 	use std::collections::HashMap;
 
+	fn create<C: Context>() -> Table<C> {
+		Table {
+			authority_data: HashMap::default(),
+			detected_misbehavior: HashMap::default(),
+			candidate_votes: HashMap::default(),
+		}
+	}
+
 	#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)]
-	struct ValidatorId(usize);
+	struct AuthorityId(usize);
 
 	#[derive(Debug, Copy, Clone, Hash, PartialOrd, Ord, PartialEq, Eq)]
 	struct GroupId(usize);
@@ -598,38 +739,38 @@ mod tests {
 	#[derive(Debug, PartialEq, Eq)]
 	struct TestContext {
 		// v -> (validity, availability)
-		validators: HashMap<ValidatorId, (GroupId, GroupId)>
+		authorities: HashMap<AuthorityId, (GroupId, GroupId)>
 	}
 
 	impl Context for TestContext {
-		type ValidatorId = ValidatorId;
+		type AuthorityId = AuthorityId;
 		type Digest = Digest;
 		type Candidate = Candidate;
 		type GroupId = GroupId;
 		type Signature = Signature;
 
-		fn candidate_digest(&self, candidate: &Candidate) -> Digest {
+		fn candidate_digest(candidate: &Candidate) -> Digest {
 			Digest(candidate.1)
 		}
 
-		fn candidate_group(&self, candidate: &Candidate) -> GroupId {
+		fn candidate_group(candidate: &Candidate) -> GroupId {
 			GroupId(candidate.0)
 		}
 
 		fn is_member_of(
 			&self,
-			validator: &ValidatorId,
+			authority: &AuthorityId,
 			group: &GroupId
 		) -> bool {
-			self.validators.get(validator).map(|v| &v.0 == group).unwrap_or(false)
+			self.authorities.get(authority).map(|v| &v.0 == group).unwrap_or(false)
 		}
 
 		fn is_availability_guarantor_of(
 			&self,
-			validator: &ValidatorId,
+			authority: &AuthorityId,
 			group: &GroupId
 		) -> bool {
-			self.validators.get(validator).map(|v| &v.1 == group).unwrap_or(false)
+			self.authorities.get(authority).map(|v| &v.1 == group).unwrap_or(false)
 		}
 
 		fn requisite_votes(&self, _id: &GroupId) -> (usize, usize) {
@@ -640,9 +781,9 @@ mod tests {
 	#[test]
 	fn submitting_two_candidates_is_misbehavior() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
 				map
 			}
 		};
@@ -651,21 +792,21 @@ mod tests {
 		let statement_a = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 
 		let statement_b = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 999)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 
 		table.import_statement(&context, statement_a, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
 
 		table.import_statement(&context, statement_b, None);
 		assert_eq!(
-			table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
+			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
 			&Misbehavior::MultipleCandidates(MultipleCandidates {
 				first: (Candidate(2, 100), Signature(1)),
 				second: (Candidate(2, 999), Signature(1)),
@@ -676,9 +817,9 @@ mod tests {
 	#[test]
 	fn submitting_candidate_from_wrong_group_is_misbehavior() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(3), GroupId(455)));
+				map.insert(AuthorityId(1), (GroupId(3), GroupId(455)));
 				map
 			}
 		};
@@ -687,18 +828,18 @@ mod tests {
 		let statement = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 
 		table.import_statement(&context, statement, None);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
+			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
 			&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
 				statement: SignedStatement {
 					statement: Statement::Candidate(Candidate(2, 100)),
 					signature: Signature(1),
-					sender: ValidatorId(1),
+					sender: AuthorityId(1),
 				},
 			})
 		);
@@ -707,10 +848,10 @@ mod tests {
 	#[test]
 	fn unauthorized_votes() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
-				map.insert(ValidatorId(2), (GroupId(3), GroupId(222)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(2), (GroupId(3), GroupId(222)));
 				map
 			}
 		};
@@ -720,56 +861,56 @@ mod tests {
 		let candidate_a = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 		let candidate_a_digest = Digest(100);
 
 		let candidate_b = SignedStatement {
 			statement: Statement::Candidate(Candidate(3, 987)),
 			signature: Signature(2),
-			sender: ValidatorId(2),
+			sender: AuthorityId(2),
 		};
 		let candidate_b_digest = Digest(987);
 
 		table.import_statement(&context, candidate_a, None);
 		table.import_statement(&context, candidate_b, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
 
-		// validator 1 votes for availability on 2's candidate.
+		// authority 1 votes for availability on 2's candidate.
 		let bad_availability_vote = SignedStatement {
 			statement: Statement::Available(candidate_b_digest.clone()),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 		table.import_statement(&context, bad_availability_vote, None);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
+			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
 			&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
 				statement: SignedStatement {
 					statement: Statement::Available(candidate_b_digest),
 					signature: Signature(1),
-					sender: ValidatorId(1),
+					sender: AuthorityId(1),
 				},
 			})
 		);
 
-		// validator 2 votes for validity on 1's candidate.
+		// authority 2 votes for validity on 1's candidate.
 		let bad_validity_vote = SignedStatement {
 			statement: Statement::Valid(candidate_a_digest.clone()),
 			signature: Signature(2),
-			sender: ValidatorId(2),
+			sender: AuthorityId(2),
 		};
 		table.import_statement(&context, bad_validity_vote, None);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&ValidatorId(2)).unwrap(),
+			table.detected_misbehavior.get(&AuthorityId(2)).unwrap(),
 			&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
 				statement: SignedStatement {
 					statement: Statement::Valid(candidate_a_digest),
 					signature: Signature(2),
-					sender: ValidatorId(2),
+					sender: AuthorityId(2),
 				},
 			})
 		);
@@ -778,10 +919,10 @@ mod tests {
 	#[test]
 	fn validity_double_vote_is_misbehavior() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
-				map.insert(ValidatorId(2), (GroupId(2), GroupId(246)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(2), (GroupId(2), GroupId(246)));
 				map
 			}
 		};
@@ -790,32 +931,32 @@ mod tests {
 		let statement = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 		let candidate_digest = Digest(100);
 
 		table.import_statement(&context, statement, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
 
 		let valid_statement = SignedStatement {
 			statement: Statement::Valid(candidate_digest.clone()),
 			signature: Signature(2),
-			sender: ValidatorId(2),
+			sender: AuthorityId(2),
 		};
 
 		let invalid_statement = SignedStatement {
 			statement: Statement::Invalid(candidate_digest.clone()),
 			signature: Signature(2),
-			sender: ValidatorId(2),
+			sender: AuthorityId(2),
 		};
 
 		table.import_statement(&context, valid_statement, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
 
 		table.import_statement(&context, invalid_statement, None);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&ValidatorId(2)).unwrap(),
+			table.detected_misbehavior.get(&AuthorityId(2)).unwrap(),
 			&Misbehavior::ValidityDoubleVote(ValidityDoubleVote::ValidityAndInvalidity(
 				candidate_digest,
 				Signature(2),
@@ -827,9 +968,9 @@ mod tests {
 	#[test]
 	fn issue_and_vote_is_misbehavior() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
 				map
 			}
 		};
@@ -838,22 +979,22 @@ mod tests {
 		let statement = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 		let candidate_digest = Digest(100);
 
 		table.import_statement(&context, statement, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
 
 		let extra_vote = SignedStatement {
 			statement: Statement::Valid(candidate_digest.clone()),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 
 		table.import_statement(&context, extra_vote, None);
 		assert_eq!(
-			table.detected_misbehavior.get(&ValidatorId(1)).unwrap(),
+			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
 			&Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity(
 				(Candidate(2, 100), Signature(1)),
 				(Digest(100), Signature(1)),
@@ -877,18 +1018,18 @@ mod tests {
 		assert!(!candidate.can_be_included(validity_threshold, availability_threshold));
 
 		for i in 0..validity_threshold {
-			candidate.validity_votes.insert(ValidatorId(i + 100), ValidityVote::Valid(Signature(i + 100)));
+			candidate.validity_votes.insert(AuthorityId(i + 100), ValidityVote::Valid(Signature(i + 100)));
 		}
 
 		assert!(!candidate.can_be_included(validity_threshold, availability_threshold));
 
 		for i in 0..availability_threshold {
-			candidate.availability_votes.insert(ValidatorId(i + 255), Signature(i + 255));
+			candidate.availability_votes.insert(AuthorityId(i + 255), Signature(i + 255));
 		}
 
 		assert!(candidate.can_be_included(validity_threshold, availability_threshold));
 
-		candidate.indicated_bad_by.push(ValidatorId(1024));
+		candidate.indicated_bad_by.push(AuthorityId(1024));
 
 		assert!(!candidate.can_be_included(validity_threshold, availability_threshold));
 	}
@@ -896,9 +1037,9 @@ mod tests {
 	#[test]
 	fn candidate_import_gives_summary() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
 				map
 			}
 		};
@@ -907,7 +1048,7 @@ mod tests {
 		let statement = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 
 		let summary = table.import_statement(&context, statement, None)
@@ -922,10 +1063,10 @@ mod tests {
 	#[test]
 	fn candidate_vote_gives_summary() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
-				map.insert(ValidatorId(2), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(2), (GroupId(2), GroupId(455)));
 				map
 			}
 		};
@@ -934,23 +1075,23 @@ mod tests {
 		let statement = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 		let candidate_digest = Digest(100);
 
 		table.import_statement(&context, statement, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
 
 		let vote = SignedStatement {
 			statement: Statement::Valid(candidate_digest.clone()),
 			signature: Signature(2),
-			sender: ValidatorId(2),
+			sender: AuthorityId(2),
 		};
 
 		let summary = table.import_statement(&context, vote, None)
 			.expect("candidate vote to give summary");
 
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
 
 		assert_eq!(summary.candidate, Digest(100));
 		assert_eq!(summary.group_id, GroupId(2));
@@ -961,10 +1102,10 @@ mod tests {
 	#[test]
 	fn availability_vote_gives_summary() {
 		let context = TestContext {
-			validators: {
+			authorities: {
 				let mut map = HashMap::new();
-				map.insert(ValidatorId(1), (GroupId(2), GroupId(455)));
-				map.insert(ValidatorId(2), (GroupId(5), GroupId(2)));
+				map.insert(AuthorityId(1), (GroupId(2), GroupId(455)));
+				map.insert(AuthorityId(2), (GroupId(5), GroupId(2)));
 				map
 			}
 		};
@@ -973,27 +1114,78 @@ mod tests {
 		let statement = SignedStatement {
 			statement: Statement::Candidate(Candidate(2, 100)),
 			signature: Signature(1),
-			sender: ValidatorId(1),
+			sender: AuthorityId(1),
 		};
 		let candidate_digest = Digest(100);
 
 		table.import_statement(&context, statement, None);
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(1)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(1)));
 
 		let vote = SignedStatement {
 			statement: Statement::Available(candidate_digest.clone()),
 			signature: Signature(2),
-			sender: ValidatorId(2),
+			sender: AuthorityId(2),
 		};
 
 		let summary = table.import_statement(&context, vote, None)
 			.expect("candidate vote to give summary");
 
-		assert!(!table.detected_misbehavior.contains_key(&ValidatorId(2)));
+		assert!(!table.detected_misbehavior.contains_key(&AuthorityId(2)));
 
 		assert_eq!(summary.candidate, Digest(100));
 		assert_eq!(summary.group_id, GroupId(2));
 		assert_eq!(summary.validity_votes, 1);
 		assert_eq!(summary.availability_votes, 1);
 	}
+
+	#[test]
+	fn filling_batch_sets_known_flag() {
+		let context = TestContext {
+			authorities: {
+				let mut map = HashMap::new();
+				for i in 1..10 {
+					map.insert(AuthorityId(i), (GroupId(2), GroupId(400 + i)));
+				}
+				map
+			}
+		};
+
+		let mut table = create();
+		let statement = SignedStatement {
+			statement: Statement::Candidate(Candidate(2, 100)),
+			signature: Signature(1),
+			sender: AuthorityId(1),
+		};
+
+		table.import_statement(&context, statement, None);
+
+		for i in 2..10 {
+			let statement = SignedStatement {
+				statement: Statement::Valid(Digest(100)),
+				signature: Signature(i),
+				sender: AuthorityId(i),
+			};
+
+			table.import_statement(&context, statement, None);
+		}
+
+		let mut batch = VecBatch {
+			max_len: 5,
+			targets: (1..10).map(AuthorityId).collect(),
+			items: Vec::new(),
+		};
+
+		// 9 statements in the table, each seen by one.
+		table.fill_batch(&mut batch);
+		assert_eq!(batch.items.len(), 5);
+
+		// 9 statements in the table, 5 of which seen by all targets.
+		batch.items.clear();
+		table.fill_batch(&mut batch);
+		assert_eq!(batch.items.len(), 4);
+
+		batch.items.clear();
+		table.fill_batch(&mut batch);
+		assert!(batch.items.is_empty());
+	}
 }
diff --git a/substrate/candidate-agreement/src/tests/mod.rs b/substrate/candidate-agreement/src/tests/mod.rs
new file mode 100644
index 0000000000000000000000000000000000000000..1599a94aa69b7f5c1c6f987c9e1f4da572a28be2
--- /dev/null
+++ b/substrate/candidate-agreement/src/tests/mod.rs
@@ -0,0 +1,385 @@
+// Copyright 2017 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
+
+//! Tests and test helpers for the candidate agreement.
+
+const VALIDITY_CHECK_DELAY_MS: u64 = 100;
+const AVAILABILITY_CHECK_DELAY_MS: u64 = 100;
+const PROPOSAL_FORMATION_TICK_MS: u64 = 50;
+const PROPAGATE_STATEMENTS_TICK_MS: u64 = 200;
+const TIMER_TICK_DURATION_MS: u64 = 10;
+
+use std::collections::HashMap;
+
+use futures::prelude::*;
+use futures::sync::mpsc;
+use tokio_timer::Timer;
+
+use super::*;
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
+struct AuthorityId(usize);
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
+struct Digest(Vec<usize>);
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
+struct GroupId(usize);
+
+#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone)]
+struct ParachainCandidate {
+	group: GroupId,
+	data: usize,
+}
+
+#[derive(PartialEq, Eq, Debug, Clone)]
+struct Proposal {
+	candidates: Vec<ParachainCandidate>,
+}
+
+#[derive(PartialEq, Eq, Debug, Clone)]
+enum Signature {
+	Table(AuthorityId, table::Statement<ParachainCandidate, Digest>),
+	Bft(AuthorityId, bft::Message<Proposal, Digest>),
+}
+
+enum Error {
+	Timer(tokio_timer::TimerError),
+	NetOut,
+	NetIn,
+}
+
+#[derive(Debug, Clone)]
+struct SharedTestContext {
+	n_authorities: usize,
+	n_groups: usize,
+	timer: Timer,
+}
+
+#[derive(Debug, Clone)]
+struct TestContext {
+	shared: Arc<SharedTestContext>,
+	local_id: AuthorityId,
+}
+
+impl Context for TestContext {
+	type AuthorityId = AuthorityId;
+	type Digest = Digest;
+	type GroupId = GroupId;
+	type Signature = Signature;
+	type Proposal = Proposal;
+	type ParachainCandidate = ParachainCandidate;
+
+	type CheckCandidate = Box<Future<Item=bool,Error=Error>>;
+	type CheckAvailability = Box<Future<Item=bool,Error=Error>>;
+
+	type StatementBatch = VecBatch<
+		AuthorityId,
+		table::SignedStatement<ParachainCandidate, Digest, AuthorityId, Signature>
+	>;
+
+	fn candidate_digest(candidate: &ParachainCandidate) -> Digest {
+		Digest(vec![candidate.group.0, candidate.data])
+	}
+
+	fn proposal_digest(candidate: &Proposal) -> Digest {
+		Digest(candidate.candidates.iter().fold(Vec::new(), |mut a, c| {
+			a.extend(Self::candidate_digest(c).0);
+			a
+		}))
+	}
+
+	fn candidate_group(candidate: &ParachainCandidate) -> GroupId {
+		candidate.group.clone()
+	}
+
+	fn round_proposer(&self, round: usize) -> AuthorityId {
+		AuthorityId(round % self.shared.n_authorities)
+	}
+
+	fn check_validity(&self, _candidate: &ParachainCandidate) -> Self::CheckCandidate {
+		let future = self.shared.timer
+			.sleep(::std::time::Duration::from_millis(VALIDITY_CHECK_DELAY_MS))
+			.map_err(Error::Timer)
+			.map(|_| true);
+
+		Box::new(future)
+	}
+
+	fn check_availability(&self, _candidate: &ParachainCandidate) -> Self::CheckAvailability {
+		let future = self.shared.timer
+			.sleep(::std::time::Duration::from_millis(AVAILABILITY_CHECK_DELAY_MS))
+			.map_err(Error::Timer)
+			.map(|_| true);
+
+		Box::new(future)
+	}
+
+	fn create_proposal(&self, candidates: Vec<&ParachainCandidate>)
+		-> Option<Proposal>
+	{
+		let t = self.shared.n_groups * 2 / 3;
+		if candidates.len() >= t {
+			Some(Proposal {
+				candidates: candidates.iter().map(|x| (&**x).clone()).collect()
+			})
+		} else {
+			None
+		}
+	}
+
+	fn proposal_valid<F>(&self, proposal: &Proposal, check_candidate: F) -> bool
+		where F: FnMut(&ParachainCandidate) -> bool
+	{
+		if proposal.candidates.len() >= self.shared.n_groups * 2 / 3 {
+			proposal.candidates.iter().all(check_candidate)
+		} else {
+			false
+		}
+	}
+
+	fn local_id(&self) -> AuthorityId {
+		self.local_id.clone()
+	}
+
+	fn sign_table_statement(
+		&self,
+		statement: &table::Statement<ParachainCandidate, Digest>
+	) -> Signature {
+		Signature::Table(self.local_id(), statement.clone())
+	}
+
+	fn sign_bft_message(&self, message: &bft::Message<Proposal, Digest>) -> Signature {
+		Signature::Bft(self.local_id(), message.clone())
+	}
+}
+
+struct TestRecovery;
+
+impl MessageRecovery<TestContext> for TestRecovery {
+	type UncheckedMessage = OutgoingMessage<TestContext>;
+
+	fn check_message(&self, msg: Self::UncheckedMessage) -> Option<CheckedMessage<TestContext>> {
+		Some(match msg {
+			OutgoingMessage::Bft(c) => CheckedMessage::Bft(c),
+			OutgoingMessage::Table(batch) => CheckedMessage::Table(batch.items),
+		})
+	}
+}
+
+pub struct Network<T> {
+	endpoints: Vec<mpsc::UnboundedSender<T>>,
+	input: mpsc::UnboundedReceiver<(usize, T)>,
+}
+
+impl<T: Clone + Send + 'static> Network<T> {
+	pub fn new(nodes: usize)
+		-> (Self, Vec<mpsc::UnboundedSender<(usize, T)>>, Vec<mpsc::UnboundedReceiver<T>>)
+	{
+		let mut inputs = Vec::with_capacity(nodes);
+		let mut outputs = Vec::with_capacity(nodes);
+		let mut endpoints = Vec::with_capacity(nodes);
+
+		let (in_tx, in_rx) = mpsc::unbounded();
+		for _ in 0..nodes {
+			let (out_tx, out_rx) = mpsc::unbounded();
+			inputs.push(in_tx.clone());
+			outputs.push(out_rx);
+			endpoints.push(out_tx);
+		}
+
+		let network = Network {
+			endpoints,
+			input: in_rx,
+		};
+
+		(network, inputs, outputs)
+	}
+
+	pub fn route_on_thread(self) {
+		::std::thread::spawn(move || { let _ = self.wait(); });
+	}
+}
+
+impl<T: Clone> Future for Network<T> {
+	type Item = ();
+	type Error = ();
+
+	fn poll(&mut self) -> Poll<(), Self::Error> {
+		match try_ready!(self.input.poll()) {
+			None => Ok(Async::Ready(())),
+			Some((sender, item)) => {
+				{
+					let receiving_endpoints = self.endpoints
+						.iter()
+						.enumerate()
+						.filter(|&(i, _)| i != sender)
+						.map(|(_, x)| x);
+
+					for endpoint in receiving_endpoints {
+						let _ = endpoint.unbounded_send(item.clone());
+					}
+				}
+
+				self.poll()
+			}
+		}
+	}
+}
+
+#[derive(Debug, Clone)]
+pub struct VecBatch<V, T> {
+	pub max_len: usize,
+	pub targets: Vec<V>,
+	pub items: Vec<T>,
+}
+
+impl<V, T> ::StatementBatch<V, T> for VecBatch<V, T> {
+	fn targets(&self) -> &[V] { &self.targets }
+	fn is_empty(&self) -> bool { self.items.is_empty() }
+	fn push(&mut self, item: T) -> bool {
+		if self.items.len() == self.max_len {
+			false
+		} else {
+			self.items.push(item);
+			true
+		}
+	}
+}
+
+fn make_group_assignments(n_authorities: usize, n_groups: usize)
+	-> HashMap<GroupId, GroupInfo<AuthorityId>>
+{
+	let mut map = HashMap::new();
+	let threshold = (n_authorities / n_groups) / 2;
+	let make_blank_group = || {
+		GroupInfo {
+			validity_guarantors: HashSet::new(),
+			availability_guarantors: HashSet::new(),
+			needed_validity: threshold,
+			needed_availability: threshold,
+		}
+	};
+
+	// every authority checks validity of his ID modulo n_groups and
+	// guarantees availability for the group above that.
+	for a_id in 0..n_authorities {
+		let primary_group = a_id % n_groups;
+		let availability_groups = [
+			(a_id + 1) % n_groups,
+			a_id.wrapping_sub(1) % n_groups,
+		];
+
+		map.entry(GroupId(primary_group))
+			.or_insert_with(&make_blank_group)
+			.validity_guarantors
+			.insert(AuthorityId(a_id));
+
+		for &availability_group in &availability_groups {
+			map.entry(GroupId(availability_group))
+				.or_insert_with(&make_blank_group)
+				.availability_guarantors
+				.insert(AuthorityId(a_id));
+		}
+	}
+
+	map
+}
+
+fn make_blank_batch<T>(n_authorities: usize) -> VecBatch<AuthorityId, T> {
+	VecBatch {
+		max_len: 20,
+		targets: (0..n_authorities).map(AuthorityId).collect(),
+		items: Vec::new(),
+	}
+}
+
+#[test]
+fn consensus_completes_with_minimum_good() {
+	let n = 50;
+	let f = 16;
+	let n_groups = 10;
+
+	let timer = ::tokio_timer::wheel()
+		.tick_duration(Duration::from_millis(TIMER_TICK_DURATION_MS))
+		.num_slots(1 << 16)
+		.build();
+
+	let (network, inputs, outputs) = Network::<(AuthorityId, OutgoingMessage<TestContext>)>::new(n - f);
+	network.route_on_thread();
+
+	let shared_test_context = Arc::new(SharedTestContext {
+		n_authorities: n,
+		n_groups: n_groups,
+		timer: timer.clone(),
+	});
+
+	let groups = make_group_assignments(n, n_groups);
+
+	let authorities = inputs.into_iter().zip(outputs).enumerate().map(|(raw_id, (input, output))| {
+		let id = AuthorityId(raw_id);
+		let context = TestContext {
+			shared: shared_test_context.clone(),
+			local_id: id,
+		};
+
+		let shared_table = SharedTable::new(context.clone(), groups.clone());
+		let params = AgreementParams {
+			context,
+			timer: timer.clone(),
+			table: shared_table,
+			nodes: n,
+			max_faulty: f,
+			round_timeout_multiplier: 4,
+			message_buffer_size: 100,
+			form_proposal_interval: Duration::from_millis(PROPOSAL_FORMATION_TICK_MS),
+		};
+
+		let net_out = input
+			.sink_map_err(|_| Error::NetOut)
+			.with(move |x| Ok::<_, Error>((id.0, (id, x))) );
+
+		let net_in = output
+			.map_err(|_| Error::NetIn)
+			.map(move |(v, msg)| (v, vec![msg]));
+
+		let propagate_statements = timer
+			.interval(Duration::from_millis(PROPAGATE_STATEMENTS_TICK_MS))
+			.map(move |()| make_blank_batch(n))
+			.map_err(Error::Timer);
+
+		let local_candidate = if raw_id < n_groups {
+			let candidate = ParachainCandidate {
+				group: GroupId(raw_id),
+				data: raw_id,
+			};
+			::futures::future::Either::A(Ok::<_, Error>(candidate).into_future())
+		} else {
+			::futures::future::Either::B(::futures::future::empty())
+		};
+
+		agree::<_, _, _, _, _, _, Error>(
+			params,
+			net_in,
+			net_out,
+			TestRecovery,
+			propagate_statements,
+			local_candidate
+		)
+	}).collect::<Vec<_>>();
+
+	futures::future::join_all(authorities).wait().unwrap();
+}