Unverified Commit 73563253 authored by Ashley's avatar Ashley
Browse files

Tidy up network

parent 1c9cf042
......@@ -15,8 +15,7 @@ codec = { package = "parity-scale-codec", version = "1.1.0", default-features =
substrate-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
substrate-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
futures = { version = "0.3.1", features = ["compat"] }
log = "0.4.8"
exit-future = { git = "https://github.com/expenses/exit-future", branch = "modernize" }
substrate-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......
......@@ -20,7 +20,7 @@ use codec::{Encode, Decode};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
use substrate_network::PeerId;
use futures03::channel::oneshot;
use futures::channel::oneshot;
use std::collections::hash_map::{HashMap, Entry};
use std::time::{Duration, Instant};
......@@ -196,7 +196,7 @@ impl CollatorPool {
}
/// Wait for a collation from a parachain.
pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: futures03::channel::oneshot::Sender<Collation>) {
pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
self.collations.entry((relay_parent, para_id))
.or_insert_with(CollationSlot::blank_now)
.entries
......@@ -230,7 +230,7 @@ mod tests {
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use futures03::executor::block_on;
use futures::executor::block_on;
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
......
......@@ -26,7 +26,8 @@ pub mod validation;
pub mod gossip;
use codec::{Decode, Encode};
use futures03::channel::mpsc;
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, BlockData, CollatorId, CandidateReceipt, Collation, PoVBlock,
......@@ -152,19 +153,19 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> {
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Box<dyn futures03::Stream<Item = TopicNotification> + Unpin + Send>,
topic_stream: Box<dyn Stream<Item = TopicNotification> + Unpin + Send>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: Box<dyn futures03::Stream<Item = TopicNotification> + Unpin + Send>) -> Self {
pub fn new(topic_stream: Box<dyn Stream<Item = TopicNotification> + Unpin + Send>) -> Self {
Self {
topic_stream,
}
}
}
impl futures03::Stream for GossipMessageStream {
impl Stream for GossipMessageStream {
type Item = (GossipMessage, Option<PeerId>);
fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll<Option<Self::Item>> {
......@@ -196,7 +197,7 @@ struct PoVBlockRequest {
validation_leaf: Hash,
candidate_hash: Hash,
block_data_hash: Hash,
sender: futures03::channel::oneshot::Sender<PoVBlock>,
sender: oneshot::Sender<PoVBlock>,
canon_roots: StructuredUnroutedIngress,
}
......@@ -333,8 +334,8 @@ impl PolkadotProtocol {
candidate: &CandidateReceipt,
relay_parent: Hash,
canon_roots: StructuredUnroutedIngress,
) -> futures03::channel::oneshot::Receiver<PoVBlock> {
let (tx, rx) = futures03::channel::oneshot::channel();
) -> oneshot::Receiver<PoVBlock> {
let (tx, rx) = oneshot::channel();
self.pending.push(PoVBlockRequest {
attempted_peers: Default::default(),
......@@ -660,7 +661,7 @@ impl Specialization<Block> for PolkadotProtocol {
let retain = peer != &who;
if !retain {
// swap with a dummy value which will be dropped immediately.
let (sender, _) = futures03::channel::oneshot::channel();
let (sender, _) = oneshot::channel();
pending.push(::std::mem::replace(val, PoVBlockRequest {
attempted_peers: Default::default(),
validation_leaf: Default::default(),
......@@ -755,8 +756,8 @@ impl PolkadotProtocol {
}
}
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> futures03::channel::oneshot::Receiver<Collation> {
let (tx, rx) = futures03::channel::oneshot::channel();
fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
let (tx, rx) = oneshot::channel();
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
self.collators.await_collation(relay_parent, para_id, tx);
rx
......
......@@ -32,9 +32,9 @@ use polkadot_primitives::parachain::{
OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock,
};
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};
use futures03::task::SpawnExt;
use futures03::TryFutureExt;
use futures03::FutureExt;
use futures::prelude::*;
use futures::task::SpawnExt;
use parking_lot::Mutex;
use log::{debug, trace};
......@@ -59,16 +59,14 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
impl futures03::Stream<Item=SignedStatement> {
impl Stream<Item=SignedStatement> {
// spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
use futures03::StreamExt;
network.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => futures03::future::ready(Some(s.signed_statement)),
_ => futures03::future::ready(None)
GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)),
_ => future::ready(None)
})
}
......@@ -103,7 +101,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements(&self) -> impl futures03::Stream<Item=SignedStatement> {
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement> {
checked_statements(&**self.network(), self.attestation_topic)
}
......@@ -132,7 +130,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
E: Future<Output=()> + Clone + Send + Unpin + 'static,
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
......@@ -177,7 +175,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let work = futures03::future::select(
let work = futures::future::select(
work,
self.fetcher.exit().clone()
)
......@@ -189,9 +187,9 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
}
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl futures03::Future<Output=()> + Send + 'static
-> impl Future<Output=()> + Send + 'static
where
D: futures03::Future<Output=Result<PoVBlock,io::Error>> + Send + Unpin + 'static,
D: Future<Output=Result<PoVBlock,io::Error>> + Send + Unpin + 'static,
{
let table = self.table.clone();
let network = self.network().clone();
......@@ -232,7 +230,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
......
......@@ -33,7 +33,7 @@ use substrate_network::{
specialization::NetworkSpecialization,
};
use futures03::executor::block_on;
use futures::executor::block_on;
mod validation;
......
......@@ -41,12 +41,12 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures03::{prelude::*, channel::mpsc};
use futures::{prelude::*, channel::mpsc};
use codec::Encode;
use super::{TestContext, TestChainContext};
type TaskExecutor = Arc<dyn futures03::task::Spawn + Send + Sync>;
type TaskExecutor = Arc<dyn futures::task::Spawn + Send + Sync>;
#[derive(Clone, Copy)]
struct NeverExit;
......
......@@ -30,9 +30,10 @@ use polkadot_primitives::parachain::{
ValidatorId, PoVBlock
};
use futures03::channel::oneshot::{self, Receiver};
pub use futures03::task::{Spawn as Executor, SpawnExt};
use futures03::{StreamExt as _, FutureExt as _, TryFutureExt};
use futures::prelude::*;
use futures::task::SpawnExt;
pub use futures::task::Spawn as Executor;
use futures::channel::oneshot::{self, Receiver};
use std::collections::hash_map::{HashMap, Entry};
use std::io;
......@@ -97,7 +98,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>,
E: Clone + futures03::Future<Output=()> + Send + Sync + 'static,
E: Clone + Future<Output=()> + Send + Sync + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
......@@ -157,9 +158,9 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl futures03::Future<Output=Option<PeerId>> + Send
impl Future<Output=Option<PeerId>> + Send
{
let (send, recv) = futures03::channel::oneshot::channel();
let (send, recv) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
......@@ -171,7 +172,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub fn checked_statements(&self, relay_parent: Hash) -> impl futures03::Stream<Item=SignedStatement> {
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement> {
crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent))
}
}
......@@ -180,13 +181,13 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
E: Clone + futures03::Future<Output=()> + Send + Sync + Unpin + 'static,
E: Clone + Future<Output=()> + Send + Sync + Unpin + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
type Error = String;
type TableRouter = Router<P, E, N, T>;
type BuildTableRouter = Box<dyn futures03::Future<Output=Result<Self::TableRouter,Self::Error>> + Send + Unpin>;
type BuildTableRouter = Box<dyn Future<Output=Result<Self::TableRouter,Self::Error>> + Send + Unpin>;
fn communication_for(
&self,
......@@ -218,10 +219,10 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let work = table_router.checked_statements()
.for_each(move |msg| {
table_router_clone.import_statement(msg);
futures03::future::ready(())
futures::future::ready(())
});
let work = futures03::future::select(work, exit)
let work = futures::future::select(work, exit)
.map(|_| ());
let _ = executor.spawn(work);
......@@ -239,11 +240,11 @@ pub struct NetworkDown;
/// A future that resolves when a collation is received.
pub struct AwaitingCollation {
outer: futures03::channel::oneshot::Receiver<::futures03::channel::oneshot::Receiver<Collation>>,
inner: Option<futures03::channel::oneshot::Receiver<Collation>>
outer: oneshot::Receiver<oneshot::Receiver<Collation>>,
inner: Option<oneshot::Receiver<Collation>>
}
impl futures03::Future for AwaitingCollation {
impl Future for AwaitingCollation {
type Output = Result<Collation, NetworkDown>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
......@@ -272,7 +273,7 @@ impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
type Collation = AwaitingCollation;
fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
let (tx, rx) = futures03::channel::oneshot::channel();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let collation = spec.await_collation(relay_parent, parachain);
let _ = tx.send(collation);
......@@ -346,10 +347,10 @@ impl Knowledge {
/// receiver for incoming data.
#[derive(Clone)]
pub struct IncomingReceiver {
inner: futures03::future::Shared<Receiver<Incoming>>
inner: future::Shared<Receiver<Incoming>>
}
impl futures03::Future for IncomingReceiver {
impl Future for IncomingReceiver {
type Output = Result<Incoming, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
......@@ -565,7 +566,7 @@ pub struct PoVReceiver {
inner: Option<Receiver<PoVBlock>>
}
impl futures03::Future for PoVReceiver {
impl Future for PoVReceiver {
type Output = Result<PoVBlock, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
......@@ -650,7 +651,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + 'static,
E: Future<Output=()> + Clone + Send + 'static,
{
/// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
......@@ -672,7 +673,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
);
let candidate = candidate.clone();
let (tx, rx) = futures03::channel::oneshot::channel();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, ctx| {
if let Ok(Some(canon_roots)) = canon_roots {
let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment