Unverified Commit e80340c0 authored by Bernhard Schuster's avatar Bernhard Schuster Committed by GitHub
Browse files

remove connected disconnected state, 3rd attempt (#3898)



* overseer: remove mut in connector

* rename SelectRelayChainWFallback -> SelectRelayChain

* split Basics

* introduce the OverseerConnector, use it

* introduce is_relay_chain to RelayChainSelection

* chore: rename var

* avoid dummy import in subsystem

* actually remove Disconnecte/Connected enum

* extract DummySubsystem into mod dummy.

* Handle::Connected -> Handle::new

* chore: fmt

* fix test

* select relay chain takes no arg, simplification

* fmt

* Update node/service/src/lib.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* chore: improve malus tests

* avoid the deferred setting of `is_relay_chain` in `RelayChainSelection`

* positive assertion is not mandated, only the negative one, to avoid a stall

* chore: fmt

* assure the `RelayChainSelection` is not used before the overseer is up and running

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent fdf3dfde
Pipeline #159555 failed with stages
in 10 minutes and 51 seconds
...@@ -6884,6 +6884,7 @@ dependencies = [ ...@@ -6884,6 +6884,7 @@ dependencies = [
"kvdb", "kvdb",
"kvdb-rocksdb", "kvdb-rocksdb",
"log", "log",
"lru",
"pallet-babe", "pallet-babe",
"pallet-im-online", "pallet-im-online",
"pallet-mmr-primitives", "pallet-mmr-primitives",
......
...@@ -185,6 +185,7 @@ struct BehaveMaleficient; ...@@ -185,6 +185,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient { impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>( fn generate<'a, Spawner, RuntimeClient>(
&self, &self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>, args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error> ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
where where
...@@ -213,7 +214,7 @@ impl OverseerGen for BehaveMaleficient { ...@@ -213,7 +214,7 @@ impl OverseerGen for BehaveMaleficient {
), ),
); );
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into()) .map_err(|e| e.into())
// A builder pattern will simplify this further // A builder pattern will simplify this further
......
...@@ -20,8 +20,8 @@ use polkadot_node_subsystem_test_helpers::*; ...@@ -20,8 +20,8 @@ use polkadot_node_subsystem_test_helpers::*;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::{AllMessages, AvailabilityStoreMessage}, messages::{AllMessages, AvailabilityStoreMessage},
overseer::{gen::TimeoutExt, Subsystem}, overseer::{dummy::DummySubsystem, gen::TimeoutExt, Subsystem},
DummySubsystem, SubsystemError,
}; };
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
...@@ -48,34 +48,38 @@ where ...@@ -48,34 +48,38 @@ where
} }
} }
#[derive(Clone, Debug)]
struct PassInterceptor;
impl<Sender> MessageInterceptor<Sender> for PassInterceptor
where
Sender: overseer::SubsystemSender<AllMessages>
+ overseer::SubsystemSender<AvailabilityStoreMessage>
+ Clone
+ 'static,
{
type Message = AvailabilityStoreMessage;
}
async fn overseer_send<T: Into<AllMessages>>(overseer: &mut TestSubsystemContextHandle<T>, msg: T) { async fn overseer_send<T: Into<AllMessages>>(overseer: &mut TestSubsystemContextHandle<T>, msg: T) {
overseer.send(FromOverseer::Communication { msg }).await; overseer.send(FromOverseer::Communication { msg }).await;
} }
#[test] fn launch_harness<F, M, Sub, G>(test_gen: G)
fn integrity_test() { where
F: Future<Output = TestSubsystemContextHandle<M>> + Send,
M: Into<AllMessages> + std::fmt::Debug + Send + 'static,
AllMessages: From<M>,
Sub: Subsystem<TestSubsystemContext<M, sp_core::testing::TaskExecutor>, SubsystemError>,
G: Fn(TestSubsystemContextHandle<M>) -> (F, Sub),
{
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
let (context, mut overseer) = make_subsystem_context(pool); let (context, overseer) = make_subsystem_context(pool);
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
// Try to send a message we know is going to be filtered. let (test_fut, subsystem) = test_gen(overseer);
let test_fut = async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
};
let subsystem = async move { let subsystem = async move {
sub_intercepted.start(context).future.await.unwrap(); subsystem.start(context).future.await.unwrap();
}; };
futures::pin_mut!(test_fut); futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem); futures::pin_mut!(subsystem);
...@@ -88,3 +92,49 @@ fn integrity_test() { ...@@ -88,3 +92,49 @@ fn integrity_test() {
)) ))
.1; .1;
} }
#[test]
fn integrity_test_intercept() {
launch_harness(|mut overseer| {
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, BlackHoleInterceptor);
(
async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
},
sub_intercepted,
)
})
}
#[test]
fn integrity_test_pass() {
launch_harness(|mut overseer| {
let sub = DummySubsystem;
let sub_intercepted = InterceptedSubsystem::new(sub, PassInterceptor);
(
async move {
let (tx, rx) = futures::channel::oneshot::channel();
overseer_send(
&mut overseer,
AvailabilityStoreMessage::QueryChunk(Default::default(), 0.into(), tx),
)
.await;
let _ = rx.timeout(std::time::Duration::from_millis(100)).await.unwrap();
overseer
},
sub_intercepted,
)
})
}
...@@ -37,7 +37,7 @@ use polkadot_cli::{ ...@@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem; use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{ use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage}, messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerHandle}, overseer::{self, OverseerConnector, OverseerHandle},
FromOverseer, FromOverseer,
}; };
...@@ -86,6 +86,7 @@ struct BehaveMaleficient; ...@@ -86,6 +86,7 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient { impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>( fn generate<'a, Spawner, RuntimeClient>(
&self, &self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>, args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error> ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where where
...@@ -113,7 +114,7 @@ impl OverseerGen for BehaveMaleficient { ...@@ -113,7 +114,7 @@ impl OverseerGen for BehaveMaleficient {
}, },
); );
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner) Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
} }
......
...@@ -29,7 +29,8 @@ use polkadot_node_subsystem_types::messages::{ ...@@ -29,7 +29,8 @@ use polkadot_node_subsystem_types::messages::{
use polkadot_overseer::{ use polkadot_overseer::{
self as overseer, self as overseer,
gen::{FromOverseer, SpawnedSubsystem}, gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError, AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
}; };
use polkadot_primitives::v1::Hash; use polkadot_primitives::v1::Hash;
...@@ -173,8 +174,15 @@ fn main() { ...@@ -173,8 +174,15 @@ fn main() {
.replace_candidate_validation(|_| Subsystem2) .replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig); .replace_candidate_backing(|orig| orig);
let (overseer, _handle) = let (overseer, _handle) = Overseer::new(
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap(); vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let overseer_fut = overseer.run().fuse(); let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream; let timer_stream = timer_stream;
......
...@@ -130,9 +130,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream { ...@@ -130,9 +130,13 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
&mut self.handle &mut self.handle
} }
/// Obtain access to the overseer handle. /// Obtain access to the overseer handle.
pub fn as_handle(&mut self) -> &#handle { pub fn as_handle(&self) -> &#handle {
&self.handle &self.handle
} }
/// Obtain a clone of the handle.
pub fn handle(&self) -> #handle {
self.handle.clone()
}
} }
impl ::std::default::Default for #connector { impl ::std::default::Default for #connector {
......
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_gen::{FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext};
/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;
impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});
SpawnedSubsystem { name: "dummy-subsystem", future }
}
}
...@@ -70,7 +70,6 @@ use std::{ ...@@ -70,7 +70,6 @@ use std::{
use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt}; use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use lru::LruCache; use lru::LruCache;
use parking_lot::RwLock;
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost}; use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
...@@ -91,15 +90,18 @@ pub use polkadot_node_subsystem_types::{ ...@@ -91,15 +90,18 @@ pub use polkadot_node_subsystem_types::{
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal, jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
}; };
/// Test helper supplements.
pub mod dummy;
pub use self::dummy::DummySubsystem;
// TODO legacy, to be deleted, left for easier integration // TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427 // TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems; mod subsystems;
pub use self::subsystems::{AllSubsystems, DummySubsystem}; pub use self::subsystems::AllSubsystems;
mod metrics; pub mod metrics;
use self::metrics::Metrics;
use polkadot_node_metrics::{ pub use polkadot_node_metrics::{
metrics::{prometheus, Metrics as MetricsTrait}, metrics::{prometheus, Metrics as MetricsTrait},
Metronome, Metronome,
}; };
...@@ -115,7 +117,7 @@ pub use polkadot_overseer_gen::{ ...@@ -115,7 +117,7 @@ pub use polkadot_overseer_gen::{
/// Store 2 days worth of blocks, not accounting for forks, /// Store 2 days worth of blocks, not accounting for forks,
/// in the LRU cache. Assumes a 6-second block time. /// in the LRU cache. Assumes a 6-second block time.
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6; pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
...@@ -141,18 +143,12 @@ where ...@@ -141,18 +143,12 @@ where
/// ///
/// [`Overseer`]: struct.Overseer.html /// [`Overseer`]: struct.Overseer.html
#[derive(Clone)] #[derive(Clone)]
pub enum Handle { pub struct Handle(OverseerHandle);
/// Used only at initialization to break the cyclic dependency.
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
/// A handle to the overseer.
Connected(OverseerHandle),
}
impl Handle { impl Handle {
/// Create a new disconnected [`Handle`]. /// Create a new [`Handle`].
pub fn new_disconnected() -> Self { pub fn new(raw: OverseerHandle) -> Self {
Self::Disconnected(Arc::new(RwLock::new(None))) Self(raw)
} }
/// Inform the `Overseer` that that some block was imported. /// Inform the `Overseer` that that some block was imported.
...@@ -201,58 +197,8 @@ impl Handle { ...@@ -201,58 +197,8 @@ impl Handle {
/// Most basic operation, to stop a server. /// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) { async fn send_and_log_error(&mut self, event: Event) {
self.try_connect(); if self.0.send(event).await.is_err() {
if let Self::Connected(ref mut handle) = self { tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}
/// Whether the handle is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Self::Disconnected(ref x) => x.read().is_none(),
_ => false,
}
}
/// Connect this handle and all disconnected clones of it to the overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
match self {
Self::Disconnected(ref mut x) => {
let mut maybe_handle = x.write();
if maybe_handle.is_none() {
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
*maybe_handle = Some(handle);
} else {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect a clone of a connected Handle",
);
}
},
_ => {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect an already connected Handle",
);
},
}
}
/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
fn try_connect(&mut self) {
if let Self::Disconnected(ref mut x) = self {
let guard = x.write();
if let Some(ref h) = *guard {
let handle = h.clone();
drop(guard);
*self = Self::Connected(handle);
}
} }
} }
} }
...@@ -439,7 +385,7 @@ pub struct Overseer<SupportsParachains> { ...@@ -439,7 +385,7 @@ pub struct Overseer<SupportsParachains> {
pub known_leaves: LruCache<Hash, ()>, pub known_leaves: LruCache<Hash, ()>,
/// Various Prometheus metrics. /// Various Prometheus metrics.
pub metrics: Metrics, pub metrics: crate::metrics::Metrics,
} }
impl<S, SupportsParachains> Overseer<S, SupportsParachains> impl<S, SupportsParachains> Overseer<S, SupportsParachains>
...@@ -490,12 +436,13 @@ where ...@@ -490,12 +436,13 @@ where
/// # use polkadot_primitives::v1::Hash; /// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{ /// # use polkadot_overseer::{
/// # self as overseer, /// # self as overseer,
/// # Overseer,
/// # OverseerSignal, /// # OverseerSignal,
/// # OverseerConnector,
/// # SubsystemSender as _, /// # SubsystemSender as _,
/// # AllMessages, /// # AllMessages,
/// # AllSubsystems, /// # AllSubsystems,
/// # HeadSupportsParachains, /// # HeadSupportsParachains,
/// # Overseer,
/// # SubsystemError, /// # SubsystemError,
/// # gen::{ /// # gen::{
/// # SubsystemContext, /// # SubsystemContext,
...@@ -549,6 +496,7 @@ where ...@@ -549,6 +496,7 @@ where
/// None, /// None,
/// AlwaysSupportsParachains, /// AlwaysSupportsParachains,
/// spawner, /// spawner,
/// OverseerConnector::default(),
/// ).unwrap(); /// ).unwrap();
/// ///
/// let timer = Delay::new(Duration::from_millis(50)).fuse(); /// let timer = Delay::new(Duration::from_millis(50)).fuse();
...@@ -615,6 +563,7 @@ where ...@@ -615,6 +563,7 @@ where
prometheus_registry: Option<&prometheus::Registry>, prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains, supports_parachains: SupportsParachains,
s: S, s: S,
connector: OverseerConnector,
) -> SubsystemResult<(Self, OverseerHandle)> ) -> SubsystemResult<(Self, OverseerHandle)>
where where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send, CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
...@@ -643,7 +592,7 @@ where ...@@ -643,7 +592,7 @@ where
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send, CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
S: SpawnNamed, S: SpawnNamed,
{ {
let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?; let metrics = <crate::metrics::Metrics as MetricsTrait>::register(prometheus_registry)?;
let (mut overseer, handle) = Self::builder() let (mut overseer, handle) = Self::builder()
.candidate_validation(all_subsystems.candidate_validation) .candidate_validation(all_subsystems.candidate_validation)
...@@ -679,7 +628,7 @@ where ...@@ -679,7 +628,7 @@ where
.supports_parachains(supports_parachains) .supports_parachains(supports_parachains)
.metrics(metrics.clone()) .metrics(metrics.clone())
.spawner(s) .spawner(s)
.build()?; .build_with_connector(connector)?;
// spawn the metrics metronome task // spawn the metrics metronome task
{ {
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
//! Prometheus metrics related to the overseer and its channels. //! Prometheus metrics related to the overseer and its channels.
use super::*; use super::*;
use polkadot_node_metrics::metrics::{self, prometheus}; pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};
use parity_util_mem::MemoryAllocationSnapshot; use parity_util_mem::MemoryAllocationSnapshot;
...@@ -110,7 +110,7 @@ impl Metrics { ...@@ -110,7 +110,7 @@ impl Metrics {
} }
} }
impl metrics::Metrics for Metrics { impl MetricsTrait for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> { fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner { let metrics = MetricsInner {
activated_heads_total: prometheus::register( activated_heads_total: prometheus::register(
......
...@@ -19,47 +19,9 @@ ...@@ -19,47 +19,9 @@
//! In the future, everything should be set up using the generated //! In the future, everything should be set up using the generated
//! overseer builder pattern instead. //! overseer builder pattern instead.
use crate::{AllMessages, OverseerSignal}; use crate::dummy::DummySubsystem;
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen; use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
use polkadot_overseer_gen::{ use polkadot_overseer_gen::MapSubsystem;
FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext,