...@@ -100,6 +100,13 @@ pub struct Config { ...@@ -100,6 +100,13 @@ pub struct Config {
pub prep_worker_path: PathBuf, pub prep_worker_path: PathBuf,
/// Path to the execution worker binary /// Path to the execution worker binary
pub exec_worker_path: PathBuf, pub exec_worker_path: PathBuf,
/// The maximum number of pvf execution workers.
pub pvf_execute_workers_max_num: usize,
/// The maximum number of pvf workers that can be spawned in the pvf prepare pool for tasks
/// with the priority below critical.
pub pvf_prepare_workers_soft_max_num: usize,
/// The absolute number of pvf workers that can be spawned in the pvf prepare pool.
pub pvf_prepare_workers_hard_max_num: usize,
} }
/// The candidate validation subsystem. /// The candidate validation subsystem.
...@@ -224,6 +231,9 @@ async fn run<Context>( ...@@ -224,6 +231,9 @@ async fn run<Context>(
secure_validator_mode, secure_validator_mode,
prep_worker_path, prep_worker_path,
exec_worker_path, exec_worker_path,
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
}: Config, }: Config,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
let (validation_host, task) = polkadot_node_core_pvf::start( let (validation_host, task) = polkadot_node_core_pvf::start(
...@@ -233,6 +243,9 @@ async fn run<Context>( ...@@ -233,6 +243,9 @@ async fn run<Context>(
secure_validator_mode, secure_validator_mode,
prep_worker_path, prep_worker_path,
exec_worker_path, exec_worker_path,
pvf_execute_workers_max_num,
pvf_prepare_workers_soft_max_num,
pvf_prepare_workers_hard_max_num,
), ),
pvf_metrics, pvf_metrics,
) )
......
...@@ -55,13 +55,13 @@ use polkadot_primitives::{ ...@@ -55,13 +55,13 @@ use polkadot_primitives::{
use crate::{ use crate::{
error::{FatalError, FatalResult, JfyiError, JfyiErrorResult, Result}, error::{FatalError, FatalResult, JfyiError, JfyiErrorResult, Result},
fragment_tree::{ fragment_chain::{
CandidateStorage, CandidateStorageInsertionError, FragmentTree, Scope as TreeScope, CandidateStorage, CandidateStorageInsertionError, FragmentTree, Scope as TreeScope,
}, },
}; };
mod error; mod error;
mod fragment_tree; mod fragment_chain;
#[cfg(test)] #[cfg(test)]
mod tests; mod tests;
...@@ -349,7 +349,7 @@ fn prune_view_candidate_storage(view: &mut View, metrics: &Metrics) { ...@@ -349,7 +349,7 @@ fn prune_view_candidate_storage(view: &mut View, metrics: &Metrics) {
struct ImportablePendingAvailability { struct ImportablePendingAvailability {
candidate: CommittedCandidateReceipt, candidate: CommittedCandidateReceipt,
persisted_validation_data: PersistedValidationData, persisted_validation_data: PersistedValidationData,
compact: crate::fragment_tree::PendingAvailability, compact: crate::fragment_chain::PendingAvailability,
} }
#[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)] #[overseer::contextbounds(ProspectiveParachains, prefix = self::overseer)]
...@@ -394,7 +394,7 @@ async fn preprocess_candidates_pending_availability<Context>( ...@@ -394,7 +394,7 @@ async fn preprocess_candidates_pending_availability<Context>(
relay_parent_number: relay_parent.number, relay_parent_number: relay_parent.number,
relay_parent_storage_root: relay_parent.storage_root, relay_parent_storage_root: relay_parent.storage_root,
}, },
compact: crate::fragment_tree::PendingAvailability { compact: crate::fragment_chain::PendingAvailability {
candidate_hash: pending.candidate_hash, candidate_hash: pending.candidate_hash,
relay_parent, relay_parent,
}, },
...@@ -675,7 +675,7 @@ fn answer_hypothetical_frontier_request( ...@@ -675,7 +675,7 @@ fn answer_hypothetical_frontier_request(
let candidate_hash = c.candidate_hash(); let candidate_hash = c.candidate_hash();
let hypothetical = match c { let hypothetical = match c {
HypotheticalCandidate::Complete { receipt, persisted_validation_data, .. } => HypotheticalCandidate::Complete { receipt, persisted_validation_data, .. } =>
fragment_tree::HypotheticalCandidate::Complete { fragment_chain::HypotheticalCandidate::Complete {
receipt: Cow::Borrowed(receipt), receipt: Cow::Borrowed(receipt),
persisted_validation_data: Cow::Borrowed(persisted_validation_data), persisted_validation_data: Cow::Borrowed(persisted_validation_data),
}, },
...@@ -683,7 +683,7 @@ fn answer_hypothetical_frontier_request( ...@@ -683,7 +683,7 @@ fn answer_hypothetical_frontier_request(
parent_head_data_hash, parent_head_data_hash,
candidate_relay_parent, candidate_relay_parent,
.. ..
} => fragment_tree::HypotheticalCandidate::Incomplete { } => fragment_chain::HypotheticalCandidate::Incomplete {
relay_parent: *candidate_relay_parent, relay_parent: *candidate_relay_parent,
parent_head_data_hash: *parent_head_data_hash, parent_head_data_hash: *parent_head_data_hash,
}, },
......
...@@ -48,6 +48,9 @@ impl TestHost { ...@@ -48,6 +48,9 @@ impl TestHost {
false, false,
prepare_worker_path, prepare_worker_path,
execute_worker_path, execute_worker_path,
2,
1,
2,
); );
f(&mut config); f(&mut config);
let (host, task) = start(config, Metrics::default()).await.unwrap(); let (host, task) = start(config, Metrics::default()).await.unwrap();
......
...@@ -58,7 +58,7 @@ use crate::{host::PrecheckResultSender, worker_interface::WORKER_DIR_PREFIX}; ...@@ -58,7 +58,7 @@ use crate::{host::PrecheckResultSender, worker_interface::WORKER_DIR_PREFIX};
use always_assert::always; use always_assert::always;
use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData}; use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData};
use polkadot_parachain_primitives::primitives::ValidationCodeHash; use polkadot_parachain_primitives::primitives::ValidationCodeHash;
use polkadot_primitives::ExecutorParamsHash; use polkadot_primitives::ExecutorParamsPrepHash;
use std::{ use std::{
collections::HashMap, collections::HashMap,
fs, fs,
...@@ -85,22 +85,27 @@ pub fn generate_artifact_path(cache_path: &Path) -> PathBuf { ...@@ -85,22 +85,27 @@ pub fn generate_artifact_path(cache_path: &Path) -> PathBuf {
artifact_path artifact_path
} }
/// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set. /// Identifier of an artifact. Encodes a code hash of the PVF and a hash of preparation-related
/// executor parameter set.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId { pub struct ArtifactId {
pub(crate) code_hash: ValidationCodeHash, pub(crate) code_hash: ValidationCodeHash,
pub(crate) executor_params_hash: ExecutorParamsHash, pub(crate) executor_params_prep_hash: ExecutorParamsPrepHash,
} }
impl ArtifactId { impl ArtifactId {
/// Creates a new artifact ID with the given hash. /// Creates a new artifact ID with the given hash.
pub fn new(code_hash: ValidationCodeHash, executor_params_hash: ExecutorParamsHash) -> Self { pub fn new(
Self { code_hash, executor_params_hash } code_hash: ValidationCodeHash,
executor_params_prep_hash: ExecutorParamsPrepHash,
) -> Self {
Self { code_hash, executor_params_prep_hash }
} }
/// Returns an artifact ID that corresponds to the PVF with given executor params. /// Returns an artifact ID that corresponds to the PVF with given preparation-related
/// executor parameters.
pub fn from_pvf_prep_data(pvf: &PvfPrepData) -> Self { pub fn from_pvf_prep_data(pvf: &PvfPrepData) -> Self {
Self::new(pvf.code_hash(), pvf.executor_params().hash()) Self::new(pvf.code_hash(), pvf.executor_params().prep_hash())
} }
} }
......
...@@ -562,6 +562,9 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) { ...@@ -562,6 +562,9 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
thus claim_idle cannot return None; thus claim_idle cannot return None;
qed.", qed.",
); );
queue
.metrics
.observe_execution_queued_time(job.waiting_since.elapsed().as_millis() as u32);
let execution_timer = queue.metrics.time_execution(); let execution_timer = queue.metrics.time_execution();
queue.mux.push( queue.mux.push(
async move { async move {
......
...@@ -188,6 +188,9 @@ impl Config { ...@@ -188,6 +188,9 @@ impl Config {
secure_validator_mode: bool, secure_validator_mode: bool,
prepare_worker_program_path: PathBuf, prepare_worker_program_path: PathBuf,
execute_worker_program_path: PathBuf, execute_worker_program_path: PathBuf,
execute_workers_max_num: usize,
prepare_workers_soft_max_num: usize,
prepare_workers_hard_max_num: usize,
) -> Self { ) -> Self {
Self { Self {
cache_path, cache_path,
...@@ -196,12 +199,12 @@ impl Config { ...@@ -196,12 +199,12 @@ impl Config {
prepare_worker_program_path, prepare_worker_program_path,
prepare_worker_spawn_timeout: Duration::from_secs(3), prepare_worker_spawn_timeout: Duration::from_secs(3),
prepare_workers_soft_max_num: 1, prepare_workers_soft_max_num,
prepare_workers_hard_max_num: 2, prepare_workers_hard_max_num,
execute_worker_program_path, execute_worker_program_path,
execute_worker_spawn_timeout: Duration::from_secs(3), execute_worker_spawn_timeout: Duration::from_secs(3),
execute_workers_max_num: 2, execute_workers_max_num,
} }
} }
} }
......
...@@ -74,6 +74,12 @@ impl Metrics { ...@@ -74,6 +74,12 @@ impl Metrics {
self.0.as_ref().map(|metrics| metrics.execution_time.start_timer()) self.0.as_ref().map(|metrics| metrics.execution_time.start_timer())
} }
pub(crate) fn observe_execution_queued_time(&self, queued_for_millis: u32) {
self.0.as_ref().map(|metrics| {
metrics.execution_queued_time.observe(queued_for_millis as f64 / 1000 as f64)
});
}
/// Observe memory stats for preparation. /// Observe memory stats for preparation.
#[allow(unused_variables)] #[allow(unused_variables)]
pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) { pub(crate) fn observe_preparation_memory_metrics(&self, memory_stats: MemoryStats) {
...@@ -112,6 +118,7 @@ struct MetricsInner { ...@@ -112,6 +118,7 @@ struct MetricsInner {
execute_finished: prometheus::Counter<prometheus::U64>, execute_finished: prometheus::Counter<prometheus::U64>,
preparation_time: prometheus::Histogram, preparation_time: prometheus::Histogram,
execution_time: prometheus::Histogram, execution_time: prometheus::Histogram,
execution_queued_time: prometheus::Histogram,
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
preparation_max_rss: prometheus::Histogram, preparation_max_rss: prometheus::Histogram,
// Max. allocated memory, tracked by Jemallocator, polling-based // Max. allocated memory, tracked by Jemallocator, polling-based
...@@ -240,6 +247,31 @@ impl metrics::Metrics for Metrics { ...@@ -240,6 +247,31 @@ impl metrics::Metrics for Metrics {
)?, )?,
registry, registry,
)?, )?,
execution_queued_time: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"polkadot_pvf_execution_queued_time",
"Time spent in queue waiting for PVFs execution job to be assigned",
).buckets(vec![
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1.0,
2.0,
3.0,
4.0,
5.0,
6.0,
12.0,
24.0,
48.0,
]),
)?,
registry,
)?,
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
preparation_max_rss: prometheus::register( preparation_max_rss: prometheus::register(
prometheus::Histogram::with_opts( prometheus::Histogram::with_opts(
......
...@@ -26,7 +26,7 @@ use polkadot_node_core_pvf::{ ...@@ -26,7 +26,7 @@ use polkadot_node_core_pvf::{
ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR, ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
}; };
use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult}; use polkadot_parachain_primitives::primitives::{BlockData, ValidationParams, ValidationResult};
use polkadot_primitives::{ExecutorParam, ExecutorParams}; use polkadot_primitives::{ExecutorParam, ExecutorParams, PvfExecKind, PvfPrepKind};
use std::{io::Write, time::Duration}; use std::{io::Write, time::Duration};
use tokio::sync::Mutex; use tokio::sync::Mutex;
...@@ -63,6 +63,9 @@ impl TestHost { ...@@ -63,6 +63,9 @@ impl TestHost {
false, false,
prepare_worker_path, prepare_worker_path,
execute_worker_path, execute_worker_path,
2,
1,
2,
); );
f(&mut config); f(&mut config);
let (host, task) = start(config, Metrics::default()).await.unwrap(); let (host, task) = start(config, Metrics::default()).await.unwrap();
...@@ -556,3 +559,73 @@ async fn nonexistent_cache_dir() { ...@@ -556,3 +559,73 @@ async fn nonexistent_cache_dir() {
.await .await
.unwrap(); .unwrap();
} }
// Checks the the artifact is not re-prepared when the executor environment parameters change
// in a way not affecting the preparation
#[tokio::test]
async fn artifact_does_not_reprepare_on_non_meaningful_exec_parameter_change() {
let host = TestHost::new_with_config(|cfg| {
cfg.prepare_workers_hard_max_num = 1;
})
.await;
let cache_dir = host.cache_dir.path();
let set1 = ExecutorParams::default();
let set2 =
ExecutorParams::from(&[ExecutorParam::PvfExecTimeout(PvfExecKind::Backing, 2500)][..]);
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set1).await.unwrap();
let md1 = {
let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
assert_eq!(cache_dir.len(), 2);
let mut artifact_path = cache_dir.pop().unwrap().unwrap();
if artifact_path.path().is_dir() {
artifact_path = cache_dir.pop().unwrap().unwrap();
}
std::fs::metadata(artifact_path.path()).unwrap()
};
// FS times are not monotonical so we wait 2 secs here to be sure that the creation time of the
// second attifact will be different
tokio::time::sleep(Duration::from_secs(2)).await;
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set2).await.unwrap();
let md2 = {
let mut cache_dir: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
assert_eq!(cache_dir.len(), 2);
let mut artifact_path = cache_dir.pop().unwrap().unwrap();
if artifact_path.path().is_dir() {
artifact_path = cache_dir.pop().unwrap().unwrap();
}
std::fs::metadata(artifact_path.path()).unwrap()
};
assert_eq!(md1.created().unwrap(), md2.created().unwrap());
}
// Checks if the artifact is re-prepared if the re-preparation is needed by the nature of
// the execution environment parameters change
#[tokio::test]
async fn artifact_does_reprepare_on_meaningful_exec_parameter_change() {
let host = TestHost::new_with_config(|cfg| {
cfg.prepare_workers_hard_max_num = 1;
})
.await;
let cache_dir = host.cache_dir.path();
let set1 = ExecutorParams::default();
let set2 =
ExecutorParams::from(&[ExecutorParam::PvfPrepTimeout(PvfPrepKind::Prepare, 60000)][..]);
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set1).await.unwrap();
let cache_dir_contents: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
assert_eq!(cache_dir_contents.len(), 2);
let _stats = host.precheck_pvf(halt::wasm_binary_unwrap(), set2).await.unwrap();
let cache_dir_contents: Vec<_> = std::fs::read_dir(cache_dir).unwrap().collect();
assert_eq!(cache_dir_contents.len(), 3); // new artifact has been added
}
...@@ -826,7 +826,16 @@ pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) { ...@@ -826,7 +826,16 @@ pub(crate) fn handle_deactivate_leaves(state: &mut State, leaves: &[Hash]) {
// clean up sessions based on everything remaining. // clean up sessions based on everything remaining.
let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect(); let sessions: HashSet<_> = state.per_relay_parent.values().map(|r| r.session).collect();
state.per_session.retain(|s, _| sessions.contains(s)); state.per_session.retain(|s, _| sessions.contains(s));
state.unused_topologies.retain(|s, _| sessions.contains(s));
let last_session_index = state.unused_topologies.keys().max().copied();
// Do not clean-up the last saved toplogy unless we moved to the next session
// This is needed because handle_deactive_leaves, gets also called when
// prospective_parachains APIs are not present, so we would actually remove
// the topology without using it because `per_relay_parent` is empty until
// prospective_parachains gets enabled
state
.unused_topologies
.retain(|s, _| sessions.contains(s) || last_session_index == Some(*s));
} }
#[overseer::contextbounds(StatementDistribution, prefix=self::overseer)] #[overseer::contextbounds(StatementDistribution, prefix=self::overseer)]
......
...@@ -509,6 +509,12 @@ async fn setup_test_and_connect_peers( ...@@ -509,6 +509,12 @@ async fn setup_test_and_connect_peers(
// Send gossip topology and activate leaf. // Send gossip topology and activate leaf.
if send_topology_before_leaf { if send_topology_before_leaf {
send_new_topology(overseer, state.make_dummy_topology()).await; send_new_topology(overseer, state.make_dummy_topology()).await;
// Send cleaning up of a leaf to make sure it does not clear the save topology as well.
overseer
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::stop_work(Hash::random()),
)))
.await;
activate_leaf(overseer, &test_leaf, &state, true, vec![]).await; activate_leaf(overseer, &test_leaf, &state, true, vec![]).await;
} else { } else {
activate_leaf(overseer, &test_leaf, &state, true, vec![]).await; activate_leaf(overseer, &test_leaf, &state, true, vec![]).await;
......
...@@ -37,7 +37,8 @@ ...@@ -37,7 +37,8 @@
"/dns/dot14.rotko.net/tcp/33214/p2p/12D3KooWPyEvPEXghnMC67Gff6PuZiSvfx3fmziKiPZcGStZ5xff", "/dns/dot14.rotko.net/tcp/33214/p2p/12D3KooWPyEvPEXghnMC67Gff6PuZiSvfx3fmziKiPZcGStZ5xff",
"/dns/ibp-boot-polkadot.luckyfriday.io/tcp/30333/p2p/12D3KooWEjk6QXrZJ26fLpaajisJGHiz6WiQsR8k7mkM9GmWKnRZ", "/dns/ibp-boot-polkadot.luckyfriday.io/tcp/30333/p2p/12D3KooWEjk6QXrZJ26fLpaajisJGHiz6WiQsR8k7mkM9GmWKnRZ",
"/dns/ibp-boot-polkadot.luckyfriday.io/tcp/30334/wss/p2p/12D3KooWEjk6QXrZJ26fLpaajisJGHiz6WiQsR8k7mkM9GmWKnRZ", "/dns/ibp-boot-polkadot.luckyfriday.io/tcp/30334/wss/p2p/12D3KooWEjk6QXrZJ26fLpaajisJGHiz6WiQsR8k7mkM9GmWKnRZ",
"/dns/boot-polkadot.luckyfriday.io/tcp/443/wss/p2p/12D3KooWAdyiVAaeGdtBt6vn5zVetwA4z4qfm9Fi2QCSykN1wTBJ" "/dns/boot-polkadot.luckyfriday.io/tcp/443/wss/p2p/12D3KooWAdyiVAaeGdtBt6vn5zVetwA4z4qfm9Fi2QCSykN1wTBJ",
"/dns4/polkadot-0.boot.onfinality.io/tcp/24446/ws/p2p/12D3KooWT1PWaNdAwYrSr89dvStnoGdH3t4LNRbcVNN4JCtsotkR"
], ],
"telemetryEndpoints": [ "telemetryEndpoints": [
[ [
...@@ -643,6 +643,13 @@ pub struct NewFullParams<OverseerGenerator: OverseerGen> { ...@@ -643,6 +643,13 @@ pub struct NewFullParams<OverseerGenerator: OverseerGen> {
pub workers_path: Option<std::path::PathBuf>, pub workers_path: Option<std::path::PathBuf>,
/// Optional custom names for the prepare and execute workers. /// Optional custom names for the prepare and execute workers.
pub workers_names: Option<(String, String)>, pub workers_names: Option<(String, String)>,
/// An optional number of the maximum number of pvf execute workers.
pub execute_workers_max_num: Option<usize>,
/// An optional maximum number of pvf workers that can be spawned in the pvf prepare pool for
/// tasks with the priority below critical.
pub prepare_workers_soft_max_num: Option<usize>,
/// An optional absolute number of pvf workers that can be spawned in the pvf prepare pool.
pub prepare_workers_hard_max_num: Option<usize>,
pub overseer_gen: OverseerGenerator, pub overseer_gen: OverseerGenerator,
pub overseer_message_channel_capacity_override: Option<usize>, pub overseer_message_channel_capacity_override: Option<usize>,
#[allow(dead_code)] #[allow(dead_code)]
...@@ -738,6 +745,9 @@ pub fn new_full< ...@@ -738,6 +745,9 @@ pub fn new_full<
overseer_message_channel_capacity_override, overseer_message_channel_capacity_override,
malus_finality_delay: _malus_finality_delay, malus_finality_delay: _malus_finality_delay,
hwbench, hwbench,
execute_workers_max_num,
prepare_workers_soft_max_num,
prepare_workers_hard_max_num,
}: NewFullParams<OverseerGenerator>, }: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> { ) -> Result<NewFull, Error> {
use polkadot_node_network_protocol::request_response::IncomingRequest; use polkadot_node_network_protocol::request_response::IncomingRequest;
...@@ -943,6 +953,16 @@ pub fn new_full< ...@@ -943,6 +953,16 @@ pub fn new_full<
secure_validator_mode, secure_validator_mode,
prep_worker_path, prep_worker_path,
exec_worker_path, exec_worker_path,
pvf_execute_workers_max_num: execute_workers_max_num.unwrap_or_else(
|| match config.chain_spec.identify_chain() {
// The intention is to use this logic for gradual increasing from 2 to 4
// of this configuration chain by chain untill it reaches production chain.
Chain::Polkadot | Chain::Kusama => 2,
Chain::Rococo | Chain::Westend | Chain::Unknown => 4,
},
),
pvf_prepare_workers_soft_max_num: prepare_workers_soft_max_num.unwrap_or(1),
pvf_prepare_workers_hard_max_num: prepare_workers_hard_max_num.unwrap_or(2),
}) })
} else { } else {
None None
......
...@@ -161,6 +161,13 @@ impl ResourceUsage { ...@@ -161,6 +161,13 @@ impl ResourceUsage {
for (resource_name, values) in by_name { for (resource_name, values) in by_name {
let total = values.iter().map(|v| v.total).sum::<f64>() / values.len() as f64; let total = values.iter().map(|v| v.total).sum::<f64>() / values.len() as f64;
let per_block = values.iter().map(|v| v.per_block).sum::<f64>() / values.len() as f64; let per_block = values.iter().map(|v| v.per_block).sum::<f64>() / values.len() as f64;
let per_block_sd =
standard_deviation(&values.iter().map(|v| v.per_block).collect::<Vec<f64>>());
println!(
"[{}] standart_deviation {:.2}%",
resource_name,
per_block_sd / per_block * 100.0
);
average.push(Self { resource_name, total, per_block }); average.push(Self { resource_name, total, per_block });
} }
average average
...@@ -179,3 +186,11 @@ pub struct ChartItem { ...@@ -179,3 +186,11 @@ pub struct ChartItem {
pub unit: String, pub unit: String,
pub value: f64, pub value: f64,
} }
fn standard_deviation(values: &[f64]) -> f64 {
let n = values.len() as f64;
let mean = values.iter().sum::<f64>() / n;
let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1.0);
variance.sqrt()
}
...@@ -97,6 +97,9 @@ pub fn new_full<OverseerGenerator: OverseerGen>( ...@@ -97,6 +97,9 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
overseer_message_channel_capacity_override: None, overseer_message_channel_capacity_override: None,
malus_finality_delay: None, malus_finality_delay: None,
hwbench: None, hwbench: None,
execute_workers_max_num: None,
prepare_workers_hard_max_num: None,
prepare_workers_soft_max_num: None,
}, },
), ),
sc_network::config::NetworkBackendType::Litep2p => sc_network::config::NetworkBackendType::Litep2p =>
...@@ -116,6 +119,9 @@ pub fn new_full<OverseerGenerator: OverseerGen>( ...@@ -116,6 +119,9 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
overseer_message_channel_capacity_override: None, overseer_message_channel_capacity_override: None,
malus_finality_delay: None, malus_finality_delay: None,
hwbench: None, hwbench: None,
execute_workers_max_num: None,
prepare_workers_hard_max_num: None,
prepare_workers_soft_max_num: None,
}, },
), ),
} }
......
...@@ -95,6 +95,9 @@ fn main() -> Result<()> { ...@@ -95,6 +95,9 @@ fn main() -> Result<()> {
overseer_message_channel_capacity_override: None, overseer_message_channel_capacity_override: None,
malus_finality_delay: None, malus_finality_delay: None,
hwbench: None, hwbench: None,
execute_workers_max_num: None,
prepare_workers_hard_max_num: None,
prepare_workers_soft_max_num: None,
}, },
) )
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
......
...@@ -97,6 +97,9 @@ fn main() -> Result<()> { ...@@ -97,6 +97,9 @@ fn main() -> Result<()> {
overseer_message_channel_capacity_override: None, overseer_message_channel_capacity_override: None,
malus_finality_delay: None, malus_finality_delay: None,
hwbench: None, hwbench: None,
execute_workers_max_num: None,
prepare_workers_hard_max_num: None,
prepare_workers_soft_max_num: None,
}, },
) )
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
......
...@@ -44,7 +44,7 @@ pub use v7::{ ...@@ -44,7 +44,7 @@ pub use v7::{
CandidateReceipt, CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet, CollatorId, CandidateReceipt, CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet, CollatorId,
CollatorSignature, CommittedCandidateReceipt, CompactStatement, ConsensusLog, CoreIndex, CollatorSignature, CommittedCandidateReceipt, CompactStatement, ConsensusLog, CoreIndex,
CoreState, DisputeState, DisputeStatement, DisputeStatementSet, DownwardMessage, EncodeAs, CoreState, DisputeState, DisputeStatement, DisputeStatementSet, DownwardMessage, EncodeAs,
ExecutorParam, ExecutorParamError, ExecutorParams, ExecutorParamsHash, ExecutorParam, ExecutorParamError, ExecutorParams, ExecutorParamsHash, ExecutorParamsPrepHash,
ExplicitDisputeStatement, GroupIndex, GroupRotationInfo, Hash, HashT, HeadData, Header, ExplicitDisputeStatement, GroupIndex, GroupRotationInfo, Hash, HashT, HeadData, Header,
HorizontalMessages, HrmpChannelId, Id, InboundDownwardMessage, InboundHrmpMessage, IndexedVec, HorizontalMessages, HrmpChannelId, Id, InboundDownwardMessage, InboundHrmpMessage, IndexedVec,
InherentData, InvalidDisputeStatementKind, Moment, MultiDisputeStatementSet, NodeFeatures, InherentData, InvalidDisputeStatementKind, Moment, MultiDisputeStatementSet, NodeFeatures,
......
...@@ -152,13 +152,42 @@ impl sp_std::fmt::LowerHex for ExecutorParamsHash { ...@@ -152,13 +152,42 @@ impl sp_std::fmt::LowerHex for ExecutorParamsHash {
} }
} }
/// Unit type wrapper around [`type@Hash`] that represents a hash of preparation-related
/// executor parameters.
///
/// This type is produced by [`ExecutorParams::prep_hash`].
#[derive(Clone, Copy, Encode, Decode, Hash, Eq, PartialEq, PartialOrd, Ord, TypeInfo)]
pub struct ExecutorParamsPrepHash(Hash);
impl sp_std::fmt::Display for ExecutorParamsPrepHash {
fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result {
self.0.fmt(f)
}
}
impl sp_std::fmt::Debug for ExecutorParamsPrepHash {
fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
impl sp_std::fmt::LowerHex for ExecutorParamsPrepHash {
fn fmt(&self, f: &mut sp_std::fmt::Formatter<'_>) -> sp_std::fmt::Result {
sp_std::fmt::LowerHex::fmt(&self.0, f)
}
}
/// # Deterministically serialized execution environment semantics /// # Deterministically serialized execution environment semantics
/// Represents an arbitrary semantics of an arbitrary execution environment, so should be kept as /// Represents an arbitrary semantics of an arbitrary execution environment, so should be kept as
/// abstract as possible. /// abstract as possible.
//
// ADR: For mandatory entries, mandatoriness should be enforced in code rather than separating them // ADR: For mandatory entries, mandatoriness should be enforced in code rather than separating them
// into individual fields of the structure. Thus, complex migrations shall be avoided when adding // into individual fields of the structure. Thus, complex migrations shall be avoided when adding
// new entries and removing old ones. At the moment, there's no mandatory parameters defined. If // new entries and removing old ones. At the moment, there's no mandatory parameters defined. If
// they show up, they must be clearly documented as mandatory ones. // they show up, they must be clearly documented as mandatory ones.
//
// !!! Any new parameter that does not affect the prepared artifact must be added to the exclusion
// !!! list in `prep_hash()` to avoid unneccessary artifact rebuilds.
#[derive( #[derive(
Clone, Debug, Default, Encode, Decode, PartialEq, Eq, TypeInfo, Serialize, Deserialize, Clone, Debug, Default, Encode, Decode, PartialEq, Eq, TypeInfo, Serialize, Deserialize,
)] )]
...@@ -175,6 +204,28 @@ impl ExecutorParams { ...@@ -175,6 +204,28 @@ impl ExecutorParams {
ExecutorParamsHash(BlakeTwo256::hash(&self.encode())) ExecutorParamsHash(BlakeTwo256::hash(&self.encode()))
} }
/// Returns hash of preparation-related executor parameters
pub fn prep_hash(&self) -> ExecutorParamsPrepHash {
use ExecutorParam::*;
let mut enc = b"prep".to_vec();
self.0
.iter()
.flat_map(|param| match param {
MaxMemoryPages(..) => None,
StackLogicalMax(..) => Some(param),
StackNativeMax(..) => None,
PrecheckingMaxMemory(..) => None,
PvfPrepTimeout(..) => Some(param),
PvfExecTimeout(..) => None,
WasmExtBulkMemory => Some(param),
})
.for_each(|p| enc.extend(p.encode()));
ExecutorParamsPrepHash(BlakeTwo256::hash(&enc))
}
/// Returns a PVF preparation timeout, if any /// Returns a PVF preparation timeout, if any
pub fn pvf_prep_timeout(&self, kind: PvfPrepKind) -> Option<Duration> { pub fn pvf_prep_timeout(&self, kind: PvfPrepKind) -> Option<Duration> {
for param in &self.0 { for param in &self.0 {
...@@ -336,3 +387,51 @@ impl From<&[ExecutorParam]> for ExecutorParams { ...@@ -336,3 +387,51 @@ impl From<&[ExecutorParam]> for ExecutorParams {
ExecutorParams(arr.to_vec()) ExecutorParams(arr.to_vec())
} }
} }
// This test ensures the hash generated by `prep_hash()` changes if any preparation-related
// executor parameter changes. If you're adding a new executor parameter, you must add it into
// this test, and if changing that parameter may not affect the artifact produced on the
// preparation step, it must be added to the list of exlusions in `pre_hash()` as well.
// See also `prep_hash()` comments.
#[test]
fn ensure_prep_hash_changes() {
use ExecutorParam::*;
let ep = ExecutorParams::from(
&[
MaxMemoryPages(0),
StackLogicalMax(0),
StackNativeMax(0),
PrecheckingMaxMemory(0),
PvfPrepTimeout(PvfPrepKind::Precheck, 0),
PvfPrepTimeout(PvfPrepKind::Prepare, 0),
PvfExecTimeout(PvfExecKind::Backing, 0),
PvfExecTimeout(PvfExecKind::Approval, 0),
WasmExtBulkMemory,
][..],
);
for p in ep.iter() {
let (ep1, ep2) = match p {
MaxMemoryPages(_) => continue,
StackLogicalMax(_) => (
ExecutorParams::from(&[StackLogicalMax(1)][..]),
ExecutorParams::from(&[StackLogicalMax(2)][..]),
),
StackNativeMax(_) => continue,
PrecheckingMaxMemory(_) => continue,
PvfPrepTimeout(PvfPrepKind::Precheck, _) => (
ExecutorParams::from(&[PvfPrepTimeout(PvfPrepKind::Precheck, 1)][..]),
ExecutorParams::from(&[PvfPrepTimeout(PvfPrepKind::Precheck, 2)][..]),
),
PvfPrepTimeout(PvfPrepKind::Prepare, _) => (
ExecutorParams::from(&[PvfPrepTimeout(PvfPrepKind::Prepare, 1)][..]),
ExecutorParams::from(&[PvfPrepTimeout(PvfPrepKind::Prepare, 2)][..]),
),
PvfExecTimeout(_, _) => continue,
WasmExtBulkMemory =>
(ExecutorParams::default(), ExecutorParams::from(&[WasmExtBulkMemory][..])),
};
assert_ne!(ep1.prep_hash(), ep2.prep_hash());
}
}