From 58ab4f6b9fc85a414bbfd4b013ab29fa5e7a7b86 Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Mon, 17 Jun 2019 14:38:03 +0200
Subject: [PATCH] Track and accumulate ingress roots in runtime (#287)

* track unrouted ingress in runtime

* test ingress routing

* fix compilation

* add space

Co-Authored-By: Gavin Wood <github@gavwood.com>
---
 polkadot/network/src/lib.rs              |   8 +-
 polkadot/network/src/tests/mod.rs        |   4 +-
 polkadot/network/src/tests/validation.rs |  10 +-
 polkadot/primitives/src/parachain.rs     |  43 ++--
 polkadot/runtime/src/lib.rs              |   4 +-
 polkadot/runtime/src/parachains.rs       | 277 +++++++++++++++++------
 polkadot/validation/src/collation.rs     |  18 +-
 7 files changed, 263 insertions(+), 101 deletions(-)

diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs
index a0b64eb5436..c191c912e9f 100644
--- a/polkadot/network/src/lib.rs
+++ b/polkadot/network/src/lib.rs
@@ -30,7 +30,7 @@ use futures::sync::oneshot;
 use polkadot_primitives::{Block, SessionKey, Hash, Header};
 use polkadot_primitives::parachain::{
 	Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
-	ConsolidatedIngressRoots,
+	StructuredUnroutedIngress,
 };
 use substrate_network::{PeerId, RequestId, Context};
 use substrate_network::{message, generic_message};
@@ -83,7 +83,7 @@ struct PoVBlockRequest {
 	candidate_hash: Hash,
 	block_data_hash: Hash,
 	sender: oneshot::Sender<PoVBlock>,
-	canon_roots: ConsolidatedIngressRoots,
+	canon_roots: StructuredUnroutedIngress,
 }
 
 impl PoVBlockRequest {
@@ -218,7 +218,7 @@ impl PolkadotProtocol {
 		ctx: &mut Context<Block>,
 		candidate: &CandidateReceipt,
 		relay_parent: Hash,
-		canon_roots: ConsolidatedIngressRoots,
+		canon_roots: StructuredUnroutedIngress,
 	) -> oneshot::Receiver<PoVBlock> {
 		let (tx, rx) = oneshot::channel();
 
@@ -547,7 +547,7 @@ impl Specialization<Block> for PolkadotProtocol {
 							validation_session_parent: Default::default(),
 							candidate_hash: Default::default(),
 							block_data_hash: Default::default(),
-							canon_roots: ConsolidatedIngressRoots(Vec::new()),
+							canon_roots: StructuredUnroutedIngress(Vec::new()),
 							sender,
 						}));
 					}
diff --git a/polkadot/network/src/tests/mod.rs b/polkadot/network/src/tests/mod.rs
index 799fa87c2e7..c0e659df7f4 100644
--- a/polkadot/network/src/tests/mod.rs
+++ b/polkadot/network/src/tests/mod.rs
@@ -24,7 +24,7 @@ use polkadot_validation::GenericStatement;
 use polkadot_primitives::{Block, Hash, SessionKey};
 use polkadot_primitives::parachain::{
 	CandidateReceipt, HeadData, PoVBlock, BlockData, CollatorId, ValidatorId,
-	ConsolidatedIngressRoots,
+	StructuredUnroutedIngress,
 };
 use substrate_primitives::crypto::UncheckedInto;
 use parity_codec::Encode;
@@ -175,7 +175,7 @@ fn fetches_from_those_with_knowledge() {
 	let knowledge = session.knowledge();
 
 	knowledge.lock().note_statement(a_key.clone(), &GenericStatement::Valid(candidate_hash));
-	let canon_roots = ConsolidatedIngressRoots(Vec::new());
+	let canon_roots = StructuredUnroutedIngress(Vec::new());
 	let recv = protocol.fetch_pov_block(
 		&mut TestContext::default(),
 		&candidate_receipt,
diff --git a/polkadot/network/src/tests/validation.rs b/polkadot/network/src/tests/validation.rs
index 0a503b58eab..326576df667 100644
--- a/polkadot/network/src/tests/validation.rs
+++ b/polkadot/network/src/tests/validation.rs
@@ -29,7 +29,7 @@ use polkadot_validation::{SharedTable, MessagesFrom, Network};
 use polkadot_primitives::{SessionKey, Block, Hash, Header, BlockId};
 use polkadot_primitives::parachain::{
 	Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage,
-	ValidatorId, ConsolidatedIngressRoots,
+	ValidatorId, StructuredUnroutedIngress, BlockIngressRoots,
 };
 use parking_lot::Mutex;
 use substrate_client::error::Result as ClientResult;
@@ -175,7 +175,7 @@ struct ApiData {
 	validators: Vec<ValidatorId>,
 	duties: Vec<Chain>,
 	active_parachains: Vec<ParaId>,
-	ingress: HashMap<ParaId, ConsolidatedIngressRoots>,
+	ingress: HashMap<ParaId, StructuredUnroutedIngress>,
 }
 
 #[derive(Default, Clone)]
@@ -306,7 +306,7 @@ impl ParachainHost<Block> for RuntimeApi {
 		_: ExecutionContext,
 		id: Option<ParaId>,
 		_: Vec<u8>,
-	) -> ClientResult<NativeOrEncoded<Option<ConsolidatedIngressRoots>>> {
+	) -> ClientResult<NativeOrEncoded<Option<StructuredUnroutedIngress>>> {
 		let id = id.unwrap();
 		Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
 	}
@@ -372,7 +372,7 @@ impl IngressBuilder {
 		}
 	}
 
-	fn build(self) -> HashMap<ParaId, ConsolidatedIngressRoots> {
+	fn build(self) -> HashMap<ParaId, BlockIngressRoots> {
 		let mut map = HashMap::new();
 		for ((source, target), messages) in self.egress {
 			map.entry(target).or_insert_with(Vec::new)
@@ -383,7 +383,7 @@ impl IngressBuilder {
 			roots.sort_by_key(|&(para_id, _)| para_id);
 		}
 
-		map.into_iter().map(|(k, v)| (k, ConsolidatedIngressRoots(v))).collect()
+		map.into_iter().map(|(k, v)| (k, BlockIngressRoots(v))).collect()
 	}
 }
 
diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs
index 9281809b0b6..eb0bba30923 100644
--- a/polkadot/primitives/src/parachain.rs
+++ b/polkadot/primitives/src/parachain.rs
@@ -19,7 +19,7 @@
 use rstd::prelude::*;
 use rstd::cmp::Ordering;
 use parity_codec::{Encode, Decode};
-use super::{Hash, Balance};
+use super::{Hash, Balance, BlockNumber};
 
 #[cfg(feature = "std")]
 use serde::{Serialize, Deserialize};
@@ -200,18 +200,35 @@ pub struct PoVBlock {
 #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))]
 pub struct Message(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec<u8>);
 
-/// Consolidated ingress roots.
+/// All ingress roots at one block.
 ///
-/// This is an ordered vector of other parachains' egress queue roots,
-/// obtained according to the routing rules. The same parachain may appear
-/// twice.
+/// This is an ordered vector of other parachain's egress queue roots from a specific block.
+/// empty roots are omitted. Each parachain may appear once at most.
+#[derive(Default, PartialEq, Eq, Clone, Encode)]
+#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Decode))]
+pub struct BlockIngressRoots(pub Vec<(Id, Hash)>);
+
+/// All ingress roots, grouped by block number (ascending). To properly
+/// interpret this struct, the user must have knowledge of which fork of the relay
+/// chain all block numbers correspond to.
 #[derive(Default, PartialEq, Eq, Clone, Encode)]
 #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Decode))]
-pub struct ConsolidatedIngressRoots(pub Vec<(Id, Hash)>);
+pub struct StructuredUnroutedIngress(pub Vec<(BlockNumber, BlockIngressRoots)>);
+
+#[cfg(feature = "std")]
+impl StructuredUnroutedIngress {
+	/// Get the length of all the ingress roots across all blocks.
+	pub fn len(&self) -> usize {
+		self.0.iter().fold(0, |a, (_, roots)| a + roots.0.len())
+	}
 
-impl From<Vec<(Id, Hash)>> for ConsolidatedIngressRoots {
-	fn from(v: Vec<(Id, Hash)>) -> Self {
-		ConsolidatedIngressRoots(v)
+	/// Returns an iterator over all ingress roots. The block number indicates
+	/// the height at which that root was posted to the relay chain. The parachain ID is the
+	/// message sender.
+	pub fn iter(&self) -> impl Iterator<Item=(BlockNumber, &Id, &Hash)> {
+		self.0.iter().flat_map(|&(n, ref roots)|
+			roots.0.iter().map(move |&(ref from, ref root)| (n, from, root))
+		)
 	}
 }
 
@@ -219,7 +236,7 @@ impl From<Vec<(Id, Hash)>> for ConsolidatedIngressRoots {
 ///
 /// This is just an ordered vector of other parachains' egress queues,
 /// obtained according to the routing rules. The same parachain may appear
-/// twice.
+/// more than once.
 #[derive(Default, PartialEq, Eq, Clone, Decode)]
 #[cfg_attr(feature = "std", derive(Serialize, Deserialize, Encode, Debug))]
 pub struct ConsolidatedIngress(pub Vec<(Id, Vec<Message>)>);
@@ -324,9 +341,9 @@ substrate_client::decl_runtime_apis! {
 		fn parachain_head(id: Id) -> Option<Vec<u8>>;
 		/// Get the given parachain's head code blob.
 		fn parachain_code(id: Id) -> Option<Vec<u8>>;
-		/// Get the ingress roots to a specific parachain at a
-		/// block.
-		fn ingress(to: Id) -> Option<ConsolidatedIngressRoots>;
+		/// Get all the unrouted ingress roots at the given block that
+		/// are targeting the given parachain.
+		fn ingress(to: Id) -> Option<StructuredUnroutedIngress>;
 	}
 }
 
diff --git a/polkadot/runtime/src/lib.rs b/polkadot/runtime/src/lib.rs
index 7bb57c2dfb0..7fe558669e0 100644
--- a/polkadot/runtime/src/lib.rs
+++ b/polkadot/runtime/src/lib.rs
@@ -356,8 +356,8 @@ impl_runtime_apis! {
 		fn parachain_code(id: parachain::Id) -> Option<Vec<u8>> {
 			Parachains::parachain_code(&id)
 		}
-		fn ingress(to: parachain::Id) -> Option<parachain::ConsolidatedIngressRoots> {
-			Parachains::ingress(to).map(Into::into)
+		fn ingress(to: parachain::Id) -> Option<parachain::StructuredUnroutedIngress> {
+			Parachains::ingress(to).map(parachain::StructuredUnroutedIngress)
 		}
 	}
 
diff --git a/polkadot/runtime/src/parachains.rs b/polkadot/runtime/src/parachains.rs
index aebdb6a8777..16c19191220 100644
--- a/polkadot/runtime/src/parachains.rs
+++ b/polkadot/runtime/src/parachains.rs
@@ -17,14 +17,17 @@
 //! Main parachains logic. For now this is just the determination of which validators do what.
 
 use rstd::prelude::*;
+use rstd::collections::btree_map::BTreeMap;
 use parity_codec::{Decode, HasCompact};
 use srml_support::{decl_storage, decl_module, fail, ensure};
 
 use bitvec::{bitvec, BigEndian};
-use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion};
+use sr_primitives::traits::{
+	Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero,
+};
 use primitives::{Hash, parachain::{
 	Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
-	ParachainDispatchOrigin, UpwardMessage
+	ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots,
 }};
 use {system, session};
 use srml_support::{
@@ -44,6 +47,32 @@ use rstd::marker::PhantomData;
 
 use system::ensure_none;
 
+// ranges for iteration of general block number don't work, so this
+// is a utility to get around that.
+struct BlockNumberRange<N> {
+	low: N,
+	high: N,
+}
+
+impl<N: Saturating + One + PartialOrd + PartialEq + Clone> Iterator for BlockNumberRange<N> {
+	type Item = N;
+
+	fn next(&mut self) -> Option<N> {
+		if self.low >= self.high {
+			return None
+		}
+
+		let item = self.low.clone();
+		self.low = self.low.clone().saturating_add(One::one());
+		Some(item)
+	}
+}
+
+// creates a range iterator between `low` and `high`. `low` must be <= `high`.
+fn number_range<N>(low: N, high: N) -> BlockNumberRange<N> {
+	BlockNumberRange { low, high }
+}
+
 /// Parachain registration API.
 pub trait ParachainRegistrar<AccountId> {
 	/// An identifier for a parachain.
@@ -76,6 +105,12 @@ impl<T: Trait> ParachainRegistrar<T::AccountId> for Module<T> {
 		<Parachains<T>>::put(parachains);
 		<Heads<T>>::insert(id, initial_head_data);
 
+		// Because there are no ordering guarantees that inherents
+		// are applied before regular transactions, a parachain candidate could
+		// be registered before the `UpdateHeads` inherent is processed. If so, messages
+		// could be sent to a parachain in the block it is registered.
+		<Watermarks<T>>::insert(id, <system::Module<T>>::block_number().saturating_sub(One::one()));
+
 		Ok(())
 	}
 	fn deregister_parachain(id: ParaId) -> Result {
@@ -88,10 +123,17 @@ impl<T: Trait> ParachainRegistrar<T::AccountId> for Module<T> {
 		<Code<T>>::remove(id);
 		<Heads<T>>::remove(id);
 
-		// clear all routing entries to and from other parachains.
-		for other in parachains.iter().cloned() {
-			<Routing<T>>::remove((id, other));
-			<Routing<T>>::remove((other, id));
+		let watermark = <Watermarks<T>>::take(id);
+
+		// clear all routing entries _to_. But not those _from_.
+		if let Some(watermark) = watermark {
+			let now = <system::Module<T>>::block_number();
+
+			// iterate over all blocks between watermark and now + 1 (since messages might
+			// have already been sent to `id` in this block.
+			for unrouted_block in number_range(watermark, now).map(|n| n.saturating_add(One::one())) {
+				<UnroutedIngress<T>>::remove(&(unrouted_block, id));
+			}
 		}
 
 		<Parachains<T>>::put(parachains);
@@ -137,8 +179,16 @@ decl_storage! {
 		pub Code get(parachain_code): map ParaId => Option<Vec<u8>>;
 		// The heads of the parachains registered at present.
 		pub Heads get(parachain_head): map ParaId => Option<Vec<u8>>;
-		// message routing roots (from, to).
-		pub Routing: map (ParaId, ParaId) => Option<Hash>;
+		// The watermark heights of the parachains registered at present.
+		// For every parachain, this is the block height from which all messages targeting
+		// that parachain have been processed. Can be `None` only if the parachain doesn't exist.
+		pub Watermarks get(watermark): map ParaId => Option<T::BlockNumber>;
+
+		/// Unrouted ingress. Maps (BlockNumber, to_chain) pairs to [(from_chain, egress_root)].
+		///
+		/// There may be an entry under (i, p) in this map for every i between the parachain's
+		/// watermark and the current block.
+		pub UnroutedIngress: map (T::BlockNumber, ParaId) => Option<Vec<(ParaId, Hash)>>;
 
 		/// Messages ready to be dispatched onto the relay chain. It is subject to
 		/// `MAX_MESSAGE_COUNT` and `WATERMARK_MESSAGE_SIZE`.
@@ -170,6 +220,7 @@ decl_storage! {
 				// no ingress -- a chain cannot be routed to until it is live.
 				<Code<T> as generator::StorageMap<_, _>>::insert(&id, &code, storage);
 				<Heads<T> as generator::StorageMap<_, _>>::insert(&id, &genesis, storage);
+				<Watermarks<T> as generator::StorageMap<_, _>>::insert(&id, &Zero::zero(), storage);
 			}
 		});
 	}
@@ -220,21 +271,15 @@ decl_module! {
 
 				Self::check_attestations(&heads)?;
 
-				for head in heads.iter() {
-					let id = head.parachain_index();
-					<Heads<T>>::insert(id, &head.candidate.head_data.0);
-
-					// update egress.
-					for &(to, root) in &head.candidate.egress_queue_roots {
-						<Routing<T>>::insert((id, to), root);
-					}
+				let current_number = <system::Module<T>>::block_number();
 
-					// Queue up upwards messages (from parachains to relay chain).
-					Self::queue_upward_messages(id, &head.candidate.upward_messages);
-				}
+				Self::update_routing(
+					current_number,
+					&heads
+				);
 
 				Self::dispatch_upward_messages(
-					<system::Module<T>>::block_number(),
+					current_number,
 					&active_parachains,
 					MAX_QUEUE_COUNT,
 					WATERMARK_QUEUE_SIZE,
@@ -327,6 +372,48 @@ impl<T: Trait> Module<T> {
 		Ok(())
 	}
 
+	/// Update routing information from the parachain heads. This queues upwards
+	/// messages to the relay chain as well.
+	fn update_routing(
+		now: T::BlockNumber,
+		heads: &[AttestedCandidate],
+	) {
+		// TODO: per-chain watermark
+		// https://github.com/paritytech/polkadot/issues/286
+		let watermark = now.saturating_sub(One::one());
+
+		let mut ingress_update = BTreeMap::new();
+
+		for head in heads.iter() {
+			let id = head.parachain_index();
+			<Heads<T>>::insert(id, &head.candidate.head_data.0);
+
+			let last_watermark = <Watermarks<T>>::mutate(id, |mark| {
+				rstd::mem::replace(mark, Some(watermark))
+			});
+
+			if let Some(last_watermark) = last_watermark {
+				// Discard routed ingress.
+				for routed_height in number_range(last_watermark, watermark) {
+					<UnroutedIngress<T>>::remove(&(routed_height, id));
+				}
+			}
+
+			// place our egress root to `to` into the ingress table for (now, `to`).
+			for &(to, root) in &head.candidate.egress_queue_roots {
+				ingress_update.entry(to).or_insert_with(Vec::new).push((id, root));
+			}
+
+			// Queue up upwards messages (from parachains to relay chain).
+			Self::queue_upward_messages(id, &head.candidate.upward_messages);
+		}
+
+		// apply the ingress update.
+		for (to, ingress_roots) in ingress_update {
+			<UnroutedIngress<T>>::insert((now, to), ingress_roots);
+		}
+	}
+
 	/// Place any new upward messages into our queue for later dispatch.
 	fn queue_upward_messages(id: ParaId, upward_messages: &[UpwardMessage]) {
 		if !upward_messages.is_empty() {
@@ -445,16 +532,19 @@ impl<T: Trait> Module<T> {
 	}
 
 	/// Calculate the ingress to a specific parachain.
+	/// Complexity: O(n) in the number of blocks since the parachain's watermark.
+	/// invoked off-chain.
 	///
-	/// Yields a list of parachains being routed from, and the egress
-	/// queue roots to consider.
-	pub fn ingress(to: ParaId) -> Option<Vec<(ParaId, Hash)>> {
-		let active_parachains = Self::active_parachains();
-		if !active_parachains.contains(&to) { return None }
-
-		Some(active_parachains.into_iter().filter(|i| i != &to)
-			.filter_map(move |from| {
-				<Routing<T>>::get((from, to.clone())).map(move |h| (from, h))
+	/// Yields a structure containing all unrouted ingress to the parachain.
+	pub fn ingress(to: ParaId) -> Option<Vec<(T::BlockNumber, BlockIngressRoots)>> {
+		let watermark = <Watermarks<T>>::get(to)?;
+		let now = <system::Module<T>>::block_number();
+
+		Some(number_range(watermark.saturating_add(One::one()),now)
+			.filter_map(|unrouted_height| {
+				<UnroutedIngress<T>>::get(&(unrouted_height, to)).map(|roots| {
+					(unrouted_height, BlockIngressRoots(roots))
+				})
 			})
 			.collect())
 	}
@@ -1299,6 +1389,8 @@ mod tests {
 
 	#[test]
 	fn ingress_works() {
+		use sr_primitives::traits::OnFinalize;
+
 		let parachains = vec![
 			(0u32.into(), vec![], vec![]),
 			(1u32.into(), vec![], vec![]),
@@ -1306,62 +1398,115 @@ mod tests {
 		];
 
 		with_externalities(&mut new_test_ext(parachains), || {
-			let from_a = vec![(1.into(), [1; 32].into())];
-			let mut candidate_a = AttestedCandidate {
-				validity_votes: vec![],
-				candidate: CandidateReceipt {
-					parachain_index: 0.into(),
-					collator: Default::default(),
-					signature: Default::default(),
-					head_data: HeadData(vec![1, 2, 3]),
-					egress_queue_roots: from_a.clone(),
-					fees: 0,
-					block_data_hash: Default::default(),
-					upward_messages: vec![],
-				}
-			};
+			assert_eq!(Parachains::ingress(ParaId::from(1)), Some(Vec::new()));
+			assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new()));
 
-			let from_b = vec![(99.into(), [1; 32].into())];
-			let mut candidate_b = AttestedCandidate {
-				validity_votes: vec![],
-				candidate: CandidateReceipt {
-					parachain_index: 1.into(),
-					collator: Default::default(),
-					signature: Default::default(),
-					head_data: HeadData(vec![1, 2, 3]),
-					egress_queue_roots: from_b.clone(),
-					fees: 0,
-					block_data_hash: Default::default(),
-					upward_messages: vec![],
-				}
-			};
+			for i in 1..10 {
+				System::set_block_number(i);
+
+				let from_a = vec![(1.into(), [i as u8; 32].into())];
+				let mut candidate_a = AttestedCandidate {
+					validity_votes: vec![],
+					candidate: CandidateReceipt {
+						parachain_index: 0.into(),
+						collator: Default::default(),
+						signature: Default::default(),
+						head_data: HeadData(vec![1, 2, 3]),
+						egress_queue_roots: from_a.clone(),
+						fees: 0,
+						block_data_hash: Default::default(),
+						upward_messages: vec![],
+					}
+				};
 
-			make_attestations(&mut candidate_a);
-			make_attestations(&mut candidate_b);
+				let from_b = vec![(99.into(), [i as u8; 32].into())];
+				let mut candidate_b = AttestedCandidate {
+					validity_votes: vec![],
+					candidate: CandidateReceipt {
+						parachain_index: 1.into(),
+						collator: Default::default(),
+						signature: Default::default(),
+						head_data: HeadData(vec![1, 2, 3]),
+						egress_queue_roots: from_b.clone(),
+						fees: 0,
+						block_data_hash: Default::default(),
+						upward_messages: vec![],
+					}
+				};
 
-			assert_eq!(Parachains::ingress(ParaId::from(1)), Some(Vec::new()));
-			assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new()));
+				make_attestations(&mut candidate_a);
+				make_attestations(&mut candidate_b);
+
+				assert!(Parachains::dispatch(
+					set_heads(vec![candidate_a, candidate_b]),
+					Origin::NONE,
+				).is_ok());
+
+				Parachains::on_finalize(i);
+			}
 
+			System::set_block_number(10);
 			assert!(Parachains::dispatch(
-				set_heads(vec![candidate_a, candidate_b]),
+				set_heads(vec![]),
 				Origin::NONE,
 			).is_ok());
 
+			// parachain 1 has had a bunch of parachain candidates included,
+			// which raises the watermark.
 			assert_eq!(
 				Parachains::ingress(ParaId::from(1)),
-				Some(vec![(0.into(), [1; 32].into())]),
+				Some(vec![
+					(9, BlockIngressRoots(vec![
+						(0.into(), [9; 32].into())
+					]))
+				]),
 			);
 
+			// parachain 99 hasn't had any candidates included, so the
+			// ingress is piling up.
 			assert_eq!(
 				Parachains::ingress(ParaId::from(99)),
-				Some(vec![(1.into(), [1; 32].into())]),
+				Some((1..10).map(|i| (i, BlockIngressRoots(
+					vec![(1.into(), [i as u8; 32].into())]
+				))).collect::<Vec<_>>()),
 			);
 
 			assert_ok!(Parachains::deregister_parachain(1u32.into()));
 
-			// after deregistering, there is no ingress to 1 and we stop routing
-			// from 1.
+			// after deregistering, there is no ingress to 1, but unrouted messages
+			// from 1 stick around.
 			assert_eq!(Parachains::ingress(ParaId::from(1)), None);
+			assert_eq!(Parachains::ingress(ParaId::from(99)), Some((1..10).map(|i| (i, BlockIngressRoots(
+				vec![(1.into(), [i as u8; 32].into())]
+			))).collect::<Vec<_>>()));
+
+			Parachains::on_finalize(10);
+			System::set_block_number(11);
+
+			let mut candidate_c = AttestedCandidate {
+				validity_votes: vec![],
+				candidate: CandidateReceipt {
+					parachain_index: 99.into(),
+					collator: Default::default(),
+					signature: Default::default(),
+					head_data: HeadData(vec![1, 2, 3]),
+					egress_queue_roots: Vec::new(),
+					fees: 0,
+					block_data_hash: Default::default(),
+					upward_messages: vec![],
+				}
+			};
+			make_attestations(&mut candidate_c);
+
+			assert!(Parachains::dispatch(
+				set_heads(vec![candidate_c]),
+				Origin::NONE,
+			).is_ok());
+
+			Parachains::on_finalize(11);
+			System::set_block_number(12);
+
+			// at the next block, ingress to 99 should be empty.
 			assert_eq!(Parachains::ingress(ParaId::from(99)), Some(Vec::new()));
 		});
 	}
diff --git a/polkadot/validation/src/collation.rs b/polkadot/validation/src/collation.rs
index f2058f1e0e2..77c6a905293 100644
--- a/polkadot/validation/src/collation.rs
+++ b/polkadot/validation/src/collation.rs
@@ -22,7 +22,7 @@
 use std::sync::Arc;
 
 use polkadot_primitives::{Block, Hash, BlockId, parachain::CollatorId, parachain::{
-	ConsolidatedIngress, ConsolidatedIngressRoots, CandidateReceipt, ParachainHost,
+	ConsolidatedIngress, StructuredUnroutedIngress, CandidateReceipt, ParachainHost,
 	Id as ParaId, Collation, Extrinsic, OutgoingMessage, UpwardMessage
 }};
 use runtime_primitives::traits::ProvideRuntimeApi;
@@ -324,21 +324,21 @@ impl Externalities {
 
 /// Validate incoming messages against expected roots.
 pub fn validate_incoming(
-	roots: &ConsolidatedIngressRoots,
+	roots: &StructuredUnroutedIngress,
 	ingress: &ConsolidatedIngress,
 ) -> Result<(), Error> {
-	if roots.0.len() != ingress.0.len() {
+	if roots.len() != ingress.0.len() {
 		return Err(Error::IngressCanonicalityMismatch {
 			expected: roots.0.len(),
 			got: ingress.0.len()
 		});
 	}
 
-	let all_iter = roots.0.iter().zip(&ingress.0);
-	for ((expected_id, root), (got_id, messages)) in all_iter {
-		if expected_id != got_id {
+	let all_iter = roots.iter().zip(&ingress.0);
+	for ((_, expected_from, root), (got_id, messages)) in all_iter {
+		if expected_from != got_id {
 			return Err(Error::IngressChainMismatch {
-				expected: *expected_id,
+				expected: *expected_from,
 				got: *got_id
 			});
 		}
@@ -346,7 +346,7 @@ pub fn validate_incoming(
 		let got_root = message_queue_root(messages.iter().map(|msg| &msg.0[..]));
 		if &got_root != root {
 			return Err(Error::IngressRootMismatch{
-				id: *expected_id,
+				id: *expected_from,
 				expected: *root,
 				got: got_root
 			});
@@ -429,7 +429,7 @@ mod tests {
 	use super::*;
 	use parachain::wasm_executor::Externalities as ExternalitiesTrait;
 	use parachain::ParachainDispatchOrigin;
-	use polkadot_primitives::parachain::{Statement::Candidate, CandidateReceipt, HeadData};
+	use polkadot_primitives::parachain::{CandidateReceipt, HeadData};
 
 	#[test]
 	fn compute_and_check_egress() {
-- 
GitLab