From bd9b743872c9dd594879ccf220566c96f0f3f2a1 Mon Sep 17 00:00:00 2001 From: Andronik Ordian <write@reusable.software> Date: Mon, 26 Jul 2021 14:46:31 +0200 Subject: [PATCH] enable disputes (#3478) * initial integration and migration code * fix tests * fix counting test * assume the current version on missing file * use SelectRelayChain * remove duplicate metric * Update node/service/src/lib.rs Co-authored-by: Robert Habermeier <rphmeier@gmail.com> * remove ApprovalCheckingVotingRule * address my concern * never mode for StagnantCheckInterval * REVERTME: some logs * w00t * it's ugly but it works * Revert "REVERTME: some logs" This reverts commit e210505a2e83e31c381394924500b69277bb042e. * it's handle, not handler * fix a few typos Co-authored-by: Robert Habermeier <rphmeier@gmail.com> --- polkadot/Cargo.lock | 4 + polkadot/node/core/chain-selection/src/lib.rs | 36 ++-- polkadot/node/malus/src/variant-a.rs | 4 +- polkadot/node/overseer/Cargo.toml | 1 + .../node/overseer/examples/minimal-example.rs | 2 +- .../overseer/overseer-gen/examples/dummy.rs | 2 +- polkadot/node/overseer/src/lib.rs | 115 +++++++--- polkadot/node/overseer/src/subsystems.rs | 40 +++- polkadot/node/overseer/src/tests.rs | 132 ++++++++---- polkadot/node/service/Cargo.toml | 6 + polkadot/node/service/src/grandpa_support.rs | 198 +----------------- polkadot/node/service/src/lib.rs | 88 ++++---- polkadot/node/service/src/overseer.rs | 51 ++++- .../node/service/src/parachains_db/mod.rs | 19 +- .../node/service/src/parachains_db/upgrade.rs | 24 ++- .../node/service/src/relay_chain_selection.rs | 14 +- .../node/subsystem-test-helpers/src/lib.rs | 7 +- polkadot/node/test/service/src/lib.rs | 20 +- .../adder/collator/src/main.rs | 10 +- 19 files changed, 405 insertions(+), 368 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 25672b1fc6d..d7b10d49007 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -6631,6 +6631,7 @@ dependencies = [ "kv-log-macro", "lru", "metered-channel", + "parking_lot 0.11.1", "polkadot-node-metrics", "polkadot-node-network-protocol", "polkadot-node-primitives", @@ -6976,6 +6977,9 @@ dependencies = [ "polkadot-node-core-bitfield-signing", "polkadot-node-core-candidate-validation", "polkadot-node-core-chain-api", + "polkadot-node-core-chain-selection", + "polkadot-node-core-dispute-coordinator", + "polkadot-node-core-dispute-participation", "polkadot-node-core-parachains-inherent", "polkadot-node-core-provisioner", "polkadot-node-core-runtime-api", diff --git a/polkadot/node/core/chain-selection/src/lib.rs b/polkadot/node/core/chain-selection/src/lib.rs index 9862f60d7de..91e766f53df 100644 --- a/polkadot/node/core/chain-selection/src/lib.rs +++ b/polkadot/node/core/chain-selection/src/lib.rs @@ -28,6 +28,7 @@ use polkadot_node_subsystem::{ use kvdb::KeyValueDB; use parity_scale_codec::Error as CodecError; use futures::channel::oneshot; +use futures::future::Either; use futures::prelude::*; use std::time::{UNIX_EPOCH, Duration,SystemTime}; @@ -244,7 +245,7 @@ impl Clock for SystemClock { /// The interval, in seconds to check for stagnant blocks. #[derive(Debug, Clone)] -pub struct StagnantCheckInterval(Duration); +pub struct StagnantCheckInterval(Option<Duration>); impl Default for StagnantCheckInterval { fn default() -> Self { @@ -255,28 +256,37 @@ impl Default for StagnantCheckInterval { // between 2 validators is D + 5s. 
const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5); - StagnantCheckInterval(DEFAULT_STAGNANT_CHECK_INTERVAL) + StagnantCheckInterval(Some(DEFAULT_STAGNANT_CHECK_INTERVAL)) } } impl StagnantCheckInterval { /// Create a new stagnant-check interval wrapping the given duration. pub fn new(interval: Duration) -> Self { - StagnantCheckInterval(interval) + StagnantCheckInterval(Some(interval)) } - fn timeout_stream(&self) -> impl Stream<Item = ()> { - let interval = self.0; - let mut delay = futures_timer::Delay::new(interval); + /// Create a `StagnantCheckInterval` which never triggers. + pub fn never() -> Self { + StagnantCheckInterval(None) + } - futures::stream::poll_fn(move |cx| { - let poll = delay.poll_unpin(cx); - if poll.is_ready() { - delay.reset(interval) - } + fn timeout_stream(&self) -> impl Stream<Item = ()> { + match self.0 { + Some(interval) => Either::Left({ + let mut delay = futures_timer::Delay::new(interval); + + futures::stream::poll_fn(move |cx| { + let poll = delay.poll_unpin(cx); + if poll.is_ready() { + delay.reset(interval) + } - poll.map(Some) - }) + poll.map(Some) + }) + }), + None => Either::Right(futures::stream::pending()), + } } } diff --git a/polkadot/node/malus/src/variant-a.rs b/polkadot/node/malus/src/variant-a.rs index f53e9d36f84..fa2e1423d14 100644 --- a/polkadot/node/malus/src/variant-a.rs +++ b/polkadot/node/malus/src/variant-a.rs @@ -27,7 +27,7 @@ use polkadot_cli::{ create_default_subsystems, service::{ AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer, - OverseerGen, OverseerGenArgs, Handle, ParachainHost, ProvideRuntimeApi, + OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, ProvideRuntimeApi, SpawnNamed, }, Cli, @@ -73,7 +73,7 @@ impl OverseerGen for BehaveMaleficient { fn generate<'a, Spawner, RuntimeClient>( &self, args: OverseerGenArgs<'a, Spawner, RuntimeClient>, - ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, Handle), Error> + ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error> where RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 6f436c0d33f..40b0fadf8b3 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -10,6 +10,7 @@ client = { package = "sc-client-api", git = "https://github.com/paritytech/subst sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = "0.3.15" futures-timer = "3.0.2" +parking_lot = "0.11.1" polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem-types = { path = "../subsystem-types" } diff --git a/polkadot/node/overseer/examples/minimal-example.rs b/polkadot/node/overseer/examples/minimal-example.rs index 71a703f9933..604ef3358d6 100644 --- a/polkadot/node/overseer/examples/minimal-example.rs +++ b/polkadot/node/overseer/examples/minimal-example.rs @@ -174,7 +174,7 @@ fn main() { .replace_candidate_backing(Subsystem1) ; - let (overseer, _handler) = Overseer::new( + let (overseer, _handle) = Overseer::new( vec![], all_subsystems, None, diff --git a/polkadot/node/overseer/overseer-gen/examples/dummy.rs b/polkadot/node/overseer/overseer-gen/examples/dummy.rs index 401e70e89f2..732851ca386 100644 --- a/polkadot/node/overseer/overseer-gen/examples/dummy.rs +++ 
b/polkadot/node/overseer/overseer-gen/examples/dummy.rs @@ -123,7 +123,7 @@ impl SpawnNamed for DummySpawner{ struct DummyCtx; fn main() { - let (overseer, _handler): (Xxx<_>, _) = Xxx::builder() + let (overseer, _handle): (Xxx<_>, _) = Xxx::builder() .sub0(AwesomeSubSys::default()) .plinkos(GoblinTower::default()) .i_like_pi(::std::f64::consts::PI) diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 1185d768c1a..4a6bfc23f51 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -73,6 +73,7 @@ use futures::{ Future, FutureExt, StreamExt, }; use lru::LruCache; +use parking_lot::RwLock; use polkadot_primitives::v1::{Block, BlockId,BlockNumber, Hash, ParachainHost}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; @@ -159,13 +160,24 @@ impl<Client> HeadSupportsParachains for Arc<Client> where } -/// A handler used to communicate with the [`Overseer`]. +/// A handle used to communicate with the [`Overseer`]. /// /// [`Overseer`]: struct.Overseer.html #[derive(Clone)] -pub struct Handle(pub OverseerHandle); +pub enum Handle { + /// Used only at initialization to break the cyclic dependency. + // TODO: refactor in https://github.com/paritytech/polkadot/issues/3427 + Disconnected(Arc<RwLock<Option<OverseerHandle>>>), + /// A handle to the overseer. + Connected(OverseerHandle), +} impl Handle { + /// Create a new disconnected [`Handle`]. + pub fn new_disconnected() -> Self { + Self::Disconnected(Arc::new(RwLock::new(None))) + } + /// Inform the `Overseer` that that some block was imported. pub async fn block_imported(&mut self, block: BlockInfo) { self.send_and_log_error(Event::BlockImported(block)).await @@ -207,25 +219,59 @@ impl Handle { /// Most basic operation, to stop a server. async fn send_and_log_error(&mut self, event: Event) { - if self.0.send(event).await.is_err() { - tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); + self.try_connect(); + if let Self::Connected(ref mut handle) = self { + if handle.send(event).await.is_err() { + tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer"); + } + } else { + tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer"); } } - /// Whether the overseer handler is connected to an overseer. - pub fn is_connected(&self) -> bool { - true + /// Whether the handle is disconnected. + pub fn is_disconnected(&self) -> bool { + match self { + Self::Disconnected(ref x) => x.read().is_none(), + _ => false, + } } - /// Whether the handler is disconnected. - pub fn is_disconnected(&self) -> bool { - false + /// Connect this handle and all disconnected clones of it to the overseer. + pub fn connect_to_overseer(&mut self, handle: OverseerHandle) { + match self { + Self::Disconnected(ref mut x) => { + let mut maybe_handle = x.write(); + if maybe_handle.is_none() { + tracing::info!(target: LOG_TARGET, "ðŸ–‡ï¸ Connecting all Handles to Overseer"); + *maybe_handle = Some(handle); + } else { + tracing::warn!( + target: LOG_TARGET, + "Attempting to connect a clone of a connected Handle", + ); + } + }, + _ => { + tracing::warn!( + target: LOG_TARGET, + "Attempting to connect an already connected Handle", + ); + }, + } } - /// Using this handler, connect another handler to the same - /// overseer, if any. 
- pub fn connect_other(&self, other: &mut Handle) { - *other = self.clone(); + /// Try upgrading from `Self::Disconnected` to `Self::Connected` state + /// after calling `connect_to_overseer` on `self` or a clone of `self`. + fn try_connect(&mut self) { + if let Self::Disconnected(ref mut x) = self { + let guard = x.write(); + if let Some(ref h) = *guard { + let handle = h.clone(); + drop(guard); + *self = Self::Connected(handle); + } + } } } @@ -301,7 +347,7 @@ pub enum ExternalRequest { /// import and finality notifications into the [`OverseerHandle`]. pub async fn forward_events<P: BlockchainEvents<Block>>( client: Arc<P>, - mut handler: Handle, + mut handle: Handle, ) { let mut finality = client.finality_notification_stream(); let mut imports = client.import_notification_stream(); @@ -311,7 +357,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>( f = finality.next() => { match f { Some(block) => { - handler.block_finalized(block.into()).await; + handle.block_finalized(block.into()).await; } None => break, } @@ -319,7 +365,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>( i = imports.next() => { match i { Some(block) => { - handler.block_imported(block.into()).await; + handle.block_imported(block.into()).await; } None => break, } @@ -338,7 +384,6 @@ pub async fn forward_events<P: BlockchainEvents<Block>>( network=NetworkBridgeEvent<protocol_v1::ValidationProtocol>, )] pub struct Overseer<SupportsParachains> { - #[subsystem(no_dispatch, CandidateValidationMessage)] candidate_validation: CandidateValidation, @@ -390,16 +435,16 @@ pub struct Overseer<SupportsParachains> { #[subsystem(no_dispatch, GossipSupportMessage)] gossip_support: GossipSupport, - #[subsystem(no_dispatch, wip, DisputeCoordinatorMessage)] - dipute_coordinator: DisputeCoordinator, + #[subsystem(no_dispatch, DisputeCoordinatorMessage)] + dispute_coordinator: DisputeCoordinator, - #[subsystem(no_dispatch, wip, DisputeParticipationMessage)] + #[subsystem(no_dispatch, DisputeParticipationMessage)] dispute_participation: DisputeParticipation, - #[subsystem(no_dispatch, wip, DisputeDistributionMessage)] - dipute_distribution: DisputeDistribution, + #[subsystem(no_dispatch, DisputeDistributionMessage)] + dispute_distribution: DisputeDistribution, - #[subsystem(no_dispatch, wip, ChainSelectionMessage)] + #[subsystem(no_dispatch, ChainSelectionMessage)] chain_selection: ChainSelection, /// External listeners waiting for a hash to be in the active-leave set. @@ -436,7 +481,7 @@ where /// This returns the overseer along with an [`OverseerHandle`] which can /// be used to send messages from external parts of the codebase. /// - /// The [`OverseerHandler`] returned from this function is connected to + /// The [`OverseerHandle`] returned from this function is connected to /// the returned [`Overseer`]. 
/// /// ```text @@ -527,7 +572,7 @@ where /// let spawner = sp_core::testing::TaskExecutor::new(); /// let all_subsystems = AllSubsystems::<()>::dummy() /// .replace_candidate_validation(ValidationSubsystem); - /// let (overseer, _handler) = Overseer::new( + /// let (overseer, _handle) = Overseer::new( /// vec![], /// all_subsystems, /// None, @@ -549,13 +594,13 @@ where /// # }); /// # } /// ``` - pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>( + pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>( leaves: impl IntoIterator<Item = BlockInfo>, - all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>, + all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>, prometheus_registry: Option<&prometheus::Registry>, supports_parachains: SupportsParachains, s: S, - ) -> SubsystemResult<(Self, Handle)> + ) -> SubsystemResult<(Self, OverseerHandle)> where CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send, CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>, SubsystemError> + Send, @@ -574,11 +619,15 @@ where ApD: Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError> + Send, ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError> + Send, GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError> + Send, + DC: Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError> + Send, + DP: Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError> + Send, + DD: Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError> + Send, + CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send, S: SpawnNamed, { let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?; - let (mut overseer, handler) = Self::builder() + let (mut overseer, handle) = Self::builder() .candidate_validation(all_subsystems.candidate_validation) .candidate_backing(all_subsystems.candidate_backing) .statement_distribution(all_subsystems.statement_distribution) @@ -596,6 +645,10 @@ where .approval_distribution(all_subsystems.approval_distribution) .approval_voting(all_subsystems.approval_voting) .gossip_support(all_subsystems.gossip_support) + .dispute_coordinator(all_subsystems.dispute_coordinator) + .dispute_participation(all_subsystems.dispute_participation) + .dispute_distribution(all_subsystems.dispute_distribution) + .chain_selection(all_subsystems.chain_selection) .leaves(Vec::from_iter( leaves.into_iter().map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) )) @@ -647,7 +700,7 @@ where overseer.spawner().spawn("metrics_metronome", Box::pin(metronome)); } - Ok((overseer, Handle(handler))) + Ok((overseer, handle)) } /// Stop the overseer. diff --git a/polkadot/node/overseer/src/subsystems.rs b/polkadot/node/overseer/src/subsystems.rs index 3b3894b8369..192cbb8ba7b 100644 --- a/polkadot/node/overseer/src/subsystems.rs +++ b/polkadot/node/overseer/src/subsystems.rs @@ -77,7 +77,7 @@ where pub struct AllSubsystems< CV = (), CB = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (), RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (), - GS = (), + GS = (), DC = (), DP = (), DD = (), CS = (), > { /// A candidate validation subsystem. 
pub candidate_validation: CV, @@ -113,10 +113,18 @@ pub struct AllSubsystems< pub approval_voting: ApV, /// A Connection Request Issuer subsystem. pub gossip_support: GS, + /// A Dispute Coordinator subsystem. + pub dispute_coordinator: DC, + /// A Dispute Participation subsystem. + pub dispute_participation: DP, + /// A Dispute Distribution subsystem. + pub dispute_distribution: DD, + /// A Chain Selection subsystem. + pub chain_selection: CS, } -impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> - AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> +impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS> + AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS> { /// Create a new instance of [`AllSubsystems`]. /// @@ -148,6 +156,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, + DummySubsystem, > { AllSubsystems { candidate_validation: DummySubsystem, @@ -167,11 +179,15 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> approval_distribution: DummySubsystem, approval_voting: DummySubsystem, gossip_support: DummySubsystem, + dispute_coordinator: DummySubsystem, + dispute_participation: DummySubsystem, + dispute_distribution: DummySubsystem, + chain_selection: DummySubsystem, } } /// Reference every individual subsystem. - pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> { + pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS, &'_ DC, &'_ DP, &'_ DD, &'_ CS> { AllSubsystems { candidate_validation: &self.candidate_validation, candidate_backing: &self.candidate_backing, @@ -190,6 +206,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> approval_distribution: &self.approval_distribution, approval_voting: &self.approval_voting, gossip_support: &self.gossip_support, + dispute_coordinator: &self.dispute_coordinator, + dispute_participation: &self.dispute_participation, + dispute_distribution: &self.dispute_distribution, + chain_selection: &self.chain_selection, } } @@ -213,6 +233,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> <Mapper as MapSubsystem<ApD>>::Output, <Mapper as MapSubsystem<ApV>>::Output, <Mapper as MapSubsystem<GS>>::Output, + <Mapper as MapSubsystem<DC>>::Output, + <Mapper as MapSubsystem<DP>>::Output, + <Mapper as MapSubsystem<DD>>::Output, + <Mapper as MapSubsystem<CS>>::Output, > where Mapper: MapSubsystem<CV>, @@ -232,6 +256,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> Mapper: MapSubsystem<ApD>, Mapper: MapSubsystem<ApV>, Mapper: MapSubsystem<GS>, + Mapper: MapSubsystem<DC>, + Mapper: MapSubsystem<DP>, + Mapper: MapSubsystem<DD>, + Mapper: MapSubsystem<CS>, { AllSubsystems { candidate_validation: <Mapper as MapSubsystem<CV>>::map_subsystem(&mapper, self.candidate_validation), @@ -251,6 +279,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS> approval_distribution: <Mapper as MapSubsystem<ApD>>::map_subsystem(&mapper, self.approval_distribution), approval_voting: <Mapper as MapSubsystem<ApV>>::map_subsystem(&mapper, 
self.approval_voting), gossip_support: <Mapper as MapSubsystem<GS>>::map_subsystem(&mapper, self.gossip_support), + dispute_coordinator: <Mapper as MapSubsystem<DC>>::map_subsystem(&mapper, self.dispute_coordinator), + dispute_participation: <Mapper as MapSubsystem<DP>>::map_subsystem(&mapper, self.dispute_participation), + dispute_distribution: <Mapper as MapSubsystem<DD>>::map_subsystem(&mapper, self.dispute_distribution), + chain_selection: <Mapper as MapSubsystem<CS>>::map_subsystem(&mapper, self.chain_selection), } } } diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index a4dd0359c70..9921b890658 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -18,6 +18,7 @@ use std::sync::atomic; use std::collections::HashMap; use std::task::{Poll}; use futures::{executor, pin_mut, select, FutureExt, pending, poll, stream}; +use futures::channel::mpsc; use polkadot_primitives::v1::{CollatorPair, CandidateHash}; use polkadot_node_primitives::{CollationResult, CollationGenerationConfig, PoV, BlockData}; @@ -166,13 +167,14 @@ fn overseer_works() { .replace_candidate_validation(TestSubsystem1(s1_tx)) .replace_candidate_backing(TestSubsystem2(s2_tx)); - let (overseer, mut handler) = Overseer::new( + let (overseer, handle) = Overseer::new( vec![], all_subsystems, None, MockSupportsParachains, spawner, ).unwrap(); + let mut handle = Handle::Connected(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -188,7 +190,7 @@ fn overseer_works() { Some(msg) => { s1_results.push(msg); if s1_results.len() == 10 { - handler.stop().await; + handle.stop().await; } } None => break, @@ -236,21 +238,22 @@ fn overseer_metrics_work() { let all_subsystems = AllSubsystems::<()>::dummy(); let registry = prometheus::Registry::new(); - let (overseer, mut handler) = Overseer::new( + let (overseer, handle) = Overseer::new( vec![first_block], all_subsystems, Some(®istry), MockSupportsParachains, spawner, ).unwrap(); + let mut handle = Handle::Connected(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); - handler.block_imported(second_block).await; - handler.block_imported(third_block).await; - handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; - handler.stop().await; + handle.block_imported(second_block).await; + handle.block_imported(third_block).await; + handle.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; + handle.stop().await; select! 
{ res = overseer_fut => { @@ -398,13 +401,14 @@ fn overseer_start_stop_works() { let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_validation(TestSubsystem5(tx_5)) .replace_candidate_backing(TestSubsystem6(tx_6)); - let (overseer, mut handler) = Overseer::new( + let (overseer, handle) = Overseer::new( vec![first_block], all_subsystems, None, MockSupportsParachains, spawner, ).unwrap(); + let mut handle = Handle::Connected(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -412,8 +416,8 @@ fn overseer_start_stop_works() { let mut ss5_results = Vec::new(); let mut ss6_results = Vec::new(); - handler.block_imported(second_block).await; - handler.block_imported(third_block).await; + handle.block_imported(second_block).await; + handle.block_imported(third_block).await; let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf { @@ -463,7 +467,7 @@ fn overseer_start_stop_works() { if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() { - handler.stop().await; + handle.stop().await; } } @@ -507,13 +511,14 @@ fn overseer_finalize_works() { .replace_candidate_backing(TestSubsystem6(tx_6)); // start with two forks of different height. - let (overseer, mut handler) = Overseer::new( + let (overseer, handle) = Overseer::new( vec![first_block, second_block], all_subsystems, None, MockSupportsParachains, spawner, ).unwrap(); + let mut handle = Handle::Connected(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); @@ -522,7 +527,7 @@ fn overseer_finalize_works() { let mut ss6_results = Vec::new(); // this should stop work on both forks we started with earlier. - handler.block_finalized(third_block).await; + handle.block_finalized(third_block).await; let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -569,7 +574,7 @@ fn overseer_finalize_works() { } if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() { - handler.stop().await; + handle.stop().await; } } @@ -607,21 +612,22 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { let all_subsystems = AllSubsystems::<()>::dummy() .replace_candidate_backing(TestSubsystem6(tx_5)); - let (overseer, mut handler) = Overseer::new( + let (overseer, handle) = Overseer::new( Vec::new(), all_subsystems, None, MockSupportsParachains, spawner, ).unwrap(); + let mut handle = Handle::Connected(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); let mut ss5_results = Vec::new(); - handler.block_finalized(finalized_block.clone()).await; - handler.block_imported(imported_block.clone()).await; + handle.block_finalized(finalized_block.clone()).await; + handle.block_imported(imported_block.clone()).await; let expected_heartbeats = vec![ OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { @@ -652,7 +658,7 @@ fn do_not_send_empty_leaves_update_on_block_finalization() { } if ss5_results.len() == expected_heartbeats.len() { - handler.stop().await; + handle.stop().await; } } @@ -809,10 +815,35 @@ fn test_approval_voting_msg() -> ApprovalVotingMessage { ApprovalVotingMessage::ApprovedAncestor(Default::default(), 0, sender) } +fn test_dispute_coordinator_msg() -> DisputeCoordinatorMessage { + let (sender, _) = oneshot::channel(); + DisputeCoordinatorMessage::RecentDisputes(sender) +} + +fn test_dispute_participation_msg() -> DisputeParticipationMessage { + let (sender, _) = oneshot::channel(); + 
DisputeParticipationMessage::Participate { + candidate_hash: Default::default(), + candidate_receipt: Default::default(), + session: 0, + n_validators: 0, + report_availability: sender, + } +} + +fn test_dispute_distribution_msg() -> DisputeDistributionMessage { + let (_, receiver) = mpsc::channel(1); + DisputeDistributionMessage::DisputeSendingReceiver(receiver) +} + +fn test_chain_selection_msg() -> ChainSelectionMessage { + ChainSelectionMessage::Approved(Default::default()) +} + // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - const NUM_SUBSYSTEMS: usize = 17; + const NUM_SUBSYSTEMS: usize = 21; // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3; @@ -846,20 +877,25 @@ fn overseer_all_subsystems_receive_signals_and_messages() { approval_distribution: subsystem.clone(), approval_voting: subsystem.clone(), gossip_support: subsystem.clone(), + dispute_coordinator: subsystem.clone(), + dispute_participation: subsystem.clone(), + dispute_distribution: subsystem.clone(), + chain_selection: subsystem.clone(), }; - let (overseer, mut handler) = Overseer::new( + let (overseer, handle) = Overseer::new( vec![], all_subsystems, None, MockSupportsParachains, spawner, ).unwrap(); + let mut handle = Handle::Connected(handle); let overseer_fut = overseer.run().fuse(); pin_mut!(overseer_fut); // send a signal to each subsystem - handler.block_imported(BlockInfo { + handle.block_imported(BlockInfo { hash: Default::default(), parent_hash: Default::default(), number: Default::default(), @@ -867,22 +903,26 @@ fn overseer_all_subsystems_receive_signals_and_messages() { // send a msg to each subsystem // except for BitfieldSigning and GossipSupport as the messages are not instantiable - handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; - handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; - handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await; - handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; - handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; - handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await; - // handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; - // handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await; - handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; - handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await; - handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await; - handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await; - handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await; - handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await; - handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; - handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; + handle.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; + handle.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; + 
handle.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await; + handle.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; + handle.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; + handle.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await; + // handle.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; + // handle.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await; + handle.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; + handle.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await; + handle.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await; + handle.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await; + handle.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await; + handle.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await; + handle.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; + handle.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; + handle.send_msg_anon(AllMessages::DisputeCoordinator(test_dispute_coordinator_msg())).await; + handle.send_msg_anon(AllMessages::DisputeParticipation(test_dispute_participation_msg())).await; + handle.send_msg_anon(AllMessages::DisputeDistribution(test_dispute_distribution_msg())).await; + handle.send_msg_anon(AllMessages::ChainSelection(test_chain_selection_msg())).await; // Wait until all subsystems have received. Otherwise the messages might race against // the conclude signal. @@ -903,7 +943,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { } // send a stop signal to each subsystems - handler.stop().await; + handle.stop().await; let res = overseer_fut.await; assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS); @@ -933,6 +973,10 @@ fn context_holds_onto_message_until_enough_signals_received() { let (approval_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (approval_voting_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (gossip_support_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (dispute_coordinator_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (dispute_participation_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (dispute_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (chain_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (candidate_validation_unbounded_tx, _) = metered::unbounded(); let (candidate_backing_unbounded_tx, _) = metered::unbounded(); @@ -951,6 +995,10 @@ fn context_holds_onto_message_until_enough_signals_received() { let (approval_distribution_unbounded_tx, _) = metered::unbounded(); let (approval_voting_unbounded_tx, _) = metered::unbounded(); let (gossip_support_unbounded_tx, _) = metered::unbounded(); + let (dispute_coordinator_unbounded_tx, _) = metered::unbounded(); + let (dispute_participation_unbounded_tx, _) = metered::unbounded(); + let (dispute_distribution_unbounded_tx, _) = metered::unbounded(); + let (chain_selection_unbounded_tx, _) = metered::unbounded(); let channels_out = ChannelsOut { candidate_validation: candidate_validation_bounded_tx.clone(), @@ -970,6 +1018,10 @@ fn context_holds_onto_message_until_enough_signals_received() { approval_distribution: 
approval_distribution_bounded_tx.clone(), approval_voting: approval_voting_bounded_tx.clone(), gossip_support: gossip_support_bounded_tx.clone(), + dispute_coordinator: dispute_coordinator_bounded_tx.clone(), + dispute_participation: dispute_participation_bounded_tx.clone(), + dispute_distribution: dispute_distribution_bounded_tx.clone(), + chain_selection: chain_selection_bounded_tx.clone(), candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), @@ -988,6 +1040,10 @@ fn context_holds_onto_message_until_enough_signals_received() { approval_distribution_unbounded: approval_distribution_unbounded_tx.clone(), approval_voting_unbounded: approval_voting_unbounded_tx.clone(), gossip_support_unbounded: gossip_support_unbounded_tx.clone(), + dispute_coordinator_unbounded: dispute_coordinator_unbounded_tx.clone(), + dispute_participation_unbounded: dispute_participation_unbounded_tx.clone(), + dispute_distribution_unbounded: dispute_distribution_unbounded_tx.clone(), + chain_selection_unbounded: chain_selection_unbounded_tx.clone(), }; let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index f5938b4d473..14ddc256ec6 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -103,6 +103,9 @@ polkadot-node-core-backing = { path = "../core/backing", optional = true } polkadot-node-core-bitfield-signing = { path = "../core/bitfield-signing", optional = true } polkadot-node-core-candidate-validation = { path = "../core/candidate-validation", optional = true } polkadot-node-core-chain-api = { path = "../core/chain-api", optional = true } +polkadot-node-core-chain-selection = { path = "../core/chain-selection", optional = true } +polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator", optional = true } +polkadot-node-core-dispute-participation = { path = "../core/dispute-participation", optional = true } polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true } polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true } polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true } @@ -133,6 +136,9 @@ full-node = [ "polkadot-node-core-bitfield-signing", "polkadot-node-core-candidate-validation", "polkadot-node-core-chain-api", + "polkadot-node-core-chain-selection", + "polkadot-node-core-dispute-coordinator", + "polkadot-node-core-dispute-participation", "polkadot-node-core-provisioner", "polkadot-node-core-runtime-api", "polkadot-statement-distribution", diff --git a/polkadot/node/service/src/grandpa_support.rs b/polkadot/node/service/src/grandpa_support.rs index 742e6bffced..8e0affc7264 100644 --- a/polkadot/node/service/src/grandpa_support.rs +++ b/polkadot/node/service/src/grandpa_support.rs @@ -23,179 +23,7 @@ use sp_runtime::generic::BlockId; use sp_runtime::traits::Header as _; #[cfg(feature = "full-node")] -use { - polkadot_primitives::v1::{Hash, Block as PolkadotBlock, Header as PolkadotHeader}, - polkadot_subsystem::messages::ApprovalVotingMessage, - prometheus_endpoint::{self, Registry}, - polkadot_overseer::Handle, - futures::channel::oneshot, -}; - -/// A custom GRANDPA voting rule that acts as a diagnostic for the approval -/// voting subsystem's desired votes. 
-/// -/// The practical effect of this voting rule is to implement a fixed delay of -/// blocks and to issue a prometheus metric on the lag behind the head that -/// approval checking would indicate. -#[cfg(feature = "full-node")] -#[derive(Clone)] -pub(crate) struct ApprovalCheckingVotingRule { - checking_lag: Option<prometheus_endpoint::Gauge<prometheus_endpoint::U64>>, - overseer: Handle, -} - -#[cfg(feature = "full-node")] -impl ApprovalCheckingVotingRule { - /// Create a new approval checking diagnostic voting rule. - pub fn new(overseer: Handle, registry: Option<&Registry>) - -> Result<Self, prometheus_endpoint::PrometheusError> - { - Ok(ApprovalCheckingVotingRule { - checking_lag: if let Some(registry) = registry { - Some(prometheus_endpoint::register( - prometheus_endpoint::Gauge::with_opts( - prometheus_endpoint::Opts::new( - "parachain_approval_checking_finality_lag", - "How far behind the head of the chain the Approval Checking protocol wants to vote", - ) - )?, - registry, - )?) - } else { - None - }, - overseer, - }) - } -} - -#[cfg(feature = "full-node")] -#[derive(Debug, PartialEq)] -/// Vote explicitly on the given hash. -enum ParachainVotingRuleTarget<H, N> { - Explicit((H, N)), - /// Vote on the current target. - Current, - /// Vote on the base target - the minimal possible vote. - Base, -} - -#[cfg(feature = "full-node")] -fn approval_checking_vote_to_grandpa_vote<H, N: PartialOrd>( - approval_checking_vote: Option<(H, N)>, - current_number: N, -) -> ParachainVotingRuleTarget<H, N> { - match approval_checking_vote { - Some((hash, number)) => if number > current_number { - // respect other voting rule. - ParachainVotingRuleTarget::Current - } else { - ParachainVotingRuleTarget::Explicit((hash, number)) - }, - // If approval-voting chooses 'None', that means we should vote on the base (last round estimate). - None => ParachainVotingRuleTarget::Base, - } -} - -/// The maximum amount of unfinalized blocks we are willing to allow due to approval checking lag. -/// This is a safety net that should be removed at some point in the future. -#[cfg(feature = "full-node")] -const MAX_APPROVAL_CHECKING_FINALITY_LAG: polkadot_primitives::v1::BlockNumber = 50; - -#[cfg(feature = "full-node")] -impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingVotingRule - where B: sp_blockchain::HeaderBackend<PolkadotBlock> + 'static -{ - fn restrict_vote( - &self, - backend: Arc<B>, - base: &PolkadotHeader, - best_target: &PolkadotHeader, - current_target: &PolkadotHeader, - ) -> grandpa::VotingRuleResult<PolkadotBlock> { - // Query approval checking and issue metrics. 
- let mut overseer = self.overseer.clone(); - let checking_lag = self.checking_lag.clone(); - - let best_hash = best_target.hash(); - let best_number = best_target.number.clone(); - let best_header = best_target.clone(); - - let current_hash = current_target.hash(); - let current_number = current_target.number.clone(); - - let base_hash = base.hash(); - let base_number = base.number; - - Box::pin(async move { - let (tx, rx) = oneshot::channel(); - let approval_checking_subsystem_vote = { - overseer.send_msg( - ApprovalVotingMessage::ApprovedAncestor( - best_hash, - base_number, - tx, - ), - std::any::type_name::<Self>(), - ).await; - - rx.await.ok().and_then(|v| v) - }; - - let approval_checking_subsystem_lag = approval_checking_subsystem_vote.map_or( - best_number - base_number, - |(_h, n)| best_number - n, - ); - - if let Some(ref checking_lag) = checking_lag { - checking_lag.set(approval_checking_subsystem_lag as _); - } - - let min_vote = { - let diff = best_number.saturating_sub(base_number); - if diff >= MAX_APPROVAL_CHECKING_FINALITY_LAG { - // Catch up to the best, with some extra lag. - let target_number = best_number - MAX_APPROVAL_CHECKING_FINALITY_LAG; - if target_number >= current_number { - Some((current_hash, current_number)) - } else { - walk_backwards_to_target_block(&*backend, target_number, &best_header).ok() - } - } else { - Some((base_hash, base_number)) - } - }; - - let vote = match approval_checking_vote_to_grandpa_vote( - approval_checking_subsystem_vote, - current_number, - ) { - ParachainVotingRuleTarget::Explicit(vote) => { - if min_vote.as_ref().map_or(false, |min| min.1 > vote.1) { - min_vote - } else { - Some(vote) - } - } - ParachainVotingRuleTarget::Current => Some((current_hash, current_number)), - ParachainVotingRuleTarget::Base => min_vote.or(Some((base_hash, base_number))), - }; - - tracing::trace!( - target: "parachain::approval-voting", - ?vote, - ?approval_checking_subsystem_vote, - approval_checking_subsystem_lag, - current_number, - best_number, - base_number, - "GRANDPA: voting based on approved ancestor.", - ); - - vote - }) - } -} +use polkadot_primitives::v1::Hash; /// Returns the block hash of the block at the given `target_number` by walking /// backwards from the given `current_header`. 
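// The hunk above removes `ApprovalCheckingVotingRule` but keeps
// `walk_backwards_to_target_block`, whose doc comment closes the hunk; as the
// removed code shows, the rule used it to vote no more than
// `MAX_APPROVAL_CHECKING_FINALITY_LAG` blocks behind the best block. A
// self-contained sketch of that backward walk, with a simplified header type
// and a hypothetical `parent_of` lookup standing in for the real
// `sp_blockchain::HeaderBackend` (an illustration, not the actual implementation):

#[derive(Clone)]
struct MiniHeader {
    hash: u64,        // stand-in for a real block hash
    parent_hash: u64, // hash of the parent block
    number: u32,
}

fn walk_backwards_to_target_block(
    current: MiniHeader,
    target_number: u32,
    parent_of: impl Fn(u64) -> Option<MiniHeader>, // assumed lookup by hash
) -> Option<(u64, u32)> {
    let mut header = current;
    loop {
        if header.number == target_number {
            return Some((header.hash, header.number));
        }
        if header.number < target_number {
            // The target lies above the starting header; nothing to walk back to.
            return None;
        }
        header = parent_of(header.parent_hash)?;
    }
}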
@@ -433,7 +261,6 @@ mod tests { use sp_runtime::{generic::BlockId, traits::Header}; use consensus_common::BlockOrigin; use std::sync::Arc; - use super::{approval_checking_vote_to_grandpa_vote, ParachainVotingRuleTarget}; #[test] fn grandpa_pause_voting_rule_works() { @@ -545,27 +372,4 @@ mod tests { None, ); } - - #[test] - fn approval_checking_to_grandpa_rules() { - assert_eq!( - approval_checking_vote_to_grandpa_vote::<(), _>(None, 5), - ParachainVotingRuleTarget::Base, - ); - - assert_eq!( - approval_checking_vote_to_grandpa_vote(Some(("2", 2)), 3), - ParachainVotingRuleTarget::Explicit(("2", 2)), - ); - - assert_eq!( - approval_checking_vote_to_grandpa_vote(Some(("2", 2)), 2), - ParachainVotingRuleTarget::Explicit(("2", 2)), - ); - - assert_eq!( - approval_checking_vote_to_grandpa_vote(Some(("2", 2)), 1), - ParachainVotingRuleTarget::Current, - ); - } } diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 50c2af70719..c6d1241d8da 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -42,6 +42,11 @@ use { polkadot_node_core_av_store::Error as AvailabilityError, polkadot_node_core_approval_voting::Config as ApprovalVotingConfig, polkadot_node_core_candidate_validation::Config as CandidateValidationConfig, + polkadot_node_core_chain_selection::{ + self as chain_selection_subsystem, + Config as ChainSelectionConfig, + }, + polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig, polkadot_overseer::BlockInfo, sp_trie::PrefixedMemoryDB, sc_client_api::ExecutorProvider, @@ -56,7 +61,7 @@ pub use { sp_authority_discovery::AuthorityDiscoveryApi, sc_client_api::AuxStore, polkadot_primitives::v1::ParachainHost, - polkadot_overseer::{Overseer, Handle}, + polkadot_overseer::{Overseer, Handle, OverseerHandle}, }; pub use sp_core::traits::SpawnNamed; @@ -214,7 +219,7 @@ fn jaeger_launch_collector_with_agent(spawner: impl SpawnNamed, config: &Configu } #[cfg(feature = "full-node")] -type FullSelectChain = sc_consensus::LongestChain<FullBackend, Block>; +type FullSelectChain = relay_chain_selection::SelectRelayChain<FullBackend>; #[cfg(feature = "full-node")] type FullGrandpaBlockImport<RuntimeApi, Executor> = grandpa::GrandpaBlockImport< FullBackend, Block, FullClient<RuntimeApi, Executor>, FullSelectChain @@ -298,7 +303,11 @@ fn new_partial<RuntimeApi, Executor>( jaeger_launch_collector_with_agent(task_manager.spawn_handle(), &*config, jaeger_agent)?; - let select_chain = sc_consensus::LongestChain::new(backend.clone()); + let select_chain = relay_chain_selection::SelectRelayChain::new( + backend.clone(), + Handle::new_disconnected(), + polkadot_node_subsystem_util::metrics::Metrics::register(config.prometheus_registry())?, + ); let transaction_pool = sc_transaction_pool::BasicPool::new_full( config.transaction_pool.clone(), @@ -427,7 +436,7 @@ fn new_partial<RuntimeApi, Executor>( pub struct NewFull<C> { pub task_manager: TaskManager, pub client: C, - pub overseer_handler: Option<Handle>, + pub overseer_handle: Option<Handle>, pub network: Arc<sc_network::NetworkService<Block, <Block as BlockT>::Hash>>, pub rpc_handlers: RpcHandlers, pub backend: Arc<FullBackend>, @@ -440,7 +449,7 @@ impl<C> NewFull<C> { NewFull { client: func(self.client), task_manager: self.task_manager, - overseer_handler: self.overseer_handler, + overseer_handle: self.overseer_handle, network: self.network, rpc_handlers: self.rpc_handlers, backend: self.backend, @@ -480,7 +489,7 @@ impl IsCollator { /// Returns the active leaves the overseer 
should start with. #[cfg(feature = "full-node")] async fn active_leaves<RuntimeApi, Executor>( - select_chain: &sc_consensus::LongestChain<FullBackend, Block>, + select_chain: &impl SelectChain<Block>, client: &FullClient<RuntimeApi, Executor>, ) -> Result<Vec<BlockInfo>, Error> where @@ -573,7 +582,7 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( backend, mut task_manager, keystore_container, - select_chain, + mut select_chain, import_queue, transaction_pool, other: (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry) @@ -655,6 +664,15 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( }, }; + let chain_selection_config = ChainSelectionConfig { + col_data: crate::parachains_db::REAL_COLUMNS.col_chain_selection_data, + stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(), + }; + + let dispute_coordinator_config = DisputeCoordinatorConfig { + col_data: crate::parachains_db::REAL_COLUMNS.col_dispute_coordinator_data, + }; + let chain_spec = config.chain_spec.cloned_box(); let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams { config, @@ -714,8 +732,6 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( None }; - // we'd say let overseer_handler = authority_discovery_service.map(|authority_discovery_service|, ...), - // but in that case we couldn't use ? to propagate errors let local_keystore = keystore_container.local_keystore(); if local_keystore.is_none() { tracing::info!("Cannot run as validator without local keystore."); @@ -724,8 +740,8 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( let maybe_params = local_keystore .and_then(move |k| authority_discovery_service.map(|a| (a, k))); - let overseer_handler = if let Some((authority_discovery_service, keystore)) = maybe_params { - let (overseer, overseer_handler) = overseer_gen.generate::< + let overseer_handle = if let Some((authority_discovery_service, keystore)) = maybe_params { + let (overseer, overseer_handle) = overseer_gen.generate::< service::SpawnTaskHandle, FullClient<RuntimeApi, Executor>, >( @@ -734,23 +750,26 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( keystore, runtime_client: overseer_client.clone(), parachains_db, - availability_config, - approval_voting_config, network_service: network.clone(), authority_discovery_service, request_multiplexer, registry: prometheus_registry.as_ref(), spawner, is_collator, + approval_voting_config, + availability_config, candidate_validation_config, + chain_selection_config, + dispute_coordinator_config, } )?; - let overseer_handler_clone = overseer_handler.clone(); + let handle = Handle::Connected(overseer_handle.clone()); + let handle_clone = handle.clone(); task_manager.spawn_essential_handle().spawn_blocking("overseer", Box::pin(async move { use futures::{pin_mut, select, FutureExt}; - let forward = polkadot_overseer::forward_events(overseer_client, overseer_handler_clone); + let forward = polkadot_overseer::forward_events(overseer_client, handle_clone); let forward = forward.fuse(); let overseer_fut = overseer.run().fuse(); @@ -764,8 +783,19 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( complete => (), } })); + // we should remove this check before we deploy parachains on polkadot + // TODO: https://github.com/paritytech/polkadot/issues/3326 + let should_connect_overseer = chain_spec.is_kusama() + || chain_spec.is_westend() + || chain_spec.is_rococo() + || chain_spec.is_wococo(); - Some(overseer_handler) + if should_connect_overseer { + 
select_chain.connect_to_overseer(overseer_handle.clone()); + } else { + tracing::info!("Overseer is running in the disconnected state"); + } + Some(handle) } else { None }; @@ -783,7 +813,7 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( ); let client_clone = client.clone(); - let overseer_handler = overseer_handler.as_ref().ok_or(Error::AuthoritiesRequireRealOverseer)?.clone(); + let overseer_handle = overseer_handle.as_ref().ok_or(Error::AuthoritiesRequireRealOverseer)?.clone(); let slot_duration = babe_link.config().slot_duration(); let babe_config = babe::BabeParams { keystore: keystore_container.sync_keystore(), @@ -795,11 +825,11 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( justification_sync_link: network.clone(), create_inherent_data_providers: move |parent, ()| { let client_clone = client_clone.clone(); - let overseer_handler = overseer_handler.clone(); + let overseer_handle = overseer_handle.clone(); async move { let parachain = polkadot_node_core_parachains_inherent::ParachainsInherentDataProvider::create( &*client_clone, - overseer_handler, + overseer_handle, parent, ).await.map_err(|e| Box::new(e))?; @@ -889,24 +919,6 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( // after the given pause block is finalized and restarting after the // given delay. let builder = grandpa::VotingRulesBuilder::default(); - // we should enable approval checking voting rule before we deploy parachains on polkadot - let enable_approval_checking_voting_rule = chain_spec.is_kusama() - || chain_spec.is_westend() - || chain_spec.is_rococo() - || chain_spec.is_wococo(); - - let builder = if let Some(ref overseer) = overseer_handler { - if enable_approval_checking_voting_rule { - builder.add(grandpa_support::ApprovalCheckingVotingRule::new( - overseer.clone(), - prometheus_registry.as_ref(), - )?) 
- } else { - builder - } - } else { - builder - }; let voting_rule = match grandpa_pause { Some((block, delay)) => { @@ -946,7 +958,7 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>( Ok(NewFull { task_manager, client, - overseer_handler, + overseer_handle, network, rpc_handlers, backend, diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 2bf6dd74716..b104481a9e8 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -28,7 +28,9 @@ use polkadot_network_bridge::RequestMultiplexer; use polkadot_node_core_av_store::Config as AvailabilityConfig; use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; -use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, Handle}; +use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; +use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; +use polkadot_overseer::{AllSubsystems, BlockInfo, Overseer, OverseerHandle}; use polkadot_primitives::v1::ParachainHost; use sc_authority_discovery::Service as AuthorityDiscoveryService; use sp_api::ProvideRuntimeApi; @@ -54,6 +56,10 @@ pub use polkadot_availability_recovery::AvailabilityRecoverySubsystem; pub use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem; pub use polkadot_node_core_approval_voting::ApprovalVotingSubsystem; pub use polkadot_gossip_support::GossipSupport as GossipSupportSubsystem; +pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem; +pub use polkadot_node_core_dispute_participation::DisputeParticipationSubsystem; +pub use polkadot_dispute_distribution::DisputeDistributionSubsystem; +pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem; /// Arguments passed for overseer construction. pub struct OverseerGenArgs<'a, Spawner, RuntimeClient> where @@ -69,10 +75,6 @@ pub struct OverseerGenArgs<'a, Spawner, RuntimeClient> where pub runtime_client: Arc<RuntimeClient>, /// The underlying key value store for the parachains. pub parachains_db: Arc<dyn kvdb::KeyValueDB>, - /// Configuration for the availability store subsystem. - pub availability_config: AvailabilityConfig, - /// Configuration for the approval voting subsystem. - pub approval_voting_config: ApprovalVotingConfig, /// Underlying network service implementation. pub network_service: Arc<sc_network::NetworkService<Block, Hash>>, /// Underlying authority discovery service. @@ -85,8 +87,16 @@ pub struct OverseerGenArgs<'a, Spawner, RuntimeClient> where pub spawner: Spawner, /// Determines the behavior of the collator. pub is_collator: IsCollator, + /// Configuration for the approval voting subsystem. + pub approval_voting_config: ApprovalVotingConfig, + /// Configuration for the availability store subsystem. + pub availability_config: AvailabilityConfig, /// Configuration for the candidate validation subsystem. pub candidate_validation_config: CandidateValidationConfig, + /// Configuration for the chain selection subsystem. + pub chain_selection_config: ChainSelectionConfig, + /// Configuration for the dispute coordinator subsystem. + pub dispute_coordinator_config: DisputeCoordinatorConfig, } /// Create a default, unaltered set of subsystems. 
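// `OverseerGenArgs` above now carries the chain-selection and dispute-coordinator
// configs alongside the existing ones, and `create_default_subsystems` (next hunk)
// consumes them. Downstream code customises the overseer through the `OverseerGen`
// trait defined further down in this file, as `polkadot-malus` does earlier in this
// patch. A hedged sketch of a pass-through generator follows; the `Spawner` bounds
// are not visible in the hunks shown and are assumed, as is the re-export of
// `RealOverseerGen` next to `OverseerGen`:

use std::sync::Arc;
use polkadot_cli::service::{
    AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
    OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, ProvideRuntimeApi,
    RealOverseerGen, // assumed re-export; the type is defined in polkadot-service
    SpawnNamed,
};

/// Generates exactly the stock overseer; a starting point for swapping subsystems.
struct PassThroughGen;

impl OverseerGen for PassThroughGen {
    fn generate<'a, Spawner, RuntimeClient>(
        &self,
        args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
    ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
    where
        RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
        RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
        // Assumed to match the trait's own bounds on `Spawner`:
        Spawner: 'static + SpawnNamed + Clone + Unpin,
    {
        // To alter individual subsystems, build them with
        // `create_default_subsystems(args)?` and adjust the result before wiring
        // it into the overseer, as the malus variant does.
        RealOverseerGen.generate(args)
    }
}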
@@ -99,15 +109,17 @@ pub fn create_default_subsystems<'a, Spawner, RuntimeClient> keystore, runtime_client, parachains_db, - availability_config, - approval_voting_config, network_service, authority_discovery_service, request_multiplexer, registry, spawner, is_collator, + approval_voting_config, + availability_config, candidate_validation_config, + chain_selection_config, + dispute_coordinator_config, .. } : OverseerGenArgs<'a, Spawner, RuntimeClient> ) -> Result< @@ -129,6 +141,10 @@ pub fn create_default_subsystems<'a, Spawner, RuntimeClient> ApprovalDistributionSubsystem, ApprovalVotingSubsystem, GossipSupportSubsystem, + DisputeCoordinatorSubsystem, + DisputeParticipationSubsystem, + DisputeDistributionSubsystem<AuthorityDiscoveryService>, + ChainSelectionSubsystem, >, Error > @@ -218,7 +234,7 @@ where ), approval_voting: ApprovalVotingSubsystem::with_config( approval_voting_config, - parachains_db, + parachains_db.clone(), keystore.clone(), Box::new(network_service.clone()), Metrics::register(registry)?, @@ -226,6 +242,21 @@ where gossip_support: GossipSupportSubsystem::new( keystore.clone(), ), + dispute_coordinator: DisputeCoordinatorSubsystem::new( + parachains_db.clone(), + dispute_coordinator_config, + keystore.clone(), + ), + dispute_participation: DisputeParticipationSubsystem::new(), + dispute_distribution: DisputeDistributionSubsystem::new( + keystore.clone(), + authority_discovery_service.clone(), + Metrics::register(registry)?, + ), + chain_selection: ChainSelectionSubsystem::new( + chain_selection_config, + parachains_db, + ), }; Ok(all_subsystems) } @@ -237,7 +268,7 @@ where /// would do. pub trait OverseerGen { /// Overwrite the full generation of the overseer, including the subsystems. - fn generate<'a, Spawner, RuntimeClient>(&self, args: OverseerGenArgs<'a, Spawner, RuntimeClient>) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, Handle), Error> + fn generate<'a, Spawner, RuntimeClient>(&self, args: OverseerGenArgs<'a, Spawner, RuntimeClient>) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error> where RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, @@ -256,7 +287,7 @@ pub struct RealOverseerGen; impl OverseerGen for RealOverseerGen { fn generate<'a, Spawner, RuntimeClient>(&self, args : OverseerGenArgs<'a, Spawner, RuntimeClient> - ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, Handle), Error> + ) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error> where RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore, RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>, diff --git a/polkadot/node/service/src/parachains_db/mod.rs b/polkadot/node/service/src/parachains_db/mod.rs index 4af16c42b34..123d154a129 100644 --- a/polkadot/node/service/src/parachains_db/mod.rs +++ b/polkadot/node/service/src/parachains_db/mod.rs @@ -22,16 +22,21 @@ use { kvdb::KeyValueDB, }; +#[cfg(feature = "full-node")] mod upgrade; #[cfg(any(test,feature = "full-node"))] -mod columns { - pub const NUM_COLUMNS: u32 = 3; - +pub(crate) mod columns { + pub mod v0 { + pub const NUM_COLUMNS: u32 = 3; + } + pub const NUM_COLUMNS: u32 = 5; pub const COL_AVAILABILITY_DATA: u32 = 0; pub const COL_AVAILABILITY_META: u32 = 1; pub const COL_APPROVAL_DATA: u32 = 2; + pub const COL_CHAIN_SELECTION_DATA: u32 = 3; + pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4; } /// Columns 
diff --git a/polkadot/node/service/src/parachains_db/upgrade.rs b/polkadot/node/service/src/parachains_db/upgrade.rs
index 6cf47b80cea..de4e09368b2 100644
--- a/polkadot/node/service/src/parachains_db/upgrade.rs
+++ b/polkadot/node/service/src/parachains_db/upgrade.rs
@@ -27,7 +27,7 @@ type Version = u32;
 const VERSION_FILE_NAME: &'static str = "parachain_db_version";
 
 /// Current db version.
-const CURRENT_VERSION: Version = 0;
+const CURRENT_VERSION: Version = 1;
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -56,6 +56,7 @@ pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> {
 	let is_empty = db_path.read_dir().map_or(true, |mut d| d.next().is_none());
 	if !is_empty {
 		match current_version(db_path)? {
+			0 => migrate_from_version_0_to_1(db_path)?,
 			CURRENT_VERSION => (),
 			v => return Err(Error::FutureVersion {
 				current: CURRENT_VERSION,
@@ -68,10 +69,10 @@ pub fn try_upgrade_db(db_path: &Path) -> Result<(), Error> {
 }
 
 /// Reads current database version from the file at given path.
-/// If the file does not exist, assumes version 0.
+/// If the file does not exist, assumes the current version.
 fn current_version(path: &Path) -> Result<Version, Error> {
 	match fs::read_to_string(version_file_path(path)) {
-		Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(0),
+		Err(ref err) if err.kind() == io::ErrorKind::NotFound => Ok(CURRENT_VERSION),
 		Err(err) => Err(err.into()),
 		Ok(content) => u32::from_str(&content).map_err(|_| Error::CorruptedVersionFile),
 	}
@@ -90,3 +91,20 @@ fn version_file_path(path: &Path) -> PathBuf {
 	file_path.push(VERSION_FILE_NAME);
 	file_path
 }
+
+
+/// Migration from version 0 to version 1:
+/// * the number of columns has changed from 3 to 5;
+fn migrate_from_version_0_to_1(path: &Path) -> Result<(), Error> {
+	use kvdb_rocksdb::{Database, DatabaseConfig};
+
+	let db_path = path.to_str()
+		.ok_or_else(|| super::other_io_error("Invalid database path".into()))?;
+	let db_cfg = DatabaseConfig::with_columns(super::columns::v0::NUM_COLUMNS);
+	let db = Database::open(&db_cfg, db_path)?;
+
+	db.add_column()?;
+	db.add_column()?;
+
+	Ok(())
+}
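Note: the migration above works because RocksDB column families can be appended to an existing database in place, so no existing data is rewritten. Below is a self-contained sketch of the same `add_column` pattern, assuming the `kvdb-rocksdb` crate used by this patch plus a `tempfile` dev-dependency; the function and its assertions are illustrative only.

// Sketch only; not part of this patch.
use kvdb_rocksdb::{Database, DatabaseConfig};

fn add_columns_demo() -> std::io::Result<()> {
	let dir = tempfile::tempdir()?;
	let path = dir.path().to_str().expect("temp dir path is valid UTF-8");

	// Open with the old, three-column layout, as a version-0 database would have.
	let db = Database::open(&DatabaseConfig::with_columns(3), path)?;
	assert_eq!(db.num_columns(), 3);

	// Each call appends one empty column family; two calls take the layout to five.
	db.add_column()?;
	db.add_column()?;
	assert_eq!(db.num_columns(), 5);

	Ok(())
}

Appending columns rather than recreating the database keeps the availability-store and approval-voting data already on disk intact.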
diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs
index 5d03712a5a7..0116e2a03ae 100644
--- a/polkadot/node/service/src/relay_chain_selection.rs
+++ b/polkadot/node/service/src/relay_chain_selection.rs
@@ -41,7 +41,7 @@ use {
 	},
 	polkadot_subsystem::messages::{ApprovalVotingMessage, ChainSelectionMessage},
 	polkadot_node_subsystem_util::metrics::{self, prometheus},
-	polkadot_overseer::Handle,
+	polkadot_overseer::{Handle, OverseerHandle},
 	futures::channel::oneshot,
 	consensus_common::{Error as ConsensusError, SelectChain},
 	sp_blockchain::HeaderBackend,
@@ -125,7 +125,6 @@ impl<B> SelectRelayChain<B> {
 
 	/// Create a new [`SelectRelayChain`] wrapping the given chain backend
 	/// and a handle to the overseer.
-	#[allow(unused)]
 	pub fn new(backend: Arc<B>, overseer: Handle, metrics: Metrics) -> Self {
 		SelectRelayChain {
 			fallback: sc_consensus::LongestChain::new(backend.clone()),
@@ -167,14 +166,13 @@ impl<B> SelectRelayChain<B>
 }
 
 impl<B> SelectRelayChain<B> {
-	/// Given an overseer handler, this connects the [`SelectRelayChain`]'s
-	/// internal handler to the same overseer.
-	#[allow(unused)]
-	pub fn connect_overseer_handler(
+	/// Given an overseer handle, connects the [`SelectRelayChain`]'s
+	/// internal handle and its clones to the same overseer.
+	pub fn connect_to_overseer(
 		&mut self,
-		other_handler: &Handle,
+		handle: OverseerHandle,
 	) {
-		other_handler.connect_other(&mut self.overseer);
+		self.overseer.connect_to_overseer(handle);
 	}
 }
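Note: the rename from `connect_overseer_handler` to `connect_to_overseer` also flips the direction of the call: instead of an external handle connecting this one, the selection rule's internal handle now attaches itself to a given `OverseerHandle`. Below is a minimal sketch of a call site; the generic backend parameter `B` and the surrounding service code are stand-ins rather than code from this patch.

// Sketch only; not part of this patch.
use polkadot_overseer::OverseerHandle;

/// Attach an already constructed `SelectRelayChain` to the overseer once
/// `Overseer::new` has produced a handle for it.
fn attach_to_overseer<B>(
	select_relay_chain: &mut SelectRelayChain<B>,
	overseer_handle: OverseerHandle,
) {
	select_relay_chain.connect_to_overseer(overseer_handle);
}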
diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs
index 5ac6c5acdca..2a7a66ac480 100644
--- a/polkadot/node/subsystem-test-helpers/src/lib.rs
+++ b/polkadot/node/subsystem-test-helpers/src/lib.rs
@@ -375,7 +375,7 @@ where
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use polkadot_overseer::{Overseer, HeadSupportsParachains, AllSubsystems};
+	use polkadot_overseer::{Overseer, Handle, HeadSupportsParachains, AllSubsystems};
 	use futures::executor::block_on;
 	use polkadot_primitives::v1::Hash;
 	use polkadot_node_subsystem::messages::CollatorProtocolMessage;
@@ -390,17 +390,18 @@ mod tests {
 		let spawner = sp_core::testing::TaskExecutor::new();
 		let (tx, rx) = mpsc::channel(2);
 		let all_subsystems = AllSubsystems::<()>::dummy().replace_collator_protocol(ForwardSubsystem(tx));
-		let (overseer, mut handler) = Overseer::new(
+		let (overseer, handle) = Overseer::new(
 			Vec::new(),
 			all_subsystems,
 			None,
 			AlwaysSupportsParachains,
 			spawner.clone(),
 		).unwrap();
+		let mut handle = Handle::Connected(handle);
 
 		spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
 
-		block_on(handler.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
+		block_on(handle.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
 
 		assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_)));
 	}
 }
diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs
index 6e17e37edeb..bd38724b006 100644
--- a/polkadot/node/test/service/src/lib.rs
+++ b/polkadot/node/test/service/src/lib.rs
@@ -223,17 +223,17 @@ pub fn run_validator_node(
 ) -> PolkadotTestNode {
 	let config = node_config(storage_update_func, task_executor, key, boot_nodes, true);
 	let multiaddr = config.network.listen_addresses[0].clone();
-	let NewFull { task_manager, client, network, rpc_handlers, overseer_handler, .. } =
+	let NewFull { task_manager, client, network, rpc_handlers, overseer_handle, .. } =
 		new_full(config, IsCollator::No, worker_program_path).expect("could not create Polkadot test service");
 
-	let overseer_handler = overseer_handler.expect("test node must have an overseer handler");
+	let overseer_handle = overseer_handle.expect("test node must have an overseer handle");
 	let peer_id = network.local_peer_id().clone();
 	let addr = MultiaddrWithPeerId { multiaddr, peer_id };
 
 	PolkadotTestNode {
 		task_manager,
 		client,
-		overseer_handler,
+		overseer_handle,
 		addr,
 		rpc_handlers,
 	}
@@ -265,19 +265,19 @@ pub fn run_collator_node(
 		client,
 		network,
 		rpc_handlers,
-		overseer_handler,
+		overseer_handle,
 		..
 	} = new_full(config, IsCollator::Yes(collator_pair), None)
 		.expect("could not create Polkadot test service");
 
-	let overseer_handler = overseer_handler.expect("test node must have an overseer handler");
+	let overseer_handle = overseer_handle.expect("test node must have an overseer handle");
 	let peer_id = network.local_peer_id().clone();
 	let addr = MultiaddrWithPeerId { multiaddr, peer_id };
 
 	PolkadotTestNode {
 		task_manager,
 		client,
-		overseer_handler,
+		overseer_handle,
 		addr,
 		rpc_handlers,
 	}
@@ -289,8 +289,8 @@ pub struct PolkadotTestNode {
 	pub task_manager: TaskManager,
 	/// Client's instance.
 	pub client: Arc<Client>,
-	/// The overseer handler.
-	pub overseer_handler: Handle,
+	/// A handle to Overseer.
+	pub overseer_handle: Handle,
 	/// The `MultiaddrWithPeerId` to this node. This is useful if you want to pass it as "boot node" to other nodes.
 	pub addr: MultiaddrWithPeerId,
 	/// `RPCHandlers` to make RPC queries.
@@ -347,11 +347,11 @@ impl PolkadotTestNode {
 			para_id,
 		};
 
-		self.overseer_handler
+		self.overseer_handle
 			.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
 			.await;
 
-		self.overseer_handler
+		self.overseer_handle
 			.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
 			.await;
 	}
diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs
index 83a65c447cb..28f34ef5a0a 100644
--- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs
+++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs
@@ -67,9 +67,9 @@ fn main() -> Result<()> {
 			None,
 			polkadot_service::RealOverseerGen,
 		).map_err(|e| e.to_string())?;
-		let mut overseer_handler = full_node
-			.overseer_handler
-			.expect("Overseer handler should be initialized for collators");
+		let mut overseer_handle = full_node
+			.overseer_handle
+			.expect("Overseer handle should be initialized for collators");
 
 		let genesis_head_hex = format!("0x{:?}", HexDisplay::from(&collator.genesis_head()));
 
@@ -87,11 +87,11 @@ fn main() -> Result<()> {
 			collator: collator.create_collation_function(full_node.task_manager.spawn_handle()),
 			para_id,
 		};
 
-		overseer_handler
+		overseer_handle
 			.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
 			.await;
 
-		overseer_handler
+		overseer_handle
 			.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
 			.await;
-- 
GitLab