Unverified Commit da233a12 authored by Ashley's avatar Ashley
Browse files

tidy

parent 832f8054
......@@ -23,7 +23,7 @@
#![warn(missing_docs)]
use futures::prelude::*;
use futures::{channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
use futures::{channel::{mpsc, oneshot}, task::Spawn};
use keystore::KeyStorePtr;
use polkadot_primitives::{
Hash, Block,
......
......@@ -37,9 +37,8 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures01::Future;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt, future::select, task::SpawnExt};
use futures::{FutureExt, Sink, SinkExt, StreamExt, future::select, task::SpawnExt};
use keystore::KeyStorePtr;
use tokio::runtime::{Handle, Runtime as LocalRuntime};
......@@ -166,7 +165,7 @@ impl WorkerHandle {
impl Drop for WorkerHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}
if let Some(thread) = self.thread.take() {
......@@ -296,7 +295,7 @@ where
impl<PGM> Drop for Worker<PGM> {
fn drop(&mut self) {
for (_, signal) in self.registered_gossip_streams.drain() {
signal.fire();
let _ = signal.fire();
}
}
}
......@@ -416,7 +415,7 @@ where
let topic = erasure_coding_topic(relay_parent, receipt.erasure_root, chunk.index);
// need to remove gossip listener and stop it.
if let Some(signal) = self.registered_gossip_streams.remove(&topic) {
signal.fire();
let _ = signal.fire();
}
}
......@@ -493,7 +492,7 @@ where
let mut runtime = LocalRuntime::new()?;
let mut sender = worker.sender.clone();
let mut runtime_handle = runtime.handle().clone();
let runtime_handle = runtime.handle().clone();
// On startup, registers listeners (gossip streams) for all
// (relay_parent, erasure-root, i) in the awaited frontier.
......@@ -588,7 +587,7 @@ where
runtime.spawn(select(process_notification.boxed(), exit.clone()).map(drop));
exit.wait();
runtime.block_on(exit);
info!(target: LOG_TARGET, "Availability worker exiting");
......@@ -620,7 +619,7 @@ pub struct AvailabilityBlockImport<I, P> {
impl<I, P> Drop for AvailabilityBlockImport<I, P> {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}
}
}
......
......@@ -2,7 +2,7 @@
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8"/>
<title>Substrate node</title>
<title>Polkadot node</title>
<link rel="shortcut icon" href="/favicon.png" />
<script type="module">
import { start_client, default as init } from './pkg/polkadot_cli.js';
......
......@@ -34,7 +34,7 @@ use polkadot_primitives::parachain::{
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage};
use futures::prelude::*;
use futures::task::SpawnExt;
use futures::{task::SpawnExt, future::{ready, select}};
use parking_lot::Mutex;
use log::{debug, trace};
......@@ -65,8 +65,8 @@ pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
// this will block internally until the gossip messages stream is obtained.
network.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)),
_ => future::ready(None)
GossipMessage::Statement(s) => ready(Some(s.signed_statement)),
_ => ready(None)
})
}
......@@ -175,10 +175,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let work = futures::future::select(
work,
self.fetcher.exit().clone()
)
let work = select(work, self.fetcher.exit().clone())
.map(|_| ());
let _ = self.fetcher.executor().spawn(work);
}
......
......@@ -34,6 +34,7 @@ use futures::prelude::*;
use futures::task::SpawnExt;
pub use futures::task::Spawn as Executor;
use futures::channel::oneshot::{self, Receiver};
use futures::future::{ready, select};
use std::collections::hash_map::{HashMap, Entry};
use std::io;
......@@ -187,7 +188,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
{
type Error = String;
type TableRouter = Router<P, E, N, T>;
type BuildTableRouter = Box<dyn Future<Output=Result<Self::TableRouter,Self::Error>> + Send + Unpin>;
type BuildTableRouter = Box<dyn Future<Output=Result<Self::TableRouter, String>> + Send + Unpin>;
fn communication_for(
&self,
......@@ -219,11 +220,10 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let work = table_router.checked_statements()
.for_each(move |msg| {
table_router_clone.import_statement(msg);
futures::future::ready(())
ready(())
});
let work = futures::future::select(work, exit)
.map(drop);
let work = select(work, exit).map(drop);
let _ = executor.spawn(work);
......
......@@ -42,7 +42,7 @@ pub trait Collators: Clone {
/// Errors when producing collations.
type Error: std::fmt::Debug;
/// A full collation.
type Collation: Future<Output=Result<Collation,Self::Error>>;
type Collation: Future<Output=Result<Collation, Self::Error>>;
/// Collate on a specific parachain, building on a given relay chain parent hash.
///
......
......@@ -133,6 +133,6 @@ mod tests {
sender.update_candidate(hash1, true);
assert!(sender.is_complete());
futures::executor::block_on(recv).unwrap();
block_on(recv).unwrap();
}
}
......@@ -759,6 +759,12 @@ mod tests {
let hash = candidate.hash();
store.add_validator_index_and_n_validators(
&relay_parent,
local_index as u32,
n_validators as u32,
).unwrap();
let producer: ParachainWork<future::FutureResult<_, ::std::io::Error>> = ParachainWork {
work: Work {
candidate_receipt: candidate,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment