Commit b989c303 authored by Bastian Köcher's avatar Bastian Köcher
Browse files

Switch to closures

parent ade2ecd1
Pipeline #85741 canceled with stage
in 56 seconds
......@@ -26,8 +26,9 @@
//!
//! These attestation sessions are kept live until they are periodically garbage-collected.
use std::{time::{Duration, Instant}, sync::Arc, pin::Pin};
use std::collections::HashMap;
use std::{
time::{Duration, Instant}, sync::Arc, marker::PhantomData, collections::HashMap,
};
use crate::pipeline::FullOutput;
use sc_client_api::{BlockchainEvents, BlockBackend};
......@@ -137,6 +138,7 @@ pub struct ServiceBuilder<C, N, P, SC, SP> {
impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
C: Collators + Send + Sync + Unpin + 'static,
C::Error: Send,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBackend<Block>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
......@@ -164,13 +166,30 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
NotifyImport(sc_client_api::BlockImportNotification<Block>),
}
let collators = self.collators.clone();
let mut parachain_validation = ParachainValidationInstances {
client: self.client.clone(),
network: self.network,
spawner: self.spawner,
availability_store: self.availability_store,
live_instances: HashMap::new(),
collation_fetch: DefaultCollationFetch(self.collators),
collation_fetch_builder: move || {
let collators = collators.clone();
|para_id, relay_parent, client, max_block_data_size, n_validators| {
crate::collation::collation_fetch(
para_id,
relay_parent,
collators,
client,
max_block_data_size,
n_validators,
)
}
},
_phantom: PhantomData,
};
let client = self.client;
......@@ -232,57 +251,6 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
}
}
/// Abstraction over `collation_fetch`.
pub trait CollationFetch {
/// Error type used by `collation_fetch`.
type Error: std::fmt::Debug;
/// Fetch a collation for the given `parachain`.
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static;
}
#[derive(Clone)]
struct DefaultCollationFetch<C>(C);
impl<C> CollationFetch for DefaultCollationFetch<C>
where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
{
type Error = C::Error;
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
client: Arc<P>,
max_block_data_size: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
{
crate::collation::collation_fetch(
parachain,
relay_parent,
self.0,
client,
max_block_data_size,
n_validators,
).boxed()
}
}
// finds the first key we are capable of signing with out of the given set of validators,
// if any.
fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
......@@ -295,7 +263,7 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
}
/// Constructs parachain-agreement instances.
pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
pub(crate) struct ParachainValidationInstances<N, P, SP, CFB, CF, CFF, Error> {
/// The client instance.
client: Arc<P>,
/// The backing network handle.
......@@ -308,10 +276,11 @@ pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
/// instances.
live_instances: HashMap<Hash, ValidationInstanceHandle>,
/// Used to fetch a collation.
collation_fetch: CF,
collation_fetch_builder: CFB,
_phantom: PhantomData<(CF, CFF, Error)>
}
impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
impl<N, P, SP, CFB, CF, CFF, Error> ParachainValidationInstances<N, P, SP, CFB, CF, CFF, Error> where
N: Network,
N::Error: 'static,
P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
......@@ -320,7 +289,10 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
N::BuildTableRouter: Unpin + Send + 'static,
SP: Spawn + Send + 'static,
CF: CollationFetch + Clone + Send + 'static,
CFB: Fn() -> CF + Send,
CF: FnOnce(ParaId, Hash, Arc<P>, Option<u64>, usize) -> CFF + Send + 'static,
CFF: Future<Output = Result<(CollationInfo, FullOutput), Error>> + Send,
Error: std::fmt::Debug + Send,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<P, Block>: sp_api::StateBackend<HashFor<Block>>,
{
......@@ -336,7 +308,7 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
parent_hash: Hash,
keystore: &KeyStorePtr,
max_block_data_size: Option<u64>,
) -> Result<ValidationInstanceHandle, Error> {
) -> Result<ValidationInstanceHandle, crate::Error> {
use primitives::Pair;
if let Some(tracker) = self.live_instances.get(&parent_hash) {
......@@ -415,7 +387,7 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
let availability_store = self.availability_store.clone();
let client = self.client.clone();
let collation_fetch = self.collation_fetch.clone();
let collation_fetch = (self.collation_fetch_builder)();
let res = self.spawner.spawn(async move {
// It is important that we build the router as it launches tasks internally
......@@ -429,15 +401,13 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
};
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
let n_validators = validators.len();
launch_work(
collation_fetch,
client,
|| collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
availability_store,
parent_hash,
id,
router,
max_block_data_size,
validators.len(),
n_validators,
index,
).await;
}
......@@ -464,28 +434,18 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
}
// launch parachain work asynchronously.
async fn launch_work<Client>(
collation_fetch: impl CollationFetch,
client: Arc<Client>,
async fn launch_work<CF, Error>(
collation_fetch: impl FnOnce() -> CF,
availability_store: AvailabilityStore,
relay_parent: Hash,
validation_para: ParaId,
router: impl TableRouter,
max_block_data_size: Option<u64>,
n_validators: usize,
local_id: ValidatorIndex,
) where
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
CF: Future<Output = Result<(CollationInfo, FullOutput), Error>> + Send,
Error: std::fmt::Debug,
{
// fetch a local collation from connected collators.
let (collation_info, full_output) = match collation_fetch.collation_fetch(
validation_para,
relay_parent,
client,
max_block_data_size,
n_validators,
).await {
let (collation_info, full_output) = match collation_fetch().await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
......@@ -621,48 +581,39 @@ mod tests {
}
}
#[derive(Clone)]
struct MockCollationFetch(mpsc::UnboundedSender<Events>);
impl CollationFetch for MockCollationFetch {
type Error = ();
fn collation_fetch<P>(
self,
parachain: ParaId,
relay_parent: Hash,
_: Arc<P>,
_: Option<u64>,
n_validators: usize,
) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), ()>> + Send>> {
let info = CollationInfo {
parachain_index: parachain,
relay_parent,
collator: Default::default(),
signature: Default::default(),
head_data: Default::default(),
pov_block_hash: Default::default(),
};
let available_data = AvailableData {
pov_block: PoVBlock { block_data: BlockData(Vec::new()) },
omitted_validation: Default::default(),
};
fn make_collation_fetch<P>(
parachain: ParaId,
relay_parent: Hash,
_: Arc<P>,
_: Option<u64>,
n_validators: usize,
events_sender: mpsc::UnboundedSender<Events>,
) -> impl Future<Output = Result<(CollationInfo, FullOutput), ()>> + Send {
let info = CollationInfo {
parachain_index: parachain,
relay_parent,
collator: Default::default(),
signature: Default::default(),
head_data: Default::default(),
pov_block_hash: Default::default(),
};
let full_output = FullOutput {
available_data,
commitments: Default::default(),
erasure_chunks: Default::default(),
n_validators,
};
let available_data = AvailableData {
pov_block: PoVBlock { block_data: BlockData(Vec::new()) },
omitted_validation: Default::default(),
};
let sender = self.0;
let full_output = FullOutput {
available_data,
commitments: Default::default(),
erasure_chunks: Default::default(),
n_validators,
};
async move {
sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send");
async move {
events_sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send");
Ok((info, full_output))
}.boxed()
Ok((info, full_output))
}
}
......@@ -714,13 +665,21 @@ mod tests {
let (events_sender, events) = mpsc::unbounded();
let events_sender_clone = events_sender.clone();
let mut parachain_validation = ParachainValidationInstances {
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
network: MockNetwork(events_sender.clone()),
collation_fetch: MockCollationFetch(events_sender.clone()),
collation_fetch_builder: move || {
let events_sender = events_sender_clone.clone();
|para_id, relay_parent, client, max_block_data_size, n_validators| {
make_collation_fetch(para_id, relay_parent, client, max_block_data_size, n_validators, events_sender)
}
},
spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
live_instances: HashMap::new(),
_phantom: PhantomData,
};
parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
......@@ -752,13 +711,21 @@ mod tests {
let (events_sender, events) = mpsc::unbounded();
let events_sender_clone = events_sender.clone();
let mut parachain_validation = ParachainValidationInstances {
client: Arc::new(MockRuntimeApi { validators, duty_roster }),
network: MockNetwork(events_sender.clone()),
collation_fetch: MockCollationFetch(events_sender.clone()),
collation_fetch_builder: move || {
let events_sender = events_sender_clone.clone();
|para_id, relay_parent, client, max_block_data_size, n_validators| {
make_collation_fetch(para_id, relay_parent, client, max_block_data_size, n_validators, events_sender)
}
},
spawner: executor.clone(),
availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
live_instances: HashMap::new(),
_phantom: PhantomData,
};
parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment