From 25cfb884af297b873b8b43b437e7be3adcd7bb12 Mon Sep 17 00:00:00 2001
From: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Date: Thu, 28 Jan 2021 15:19:05 +0100
Subject: [PATCH] misbehavior: report multiple offenses per validator as
 necessary (#2222)

* use proper descriptive generic type names

* cleanup

* Table stores a list of detected misbehavior per authority

* add Table::drain_misbehaviors_for

* WIP: unify misbehavior types; report multiple misbehaviors per validator

Code checks, but tests don't yet pass.

* update drain_misbehaviors: return authority id as well as specific misbehavior

* enable unchecked construction of Signed structs in tests

* remove test-features feature & unnecessary generic

* fix backing tests

This took a while to figure out, because where we'd previously been
passing around `SignedFullStatement`s, we now needed to construct
those on the fly within the test, to take advantage of the signature-
checking in the constructor. That, in turn, necessitated changing the
iterable type of `drain_misbehaviors` to return the validator index,
and passing that validator index along within the misbehavior report.

Once that was sorted, however, it became relatively straightforward:
just needed to add appropriate methods to deconstruct the misbehavior
reports, and then we could construct the signed statements directly.

* fix bad merge
---
 polkadot/node/core/backing/src/lib.rs         | 119 ++--
 polkadot/node/primitives/src/lib.rs           | 140 +----
 polkadot/node/subsystem/src/messages.rs       |   5 +-
 .../src/types/overseer-protocol.md            | 588 ++++++++++--------
 polkadot/statement-table/src/generic.rs       | 328 ++++++----
 5 files changed, 605 insertions(+), 575 deletions(-)

diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs
index 8b9fdfb8756..affb6f32798 100644
--- a/polkadot/node/core/backing/src/lib.rs
+++ b/polkadot/node/core/backing/src/lib.rs
@@ -19,7 +19,6 @@
 #![deny(unused_crate_dependencies)]
 
 use std::collections::{HashMap, HashSet};
-use std::convert::TryFrom;
 use std::pin::Pin;
 use std::sync::Arc;
 
@@ -28,13 +27,12 @@ use futures::{channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt};
 
 use sp_keystore::SyncCryptoStorePtr;
 use polkadot_primitives::v1::{
-	CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
-	ValidatorIndex, SigningContext, PoV, CandidateHash,
-	CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
-	CoreState, CoreIndex, CollatorId, ValidityAttestation, CandidateCommitments,
+	AvailableData, BackedCandidate, CandidateCommitments, CandidateDescriptor, CandidateHash,
+	CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreIndex, CoreState, Hash, Id as ParaId,
+	PoV, SigningContext, ValidatorId, ValidatorIndex, ValidatorSignature, ValidityAttestation,
 };
 use polkadot_node_primitives::{
-	FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
+	Statement, SignedFullStatement, ValidationResult,
 };
 use polkadot_subsystem::{
 	JaegerSpan, PerLeafSpan,
@@ -60,8 +58,9 @@ use statement_table::{
 	Context as TableContextTrait,
 	Table,
 	v1::{
+		SignedStatement as TableSignedStatement,
 		Statement as TableStatement,
-		SignedStatement as TableSignedStatement, Summary as TableSummary,
+		Summary as TableSummary,
 	},
 };
 use thiserror::Error;
@@ -145,8 +144,6 @@ struct CandidateBackingJob {
 	/// The candidates that are includable, by hash. Each entry here indicates
 	/// that we've sent the provisioner the backed candidate.
 	backed: HashSet<CandidateHash>,
-	/// We have already reported misbehaviors for these validators.
-	reported_misbehavior_for: HashSet<ValidatorIndex>,
 	keystore: SyncCryptoStorePtr,
 	table: Table<TableContext>,
 	table_context: TableContext,
@@ -644,36 +641,17 @@ impl CandidateBackingJob {
 	}
 
 	/// Check if there have happened any new misbehaviors and issue necessary messages.
-	///
-	/// TODO: Report multiple misbehaviors (https://github.com/paritytech/polkadot/issues/1387)
 	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
 	async fn issue_new_misbehaviors(&mut self) -> Result<(), Error> {
-		let mut reports = Vec::new();
-
-		for (k, v) in self.table.get_misbehavior().iter() {
-			if !self.reported_misbehavior_for.contains(k) {
-				self.reported_misbehavior_for.insert(*k);
-
-				let f = FromTableMisbehavior {
-					id: *k,
-					report: v.clone(),
-					signing_context: self.table_context.signing_context.clone(),
-					key: self.table_context.validators[*k as usize].clone(),
-				};
-
-				if let Ok(report) = MisbehaviorReport::try_from(f) {
-					let message = ProvisionerMessage::ProvisionableData(
-						self.parent,
-						ProvisionableData::MisbehaviorReport(self.parent, report),
-					);
-
-					reports.push(message);
-				}
-			}
-		}
-
-		for report in reports.drain(..) {
-			self.send_to_provisioner(report).await?
+		// collect the misbehaviors to avoid double mutable self borrow issues
+		let misbehaviors: Vec<_> = self.table.drain_misbehaviors().collect();
+		for (validator_id, report) in misbehaviors {
+			self.send_to_provisioner(
+				ProvisionerMessage::ProvisionableData(
+					self.parent,
+					ProvisionableData::MisbehaviorReport(self.parent, validator_id, report)
+				)
+			).await?
 		}
 
 		Ok(())
@@ -1086,7 +1064,6 @@ impl util::JobTrait for CandidateBackingJob {
 				seconded: None,
 				unbacked_candidates: HashMap::new(),
 				backed: HashSet::new(),
-				reported_misbehavior_for: HashSet::new(),
 				keystore,
 				table: Table::default(),
 				table_context,
@@ -1199,9 +1176,7 @@ mod tests {
 	use super::*;
 	use assert_matches::assert_matches;
 	use futures::{future, Future};
-	use polkadot_primitives::v1::{
-		ScheduledCore, BlockData, PersistedValidationData, HeadData, GroupRotationInfo,
-	};
+	use polkadot_primitives::v1::{BlockData, GroupRotationInfo, HeadData, PersistedValidationData, ScheduledCore};
 	use polkadot_subsystem::{
 		messages::{RuntimeApiRequest, RuntimeApiMessage},
 		ActiveLeavesUpdate, FromOverseer, OverseerSignal,
@@ -1210,12 +1185,23 @@ mod tests {
 	use sp_keyring::Sr25519Keyring;
 	use sp_application_crypto::AppKey;
 	use sp_keystore::{CryptoStore, SyncCryptoStore};
+	use statement_table::v1::Misbehavior;
 	use std::collections::HashMap;
 
 	fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
 		val_ids.iter().map(|v| v.public().into()).collect()
 	}
 
+	fn table_statement_to_primitive(
+		statement: TableStatement,
+	) -> Statement {
+		match statement {
+			TableStatement::Candidate(committed_candidate_receipt) => Statement::Seconded(committed_candidate_receipt),
+			TableStatement::Valid(candidate_hash) => Statement::Valid(candidate_hash),
+			TableStatement::Invalid(candidate_hash) => Statement::Invalid(candidate_hash),
+		}
+	}
+
 	struct TestState {
 		chain_ids: Vec<ParaId>,
 		keystore: SyncCryptoStorePtr,
@@ -1950,19 +1936,30 @@ mod tests {
 						_,
 						ProvisionableData::MisbehaviorReport(
 							relay_parent,
-							MisbehaviorReport::SelfContradiction(_, s1, s2),
+							validator_index,
+							Misbehavior::ValidityDoubleVote(vdv),
 						)
 					)
 				) if relay_parent == test_state.relay_parent => {
-					s1.check_signature(
+					let ((t1, s1), (t2, s2)) = vdv.deconstruct::<TableContext>();
+					let t1 = table_statement_to_primitive(t1);
+					let t2 = table_statement_to_primitive(t2);
+
+					SignedFullStatement::new(
+						t1,
+						validator_index,
+						s1,
 						&test_state.signing_context,
-						&test_state.validator_public[s1.validator_index() as usize],
-					).unwrap();
+						&test_state.validator_public[validator_index as usize],
+					).expect("signature must be valid");
 
-					s2.check_signature(
+					SignedFullStatement::new(
+						t2,
+						validator_index,
+						s2,
 						&test_state.signing_context,
-						&test_state.validator_public[s2.validator_index() as usize],
-					).unwrap();
+						&test_state.validator_public[validator_index as usize],
+					).expect("signature must be valid");
 				}
 			);
 
@@ -1979,19 +1976,30 @@ mod tests {
 						_,
 						ProvisionableData::MisbehaviorReport(
 							relay_parent,
-							MisbehaviorReport::SelfContradiction(_, s1, s2),
+							validator_index,
+							Misbehavior::ValidityDoubleVote(vdv),
 						)
 					)
 				) if relay_parent == test_state.relay_parent => {
-					s1.check_signature(
+					let ((t1, s1), (t2, s2)) = vdv.deconstruct::<TableContext>();
+					let t1 = table_statement_to_primitive(t1);
+					let t2 = table_statement_to_primitive(t2);
+
+					SignedFullStatement::new(
+						t1,
+						validator_index,
+						s1,
 						&test_state.signing_context,
-						&test_state.validator_public[s1.validator_index() as usize],
-					).unwrap();
+						&test_state.validator_public[validator_index as usize],
+					).expect("signature must be valid");
 
-					s2.check_signature(
+					SignedFullStatement::new(
+						t2,
+						validator_index,
+						s2,
 						&test_state.signing_context,
-						&test_state.validator_public[s2.validator_index() as usize],
-					).unwrap();
+						&test_state.validator_public[validator_index as usize],
+					).expect("signature must be valid");
 				}
 			);
 		});
@@ -2464,6 +2472,7 @@ mod tests {
 	#[test]
 	fn candidate_backing_reorders_votes() {
 		use sp_core::Encode;
+		use std::convert::TryFrom;
 
 		let relay_parent = [1; 32].into();
 		let para_id = ParaId::from(10);
diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs
index 6c4e63861c1..e7545b146cb 100644
--- a/polkadot/node/primitives/src/lib.rs
+++ b/polkadot/node/primitives/src/lib.rs
@@ -25,17 +25,9 @@
 use futures::Future;
 use parity_scale_codec::{Decode, Encode};
 use polkadot_primitives::v1::{
-	Hash, CommittedCandidateReceipt, CandidateReceipt, CompactStatement,
-	EncodeAs, Signed, SigningContext, ValidatorIndex, ValidatorId,
-	UpwardMessage, ValidationCode, PersistedValidationData,
-	HeadData, PoV, CollatorPair, Id as ParaId, OutboundHrmpMessage, CandidateCommitments, CandidateHash,
-};
-use polkadot_statement_table::{
-	generic::{
-		ValidityDoubleVote as TableValidityDoubleVote,
-		MultipleCandidates as TableMultipleCandidates,
-	},
-	v1::Misbehavior as TableMisbehavior,
+	CandidateCommitments, CandidateHash, CollatorPair, CommittedCandidateReceipt, CompactStatement,
+	EncodeAs, Hash, HeadData, Id as ParaId, OutboundHrmpMessage, PersistedValidationData, PoV,
+	Signed, UpwardMessage, ValidationCode,
 };
 use std::pin::Pin;
 
@@ -105,36 +97,6 @@ impl EncodeAs<CompactStatement> for Statement {
 /// Only the compact `SignedStatement` is suitable for submission to the chain.
 pub type SignedFullStatement = Signed<Statement, CompactStatement>;
 
-/// A misbehaviour report.
-#[derive(Debug, Clone)]
-pub enum MisbehaviorReport {
-	/// These validator nodes disagree on this candidate's validity, please figure it out
-	///
-	/// Most likely, the list of statments all agree except for the final one. That's not
-	/// guaranteed, though; if somehow we become aware of lots of
-	/// statements disagreeing about the validity of a candidate before taking action,
-	/// this message should be dispatched with all of them, in arbitrary order.
-	///
-	/// This variant is also used when our own validity checks disagree with others'.
-	CandidateValidityDisagreement(CandidateReceipt, Vec<SignedFullStatement>),
-	/// I've noticed a peer contradicting itself about a particular candidate
-	SelfContradiction(CandidateReceipt, SignedFullStatement, SignedFullStatement),
-	/// This peer has seconded more than one parachain candidate for this relay parent head
-	DoubleVote(SignedFullStatement, SignedFullStatement),
-}
-
-/// A utility struct used to convert `TableMisbehavior` to `MisbehaviorReport`s.
-pub struct FromTableMisbehavior {
-	/// Index of the validator.
-	pub id: ValidatorIndex,
-	/// The misbehavior reported by the table.
-	pub report: TableMisbehavior,
-	/// Signing context.
-	pub signing_context: SigningContext,
-	/// Misbehaving validator's public key.
-	pub key: ValidatorId,
-}
-
 /// Candidate invalidity details
 #[derive(Debug)]
 pub enum InvalidCandidate {
@@ -170,102 +132,6 @@ pub enum ValidationResult {
 	Invalid(InvalidCandidate),
 }
 
-impl std::convert::TryFrom<FromTableMisbehavior> for MisbehaviorReport {
-	type Error = ();
-
-	fn try_from(f: FromTableMisbehavior) -> Result<Self, Self::Error> {
-		match f.report {
-			TableMisbehavior::ValidityDoubleVote(
-				TableValidityDoubleVote::IssuedAndValidity((c, s1), (d, s2))
-			) => {
-				let receipt = c.clone();
-				let signed_1 = SignedFullStatement::new(
-					Statement::Seconded(c),
-					f.id,
-					s1,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-				let signed_2 = SignedFullStatement::new(
-					Statement::Valid(d),
-					f.id,
-					s2,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-
-				Ok(MisbehaviorReport::SelfContradiction(receipt.to_plain(), signed_1, signed_2))
-			}
-			TableMisbehavior::ValidityDoubleVote(
-				TableValidityDoubleVote::IssuedAndInvalidity((c, s1), (d, s2))
-			) => {
-				let receipt = c.clone();
-				let signed_1 = SignedFullStatement::new(
-					Statement::Seconded(c),
-					f.id,
-					s1,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-				let signed_2 = SignedFullStatement::new(
-					Statement::Invalid(d),
-					f.id,
-					s2,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-
-				Ok(MisbehaviorReport::SelfContradiction(receipt.to_plain(), signed_1, signed_2))
-			}
-			TableMisbehavior::ValidityDoubleVote(
-				TableValidityDoubleVote::ValidityAndInvalidity(c, s1, s2)
-			) => {
-				let signed_1 = SignedFullStatement::new(
-					Statement::Valid(c.hash()),
-					f.id,
-					s1,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-				let signed_2 = SignedFullStatement::new(
-					Statement::Invalid(c.hash()),
-					f.id,
-					s2,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-
-				Ok(MisbehaviorReport::SelfContradiction(c.to_plain(), signed_1, signed_2))
-			}
-			TableMisbehavior::MultipleCandidates(
-				TableMultipleCandidates {
-					first,
-					second,
-				}
-			) => {
-				let signed_1 = SignedFullStatement::new(
-					Statement::Seconded(first.0),
-					f.id,
-					first.1,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-
-				let signed_2 = SignedFullStatement::new(
-					Statement::Seconded(second.0),
-					f.id,
-					second.1,
-					&f.signing_context,
-					&f.key,
-				).ok_or(())?;
-
-				Ok(MisbehaviorReport::DoubleVote(signed_1, signed_2))
-			}
-			_ => Err(()),
-		}
-	}
-}
-
 /// The output of a collator.
 ///
 /// This differs from `CandidateCommitments` in two ways:
diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs
index e47ebf8326d..daff6dc4c13 100644
--- a/polkadot/node/subsystem/src/messages.rs
+++ b/polkadot/node/subsystem/src/messages.rs
@@ -28,7 +28,7 @@ use polkadot_node_network_protocol::{
 	v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId,
 };
 use polkadot_node_primitives::{
-	CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
+	CollationGenerationConfig, SignedFullStatement, ValidationResult,
 	approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote},
 };
 use polkadot_primitives::v1::{
@@ -41,6 +41,7 @@ use polkadot_primitives::v1::{
 	ValidatorIndex, ValidatorSignature, InboundDownwardMessage, InboundHrmpMessage,
 	CandidateIndex,
 };
+use polkadot_statement_table::v1::Misbehavior;
 use std::{sync::Arc, collections::btree_map::BTreeMap};
 
 /// Subsystem messages where each message is always bound to a relay parent.
@@ -508,7 +509,7 @@ pub enum ProvisionableData {
 	/// The Candidate Backing subsystem believes that this candidate is valid, pending availability.
 	BackedCandidate(CandidateReceipt),
 	/// Misbehavior reports are self-contained proofs of validator misbehavior.
-	MisbehaviorReport(Hash, MisbehaviorReport),
+	MisbehaviorReport(Hash, ValidatorIndex, Misbehavior),
 	/// Disputes trigger a broad dispute resolution process.
 	Dispute(Hash, ValidatorSignature),
 }
diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
index ebdfa37da2b..d545c64a72f 100644
--- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
+++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md
@@ -30,8 +30,8 @@ Indicates a change in active leaves. Activated leaves should have jobs, whereas
 
 ```rust
 struct ActiveLeavesUpdate {
-	activated: [Hash], // in practice, these should probably be a SmallVec
-	deactivated: [Hash],
+    activated: [Hash], // in practice, these should probably be a SmallVec
+    deactivated: [Hash],
 }
 ```
 
@@ -41,46 +41,46 @@ Messages received by the approval voting subsystem.
 
 ```rust
 enum AssignmentCheckResult {
-	// The vote was accepted and should be propagated onwards.
-	Accepted,
-	// The vote was valid but duplicate and should not be propagated onwards.
-	AcceptedDuplicate,
-	// The vote was valid but too far in the future to accept right now.
-	TooFarInFuture,
-	// The vote was bad and should be ignored, reporting the peer who propagated it.
-	Bad,
+    // The vote was accepted and should be propagated onwards.
+    Accepted,
+    // The vote was valid but duplicate and should not be propagated onwards.
+    AcceptedDuplicate,
+    // The vote was valid but too far in the future to accept right now.
+    TooFarInFuture,
+    // The vote was bad and should be ignored, reporting the peer who propagated it.
+    Bad,
 }
 
 enum ApprovalCheckResult {
-	// The vote was accepted and should be propagated onwards.
-	Accepted,
-	// The vote was bad and should be ignored, reporting the peer who propagated it.
-	Bad,
+    // The vote was accepted and should be propagated onwards.
+    Accepted,
+    // The vote was bad and should be ignored, reporting the peer who propagated it.
+    Bad,
 }
 
 enum ApprovalVotingMessage {
-	/// Check if the assignment is valid and can be accepted by our view of the protocol.
-	/// Should not be sent unless the block hash is known.
-	CheckAndImportAssignment(
-		IndirectAssignmentCert,
-		ResponseChannel<AssignmentCheckResult>,
-	),
-	/// Check if the approval vote is valid and can be accepted by our view of the
-	/// protocol.
-	///
-	/// Should not be sent unless the block hash within the indirect vote is known.
-	CheckAndImportApproval(
-		IndirectSignedApprovalVote,
-		ResponseChannel<ApprovalCheckResult>,
-	),
-	/// Returns the highest possible ancestor hash of the provided block hash which is
-	/// acceptable to vote on finality for.
-	/// The `BlockNumber` provided is the number of the block's ancestor which is the
-	/// earliest possible vote.
-	///
-	/// It can also return the same block hash, if that is acceptable to vote upon.
-	/// Return `None` if the input hash is unrecognized.
-	ApprovedAncestor(Hash, BlockNumber, ResponseChannel<Option<Hash>>),
+    /// Check if the assignment is valid and can be accepted by our view of the protocol.
+    /// Should not be sent unless the block hash is known.
+    CheckAndImportAssignment(
+        IndirectAssignmentCert,
+        ResponseChannel<AssignmentCheckResult>,
+    ),
+    /// Check if the approval vote is valid and can be accepted by our view of the
+    /// protocol.
+    ///
+    /// Should not be sent unless the block hash within the indirect vote is known.
+    CheckAndImportApproval(
+        IndirectSignedApprovalVote,
+        ResponseChannel<ApprovalCheckResult>,
+    ),
+    /// Returns the highest possible ancestor hash of the provided block hash which is
+    /// acceptable to vote on finality for.
+    /// The `BlockNumber` provided is the number of the block's ancestor which is the
+    /// earliest possible vote.
+    ///
+    /// It can also return the same block hash, if that is acceptable to vote upon.
+    /// Return `None` if the input hash is unrecognized.
+    ApprovedAncestor(Hash, BlockNumber, ResponseChannel<Option<Hash>>),
 }
 ```
 
@@ -91,32 +91,35 @@ Messages received by the approval Distribution subsystem.
 ```rust
 /// Metadata about a block which is now live in the approval protocol.
 struct BlockApprovalMeta {
-	/// The hash of the block.
-	hash: Hash,
-	/// The number of the block.
-	number: BlockNumber,
-	/// The hash of the parent block.
-	parent_hash: Hash,
-	/// The candidates included by the block. Note that these are not the same as the candidates that appear within the
-	/// block body.
-	candidates: Vec<CandidateHash>,
-	/// The consensus slot number of the block.
-	slot_number: SlotNumber,
+    /// The hash of the block.
+    hash: Hash,
+    /// The number of the block.
+    number: BlockNumber,
+    /// The candidates included by the block. Note that these are not the same as the candidates that appear within the
+    /// block body.
+    parent_hash: Hash,
+    /// The candidates included by the block. Note that these are not the same as the candidates that appear within the
+    /// block body.
+    candidates: Vec<CandidateHash>,
+    /// The consensus slot number of the block.
+    slot_number: SlotNumber,
 }
 
 enum ApprovalDistributionMessage {
-	/// Notify the `ApprovalDistribution` subsystem about new blocks and the candidates contained within
-	/// them.
-	NewBlocks(Vec<BlockApprovalMeta>),
-	/// Distribute an assignment cert from the local validator. The cert is assumed
-	/// to be valid, relevant, and for the given relay-parent and validator index.
-	DistributeAssignment(IndirectAssignmentCert, CandidateIndex),
-	/// Distribute an approval vote for the local validator. The approval vote is assumed to be
-	/// valid, relevant, and the corresponding approval already issued. If not, the subsystem is free to drop
-	/// the message.
-	DistributeApproval(IndirectSignedApprovalVote),
-	/// An update from the network bridge.
-	NetworkBridgeUpdateV1(NetworkBridgeEvent<ApprovalDistributionV1Message>),
+    /// Notify the `ApprovalDistribution` subsystem about new blocks and the candidates contained within
+    /// them.
+    NewBlocks(Vec<BlockApprovalMeta>),
+    /// Distribute an assignment cert from the local validator. The cert is assumed
+    /// to be valid, relevant, and for the given relay-parent and validator index.
+    ///
+    /// The `u32` param is the candidate index in the fully-included list.
+    DistributeAssignment(IndirectAssignmentCert, u32),
+    /// Distribute an approval vote for the local validator. The approval vote is assumed to be
+    /// valid, relevant, and the corresponding approval already issued. If not, the subsystem is free to drop
+    /// the message.
+    DistributeApproval(IndirectSignedApprovalVote),
+    /// An update from the network bridge.
+    NetworkBridgeUpdateV1(NetworkBridgeEvent<ApprovalDistributionV1Message>),
 }
 ```
 
@@ -132,13 +135,13 @@ This is a network protocol that receives messages of type [`AvailabilityDistribu
 
 ```rust
 enum AvailabilityDistributionMessage {
-	/// Distribute an availability chunk to other validators.
-	DistributeChunk(Hash, ErasureChunk),
-	/// Fetch an erasure chunk from network by candidate hash and chunk index.
-	FetchChunk(Hash, u32),
-	/// Event from the network.
-	/// An update on network state from the network bridge.
-	NetworkBridgeUpdateV1(NetworkBridgeEvent<AvailabilityDistributionV1Message>),
+    /// Distribute an availability chunk to other validators.
+    DistributeChunk(Hash, ErasureChunk),
+    /// Fetch an erasure chunk from network by candidate hash and chunk index.
+    FetchChunk(Hash, u32),
+    /// Event from the network.
+    /// An update on network state from the network bridge.
+    NetworkBridgeUpdateV1(NetworkBridgeEvent<AvailabilityDistributionV1Message>),
 }
 ```
 
@@ -148,16 +151,16 @@ Messages received by the availability recovery subsystem.
 
 ```rust
 enum RecoveryError {
-	Invalid,
-	Unavailable,
+    Invalid,
+    Unavailable,
 }
 enum AvailabilityRecoveryMessage {
-	/// Recover available data from validators on the network.
-	RecoverAvailableData(
-		CandidateReceipt,
-		SessionIndex,
-		ResponseChannel<Result<AvailableData, RecoveryError>>,
-	),
+    /// Recover available data from validators on the network.
+    RecoverAvailableData(
+        CandidateReceipt,
+        SessionIndex,
+        ResponseChannel<Result<AvailableData, RecoveryError>>,
+    ),
 }
 ```
 
@@ -167,19 +170,19 @@ Messages to and from the availability store.
 
 ```rust
 enum AvailabilityStoreMessage {
-	/// Query the `AvailableData` of a candidate by hash.
-	QueryAvailableData(CandidateHash, ResponseChannel<Option<AvailableData>>),
-	/// Query whether an `AvailableData` exists within the AV Store.
-	QueryDataAvailability(CandidateHash, ResponseChannel<bool>),
-	/// Query a specific availability chunk of the candidate's erasure-coding by validator index.
-	/// Returns the chunk and its inclusion proof against the candidate's erasure-root.
-	QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<ErasureChunk>>),
-	/// Store a specific chunk of the candidate's erasure-coding, with an
-	/// accompanying proof.
-	StoreChunk(CandidateHash, ErasureChunk, ResponseChannel<Result<()>>),
-	/// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's
-	/// `ErasureChunk`.
-	StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>),
+    /// Query the `AvailableData` of a candidate by hash.
+    QueryAvailableData(CandidateHash, ResponseChannel<Option<AvailableData>>),
+    /// Query whether an `AvailableData` exists within the AV Store.
+    QueryDataAvailability(CandidateHash, ResponseChannel<bool>),
+    /// Query a specific availability chunk of the candidate's erasure-coding by validator index.
+    /// Returns the chunk and its inclusion proof against the candidate's erasure-root.
+    QueryChunk(CandidateHash, ValidatorIndex, ResponseChannel<Option<AvailabilityChunkAndProof>>),
+    /// Store a specific chunk of the candidate's erasure-coding by validator index, with an
+    /// accompanying proof.
+    StoreChunk(CandidateHash, ValidatorIndex, AvailabilityChunkAndProof, ResponseChannel<Result<()>>),
+    /// Store `AvailableData`. If `ValidatorIndex` is provided, also store this validator's
+    /// `AvailabilityChunkAndProof`.
+    StoreAvailableData(CandidateHash, Option<ValidatorIndex>, u32, AvailableData, ResponseChannel<Result<()>>),
 }
 ```
 
@@ -190,11 +193,11 @@ This is a network protocol that receives messages of type [`BitfieldDistribution
 
 ```rust
 enum BitfieldDistributionMessage {
-	/// Distribute a bitfield signed by a validator to other validators.
-	/// The bitfield distribution subsystem will assume this is indeed correctly signed.
-	DistributeBitfield(relay_parent, SignedAvailabilityBitfield),
-	/// Receive a network bridge update.
-	NetworkBridgeUpdateV1(NetworkBridgeEvent<BitfieldDistributionV1Message>),
+    /// Distribute a bitfield signed by a validator to other validators.
+    /// The bitfield distribution subsystem will assume this is indeed correctly signed.
+    DistributeBitfield(relay_parent, SignedAvailabilityBitfield),
+    /// Receive a network bridge update.
+    NetworkBridgeUpdateV1(NetworkBridgeEvent<BitfieldDistributionV1Message>),
 }
 ```
 
@@ -243,31 +246,31 @@ The Chain API subsystem is responsible for providing an interface to chain data.
 
 ```rust
 enum ChainApiMessage {
-	/// Get the block number by hash.
-	/// Returns `None` if a block with the given hash is not present in the db.
-	BlockNumber(Hash, ResponseChannel<Result<Option<BlockNumber>, Error>>),
-	/// Request the block header by hash.
-	/// Returns `None` if a block with the given hash is not present in the db.
-	BlockHeader(Hash, ResponseChannel<Result<Option<BlockHeader>, Error>>),
-	/// Get the finalized block hash by number.
-	/// Returns `None` if a block with the given number is not present in the db.
-	/// Note: the caller must ensure the block is finalized.
-	FinalizedBlockHash(BlockNumber, ResponseChannel<Result<Option<Hash>, Error>>),
-	/// Get the last finalized block number.
-	/// This request always succeeds.
-	FinalizedBlockNumber(ResponseChannel<Result<BlockNumber, Error>>),
-	/// Request the `k` ancestors block hashes of a block with the given hash.
-	/// The response channel may return a `Vec` of size up to `k`
-	/// filled with ancestors hashes with the following order:
-	/// `parent`, `grandparent`, ...
-	Ancestors {
-		/// The hash of the block in question.
-		hash: Hash,
-		/// The number of ancestors to request.
-		k: usize,
-		/// The response channel.
-		response_channel: ResponseChannel<Result<Vec<Hash>, Error>>,
-	}
+    /// Get the block number by hash.
+    /// Returns `None` if a block with the given hash is not present in the db.
+    BlockNumber(Hash, ResponseChannel<Result<Option<BlockNumber>, Error>>),
+    /// Request the block header by hash.
+    /// Returns `None` if a block with the given hash is not present in the db.
+    BlockHeader(Hash, ResponseChannel<Result<Option<BlockHeader>, Error>>),
+    /// Get the finalized block hash by number.
+    /// Returns `None` if a block with the given number is not present in the db.
+    /// Note: the caller must ensure the block is finalized.
+    FinalizedBlockHash(BlockNumber, ResponseChannel<Result<Option<Hash>, Error>>),
+    /// Get the last finalized block number.
+    /// This request always succeeds.
+    FinalizedBlockNumber(ResponseChannel<Result<BlockNumber, Error>>),
+    /// Request the `k` ancestors block hashes of a block with the given hash.
+    /// The response channel may return a `Vec` of size up to `k`
+    /// filled with ancestors hashes with the following order:
+    /// `parent`, `grandparent`, ...
+    Ancestors {
+        /// The hash of the block in question.
+        hash: Hash,
+        /// The number of ancestors to request.
+        k: usize,
+        /// The response channel.
+        response_channel: ResponseChannel<Result<Vec<Hash>, Error>>,
+    }
 }
 ```
 
@@ -279,22 +282,22 @@ This is a network protocol that receives messages of type [`CollatorProtocolV1Me
 
 ```rust
 enum CollatorProtocolMessage {
-	/// Signal to the collator protocol that it should connect to validators with the expectation
-	/// of collating on the given para. This is only expected to be called once, early on, if at all,
-	/// and only by the Collation Generation subsystem. As such, it will overwrite the value of
-	/// the previous signal.
-	///
-	/// This should be sent before any `DistributeCollation` message.
-	CollateOn(ParaId),
-	/// Provide a collation to distribute to validators.
-	DistributeCollation(CandidateReceipt, PoV),
-	/// Fetch a collation under the given relay-parent for the given ParaId.
-	FetchCollation(Hash, ParaId, ResponseChannel<(CandidateReceipt, PoV)>),
-	/// Report a collator as having provided an invalid collation. This should lead to disconnect
-	/// and blacklist of the collator.
-	ReportCollator(CollatorId),
-	/// Note a collator as having provided a good collation.
-	NoteGoodCollation(CollatorId),
+    /// Signal to the collator protocol that it should connect to validators with the expectation
+    /// of collating on the given para. This is only expected to be called once, early on, if at all,
+    /// and only by the Collation Generation subsystem. As such, it will overwrite the value of
+    /// the previous signal.
+    ///
+    /// This should be sent before any `DistributeCollation` message.
+    CollateOn(ParaId),
+    /// Provide a collation to distribute to validators.
+    DistributeCollation(CandidateReceipt, PoV),
+    /// Fetch a collation under the given relay-parent for the given ParaId.
+    FetchCollation(Hash, ParaId, ResponseChannel<(CandidateReceipt, PoV)>),
+    /// Report a collator as having provided an invalid collation. This should lead to disconnect
+    /// and blacklist of the collator.
+    ReportCollator(CollatorId),
+    /// Note a collator as having provided a good collation.
+    NoteGoodCollation(CollatorId),
 }
 ```
 
@@ -306,79 +309,118 @@ to the low-level networking code.
 ```rust
 /// Peer-sets handled by the network bridge.
 enum PeerSet {
-	/// The collation peer-set is used to distribute collations from collators to validators.
-	Collation,
-	/// The validation peer-set is used to distribute information relevant to parachain
-	/// validation among validators. This may include nodes which are not validators,
-	/// as some protocols on this peer-set are expected to be gossip.
-	Validation,
+    /// The collation peer-set is used to distribute collations from collators to validators.
+    Collation,
+    /// The validation peer-set is used to distribute information relevant to parachain
+    /// validation among validators. This may include nodes which are not validators,
+    /// as some protocols on this peer-set are expected to be gossip.
+    Validation,
 }
 
 enum NetworkBridgeMessage {
-	/// Report a cost or benefit of a peer. Negative values are costs, positive are benefits.
-	ReportPeer(PeerSet, PeerId, cost_benefit: i32),
-	/// Send a message to one or more peers on the validation peerset.
-	SendValidationMessage([PeerId], ValidationProtocolV1),
-	/// Send a message to one or more peers on the collation peerset.
-	SendCollationMessage([PeerId], CollationProtocolV1),
-	/// Send multiple validation messages.
-	SendValidationMessages([([PeerId, ValidationProtocolV1])]),
-	/// Send multiple collation messages.
-	SendCollationMessages([([PeerId, ValidationProtocolV1])]),
-	/// Connect to peers who represent the given `validator_ids`.
-	///
-	/// Also ask the network to stay connected to these peers at least
-	/// until the request is revoked.
-	/// This can be done by dropping the receiver.
-	ConnectToValidators {
-		/// Ids of the validators to connect to.
-		validator_ids: Vec<AuthorityDiscoveryId>,
-		/// Response sender by which the issuer can learn the `PeerId`s of
-		/// the validators as they are connected.
-		/// The response is sent immediately for already connected peers.
-		connected: ResponseStream<(AuthorityDiscoveryId, PeerId)>,
-	},
+    /// Report a cost or benefit of a peer. Negative values are costs, positive are benefits.
+    ReportPeer(PeerSet, PeerId, cost_benefit: i32),
+    /// Send a message to one or more peers on the validation peerset.
+    SendValidationMessage([PeerId], ValidationProtocolV1),
+    /// Send a message to one or more peers on the collation peerset.
+    SendCollationMessage([PeerId], ValidationProtocolV1),
+    /// Send multiple validation messages.
+    SendValidationMessages([([PeerId, ValidationProtocolV1])]),
+    /// Send multiple collation messages.
+    SendCollationMessages([([PeerId, ValidationProtocolV1])]),
+    /// Connect to peers who represent the given `validator_ids`.
+    ///
+    /// Also ask the network to stay connected to these peers at least
+    /// until the request is revoked.
+    /// This can be done by dropping the receiver.
+    ConnectToValidators {
+        /// Ids of the validators to connect to.
+        validator_ids: Vec<AuthorityDiscoveryId>,
+        /// Response sender by which the issuer can learn the `PeerId`s of
+        /// the validators as they are connected.
+        /// The response is sent immediately for already connected peers.
+        connected: ResponseStream<(AuthorityDiscoveryId, PeerId)>,
+    },
 }
 ```
 
 ## Misbehavior Report
 
 ```rust
-enum MisbehaviorReport {
-  /// These validator nodes disagree on this candidate's validity, please figure it out
-  ///
-  /// Most likely, the list of statments all agree except for the final one. That's not
-  /// guaranteed, though; if somehow we become aware of lots of
-  /// statements disagreeing about the validity of a candidate before taking action,
-  /// this message should be dispatched with all of them, in arbitrary order.
-  ///
-  /// This variant is also used when our own validity checks disagree with others'.
-  CandidateValidityDisagreement(CandidateReceipt, Vec<SignedFullStatement>),
-  /// I've noticed a peer contradicting itself about a particular candidate
-  SelfContradiction(CandidateReceipt, SignedFullStatement, SignedFullStatement),
-  /// This peer has seconded more than one parachain candidate for this relay parent head
-  DoubleVote(CandidateReceipt, SignedFullStatement, SignedFullStatement),
+pub type Misbehavior = generic::Misbehavior<
+    CommittedCandidateReceipt,
+    CandidateHash,
+    ValidatorIndex,
+    ValidatorSignature,
+>;
+
+mod generic {
+    /// Misbehavior: voting more than one way on candidate validity.
+    ///
+    /// Since there are three possible ways to vote, a double vote is possible in
+    /// three possible combinations (unordered)
+    pub enum ValidityDoubleVote<Candidate, Digest, Signature> {
+        /// Implicit vote by issuing and explicitly voting validity.
+        IssuedAndValidity((Candidate, Signature), (Digest, Signature)),
+        /// Implicit vote by issuing and explicitly voting invalidity
+        IssuedAndInvalidity((Candidate, Signature), (Digest, Signature)),
+        /// Direct votes for validity and invalidity
+        ValidityAndInvalidity(Candidate, Signature, Signature),
+    }
+
+    /// Misbehavior: multiple signatures on same statement.
+    pub enum DoubleSign<Candidate, Digest, Signature> {
+        /// On candidate.
+        Candidate(Candidate, Signature, Signature),
+        /// On validity.
+        Validity(Digest, Signature, Signature),
+        /// On invalidity.
+        Invalidity(Digest, Signature, Signature),
+    }
+
+    /// Misbehavior: declaring multiple candidates.
+    pub struct MultipleCandidates<Candidate, Signature> {
+        /// The first candidate seen.
+        pub first: (Candidate, Signature),
+        /// The second candidate seen.
+        pub second: (Candidate, Signature),
+    }
+
+    /// Misbehavior: submitted statement for wrong group.
+    pub struct UnauthorizedStatement<Candidate, Digest, AuthorityId, Signature> {
+        /// A signed statement which was submitted without proper authority.
+        pub statement: SignedStatement<Candidate, Digest, AuthorityId, Signature>,
+    }
+
+    pub enum Misbehavior<Candidate, Digest, AuthorityId, Signature> {
+        /// Voted invalid and valid on validity.
+        ValidityDoubleVote(ValidityDoubleVote<Candidate, Digest, Signature>),
+        /// Submitted multiple candidates.
+        MultipleCandidates(MultipleCandidates<Candidate, Signature>),
+        /// Submitted a message that was unauthorized.
+        UnauthorizedStatement(UnauthorizedStatement<Candidate, Digest, AuthorityId, Signature>),
+        /// Submitted two valid signatures for the same message.
+        DoubleSign(DoubleSign<Candidate, Digest, Signature>),
+    }
 }
 ```
 
-If this subsystem chooses to second a parachain block, it dispatches a `CandidateBackingSubsystemMessage`.
-
 ## PoV Distribution Message
 
 This is a network protocol that receives messages of type [`PoVDistributionV1Message`][PoVDistributionV1NetworkMessage].
 
 ```rust
 enum PoVDistributionMessage {
-	/// Fetch a PoV from the network.
-	///
-	/// This `CandidateDescriptor` should correspond to a candidate seconded under the provided
-	/// relay-parent hash.
-	FetchPoV(Hash, CandidateDescriptor, ResponseChannel<PoV>),
-	/// Distribute a PoV for the given relay-parent and CandidateDescriptor.
-	/// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor
-	DistributePoV(Hash, CandidateDescriptor, PoV),
-	/// An update from the network bridge.
-	NetworkBridgeUpdateV1(NetworkBridgeEvent<PoVDistributionV1Message>),
+    /// Fetch a PoV from the network.
+    ///
+    /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided
+    /// relay-parent hash.
+    FetchPoV(Hash, CandidateDescriptor, ResponseChannel<PoV>),
+    /// Distribute a PoV for the given relay-parent and CandidateDescriptor.
+    /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor
+    DistributePoV(Hash, CandidateDescriptor, PoV),
+    /// An update from the network bridge.
+    NetworkBridgeUpdateV1(NetworkBridgeEvent<PoVDistributionV1Message>),
 }
 ```
 
@@ -430,48 +472,48 @@ This is fueled by an auxiliary type encapsulating all request types defined in t
 
 ```rust
 enum RuntimeApiRequest {
-	/// Get the current validator set.
-	Validators(ResponseChannel<Vec<ValidatorId>>),
-	/// Get the validator groups and rotation info.
-	ValidatorGroups(ResponseChannel<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>),
-	/// Get information about all availability cores.
-	AvailabilityCores(ResponseChannel<Vec<CoreState>>),
-	/// with the given occupied core assumption.
-	PersistedValidationData(
-		ParaId,
-		OccupiedCoreAssumption,
-		ResponseChannel<Option<PersistedValidationData>>,
-	),
-	/// Sends back `true` if the commitments pass all acceptance criteria checks.
-	CheckValidationOutputs(
-		ParaId,
-		CandidateCommitments,
-		RuntimeApiSender<bool>,
-	),
-	/// Get the session index for children of the block. This can be used to construct a signing
-	/// context.
-	SessionIndexForChild(ResponseChannel<SessionIndex>),
-	/// Get the validation code for a specific para, using the given occupied core assumption.
-	ValidationCode(ParaId, OccupiedCoreAssumption, ResponseChannel<Option<ValidationCode>>),
-	/// Fetch the historical validation code used by a para for candidates executed in
-	/// the context of a given block height in the current chain.
-	HistoricalValidationCode(ParaId, BlockNumber, ResponseChannel<Option<ValidationCode>>),
-	/// Get a committed candidate receipt for all candidates pending availability.
-	CandidatePendingAvailability(ParaId, ResponseChannel<Option<CommittedCandidateReceipt>>),
-	/// Get all events concerning candidates in the last block.
-	CandidateEvents(ResponseChannel<Vec<CandidateEvent>>),
-	/// Get the session info for the given session, if stored.
-	SessionInfo(SessionIndex, ResponseChannel<Option<SessionInfo>>),
-	/// Get all the pending inbound messages in the downward message queue for a para.
-	DmqContents(ParaId, ResponseChannel<Vec<InboundDownwardMessage<BlockNumber>>>),
-	/// Get the contents of all channels addressed to the given recipient. Channels that have no
-	/// messages in them are also included.
-	InboundHrmpChannelsContents(ParaId, ResponseChannel<BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>>),
+    /// Get the current validator set.
+    Validators(ResponseChannel<Vec<ValidatorId>>),
+    /// Get the validator groups and rotation info.
+    ValidatorGroups(ResponseChannel<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>),
+    /// Get information about all availability cores.
+    AvailabilityCores(ResponseChannel<Vec<CoreState>>),
+    /// with the given occupied core assumption.
+    PersistedValidationData(
+        ParaId,
+        OccupiedCoreAssumption,
+        ResponseChannel<Option<PersistedValidationData>>,
+    ),
+    /// Sends back `true` if the commitments pass all acceptance criteria checks.
+    CheckValidationOutputs(
+        ParaId,
+        CandidateCommitments,
+        RuntimeApiSender<bool>,
+    ),
+    /// Get the session index for children of the block. This can be used to construct a signing
+    /// context.
+    SessionIndexForChild(ResponseChannel<SessionIndex>),
+    /// Get the validation code for a specific para, using the given occupied core assumption.
+    ValidationCode(ParaId, OccupiedCoreAssumption, ResponseChannel<Option<ValidationCode>>),
+    /// Fetch the historical validation code used by a para for candidates executed in
+    /// the context of a given block height in the current chain.
+    HistoricalValidationCode(ParaId, BlockNumber, ResponseChannel<Option<ValidationCode>>),
+    /// Get a committed candidate receipt for all candidates pending availability.
+    CandidatePendingAvailability(ParaId, ResponseChannel<Option<CommittedCandidateReceipt>>),
+    /// Get all events concerning candidates in the last block.
+    CandidateEvents(ResponseChannel<Vec<CandidateEvent>>),
+    /// Get the session info for the given session, if stored.
+    SessionInfo(SessionIndex, ResponseChannel<Option<SessionInfo>>),
+    /// Get all the pending inbound messages in the downward message queue for a para.
+    DmqContents(ParaId, ResponseChannel<Vec<InboundDownwardMessage<BlockNumber>>>),
+    /// Get the contents of all channels addressed to the given recipient. Channels that have no
+    /// messages in them are also included.
+    InboundHrmpChannelsContents(ParaId, ResponseChannel<BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumber>>>>),
 }
 
 enum RuntimeApiMessage {
-	/// Make a request of the runtime API against the post-state of the given relay-parent.
-	Request(Hash, RuntimeApiRequest),
+    /// Make a request of the runtime API against the post-state of the given relay-parent.
+    Request(Hash, RuntimeApiRequest),
 }
 ```
 
@@ -484,16 +526,16 @@ This is a network protocol that receives messages of type [`StatementDistributio
 
 ```rust
 enum StatementDistributionMessage {
-	/// An update from the network bridge.
-	NetworkBridgeUpdateV1(NetworkBridgeEvent<StatementDistributionV1Message>),
-	/// We have validated a candidate and want to share our judgment with our peers.
-	/// The hash is the relay parent.
-	///
-	/// The statement distribution subsystem assumes that the statement should be correctly
-	/// signed.
-	Share(Hash, SignedFullStatement),
-	/// Register a listener to be notified on any new statements.
-	RegisterStatementListener(ResponseChannel<SignedFullStatement>),
+    /// An update from the network bridge.
+    NetworkBridgeUpdateV1(NetworkBridgeEvent<StatementDistributionV1Message>),
+    /// We have validated a candidate and want to share our judgment with our peers.
+    /// The hash is the relay parent.
+    ///
+    /// The statement distribution subsystem assumes that the statement should be correctly
+    /// signed.
+    Share(Hash, SignedFullStatement),
+    /// Register a listener to be notified on any new statements.
+    RegisterStatementListener(ResponseChannel<SignedFullStatement>),
 }
 ```
 
@@ -505,11 +547,11 @@ Various modules request that the [Candidate Validation subsystem](../node/utilit
 
 /// Result of the validation of the candidate.
 enum ValidationResult {
-	/// Candidate is valid, and here are the outputs and the validation data used to form inputs.
-	/// In practice, this should be a shared type so that validation caching can be done.
-	Valid(CandidateCommitments, PersistedValidationData),
-	/// Candidate is invalid.
-	Invalid,
+    /// Candidate is valid, and here are the outputs and the validation data used to form inputs.
+    /// In practice, this should be a shared type so that validation caching can be done.
+    Valid(CandidateCommitments, PersistedValidationData),
+    /// Candidate is invalid.
+    Invalid,
 }
 
 /// Messages received by the Validation subsystem.
@@ -521,37 +563,37 @@ enum ValidationResult {
 /// or `Ok(ValidationResult::Invalid)`.
 #[derive(Debug)]
 pub enum CandidateValidationMessage {
-	/// Validate a candidate with provided parameters using relay-chain state.
-	///
-	/// This will implicitly attempt to gather the `PersistedValidationData` and `ValidationCode`
-	/// from the runtime API of the chain, based on the `relay_parent`
-	/// of the `CandidateDescriptor`.
-	///
-	/// This will also perform checking of validation outputs against the acceptance criteria.
-	///
-	/// If there is no state available which can provide this data or the core for
-	/// the para is not free at the relay-parent, an error is returned.
-	ValidateFromChainState(
-		CandidateDescriptor,
-		Arc<PoV>,
-		oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
-	),
-	/// Validate a candidate with provided, exhaustive parameters for validation.
-	///
-	/// Explicitly provide the `PersistedValidationData` and `ValidationCode` so this can do full
-	/// validation without needing to access the state of the relay-chain.
-	///
-	/// This request doesn't involve acceptance criteria checking, therefore only useful for the
-	/// cases where the validity of the candidate is established. This is the case for the typical
-	/// use-case: secondary checkers would use this request relying on the full prior checks
-	/// performed by the relay-chain.
-	ValidateFromExhaustive(
-		PersistedValidationData,
-		ValidationCode,
-		CandidateDescriptor,
-		Arc<PoV>,
-		oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
-	),
+    /// Validate a candidate with provided parameters using relay-chain state.
+    ///
+    /// This will implicitly attempt to gather the `PersistedValidationData` and `ValidationCode`
+    /// from the runtime API of the chain, based on the `relay_parent`
+    /// of the `CandidateDescriptor`.
+    ///
+    /// This will also perform checking of validation outputs against the acceptance criteria.
+    ///
+    /// If there is no state available which can provide this data or the core for
+    /// the para is not free at the relay-parent, an error is returned.
+    ValidateFromChainState(
+        CandidateDescriptor,
+        Arc<PoV>,
+        oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
+    ),
+    /// Validate a candidate with provided, exhaustive parameters for validation.
+    ///
+    /// Explicitly provide the `PersistedValidationData` and `ValidationCode` so this can do full
+    /// validation without needing to access the state of the relay-chain.
+    ///
+    /// This request doesn't involve acceptance criteria checking, therefore only useful for the
+    /// cases where the validity of the candidate is established. This is the case for the typical
+    /// use-case: secondary checkers would use this request relying on the full prior checks
+    /// performed by the relay-chain.
+    ValidateFromExhaustive(
+        PersistedValidationData,
+        ValidationCode,
+        CandidateDescriptor,
+        Arc<PoV>,
+        oneshot::Sender<Result<ValidationResult, ValidationFailed>>,
+    ),
 }
 ```
 
diff --git a/polkadot/statement-table/src/generic.rs b/polkadot/statement-table/src/generic.rs
index fe51965a232..68bc507b53b 100644
--- a/polkadot/statement-table/src/generic.rs
+++ b/polkadot/statement-table/src/generic.rs
@@ -24,7 +24,7 @@
 //! indicating whether the candidate is valid or invalid. Once a threshold of the committee
 //! has signed validity statements, the candidate may be marked includable.
 
-use std::collections::hash_map::{HashMap, Entry};
+use std::collections::hash_map::{self, Entry, HashMap};
 use std::hash::Hash;
 use std::fmt::Debug;
 
@@ -34,7 +34,7 @@ use parity_scale_codec::{Encode, Decode};
 
 /// Context for the statement table.
 pub trait Context {
-	/// A authority ID
+	/// An authority ID
 	type AuthorityId: Debug + Hash + Eq + Clone;
 	/// The digest (hash or other unique attribute) of a candidate.
 	type Digest: Debug + Hash + Eq + Clone;
@@ -61,32 +61,29 @@ pub trait Context {
 
 /// Statements circulated among peers.
 #[derive(PartialEq, Eq, Debug, Clone, Encode, Decode)]
-pub enum Statement<C, D> {
-	/// Broadcast by an authority to indicate that this is his candidate for
-	/// inclusion.
+pub enum Statement<Candidate, Digest> {
+	/// Broadcast by an authority to indicate that this is its candidate for inclusion.
 	///
 	/// Broadcasting two different candidate messages per round is not allowed.
 	#[codec(index = "1")]
-	Candidate(C),
-	/// Broadcast by a authority to attest that the candidate with given digest
-	/// is valid.
+	Candidate(Candidate),
+	/// Broadcast by a authority to attest that the candidate with given digest is valid.
 	#[codec(index = "2")]
-	Valid(D),
-	/// Broadcast by a authority to attest that the candidate with given digest
-	/// is invalid.
+	Valid(Digest),
+	/// Broadcast by a authority to attest that the candidate with given digest is invalid.
 	#[codec(index = "3")]
-	Invalid(D),
+	Invalid(Digest),
 }
 
 /// A signed statement.
 #[derive(PartialEq, Eq, Debug, Clone, Encode, Decode)]
-pub struct SignedStatement<C, D, V, S> {
+pub struct SignedStatement<Candidate, Digest, AuthorityId, Signature> {
 	/// The statement.
-	pub statement: Statement<C, D>,
+	pub statement: Statement<Candidate, Digest>,
 	/// The signature.
-	pub signature: S,
+	pub signature: Signature,
 	/// The sender.
-	pub sender: V,
+	pub sender: AuthorityId,
 }
 
 /// Misbehavior: voting more than one way on candidate validity.
@@ -94,77 +91,124 @@ pub struct SignedStatement<C, D, V, S> {
 /// Since there are three possible ways to vote, a double vote is possible in
 /// three possible combinations (unordered)
 #[derive(PartialEq, Eq, Debug, Clone)]
-pub enum ValidityDoubleVote<C, D, S> {
+pub enum ValidityDoubleVote<Candidate, Digest, Signature> {
 	/// Implicit vote by issuing and explicitly voting validity.
-	IssuedAndValidity((C, S), (D, S)),
+	IssuedAndValidity((Candidate, Signature), (Digest, Signature)),
 	/// Implicit vote by issuing and explicitly voting invalidity
-	IssuedAndInvalidity((C, S), (D, S)),
+	IssuedAndInvalidity((Candidate, Signature), (Digest, Signature)),
 	/// Direct votes for validity and invalidity
-	ValidityAndInvalidity(C, S, S),
+	ValidityAndInvalidity(Candidate, Signature, Signature),
+}
+
+impl<Candidate, Digest, Signature> ValidityDoubleVote<Candidate, Digest, Signature> {
+	/// Deconstruct this misbehavior into two `(Statement, Signature)` pairs, erasing the information
+	/// about precisely what the problem was.
+	pub fn deconstruct<Ctx>(self) -> (
+		(Statement<Candidate, Digest>, Signature),
+		(Statement<Candidate, Digest>, Signature),
+	)
+	where
+		Ctx: Context<Candidate=Candidate, Digest=Digest, Signature=Signature>,
+		Candidate: Debug + Ord + Eq + Clone,
+		Digest: Debug + Hash + Eq + Clone,
+		Signature: Debug + Eq + Clone,
+	{
+		match self {
+			Self::IssuedAndValidity((c, s1), (d, s2)) => {
+				((Statement::Candidate(c), s1), (Statement::Valid(d), s2))
+			}
+			Self::IssuedAndInvalidity((c, s1), (d, s2)) => {
+				((Statement::Candidate(c), s1), (Statement::Invalid(d), s2))
+			}
+			Self::ValidityAndInvalidity(c, s1, s2) => {
+				(
+					(Statement::Valid(Ctx::candidate_digest(&c)), s1),
+					(Statement::Invalid(Ctx::candidate_digest(&c)), s2),
+				)
+			}
+		}
+	}
 }
 
 /// Misbehavior: multiple signatures on same statement.
 #[derive(PartialEq, Eq, Debug, Clone)]
-pub enum DoubleSign<C, D, S> {
+pub enum DoubleSign<Candidate, Digest, Signature> {
 	/// On candidate.
-	Candidate(C, S, S),
+	Candidate(Candidate, Signature, Signature),
 	/// On validity.
-	Validity(D, S, S),
+	Validity(Digest, Signature, Signature),
 	/// On invalidity.
-	Invalidity(D, S, S),
+	Invalidity(Digest, Signature, Signature),
+}
+
+impl<Candidate, Digest, Signature> DoubleSign<Candidate, Digest, Signature> {
+	/// Deconstruct this misbehavior into a statement with two signatures, erasing the information about
+	/// precisely where in the process the issue was detected.
+	pub fn deconstruct(self) -> (Statement<Candidate, Digest>, Signature, Signature) {
+		match self {
+			Self::Candidate(candidate, a, b) => (Statement::Candidate(candidate), a, b),
+			Self::Validity(digest, a, b) => (Statement::Valid(digest), a, b),
+			Self::Invalidity(digest, a, b) => (Statement::Invalid(digest), a, b),
+		}
+	}
 }
 
 /// Misbehavior: declaring multiple candidates.
 #[derive(PartialEq, Eq, Debug, Clone)]
-pub struct MultipleCandidates<C, S> {
+pub struct MultipleCandidates<Candidate, Signature> {
 	/// The first candidate seen.
-	pub first: (C, S),
+	pub first: (Candidate, Signature),
 	/// The second candidate seen.
-	pub second: (C, S),
+	pub second: (Candidate, Signature),
 }
 
 /// Misbehavior: submitted statement for wrong group.
 #[derive(PartialEq, Eq, Debug, Clone)]
-pub struct UnauthorizedStatement<C, D, V, S> {
+pub struct UnauthorizedStatement<Candidate, Digest, AuthorityId, Signature> {
 	/// A signed statement which was submitted without proper authority.
-	pub statement: SignedStatement<C, D, V, S>,
+	pub statement: SignedStatement<Candidate, Digest, AuthorityId, Signature>,
 }
 
 /// Different kinds of misbehavior. All of these kinds of malicious misbehavior
 /// are easily provable and extremely disincentivized.
 #[derive(PartialEq, Eq, Debug, Clone)]
-pub enum Misbehavior<C, D, V, S> {
+pub enum Misbehavior<Candidate, Digest, AuthorityId, Signature> {
 	/// Voted invalid and valid on validity.
-	ValidityDoubleVote(ValidityDoubleVote<C, D, S>),
+	ValidityDoubleVote(ValidityDoubleVote<Candidate, Digest, Signature>),
 	/// Submitted multiple candidates.
-	MultipleCandidates(MultipleCandidates<C, S>),
+	MultipleCandidates(MultipleCandidates<Candidate, Signature>),
 	/// Submitted a message that was unauthorized.
-	UnauthorizedStatement(UnauthorizedStatement<C, D, V, S>),
+	UnauthorizedStatement(UnauthorizedStatement<Candidate, Digest, AuthorityId, Signature>),
 	/// Submitted two valid signatures for the same message.
-	DoubleSign(DoubleSign<C, D, S>),
+	DoubleSign(DoubleSign<Candidate, Digest, Signature>),
 }
 
 /// Type alias for misbehavior corresponding to context type.
-pub type MisbehaviorFor<C> = Misbehavior<<C as Context>::Candidate, <C as Context>::Digest, <C as Context>::AuthorityId, <C as Context>::Signature>;
+pub type MisbehaviorFor<Ctx> = Misbehavior<
+	<Ctx as Context>::Candidate,
+	<Ctx as Context>::Digest,
+	<Ctx as Context>::AuthorityId,
+	<Ctx as Context>::Signature,
+>;
 
 // kinds of votes for validity
 #[derive(Clone, PartialEq, Eq)]
-enum ValidityVote<S: Eq + Clone> {
+enum ValidityVote<Signature: Eq + Clone> {
 	// implicit validity vote by issuing
-	Issued(S),
+	Issued(Signature),
 	// direct validity vote
-	Valid(S),
+	Valid(Signature),
 	// direct invalidity vote
-	Invalid(S),
+	Invalid(Signature),
 }
 
 /// A summary of import of a statement.
 #[derive(Clone, PartialEq, Eq, Debug)]
-pub struct Summary<D, G> {
+pub struct Summary<Digest, Group> {
 	/// The digest of the candidate referenced.
-	pub candidate: D,
+	pub candidate: Digest,
 	/// The group that the candidate is in.
-	pub group_id: G,
+	pub group_id: Group,
 	/// How many validity votes are currently witnessed.
 	pub validity_votes: usize,
 	/// Whether this has been signalled bad by at least one participant.
@@ -173,13 +217,13 @@ pub struct Summary<D, G> {
 
 /// A validity attestation.
 #[derive(Clone, PartialEq, Decode, Encode)]
-pub enum ValidityAttestation<S> {
+pub enum ValidityAttestation<Signature> {
 	/// implicit validity attestation by issuing.
 	/// This corresponds to issuance of a `Candidate` statement.
-	Implicit(S),
+	Implicit(Signature),
 	/// An explicit attestation. This corresponds to issuance of a
 	/// `Valid` statement.
-	Explicit(S),
+	Explicit(Signature),
 }
 
 impl Into<PrimitiveValidityAttestation> for ValidityAttestation<ValidatorSignature> {
@@ -203,14 +247,14 @@ pub struct AttestedCandidate<Group, Candidate, AuthorityId, Signature> {
 }
 
 /// Stores votes and data about a candidate.
-pub struct CandidateData<C: Context> {
-	group_id: C::GroupId,
-	candidate: C::Candidate,
-	validity_votes: HashMap<C::AuthorityId, ValidityVote<C::Signature>>,
-	indicated_bad_by: Vec<C::AuthorityId>,
+pub struct CandidateData<Ctx: Context> {
+	group_id: Ctx::GroupId,
+	candidate: Ctx::Candidate,
+	validity_votes: HashMap<Ctx::AuthorityId, ValidityVote<Ctx::Signature>>,
+	indicated_bad_by: Vec<Ctx::AuthorityId>,
 }
 
-impl<C: Context> CandidateData<C> {
+impl<Ctx: Context> CandidateData<Ctx> {
 	/// whether this has been indicated bad by anyone.
 	pub fn indicated_bad(&self) -> bool {
 		!self.indicated_bad_by.is_empty()
@@ -220,7 +264,7 @@ impl<C: Context> CandidateData<C> {
 	/// If the candidate can be included, it will return `Some`.
 	pub fn attested(&self, validity_threshold: usize)
 		-> Option<AttestedCandidate<
-			C::GroupId, C::Candidate, C::AuthorityId, C::Signature,
+			Ctx::GroupId, Ctx::Candidate, Ctx::AuthorityId, Ctx::Signature,
 		>>
 	{
 		if self.can_be_included(validity_threshold) {
@@ -259,7 +303,7 @@ impl<C: Context> CandidateData<C> {
 		self.validity_votes.len() >= validity_threshold
 	}
 
-	fn summary(&self, digest: C::Digest) -> Summary<C::Digest, C::GroupId> {
+	fn summary(&self, digest: Ctx::Digest) -> Summary<Ctx::Digest, Ctx::GroupId> {
 		Summary {
 			candidate: digest,
 			group_id: self.group_id.clone(),
@@ -270,11 +314,11 @@ impl<C: Context> CandidateData<C> {
 }
 
 // authority metadata
-struct AuthorityData<C: Context> {
-	proposal: Option<(C::Digest, C::Signature)>,
+struct AuthorityData<Ctx: Context> {
+	proposal: Option<(Ctx::Digest, Ctx::Signature)>,
 }
 
-impl<C: Context> Default for AuthorityData<C> {
+impl<Ctx: Context> Default for AuthorityData<Ctx> {
 	fn default() -> Self {
 		AuthorityData {
 			proposal: None,
@@ -283,20 +327,20 @@ impl<C: Context> Default for AuthorityData<C> {
 }
 
 /// Type alias for the result of a statement import.
-pub type ImportResult<C> = Result<
-	Option<Summary<<C as Context>::Digest, <C as Context>::GroupId>>,
-	MisbehaviorFor<C>
+pub type ImportResult<Ctx> = Result<
+	Option<Summary<<Ctx as Context>::Digest, <Ctx as Context>::GroupId>>,
+	MisbehaviorFor<Ctx>
 >;
 
 /// Stores votes
-pub struct Table<C: Context> {
-	authority_data: HashMap<C::AuthorityId, AuthorityData<C>>,
-	detected_misbehavior: HashMap<C::AuthorityId, MisbehaviorFor<C>>,
-	candidate_votes: HashMap<C::Digest, CandidateData<C>>,
-	includable_count: HashMap<C::GroupId, usize>,
+pub struct Table<Ctx: Context> {
+	authority_data: HashMap<Ctx::AuthorityId, AuthorityData<Ctx>>,
+	detected_misbehavior: HashMap<Ctx::AuthorityId, Vec<MisbehaviorFor<Ctx>>>,
+	candidate_votes: HashMap<Ctx::Digest, CandidateData<Ctx>>,
+	includable_count: HashMap<Ctx::GroupId, usize>,
 }
 
-impl<C: Context> Default for Table<C> {
+impl<Ctx: Context> Default for Table<Ctx> {
 	fn default() -> Self {
 		Table {
 			authority_data: HashMap::new(),
@@ -307,15 +351,15 @@ impl<C: Context> Default for Table<C> {
 	}
 }
 
-impl<C: Context> Table<C> {
+impl<Ctx: Context> Table<Ctx> {
 	/// Produce a set of proposed candidates.
 	///
 	/// This will be at most one per group, consisting of the
 	/// best candidate for each group with requisite votes for inclusion.
 	///
 	/// The vector is sorted in ascending order by group id.
-	pub fn proposed_candidates(&self, context: &C) -> Vec<AttestedCandidate<
-		C::GroupId, C::Candidate, C::AuthorityId, C::Signature,
+	pub fn proposed_candidates(&self, context: &Ctx) -> Vec<AttestedCandidate<
+		Ctx::GroupId, Ctx::Candidate, Ctx::AuthorityId, Ctx::Signature,
 	>> {
 		use std::collections::BTreeMap;
 		use std::collections::btree_map::Entry as BTreeEntry;
@@ -354,7 +398,7 @@ impl<C: Context> Table<C> {
 	}
 
 	/// Whether a candidate can be included.
-	pub fn candidate_includable(&self, digest: &C::Digest, context: &C) -> bool {
+	pub fn candidate_includable(&self, digest: &Ctx::Digest, context: &Ctx) -> bool {
 		self.candidate_votes.get(digest).map_or(false, |data| {
 			let v_threshold = context.requisite_votes(&data.group_id);
 			data.can_be_included(v_threshold)
@@ -364,9 +408,9 @@ impl<C: Context> Table<C> {
 	/// Get the attested candidate for `digest`.
 	///
 	/// Returns `Some(_)` if the candidate exists and is includable.
-	pub fn attested_candidate(&self, digest: &C::Digest, context: &C)
+	pub fn attested_candidate(&self, digest: &Ctx::Digest, context: &Ctx)
 		-> Option<AttestedCandidate<
-			C::GroupId, C::Candidate, C::AuthorityId, C::Signature,
+			Ctx::GroupId, Ctx::Candidate, Ctx::AuthorityId, Ctx::Signature,
 		>>
 	{
 		self.candidate_votes.get(digest).and_then(|data| {
@@ -384,9 +428,9 @@ impl<C: Context> Table<C> {
 	/// If this returns `None`, the statement was either duplicate or invalid.
 	pub fn import_statement(
 		&mut self,
-		context: &C,
-		statement: SignedStatement<C::Candidate, C::Digest, C::AuthorityId, C::Signature>,
-	) -> Option<Summary<C::Digest, C::GroupId>> {
+		context: &Ctx,
+		statement: SignedStatement<Ctx::Candidate, Ctx::Digest, Ctx::AuthorityId, Ctx::Signature>,
+	) -> Option<Summary<Ctx::Digest, Ctx::GroupId>> {
 		let SignedStatement { statement, signature, sender: signer } = statement;
 
 		let res = match statement {
@@ -414,25 +458,34 @@ impl<C: Context> Table<C> {
 			Ok(maybe_summary) => maybe_summary,
 			Err(misbehavior) => {
 				// all misbehavior in agreement is provable and actively malicious.
-				// punishments are not cumulative.
-				self.detected_misbehavior.insert(signer, misbehavior);
+				// punishments may be cumulative.
+				self.detected_misbehavior.entry(signer).or_default().push(misbehavior);
 				None
 			}
 		}
 	}
 
 	/// Get a candidate by digest.
-	pub fn get_candidate(&self, digest: &C::Digest) -> Option<&C::Candidate> {
+	pub fn get_candidate(&self, digest: &Ctx::Digest) -> Option<&Ctx::Candidate> {
 		self.candidate_votes.get(digest).map(|d| &d.candidate)
 	}
 
 	/// Access all witnessed misbehavior.
 	pub fn get_misbehavior(&self)
-		-> &HashMap<C::AuthorityId, MisbehaviorFor<C>>
+		-> &HashMap<Ctx::AuthorityId, Vec<MisbehaviorFor<Ctx>>>
 	{
 		&self.detected_misbehavior
 	}
 
+	/// Create a draining iterator of misbehaviors.
+	///
+	/// This consumes all detected misbehaviors, even if the iterator is not completely consumed.
+	pub fn drain_misbehaviors(&mut self) -> DrainMisbehaviors<'_, Ctx> {
+		self.detected_misbehavior
+			.drain()
+			.into()
+	}
+
 	/// Get the current number of parachains with includable candidates.
 	pub fn includable_count(&self) -> usize {
 		self.includable_count.len()
@@ -440,26 +493,26 @@ impl<C: Context> Table<C> {
 
 	fn import_candidate(
 		&mut self,
-		context: &C,
-		from: C::AuthorityId,
-		candidate: C::Candidate,
-		signature: C::Signature,
-	) -> ImportResult<C> {
-		let group = C::candidate_group(&candidate);
-		if !context.is_member_of(&from, &group) {
+		context: &Ctx,
+		authority: Ctx::AuthorityId,
+		candidate: Ctx::Candidate,
+		signature: Ctx::Signature,
+	) -> ImportResult<Ctx> {
+		let group = Ctx::candidate_group(&candidate);
+		if !context.is_member_of(&authority, &group) {
 			return Err(Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
 				statement: SignedStatement {
 					signature,
 					statement: Statement::Candidate(candidate),
-					sender: from,
+					sender: authority,
 				},
 			}));
 		}
 
 		// check that authority hasn't already specified another candidate.
-		let digest = C::candidate_digest(&candidate);
+		let digest = Ctx::candidate_digest(&candidate);
 
-		let new_proposal = match self.authority_data.entry(from.clone()) {
+		let new_proposal = match self.authority_data.entry(authority.clone()) {
 			Entry::Occupied(mut occ) => {
 				// if digest is different, fetch candidate and
 				// note misbehavior.
@@ -510,7 +563,7 @@ impl<C: Context> Table<C> {
 
 		self.validity_vote(
 			context,
-			from,
+			authority,
 			digest,
 			ValidityVote::Issued(signature),
 		)
@@ -518,11 +571,11 @@ impl<C: Context> Table<C> {
 
 	fn validity_vote(
 		&mut self,
-		context: &C,
-		from: C::AuthorityId,
-		digest: C::Digest,
-		vote: ValidityVote<C::Signature>,
-	) -> ImportResult<C> {
+		context: &Ctx,
+		from: Ctx::AuthorityId,
+		digest: Ctx::Digest,
+		vote: ValidityVote<Ctx::Signature>,
+	) -> ImportResult<Ctx> {
 		let votes = match self.candidate_votes.get_mut(&digest) {
 			None => return Ok(None),
 			Some(votes) => votes,
@@ -608,9 +661,68 @@ impl<C: Context> Table<C> {
 	}
 }
 
-fn update_includable_count<G: Hash + Eq + Clone>(
-	map: &mut HashMap<G, usize>,
-	group_id: &G,
+type Drain<'a, Ctx> = hash_map::Drain<'a, <Ctx as Context>::AuthorityId, Vec<MisbehaviorFor<Ctx>>>;
+
+struct MisbehaviorForAuthority<Ctx: Context> {
+	id: Ctx::AuthorityId,
+	misbehaviors: Vec<MisbehaviorFor<Ctx>>,
+}
+
+impl<Ctx: Context> From<(Ctx::AuthorityId, Vec<MisbehaviorFor<Ctx>>)> for MisbehaviorForAuthority<Ctx> {
+	fn from((id, mut misbehaviors): (Ctx::AuthorityId, Vec<MisbehaviorFor<Ctx>>)) -> Self {
+		// we're going to be popping items off this list in the iterator, so reverse it now to
+		// preserve the original ordering.
+		misbehaviors.reverse();
+		Self { id, misbehaviors }
+	}
+}
+
+impl<Ctx: Context> Iterator for MisbehaviorForAuthority<Ctx> {
+	type Item = (Ctx::AuthorityId, MisbehaviorFor<Ctx>);
+
+	fn next(&mut self) -> Option<Self::Item> {
+		self.misbehaviors.pop().map(|misbehavior| (self.id.clone(), misbehavior))
+	}
+}
+
+pub struct DrainMisbehaviors<'a, Ctx: Context> {
+	drain: Drain<'a, Ctx>,
+	in_progress: Option<MisbehaviorForAuthority<Ctx>>,
+}
+
+impl<'a, Ctx: Context> From<Drain<'a, Ctx>> for DrainMisbehaviors<'a, Ctx> {
+	fn from(drain: Drain<'a, Ctx>) -> Self {
+		Self {
+			drain,
+			in_progress: None,
+		}
+	}
+}
+
+impl<'a, Ctx: Context> DrainMisbehaviors<'a, Ctx> {
+	fn maybe_item(&mut self) -> Option<(Ctx::AuthorityId, MisbehaviorFor<Ctx>)> {
+		self.in_progress.as_mut().and_then(Iterator::next)
+	}
+}
+
+impl<'a, Ctx: Context> Iterator for DrainMisbehaviors<'a, Ctx> {
+	type Item = (Ctx::AuthorityId, MisbehaviorFor<Ctx>);
+
+	fn next(&mut self) -> Option<Self::Item> {
+		// Note: this implementation will prematurely return `None` if `self.drain.next()` ever returns a
+		// tuple whose vector is empty. That will never currently happen, as the only modification
+		// to the backing map is currently via `drain` and `entry(...).or_default().push(...)`.
+		// However, future code changes might change that property.
+		self.maybe_item().or_else(|| {
+			self.in_progress = self.drain.next().map(Into::into);
+			self.maybe_item()
+		})
+	}
+}
+
+fn update_includable_count<Group: Hash + Eq + Clone>(
+	map: &mut HashMap<Group, usize>,
+	group_id: &Group,
 	was_includable: bool,
 	is_includable: bool,
 ) {
@@ -633,7 +745,7 @@ mod tests {
 	use super::*;
 	use std::collections::HashMap;
 
-	fn create<C: Context>() -> Table<C> {
+	fn create<Candidate: Context>() -> Table<Candidate> {
 		Table::default()
 	}
 
@@ -721,8 +833,8 @@ mod tests {
 
 		table.import_statement(&context, statement_b);
 		assert_eq!(
-			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
-			&Misbehavior::MultipleCandidates(MultipleCandidates {
+			table.detected_misbehavior[&AuthorityId(1)][0],
+			Misbehavior::MultipleCandidates(MultipleCandidates {
 				first: (Candidate(2, 100), Signature(1)),
 				second: (Candidate(2, 999), Signature(1)),
 			})
@@ -749,8 +861,8 @@ mod tests {
 		table.import_statement(&context, statement);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
-			&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
+			table.detected_misbehavior[&AuthorityId(1)][0],
+			Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
 				statement: SignedStatement {
 					statement: Statement::Candidate(Candidate(2, 100)),
 					signature: Signature(1),
@@ -793,8 +905,8 @@ mod tests {
 		table.import_statement(&context, bad_validity_vote);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&AuthorityId(2)).unwrap(),
-			&Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
+			table.detected_misbehavior[&AuthorityId(2)][0],
+			Misbehavior::UnauthorizedStatement(UnauthorizedStatement {
 				statement: SignedStatement {
 					statement: Statement::Valid(candidate_a_digest),
 					signature: Signature(2),
@@ -844,8 +956,8 @@ mod tests {
 		table.import_statement(&context, invalid_statement);
 
 		assert_eq!(
-			table.detected_misbehavior.get(&AuthorityId(2)).unwrap(),
-			&Misbehavior::ValidityDoubleVote(ValidityDoubleVote::ValidityAndInvalidity(
+			table.detected_misbehavior[&AuthorityId(2)][0],
+			Misbehavior::ValidityDoubleVote(ValidityDoubleVote::ValidityAndInvalidity(
 				Candidate(2, 100),
 				Signature(2),
 				Signature(2),
@@ -978,8 +1090,8 @@ mod tests {
 
 		table.import_statement(&context, extra_vote);
 		assert_eq!(
-			table.detected_misbehavior.get(&AuthorityId(1)).unwrap(),
-			&Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity(
+			table.detected_misbehavior[&AuthorityId(1)][0],
+			Misbehavior::ValidityDoubleVote(ValidityDoubleVote::IssuedAndValidity(
 				(Candidate(2, 100), Signature(1)),
 				(Digest(100), Signature(1)),
 			))
-- 
GitLab