Unverified Commit e2dc9371 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Asyncify launch_work a bit more (#777)

* Asyncify launch_work a bit more

* An error message misword

* A bit more async in collator
parent e56bc780
Pipeline #75308 passed with stages
in 20 minutes and 5 seconds
......@@ -363,13 +363,15 @@ fn run_collator_node<S, E, P, Extrinsic>(
};
let inner_exit = exit.clone();
let work = client.import_notification_stream()
.for_each(move |notification| {
let work = async move {
let mut notification_stream = client.import_notification_stream();
while let Some(notification) = notification_stream.next().await {
macro_rules! try_fr {
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::Left(future::err(Error::Polkadot(
Err(e) => return (future::err(Error::Polkadot(
format!("{:?}", e)
))),
}
......@@ -386,11 +388,11 @@ fn run_collator_node<S, E, P, Extrinsic>(
let validation_network = validation_network.clone();
let inner_exit_2 = inner_exit.clone();
let work = future::lazy(move |_| {
let work = future::lazy(move |_| async move {
let api = client.runtime_api();
let status = match try_fr!(api.parachain_status(&id, para_id)) {
Some(status) => status,
None => return future::Either::Left(future::ok(())),
None => return future::ok(()),
};
let validators = try_fr!(api.validators(&id));
......@@ -407,14 +409,14 @@ fn run_collator_node<S, E, P, Extrinsic>(
validators,
};
let collation_work = collate(
if let Ok((collation, outgoing)) = collate(
relay_parent,
para_id,
status,
context,
parachain_context,
key,
).map_ok(move |(collation, outgoing)| {
).await {
network.with_spec(move |spec, ctx| {
let res = spec.add_local_collation(
ctx,
......@@ -426,10 +428,9 @@ fn run_collator_node<S, E, P, Extrinsic>(
let exit = inner_exit_2.clone();
tokio::spawn(future::select(res.boxed(), exit).map(drop).map(|_| Ok(())).compat());
})
});
future::Either::Right(collation_work)
});
}
future::ok(())
});
let deadlined = future::select(
......@@ -450,8 +451,8 @@ fn run_collator_node<S, E, P, Extrinsic>(
).map(drop);
tokio::spawn(future.map(|_| Ok(())).compat());
future::ready(())
});
}
}.boxed();
service.spawn_essential_task(work);
......
......@@ -34,7 +34,7 @@ use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::prelude::*;
use futures::{future::{ready, select}, task::{Spawn, SpawnExt}};
use futures::{future::select, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, Hash, BlockId};
use polkadot_primitives::parachain::{
Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair,
......@@ -384,18 +384,18 @@ impl<C, N, P> ParachainValidationInstances<C, N, P> where
let (collators, client) = (self.collators.clone(), self.client.clone());
let availability_store = self.availability_store.clone();
let with_router = move |router: N::TableRouter| {
let with_router = move |router: N::TableRouter| async move {
// fetch a local collation from connected collators.
let collation_work = crate::collation::collation_fetch(
match crate::collation::collation_fetch(
validation_para,
relay_parent,
collators,
client.clone(),
max_block_data_size,
);
).await {
Ok(collation_work) => {
let (collation, outgoing_targeted, fees_charged) = collation_work;
collation_work.map(move |result| match result {
Ok((collation, outgoing_targeted, fees_charged)) => {
match crate::collation::produce_receipt_and_chunks(
authorities_num,
&collation.pov,
......@@ -410,41 +410,33 @@ impl<C, N, P> ParachainValidationInstances<C, N, P> where
let chunks_clone = chunks.clone();
let receipt_clone = receipt.clone();
let res = async move {
if let Err(e) = av_clone.clone().add_erasure_chunks(
relay_parent.clone(),
receipt_clone,
chunks_clone,
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
}
.unit_error()
.boxed()
.then(move |_| {
if let Err(e) = av_clone.clone().add_erasure_chunks(
relay_parent.clone(),
receipt_clone,
chunks_clone,
).await {
warn!(
target: "validation",
"Failed to add erasure chunks: {}", e
);
} else {
router.local_collation(
collation,
receipt,
outgoing_targeted,
(local_id, &chunks),
);
ready(())
});
Some(res)
}
}
Err(e) => {
warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
None
}
}
};
}
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
None
warn!(target: "validation", "Failed to fetch a candidate: {:?}", e);
}
})
}
};
let router = build_router
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment