Unverified Commit 162486cd authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Overseer (#1152)



* Initial commit

* Licenses, spaces, docs

* Add a spawner

* Watch spawned subsystems with a FuturesUnordered

* Move the types around a bit

* Suggested fixes by Max

* Add a handler to talk to the Overseer

* FromOverseer and ToOverseer msgs and stopping

* Docs and return errors

* Dont broadcast, have add a from field to messages

* Allow communication between subsystems and outside world

* A message with a oneshot to send result example

* Remove leftover can_recv_msg

* Remove from field from messages

* Dont be generic over stuff

* Gather messages with StreamUnordered

* Fix comments and formatting

* More docs fixes and an example

* Apply suggestions from code review

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* Fixes from review

Move function from impl block.
Do not panic but resolve with errors if spawner fails or subsystem
resolves.

* Dropping a handler results in a flaky test

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent 0ceb54e5
Pipeline #94978 passed with stages
in 20 minutes and 56 seconds
......@@ -1144,6 +1144,21 @@ dependencies = [
"libc",
]
[[package]]
name = "femme"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b6b21baebbed15551f2170010ca4101b9ed3fdc05822791c8bd4631840eab81"
dependencies = [
"cfg-if",
"js-sys",
"log 0.4.8",
"serde",
"serde_derive",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "file-per-thread-logger"
version = "0.1.3"
......@@ -3284,6 +3299,18 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
[[package]]
name = "overseer"
version = "0.1.0"
dependencies = [
"femme",
"futures 0.3.5",
"futures-timer 3.0.2",
"kv-log-macro",
"log 0.4.8",
"streamunordered",
]
[[package]]
name = "owning_ref"
version = "0.4.1"
......@@ -7183,6 +7210,18 @@ dependencies = [
"generic-array",
]
[[package]]
name = "streamunordered"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9394ee1338fee8370bee649f8a7170b3a56917903a0956467ad192dcf8699ca"
dependencies = [
"futures-core",
"futures-sink",
"futures-util",
"slab",
]
[[package]]
name = "string"
version = "0.2.1"
......@@ -8347,6 +8386,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c7d40d09cdbf0f4895ae58cf57d92e1e57a9dd8ed2e8390514b54a47cc5551"
dependencies = [
"cfg-if",
"serde",
"serde_json",
"wasm-bindgen-macro",
]
......
......@@ -27,6 +27,7 @@ members = [
"erasure-coding",
"network",
"network/test",
"overseer",
"primitives",
"runtime/common",
"runtime/polkadot",
......
[package]
name = "overseer"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
futures-timer = "3.0.2"
femme = "2.0.1"
log = "0.4.8"
kv-log-macro = "1.0.6"
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Shows a basic usage of the `Overseer`:
//! * Spawning subsystems and subsystem child jobs
//! * Establishing message passing
use std::time::Duration;
use futures::{
pending, pin_mut, executor, select, stream,
FutureExt, StreamExt,
};
use futures_timer::Delay;
use kv_log_macro as log;
use overseer::{
AllMessages, CandidateBackingSubsystemMessage, FromOverseer,
Overseer, Subsystem, SubsystemContext, SpawnedSubsystem, ValidationSubsystemMessage,
};
struct Subsystem1;
impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingSubsystemMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
if let FromOverseer::Communication { msg } = msg {
log::info!("msg {:?}", msg);
}
continue;
}
Ok(None) => (),
Err(_) => {
log::info!("exiting");
return;
}
}
Delay::new(Duration::from_secs(1)).await;
ctx.send_msg(AllMessages::Validation(
ValidationSubsystemMessage::ValidityAttestation
)).await.unwrap();
}
}
}
impl Subsystem<CandidateBackingSubsystemMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingSubsystemMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
}
}
struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<ValidationSubsystemMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
Delay::new(Duration::from_secs(1)).await;
}
})).await.unwrap();
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
log::info!("Subsystem2 received message {:?}", msg);
continue;
}
Ok(None) => { pending!(); }
Err(_) => {
log::info!("exiting");
return;
},
}
}
}
}
impl Subsystem<ValidationSubsystemMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<ValidationSubsystemMessage>) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
}
}
fn main() {
femme::with_level(femme::LevelFilter::Trace);
let spawner = executor::ThreadPool::new().unwrap();
futures::executor::block_on(async {
let timer_stream = stream::repeat(()).then(|_| async {
Delay::new(Duration::from_secs(1)).await;
});
let (overseer, _handler) = Overseer::new(
Box::new(Subsystem2),
Box::new(Subsystem1),
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;
pin_mut!(timer_stream);
pin_mut!(overseer_fut);
loop {
select! {
_ = overseer_fut => break,
_ = timer_stream.next() => {
log::info!("tick");
}
complete => break,
}
}
});
}
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment