Unverified Commit ec11d7e6 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Ensure that table router is always built (#952)

* Ensure that table router is always build

This pr ensures that the table router is always build, aka the future is
resolved. This is important, as the table router internally spawns tasks
to handle gossip messages. Handling gossip messages is not only required
on parachain validators, but also on relay chain validators to receive collations.

Tests are added to ensure that the assumptions hold.

* Fix compilation

* Switch to closures

* Remove empty line

* Revert "Remove empty line"

This reverts commit a6c19438.

* Revert "Switch to closures"

This reverts commit b989c303.

* Hybrid approach

* Rename test

* Make trait crate local
parent 36e9d17e
Pipeline #86349 passed with stages
in 24 minutes and 36 seconds
......@@ -28,7 +28,7 @@ use serde::{Serialize, Deserialize};
#[cfg(feature = "std")]
use primitives::bytes;
use primitives::RuntimeDebug;
use runtime_primitives::traits::{Block as BlockT};
use runtime_primitives::traits::Block as BlockT;
use inherents::InherentIdentifier;
use application_crypto::KeyTypeId;
......
......@@ -337,7 +337,7 @@ decl_storage! {
pub RelayDispatchQueue: map hasher(twox_64_concat) ParaId => Vec<UpwardMessage>;
/// Size of the dispatch queues. Separated from actual data in order to avoid costly
/// decoding when checking receipt validity. First item in tuple is the count of messages
/// second if the total length (in bytes) of the message payloads.
/// second if the total length (in bytes) of the message payloads.
pub RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32);
/// The ordered list of ParaIds that have a `RelayDispatchQueue` entry.
NeedsDispatch: Vec<ParaId>;
......
......@@ -498,7 +498,7 @@ decl_event!{
}
impl<T: Trait> Module<T> {
/// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so.
/// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so.
pub fn ensure_thread_id(id: ParaId) -> Option<ParaInfo> {
Paras::get(id).and_then(|info| if let Scheduling::Dynamic = info.scheduling {
Some(info)
......
......@@ -53,7 +53,7 @@ use frame_support::{
weights::DispatchInfo,
};
use pallet_transaction_payment_rpc_runtime_api::RuntimeDispatchInfo;
use session::{historical as session_historical};
use session::historical as session_historical;
#[cfg(feature = "std")]
pub use staking::StakerStatus;
......
......@@ -54,7 +54,7 @@ pub use self::shared_table::{
pub use self::validation_service::{ServiceHandle, ServiceBuilder};
#[cfg(not(target_os = "unknown"))]
pub use parachain::wasm_executor::{run_worker as run_validation_worker};
pub use parachain::wasm_executor::run_worker as run_validation_worker;
mod dynamic_inclusion;
mod error;
......@@ -108,6 +108,7 @@ pub trait Network {
/// Instantiate a table router using the given shared table.
/// Also pass through any outgoing messages to be broadcast to peers.
#[must_use]
fn build_table_router(
&self,
table: Arc<SharedTable>,
......
......@@ -26,21 +26,20 @@
//!
//! These attestation sessions are kept live until they are periodically garbage-collected.
use std::{time::{Duration, Instant}, sync::Arc};
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin};
use std::collections::HashMap;
use crate::pipeline::FullOutput;
use sc_client_api::{BlockchainEvents, BlockBackend};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}};
use futures::{prelude::*, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, Hash, BlockId};
use polkadot_primitives::parachain::{
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, SigningContext,
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
CollationInfo, SigningContext,
};
use babe_primitives::BabeApi;
use keystore::KeyStorePtr;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_api::{ProvideRuntimeApi, ApiExt};
use runtime_primitives::traits::HashFor;
use availability_store::Store as AvailabilityStore;
......@@ -140,13 +139,10 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBackend<Block>,
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
BlockBuilderApi<Block> +
BabeApi<Block> +
ApiExt<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
N::TableRouter: Send + 'static + Sync,
N::BuildTableRouter: Send + Unpin + 'static,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
SC: SelectChain<Block> + 'static,
......@@ -171,10 +167,10 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
let mut parachain_validation = ParachainValidationInstances {
client: self.client.clone(),
network: self.network,
collators: self.collators,
spawner: self.spawner,
availability_store: self.availability_store,
live_instances: HashMap::new(),
collation_fetch: DefaultCollationFetch(self.collators),
};
let client = self.client;
......@@ -236,6 +232,57 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
}
}
/// Abstraction over `collation_fetch`.
pub(crate) trait CollationFetch {
/// Error type used by `collation_fetch`.
type Error: std::fmt::Debug;
/// Fetch a collation for the given `parachain`.
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static;
}
#[derive(Clone)]
struct DefaultCollationFetch<C>(C);
impl<C> CollationFetch for DefaultCollationFetch<C>
where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
{
type Error = C::Error;
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
{
crate::collation::collation_fetch(
parachain,
relay_parent,
self.0,
client,
max_block_data_size,
n_validators,
).boxed()
}
}
// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
......@@ -248,13 +295,11 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
}
/// Constructs parachain-agreement instances.
pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
/// The client instance.
client: Arc<P>,
/// The backing network handle.
network: N,
/// Parachain collators.
collators: C,
/// handle to spawner
spawner: SP,
/// Store for extrinsic data.
......@@ -262,18 +307,20 @@ pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
/// Live agreements. Maps relay chain parent hashes to attestation
/// instances.
live_instances: HashMap<Hash, ValidationInstanceHandle>,
/// Used to fetch a collation.
collation_fetch: CF,
}
impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
C: Collators + Send + Unpin + 'static + Sync,
impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
N: Network,
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
N::Error: 'static,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N::TableRouter: Send + 'static + Sync,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
N::BuildTableRouter: Unpin + Send + 'static,
SP: Spawn + Send + 'static,
CF: CollationFetch + Clone + Send + Sync + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
......@@ -361,13 +408,41 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
max_block_data_size,
));
let router = self.network.build_table_router(
let build_router = self.network.build_table_router(
table.clone(),
&validators,
);
if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index);
let availability_store = self.availability_store.clone();
let client = self.client.clone();
let collation_fetch = self.collation_fetch.clone();
let res = self.spawner.spawn(async move {
// It is important that we build the router as it launches tasks internally
// that are required to receive gossip messages.
let router = match build_router.await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to build router: {:?}", e);
return
}
};
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
let n_validators = validators.len();
launch_work(
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
availability_store,
router,
n_validators,
index,
).await;
}
});
if let Err(e) = res {
error!(target: "validation", "Failed to create router and launch work: {:?}", e);
}
let tracker = ValidationInstanceHandle {
......@@ -384,98 +459,305 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) {
self.live_instances.retain(|k, _| pred(k))
}
}
// launch parachain work asynchronously.
fn launch_work(
&self,
relay_parent: Hash,
validation_para: ParaId,
build_router: N::BuildTableRouter,
max_block_data_size: Option<u64>,
n_validators: usize,
local_id: ValidatorIndex,
) {
let (collators, client) = (self.collators.clone(), self.client.clone());
let availability_store = self.availability_store.clone();
// launch parachain work asynchronously.
async fn launch_work<CFF, E>(
collation_fetch: impl FnOnce() -> CFF,
availability_store: AvailabilityStore,
router: impl TableRouter,
n_validators: usize,
local_id: ValidatorIndex,
) where
E: std::fmt::Debug,
CFF: Future<Output = Result<(CollationInfo, FullOutput), E>> + Send,
{
// fetch a local collation from connected collators.
let (collation_info, full_output) = match collation_fetch().await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
return
}
};
let crate::pipeline::FullOutput {
commitments,
erasure_chunks,
available_data,
..
} = full_output;
let receipt = collation_info.into_receipt(commitments);
let pov_block = available_data.pov_block.clone();
if let Err(e) = availability_store.make_available(
receipt.hash(),
available_data,
).await {
warn!(
target: "validation",
"Failed to make parachain block data available: {}",
e,
);
}
if let Err(e) = availability_store.clone().add_erasure_chunks(
receipt.clone(),
n_validators as _,
erasure_chunks.clone(),
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
if let Err(e) = router.local_collation(
receipt,
pov_block,
(local_id, &erasure_chunks),
).await {
warn!(target: "validation", "Failed to send local collation: {:?}", e);
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc};
use availability_store::ErasureNetworking;
use polkadot_primitives::parachain::{
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
CollationInfo, DutyRoster, GlobalValidationSchedule, LocalValidationData,
Retriable, CollatorId, BlockData, Chain, AvailableData, SigningContext,
};
use runtime_primitives::traits::Block as BlockT;
use std::pin::Pin;
use sp_keyring::sr25519::Keyring;
/// Events fired while running mock implementations to follow execution.
enum Events {
BuildTableRouter,
CollationFetch,
LocalCollation,
}
#[derive(Clone)]
struct MockNetwork(mpsc::UnboundedSender<Events>);
impl Network for MockNetwork {
type Error = String;
type TableRouter = MockTableRouter;
type BuildTableRouter = Pin<Box<dyn Future<Output = Result<MockTableRouter, String>> + Send>>;
fn build_table_router(
&self,
_: Arc<SharedTable>,
_: &[ValidatorId],
) -> Self::BuildTableRouter {
let event_sender = self.0.clone();
async move {
event_sender.unbounded_send(Events::BuildTableRouter).expect("Send `BuildTableRouter`");
Ok(MockTableRouter(event_sender))
}.boxed()
}
}
#[derive(Clone)]
struct MockTableRouter(mpsc::UnboundedSender<Events>);
impl TableRouter for MockTableRouter {
type Error = String;
type SendLocalCollation = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
type FetchValidationProof = Box<dyn Future<Output = Result<PoVBlock, String>> + Unpin>;
fn local_collation(
&self,
_: AbridgedCandidateReceipt,
_: PoVBlock,
_: (ValidatorIndex, &[ErasureChunk]),
) -> Self::SendLocalCollation {
let sender = self.0.clone();
async move {
sender.unbounded_send(Events::LocalCollation).expect("Send `LocalCollation`");
Ok(())
}.boxed()
}
fn fetch_pov_block(&self, _: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
unimplemented!("Not required in tests")
}
}
let with_router = move |router: N::TableRouter| {
// fetch a local collation from connected collators.
let collation_work = crate::collation::collation_fetch(
validation_para,
#[derive(Clone)]
struct MockErasureNetworking;
impl ErasureNetworking for MockErasureNetworking {
type Error = String;
fn fetch_erasure_chunk(
&self,
_: &Hash,
_: u32,
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
ready(Err("Not required in tests".to_string())).boxed()
}
fn distribute_erasure_chunk(&self, _: Hash, _: ErasureChunk) {
unimplemented!("Not required in tests")
}
}
#[derive(Clone)]
struct MockCollationFetch(mpsc::UnboundedSender<Events>);
impl CollationFetch for MockCollationFetch {
type Error = ();
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
_: Arc<P>,
_: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), ()>> + Send>> {
let info = CollationInfo {
parachain_index: parachain,
relay_parent,
collators,
client.clone(),
max_block_data_size,
collator: Default::default(),
signature: Default::default(),
head_data: Default::default(),
pov_block_hash: Default::default(),
};
let available_data = AvailableData {
pov_block: PoVBlock { block_data: BlockData(Vec::new()) },
omitted_validation: Default::default(),
};
let full_output = FullOutput {
available_data,
commitments: Default::default(),
erasure_chunks: Default::default(),
n_validators,
);
};
collation_work.then(move |result| match result {
Ok((collation_info, full_output)) => {
let crate::pipeline::FullOutput {
commitments,
erasure_chunks,
available_data,
..
} = full_output;
let receipt = collation_info.into_receipt(commitments);
// Apparently the `async move` block is the only way to convince
// the compiler that we are not moving values out of borrowed context.
let av_clone = availability_store.clone();
let receipt_clone = receipt.clone();
let erasure_chunks_clone = erasure_chunks.clone();
let pov_block = available_data.pov_block.clone();
let res = async move {
if let Err(e) = av_clone.make_available(
receipt_clone.hash(),
available_data,
).await {
warn!(
target: "validation",
"Failed to make parachain block data available: {}",
e,
);
}
if let Err(e) = av_clone.clone().add_erasure_chunks(
receipt_clone,
n_validators as _,
erasure_chunks_clone,
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
}
.unit_error()
.then(move |_| {
router.local_collation(
receipt,
pov_block,
(local_id, &erasure_chunks),
).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e))
});
res.boxed()
}
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
Box::pin(ready(Ok(())))
}
})
};
let sender = self.0;
let router_work = build_router
.map_ok(with_router)
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
})
.and_then(|r| r)
.map(|_| ());
async move {
sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send");
Ok((info, full_output))
}.boxed()
}
}
#[derive(Clone)]
struct MockRuntimeApi {
validators: Vec<ValidatorId>,
duty_roster: DutyRoster,
}
// spawn onto thread pool.
if self.spawner.spawn(router_work).is_err() {
error!("Failed to spawn router work task");
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
type Api = Self;
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
self.clone().into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<Block> for MockRuntimeApi {
type Error = sp_blockchain::Error;
fn validators(&self) -> Vec<ValidatorId> { self.validators.clone() }
fn duty_roster(&self) -> DutyRoster { self.duty_roster.clone() }
fn active_parachains() -> Vec<(ParaId, Option<(CollatorId, Retriable)>)> { vec![(ParaId::from(1), None)] }
fn global_validation_schedule() -> GlobalValidationSchedule { Default::default() }
fn local_validation_data(_: ParaId) -> Option<LocalValidationData> { None }
fn parachain_code(_: ParaId) -> Option<Vec<u8>> { None }
fn get_heads(_: Vec<<Block as BlockT>::Extrinsic>) -> Option<Vec<AbridgedCandidateReceipt>> {
None
}
fn signing_context() -> SigningContext {
Default::default()
}
}
}
#[test]
fn launch_work_is_executed_properly() {
let executor = ThreadPool::new().unwrap();
let keystore = keystore::Store::new_in_memory();
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Bob.to_seed())
.expect("Insert key into keystore");
let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
let duty_roster = DutyRoster { validator_duty };
let (events_sender, events) = mpsc::unbounded();
let mut parachain_validation = ParachainValidationInstances {
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
network: MockNetwork(events_sender.clone()),
collation_fetch: MockCollationFetch(events_sender.clone()),
spawner: executor.clone(),