Unverified Commit 3e8c810d authored by Gav Wood

Merge branch 'master' of github.com:paritytech/polkadot

parents 5f3623a5 ec11d7e6
Pipeline #86391 passed with stages in 20 minutes and 45 seconds
......@@ -28,7 +28,7 @@ use serde::{Serialize, Deserialize};
#[cfg(feature = "std")]
use primitives::bytes;
use primitives::RuntimeDebug;
use runtime_primitives::traits::{Block as BlockT};
use runtime_primitives::traits::Block as BlockT;
use inherents::InherentIdentifier;
use application_crypto::KeyTypeId;
......
......@@ -337,7 +337,7 @@ decl_storage! {
pub RelayDispatchQueue: map hasher(twox_64_concat) ParaId => Vec<UpwardMessage>;
/// Size of the dispatch queues. Separated from actual data in order to avoid costly
/// decoding when checking receipt validity. First item in tuple is the count of messages
/// second is the total length (in bytes) of the message payloads.
pub RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32);
/// The ordered list of ParaIds that have a `RelayDispatchQueue` entry.
NeedsDispatch: Vec<ParaId>;
......
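The comment above motivates keeping a cheap `(count, total bytes)` record next to the encoded queue so that receipt-validity checks never have to decode the payloads. A minimal standalone sketch of that bookkeeping pattern, using hypothetical names and plain `HashMap`s rather than the pallet's actual storage maps:

use std::collections::HashMap;

type ParaId = u32;

#[derive(Default)]
struct Queues {
    // Full message payloads, keyed by parachain.
    data: HashMap<ParaId, Vec<Vec<u8>>>,
    // (message count, total payload bytes), kept beside the data so checks
    // read two integers instead of decoding every payload.
    size: HashMap<ParaId, (u32, u32)>,
}

impl Queues {
    fn push(&mut self, id: ParaId, payload: Vec<u8>) {
        let entry = self.size.entry(id).or_insert((0, 0));
        entry.0 += 1;
        entry.1 += payload.len() as u32;
        self.data.entry(id).or_default().push(payload);
    }

    // Receipt-validity style check: no decoding of `data` required.
    fn within_limits(&self, id: ParaId, max_count: u32, max_bytes: u32) -> bool {
        let (count, bytes) = self.size.get(&id).copied().unwrap_or((0, 0));
        count <= max_count && bytes <= max_bytes
    }
}

fn main() {
    let mut queues = Queues::default();
    queues.push(1, b"upward message".to_vec());
    assert!(queues.within_limits(1, 4, 1024));
    assert!(!queues.within_limits(1, 4, 8));
}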
......@@ -498,7 +498,7 @@ decl_event!{
}
impl<T: Trait> Module<T> {
/// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so.
pub fn ensure_thread_id(id: ParaId) -> Option<ParaInfo> {
Paras::get(id).and_then(|info| if let Scheduling::Dynamic = info.scheduling {
Some(info)
......
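`ensure_thread_id` filters an `Option` by an enum variant; the same shape can also be written with `Option::filter` and `matches!`. A self-contained sketch with stand-in types (`ParaInfo` and `Scheduling` here are simplified, not the runtime's):

#[derive(Clone, Debug, PartialEq)]
enum Scheduling { Always, Dynamic }

#[derive(Clone, Debug, PartialEq)]
struct ParaInfo { scheduling: Scheduling }

// Returns the descriptor only when the para uses dynamic (parathread) scheduling.
fn ensure_thread_id(info: Option<ParaInfo>) -> Option<ParaInfo> {
    // Same effect as `and_then(|info| if let Scheduling::Dynamic = info.scheduling { Some(info) } else { None })`.
    info.filter(|info| matches!(info.scheduling, Scheduling::Dynamic))
}

fn main() {
    let thread = ParaInfo { scheduling: Scheduling::Dynamic };
    let chain = ParaInfo { scheduling: Scheduling::Always };
    assert_eq!(ensure_thread_id(Some(thread.clone())), Some(thread));
    assert_eq!(ensure_thread_id(Some(chain)), None);
}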
......@@ -53,7 +53,7 @@ use frame_support::{
weights::DispatchInfo,
};
use pallet_transaction_payment_rpc_runtime_api::RuntimeDispatchInfo;
use session::{historical as session_historical};
use session::historical as session_historical;
#[cfg(feature = "std")]
pub use staking::StakerStatus;
......
......@@ -54,7 +54,7 @@ pub use self::shared_table::{
pub use self::validation_service::{ServiceHandle, ServiceBuilder};
#[cfg(not(target_os = "unknown"))]
pub use parachain::wasm_executor::{run_worker as run_validation_worker};
pub use parachain::wasm_executor::run_worker as run_validation_worker;
mod dynamic_inclusion;
mod error;
......@@ -108,6 +108,7 @@ pub trait Network {
/// Instantiate a table router using the given shared table.
/// Also pass through any outgoing messages to be broadcast to peers.
#[must_use]
fn build_table_router(
&self,
table: Arc<SharedTable>,
......
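The `#[must_use]` added above matters because `build_table_router` returns a future: if the caller drops the return value, the router is never actually built. A toy illustration of the attribute on a future-returning trait method (assumes the `futures` crate for the executor; none of these types are Polkadot's):

use std::future::Future;
use std::pin::Pin;

trait Network {
    // The router is only built when the returned future is awaited, so
    // ignoring the return value is almost certainly a bug.
    #[must_use]
    fn build_table_router(&self) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
}

struct Mock;

impl Network for Mock {
    fn build_table_router(&self) -> Pin<Box<dyn Future<Output = Result<(), String>> + Send>> {
        Box::pin(async { Ok::<(), String>(()) })
    }
}

fn main() {
    let net = Mock;
    // net.build_table_router();         // would warn: unused return value that must be used
    let fut = net.build_table_router();  // keep it and await it for the build to happen
    futures::executor::block_on(fut).expect("router built");
}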
......@@ -26,21 +26,20 @@
//!
//! These attestation sessions are kept live until they are periodically garbage-collected.
use std::{time::{Duration, Instant}, sync::Arc};
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin};
use std::collections::HashMap;
use crate::pipeline::FullOutput;
use sc_client_api::{BlockchainEvents, BlockBackend};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}};
use futures::{prelude::*, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, Hash, BlockId};
use polkadot_primitives::parachain::{
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, SigningContext,
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
CollationInfo, SigningContext,
};
use babe_primitives::BabeApi;
use keystore::KeyStorePtr;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_api::{ProvideRuntimeApi, ApiExt};
use runtime_primitives::traits::HashFor;
use availability_store::Store as AvailabilityStore;
......@@ -140,13 +139,10 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBackend<Block>,
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
BlockBuilderApi<Block> +
BabeApi<Block> +
ApiExt<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
N::TableRouter: Send + 'static + Sync,
N::BuildTableRouter: Send + Unpin + 'static,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
SC: SelectChain<Block> + 'static,
......@@ -171,10 +167,10 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
let mut parachain_validation = ParachainValidationInstances {
client: self.client.clone(),
network: self.network,
collators: self.collators,
spawner: self.spawner,
availability_store: self.availability_store,
live_instances: HashMap::new(),
collation_fetch: DefaultCollationFetch(self.collators),
};
let client = self.client;
......@@ -236,6 +232,57 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
}
}
/// Abstraction over `collation_fetch`.
pub(crate) trait CollationFetch {
/// Error type used by `collation_fetch`.
type Error: std::fmt::Debug;
/// Fetch a collation for the given `parachain`.
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static;
}
#[derive(Clone)]
struct DefaultCollationFetch<C>(C);
impl<C> CollationFetch for DefaultCollationFetch<C>
where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
{
type Error = C::Error;
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
{
crate::collation::collation_fetch(
parachain,
relay_parent,
self.0,
client,
max_block_data_size,
n_validators,
).boxed()
}
}
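`CollationFetch` exists so the production fetcher (`DefaultCollationFetch`) and a test mock can be swapped behind a single interface that returns a boxed future. A reduced, self-contained sketch of that shape, assuming only the `futures` crate; the `Collation` type and both fetchers here are illustrative stand-ins with fewer parameters than the real trait:

use futures::future::{BoxFuture, FutureExt};

// Stand-in for the data a fetch produces.
#[derive(Debug, PartialEq)]
struct Collation(u32);

// Abstraction over the asynchronous fetch, mirroring the shape above:
// one method returning a boxed, sendable future.
trait CollationFetch {
    type Error: std::fmt::Debug;
    fn fetch(self, para: u32) -> BoxFuture<'static, Result<Collation, Self::Error>>;
}

// "Production" fetcher.
struct NetworkFetch;

impl CollationFetch for NetworkFetch {
    type Error = String;
    fn fetch(self, para: u32) -> BoxFuture<'static, Result<Collation, Self::Error>> {
        async move { Ok(Collation(para)) }.boxed()
    }
}

// Test fetcher that records it was called.
struct MockFetch(std::sync::mpsc::Sender<&'static str>);

impl CollationFetch for MockFetch {
    type Error = ();
    fn fetch(self, para: u32) -> BoxFuture<'static, Result<Collation, Self::Error>> {
        async move {
            self.0.send("fetched").expect("receiver alive");
            Ok(Collation(para))
        }.boxed()
    }
}

// Generic caller: it neither knows nor cares which fetcher it was given.
async fn run<CF: CollationFetch>(fetcher: CF, para: u32) -> Option<Collation> {
    fetcher.fetch(para).await.ok()
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    futures::executor::block_on(async {
        assert_eq!(run(NetworkFetch, 7).await, Some(Collation(7)));
        assert_eq!(run(MockFetch(tx), 7).await, Some(Collation(7)));
    });
    assert_eq!(rx.recv().unwrap(), "fetched");
}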
// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
......@@ -248,13 +295,11 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
}
/// Constructs parachain-agreement instances.
pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
/// The client instance.
client: Arc<P>,
/// The backing network handle.
network: N,
/// Parachain collators.
collators: C,
/// handle to spawner
spawner: SP,
/// Store for extrinsic data.
......@@ -262,18 +307,20 @@ pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
/// Live agreements. Maps relay chain parent hashes to attestation
/// instances.
live_instances: HashMap<Hash, ValidationInstanceHandle>,
/// Used to fetch a collation.
collation_fetch: CF,
}
impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
C: Collators + Send + Unpin + 'static + Sync,
impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
N: Network,
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
N::Error: 'static,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N::TableRouter: Send + 'static + Sync,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
N::BuildTableRouter: Unpin + Send + 'static,
SP: Spawn + Send + 'static,
CF: CollationFetch + Clone + Send + Sync + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
......@@ -361,13 +408,41 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
max_block_data_size,
));
let router = self.network.build_table_router(
let build_router = self.network.build_table_router(
table.clone(),
&validators,
);
if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index);
let availability_store = self.availability_store.clone();
let client = self.client.clone();
let collation_fetch = self.collation_fetch.clone();
let res = self.spawner.spawn(async move {
// It is important that we build the router as it launches tasks internally
// that are required to receive gossip messages.
let router = match build_router.await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to build router: {:?}", e);
return
}
};
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
let n_validators = validators.len();
launch_work(
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
availability_store,
router,
n_validators,
index,
).await;
}
});
if let Err(e) = res {
error!(target: "validation", "Failed to create router and launch work: {:?}", e);
}
let tracker = ValidationInstanceHandle {
......@@ -384,98 +459,305 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
fn retain<F: FnMut(&Hash) -> bool>(&mut self, mut pred: F) {
self.live_instances.retain(|k, _| pred(k))
}
}
// launch parachain work asynchronously.
fn launch_work(
&self,
relay_parent: Hash,
validation_para: ParaId,
build_router: N::BuildTableRouter,
max_block_data_size: Option<u64>,
n_validators: usize,
local_id: ValidatorIndex,
) {
let (collators, client) = (self.collators.clone(), self.client.clone());
let availability_store = self.availability_store.clone();
// launch parachain work asynchronously.
async fn launch_work<CFF, E>(
collation_fetch: impl FnOnce() -> CFF,
availability_store: AvailabilityStore,
router: impl TableRouter,
n_validators: usize,
local_id: ValidatorIndex,
) where
E: std::fmt::Debug,
CFF: Future<Output = Result<(CollationInfo, FullOutput), E>> + Send,
{
// fetch a local collation from connected collators.
let (collation_info, full_output) = match collation_fetch().await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
return
}
};
let crate::pipeline::FullOutput {
commitments,
erasure_chunks,
available_data,
..
} = full_output;
let receipt = collation_info.into_receipt(commitments);
let pov_block = available_data.pov_block.clone();
if let Err(e) = availability_store.make_available(
receipt.hash(),
available_data,
).await {
warn!(
target: "validation",
"Failed to make parachain block data available: {}",
e,
);
}
if let Err(e) = availability_store.clone().add_erasure_chunks(
receipt.clone(),
n_validators as _,
erasure_chunks.clone(),
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
if let Err(e) = router.local_collation(
receipt,
pov_block,
(local_id, &erasure_chunks),
).await {
warn!(target: "validation", "Failed to send local collation: {:?}", e);
}
}
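`launch_work` is now a free async function that receives the fetch as a `FnOnce` closure rather than reading collators off `self`, so it can be driven in isolation. A compact sketch of that shape (toy types and logic; `futures` crate assumed for the executor):

use futures::executor::block_on;
use std::future::Future;

// Free async function: everything it needs arrives as arguments, so it can
// be exercised in a test without constructing the owning service struct.
async fn launch_work<F, Fut>(fetch: F, n_validators: usize) -> Result<usize, String>
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<Vec<u8>, String>>,
{
    let block_data = fetch().await?;
    // Pretend "making data available" just means reporting a chunk count.
    Ok(block_data.len() * n_validators)
}

fn main() {
    // The caller decides what `fetch` is; a test can pass a canned closure.
    let result = block_on(launch_work(|| async { Ok::<Vec<u8>, String>(vec![1, 2, 3]) }, 4));
    assert_eq!(result, Ok(12));
}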
#[cfg(test)]
mod tests {
use super::*;
use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc};
use availability_store::ErasureNetworking;
use polkadot_primitives::parachain::{
PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex,
CollationInfo, DutyRoster, GlobalValidationSchedule, LocalValidationData,
Retriable, CollatorId, BlockData, Chain, AvailableData, SigningContext,
};
use runtime_primitives::traits::Block as BlockT;
use std::pin::Pin;
use sp_keyring::sr25519::Keyring;
/// Events fired while running mock implementations to follow execution.
enum Events {
BuildTableRouter,
CollationFetch,
LocalCollation,
}
#[derive(Clone)]
struct MockNetwork(mpsc::UnboundedSender<Events>);
impl Network for MockNetwork {
type Error = String;
type TableRouter = MockTableRouter;
type BuildTableRouter = Pin<Box<dyn Future<Output = Result<MockTableRouter, String>> + Send>>;
fn build_table_router(
&self,
_: Arc<SharedTable>,
_: &[ValidatorId],
) -> Self::BuildTableRouter {
let event_sender = self.0.clone();
async move {
event_sender.unbounded_send(Events::BuildTableRouter).expect("Send `BuildTableRouter`");
Ok(MockTableRouter(event_sender))
}.boxed()
}
}
#[derive(Clone)]
struct MockTableRouter(mpsc::UnboundedSender<Events>);
impl TableRouter for MockTableRouter {
type Error = String;
type SendLocalCollation = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
type FetchValidationProof = Box<dyn Future<Output = Result<PoVBlock, String>> + Unpin>;
fn local_collation(
&self,
_: AbridgedCandidateReceipt,
_: PoVBlock,
_: (ValidatorIndex, &[ErasureChunk]),
) -> Self::SendLocalCollation {
let sender = self.0.clone();
async move {
sender.unbounded_send(Events::LocalCollation).expect("Send `LocalCollation`");
Ok(())
}.boxed()
}
fn fetch_pov_block(&self, _: &AbridgedCandidateReceipt) -> Self::FetchValidationProof {
unimplemented!("Not required in tests")
}
}
let with_router = move |router: N::TableRouter| {
// fetch a local collation from connected collators.
let collation_work = crate::collation::collation_fetch(
validation_para,
#[derive(Clone)]
struct MockErasureNetworking;
impl ErasureNetworking for MockErasureNetworking {
type Error = String;
fn fetch_erasure_chunk(
&self,
_: &Hash,
_: u32,
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
ready(Err("Not required in tests".to_string())).boxed()
}
fn distribute_erasure_chunk(&self, _: Hash, _: ErasureChunk) {
unimplemented!("Not required in tests")
}
}
#[derive(Clone)]
struct MockCollationFetch(mpsc::UnboundedSender<Events>);
impl CollationFetch for MockCollationFetch {
type Error = ();
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
_: Arc<P>,
_: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), ()>> + Send>> {
let info = CollationInfo {
parachain_index: parachain,
relay_parent,
collators,
client.clone(),
max_block_data_size,
collator: Default::default(),
signature: Default::default(),
head_data: Default::default(),
pov_block_hash: Default::default(),
};
let available_data = AvailableData {
pov_block: PoVBlock { block_data: BlockData(Vec::new()) },
omitted_validation: Default::default(),
};
let full_output = FullOutput {
available_data,
commitments: Default::default(),
erasure_chunks: Default::default(),
n_validators,
);
};
collation_work.then(move |result| match result {
Ok((collation_info, full_output)) => {
let crate::pipeline::FullOutput {
commitments,
erasure_chunks,
available_data,
..
} = full_output;
let receipt = collation_info.into_receipt(commitments);
// Apparently the `async move` block is the only way to convince
// the compiler that we are not moving values out of borrowed context.
let av_clone = availability_store.clone();
let receipt_clone = receipt.clone();
let erasure_chunks_clone = erasure_chunks.clone();
let pov_block = available_data.pov_block.clone();
let res = async move {
if let Err(e) = av_clone.make_available(
receipt_clone.hash(),
available_data,
).await {
warn!(
target: "validation",
"Failed to make parachain block data available: {}",
e,
);
}
if let Err(e) = av_clone.clone().add_erasure_chunks(
receipt_clone,
n_validators as _,
erasure_chunks_clone,
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
}
.unit_error()
.then(move |_| {
router.local_collation(
receipt,
pov_block,
(local_id, &erasure_chunks),
).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e))
});
res.boxed()
}
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
Box::pin(ready(Ok(())))
}
})
};
let sender = self.0;
let router_work = build_router
.map_ok(with_router)
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
})
.and_then(|r| r)
.map(|_| ());
async move {
sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send");
Ok((info, full_output))
}.boxed()
}
}
#[derive(Clone)]
struct MockRuntimeApi {
validators: Vec<ValidatorId>,
duty_roster: DutyRoster,
}
// spawn onto thread pool.
if self.spawner.spawn(router_work).is_err() {
error!("Failed to spawn router work task");
impl ProvideRuntimeApi<Block> for MockRuntimeApi {
type Api = Self;
fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> {
self.clone().into()
}
}
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<Block> for MockRuntimeApi {
type Error = sp_blockchain::Error;
fn validators(&self) -> Vec<ValidatorId> { self.validators.clone() }
fn duty_roster(&self) -> DutyRoster { self.duty_roster.clone() }
fn active_parachains() -> Vec<(ParaId, Option<(CollatorId, Retriable)>)> { vec![(ParaId::from(1), None)] }
fn global_validation_schedule() -> GlobalValidationSchedule { Default::default() }
fn local_validation_data(_: ParaId) -> Option<LocalValidationData> { None }
fn parachain_code(_: ParaId) -> Option<Vec<u8>> { None }
fn get_heads(_: Vec<<Block as BlockT>::Extrinsic>) -> Option<Vec<AbridgedCandidateReceipt>> {
None
}
fn signing_context() -> SigningContext {
Default::default()
}
}
}
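The mocks above report progress through the unbounded `Events` channel so the test can assert which steps ran and in what order. A standalone miniature of that assert-on-events technique (only the `futures` crate; the pipeline here is a stand-in, not the validation service):

use futures::{channel::mpsc, executor::block_on, StreamExt};

#[derive(Debug, PartialEq)]
enum Events {
    BuildTableRouter,
    CollationFetch,
    LocalCollation,
}

// Stand-in for the code under test: performs its steps and reports each one.
async fn run_pipeline(events: mpsc::UnboundedSender<Events>) {
    events.unbounded_send(Events::BuildTableRouter).expect("send");
    events.unbounded_send(Events::CollationFetch).expect("send");
    events.unbounded_send(Events::LocalCollation).expect("send");
}

fn main() {
    let (tx, rx) = mpsc::unbounded();
    block_on(run_pipeline(tx)); // sender is dropped here, closing the stream
    // Collect everything that was reported and assert the expected order.
    let seen: Vec<Events> = block_on(rx.collect());
    assert_eq!(
        seen,
        vec![Events::BuildTableRouter, Events::CollationFetch, Events::LocalCollation],
    );
}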
#[test]
fn launch_work_is_executed_properly() {
let executor = ThreadPool::new().unwrap();
let keystore = keystore::Store::new_in_memory();
// Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator.
keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Bob.to_seed())
.expect("Insert key into keystore");
let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
let duty_roster = DutyRoster { validator_duty };
let (events_sender, events) = mpsc::unbounded();
let mut parachain_validation = ParachainValidationInstances {
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
network: MockNetwork(events_sender.clone()),
collation_fetch: MockCollationFetch(events_sender.clone()),
spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),