Unverified Commit 96f1a994 authored by Ashley's avatar Ashley
Browse files

Migrate validation to std futures

parent aaf5e55f
......@@ -5,8 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures01 = { package = "futures", version = "0.1.17" }
futures = { version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
futures-timer = "2.0"
parking_lot = "0.9.0"
......
......@@ -30,7 +30,6 @@ use sp_blockchain::{HeaderBackend, Result as ClientResult};
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use availability_store::Store as AvailabilityStore;
use futures01::prelude::*;
use futures::{StreamExt, FutureExt, Future, future::{ready, select}, task::{Spawn, SpawnExt}};
use polkadot_primitives::{Block, BlockId};
use polkadot_primitives::parachain::{CandidateReceipt, ParachainHost};
......@@ -118,8 +117,8 @@ pub(crate) fn start<C, N, P, SC>(
max_block_data_size: Option<u64>,
) -> ServiceHandle
where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
......@@ -128,7 +127,7 @@ pub(crate) fn start<C, N, P, SC>(
ApiExt<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
SC: SelectChain<Block> + 'static,
{
const TIMER_INTERVAL: Duration = Duration::from_secs(30);
......
......@@ -28,8 +28,9 @@ use polkadot_primitives::{Block, Hash, BlockId, Balance, parachain::{
use runtime_primitives::traits::ProvideRuntimeApi;
use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, MessageRef, UpwardMessageRef};
use trie::TrieConfiguration;
use futures01::prelude::*;
use log::debug;
use std::task::{Poll, Context};
use std::pin::Pin;
/// Encapsulates connections to collators and allows collation on any parachain.
///
......@@ -37,8 +38,9 @@ use log::debug;
pub trait Collators: Clone {
/// Errors when producing collations.
type Error: std::fmt::Debug;
/// A full collation.
type Collation: IntoFuture<Item=Collation,Error=Self::Error>;
type Collation: futures::Future<Output=Result<Collation,Self::Error>>;
/// Collate on a specific parachain, building on a given relay chain parent hash.
///
......@@ -61,7 +63,7 @@ pub struct CollationFetch<C: Collators, P> {
relay_parent_hash: Hash,
relay_parent: BlockId,
collators: C,
live_fetch: Option<<C::Collation as IntoFuture>::Future>,
live_fetch: Option<C::Collation>,
client: Arc<P>,
max_block_data_size: Option<u64>,
}
......@@ -97,41 +99,50 @@ impl<C: Collators, P> CollationFetch<C, P> {
}
}
impl<C: Collators, P: ProvideRuntimeApi> Future for CollationFetch<C, P>
where P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
impl<C, P> futures::Future for CollationFetch<C, P>
where
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
C: Collators + Unpin,
P: ProvideRuntimeApi,
<C as Collators>::Collation: Unpin,
{
type Item = (Collation, OutgoingMessages);
type Error = C::Error;
type Output = Result<(Collation, OutgoingMessages),C::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
fn poll(&mut self) -> Poll<(Collation, OutgoingMessages), C::Error> {
loop {
let collation = {
let parachain = self.parachain.clone();
let (r, c) = (self.relay_parent_hash, &self.collators);
let poll = self.live_fetch
.get_or_insert_with(move || c.collate(parachain, r).into_future())
.poll();
let parachain = this.parachain.clone();
let (r, c) = (this.relay_parent_hash, &this.collators);
futures01::try_ready!(poll)
let future = this.live_fetch
.get_or_insert_with(move || c.collate(parachain, r));
match Pin::new(future).poll(cx) {
Poll::Ready(Ok(c)) => c,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending
}
};
let res = validate_collation(
&*self.client,
&self.relay_parent,
&*this.client,
&this.relay_parent,
&collation,
self.max_block_data_size,
this.max_block_data_size,
);
match res {
Ok(e) => {
return Ok(Async::Ready((collation, e)))
return Poll::Ready(Ok((collation, e)))
}
Err(e) => {
debug!("Failed to validate parachain due to API error: {}", e);
// just continue if we got a bad collation or failed to validate
self.live_fetch = None;
self.collators.note_bad_collator(collation.receipt.collator)
this.live_fetch = None;
this.collators.note_bad_collator(collation.receipt.collator)
}
}
}
......
......@@ -46,9 +46,6 @@ pub enum Error {
Timer(std::io::Error),
#[display(fmt = "Failed to compute deadline of now + {:?}", _0)]
DeadlineComputeFailure(std::time::Duration),
/// Unable to dispatch agreement future
#[display(fmt = "Unable to dispatch agreement future: {:?}", _0)]
Executor(futures01::future::ExecuteErrorKind),
Join(tokio::task::JoinError)
}
......
......@@ -58,10 +58,9 @@ use futures_timer::Delay;
use txpool_api::{TransactionPool, InPoolTransaction};
use attestation_service::ServiceHandle;
use futures01::prelude::*;
use futures::{
future::{self, Either, select}, FutureExt, StreamExt, compat::Future01CompatExt, Stream,
stream::unfold, task::{Spawn, SpawnExt}
future::{self, Either, select}, FutureExt, StreamExt, Stream,
stream::unfold, task::{Spawn, SpawnExt},
};
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
......@@ -111,7 +110,7 @@ pub trait TableRouter: Clone {
/// Errors when fetching data from the network.
type Error: std::fmt::Debug;
/// Future that resolves when candidate data is fetched.
type FetchValidationProof: IntoFuture<Item=PoVBlock,Error=Self::Error>;
type FetchValidationProof: futures::Future<Output=Result<PoVBlock, Self::Error>>;
/// Call with local candidate data. This will make the data available on the network,
/// and sign, import, and broadcast a statement about the candidate.
......@@ -123,16 +122,13 @@ pub trait TableRouter: Clone {
/// A long-lived network which can create parachain statement and BFT message routing processes on demand.
pub trait Network {
/// The error type of asynchronously building the table router.
type Error: std::fmt::Debug;
/// The table router type. This should handle importing of any statements,
/// routing statements to peers, and driving completion of any `StatementProducers`.
type TableRouter: TableRouter;
/// The future used for asynchronously building the table router.
/// This should not fail.
type BuildTableRouter: IntoFuture<Item=Self::TableRouter,Error=Self::Error>;
type BuildTableRouter: futures::Future<Output=Self::TableRouter>;
/// Instantiate a table router using the given shared table.
/// Also pass through any outgoing messages to be broadcast to peers.
......@@ -263,13 +259,13 @@ struct ParachainValidation<C, N, P> {
}
impl<C, N, P> ParachainValidation<C, N, P> where
C: Collators + Send + 'static,
C: Collators + Send + Unpin + 'static,
N: Network,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
<C::Collation as IntoFuture>::Future: Send + 'static,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Unpin + Send + 'static,
{
/// Get an attestation table for given parent hash.
///
......@@ -401,23 +397,16 @@ impl<C, N, P> ParachainValidation<C, N, P> where
e,
),
}
Ok(())
futures::future::ready(())
}
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
Ok(())
futures::future::ready(())
}
})
};
let router = build_router
.into_future()
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
})
.and_then(with_router)
.compat();
let router = build_router.map(with_router);
let cancellable_work = select(exit, router).map(|_| ());
......@@ -447,8 +436,8 @@ pub struct ProposerFactory<C, N, P, SC, TxPool: TransactionPool> {
}
impl<C, N, P, SC, TxPool> ProposerFactory<C, N, P, SC, TxPool> where
C: Collators + Send + Sync + 'static,
<C::Collation as IntoFuture>::Future: Send + 'static,
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents<Block> + BlockBody<Block>,
P: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
......@@ -457,7 +446,7 @@ impl<C, N, P, SC, TxPool> ProposerFactory<C, N, P, SC, TxPool> where
ApiExt<Block, Error = sp_blockchain::Error>,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
TxPool: TransactionPool,
SC: SelectChain<Block> + 'static,
{
......@@ -506,7 +495,7 @@ impl<C, N, P, SC, TxPool> ProposerFactory<C, N, P, SC, TxPool> where
}
impl<C, N, P, SC, TxPool> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxPool> where
C: Collators + Send + 'static,
C: Collators + Send + Unpin + 'static,
N: Network,
TxPool: TransactionPool<Block=Block> + 'static,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
......@@ -514,9 +503,9 @@ impl<C, N, P, SC, TxPool> consensus::Environment<Block> for ProposerFactory<C, N
BlockBuilderApi<Block> +
BabeApi<Block> +
ApiExt<Block, Error = sp_blockchain::Error>,
<C::Collation as IntoFuture>::Future: Send + 'static,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
<N::BuildTableRouter as IntoFuture>::Future: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
SC: SelectChain<Block>,
{
type Proposer = Proposer<P, TxPool>;
......
......@@ -17,9 +17,10 @@
//! Implements a future which resolves when all of the candidates referenced are includable.
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures01::prelude::*;
use futures01::sync::oneshot;
use futures::channel::oneshot;
use polkadot_primitives::Hash;
......@@ -94,12 +95,11 @@ impl IncludabilitySender {
/// Future that resolves when all the candidates within are includable.
pub struct Includable(oneshot::Receiver<()>);
impl Future for Includable {
type Item = ();
type Error = oneshot::Canceled;
impl futures::Future for Includable {
type Output = Result<(), oneshot::Canceled>;
fn poll(&mut self) -> Poll<(), oneshot::Canceled> {
self.0.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut Pin::into_inner(self).0).poll(cx)
}
}
......@@ -132,6 +132,6 @@ mod tests {
sender.update_candidate(hash1, true);
assert!(sender.is_complete());
recv.wait().unwrap();
futures::executor::block_on(recv).unwrap();
}
}
......@@ -19,6 +19,8 @@
use std::collections::hash_map::{HashMap, Entry};
use std::sync::Arc;
use std::pin::Pin;
use std::task::{Poll, Context};
use availability_store::{Data, Store as AvailabilityStore};
use table::{self, Table, Context as TableContextTrait};
......@@ -29,7 +31,6 @@ use polkadot_primitives::parachain::{
};
use parking_lot::Mutex;
use futures01::prelude::*;
use log::{warn, debug};
use bitvec::bitvec;
......@@ -139,7 +140,7 @@ impl SharedTableInner {
statement: table::SignedStatement,
max_block_data_size: Option<u64>,
) -> Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
R::FetchValidationProof
>> {
let summary = self.table.import_statement(context, statement)?;
self.update_trackers(&summary.candidate, context);
......@@ -173,7 +174,7 @@ impl SharedTableInner {
None
}
Some(candidate) => {
let fetch = router.fetch_pov_block(candidate).into_future();
let fetch = router.fetch_pov_block(candidate);
Some(Work {
candidate_receipt: candidate.clone(),
......@@ -266,7 +267,7 @@ pub struct ParachainWork<Fetch> {
max_block_data_size: Option<u64>,
}
impl<Fetch: Future> ParachainWork<Fetch> {
impl<Fetch: futures::Future> ParachainWork<Fetch> {
/// Prime the parachain work with an API reference for extracting
/// chain information.
pub fn prime<P: ProvideRuntimeApi>(self, api: Arc<P>)
......@@ -318,22 +319,27 @@ pub struct PrimedParachainWork<Fetch, F> {
validate: F,
}
impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
impl<Fetch, F, Err> futures::Future for PrimedParachainWork<Fetch, F>
where
Fetch: Future<Item=PoVBlock,Error=Err>,
F: FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()>,
Fetch: futures::Future<Output=Result<PoVBlock,Err>> + Unpin,
F: FnMut(&BlockId, &Collation) -> Result<OutgoingMessages, ()> + Unpin,
Err: From<::std::io::Error>,
{
type Item = Validated;
type Error = Err;
type Output = Result<Validated, Err>;
fn poll(&mut self) -> Poll<Validated, Err> {
let work = &mut self.inner.work;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
let work = &mut this.inner.work;
let candidate = &work.candidate_receipt;
let pov_block = futures01::try_ready!(work.fetch.poll());
let validation_res = (self.validate)(
&BlockId::hash(self.inner.relay_parent),
let pov_block = match Pin::new(&mut work.fetch).poll(cx) {
Poll::Ready(Ok(block)) => block,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
};
let validation_res = (this.validate)(
&BlockId::hash(this.inner.relay_parent),
&Collation { pov: pov_block.clone(), receipt: candidate.clone() },
);
......@@ -352,8 +358,8 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
.map(|(_target, root, data)| (root, data))
.collect();
self.inner.availability_store.make_available(Data {
relay_parent: self.inner.relay_parent,
this.inner.availability_store.make_available(Data {
relay_parent: this.inner.relay_parent,
parachain_id: work.candidate_receipt.parachain_index,
candidate_hash,
block_data: pov_block.block_data.clone(),
......@@ -367,7 +373,7 @@ impl<Fetch, F, Err> Future for PrimedParachainWork<Fetch, F>
}
};
Ok(Async::Ready(Validated {
Poll::Ready(Ok(Validated {
statement: validity_statement,
result,
}))
......@@ -440,7 +446,7 @@ impl SharedTable {
router: &R,
statement: table::SignedStatement,
) -> Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
R::FetchValidationProof,
>> {
self.inner.lock().import_remote_statement(&*self.context, router, statement, self.max_block_data_size)
}
......@@ -456,7 +462,7 @@ impl SharedTable {
R: TableRouter,
I: IntoIterator<Item=table::SignedStatement>,
U: ::std::iter::FromIterator<Option<ParachainWork<
<R::FetchValidationProof as IntoFuture>::Future,
R::FetchValidationProof,
>>>,
{
let mut inner = self.inner.lock();
......@@ -574,7 +580,8 @@ mod tests {
use substrate_keyring::Sr25519Keyring;
use primitives::crypto::UncheckedInto;
use polkadot_primitives::parachain::{BlockData, ConsolidatedIngress};
use futures01::future;
use futures::future;
use futures::executor::block_on;
fn pov_block_with_data(data: Vec<u8>) -> PoVBlock {
PoVBlock {
......@@ -587,7 +594,7 @@ mod tests {
struct DummyRouter;
impl TableRouter for DummyRouter {
type Error = ::std::io::Error;
type FetchValidationProof = future::FutureResult<PoVBlock,Self::Error>;
type FetchValidationProof = future::Ready<Result<PoVBlock,Self::Error>>;
fn local_collation(&self, _collation: Collation, _outgoing: OutgoingMessages) {
}
......@@ -727,7 +734,7 @@ mod tests {
let hash = candidate.hash();
let producer: ParachainWork<future::FutureResult<_, ::std::io::Error>> = ParachainWork {
let producer: ParachainWork<future::Ready<Result<_, ::std::io::Error>>> = ParachainWork {
work: Work {
candidate_receipt: candidate,
fetch: future::ok(pov_block.clone()),
......@@ -737,8 +744,7 @@ mod tests {
max_block_data_size: None,
};
let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() }))
.wait()
let validated = block_on(producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() })))
.unwrap();
assert_eq!(validated.pov_block(), &pov_block);
......@@ -778,8 +784,7 @@ mod tests {
max_block_data_size: None,
};
let validated = producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() }))
.wait()
let validated = block_on(producer.prime_with(|_, _| Ok(OutgoingMessages { outgoing_messages: Vec::new() })))
.unwrap();
assert_eq!(validated.pov_block(), &pov_block);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment