Unverified Commit 2d197804 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

enable disputes (#3478)



* initial integration and migration code

* fix tests

* fix counting test

* assume the current version on missing file

* use SelectRelayChain

* remove duplicate metric

* Update node/service/src/lib.rs

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* remove ApprovalCheckingVotingRule

* address my concern

* never mode for StagnantCheckInterval

* REVERTME: some logs

* w00t

* it's ugly but it works

* Revert "REVERTME: some logs"

This reverts commit e210505a

.

* it's handle, not handler

* fix a few typos

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent 6dfdeabf
Pipeline #148996 passed with stages
in 49 minutes and 18 seconds
......@@ -6631,6 +6631,7 @@ dependencies = [
"kv-log-macro",
"lru",
"metered-channel",
"parking_lot 0.11.1",
"polkadot-node-metrics",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
......@@ -6976,6 +6977,9 @@ dependencies = [
"polkadot-node-core-bitfield-signing",
"polkadot-node-core-candidate-validation",
"polkadot-node-core-chain-api",
"polkadot-node-core-chain-selection",
"polkadot-node-core-dispute-coordinator",
"polkadot-node-core-dispute-participation",
"polkadot-node-core-parachains-inherent",
"polkadot-node-core-provisioner",
"polkadot-node-core-runtime-api",
......
......@@ -28,6 +28,7 @@ use polkadot_node_subsystem::{
use kvdb::KeyValueDB;
use parity_scale_codec::Error as CodecError;
use futures::channel::oneshot;
use futures::future::Either;
use futures::prelude::*;
use std::time::{UNIX_EPOCH, Duration,SystemTime};
......@@ -244,7 +245,7 @@ impl Clock for SystemClock {
/// The interval, in seconds to check for stagnant blocks.
#[derive(Debug, Clone)]
pub struct StagnantCheckInterval(Duration);
pub struct StagnantCheckInterval(Option<Duration>);
impl Default for StagnantCheckInterval {
fn default() -> Self {
......@@ -255,28 +256,37 @@ impl Default for StagnantCheckInterval {
// between 2 validators is D + 5s.
const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5);
StagnantCheckInterval(DEFAULT_STAGNANT_CHECK_INTERVAL)
StagnantCheckInterval(Some(DEFAULT_STAGNANT_CHECK_INTERVAL))
}
}
impl StagnantCheckInterval {
/// Create a new stagnant-check interval wrapping the given duration.
pub fn new(interval: Duration) -> Self {
StagnantCheckInterval(interval)
StagnantCheckInterval(Some(interval))
}
fn timeout_stream(&self) -> impl Stream<Item = ()> {
let interval = self.0;
let mut delay = futures_timer::Delay::new(interval);
/// Create a `StagnantCheckInterval` which never triggers.
pub fn never() -> Self {
StagnantCheckInterval(None)
}
futures::stream::poll_fn(move |cx| {
let poll = delay.poll_unpin(cx);
if poll.is_ready() {
delay.reset(interval)
}
fn timeout_stream(&self) -> impl Stream<Item = ()> {
match self.0 {
Some(interval) => Either::Left({
let mut delay = futures_timer::Delay::new(interval);
futures::stream::poll_fn(move |cx| {
let poll = delay.poll_unpin(cx);
if poll.is_ready() {
delay.reset(interval)
}
poll.map(Some)
})
poll.map(Some)
})
}),
None => Either::Right(futures::stream::pending()),
}
}
}
......
......@@ -27,7 +27,7 @@ use polkadot_cli::{
create_default_subsystems,
service::{
AuthorityDiscoveryApi, AuxStore, BabeApi, Block, Error, HeaderBackend, Overseer,
OverseerGen, OverseerGenArgs, Handle, ParachainHost, ProvideRuntimeApi,
OverseerGen, OverseerGenArgs, OverseerHandle, ParachainHost, ProvideRuntimeApi,
SpawnNamed,
},
Cli,
......@@ -73,7 +73,7 @@ impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, Handle), Error>
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block> + AuthorityDiscoveryApi<Block>,
......
......@@ -10,6 +10,7 @@ client = { package = "sc-client-api", git = "https://github.com/paritytech/subst
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = "0.3.15"
futures-timer = "3.0.2"
parking_lot = "0.11.1"
polkadot-node-network-protocol = { path = "../network/protocol" }
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem-types = { path = "../subsystem-types" }
......
......@@ -174,7 +174,7 @@ fn main() {
.replace_candidate_backing(Subsystem1)
;
let (overseer, _handler) = Overseer::new(
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
......
......@@ -123,7 +123,7 @@ impl SpawnNamed for DummySpawner{
struct DummyCtx;
fn main() {
let (overseer, _handler): (Xxx<_>, _) = Xxx::builder()
let (overseer, _handle): (Xxx<_>, _) = Xxx::builder()
.sub0(AwesomeSubSys::default())
.plinkos(GoblinTower::default())
.i_like_pi(::std::f64::consts::PI)
......
......@@ -73,6 +73,7 @@ use futures::{
Future, FutureExt, StreamExt,
};
use lru::LruCache;
use parking_lot::RwLock;
use polkadot_primitives::v1::{Block, BlockId,BlockNumber, Hash, ParachainHost};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
......@@ -159,13 +160,24 @@ impl<Client> HeadSupportsParachains for Arc<Client> where
}
/// A handler used to communicate with the [`Overseer`].
/// A handle used to communicate with the [`Overseer`].
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct Handle(pub OverseerHandle);
pub enum Handle {
/// Used only at initialization to break the cyclic dependency.
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
/// A handle to the overseer.
Connected(OverseerHandle),
}
impl Handle {
/// Create a new disconnected [`Handle`].
pub fn new_disconnected() -> Self {
Self::Disconnected(Arc::new(RwLock::new(None)))
}
/// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self, block: BlockInfo) {
self.send_and_log_error(Event::BlockImported(block)).await
......@@ -207,25 +219,59 @@ impl Handle {
/// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) {
if self.0.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
self.try_connect();
if let Self::Connected(ref mut handle) = self {
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}
/// Whether the overseer handler is connected to an overseer.
pub fn is_connected(&self) -> bool {
true
/// Whether the handle is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Self::Disconnected(ref x) => x.read().is_none(),
_ => false,
}
}
/// Whether the handler is disconnected.
pub fn is_disconnected(&self) -> bool {
false
/// Connect this handle and all disconnected clones of it to the overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
match self {
Self::Disconnected(ref mut x) => {
let mut maybe_handle = x.write();
if maybe_handle.is_none() {
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
*maybe_handle = Some(handle);
} else {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect a clone of a connected Handle",
);
}
},
_ => {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect an already connected Handle",
);
},
}
}
/// Using this handler, connect another handler to the same
/// overseer, if any.
pub fn connect_other(&self, other: &mut Handle) {
*other = self.clone();
/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
fn try_connect(&mut self) {
if let Self::Disconnected(ref mut x) = self {
let guard = x.write();
if let Some(ref h) = *guard {
let handle = h.clone();
drop(guard);
*self = Self::Connected(handle);
}
}
}
}
......@@ -301,7 +347,7 @@ pub enum ExternalRequest {
/// import and finality notifications into the [`OverseerHandle`].
pub async fn forward_events<P: BlockchainEvents<Block>>(
client: Arc<P>,
mut handler: Handle,
mut handle: Handle,
) {
let mut finality = client.finality_notification_stream();
let mut imports = client.import_notification_stream();
......@@ -311,7 +357,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
f = finality.next() => {
match f {
Some(block) => {
handler.block_finalized(block.into()).await;
handle.block_finalized(block.into()).await;
}
None => break,
}
......@@ -319,7 +365,7 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
i = imports.next() => {
match i {
Some(block) => {
handler.block_imported(block.into()).await;
handle.block_imported(block.into()).await;
}
None => break,
}
......@@ -338,7 +384,6 @@ pub async fn forward_events<P: BlockchainEvents<Block>>(
network=NetworkBridgeEvent<protocol_v1::ValidationProtocol>,
)]
pub struct Overseer<SupportsParachains> {
#[subsystem(no_dispatch, CandidateValidationMessage)]
candidate_validation: CandidateValidation,
......@@ -390,16 +435,16 @@ pub struct Overseer<SupportsParachains> {
#[subsystem(no_dispatch, GossipSupportMessage)]
gossip_support: GossipSupport,
#[subsystem(no_dispatch, wip, DisputeCoordinatorMessage)]
dipute_coordinator: DisputeCoordinator,
#[subsystem(no_dispatch, DisputeCoordinatorMessage)]
dispute_coordinator: DisputeCoordinator,
#[subsystem(no_dispatch, wip, DisputeParticipationMessage)]
#[subsystem(no_dispatch, DisputeParticipationMessage)]
dispute_participation: DisputeParticipation,
#[subsystem(no_dispatch, wip, DisputeDistributionMessage)]
dipute_distribution: DisputeDistribution,
#[subsystem(no_dispatch, DisputeDistributionMessage)]
dispute_distribution: DisputeDistribution,
#[subsystem(no_dispatch, wip, ChainSelectionMessage)]
#[subsystem(no_dispatch, ChainSelectionMessage)]
chain_selection: ChainSelection,
/// External listeners waiting for a hash to be in the active-leave set.
......@@ -436,7 +481,7 @@ where
/// This returns the overseer along with an [`OverseerHandle`] which can
/// be used to send messages from external parts of the codebase.
///
/// The [`OverseerHandler`] returned from this function is connected to
/// The [`OverseerHandle`] returned from this function is connected to
/// the returned [`Overseer`].
///
/// ```text
......@@ -527,7 +572,7 @@ where
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let all_subsystems = AllSubsystems::<()>::dummy()
/// .replace_candidate_validation(ValidationSubsystem);
/// let (overseer, _handler) = Overseer::new(
/// let (overseer, _handle) = Overseer::new(
/// vec![],
/// all_subsystems,
/// None,
......@@ -549,13 +594,13 @@ where
/// # });
/// # }
/// ```
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>(
pub fn new<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>(
leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>,
all_subsystems: AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>,
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
) -> SubsystemResult<(Self, Handle)>
) -> SubsystemResult<(Self, OverseerHandle)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>, SubsystemError> + Send,
......@@ -574,11 +619,15 @@ where
ApD: Subsystem<OverseerSubsystemContext<ApprovalDistributionMessage>, SubsystemError> + Send,
ApV: Subsystem<OverseerSubsystemContext<ApprovalVotingMessage>, SubsystemError> + Send,
GS: Subsystem<OverseerSubsystemContext<GossipSupportMessage>, SubsystemError> + Send,
DC: Subsystem<OverseerSubsystemContext<DisputeCoordinatorMessage>, SubsystemError> + Send,
DP: Subsystem<OverseerSubsystemContext<DisputeParticipationMessage>, SubsystemError> + Send,
DD: Subsystem<OverseerSubsystemContext<DisputeDistributionMessage>, SubsystemError> + Send,
CS: Subsystem<OverseerSubsystemContext<ChainSelectionMessage>, SubsystemError> + Send,
S: SpawnNamed,
{
let metrics: Metrics = <Metrics as MetricsTrait>::register(prometheus_registry)?;
let (mut overseer, handler) = Self::builder()
let (mut overseer, handle) = Self::builder()
.candidate_validation(all_subsystems.candidate_validation)
.candidate_backing(all_subsystems.candidate_backing)
.statement_distribution(all_subsystems.statement_distribution)
......@@ -596,6 +645,10 @@ where
.approval_distribution(all_subsystems.approval_distribution)
.approval_voting(all_subsystems.approval_voting)
.gossip_support(all_subsystems.gossip_support)
.dispute_coordinator(all_subsystems.dispute_coordinator)
.dispute_participation(all_subsystems.dispute_participation)
.dispute_distribution(all_subsystems.dispute_distribution)
.chain_selection(all_subsystems.chain_selection)
.leaves(Vec::from_iter(
leaves.into_iter().map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
))
......@@ -647,7 +700,7 @@ where
overseer.spawner().spawn("metrics_metronome", Box::pin(metronome));
}
Ok((overseer, Handle(handler)))
Ok((overseer, handle))
}
/// Stop the overseer.
......
......@@ -77,7 +77,7 @@ where
pub struct AllSubsystems<
CV = (), CB = (), SD = (), AD = (), AR = (), BS = (), BD = (), P = (),
RA = (), AS = (), NB = (), CA = (), CG = (), CP = (), ApD = (), ApV = (),
GS = (),
GS = (), DC = (), DP = (), DD = (), CS = (),
> {
/// A candidate validation subsystem.
pub candidate_validation: CV,
......@@ -113,10 +113,18 @@ pub struct AllSubsystems<
pub approval_voting: ApV,
/// A Connection Request Issuer subsystem.
pub gossip_support: GS,
/// A Dispute Coordinator subsystem.
pub dispute_coordinator: DC,
/// A Dispute Participation subsystem.
pub dispute_participation: DP,
/// A Dispute Distribution subsystem.
pub dispute_distribution: DD,
/// A Chain Selection subsystem.
pub chain_selection: CS,
}
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
AllSubsystems<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS, DC, DP, DD, CS>
{
/// Create a new instance of [`AllSubsystems`].
///
......@@ -148,6 +156,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
DummySubsystem,
> {
AllSubsystems {
candidate_validation: DummySubsystem,
......@@ -167,11 +179,15 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
approval_distribution: DummySubsystem,
approval_voting: DummySubsystem,
gossip_support: DummySubsystem,
dispute_coordinator: DummySubsystem,
dispute_participation: DummySubsystem,
dispute_distribution: DummySubsystem,
chain_selection: DummySubsystem,
}
}
/// Reference every individual subsystem.
pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS> {
pub fn as_ref(&self) -> AllSubsystems<&'_ CV, &'_ CB, &'_ SD, &'_ AD, &'_ AR, &'_ BS, &'_ BD, &'_ P, &'_ RA, &'_ AS, &'_ NB, &'_ CA, &'_ CG, &'_ CP, &'_ ApD, &'_ ApV, &'_ GS, &'_ DC, &'_ DP, &'_ DD, &'_ CS> {
AllSubsystems {
candidate_validation: &self.candidate_validation,
candidate_backing: &self.candidate_backing,
......@@ -190,6 +206,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
approval_distribution: &self.approval_distribution,
approval_voting: &self.approval_voting,
gossip_support: &self.gossip_support,
dispute_coordinator: &self.dispute_coordinator,
dispute_participation: &self.dispute_participation,
dispute_distribution: &self.dispute_distribution,
chain_selection: &self.chain_selection,
}
}
......@@ -213,6 +233,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
<Mapper as MapSubsystem<ApD>>::Output,
<Mapper as MapSubsystem<ApV>>::Output,
<Mapper as MapSubsystem<GS>>::Output,
<Mapper as MapSubsystem<DC>>::Output,
<Mapper as MapSubsystem<DP>>::Output,
<Mapper as MapSubsystem<DD>>::Output,
<Mapper as MapSubsystem<CS>>::Output,
>
where
Mapper: MapSubsystem<CV>,
......@@ -232,6 +256,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
Mapper: MapSubsystem<ApD>,
Mapper: MapSubsystem<ApV>,
Mapper: MapSubsystem<GS>,
Mapper: MapSubsystem<DC>,
Mapper: MapSubsystem<DP>,
Mapper: MapSubsystem<DD>,
Mapper: MapSubsystem<CS>,
{
AllSubsystems {
candidate_validation: <Mapper as MapSubsystem<CV>>::map_subsystem(&mapper, self.candidate_validation),
......@@ -251,6 +279,10 @@ impl<CV, CB, SD, AD, AR, BS, BD, P, RA, AS, NB, CA, CG, CP, ApD, ApV, GS>
approval_distribution: <Mapper as MapSubsystem<ApD>>::map_subsystem(&mapper, self.approval_distribution),
approval_voting: <Mapper as MapSubsystem<ApV>>::map_subsystem(&mapper, self.approval_voting),
gossip_support: <Mapper as MapSubsystem<GS>>::map_subsystem(&mapper, self.gossip_support),
dispute_coordinator: <Mapper as MapSubsystem<DC>>::map_subsystem(&mapper, self.dispute_coordinator),
dispute_participation: <Mapper as MapSubsystem<DP>>::map_subsystem(&mapper, self.dispute_participation),
dispute_distribution: <Mapper as MapSubsystem<DD>>::map_subsystem(&mapper, self.dispute_distribution),
chain_selection: <Mapper as MapSubsystem<CS>>::map_subsystem(&mapper, self.chain_selection),
}
}
}
......@@ -18,6 +18,7 @@ use std::sync::atomic;
use std::collections::HashMap;
use std::task::{Poll};
use futures::{executor, pin_mut, select, FutureExt, pending, poll, stream};
use futures::channel::mpsc;
use polkadot_primitives::v1::{CollatorPair, CandidateHash};
use polkadot_node_primitives::{CollationResult, CollationGenerationConfig, PoV, BlockData};
......@@ -166,13 +167,14 @@ fn overseer_works() {
.replace_candidate_validation(TestSubsystem1(s1_tx))
.replace_candidate_backing(TestSubsystem2(s2_tx));
let (overseer, mut handler) = Overseer::new(
let (overseer, handle) = Overseer::new(
vec![],
all_subsystems,
None,
MockSupportsParachains,
spawner,
).unwrap();
let mut handle = Handle::Connected(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
......@@ -188,7 +190,7 @@ fn overseer_works() {
Some(msg) => {
s1_results.push(msg);
if s1_results.len() == 10 {
handler.stop().await;
handle.stop().await;
}
}
None => break,
......@@ -236,21 +238,22 @@ fn overseer_metrics_work() {
let all_subsystems = AllSubsystems::<()>::dummy();
let registry = prometheus::Registry::new();
let (overseer, mut handler) = Overseer::new(
let (overseer, handle) = Overseer::new(
vec![first_block],
all_subsystems,
Some(&registry),
MockSupportsParachains,
spawner,
).unwrap();
let mut handle = Handle::Connected(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
handler.block_imported(second_block).await;
handler.block_imported(third_block).await;
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.stop().await;
handle.block_imported(second_block).await;
handle.block_imported(third_block).await;
handle.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handle.stop().await;
select! {
res = overseer_fut => {
......@@ -398,13 +401,14 @@ fn overseer_start_stop_works() {
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(TestSubsystem5(tx_5))
.replace_candidate_backing(TestSubsystem6(tx_6));
let (overseer, mut handler) = Overseer::new(
let (overseer, handle) = Overseer::new(
vec![first_block],
all_subsystems,
None,
MockSupportsParachains,
spawner,
).unwrap();
let mut handle = Handle::Connected(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
......@@ -412,8 +416,8 @@ fn overseer_start_stop_works() {
let mut ss5_results = Vec::new();
let mut ss6_results = Vec::new();
handler.block_imported(second_block).await;
handler.block_imported(third_block).await;
handle.block_imported(second_block).await;
handle.block_imported(third_block).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(ActivatedLeaf {
......@@ -463,7 +467,7 @@ fn overseer_start_stop_works() {
if ss5_results.len() == expected_heartbeats.len() &&
ss6_results.len() == expected_heartbeats.len() {
handler.stop().await;
handle.stop().await;
}
}
......@@ -507,13 +511,14 @@ fn overseer_finalize_works() {
.replace_candidate_backing(TestSubsystem6(tx_6));
// start with two forks of different height.
let (overseer, mut handler) = Overseer::new(
let (overseer, handle) = Overseer::new(
vec![first_block, second_block],
all_subsystems,
None,
MockSupportsParachains,
spawner,
).unwrap();
let mut handle = Handle::Connected(handle);
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
......@@ -522,7 +527,7 @@ fn overseer_finalize_works() {
let mut ss6_results = Vec::new();
// this should stop work on both forks we started with earlier.
handler.block_finalized(third_block).await;
handle.block_finalized(third_block).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
......@@ -569,7 +574,7 @@ fn overseer_finalize_works() {
}
if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() {
handler.stop().await;
handle.stop().await;
}
}
......@@ -607,21 +612,22 @@ fn do_not_send_empty_leaves_update_on_block_finalization() {
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_backing(TestSubsystem6(tx_5));
let (overseer, mut handler) = Overseer::new(
let (overseer, handle) = Overseer::new(
Vec::new(),
all_subsystems,
None,
MockSupportsParachains,
spawner,
).unwrap();
let mut handle = Handle::Connected(handle);
let overseer_fut = overseer.run().fuse();