Unverified Commit 220b87a6 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Use SpawnNamed instead of Spawn in Overseer (#1430)

* Use SpawnNamed instead of Spawn in Overseer

* reexport SpawnNamed and fix doc tests

* Fix deps
parent 7071a022
Pipeline #100895 passed with stages
in 22 minutes and 49 seconds
......@@ -4457,6 +4457,7 @@ dependencies = [
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-core",
"sp-runtime",
"streamunordered",
]
......@@ -4538,6 +4539,7 @@ dependencies = [
"parity-scale-codec",
"polkadot-primitives",
"polkadot-statement-table",
"sp-core",
"sp-runtime",
]
......@@ -4571,9 +4573,11 @@ dependencies = [
"futures-timer 3.0.2",
"kv-log-macro",
"log 0.4.8",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-primitives",
"sc-client-api",
"sp-core",
"streamunordered",
]
......@@ -4611,6 +4615,7 @@ dependencies = [
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sc-network",
"sp-core",
"sp-runtime",
"streamunordered",
]
......@@ -4963,6 +4968,7 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-primitives",
"polkadot-subsystem-test-helpers",
"sp-core",
"sp-keyring",
"sp-runtime",
"sp-staking",
......@@ -4986,6 +4992,7 @@ dependencies = [
"futures 0.3.5",
"parking_lot 0.10.2",
"polkadot-node-subsystem",
"sp-core",
]
[[package]]
......
......@@ -10,7 +10,6 @@ sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
......@@ -20,6 +19,7 @@ derive_more = "0.99.9"
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.5", features = ["thread-pool"] }
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
......
......@@ -24,7 +24,6 @@ use std::sync::Arc;
use bitvec::vec::BitVec;
use futures::{
channel::{mpsc, oneshot},
task::{Spawn, SpawnError},
Future, FutureExt, SinkExt, StreamExt,
};
......@@ -37,7 +36,7 @@ use polkadot_primitives::v1::{
};
use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
ValidationOutputs, ValidationResult,
ValidationOutputs, ValidationResult, SpawnNamed,
};
use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem,
......@@ -77,8 +76,6 @@ enum Error {
#[from]
Mpsc(mpsc::SendError),
#[from]
Spawn(SpawnError),
#[from]
UtilError(util::Error),
}
......@@ -735,7 +732,7 @@ pub struct CandidateBackingSubsystem<Spawner, Context> {
impl<Spawner, Context> CandidateBackingSubsystem<Spawner, Context>
where
Spawner: Clone + Spawn + Send + Unpin,
Spawner: Clone + SpawnNamed + Send + Unpin,
Context: SubsystemContext,
ToJob: From<<Context as SubsystemContext>::Message>,
{
......@@ -754,7 +751,7 @@ where
impl<Spawner, Context> Subsystem<Context> for CandidateBackingSubsystem<Spawner, Context>
where
Spawner: Spawn + Send + Clone + Unpin + 'static,
Spawner: SpawnNamed + Send + Clone + Unpin + 'static,
Context: SubsystemContext,
<Context as SubsystemContext>::Message: Into<ToJob>,
{
......@@ -769,10 +766,7 @@ where
mod tests {
use super::*;
use assert_matches::assert_matches;
use futures::{
executor::{self, ThreadPool},
future, Future,
};
use futures::{executor, future, Future};
use polkadot_primitives::v1::{
AssignmentKind, BlockData, CandidateCommitments, CollatorId, CoreAssignment, CoreIndex,
LocalValidationData, GlobalValidationSchedule, GroupIndex, HeadData,
......@@ -905,7 +899,7 @@ mod tests {
}
fn test_harness<T: Future<Output=()>>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) {
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool.clone());
......
......@@ -20,3 +20,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -184,7 +184,10 @@ impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
SpawnedSubsystem(run_network(self.0, ctx).map(|_| ()).boxed())
SpawnedSubsystem {
name: "network-bridge-subsystem",
future: run_network(self.0, ctx).map(|_| ()).boxed(),
}
}
}
......@@ -521,7 +524,7 @@ async fn run_network<N: Network>(
mod tests {
use super::*;
use futures::channel::mpsc;
use futures::executor::{self, ThreadPool};
use futures::executor;
use std::sync::Arc;
use parking_lot::Mutex;
......@@ -632,8 +635,7 @@ mod tests {
}
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (network, network_handle) = new_test_network();
let (context, virtual_overseer) = subsystem_test::make_subsystem_context(pool);
......
......@@ -20,3 +20,4 @@ polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsys
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -69,7 +69,10 @@ impl<C> Subsystem<C> for PoVDistribution
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem(run(ctx).map(|_| ()).boxed())
SpawnedSubsystem {
name: "pov-distribution-subsystem",
future: run(ctx).map(|_| ()).boxed(),
}
}
}
......@@ -548,7 +551,7 @@ async fn run(
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::{self, ThreadPool};
use futures::executor;
use polkadot_primitives::v1::BlockData;
use assert_matches::assert_matches;
......@@ -616,7 +619,7 @@ mod tests {
our_view: View(vec![hash_a, hash_b]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
......@@ -696,7 +699,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
......@@ -774,7 +777,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -846,7 +849,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -934,7 +937,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -997,7 +1000,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1058,7 +1061,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1116,7 +1119,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1201,7 +1204,7 @@ mod tests {
our_view: View(vec![hash_a, hash_b]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1263,7 +1266,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1340,7 +1343,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1424,7 +1427,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......
......@@ -24,3 +24,4 @@ parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -70,7 +70,10 @@ impl<C> Subsystem<C> for StatementDistribution
fn start(self, ctx: C) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run`.
SpawnedSubsystem(run(ctx).map(|_| ()).boxed())
SpawnedSubsystem {
name: "statement-distribution-subsystem",
future: run(ctx).map(|_| ()).boxed(),
}
}
}
......@@ -892,7 +895,7 @@ mod tests {
use node_primitives::Statement;
use polkadot_primitives::v1::CommittedCandidateReceipt;
use assert_matches::assert_matches;
use futures::executor::{self, ThreadPool};
use futures::executor;
#[test]
fn active_head_accepts_only_2_seconded_per_validator() {
......@@ -1209,7 +1212,7 @@ mod tests {
},
};
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
let peer = PeerId::random();
......@@ -1301,7 +1304,7 @@ mod tests {
(peer_c.clone(), peer_data_from_view(peer_c_view)),
].into_iter().collect();
let pool = ThreadPool::new().unwrap();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let (mut ctx, mut handle) = subsystem_test::make_subsystem_context(pool);
executor::block_on(async move {
......
......@@ -12,9 +12,11 @@ streamunordered = "0.5.1"
polkadot-primitives = { path = "../../primitives" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
async-trait = "0.1"
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.5", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.0.1"
......
......@@ -21,7 +21,7 @@
use std::time::Duration;
use futures::{
channel::oneshot,
pending, pin_mut, executor, select, stream,
pending, pin_mut, select, stream,
FutureExt, StreamExt,
};
use futures_timer::Delay;
......@@ -77,9 +77,14 @@ impl<C> Subsystem<C> for Subsystem1
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
let future = Box::pin(async move {
Self::run(ctx).await;
}))
});
SpawnedSubsystem {
name: "subsystem-1",
future,
}
}
}
......@@ -87,12 +92,15 @@ struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateValidationMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
Delay::new(Duration::from_secs(1)).await;
}
})).await.unwrap();
ctx.spawn(
"subsystem-2-job",
Box::pin(async {
loop {
log::info!("Job tick");
Delay::new(Duration::from_secs(1)).await;
}
}),
).await.unwrap();
loop {
match ctx.try_recv().await {
......@@ -114,16 +122,20 @@ impl<C> Subsystem<C> for Subsystem2
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
let future = Box::pin(async move {
Self::run(ctx).await;
}))
});
SpawnedSubsystem {
name: "subsystem-2",
future,
}
}
}
fn main() {
femme::with_level(femme::LevelFilter::Trace);
let spawner = executor::ThreadPool::new().unwrap();
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
futures::executor::block_on(async {
let timer_stream = stream::repeat(()).then(|_| async {
Delay::new(Duration::from_secs(1)).await;
......
......@@ -64,9 +64,8 @@ use std::collections::HashSet;
use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
future::BoxFuture,
stream::{self, FuturesUnordered},
task::{Spawn, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
......@@ -86,6 +85,7 @@ pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem,
};
use polkadot_node_primitives::SpawnNamed;
// A capacity of bounded channels inside the overseer.
......@@ -109,8 +109,8 @@ enum ToOverseer {
/// spawn on the overseer and a `oneshot::Sender` to signal the result
/// of the spawn.
SpawnJob {
name: &'static str,
s: BoxFuture<'static, ()>,
res: oneshot::Sender<SubsystemResult<()>>,
},
}
......@@ -279,14 +279,15 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
self.rx.next().await.ok_or(SubsystemError)
}
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
let (tx, rx) = oneshot::channel();
async fn spawn(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
-> SubsystemResult<()>
{
self.tx.send(ToOverseer::SpawnJob {
name,
s,
res: tx,
}).await?;
rx.await?
Ok(())
}
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
......@@ -322,7 +323,7 @@ struct OverseenSubsystem<M> {
}
/// The `Overseer` itself.
pub struct Overseer<S: Spawn> {
pub struct Overseer<S: SpawnNamed> {
/// A candidate validation subsystem.
candidate_validation_subsystem: OverseenSubsystem<CandidateValidationMessage>,
......@@ -361,7 +362,7 @@ pub struct Overseer<S: Spawn> {
s: S,
/// Here we keep handles to spawned subsystems to be notified when they terminate.
running_subsystems: FuturesUnordered<RemoteHandle<()>>,
running_subsystems: FuturesUnordered<BoxFuture<'static, ()>>,
/// Gather running subsystms' outbound streams into one.
running_subsystems_rx: StreamUnordered<mpsc::Receiver<ToOverseer>>,
......@@ -416,7 +417,7 @@ pub struct AllSubsystems<CV, CB, CS, SD, AD, BD, P, PoVD, RA, AS, NB> {
impl<S> Overseer<S>
where
S: Spawn,
S: SpawnNamed,
{
/// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s.
///
......@@ -467,16 +468,19 @@ where
/// self,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }))
/// SpawnedSubsystem {
/// name: "validation-subsystem",
/// future: Box::pin(async move {
/// loop {
/// Delay::new(Duration::from_secs(1)).await;
/// }
/// }),
/// }
/// }
/// }
///
/// # fn main() { executor::block_on(async move {
/// let spawner = executor::ThreadPool::new().unwrap();
/// let spawner = sp_core::testing::SpawnBlockingExecutor::new();
/// let all_subsystems = AllSubsystems {
/// candidate_validation: ValidationSubsystem,
/// candidate_backing: DummySubsystem,
......@@ -737,10 +741,8 @@ where
) {
match msg {
ToOverseer::SubsystemMessage(msg) => self.route_message(msg).await,
ToOverseer::SpawnJob { s, res } => {
let s = self.spawn_job(s);
let _ = res.send(s);
ToOverseer::SpawnJob { name, s } => {
self.spawn_job(name, s);
}
}
}
......@@ -897,26 +899,33 @@ where
}
}
fn spawn_job(&mut self, j: BoxFuture<'static, ()>) -> SubsystemResult<()> {
self.s.spawn(j).map_err(|_| SubsystemError)
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.s.spawn(name, j);
}
}
fn spawn<S: Spawn, M: Send + 'static>(
fn spawn<S: SpawnNamed, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
futures: &mut FuturesUnordered<BoxFuture<'static, ()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
s: impl Subsystem<OverseerSubsystemContext<M>>,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
let ctx = OverseerSubsystemContext { rx: to_rx, tx: from_tx };
let f = s.start(ctx);
let SpawnedSubsystem { future, name } = s.start(ctx);
let handle = spawner.spawn_with_handle(f.0)?;
let (tx, rx) = oneshot::channel();
let fut = Box::pin(async move {
future.await;
let _ = tx.send(());
});
spawner.spawn(name, fut);
streams.push(from_rx);
futures.push(handle);
futures.push(Box::pin(rx.map(|_| ())));
let instance = Some(SubsystemInstance {
tx: to_tx,
......@@ -944,21 +953,24 @@ mod tests {
{
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0;
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
loop {
match ctx.recv().await {
Ok(FromOverseer::Communication { .. }) => {
let _ = sender.send(i).await;
i += 1;
continue;
SpawnedSubsystem {
name: "test-subsystem-1",
future: Box::pin(async move {
let mut i = 0;
loop {
match ctx.recv().await {
Ok(FromOverseer::Communication { .. }) => {
let _ = sender.send(i).await;
i += 1;
continue;
}
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Err(_) => return,
_ => (),
}
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return,
Err(_) => return,
_ => (),
}
}