Unverified Commit 7a1e581c authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Integrate Approval Voting into Overseer / Service / GRANDPA (#2412)

* integrate approval voting into overseer

* expose public API and make keystore arc

* integrate overseer in service

* guide: `ApprovedAncestor` returns block number

* return block number along with hash from ApprovedAncestor

* introduce a voting rule for reporting on approval checking

* integrate the delay voting rule

* Rococo configuration

* fix compilation and add slack

* fix web-wasm build

* tweak parameterization

* migrate voting rules to asycn

* remove hack comment
parent 69b1058d
Pipeline #124261 passed with stages
in 33 minutes and 18 seconds
......@@ -5851,6 +5851,7 @@ dependencies = [
"polkadot-collator-protocol",
"polkadot-network-bridge",
"polkadot-node-collation-generation",
"polkadot-node-core-approval-voting",
"polkadot-node-core-av-store",
"polkadot-node-core-backing",
"polkadot-node-core-bitfield-signing",
......@@ -5883,6 +5884,7 @@ dependencies = [
"sc-executor",
"sc-finality-grandpa",
"sc-finality-grandpa-warp-sync",
"sc-keystore",
"sc-network",
"sc-service",
"sc-telemetry",
......
......@@ -76,11 +76,23 @@ const LOG_TARGET: &str = "approval_voting";
/// The approval voting subsystem.
pub struct ApprovalVotingSubsystem<T> {
keystore: LocalKeystore,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: Arc<T>,
}
impl<T> ApprovalVotingSubsystem<T> {
/// Create a new approval voting subsystem with the given keystore, slot duration,
/// and underlying DB.
pub fn new(keystore: Arc<LocalKeystore>, slot_duration_millis: u64, db: Arc<T>) -> Self {
ApprovalVotingSubsystem {
keystore,
slot_duration_millis,
db,
}
}
}
impl<T, C> Subsystem<C> for ApprovalVotingSubsystem<T>
where T: AuxStore + Send + Sync + 'static, C: SubsystemContext<Message = ApprovalVotingMessage> {
fn start(self, ctx: C) -> SpawnedSubsystem {
......@@ -229,7 +241,7 @@ use approval_db_v1_reader::ApprovalDBV1Reader;
struct State<T> {
session_window: import::RollingSessionWindow,
keystore: LocalKeystore,
keystore: Arc<LocalKeystore>,
slot_duration_millis: u64,
db: T,
clock: Box<dyn Clock + Send + Sync>,
......@@ -529,10 +541,10 @@ async fn handle_approved_ancestor(
db: &impl DBReader,
target: Hash,
lower_bound: BlockNumber,
) -> SubsystemResult<Option<Hash>> {
) -> SubsystemResult<Option<(Hash, BlockNumber)>> {
let mut all_approved_max = None;
let block_number = {
let target_number = {
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::BlockNumber(target, tx).into()).await;
......@@ -544,17 +556,17 @@ async fn handle_approved_ancestor(
}
};
if block_number <= lower_bound { return Ok(None) }
if target_number <= lower_bound { return Ok(None) }
// request ancestors up to but not including the lower bound,
// as a vote on the lower bound is implied if we cannot find
// anything else.
let ancestry = if block_number > lower_bound + 1 {
let ancestry = if target_number > lower_bound + 1 {
let (tx, rx) = oneshot::channel();
ctx.send_message(ChainApiMessage::Ancestors {
hash: target,
k: (block_number - (lower_bound + 1)) as usize,
k: (target_number - (lower_bound + 1)) as usize,
response_channel: tx,
}.into()).await;
......@@ -566,7 +578,7 @@ async fn handle_approved_ancestor(
Vec::new()
};
for block_hash in std::iter::once(target).chain(ancestry) {
for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() {
// Block entries should be present as the assumption is that
// nothing here is finalized. If we encounter any missing block
// entries we can fail.
......@@ -577,7 +589,9 @@ async fn handle_approved_ancestor(
if entry.is_fully_approved() {
if all_approved_max.is_none() {
all_approved_max = Some(block_hash);
// First iteration of the loop is target, i = 0. After that,
// ancestry is moving backwards.
all_approved_max = Some((block_hash, target_number - i as BlockNumber));
}
} else {
all_approved_max = None;
......
......@@ -184,7 +184,7 @@ impl DBReader for TestStore {
fn blank_state() -> State<TestStore> {
State {
session_window: import::RollingSessionWindow::default(),
keystore: LocalKeystore::in_memory(),
keystore: Arc::new(LocalKeystore::in_memory()),
slot_duration_millis: SLOT_DURATION_MILLIS,
db: TestStore::default(),
clock: Box::new(MockClock::default()),
......@@ -1490,7 +1490,7 @@ fn approved_ancestor_all_approved() {
let test_fut = Box::pin(async move {
assert_eq!(
handle_approved_ancestor(&mut ctx, &state.db, block_hash_4, 0).await.unwrap(),
Some(block_hash_4),
Some((block_hash_4, 4)),
)
});
......@@ -1572,7 +1572,7 @@ fn approved_ancestor_missing_approval() {
let test_fut = Box::pin(async move {
assert_eq!(
handle_approved_ancestor(&mut ctx, &state.db, block_hash_4, 0).await.unwrap(),
Some(block_hash_2),
Some((block_hash_2, 2)),
)
});
......
This diff is collapsed.
......@@ -19,6 +19,7 @@ sc-executor = { git = "https://github.com/paritytech/substrate", branch = "maste
sc-finality-grandpa-warp-sync = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-transaction-pool = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -95,6 +96,7 @@ polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true
polkadot-pov-distribution = { path = "../network/pov-distribution", optional = true }
polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true }
polkadot-approval-distribution = { path = "../network/approval-distribution", optional = true }
polkadot-node-core-approval-voting = { path = "../core/approval-voting", optional = true }
[dev-dependencies]
polkadot-test-client = { path = "../test/client" }
......@@ -132,4 +134,5 @@ real-overseer = [
"polkadot-pov-distribution",
"polkadot-statement-distribution",
"polkadot-approval-distribution",
"polkadot-node-core-approval-voting",
]
......@@ -870,7 +870,6 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime::
group_rotation_frequency: 20,
chain_availability_period: 4,
thread_availability_period: 4,
no_show_slots: 10,
max_upward_queue_count: 8,
max_upward_queue_size: 8 * 1024,
max_downward_message_size: 1024,
......@@ -893,6 +892,11 @@ fn rococo_staging_testnet_config_genesis(wasm_binary: &[u8]) -> rococo_runtime::
hrmp_max_parachain_outbound_channels: 4,
hrmp_max_parathread_outbound_channels: 4,
hrmp_max_message_num_per_candidate: 5,
no_show_slots: 2,
n_delay_tranches: 25,
needed_approvals: 2,
relay_vrf_modulo_samples: 10,
zeroth_delay_tranche_width: 0,
..Default::default()
},
}),
......@@ -1376,7 +1380,6 @@ pub fn rococo_testnet_genesis(
group_rotation_frequency: 20,
chain_availability_period: 4,
thread_availability_period: 4,
no_show_slots: 10,
max_upward_queue_count: 8,
max_upward_queue_size: 8 * 1024,
max_downward_message_size: 1024,
......@@ -1399,6 +1402,11 @@ pub fn rococo_testnet_genesis(
hrmp_max_parachain_outbound_channels: 4,
hrmp_max_parathread_outbound_channels: 4,
hrmp_max_message_num_per_candidate: 5,
no_show_slots: 2,
n_delay_tranches: 25,
needed_approvals: 2,
relay_vrf_modulo_samples: 10,
zeroth_delay_tranche_width: 0,
..Default::default()
},
}),
......
......@@ -18,9 +18,138 @@
use std::sync::Arc;
#[cfg(feature = "full-node")]
use polkadot_primitives::v1::Hash;
use polkadot_primitives::v1::{Block as PolkadotBlock, Header as PolkadotHeader, BlockNumber, Hash};
use polkadot_subsystem::messages::ApprovalVotingMessage;
use sp_runtime::traits::{Block as BlockT, NumberFor};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::Header as _;
use prometheus_endpoint::{self, Registry};
use polkadot_overseer::OverseerHandler;
use futures::channel::oneshot;
/// A custom GRANDPA voting rule that acts as a diagnostic for the approval
/// voting subsystem's desired votes.
///
/// The practical effect of this voting rule is to implement a fixed delay of
/// blocks and to issue a prometheus metric on the lag behind the head that
/// approval checking would indicate.
#[cfg(feature = "full-node")]
#[derive(Clone)]
pub(crate) struct ApprovalCheckingDiagnostic {
checking_lag: Option<prometheus_endpoint::Histogram>,
overseer: OverseerHandler,
}
#[cfg(feature = "full-node")]
impl ApprovalCheckingDiagnostic {
/// Create a new approval checking diagnostic voting rule.
pub fn new(overseer: OverseerHandler, registry: Option<&Registry>)
-> Result<Self, prometheus_endpoint::PrometheusError>
{
Ok(ApprovalCheckingDiagnostic {
checking_lag: if let Some(registry) = registry {
Some(prometheus_endpoint::register(
prometheus_endpoint::Histogram::with_opts(
prometheus_endpoint::HistogramOpts::new(
"approval_checking_finality_lag",
"How far behind the head of the chain the Approval Checking protocol wants to vote",
).buckets(vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0])
)?,
registry,
)?)
} else {
None
},
overseer,
})
}
}
#[cfg(feature = "full-node")]
impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingDiagnostic
where B: sp_blockchain::HeaderBackend<PolkadotBlock>
{
fn restrict_vote(
&self,
backend: Arc<B>,
base: &PolkadotHeader,
_best_target: &PolkadotHeader,
current_target: &PolkadotHeader,
) -> grandpa::VotingRuleResult<PolkadotBlock> {
// always wait 50 blocks behind the head to finalize.
const DIAGNOSTIC_GRANDPA_DELAY: BlockNumber = 50;
let aux = || {
let find_target = |target_number: BlockNumber, current_header: &PolkadotHeader| {
let mut target_hash = current_header.hash();
let mut target_header = current_header.clone();
loop {
if *target_header.number() < target_number {
unreachable!(
"we are traversing backwards from a known block; \
blocks are stored contiguously; \
qed"
);
}
if *target_header.number() == target_number {
return Some((target_hash, target_number));
}
target_hash = *target_header.parent_hash();
target_header = backend.header(BlockId::Hash(target_hash)).ok()?
.expect("Header known to exist due to the existence of one of its descendents; qed");
}
};
let target_number = std::cmp::max(
current_target.number().saturating_sub(DIAGNOSTIC_GRANDPA_DELAY),
base.number().clone(),
);
find_target(target_number, current_target)
};
let actual_vote_target = aux();
// Query approval checking and issue metrics.
let mut overseer = self.overseer.clone();
let checking_lag = self.checking_lag.clone();
let current_hash = current_target.hash();
let current_number = current_target.number.clone();
let base_number = base.number;
Box::pin(async move {
let (tx, rx) = oneshot::channel();
let approval_checking_subsystem_vote = {
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor(
current_hash,
base_number,
tx,
)).await;
rx.await.ok().and_then(|v| v)
};
let approval_checking_subsystem_lag = approval_checking_subsystem_vote.map_or(
current_number - base_number,
|(_h, n)| current_number - n,
);
if let Some(ref checking_lag) = checking_lag {
checking_lag.observe(approval_checking_subsystem_lag as _);
}
actual_vote_target
})
}
}
/// A custom GRANDPA voting rule that "pauses" voting (i.e. keeps voting for the
/// same last finalized block) after a given block at height `N` has been
......@@ -42,9 +171,6 @@ where
current_target: &Block::Header,
) -> grandpa::VotingRuleResult<Block> {
let aux = || {
use sp_runtime::generic::BlockId;
use sp_runtime::traits::Header as _;
// walk backwards until we find the target block
let find_target = |target_number: NumberFor<Block>, current_header: &Block::Header| {
let mut target_hash = current_header.hash();
......
......@@ -22,7 +22,6 @@ pub mod chain_spec;
mod grandpa_support;
mod client;
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
#[cfg(feature = "full-node")]
use {
std::convert::TryInto,
......@@ -35,10 +34,11 @@ use {
polkadot_primitives::v1::ParachainHost,
sc_authority_discovery::Service as AuthorityDiscoveryService,
sp_blockchain::HeaderBackend,
sp_keystore::SyncCryptoStorePtr,
sp_trie::PrefixedMemoryDB,
sc_client_api::ExecutorProvider,
sc_client_api::{AuxStore, ExecutorProvider},
sc_keystore::LocalKeystore,
babe_primitives::BabeApi,
grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider},
};
#[cfg(feature = "real-overseer")]
use polkadot_network_bridge::RequestMultiplexer;
......@@ -348,7 +348,7 @@ fn new_partial<RuntimeApi, Executor>(config: &mut Configuration, jaeger_agent: O
#[cfg(all(feature="full-node", not(feature = "real-overseer")))]
fn real_overseer<Spawner, RuntimeClient>(
leaves: impl IntoIterator<Item = BlockInfo>,
_: SyncCryptoStorePtr,
_: Arc<LocalKeystore>,
_: Arc<RuntimeClient>,
_: AvailabilityConfig,
_: Arc<sc_network::NetworkService<Block, Hash>>,
......@@ -361,8 +361,8 @@ fn real_overseer<Spawner, RuntimeClient>(
_: u64,
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block>,
RuntimeClient::Api: ParachainHost<Block>,
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
Overseer::new(
......@@ -376,7 +376,7 @@ where
#[cfg(all(feature = "full-node", feature = "real-overseer"))]
fn real_overseer<Spawner, RuntimeClient>(
leaves: impl IntoIterator<Item = BlockInfo>,
keystore: SyncCryptoStorePtr,
keystore: Arc<LocalKeystore>,
runtime_client: Arc<RuntimeClient>,
availability_config: AvailabilityConfig,
network_service: Arc<sc_network::NetworkService<Block, Hash>>,
......@@ -386,10 +386,10 @@ fn real_overseer<Spawner, RuntimeClient>(
spawner: Spawner,
is_collator: IsCollator,
isolation_strategy: IsolationStrategy,
_slot_duration: u64, // TODO [now]: instantiate approval voting.
slot_duration: u64,
) -> Result<(Overseer<Spawner>, OverseerHandler), Error>
where
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block>,
RuntimeClient: 'static + ProvideRuntimeApi<Block> + HeaderBackend<Block> + AuxStore,
RuntimeClient::Api: ParachainHost<Block> + BabeApi<Block>,
Spawner: 'static + SpawnNamed + Clone + Unpin,
{
......@@ -412,6 +412,7 @@ where
use polkadot_statement_distribution::StatementDistribution as StatementDistributionSubsystem;
use polkadot_availability_recovery::AvailabilityRecoverySubsystem;
use polkadot_approval_distribution::ApprovalDistribution as ApprovalDistributionSubsystem;
use polkadot_node_core_approval_voting::ApprovalVotingSubsystem;
let all_subsystems = AllSubsystems {
availability_distribution: AvailabilityDistributionSubsystem::new(
......@@ -477,7 +478,7 @@ where
Metrics::register(registry)?,
),
runtime_api: RuntimeApiSubsystem::new(
runtime_client,
runtime_client.clone(),
Metrics::register(registry)?,
spawner.clone(),
),
......@@ -487,6 +488,11 @@ where
approval_distribution: ApprovalDistributionSubsystem::new(
Metrics::register(registry)?,
),
approval_voting: ApprovalVotingSubsystem::new(
keystore.clone(),
slot_duration,
runtime_client.clone(),
),
};
Overseer::new(
......@@ -563,7 +569,12 @@ pub fn new_full<RuntimeApi, Executor>(
let role = config.role.clone();
let force_authoring = config.force_authoring;
let backoff_authoring_blocks =
Some(sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging::default());
Some(sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging {
#[cfg(feature = "real-overseer")]
unfinalized_slack: 100,
..Default::default()
});
let disable_grandpa = config.disable_grandpa;
let name = config.network.node_name.clone();
......@@ -705,10 +716,18 @@ pub fn new_full<RuntimeApi, Executor>(
// we'd say let overseer_handler = authority_discovery_service.map(|authority_discovery_service|, ...),
// but in that case we couldn't use ? to propagate errors
let overseer_handler = if let Some(authority_discovery_service) = authority_discovery_service {
let local_keystore = keystore_container.local_keystore();
if local_keystore.is_none() {
tracing::info!("Cannot run as validator without local keystore.");
}
let maybe_params = local_keystore
.and_then(move |k| authority_discovery_service.map(|a| (a, k)));
let overseer_handler = if let Some((authority_discovery_service, keystore)) = maybe_params {
let (overseer, overseer_handler) = real_overseer(
leaves,
keystore_container.sync_keystore(),
keystore,
overseer_client.clone(),
availability_config,
network.clone(),
......@@ -803,6 +822,17 @@ pub fn new_full<RuntimeApi, Executor>(
// add a custom voting rule to temporarily stop voting for new blocks
// after the given pause block is finalized and restarting after the
// given delay.
let builder = grandpa::VotingRulesBuilder::default();
let builder = if let Some(ref overseer) = overseer_handler {
builder.add(grandpa_support::ApprovalCheckingDiagnostic::new(
overseer.clone(),
prometheus_registry.as_ref(),
)?)
} else {
builder
};
let voting_rule = match grandpa_pause {
Some((block, delay)) => {
info!(
......@@ -813,11 +843,11 @@ pub fn new_full<RuntimeApi, Executor>(
delay,
);
grandpa::VotingRulesBuilder::default()
builder
.add(grandpa_support::PauseAfterBlockFor(block, delay))
.build()
}
None => grandpa::VotingRulesBuilder::default().build(),
None => builder.build(),
};
let grandpa_config = grandpa::GrandpaParams {
......
......@@ -661,7 +661,7 @@ pub enum ApprovalVotingMessage {
///
/// It can also return the same block hash, if that is acceptable to vote upon.
/// Return `None` if the input hash is unrecognized.
ApprovedAncestor(Hash, BlockNumber, oneshot::Sender<Option<Hash>>),
ApprovedAncestor(Hash, BlockNumber, oneshot::Sender<Option<(Hash, BlockNumber)>>),
}
/// Message to the Approval Distribution subsystem.
......
......@@ -218,9 +218,9 @@ On receiving a `CheckAndImportApproval(indirect_approval_vote, response_channel)
On receiving an `ApprovedAncestor(Hash, BlockNumber, response_channel)`:
* Iterate over the ancestry of the hash all the way back to block number given, starting from the provided block hash.
* Keep track of an `all_approved_max: Option<Hash>`.
* Keep track of an `all_approved_max: Option<(Hash, BlockNumber)>`.
* For each block hash encountered, load the `BlockEntry` associated. If any are not found, return `None` on the response channel and conclude.
* If the block entry's `approval_bitfield` has all bits set to 1 and `all_approved_max == None`, set `all_approved_max = Some(current_hash)`.
* If the block entry's `approval_bitfield` has all bits set to 1 and `all_approved_max == None`, set `all_approved_max = Some((current_hash, current_number))`.
* If the block entry's `approval_bitfield` has any 0 bits, set `all_approved_max = None`.
* After iterating all ancestry, return `all_approved_max`.
......
......@@ -81,7 +81,7 @@ enum ApprovalVotingMessage {
///
/// It can also return the same block hash, if that is acceptable to vote upon.
/// Return `None` if the input hash is unrecognized.
ApprovedAncestor(Hash, BlockNumber, ResponseChannel<Option<Hash>>),
ApprovedAncestor(Hash, BlockNumber, ResponseChannel<Option<(Hash, BlockNumber)>>),
}
```
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment