Unverified Commit fc02b1dc authored by Ashley's avatar Ashley
Browse files

Fix collator

parent 6c4ff5b3
......@@ -3532,7 +3532,6 @@ dependencies = [
name = "polkadot-collator"
version = "0.7.9"
dependencies = [
"futures 0.1.29 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-timer 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -3548,7 +3547,7 @@ dependencies = [
"sp-consensus 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-core 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"sp-keyring 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
......
......@@ -24,24 +24,23 @@ mod chain_spec;
mod browser;
use chain_spec::ChainSpec;
use futures::{Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt};
use futures::{
Future, FutureExt, TryFutureExt, future::select, channel::oneshot, compat::Future01CompatExt,
task::Spawn
};
use tokio::runtime::Runtime;
use std::sync::Arc;
use log::{info, error};
use structopt::StructOpt;
pub use service::{
AbstractService, CustomConfiguration,
ProvideRuntimeApi, CoreApi, ParachainHost,
WrappedExecutor
};
pub use cli::{VersionInfo, IntoExit, NoCustom};
pub use cli::{display_role, error};
type BoxedFuture = Box<dyn futures01::Future<Item = (), Error = ()> + Send>;
/// Abstraction over an executor that lets you spawn tasks in the background.
pub type TaskExecutor = Arc<dyn futures01::future::Executor<BoxedFuture> + Send + Sync>;
fn load_spec(id: &str) -> Result<Option<service::ChainSpec>, String> {
Ok(match ChainSpec::from(id) {
Some(spec) => Some(spec.load()?),
......@@ -64,13 +63,14 @@ pub trait Worker: IntoExit {
fn configuration(&self) -> service::CustomConfiguration { Default::default() }
/// Do work and schedule exit.
fn work<S, SC, B, CE>(self, service: &S, executor: TaskExecutor) -> Self::Work
fn work<S, SC, B, CE, SP>(self, service: &S, spawner: SP) -> Self::Work
where S: AbstractService<Block = service::Block, RuntimeApi = service::RuntimeApi,
Backend = B, SelectChain = SC,
NetworkSpecialization = service::PolkadotProtocol, CallExecutor = CE>,
SC: service::SelectChain<service::Block> + 'static,
B: service::Backend<service::Block, service::Blake2Hasher> + 'static,
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static;
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static;
}
#[derive(Debug, StructOpt, Clone)]
......@@ -183,7 +183,7 @@ fn run_until_exit<T, SC, B, CE, W>(
// but we need to keep holding a reference to the global telemetry guard
let _telemetry = service.telemetry();
let work = worker.work(&service, Arc::new(executor));
let work = worker.work(&service, WrappedExecutor(executor));
let service = service
.map_err(|err| error!("Error while running Service: {}", err))
.compat();
......
......@@ -6,8 +6,7 @@ description = "Collator node implementation"
edition = "2018"
[dependencies]
futures01 = { package = "futures", version = "0.1.17" }
futures = { version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
client = { package = "sc-client", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
client-api = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......@@ -20,7 +19,7 @@ polkadot-network = { path = "../network" }
polkadot-validation = { path = "../validation" }
polkadot-service = { path = "../service" }
log = "0.4.8"
tokio = "0.1.22"
tokio = "0.2.1"
futures-timer = "1.0"
[dev-dependencies]
......
......@@ -49,11 +49,7 @@ use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use futures::{
future, Future, Stream, FutureExt, TryFutureExt, StreamExt,
compat::{Future01CompatExt, Stream01CompatExt}
};
use futures01::{Future as _};
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::{warn, error};
use client::BlockchainEvents;
use primitives::{Pair, Blake2Hasher};
......@@ -71,7 +67,7 @@ use polkadot_network::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::{PolkadotNetworkService, PolkadotProtocol};
use polkadot_runtime::RuntimeApi;
pub use polkadot_cli::{VersionInfo, TaskExecutor};
pub use polkadot_cli::VersionInfo;
pub use polkadot_network::validation::Incoming;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
......@@ -93,26 +89,19 @@ pub trait Network: Send + Sync {
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
}
impl<P, E> Network for ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor> where
impl<P, E, SP> Network for ValidationNetwork<P, E, PolkadotNetworkService, SP> where
P: 'static + Send + Sync,
E: 'static + Send + Sync,
SP: 'static + Spawn + Clone + Send + Sync,
{
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send>
{
Box::new(
Self::collator_id_to_peer_id(self, collator_id)
.compat()
.map(|res| res.ok().and_then(|id| id))
)
Box::new(Self::collator_id_to_peer_id(self, collator_id))
}
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
Box::new(
Self::checked_statements(self, relay_parent)
.compat()
.filter_map(|item| future::ready(item.ok()))
)
Box::new(Self::checked_statements(self, relay_parent))
}
}
......@@ -147,15 +136,16 @@ pub trait BuildParachainContext {
type ParachainContext: self::ParachainContext;
/// Build the `ParachainContext`.
fn build<B, E>(
fn build<B, E, SP>(
self,
client: Arc<PolkadotClient<B, E>>,
task_executor: TaskExecutor,
spawner: SP,
network: Arc<dyn Network>,
) -> Result<Self::ParachainContext, ()>
where
B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static;
E: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone;
}
/// Parachain context needed for collation.
......@@ -239,16 +229,17 @@ pub async fn collate<R, P>(
}
/// Polkadot-api context.
struct ApiContext<P, E> {
network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, TaskExecutor>>,
struct ApiContext<P, E, SP> {
network: Arc<ValidationNetwork<P, E, PolkadotNetworkService, SP>>,
parent_hash: Hash,
validators: Vec<ValidatorId>,
}
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
impl<P: 'static, E: 'static, SP: 'staticW> RelayChainContext for ApiContext<P, E, SP> where
P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>,
E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync
{
type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
......@@ -262,7 +253,6 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
parent_hash: self.parent_hash,
authorities: self.validators.clone(),
})
.compat()
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
Box::new(future::ok(ConsolidatedIngress(Vec::new())))
......@@ -302,7 +292,7 @@ impl<P, E> Worker for CollationNode<P, E> where
config
}
fn work<S, SC, B, CE>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
fn work<S, SC, B, CE, SP>(self, service: &S, spawner: SP) -> Self::Work
where
S: AbstractService<
Block = Block,
......@@ -314,7 +304,8 @@ impl<P, E> Worker for CollationNode<P, E> where
>,
SC: polkadot_service::SelectChain<Block> + 'static,
B: client_api::backend::Backend<Block, Blake2Hasher> + 'static,
CE: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static
CE: client::CallExecutor<Block, Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
{
let CollationNode { build_parachain_context, exit, para_id, key } = self;
let client = service.client();
......@@ -356,12 +347,12 @@ impl<P, E> Worker for CollationNode<P, E> where
exit.clone(),
message_validator,
client.clone(),
task_executor.clone(),
spawner.clone(),
));
let parachain_context = match build_parachain_context.build(
client.clone(),
task_executor,
spawner,
validation_network.clone(),
) {
Ok(ctx) => ctx,
......@@ -433,13 +424,13 @@ impl<P, E> Worker for CollationNode<P, E> where
outgoing,
);
let exit = inner_exit_2.clone().unit_error().compat();
tokio::spawn(res.select(exit).then(|_| Ok(())));
let exit = inner_exit_2.clone();
tokio::spawn(future::select(res, exit).map(drop));
})
});
future::Either::Right(collation_work)
}).map(|_| Ok::<_, ()>(()));
});
let deadlined = future::select(
work,
......@@ -456,7 +447,7 @@ impl<P, E> Worker for CollationNode<P, E> where
let future = future::select(
silenced,
inner_exit.clone()
).map(|_| Ok::<_, ()>(())).compat();
).map(drop);
tokio::spawn(future);
future::ready(())
......
......@@ -16,7 +16,7 @@ codec = { package = "parity-scale-codec", version = "1.1.0", default-features =
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
futures = { version = "0.3.1", features = ["compat"] }
futures = "0.3.1"
log = "0.4.8"
exit-future = "0.2.0"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
......
......@@ -48,9 +48,9 @@ pub use chain_spec::ChainSpec;
#[cfg(not(target_os = "unknown"))]
pub use consensus::run_validation_worker;
/// A wrapped futures01::future::Executor.
/// Wrap a futures01 executor as a futures03 spawn.
#[derive(Clone)]
struct WrappedExecutor<T>(pub T);
pub struct WrappedExecutor<T>(pub T);
impl<T> Spawn for WrappedExecutor<T>
where T: futures01::future::Executor<Box<dyn futures01::Future<Item=(),Error=()> + Send + 'static>>
......
......@@ -18,9 +18,8 @@
#![warn(missing_docs)]
use cli::{AbstractService, VersionInfo, TaskExecutor};
use futures::channel::oneshot;
use futures::{future, FutureExt};
use cli::{AbstractService, VersionInfo};
use futures::{channel::oneshot, future, FutureExt, task::Spawn};
use std::cell::RefCell;
......@@ -46,13 +45,14 @@ impl cli::IntoExit for Worker {
impl cli::Worker for Worker {
type Work = <Self as cli::IntoExit>::Exit;
fn work<S, SC, B, CE>(self, _: &S, _: TaskExecutor) -> Self::Work
fn work<S, SC, B, CE, SP>(self, _: &S, _: SP) -> Self::Work
where S: AbstractService<Block = service::Block, RuntimeApi = service::RuntimeApi,
Backend = B, SelectChain = SC,
NetworkSpecialization = service::PolkadotProtocol, CallExecutor = CE>,
SC: service::SelectChain<service::Block> + 'static,
B: service::Backend<service::Block, service::Blake2Hasher> + 'static,
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static {
CE: service::CallExecutor<service::Block, service::Blake2Hasher> + Clone + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static {
use cli::IntoExit;
self.into_exit()
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment