Unverified Commit 96f5dc51 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Revert async await to fix collation (#839)

* Revert e2dc9371

* Make it work
parent 90660e43
Pipeline #78718 passed with stages
in 20 minutes and 23 seconds
......@@ -364,7 +364,7 @@ fn run_collator_node<S, P, Extrinsic>(
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return (future::err(Error::Polkadot(
Err(e) => return future::Either::Left(future::err(Error::Polkadot(
format!("{:?}", e)
))),
}
......@@ -380,11 +380,11 @@ fn run_collator_node<S, P, Extrinsic>(
let parachain_context = parachain_context.clone();
let validation_network = validation_network.clone();
let work = future::lazy(move |_| async move {
let work = future::lazy(move |_| {
let api = client.runtime_api();
let status = match try_fr!(api.parachain_status(&id, para_id)) {
Some(status) => status,
None => return future::ok(()),
None => return future::Either::Left(future::ok(())),
};
let validators = try_fr!(api.validators(&id));
......@@ -401,14 +401,14 @@ fn run_collator_node<S, P, Extrinsic>(
validators,
};
if let Ok((collation, outgoing)) = collate(
let collation_work = collate(
relay_parent,
para_id,
status,
context,
parachain_context,
key,
).await {
).map_ok(move |(collation, outgoing)| {
network.with_spec(move |spec, ctx| {
let res = spec.add_local_collation(
ctx,
......@@ -419,9 +419,10 @@ fn run_collator_node<S, P, Extrinsic>(
);
tokio::spawn(res.boxed());
});
}
future::ok(())
})
});
future::Either::Right(collation_work)
});
let deadlined = future::select(
......@@ -436,7 +437,7 @@ fn run_collator_node<S, P, Extrinsic>(
}
});
let future = silenced.map(drop);
let future = silenced.map(drop);
tokio::spawn(future);
}
......
......@@ -33,8 +33,7 @@ use sc_client_api::{BlockchainEvents, BlockBody};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt};
use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, Hash, BlockId};
use polkadot_primitives::parachain::{
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
......@@ -149,6 +148,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
SC: SelectChain<Block> + 'static,
SP: Spawn + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
......@@ -265,12 +265,13 @@ pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
}
impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
C: Collators + Send + Unpin + 'static,
C: Collators + Send + Unpin + 'static + Sync,
N: Network,
P: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
<N::TableRouter as TableRouter>::SendLocalCollation: Send,
N::BuildTableRouter: Unpin + Send + 'static,
SP: Spawn + Send + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
......@@ -288,9 +289,7 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
parent_hash: Hash,
keystore: &KeyStorePtr,
max_block_data_size: Option<u64>,
)
-> Result<ValidationInstanceHandle, Error>
{
) -> Result<ValidationInstanceHandle, Error> {
use primitives::Pair;
if let Some(tracker) = self.live_instances.get(&parent_hash) {
......@@ -381,18 +380,18 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
let (collators, client) = (self.collators.clone(), self.client.clone());
let availability_store = self.availability_store.clone();
let with_router = move |router: N::TableRouter| async move {
let with_router = move |router: N::TableRouter| {
// fetch a local collation from connected collators.
match crate::collation::collation_fetch(
let collation_work = crate::collation::collation_fetch(
validation_para,
relay_parent,
collators,
client.clone(),
max_block_data_size,
).await {
Ok(collation_work) => {
let (collation, outgoing_targeted, parent_head, fees_charged) = collation_work;
);
collation_work.then(move |result| match result {
Ok((collation, outgoing_targeted, parent_head, fees_charged)) => {
match crate::collation::produce_receipt_and_chunks(
authorities_num,
parent_head,
......@@ -408,40 +407,38 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
let chunks_clone = chunks.clone();
let receipt_clone = receipt.clone();
if let Err(e) = av_clone.clone().add_erasure_chunks(
relay_parent.clone(),
receipt_clone,
chunks_clone,
).await {
warn!(
target: "validation",
"Failed to add erasure chunks: {}", e
);
} else {
let res = router.local_collation(
let res = async move {
if let Err(e) = av_clone.clone().add_erasure_chunks(
relay_parent.clone(),
receipt_clone,
chunks_clone,
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
}
.unit_error()
.then(move |_| {
router.local_collation(
collation,
receipt,
outgoing_targeted,
(local_id, &chunks),
).await;
).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e))
});
if let Err(e) = res {
warn!(
target: "validation",
"Failed to notify network of local collation: {:?}", e
);
}
};
res.boxed()
}
Err(e) => {
warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
Box::pin(ready(Ok(())))
}
};
}
}
Err(e) => {
warn!(target: "validation", "Failed to fetch a candidate: {:?}", e);
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
Box::pin(ready(Ok(())))
}
}
})
};
let router_work = build_router
......@@ -449,6 +446,7 @@ impl<C, N, P, SP> ParachainValidationInstances<C, N, P, SP> where
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
})
.and_then(|r| r)
.map(|_| ());
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment