Unverified Commit b0535e68 authored by Ashley's avatar Ashley Committed by GitHub
Browse files

Rewrite some Future structs as async functions (#679)

* Squashed commit of the following:

commit e97a1715
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Tue Dec 10 15:06:28 2019 +0100

    Rewrite some functions as async

commit 970e4851
Merge: f98966ac 47828402
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Tue Dec 10 11:19:37 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-futures-update

commit f98966ac
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 23:40:20 2019 +0100

    Add async blocks back in

commit 7fa88af0
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 23:17:02 2019 +0100

    Revert "Asyncify network functions"

    This reverts commit f20ae654.

commit 82413550
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 19:09:55 2019 +0100

    Fix validation test again

commit 47e002b0
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 19:07:43 2019 +0100

    Switch favicon

commit 0c5c1409
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 18:54:10 2019 +0100

    Fix validation test

commit 8bb6a018
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 18:53:54 2019 +0100

    Nits

commit 33410f3a
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 18:43:09 2019 +0100

    Fix av store test

commit f0c517eb
Merge: 938f411a 60e72111
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 18:21:39 2019 +0100

    Merge branch 'ashley-futures-updates' into ashley-futures-update

commit 60e72111
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 18:19:40 2019 +0100

    Clean up browser validation worker error

commit f20ae654
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 18:16:40 2019 +0100

    Asyncify network functions

commit b22758d0
Merge: 2e8b05ed ca8d5c54
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 17:47:26 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-futures-updates

commit 2e8b05ed
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 17:45:52 2019 +0100

    Box pin changes

commit 08bfdf7f


Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 17:15:38 2019 +0100

    Update network/src/lib.rs
Co-Authored-By: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>

commit d8be456c


Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 17:15:32 2019 +0100

    Update network/src/lib.rs
Co-Authored-By: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>

commit ec736727


Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 17:14:36 2019 +0100

    Update availability-store/src/worker.rs
Co-Authored-By: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>

commit 938f411a
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 17:05:05 2019 +0100

    Revert "Revert removal of tokio_executor that causes tokio version mismatch panic"

    This reverts commit cfeb50c0.

commit f92f5804
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 15:47:35 2019 +0100

    Fix adder test parachain

commit cfeb50c0
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 15:31:36 2019 +0100

    Revert removal of tokio_executor that causes tokio version mismatch panic

commit 5bcb83a1
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 15:17:55 2019 +0100

    Fix typo

commit fc02b1dc
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 15:02:50 2019 +0100

    Fix collator

commit 6c4ff5b3
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 14:35:37 2019 +0100

    Small changes

commit e1338cb4
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 14:24:42 2019 +0100

    Fix network tests

commit 4e458f7a
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 12:25:26 2019 +0100

    Remove futures01 from availability-store

commit 5729f6cd
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 12:22:33 2019 +0100

    Fix validation tests

commit a8206125
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 12:01:48 2019 +0100

    Fix availability store tests

commit 112344fa
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 11:36:03 2019 +0100

    Update tokio version

commit d2de6d8b
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 11:33:25 2019 +0100

    Revert cli tokio version to avoid libp2p panic

commit 0c5f24e0
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 9 11:27:13 2019 +0100

    Switch to polkadot-master

commit 2e2311e3
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Dec 6 15:07:21 2019 +0100

    Re-add release flag

commit 6adc1b61
Merge: 9767f832 5e9542c8
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Dec 6 13:36:35 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 9767f832
Merge: c528dc6d 84ece424
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Wed Dec 4 17:11:39 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit c528dc6d
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Wed Dec 4 17:07:00 2019 +0100

    Fix wasm build

commit da233a12
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Wed Dec 4 16:25:49 2019 +0100

    tidy

commit 832f8054
Merge: 4e1da888 78e828d8
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Wed Dec 4 15:56:56 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 4e1da888
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Tue Dec 3 16:47:02 2019 +0100

    Temp switch back to substrate/master

commit af88a873
Merge: a03a980c abb51115
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 2 19:33:14 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit a03a980c
Merge: 31a88a93 f7d48261
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 2 13:52:37 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 31a88a93
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 2 13:52:35 2019 +0100

    Tidy

commit 5b33b7a7
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 2 11:55:51 2019 +0100

    Add browser-demo

commit 868f6e51
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 2 10:51:57 2019 +0100

    Add initial browser file

commit e5e399c2
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Mon Dec 2 10:45:02 2019 +0100

    Add browser-demo

commit 408288b0
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sun Dec 1 19:28:33 2019 +0100

    Get polkadot to compile via wasm!

commit 04ffe72e
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sun Dec 1 19:28:16 2019 +0100

    Migrate service

commit 119f0829
Merge: 93fb6428 5422684f
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sun Dec 1 17:43:49 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit 93fb6428
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sun Dec 1 12:21:25 2019 +0100

    Switch branch

commit 0c4fe833
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sat Nov 30 11:45:59 2019 +0100

    Tidy up validation

commit 73563253
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sat Nov 30 11:39:09 2019 +0100

    Tidy up network

commit 1c9cf042
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sat Nov 30 01:16:35 2019 +0100

    Final changes to validation

commit 322cca52
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Sat Nov 30 00:31:55 2019 +0100

    Migrate network to std futures

commit 96f1a994
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Nov 29 23:31:04 2019 +0100

    Migrate validation to std futures

commit aaf5e55f
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Nov 29 17:10:11 2019 +0100

    Switch to Spawn trait

commit 2ab282f5
Merge: cceb6b72 5598ed9b
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Nov 29 16:31:24 2019 +0100

    Merge remote-tracking branch 'parity/master' into ashley-compile-to-wasm

commit cceb6b72
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Nov 29 15:47:14 2019 +0100

    Make validation work on wasm!

commit b45a95cf
Merge: 3773d5c1 db7eaa6b
Author: Ashley <ashley.ruglys@gmail.com>
Date:   Fri Nov 29 13:57:23 2019 +0100

    Merge remote-tracking branch 'tomaka/wasm-start' into HEAD

commit db7eaa6b
Merge: 6f97dbb7 2ab32dac
Author: Pierre Krieger <pierre.krieger1708@gmail.com>
Date:   Thu Nov 28 13:58:15 2019 +0100

    Merge branch 'master' into wasm-start

commit 6f97dbb7
Author: Pierre Krieger <pierre.krieger1708@gmail.com>
Date:   Thu Nov 28 12:47:45 2019 +0100

    Use --manifest-path instead

commit 20104e98
Author: Pierre Krieger <pierre.krieger1708@gmail.com>
Date:   Thu Nov 28 10:44:51 2019 +0100

    Make availability-store compile for WASM

* Fix build

* Fix futures blocking panic in validators (again)

* Deindent
parent 3a457c54
Pipeline #72047 failed with stages
in 11 minutes and 6 seconds
...@@ -48,6 +48,7 @@ use std::collections::HashSet; ...@@ -48,6 +48,7 @@ use std::collections::HashSet;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use std::pin::Pin;
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn}; use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::{warn, error}; use log::{warn, error};
...@@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E, ...@@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
SP: Spawn + Clone + Send + Sync SP: Spawn + Clone + Send + Sync
{ {
type Error = String; type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>; type FutureEgress = Pin<Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Send>>;
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress { fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
// TODO: https://github.com/paritytech/polkadot/issues/253 let network = self.network.clone();
// let parent_hash = self.parent_hash;
// Fetch ingress and accumulate all unrounted egress let authorities = self.validators.clone();
let _session = self.network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None, async move {
parent_hash: self.parent_hash, // TODO: https://github.com/paritytech/polkadot/issues/253
authorities: self.validators.clone(), //
}) // Fetch ingress and accumulate all unrounted egress
.map_err(|e| format!("unable to instantiate validation session: {:?}", e)); let _session = network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
Box::new(future::ok(ConsolidatedIngress(Vec::new()))) parent_hash,
authorities,
})
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
Ok(ConsolidatedIngress(Vec::new()))
}.boxed()
} }
} }
...@@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where ...@@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where
); );
let exit = inner_exit_2.clone(); let exit = inner_exit_2.clone();
tokio::spawn(future::select(res, exit).map(drop)); tokio::spawn(future::select(res.boxed(), exit).map(drop));
}) })
}); });
......
...@@ -28,7 +28,6 @@ pub mod gossip; ...@@ -28,7 +28,6 @@ pub mod gossip;
use codec::{Decode, Encode}; use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc}; use futures::channel::{oneshot, mpsc};
use futures::prelude::*; use futures::prelude::*;
use futures::future::Either;
use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
...@@ -837,25 +836,6 @@ impl PolkadotProtocol { ...@@ -837,25 +836,6 @@ impl PolkadotProtocol {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.info.parachain_index); relay_parent, collation.info.parachain_index);
let res = match self.availability_store {
Some(ref availability_store) => {
let availability_store_cloned = availability_store.clone();
let collation_cloned = collation.clone();
Either::Left((async move {
let _ = availability_store_cloned.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
)
.boxed()
)
}
None => Either::Right(futures::future::ready(())),
};
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
match self.validators.get(&primary) { match self.validators.get(&primary) {
Some(who) => { Some(who) => {
...@@ -871,7 +851,19 @@ impl PolkadotProtocol { ...@@ -871,7 +851,19 @@ impl PolkadotProtocol {
} }
} }
res let availability_store = self.availability_store.clone();
let collation_cloned = collation.clone();
async move {
if let Some(availability_store) = availability_store {
let _ = availability_store.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
}
} }
/// Give the network protocol a handle to an availability store, used for /// Give the network protocol a handle to an availability store, used for
......
...@@ -41,8 +41,9 @@ use log::{debug, trace}; ...@@ -41,8 +41,9 @@ use log::{debug, trace};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::pin::Pin;
use crate::validation::{self, LeafWorkDataFetcher, Executor}; use crate::validation::{LeafWorkDataFetcher, Executor};
use crate::NetworkService; use crate::NetworkService;
/// Compute the gossip topic for attestations on the given parent hash. /// Compute the gossip topic for attestations on the given parent hash.
...@@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh ...@@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
E: Future<Output=()> + Clone + Send + 'static, E: Future<Output=()> + Clone + Send + 'static,
{ {
type Error = io::Error; type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver; type FetchValidationProof = Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>>;
// We have fetched from a collator and here the receipt should have been already formed. // We have fetched from a collator and here the receipt should have been already formed.
fn local_collation( fn local_collation(
......
...@@ -41,7 +41,7 @@ use std::collections::HashMap; ...@@ -41,7 +41,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Poll, Context}; use std::task::{Poll, Context};
use futures::{prelude::*, channel::mpsc}; use futures::{prelude::*, channel::mpsc, future::{select, Either}};
use codec::Encode; use codec::Encode;
use super::{TestContext, TestChainContext}; use super::{TestContext, TestChainContext};
...@@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification { ...@@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification {
} }
} }
struct GossipRouter { async fn gossip_router(
incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>, mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>
outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)>, ) {
messages: Vec<(Hash, TopicNotification)>, let mut outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)> = Vec::new();
} let mut messages = Vec::new();
impl GossipRouter { loop {
fn add_message(&mut self, topic: Hash, message: TopicNotification) { match select(incoming_messages.next(), incoming_streams.next()).await {
self.outgoing.retain(|&(ref o_topic, ref sender)| { Either::Left((Some((topic, message)), _)) => {
o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() outgoing.retain(|&(ref o_topic, ref sender)| {
}); o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
self.messages.push((topic, message)); });
} messages.push((topic, message));
},
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<TopicNotification>) { Either::Right((Some((topic, sender)), _)) => {
for message in self.messages.iter() for message in messages.iter()
.filter(|&&(ref t, _)| t == &topic) .filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| clone_gossip(msg)) .map(|&(_, ref msg)| clone_gossip(msg))
{ {
if let Err(_) = sender.unbounded_send(message) { return } if let Err(_) = sender.unbounded_send(message) { return }
} }
self.outgoing.push((topic, sender)); outgoing.push((topic, sender));
} },
} Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.")
impl Future for GossipRouter {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
loop {
match Pin::new(&mut this.incoming_messages).poll_next(cx) {
Poll::Ready(Some((topic, message))) => this.add_message(topic, message),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
}
loop {
match Pin::new(&mut this.incoming_streams).poll_next(cx) {
Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender),
Poll::Ready(None) => panic!("ended early."),
Poll::Pending => break,
}
} }
Poll::Pending
} }
} }
#[derive(Clone)] #[derive(Clone)]
struct GossipHandle { struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>, send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>, send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
} }
fn make_gossip() -> (GossipRouter, GossipHandle) { fn make_gossip() -> (impl Future<Output = ()>, GossipHandle) {
let (message_tx, message_rx) = mpsc::unbounded(); let (message_tx, message_rx) = mpsc::unbounded();
let (listener_tx, listener_rx) = mpsc::unbounded(); let (listener_tx, listener_rx) = mpsc::unbounded();
( (
GossipRouter { gossip_router(message_rx, listener_rx),
incoming_messages: message_rx,
incoming_streams: listener_rx,
outgoing: Vec::new(),
messages: Vec::new(),
},
GossipHandle { send_message: message_tx, send_listener: listener_tx }, GossipHandle { send_message: message_tx, send_listener: listener_tx },
) )
} }
...@@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork< ...@@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork<
>; >;
struct Built { struct Built {
gossip: GossipRouter, gossip: Pin<Box<dyn Future<Output = ()>>>,
api_handle: Arc<Mutex<ApiData>>, api_handle: Arc<Mutex<ApiData>>,
networks: Vec<TestValidationNetwork>, networks: Vec<TestValidationNetwork>,
} }
...@@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built { ...@@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
let networks: Vec<_> = networks.collect(); let networks: Vec<_> = networks.collect();
Built { Built {
gossip: gossip_router, gossip: gossip_router.boxed(),
api_handle, api_handle,
networks, networks,
} }
......
...@@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{ ...@@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{
use futures::prelude::*; use futures::prelude::*;
use futures::task::SpawnExt; use futures::task::SpawnExt;
pub use futures::task::Spawn as Executor; pub use futures::task::Spawn as Executor;
use futures::channel::oneshot::{self, Receiver}; use futures::channel::oneshot;
use futures::future::{ready, select}; use futures::future::{ready, select};
use std::collections::hash_map::{HashMap, Entry}; use std::collections::hash_map::{HashMap, Entry};
use std::io; use std::io;
use std::sync::Arc; use std::sync::Arc;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Poll, Context};
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use parking_lot::Mutex; use parking_lot::Mutex;
...@@ -242,47 +241,30 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where ...@@ -242,47 +241,30 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
#[derive(Clone, Copy, Debug, PartialEq, Eq)] #[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NetworkDown; pub struct NetworkDown;
/// A future that resolves when a collation is received.
pub struct AwaitingCollation {
outer: oneshot::Receiver<oneshot::Receiver<Collation>>,
inner: Option<oneshot::Receiver<Collation>>
}
impl Future for AwaitingCollation {
type Output = Result<Collation, NetworkDown>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
if let Some(ref mut inner) = this.inner {
return Pin::new(inner).poll(cx).map_err(|_| NetworkDown)
}
match Pin::new(&mut this.outer).poll(cx) {
Poll::Ready(Ok(inner)) => {
this.inner = Some(inner);
Pin::new(this).poll(cx)
},
Poll::Ready(Err(_)) => Poll::Ready(Err(NetworkDown)),
Poll::Pending => Poll::Pending,
}
}
}
impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static, P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>, P::Api: ParachainHost<Block>,
N: NetworkService, N: NetworkService,
{ {
type Error = NetworkDown; type Error = NetworkDown;
type Collation = AwaitingCollation; type Collation = Pin<Box<dyn Future<Output = Result<Collation, NetworkDown>> + Send>>;
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, _| { let network = self.network.clone();
let collation = spec.await_collation(relay_parent, parachain);
let _ = tx.send(collation); // A future that resolves when a collation is received.
}); async move {
AwaitingCollation{outer: rx, inner: None} network.with_spec(move |spec, _| {
let collation = spec.await_collation(relay_parent, parachain);
let _ = tx.send(collation);
});
rx.await
.map_err(|_| NetworkDown)?
.await
.map_err(|_| NetworkDown)
}.boxed()
} }
...@@ -348,27 +330,6 @@ impl Knowledge { ...@@ -348,27 +330,6 @@ impl Knowledge {
} }
} }
/// receiver for incoming data.
#[derive(Clone)]
pub struct IncomingReceiver {
inner: future::Shared<Receiver<Incoming>>
}
impl Future for IncomingReceiver {
type Output = Result<Incoming, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Pin::new(&mut Pin::into_inner(self).inner).poll(cx) {
Poll::Ready(Ok(i)) => Poll::Ready(Ok(Incoming::clone(&i))),
Poll::Ready(Err(_)) => Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
))),
Poll::Pending => Poll::Pending,
}
}
}
/// A current validation leaf-work instance /// A current validation leaf-work instance
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct LiveValidationLeaf { pub(crate) struct LiveValidationLeaf {
...@@ -564,36 +525,6 @@ impl LiveValidationLeaves { ...@@ -564,36 +525,6 @@ impl LiveValidationLeaves {
} }
} }
/// Receiver for block data.
pub struct PoVReceiver {
outer: Receiver<Receiver<PoVBlock>>,
inner: Option<Receiver<PoVBlock>>
}
impl Future for PoVReceiver {
type Output = Result<PoVBlock, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
let map_err = |_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
);
if let Some(ref mut inner) = this.inner {
return Pin::new(inner).poll(cx).map_err(map_err);
}
match Pin::new(&mut this.outer).poll(cx).map_err(map_err)? {
Poll::Ready(inner) => {
this.inner = Some(inner);
Pin::new(this).poll(cx)
}
Poll::Pending => Poll::Pending,
}
}
}
/// Can fetch data for a given validation leaf-work instance. /// Can fetch data for a given validation leaf-work instance.
pub struct LeafWorkDataFetcher<P, E, N: NetworkService, T> { pub struct LeafWorkDataFetcher<P, E, N: NetworkService, T> {
network: Arc<N>, network: Arc<N>,
...@@ -658,9 +589,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where ...@@ -658,9 +589,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
E: Future<Output=()> + Clone + Send + 'static, E: Future<Output=()> + Clone + Send + 'static,
{ {
/// Fetch PoV block for the given candidate receipt. /// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { pub fn fetch_pov_block(&self, candidate: &CandidateReceipt)
-> Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>> {
let parachain = candidate.parachain_index; let parachain = candidate.parachain_index;
let parent_hash = self.parent_hash; let parent_hash = self.parent_hash;
let network = self.network.clone();
let candidate = candidate.clone();
let (tx, rx) = oneshot::channel();
let canon_roots = self.api.runtime_api().ingress( let canon_roots = self.api.runtime_api().ingress(
&BlockId::hash(parent_hash), &BlockId::hash(parent_hash),
...@@ -676,15 +612,24 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where ...@@ -676,15 +612,24 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
) )
); );
let candidate = candidate.clone(); async move {
let (tx, rx) = oneshot::channel(); network.with_spec(move |spec, ctx| {
self.network.with_spec(move |spec, ctx| { if let Ok(Some(canon_roots)) = canon_roots {
if let Ok(Some(canon_roots)) = canon_roots { let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots); let _ = tx.send(inner_rx);
let _ = tx.send(inner_rx); }
} });
});
PoVReceiver { outer: rx, inner: None } let map_err = |_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
);
rx.await
.map_err(map_err)?
.await
.map_err(map_err)
}.boxed()
} }
} }
......
...@@ -8,7 +8,7 @@ edition = "2018" ...@@ -8,7 +8,7 @@ edition = "2018"
futures = "0.3.1" futures = "0.3.1"
futures-timer = "2.0" futures-timer = "2.0"
parking_lot = "0.9.0" parking_lot = "0.9.0"
tokio = { version = "0.2.4", features = ["rt-core", "blocking"] } tokio = { version = "0.2.4", features = ["rt-core"] }
derive_more = "0.14.1" derive_more = "0.14.1"
log = "0.4.8" log = "0.4.8"
exit-future = "0.2.0"