"]
edition = "2018"
@@ -14,17 +14,17 @@ log = "0.4.8"
futures = "0.3.4"
tokio = { version = "0.2.13", features = ["rt-core"] }
exit-future = "0.2.0"
-codec = { package = "parity-scale-codec", version = "1.3.0", features = ["derive"] }
+codec = { package = "parity-scale-codec", version = "1.3.4", features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "master" }
-client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
+client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master", version = "2.0.0-rc5" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
-kvdb = "0.6.0"
-kvdb-memorydb = "0.6.0"
+kvdb = "0.7.0"
+kvdb-memorydb = "0.7.0"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
-kvdb-rocksdb = "0.8.0"
+kvdb-rocksdb = "0.9.0"
diff --git a/availability-store/src/lib.rs b/availability-store/src/lib.rs
index 8e9feea687e185e1197469990c5091267d9ba0b2..4b973c7d027114101e1e4b0510ffe565ded5ea69 100644
--- a/availability-store/src/lib.rs
+++ b/availability-store/src/lib.rs
@@ -23,22 +23,21 @@
#![warn(missing_docs)]
use futures::prelude::*;
-use futures::{channel::{mpsc, oneshot}, task::Spawn};
+use futures::channel::{mpsc, oneshot};
use keystore::KeyStorePtr;
-use polkadot_primitives::{
+use polkadot_primitives::v0::{
Hash, Block,
- parachain::{
- PoVBlock, AbridgedCandidateReceipt, ErasureChunk,
- ParachainHost, AvailableData, OmittedValidationData,
- },
+ PoVBlock, AbridgedCandidateReceipt, ErasureChunk,
+ ParachainHost, AvailableData, OmittedValidationData,
};
use sp_runtime::traits::HashFor;
-use sp_blockchain::{Result as ClientResult};
+use sp_blockchain::Result as ClientResult;
use client::{
BlockchainEvents, BlockBackend,
};
use sp_api::{ApiExt, ProvideRuntimeApi};
use codec::{Encode, Decode};
+use sp_core::traits::SpawnNamed;
use log::warn;
@@ -58,7 +57,7 @@ use worker::{
Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks
};
-use store::{Store as InnerStore};
+use store::Store as InnerStore;
const LOG_TARGET: &str = "availability";
@@ -174,7 +173,7 @@ impl Store {
&self,
wrapped_block_import: I,
client: Arc,
- spawner: impl Spawn,
+ spawner: impl SpawnNamed,
keystore: KeyStorePtr,
) -> ClientResult>
where
diff --git a/availability-store/src/store.rs b/availability-store/src/store.rs
index e3b1e35929795211130d09e84454c28a34faeea4..cd76de5d44adb9ebddf28d16db69fe9e2b15b8b2 100644
--- a/availability-store/src/store.rs
+++ b/availability-store/src/store.rs
@@ -18,12 +18,9 @@
use kvdb_rocksdb::{Database, DatabaseConfig};
use kvdb::{KeyValueDB, DBTransaction};
use codec::{Encode, Decode};
-use polkadot_erasure_coding::{self as erasure};
-use polkadot_primitives::{
- Hash,
- parachain::{
- ErasureChunk, AvailableData, AbridgedCandidateReceipt,
- },
+use polkadot_erasure_coding as erasure;
+use polkadot_primitives::v0::{
+ Hash, ErasureChunk, AvailableData, AbridgedCandidateReceipt,
};
use parking_lot::Mutex;
@@ -273,7 +270,7 @@ impl Store {
// If there are no block data in the store at this point,
// check that they can be reconstructed now and add them to store if they can.
if self.execution_data(&candidate_hash).is_none() {
- if let Ok(available_data) = erasure::reconstruct(
+ if let Ok(available_data) = erasure::reconstruct_v0(
n_validators as usize,
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize)),
)
@@ -390,7 +387,7 @@ impl Store {
mod tests {
use super::*;
use polkadot_erasure_coding::{self as erasure};
- use polkadot_primitives::parachain::{
+ use polkadot_primitives::v0::{
Id as ParaId, BlockData, AvailableData, PoVBlock, OmittedValidationData,
};
@@ -489,7 +486,7 @@ mod tests {
let available_data = available_data(&[42; 8]);
let n_validators = 5;
- let erasure_chunks = erasure::obtain_chunks(
+ let erasure_chunks = erasure::obtain_chunks_v0(
n_validators,
&available_data,
).unwrap();
diff --git a/availability-store/src/worker.rs b/availability-store/src/worker.rs
index 0ff59a9fca38377bd30c1fd405184f4796cce7f1..a7cf7ec41dae3a01aa0c83e4ff7c17938dcfb233 100644
--- a/availability-store/src/worker.rs
+++ b/availability-store/src/worker.rs
@@ -20,7 +20,7 @@ use std::sync::Arc;
use std::thread;
use log::{error, info, trace, warn};
-use sp_blockchain::{Result as ClientResult};
+use sp_blockchain::Result as ClientResult;
use sp_runtime::traits::{Header as HeaderT, Block as BlockT, HashFor, BlakeTwo256};
use sp_api::{ApiExt, ProvideRuntimeApi};
use client::{
@@ -32,12 +32,13 @@ use consensus_common::{
ImportResult,
import_queue::CacheKeyId,
};
-use polkadot_primitives::{Block, BlockId, Hash};
-use polkadot_primitives::parachain::{
+use sp_core::traits::SpawnNamed;
+use polkadot_primitives::v0::{
+ Block, BlockId, Hash,
ParachainHost, ValidatorId, AbridgedCandidateReceipt, AvailableData,
ValidatorPair, ErasureChunk,
};
-use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
+use futures::{prelude::*, future::select, channel::{mpsc, oneshot}};
use futures::future::AbortHandle;
use keystore::KeyStorePtr;
@@ -531,7 +532,7 @@ impl Worker {
}
}
-/// Implementor of the [`BlockImport`] trait.
+/// Implementer of the [`BlockImport`] trait.
///
/// Used to embed `availability-store` logic into the block imporing pipeline.
///
@@ -641,7 +642,7 @@ impl AvailabilityBlockImport {
pub(crate) fn new(
client: Arc ,
block_import: I,
- spawner: impl Spawn,
+ spawner: impl SpawnNamed,
keystore: KeyStorePtr,
to_worker: mpsc::UnboundedSender,
) -> Self
@@ -662,9 +663,7 @@ impl AvailabilityBlockImport {
to_worker.clone(),
));
- if let Err(_) = spawner.spawn(prune_available.map(drop)) {
- error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
- }
+ spawner.spawn("polkadot-prune-availibility", prune_available.map(drop).boxed());
AvailabilityBlockImport {
client,
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 5dcaadd57b08fe54f870c1cb20bfa79d95bea2b2..99eb04e9dd6377a9286bf3cf918d796ad9ea36e1 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "polkadot-cli"
-version = "0.8.11"
+version = "0.8.22"
authors = ["Parity Technologies "]
description = "Polkadot Relay-chain Client Node"
edition = "2018"
@@ -34,12 +34,15 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
wasm-bindgen = { version = "0.2.57", optional = true }
wasm-bindgen-futures = { version = "0.4.7", optional = true }
browser-utils = { package = "substrate-browser-utils", git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
+# this crate is used only to enable `trie-memory-tracker` feature
+# see https://github.com/paritytech/substrate/pull/6745
+sp-trie = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
[build-dependencies]
substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" }
[features]
-default = [ "wasmtime", "db", "cli", "service-old" ]
+default = [ "wasmtime", "db", "cli", "service-old", "trie-memory-tracker" ]
wasmtime = [ "sc-cli/wasmtime" ]
db = [ "service/db" ]
cli = [
@@ -57,3 +60,4 @@ browser = [
]
runtime-benchmarks = [ "service/runtime-benchmarks" ]
service-rewr = [ "service-new/full-node" ]
+trie-memory-tracker = [ "sp-trie/memory-tracker" ]
diff --git a/cli/src/browser.rs b/cli/src/browser.rs
index 6f3a4000843ae1c2394681f32d8789469bf6cba2..d3523e92a60005ff305963cf7bff516e1a67341a 100644
--- a/cli/src/browser.rs
+++ b/cli/src/browser.rs
@@ -46,8 +46,7 @@ async fn start_inner(chain_spec: String, log_level: String) -> Result,
- #[allow(missing_docs)]
#[structopt(flatten)]
pub run: RunCmd,
}
diff --git a/cli/src/command.rs b/cli/src/command.rs
index 5e9d78382833682be0b267afd93e9255ba6fd3f3..4952bdf9c13782fd7de8808445e9b3f69114cdf8 100644
--- a/cli/src/command.rs
+++ b/cli/src/command.rs
@@ -19,8 +19,7 @@ use log::info;
use service::{IdentifyVariant, self};
#[cfg(feature = "service-rewr")]
use service_new::{IdentifyVariant, self as service};
-use sc_executor::NativeExecutionDispatch;
-use sc_cli::{SubstrateCli, Result};
+use sc_cli::{SubstrateCli, Result, RuntimeVersion, Role};
use crate::cli::{Cli, Subcommand};
fn get_exec_name() -> Option {
@@ -31,19 +30,19 @@ fn get_exec_name() -> Option {
}
impl SubstrateCli for Cli {
- fn impl_name() -> &'static str { "Parity Polkadot" }
+ fn impl_name() -> String { "Parity Polkadot".into() }
- fn impl_version() -> &'static str { env!("SUBSTRATE_CLI_IMPL_VERSION") }
+ fn impl_version() -> String { env!("SUBSTRATE_CLI_IMPL_VERSION").into() }
- fn description() -> &'static str { env!("CARGO_PKG_DESCRIPTION") }
+ fn description() -> String { env!("CARGO_PKG_DESCRIPTION").into() }
- fn author() -> &'static str { env!("CARGO_PKG_AUTHORS") }
+ fn author() -> String { env!("CARGO_PKG_AUTHORS").into() }
- fn support_url() -> &'static str { "https://github.com/paritytech/polkadot/issues/new" }
+ fn support_url() -> String { "https://github.com/paritytech/polkadot/issues/new".into() }
fn copyright_start_year() -> i32 { 2017 }
- fn executable_name() -> &'static str { "polkadot" }
+ fn executable_name() -> String { "polkadot".into() }
fn load_spec(&self, id: &str) -> std::result::Result, String> {
let id = if id == "" {
@@ -54,27 +53,47 @@ impl SubstrateCli for Cli {
.unwrap_or("polkadot")
} else { id };
Ok(match id {
- "polkadot-dev" | "dev" => Box::new(service::chain_spec::polkadot_development_config()),
- "polkadot-local" => Box::new(service::chain_spec::polkadot_local_testnet_config()),
- "polkadot-staging" => Box::new(service::chain_spec::polkadot_staging_testnet_config()),
- "kusama-dev" => Box::new(service::chain_spec::kusama_development_config()),
- "kusama-local" => Box::new(service::chain_spec::kusama_local_testnet_config()),
- "kusama-staging" => Box::new(service::chain_spec::kusama_staging_testnet_config()),
+ "polkadot-dev" | "dev" => Box::new(service::chain_spec::polkadot_development_config()?),
+ "polkadot-local" => Box::new(service::chain_spec::polkadot_local_testnet_config()?),
+ "polkadot-staging" => Box::new(service::chain_spec::polkadot_staging_testnet_config()?),
+ "kusama-dev" => Box::new(service::chain_spec::kusama_development_config()?),
+ "kusama-local" => Box::new(service::chain_spec::kusama_local_testnet_config()?),
+ "kusama-staging" => Box::new(service::chain_spec::kusama_staging_testnet_config()?),
"polkadot" => Box::new(service::chain_spec::polkadot_config()?),
"westend" => Box::new(service::chain_spec::westend_config()?),
"kusama" => Box::new(service::chain_spec::kusama_config()?),
- "westend-dev" => Box::new(service::chain_spec::westend_development_config()),
- "westend-local" => Box::new(service::chain_spec::westend_local_testnet_config()),
- "westend-staging" => Box::new(service::chain_spec::westend_staging_testnet_config()),
- path if self.run.force_kusama => {
- Box::new(service::KusamaChainSpec::from_json_file(std::path::PathBuf::from(path))?)
+ "westend-dev" => Box::new(service::chain_spec::westend_development_config()?),
+ "westend-local" => Box::new(service::chain_spec::westend_local_testnet_config()?),
+ "westend-staging" => Box::new(service::chain_spec::westend_staging_testnet_config()?),
+ path => {
+ let path = std::path::PathBuf::from(path);
+
+ let starts_with = |prefix: &str| {
+ path.file_name().map(|f| f.to_str().map(|s| s.starts_with(&prefix))).flatten().unwrap_or(false)
+ };
+
+ // When `force_*` is given or the file name starts with the name of one of the known chains,
+ // we use the chain spec for the specific chain.
+ if self.run.force_kusama || starts_with("kusama") {
+ Box::new(service::KusamaChainSpec::from_json_file(path)?)
+ } else if self.run.force_westend || starts_with("westend") {
+ Box::new(service::WestendChainSpec::from_json_file(path)?)
+ } else {
+ Box::new(service::PolkadotChainSpec::from_json_file(path)?)
+ }
},
- path if self.run.force_westend => {
- Box::new(service::WestendChainSpec::from_json_file(std::path::PathBuf::from(path))?)
- },
- path => Box::new(service::PolkadotChainSpec::from_json_file(std::path::PathBuf::from(path))?),
})
}
+
+ fn native_runtime_version(spec: &Box) -> &'static RuntimeVersion {
+ if spec.is_kusama() {
+ &service::kusama_runtime::VERSION
+ } else if spec.is_westend() {
+ &service::westend_runtime::VERSION
+ } else {
+ &service::polkadot_runtime::VERSION
+ }
+ }
}
/// Parses polkadot specific CLI arguments and run the service.
@@ -97,8 +116,8 @@ pub fn run() -> Result<()> {
match &cli.subcommand {
None => {
- let runtime = cli.create_runner(&cli.run.base)?;
- let chain_spec = &runtime.config().chain_spec;
+ let runner = cli.create_runner(&cli.run.base)?;
+ let chain_spec = &runner.config().chain_spec;
set_default_ss58_version(chain_spec);
@@ -115,90 +134,49 @@ pub fn run() -> Result<()> {
info!(" endorsed by the ");
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
-
- runtime.run_node(
- |config| {
- service::kusama_new_light(config)
- },
- |config| {
- service::kusama_new_full(
- config,
- None,
- None,
- authority_discovery_enabled,
- 6000,
- grandpa_pause,
- None,
- ).map(|(s, _, _)| s)
- },
- service::KusamaExecutor::native_version().runtime_version
- )
- } else if chain_spec.is_westend() {
- runtime.run_node(
- |config| {
- service::westend_new_light(config)
- },
- |config| {
- service::westend_new_full(
- config,
- None,
- None,
- authority_discovery_enabled,
- 6000,
- grandpa_pause,
- None,
- ).map(|(s, _, _)| s)
- },
- service::WestendExecutor::native_version().runtime_version
- )
- } else {
- runtime.run_node(
- |config| {
- service::polkadot_new_light(config)
- },
- |config| {
- service::polkadot_new_full(
- config,
- None,
- None,
- authority_discovery_enabled,
- 6000,
- grandpa_pause,
- None,
- ).map(|(s, _, _)| s)
- },
- service::PolkadotExecutor::native_version().runtime_version
- )
}
+
+ runner.run_node_until_exit(|config| {
+ let role = config.role.clone();
+
+ match role {
+ Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager),
+ _ => service::build_full(
+ config,
+ None,
+ None,
+ authority_discovery_enabled,
+ 6000,
+ grandpa_pause,
+ ).map(|r| r.0),
+ }
+ })
},
Some(Subcommand::Base(subcommand)) => {
- let runtime = cli.create_runner(subcommand)?;
- let chain_spec = &runtime.config().chain_spec;
+ let runner = cli.create_runner(subcommand)?;
+ let chain_spec = &runner.config().chain_spec;
set_default_ss58_version(chain_spec);
if chain_spec.is_kusama() {
- runtime.run_subcommand(subcommand, |config|
+ runner.run_subcommand(subcommand, |config|
service::new_chain_ops::<
service::kusama_runtime::RuntimeApi,
service::KusamaExecutor,
- service::kusama_runtime::UncheckedExtrinsic,
>(config)
)
} else if chain_spec.is_westend() {
- runtime.run_subcommand(subcommand, |config|
+ runner.run_subcommand(subcommand, |config|
service::new_chain_ops::<
service::westend_runtime::RuntimeApi,
service::WestendExecutor,
- service::westend_runtime::UncheckedExtrinsic,
>(config)
)
} else {
- runtime.run_subcommand(subcommand, |config|
+ runner.run_subcommand(subcommand, |config|
service::new_chain_ops::<
service::polkadot_runtime::RuntimeApi,
service::PolkadotExecutor,
- service::polkadot_runtime::UncheckedExtrinsic,
>(config)
)
}
@@ -215,21 +193,21 @@ pub fn run() -> Result<()> {
}
},
Some(Subcommand::Benchmark(cmd)) => {
- let runtime = cli.create_runner(cmd)?;
- let chain_spec = &runtime.config().chain_spec;
+ let runner = cli.create_runner(cmd)?;
+ let chain_spec = &runner.config().chain_spec;
set_default_ss58_version(chain_spec);
if chain_spec.is_kusama() {
- runtime.sync_run(|config| {
+ runner.sync_run(|config| {
cmd.run::(config)
})
} else if chain_spec.is_westend() {
- runtime.sync_run(|config| {
+ runner.sync_run(|config| {
cmd.run::(config)
})
} else {
- runtime.sync_run(|config| {
+ runner.sync_run(|config| {
cmd.run::(config)
})
}
diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index be2f3c6cd646444761e813d005ace4282ef52e19..385a24d364c8582065827054dcd6d52b88da26d8 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -28,14 +28,14 @@ mod command;
#[cfg(not(feature = "service-rewr"))]
pub use service::{
- AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
+ ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
#[cfg(feature = "service-rewr")]
pub use service_new::{
self as service,
- AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
+ ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
diff --git a/collator/Cargo.toml b/collator/Cargo.toml
index a183901bb4a4c742129b58e0d755d4037094c20a..38cb3ee2e23c9fa9df48f80be7983a49ec19d6d1 100644
--- a/collator/Cargo.toml
+++ b/collator/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "polkadot-collator"
-version = "0.8.11"
+version = "0.8.22"
authors = ["Parity Technologies "]
description = "Collator node implementation"
edition = "2018"
@@ -26,7 +26,7 @@ polkadot-service-new = { path = "../node/service", optional = true }
log = "0.4.8"
tokio = "0.2.13"
futures-timer = "2.0"
-codec = { package = "parity-scale-codec", version = "1.3.0" }
+codec = { package = "parity-scale-codec", version = "1.3.4" }
[dev-dependencies]
keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/collator/src/lib.rs b/collator/src/lib.rs
index 932648f3e1597ff4e7f3d6482d1f20f4a2629a08..8e0cabb0eb9906fbbd056cf3f720ab0294c7d609 100644
--- a/collator/src/lib.rs
+++ b/collator/src/lib.rs
@@ -50,37 +50,34 @@ use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
-use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
-use log::warn;
-use sc_client_api::{StateBackend, BlockchainEvents};
-use sp_blockchain::HeaderBackend;
+use futures::{future, Future, Stream, FutureExt, StreamExt};
use sp_core::Pair;
-use polkadot_primitives::{
- BlockId, Hash, Block,
- parachain::{
- self, BlockData, DutyRoster, HeadData, Id as ParaId,
- PoVBlock, ValidatorId, CollatorPair, LocalValidationData, GlobalValidationSchedule,
- }
-};
-use polkadot_cli::{
- ProvideRuntimeApi, AbstractService, ParachainHost, IdentifyVariant,
- service::{self, Role}
+use polkadot_primitives::v0::{
+ BlockId, Hash, Block, DownwardMessage,
+ BlockData, DutyRoster, HeadData, Id as ParaId,
+ PoVBlock, ValidatorId, CollatorPair, LocalValidationData, GlobalValidationData,
+ Collation, CollationInfo, collator_signature_payload,
};
+use polkadot_cli::service::{self, Role};
pub use polkadot_cli::service::Configuration;
pub use polkadot_cli::Cli;
pub use polkadot_validation::SignedStatement;
-pub use polkadot_primitives::parachain::CollatorId;
+pub use polkadot_primitives::v0::CollatorId;
pub use sc_network::PeerId;
-pub use service::RuntimeApiCollection;
+pub use service::{RuntimeApiCollection, Client};
pub use sc_cli::SubstrateCli;
-use sp_api::{ConstructRuntimeApi, ApiExt, HashFor};
#[cfg(not(feature = "service-rewr"))]
-use polkadot_service::{FullNodeHandles, PolkadotClient};
+use polkadot_service::{FullNodeHandles, AbstractClient, ClientHandle};
#[cfg(feature = "service-rewr")]
use polkadot_service_new::{
self as polkadot_service,
- Error as ServiceError, FullNodeHandles, PolkadotClient,
+ Error as ServiceError, FullNodeHandles, AbstractClient,
};
+use sc_service::SpawnTaskHandle;
+use sp_core::traits::SpawnNamed;
+use sp_runtime::traits::BlakeTwo256;
+use consensus_common::SyncOracle;
+use sc_client_api::Backend as BackendT;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
@@ -100,24 +97,17 @@ impl Network for polkadot_network::protocol::Service {
}
}
-/// Error to return when the head data was invalid.
-#[derive(Clone, Copy, Debug)]
-pub struct InvalidHead;
-
/// Collation errors.
#[derive(Debug)]
pub enum Error {
/// Error on the relay-chain side of things.
Polkadot(String),
- /// Error on the collator side of things.
- Collator(InvalidHead),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
- Error::Collator(_) => write!(f, "Collator node error: Invalid head data"),
}
}
}
@@ -128,18 +118,19 @@ pub trait BuildParachainContext {
type ParachainContext: self::ParachainContext;
/// Build the `ParachainContext`.
- fn build(
+ fn build(
self,
client: Arc,
spawner: SP,
- network: impl Network + Clone + 'static,
+ network: impl Network + SyncOracle + Clone + 'static,
) -> Result
where
- Client: ProvideRuntimeApi + HeaderBackend + BlockchainEvents + Send + Sync + 'static,
- Client::Api: RuntimeApiCollection,
- >::StateBackend: StateBackend>,
- Extrinsic: codec::Codec + Send + Sync + 'static,
- SP: Spawn + Clone + Send + Sync + 'static;
+ SP: SpawnNamed + Clone + Send + Sync + 'static,
+ Backend: BackendT,
+ Backend::State: sp_api::StateBackend,
+ Client: polkadot_service::AbstractClient + 'static,
+ Client::Api: RuntimeApiCollection,
+ ;
}
/// Parachain context needed for collation.
@@ -147,15 +138,16 @@ pub trait BuildParachainContext {
/// This can be implemented through an externally attached service or a stub.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
- type ProduceCandidate: Future>;
+ type ProduceCandidate: Future>;
/// Produce a candidate, given the relay parent hash, the latest ingress queue information
/// and the last parachain head.
fn produce_candidate(
&mut self,
relay_parent: Hash,
- global_validation: GlobalValidationSchedule,
+ global_validation: GlobalValidationData,
local_validation: LocalValidationData,
+ downward_messages: Vec,
) -> Self::ProduceCandidate;
}
@@ -163,12 +155,12 @@ pub trait ParachainContext: Clone {
pub async fn collate(
relay_parent: Hash,
local_id: ParaId,
- global_validation: GlobalValidationSchedule,
+ global_validation: GlobalValidationData,
local_validation_data: LocalValidationData,
+ downward_messages: Vec,
mut para_context: P,
key: Arc,
-)
- -> Result
+) -> Option
where
P: ParachainContext,
P::ProduceCandidate: Send,
@@ -177,20 +169,21 @@ pub async fn collate(
relay_parent,
global_validation,
local_validation_data,
- ).map_err(Error::Collator).await?;
+ downward_messages,
+ ).await?;
let pov_block = PoVBlock {
block_data,
};
let pov_block_hash = pov_block.hash();
- let signature = key.sign(¶chain::collator_signature_payload(
+ let signature = key.sign(&collator_signature_payload(
&relay_parent,
&local_id,
&pov_block_hash,
));
- let info = parachain::CollationInfo {
+ let info = CollationInfo {
parachain_index: local_id,
relay_parent,
collator: key.public(),
@@ -199,185 +192,201 @@ pub async fn collate
(
pov_block_hash,
};
- let collation = parachain::Collation {
+ let collation = Collation {
info,
pov: pov_block,
};
- Ok(collation)
+ Some(collation)
}
+/// Build a collator service based on the `ClientHandle`.
#[cfg(feature = "service-rewr")]
-fn build_collator_service(
- _spawner: SP,
- _handles: FullNodeHandles,
- _client: Arc,
- _para_id: ParaId,
- _key: Arc,
- _build_parachain_context: P,
-) -> Result, polkadot_service::Error>
+pub fn build_collator_service(
+ spawner: SpawnTaskHandle,
+ handles: FullNodeHandles,
+ client: impl ClientHandle,
+ para_id: ParaId,
+ key: Arc,
+ build_parachain_context: P,
+) -> Result + Send + 'static>>, polkadot_service::Error>
where
- C: PolkadotClient<
- service::Block,
- service::TFullBackend,
- R
- > + 'static,
- R: ConstructRuntimeApi + Sync + Send,
- >::RuntimeApi:
- sp_api::ApiExt<
- service::Block,
- StateBackend = as service::Backend>::State,
- >
- + RuntimeApiCollection<
- Extrinsic,
- StateBackend = as service::Backend>::State,
- >
- + Sync + Send,
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
::ProduceCandidate: Send,
- Extrinsic: service::Codec + Send + Sync + 'static,
- SP: Spawn + Clone + Send + Sync + 'static,
{
Err("Collator is not functional with the new service yet".into())
}
-
-#[cfg(not(feature = "service-rewr"))]
-fn build_collator_service(
- spawner: SP,
- handles: FullNodeHandles,
- client: Arc,
+struct BuildCollationWork {
+ handles: polkadot_service::FullNodeHandles,
para_id: ParaId,
key: Arc,
build_parachain_context: P,
-) -> Result + Send + 'static, polkadot_service::Error>
+ spawner: SpawnTaskHandle,
+}
+
+impl polkadot_service::ExecuteWithClient for BuildCollationWork
where
- C: PolkadotClient<
- service::Block,
- service::TFullBackend,
- R
- > + 'static,
- R: ConstructRuntimeApi + Sync + Send,
- >::RuntimeApi:
- sp_api::ApiExt<
- service::Block,
- StateBackend = as service::Backend>::State,
- >
- + RuntimeApiCollection<
- Extrinsic,
- StateBackend = as service::Backend>::State,
- >
- + Sync + Send,
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
::ProduceCandidate: Send,
- Extrinsic: service::Codec + Send + Sync + 'static,
- SP: Spawn + Clone + Send + Sync + 'static,
{
- let polkadot_network = handles.polkadot_network
- .ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
-
- // We don't require this here, but we need to make sure that the validation service is started.
- // This service makes sure the collator is joining the correct gossip topics and receives the appropiate
- // messages.
- handles.validation_service_handle
- .ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
-
- let parachain_context = match build_parachain_context.build(
- client.clone(),
- spawner,
- polkadot_network.clone(),
- ) {
- Ok(ctx) => ctx,
- Err(()) => {
- return Err("Could not build the parachain context!".into())
- }
- };
-
- let work = async move {
- let mut notification_stream = client.import_notification_stream();
-
- while let Some(notification) = notification_stream.next().await {
- macro_rules! try_fr {
- ($e:expr) => {
- match $e {
- Ok(x) => x,
- Err(e) => return future::Either::Left(future::err(Error::Polkadot(
- format!("{:?}", e)
- ))),
+ type Output = Result + Send + 'static>>, polkadot_service::Error>;
+
+ fn execute_with_client(self, client: Arc) -> Self::Output
+ where>::StateBackend: sp_api::StateBackend,
+ Backend: sc_client_api::Backend,
+ Backend::State: sp_api::StateBackend,
+ Api: RuntimeApiCollection,
+ Client: AbstractClient + 'static,
+ {
+ let polkadot_network = self.handles
+ .polkadot_network
+ .ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
+
+ // We don't require this here, but we need to make sure that the validation service is started.
+	// This service makes sure the collator is joining the correct gossip topics and receives the appropriate
+ // messages.
+ self.handles.validation_service_handle
+ .ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
+
+ let parachain_context = match self.build_parachain_context.build(
+ client.clone(),
+ self.spawner.clone(),
+ polkadot_network.clone(),
+ ) {
+ Ok(ctx) => ctx,
+ Err(()) => {
+ return Err("Could not build the parachain context!".into())
+ }
+ };
+
+ let key = self.key;
+ let para_id = self.para_id;
+ let spawner = self.spawner;
+
+ let res = async move {
+ let mut notification_stream = client.import_notification_stream();
+
+ while let Some(notification) = notification_stream.next().await {
+ macro_rules! try_fr {
+ ($e:expr) => {
+ match $e {
+ Ok(x) => x,
+ Err(e) => return future::Either::Left(future::err(Error::Polkadot(
+ format!("{:?}", e)
+ ))),
+ }
}
}
- }
-
- let relay_parent = notification.hash;
- let id = BlockId::hash(relay_parent);
-
- let network = polkadot_network.clone();
- let client = client.clone();
- let key = key.clone();
- let parachain_context = parachain_context.clone();
-
- let work = future::lazy(move |_| {
- let api = client.runtime_api();
- let global_validation = try_fr!(api.global_validation_schedule(&id));
- let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
- Some(local_validation) => local_validation,
- None => return future::Either::Left(future::ok(())),
- };
-
- let validators = try_fr!(api.validators(&id));
-
- let targets = compute_targets(
- para_id,
- validators.as_slice(),
- try_fr!(api.duty_roster(&id)),
- );
- let collation_work = collate(
- relay_parent,
- para_id,
- global_validation,
- local_validation,
- parachain_context,
- key,
- ).map_ok(move |collation| {
- network.distribute_collation(targets, collation)
+ let relay_parent = notification.hash;
+ let id = BlockId::hash(relay_parent);
+
+ let network = polkadot_network.clone();
+ let client = client.clone();
+ let key = key.clone();
+ let parachain_context = parachain_context.clone();
+
+ let work = future::lazy(move |_| {
+ let api = client.runtime_api();
+ let global_validation = try_fr!(api.global_validation_data(&id));
+ let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
+ Some(local_validation) => local_validation,
+ None => return future::Either::Left(future::ok(())),
+ };
+ let downward_messages = try_fr!(api.downward_messages(&id, para_id));
+
+ let validators = try_fr!(api.validators(&id));
+
+ let targets = compute_targets(
+ para_id,
+ validators.as_slice(),
+ try_fr!(api.duty_roster(&id)),
+ );
+
+ let collation_work = collate(
+ relay_parent,
+ para_id,
+ global_validation,
+ local_validation,
+ downward_messages,
+ parachain_context,
+ key,
+ ).map(move |collation| {
+ match collation {
+ Some(collation) => network.distribute_collation(targets, collation),
+ None => log::trace!("Skipping collation as `collate` returned `None`"),
+ }
+
+ Ok(())
+ });
+
+ future::Either::Right(collation_work)
});
- future::Either::Right(collation_work)
- });
+ let deadlined = future::select(
+ work.then(|f| f).boxed(),
+ futures_timer::Delay::new(COLLATION_TIMEOUT)
+ );
- let deadlined = future::select(
- work.then(|f| f).boxed(),
- futures_timer::Delay::new(COLLATION_TIMEOUT)
- );
+ let silenced = deadlined
+ .map(|either| {
+ match either {
+ future::Either::Right(_) => log::warn!("Collation failure: timeout"),
+ future::Either::Left((Err(e), _)) => {
+ log::error!("Collation failed: {:?}", e)
+ }
+ future::Either::Left((Ok(()), _)) => {},
+ }
+ });
- let silenced = deadlined
- .map(|either| {
- if let future::Either::Right(_) = either {
- warn!("Collation failure: timeout");
- }
- });
+ let future = silenced.map(drop);
- let future = silenced.map(drop);
+ spawner.spawn("collation-work", future);
+ }
+ };
- tokio::spawn(future);
- }
- }.boxed();
+ Ok(res.boxed())
+ }
+}
- Ok(work)
+/// Build a collator service based on the `ClientHandle`.
+#[cfg(not(feature = "service-rewr"))]
+pub fn build_collator_service(
+ spawner: SpawnTaskHandle,
+ handles: FullNodeHandles,
+ client: impl ClientHandle,
+ para_id: ParaId,
+ key: Arc,
+ build_parachain_context: P,
+) -> Result + Send + 'static>>, polkadot_service::Error>
+ where
+ P: BuildParachainContext,
+ P::ParachainContext: Send + 'static,
+ ::ProduceCandidate: Send,
+{
+ client.execute_with(BuildCollationWork {
+ handles,
+ para_id,
+ key,
+ build_parachain_context,
+ spawner,
+ })
}
/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
/// built by the given `BuildParachainContext` and arguments to the underlying polkadot node.
-pub async fn start_collator(
+pub fn start_collator
(
build_parachain_context: P,
para_id: ParaId,
key: Arc,
config: Configuration,
- informant_prefix: Option,
-) -> Result<(), polkadot_service::Error>
+) -> Result<
+ (Pin + Send>>, sc_service::TaskManager),
+ polkadot_service::Error
+>
where
P: 'static + BuildParachainContext,
P::ParachainContext: Send + 'static,
@@ -389,71 +398,30 @@ where
.into());
}
- if config.chain_spec.is_kusama() {
- let (service, client, handlers) = service::kusama_new_full(
- config,
- Some((key.public(), para_id)),
- None,
- false,
- 6000,
- None,
- informant_prefix,
- )?;
- let spawn_handle = service.spawn_task_handle();
- build_collator_service(
- spawn_handle,
- handlers,
- client,
- para_id,
- key,
- build_parachain_context
- )?.await;
- } else if config.chain_spec.is_westend() {
- let (service, client, handlers) = service::westend_new_full(
- config,
- Some((key.public(), para_id)),
- None,
- false,
- 6000,
- None,
- informant_prefix,
- )?;
- let spawn_handle = service.spawn_task_handle();
- build_collator_service(
- spawn_handle,
- handlers,
- client,
- para_id,
- key,
- build_parachain_context
- )?.await;
- } else {
- let (service, client, handles) = service::polkadot_new_full(
- config,
- Some((key.public(), para_id)),
- None,
- false,
- 6000,
- None,
- informant_prefix,
- )?;
- let spawn_handle = service.spawn_task_handle();
- build_collator_service(
- spawn_handle,
- handles,
- client,
- para_id,
- key,
- build_parachain_context,
- )?.await;
- }
-
- Ok(())
+ let (task_manager, client, handles) = polkadot_service::build_full(
+ config,
+ Some((key.public(), para_id)),
+ None,
+ false,
+ 6000,
+ None,
+ )?;
+
+ let future = build_collator_service(
+ task_manager.spawn_handle(),
+ handles,
+ client,
+ para_id,
+ key,
+ build_parachain_context,
+ )?;
+
+ Ok((future, task_manager))
}
#[cfg(not(feature = "service-rewr"))]
fn compute_targets(para_id: ParaId, session_keys: &[ValidatorId], roster: DutyRoster) -> HashSet {
- use polkadot_primitives::parachain::Chain;
+ use polkadot_primitives::v0::Chain;
roster.validator_duty.iter().enumerate()
.filter(|&(_, c)| c == &Chain::Parachain(para_id))
@@ -470,19 +438,20 @@ mod tests {
struct DummyParachainContext;
impl ParachainContext for DummyParachainContext {
- type ProduceCandidate = future::Ready>;
+ type ProduceCandidate = future::Ready>;
fn produce_candidate(
&mut self,
_relay_parent: Hash,
- _global: GlobalValidationSchedule,
+ _global: GlobalValidationData,
_local_validation: LocalValidationData,
+ _: Vec,
) -> Self::ProduceCandidate {
// send messages right back.
- future::ok((
+ future::ready(Some((
BlockData(vec![1, 2, 3, 4, 5,]),
HeadData(vec![9, 9, 9]),
- ))
+ )))
}
}
@@ -491,31 +460,37 @@ mod tests {
impl BuildParachainContext for BuildDummyParachainContext {
type ParachainContext = DummyParachainContext;
- fn build(
+ fn build(
self,
- _: Arc,
+ _: Arc,
_: SP,
_: impl Network + Clone + 'static,
- ) -> Result {
+ ) -> Result
+ where
+ SP: SpawnNamed + Clone + Send + Sync + 'static,
+ Backend: BackendT,
+ Backend::State: sp_api::StateBackend,
+ Client: polkadot_service::AbstractClient + 'static,
+ Client::Api: RuntimeApiCollection,
+ {
Ok(DummyParachainContext)
}
}
- // Make sure that the future returned by `start_collator` implementes `Send`.
+ // Make sure that the future returned by `start_collator` implements `Send`.
#[test]
fn start_collator_is_send() {
fn check_send(_: T) {}
let cli = Cli::from_iter(&["-dev"]);
- let task_executor = Arc::new(|_, _| unimplemented!());
- let config = cli.create_configuration(&cli.run.base, task_executor).unwrap();
+ let task_executor = |_, _| async {};
+ let config = cli.create_configuration(&cli.run.base, task_executor.into()).unwrap();
check_send(start_collator(
BuildDummyParachainContext,
0.into(),
Arc::new(CollatorPair::generate().0),
config,
- None,
));
}
}
diff --git a/core-primitives/Cargo.toml b/core-primitives/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..cd4dac0f187d1cf4ced8ba938b48521fc3bf2669
--- /dev/null
+++ b/core-primitives/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "polkadot-core-primitives"
+version = "0.7.30"
+authors = ["Parity Technologies "]
+edition = "2018"
+
+[dependencies]
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+sp-std = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
+codec = { package = "parity-scale-codec", version = "1.3.4", default-features = false, features = [ "derive" ] }
+
+[features]
+default = [ "std" ]
+std = [
+ "sp-core/std",
+ "sp-runtime/std",
+ "sp-std/std",
+ "codec/std",
+]
diff --git a/core-primitives/src/lib.rs b/core-primitives/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ffb346467d9e53c0c64ebca4438a9260492fdd8e
--- /dev/null
+++ b/core-primitives/src/lib.rs
@@ -0,0 +1,101 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+#![cfg_attr(not(feature = "std"), no_std)]
+
+//! Core Polkadot types.
+//!
+//! These core Polkadot types are used by the relay chain and the Parachains.
+
+use sp_runtime::{generic, MultiSignature, traits::{Verify, BlakeTwo256, IdentifyAccount}};
+
+/// The block number type used by Polkadot.
+/// 32-bits will allow for 136 years of blocks assuming 1 block per second.
+pub type BlockNumber = u32;
+
+/// An instant or duration in time.
+pub type Moment = u64;
+
+/// Alias to type for a signature for a transaction on the relay chain. This allows one of several
+/// kinds of underlying crypto to be used, so isn't a fixed size when encoded.
+pub type Signature = MultiSignature;
+
+/// Alias to the public key used for this chain, actually a `MultiSigner`. Like the signature, this
+/// also isn't a fixed size when encoded, as different cryptos have different size public keys.
+pub type AccountPublic = ::Signer;
+
+/// Alias to the opaque account ID type for this chain, actually a `AccountId32`. This is always
+/// 32 bytes.
+pub type AccountId = ::AccountId;
+
+/// The type for looking up accounts. We don't expect more than 4 billion of them.
+pub type AccountIndex = u32;
+
+/// Identifier for a chain. 32-bit should be plenty.
+pub type ChainId = u32;
+
+/// A hash of some data used by the relay chain.
+pub type Hash = sp_core::H256;
+
+/// Index of a transaction in the relay chain. 32-bit should be plenty.
+pub type Nonce = u32;
+
+/// The balance of an account.
+/// 128-bits (or 38 significant decimal figures) will allow for 10m currency (10^7) at a resolution
+/// to allow for one second's worth of an annualised 50% reward be paid to a unit holder (10^11 unit
+/// denomination), or 10^18 total atomic units, to grow at 50%/year for 51 years (10^9 multiplier)
+/// for an eventual total of 10^27 units (27 significant decimal figures).
+/// We round denomination to 10^12 (12 sdf), and leave the other redundancy at the upper end so
+/// that 32 bits may be multiplied with a balance in 128 bits without worrying about overflow.
+pub type Balance = u128;
+
+/// Header type.
+pub type Header = generic::Header;
+/// Block type.
+pub type Block = generic::Block;
+/// Block ID.
+pub type BlockId = generic::BlockId;
+
+/// Opaque, encoded, unchecked extrinsic.
+pub use sp_runtime::OpaqueExtrinsic as UncheckedExtrinsic;
+
+/// The information that goes alongside a transfer_into_parachain operation. Entirely opaque, it
+/// will generally be used for identifying the reason for the transfer. Typically it will hold the
+/// destination account to which the transfer should be credited. If still more information is
+/// needed, then this should be a hash with the pre-image presented via an off-chain mechanism on
+/// the parachain.
+pub type Remark = [u8; 32];
+
+/// These are special "control" messages that can be passed from the Relaychain to a parachain.
+/// They should be handled by all parachains.
+#[derive(codec::Encode, codec::Decode, Clone, sp_runtime::RuntimeDebug, PartialEq)]
+pub enum DownwardMessage {
+ /// Some funds were transferred into the parachain's account. The hash is the identifier that
+ /// was given with the transfer.
+ TransferInto(AccountId, Balance, Remark),
+ /// An opaque blob of data. The relay chain must somehow know how to form this so that the
+ /// destination parachain does something sensible.
+ ///
+ /// NOTE: Be very careful not to allow users to place arbitrary size information in here.
+ Opaque(sp_std::vec::Vec),
+ /// XCMP message for the Parachain.
+ XCMPMessage(sp_std::vec::Vec),
+}
+
+/// V1 primitives.
+pub mod v1 {
+ pub use super::*;
+}
diff --git a/docker/sentry-docker-compose.yml b/docker/sentry-docker-compose.yml
index 18e7621fe71f8224952a0ad08423ca20d8a0f19d..af0510592d0858c2039512ba3c5c916bb2f4f7be 100644
--- a/docker/sentry-docker-compose.yml
+++ b/docker/sentry-docker-compose.yml
@@ -47,7 +47,7 @@ services:
- "--name"
- "${VALIDATOR_NANE:-AlicesNode}"
- "--reserved-nodes"
- - "${VALIDATOR_RESERVED_NODES:-/dns4/sentry/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi}"
+ - "${VALIDATOR_RESERVED_NODES:-/dns/sentry/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi}"
# Not only bind to localhost.
- "--ws-external"
- "--rpc-external"
@@ -90,7 +90,7 @@ services:
- "--name"
- "${SENTRY_NAME:-CharliesNode}"
- "--bootnodes"
- - "${SENTRY_BOOTNODES:-/dns4/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR}"
+ - "${SENTRY_BOOTNODES:-/dns/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR}"
- "--no-telemetry"
- "--rpc-cors"
- "all"
diff --git a/erasure-coding/Cargo.toml b/erasure-coding/Cargo.toml
index 8d4aef53086ced562d45d799dbc3bc871883bdab..1965150ca67abef411509179466d50f7ffb964d1 100644
--- a/erasure-coding/Cargo.toml
+++ b/erasure-coding/Cargo.toml
@@ -1,13 +1,13 @@
[package]
name = "polkadot-erasure-coding"
-version = "0.8.11"
+version = "0.8.22"
authors = ["Parity Technologies "]
edition = "2018"
[dependencies]
primitives = { package = "polkadot-primitives", path = "../primitives" }
reed_solomon = { package = "reed-solomon-erasure", version = "4.0.2"}
-codec = { package = "parity-scale-codec", version = "1.3.0", default-features = false, features = ["derive"] }
+codec = { package = "parity-scale-codec", version = "1.3.4", default-features = false, features = ["derive"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
trie = { package = "sp-trie", git = "https://github.com/paritytech/substrate", branch = "master" }
derive_more = "0.15.0"
diff --git a/erasure-coding/src/lib.rs b/erasure-coding/src/lib.rs
index 98a2776d8848a543b31e170e9cb3f56ff4906929..708a167d627675abaf3ec13befed01400bce219c 100644
--- a/erasure-coding/src/lib.rs
+++ b/erasure-coding/src/lib.rs
@@ -26,8 +26,8 @@
use codec::{Encode, Decode};
use reed_solomon::galois_16::{self, ReedSolomon};
-use primitives::{Hash as H256, BlakeTwo256, HashT};
-use primitives::parachain::AvailableData;
+use primitives::v0::{self, Hash as H256, BlakeTwo256, HashT};
+use primitives::v1;
use sp_core::Blake2Hasher;
use trie::{EMPTY_PREFIX, MemoryDB, Trie, TrieMut, trie_types::{TrieDBMut, TrieDB}};
@@ -124,14 +124,32 @@ fn code_params(n_validators: usize) -> Result {
})
}
+/// Obtain erasure-coded chunks for v0 `AvailableData`, one for each validator.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn obtain_chunks_v0(n_validators: usize, data: &v0::AvailableData)
+ -> Result>, Error>
+{
+ obtain_chunks(n_validators, data)
+}
+
+/// Obtain erasure-coded chunks for v1 `AvailableData`, one for each validator.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn obtain_chunks_v1(n_validators: usize, data: &v1::AvailableData)
+ -> Result>, Error>
+{
+ obtain_chunks(n_validators, data)
+}
+
/// Obtain erasure-coded chunks, one for each validator.
///
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
-pub fn obtain_chunks(n_validators: usize, available_data: &AvailableData)
+fn obtain_chunks(n_validators: usize, data: &T)
-> Result>, Error>
{
let params = code_params(n_validators)?;
- let encoded = available_data.encode();
+ let encoded = data.encode();
if encoded.is_empty() {
return Err(Error::BadPayload);
@@ -145,15 +163,42 @@ pub fn obtain_chunks(n_validators: usize, available_data: &AvailableData)
Ok(shards.into_iter().map(|w| w.into_inner()).collect())
}
-/// Reconstruct the block data from a set of chunks.
+/// Reconstruct the v0 available data from a set of chunks.
+///
+/// Provide an iterator containing chunk data and the corresponding index.
+/// The indices of the present chunks must be indicated. If too few chunks
+/// are provided, recovery is not possible.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn reconstruct_v0<'a, I: 'a>(n_validators: usize, chunks: I)
+ -> Result
+ where I: IntoIterator-
+{
+ reconstruct(n_validators, chunks)
+}
+
+/// Reconstruct the v1 available data from a set of chunks.
+///
+/// Provide an iterator containing chunk data and the corresponding index.
+/// The indices of the present chunks must be indicated. If too few chunks
+/// are provided, recovery is not possible.
+///
+/// Works only up to 65536 validators, and `n_validators` must be non-zero.
+pub fn reconstruct_v1<'a, I: 'a>(n_validators: usize, chunks: I)
+ -> Result
+ where I: IntoIterator-
+{
+ reconstruct(n_validators, chunks)
+}
+
+/// Reconstruct decodable data from a set of chunks.
///
/// Provide an iterator containing chunk data and the corresponding index.
/// The indices of the present chunks must be indicated. If too few chunks
/// are provided, recovery is not possible.
///
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
-pub fn reconstruct<'a, I: 'a>(n_validators: usize, chunks: I)
- -> Result
+fn reconstruct<'a, I: 'a, T: Decode>(n_validators: usize, chunks: I) -> Result
where I: IntoIterator-
{
let params = code_params(n_validators)?;
@@ -343,7 +388,7 @@ impl<'a, I: Iterator
- > codec::Input for ShardInput<'a, I> {
#[cfg(test)]
mod tests {
use super::*;
- use primitives::parachain::{BlockData, PoVBlock};
+ use primitives::v0::{AvailableData, BlockData, PoVBlock};
#[test]
fn field_order_is_right_size() {
@@ -420,7 +465,7 @@ mod tests {
assert_eq!(chunks.len(), 10);
// any 4 chunks should work.
- let reconstructed = reconstruct(
+ let reconstructed: AvailableData = reconstruct(
10,
[
(&*chunks[1], 1),
diff --git a/network/Cargo.toml b/network/Cargo.toml
index 96546dfbd605d577441242dba97393f4e3e5002a..748bc20c097dc43d4c098f5e78a5bc4874ad2d0e 100644
--- a/network/Cargo.toml
+++ b/network/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "polkadot-network"
-version = "0.8.11"
+version = "0.8.22"
authors = ["Parity Technologies
"]
description = "Polkadot-specific networking protocol"
edition = "2018"
@@ -14,18 +14,20 @@ av_store = { package = "polkadot-availability-store", path = "../availability-st
polkadot-validation = { path = "../validation" }
polkadot-primitives = { path = "../primitives" }
polkadot-erasure-coding = { path = "../erasure-coding" }
-codec = { package = "parity-scale-codec", version = "1.3.0", default-features = false, features = ["derive"] }
+codec = { package = "parity-scale-codec", version = "1.3.4", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network-gossip = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
-futures = "0.3.4"
+futures = "0.3.5"
log = "0.4.8"
exit-future = "0.2.0"
futures-timer = "2.0"
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
wasm-timer = "0.2.4"
+rand = "0.7.3"
[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/network/src/legacy/collator_pool.rs b/network/src/legacy/collator_pool.rs
index a0c0a0458e908eeb396883aa80304a29a47095ab..f2b168e0f592b3c85cf11c8948995d9f88e68465 100644
--- a/network/src/legacy/collator_pool.rs
+++ b/network/src/legacy/collator_pool.rs
@@ -17,8 +17,7 @@
//! Bridge between the network and consensus service for getting collations to it.
use codec::{Encode, Decode};
-use polkadot_primitives::Hash;
-use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
+use polkadot_primitives::v0::{Hash, CollatorId, Id as ParaId, Collation};
use sc_network::PeerId;
use futures::channel::oneshot;
@@ -236,7 +235,7 @@ impl CollatorPool {
mod tests {
use super::*;
use sp_core::crypto::UncheckedInto;
- use polkadot_primitives::parachain::{CollationInfo, BlockData, PoVBlock};
+ use polkadot_primitives::v0::{CollationInfo, BlockData, PoVBlock};
use futures::executor::block_on;
fn make_pov(block_data: Vec) -> PoVBlock {
diff --git a/network/src/legacy/gossip/attestation.rs b/network/src/legacy/gossip/attestation.rs
index a47f75288bf40220bcc639003d30c38ab1e492fb..2d20ce63b995450fbcc229a9363691739397175e 100644
--- a/network/src/legacy/gossip/attestation.rs
+++ b/network/src/legacy/gossip/attestation.rs
@@ -33,7 +33,7 @@
use sc_network_gossip::{ValidationResult as GossipValidationResult};
use sc_network::ReputationChange;
use polkadot_validation::GenericStatement;
-use polkadot_primitives::Hash;
+use polkadot_primitives::v0::Hash;
use std::collections::HashMap;
diff --git a/network/src/legacy/gossip/mod.rs b/network/src/legacy/gossip/mod.rs
index 7e97eb688b15cd8cac4d84c100628ef76e7a6a41..9e18d7ce217109f8449740baffb69a37ce73308d 100644
--- a/network/src/legacy/gossip/mod.rs
+++ b/network/src/legacy/gossip/mod.rs
@@ -58,8 +58,8 @@ use sc_network_gossip::{
ValidatorContext, MessageIntent,
};
use polkadot_validation::{SignedStatement};
-use polkadot_primitives::{Block, Hash};
-use polkadot_primitives::parachain::{
+use polkadot_primitives::v0::{
+ Block, Hash,
ParachainHost, ValidatorId, ErasureChunk as PrimitiveChunk, SigningContext, PoVBlock,
};
use polkadot_erasure_coding::{self as erasure};
@@ -295,7 +295,7 @@ pub(crate) fn pov_block_topic(parent_hash: Hash) -> Hash {
pub fn register_validator(
service: Arc>,
chain: C,
- executor: &impl futures::task::Spawn,
+ executor: &impl sp_core::traits::SpawnNamed,
) -> RegisteredMessageValidator
{
let s = service.clone();
@@ -331,12 +331,7 @@ pub fn register_validator(
let fut = futures::future::poll_fn(move |cx| {
gossip_engine.lock().poll_unpin(cx)
});
- let spawn_res = executor.spawn_obj(futures::task::FutureObj::from(Box::new(fut)));
-
- // Note: we consider the chances of an error to spawn a background task almost null.
- if spawn_res.is_err() {
- log::error!(target: "polkadot-gossip", "Failed to spawn background task");
- }
+ executor.spawn("polkadot-legacy-gossip-engine", fut.boxed());
}
RegisteredMessageValidator {
@@ -760,7 +755,7 @@ mod tests {
use sc_network_gossip::Validator as ValidatorT;
use std::sync::mpsc;
use parking_lot::Mutex;
- use polkadot_primitives::parachain::{AbridgedCandidateReceipt, BlockData};
+ use polkadot_primitives::v0::{AbridgedCandidateReceipt, BlockData};
use sp_core::sr25519::Signature as Sr25519Signature;
use polkadot_validation::GenericStatement;
diff --git a/network/src/legacy/local_collations.rs b/network/src/legacy/local_collations.rs
index f1a6615e88b82cb044de6823be6b44d03f2363c3..d85911548613bf4558755ec53386f0c2619e9360 100644
--- a/network/src/legacy/local_collations.rs
+++ b/network/src/legacy/local_collations.rs
@@ -19,11 +19,12 @@
//! Collations are attempted to be repropagated when a new validator connects,
//! a validator changes his session key, or when they are generated.
-use polkadot_primitives::{Hash, parachain::{ValidatorId}};
+use polkadot_primitives::v0::{Hash, ValidatorId};
use crate::legacy::collator_pool::Role;
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use wasm_timer::Instant;
+use rand::seq::SliceRandom;
const LIVE_FOR: Duration = Duration::from_secs(60 * 5);
@@ -106,9 +107,7 @@ impl LocalCollations {
relay_parent: Hash,
targets: HashSet,
collation: C
- )
- -> impl Iterator- + 'a
- {
+ ) -> impl Iterator
- + 'a {
self.local_collations.insert(relay_parent, LocalCollation {
targets,
collation,
@@ -119,8 +118,17 @@ impl
LocalCollations {
.expect("just inserted to this key; qed");
let borrowed_collation = &local.collation;
+
+ // If we are connected to multiple validators,
+ // make sure we always send the collation to one of the validators
+ // we are registered as backup. This ensures that one collator that
+ // is primary at multiple validators doesn't block the Parachain from progressing.
+ let mut rng = rand::thread_rng();
+ let diff = local.targets.difference(&self.primary_for).collect::>();
+
local.targets
.intersection(&self.primary_for)
+ .chain(diff.choose(&mut rng).map(|r| r.clone()))
.map(move |k| (k.clone(), borrowed_collation.clone()))
}
@@ -136,7 +144,7 @@ impl LocalCollations {
mod tests {
use super::*;
use sp_core::crypto::UncheckedInto;
- use polkadot_primitives::parachain::ValidatorId;
+ use polkadot_primitives::v0::ValidatorId;
#[test]
fn add_validator_with_ready_collation() {
@@ -149,7 +157,7 @@ mod tests {
};
let mut tracker = LocalCollations::new();
- assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none());
+ assert!(tracker.add_collation(relay_parent, targets, 5).next().is_some());
assert_eq!(tracker.note_validator_role(key, Role::Primary), vec![(relay_parent, 5)]);
}
@@ -165,7 +173,7 @@ mod tests {
};
let mut tracker: LocalCollations = LocalCollations::new();
- assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none());
+ assert!(tracker.add_collation(relay_parent, targets, 5).next().is_some());
assert!(tracker.note_validator_role(orig_key.clone(), Role::Primary).is_empty());
assert_eq!(tracker.fresh_key(&orig_key, &new_key), vec![(relay_parent, 5u8)]);
}
diff --git a/network/src/legacy/mod.rs b/network/src/legacy/mod.rs
index 28ea77a6bdc10cac191bb184ae70f4f234071602..42698657c05355e2a2736fb0965d726c31dbfddb 100644
--- a/network/src/legacy/mod.rs
+++ b/network/src/legacy/mod.rs
@@ -25,7 +25,7 @@ pub mod gossip;
use codec::Decode;
use futures::prelude::*;
-use polkadot_primitives::Hash;
+use polkadot_primitives::v0::Hash;
use sc_network::PeerId;
use sc_network_gossip::TopicNotification;
use log::debug;
diff --git a/network/src/lib.rs b/network/src/lib.rs
index 5048f09adaf51d17dd108686f783cb7c7d41d037..eaed7b34d2cb11af22d2dc996c86a456af2163bc 100644
--- a/network/src/lib.rs
+++ b/network/src/lib.rs
@@ -21,7 +21,7 @@
#![recursion_limit="256"]
-use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT};
+use polkadot_primitives::v0::{Block, Hash, BlakeTwo256, HashT};
pub mod legacy;
pub mod protocol;
diff --git a/network/src/protocol/mod.rs b/network/src/protocol/mod.rs
index 0ed2d9ac4395014667b91d201161127a9455df5d..2626a5a3735ffd9b641e651d6157e44751a10d27 100644
--- a/network/src/protocol/mod.rs
+++ b/network/src/protocol/mod.rs
@@ -26,16 +26,14 @@ use codec::{Decode, Encode};
use futures::channel::{mpsc, oneshot};
use futures::future::Either;
use futures::prelude::*;
-use futures::task::{Spawn, SpawnExt, Context, Poll};
+use futures::task::{Context, Poll};
use futures::stream::{FuturesUnordered, StreamFuture};
use log::{debug, trace};
-use polkadot_primitives::{
+use polkadot_primitives::v0::{
Hash, Block,
- parachain::{
- PoVBlock, ValidatorId, ValidatorIndex, Collation, AbridgedCandidateReceipt,
- ErasureChunk, ParachainHost, Id as ParaId, CollatorId,
- },
+ PoVBlock, ValidatorId, ValidatorIndex, Collation, AbridgedCandidateReceipt,
+ ErasureChunk, ParachainHost, Id as ParaId, CollatorId,
};
use polkadot_validation::{
SharedTable, TableRouter, Network as ParachainNetwork, Validated, GenericStatement, Collators,
@@ -44,6 +42,7 @@ use polkadot_validation::{
use sc_network::{ObservedRole, Event, PeerId};
use sp_api::ProvideRuntimeApi;
use sp_runtime::ConsensusEngineId;
+use sp_core::traits::SpawnNamed;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::pin::Pin;
@@ -126,7 +125,9 @@ enum ServiceToWorkerMsg {
/// Messages from a background task to the main worker task.
enum BackgroundToWorkerMsg {
// Spawn a given future.
- Spawn(future::BoxFuture<'static, ()>),
+ //
+ // The name is used for the future task.
+ Spawn(&'static str, future::BoxFuture<'static, ()>),
}
/// Operations that a handle to an underlying network service should provide.
@@ -221,7 +222,7 @@ pub fn start(
C: ChainContext + 'static,
Api: ProvideRuntimeApi + Send + Sync + 'static,
Api::Api: ParachainHost,
- SP: Spawn + Clone + Send + 'static,
+ SP: SpawnNamed + Clone + Send + Unpin + 'static,
{
const SERVICE_TO_WORKER_BUF: usize = 256;
@@ -234,67 +235,73 @@ pub fn start(
chain_context,
&executor,
);
- executor.spawn(worker_loop(
- config,
- service.clone(),
- gossip_validator,
- api,
- worker_receiver,
- executor.clone(),
- ))?;
+ executor.spawn(
+ "polkadot-network-worker",
+ worker_loop(
+ config,
+ service.clone(),
+ gossip_validator,
+ api,
+ worker_receiver,
+ executor.clone(),
+ ).boxed(),
+ );
let polkadot_service = Service {
sender: worker_sender.clone(),
network_service: service.clone(),
};
- executor.spawn(async move {
- while let Some(event) = event_stream.next().await {
- let res = match event {
- Event::Dht(_) => continue,
- Event::NotificationStreamOpened {
- remote,
- engine_id,
- role,
- } => {
- if engine_id != POLKADOT_ENGINE_ID { continue }
-
- worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
- },
- Event::NotificationStreamClosed {
- remote,
- engine_id,
- } => {
- if engine_id != POLKADOT_ENGINE_ID { continue }
-
- worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
- },
- Event::NotificationsReceived {
- remote,
- messages,
- } => {
- let our_notifications = messages.into_iter()
- .filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
- Some(message)
- } else {
- None
- })
- .collect();
+ executor.spawn(
+ "polkadot-network-notifications",
+ async move {
+ while let Some(event) = event_stream.next().await {
+ let res = match event {
+ Event::Dht(_) => continue,
+ Event::NotificationStreamOpened {
+ remote,
+ engine_id,
+ role,
+ } => {
+ if engine_id != POLKADOT_ENGINE_ID { continue }
+
+ worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
+ },
+ Event::NotificationStreamClosed {
+ remote,
+ engine_id,
+ } => {
+ if engine_id != POLKADOT_ENGINE_ID { continue }
- worker_sender.send(
- ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
- ).await
- }
- };
+ worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
+ },
+ Event::NotificationsReceived {
+ remote,
+ messages,
+ } => {
+ let our_notifications = messages.into_iter()
+ .filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
+ Some(message)
+ } else {
+ None
+ })
+ .collect();
+
+ worker_sender.send(
+ ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
+ ).await
+ }
+ };
- if let Err(e) = res {
- // full is impossible here, as we've `await`ed the value being sent.
- if e.is_disconnected() {
- break
+ if let Err(e) = res {
+ // full is impossible here, as we've `await`ed the value being sent.
+ if e.is_disconnected() {
+ break
+ }
}
}
- }
- })?;
+ }.boxed(),
+ );
Ok(polkadot_service)
}
@@ -395,28 +402,6 @@ struct ConsensusNetworkingInstance {
_drop_signal: exit_future::Signal,
}
-/// A utility future that resolves when the receiving end of a channel has hung up.
-///
-/// This is an `.await`-friendly interface around `poll_canceled`.
-// TODO: remove in favor of https://github.com/rust-lang/futures-rs/pull/2092/
-// once published.
-#[must_use = "futures do nothing unless you `.await` or poll them"]
-#[derive(Debug)]
-pub struct AwaitCanceled<'a, T> {
- inner: &'a mut oneshot::Sender,
-}
-
-impl Future for AwaitCanceled<'_, T> {
- type Output = ();
-
- fn poll(
- mut self: Pin<&mut Self>,
- cx: &mut futures::task::Context<'_>,
- ) -> futures::task::Poll<()> {
- self.inner.poll_canceled(cx)
- }
-}
-
/// Protocol configuration.
#[derive(Default)]
pub struct Config {
@@ -845,7 +830,7 @@ struct Worker {
impl Worker where
Api: ProvideRuntimeApi + Send + Sync + 'static,
Api::Api: ParachainHost,
- Sp: Spawn + Clone,
+ Sp: SpawnNamed + Clone + Unpin + 'static,
Gossip: GossipOps,
{
// spawns a background task to spawn consensus networking.
@@ -888,14 +873,18 @@ impl Worker where
// glue the incoming messages, shared table, and validation
// work together.
- let _ = self.executor.spawn(statement_import_loop(
- relay_parent,
- table,
- self.api.clone(),
- self.gossip_handle.clone(),
- self.background_to_main_sender.clone(),
- exit,
- ));
+ self.executor.spawn(
+ "polkadot-statement-import-loop",
+ statement_import_loop(
+ relay_parent,
+ table,
+ self.api.clone(),
+ self.gossip_handle.clone(),
+ self.background_to_main_sender.clone(),
+ exit,
+ self.executor.clone(),
+ ).boxed(),
+ );
}
fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
@@ -932,12 +921,15 @@ impl Worker where
// before placing in the pool, so we can safely check by candidate hash.
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);
- let _ = self.executor.spawn(async move {
- let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
- if let Either::Left((pov_block, _)) = res {
- let _ = sender.send(pov_block);
- }
- });
+ self.executor.spawn(
+ "polkadot-fetch-pov-block",
+ async move {
+ let res = future::select(get_msg, sender.cancellation()).await;
+ if let Either::Left((pov_block, _)) = res {
+ let _ = sender.send(pov_block);
+ }
+ }.boxed(),
+ );
}
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
@@ -963,12 +955,15 @@ impl Worker where
"gossip message streams do not conclude early; qed"
));
- let _ = self.executor.spawn(async move {
- let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
- if let Either::Left((chunk, _)) = res {
- let _ = sender.send(chunk);
- }
- });
+ self.executor.spawn(
+ "polkadot-fetch-erasure-chunk",
+ async move {
+ let res = future::select(get_msg, sender.cancellation()).await;
+ if let Either::Left((chunk, _)) = res {
+ let _ = sender.send(chunk);
+ }
+ }.boxed(),
+ );
}
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
@@ -1017,8 +1012,8 @@ impl Worker where
fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) {
match message {
- BackgroundToWorkerMsg::Spawn(task) => {
- let _ = self.executor.spawn(task);
+ BackgroundToWorkerMsg::Spawn(name, task) => {
+ let _ = self.executor.spawn(name, task);
}
}
}
@@ -1068,7 +1063,7 @@ async fn worker_loop(
) where
Api: ProvideRuntimeApi + Send + Sync + 'static,
Api::Api: ParachainHost,
- Sp: Spawn + Clone,
+ Sp: SpawnNamed + Clone + Unpin + 'static,
{
const BACKGROUND_TO_MAIN_BUF: usize = 16;
@@ -1155,6 +1150,7 @@ async fn statement_import_loop(
gossip_handle: impl GossipOps,
mut to_worker: mpsc::Sender,
mut exit: exit_future::Exit,
+ spawner: impl SpawnNamed + Clone + Unpin + 'static,
) where
Api: ProvideRuntimeApi + Send + Sync + 'static,
Api::Api: ParachainHost,
@@ -1227,7 +1223,7 @@ async fn statement_import_loop(
let table = table.clone();
let gossip_handle = gossip_handle.clone();
- let work = producer.prime(api.clone()).validate().map(move |res| {
+ let work = producer.prime(api.clone(), spawner.clone()).validate().map(move |res| {
let validated = match res {
Err(e) => {
debug!(target: "p_net", "Failed to act on statement: {}", e);
@@ -1250,7 +1246,7 @@ async fn statement_import_loop(
let work = future::select(work.boxed(), exit.clone()).map(drop);
if let Err(_) = to_worker.send(
- BackgroundToWorkerMsg::Spawn(work.boxed())
+ BackgroundToWorkerMsg::Spawn("polkadot-statement-import-loop-sub-task", work.boxed())
).await {
// can fail only if remote has hung up - worker is dead,
// we should die too. this is defensive, since the exit future
@@ -1492,6 +1488,16 @@ impl av_store::ErasureNetworking for Service {
}
}
+impl sp_consensus::SyncOracle for Service where for<'r> &'r N: sp_consensus::SyncOracle {
+ fn is_major_syncing(&mut self) -> bool {
+ self.network_service.is_major_syncing()
+ }
+
+ fn is_offline(&mut self) -> bool {
+ self.network_service.is_offline()
+ }
+}
+
/// Errors when interacting with the statement router.
#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum RouterError {
diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs
index a1cad24be34dce79bb54808f43a378a2a6d109ce..3bc4537cfa742fcab785674fd1dc8ee1983d9ce2 100644
--- a/network/src/protocol/tests.rs
+++ b/network/src/protocol/tests.rs
@@ -17,20 +17,20 @@ use super::*;
use crate::legacy::gossip::GossipPoVBlock;
use parking_lot::Mutex;
-use polkadot_primitives::Block;
-use polkadot_primitives::parachain::{
+use polkadot_primitives::v0::{
+ Block,
Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId,
Retriable, CollatorId, AbridgedCandidateReceipt,
- GlobalValidationSchedule, LocalValidationData, ErasureChunk, SigningContext,
+ GlobalValidationData, LocalValidationData, SigningContext,
PoVBlock, BlockData, ValidationCode,
};
use polkadot_validation::{SharedTable, TableRouter};
-use av_store::{Store as AvailabilityStore, ErasureNetworking};
+use av_store::Store as AvailabilityStore;
use sc_network_gossip::TopicNotification;
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_runtime::traits::Block as BlockT;
-use sp_core::crypto::Pair;
+use sp_core::{crypto::Pair, testing::TaskExecutor};
use sp_keyring::Sr25519Keyring;
use futures::executor::LocalPool;
@@ -66,10 +66,6 @@ impl MockGossip {
self.inner.lock().insert(topic, (rx, o_tx));
(tx, o_rx)
}
-
- fn contains_listener(&self, topic: &Hash) -> bool {
- self.inner.lock().contains_key(topic)
- }
}
impl NetworkServiceOps for MockNetworkOps {
@@ -167,7 +163,7 @@ sp_api::mock_impl_runtime_apis! {
Some(ValidationCode(Vec::new()))
}
- fn global_validation_schedule() -> GlobalValidationSchedule {
+ fn global_validation_data() -> GlobalValidationData {
Default::default()
}
@@ -185,6 +181,9 @@ sp_api::mock_impl_runtime_apis! {
parent_hash: Default::default(),
}
}
+ fn downward_messages(_: ParaId) -> Vec {
+ Vec::new()
+ }
}
}
@@ -240,7 +239,7 @@ fn test_setup(config: Config) -> (
mock_gossip.clone(),
api.clone(),
worker_rx,
- pool.spawner(),
+ TaskExecutor::new(),
);
let service = Service {
@@ -464,68 +463,6 @@ fn validator_key_spillover_cleaned() {
});
}
-#[test]
-fn erasure_fetch_drop_also_drops_gossip_sender() {
- let (service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
- let candidate_hash = [1; 32].into();
-
- let expected_index = 1;
-
- let spawner = pool.spawner();
-
- spawner.spawn_local(worker_task).unwrap();
- let topic = crate::erasure_coding_topic(&candidate_hash);
- let (mut gossip_tx, gossip_taken_rx) = gossip.add_gossip_stream(topic);
-
- let test_work = async move {
- let chunk_listener = service.fetch_erasure_chunk(
- &candidate_hash,
- expected_index,
- );
-
- // spawn an abortable handle to the chunk listener future.
- // we will wait until this future has proceeded enough to start grabbing
- // messages from gossip, and then we will abort the future.
- let (chunk_listener, abort_handle) = future::abortable(chunk_listener);
- let handle = spawner.spawn_with_handle(chunk_listener).unwrap();
- gossip_taken_rx.await.unwrap();
-
- // gossip listener was taken. and is active.
- assert!(!gossip.contains_listener(&topic));
- assert!(!gossip_tx.is_closed());
-
- abort_handle.abort();
-
- // we must `await` this, otherwise context may never transfer over
- // to the spawned `Abortable` future.
- assert!(handle.await.is_err());
- loop {
- // if dropping the sender leads to the gossip listener
- // being cleaned up, we will eventually be unable to send a message
- // on the sender.
- if gossip_tx.is_closed() { break }
-
- let fake_chunk = GossipMessage::ErasureChunk(
- crate::legacy::gossip::ErasureChunkMessage {
- chunk: ErasureChunk {
- chunk: vec![],
- index: expected_index + 1,
- proof: vec![],
- },
- candidate_hash,
- }
- ).encode();
-
- match gossip_tx.send(TopicNotification { message: fake_chunk, sender: None }).await {
- Err(e) => { assert!(e.is_disconnected()); break },
- Ok(_) => continue,
- }
- }
- };
-
- pool.run_until(test_work);
-}
-
#[test]
fn fetches_pov_block_from_gossip() {
let (service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
diff --git a/network/test/Cargo.toml b/network/test/Cargo.toml
index 4cd342aef04d806dfcac38063af28ee1c0e7fcd9..d59123fad98e0871e856bdb432df8195cefd735f 100644
--- a/network/test/Cargo.toml
+++ b/network/test/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "polkadot-network-test"
-version = "0.8.11"
+version = "0.8.22"
license = "GPL-3.0"
authors = ["Parity Technologies "]
edition = "2018"
diff --git a/network/test/src/block_import.rs b/network/test/src/block_import.rs
index ce4c9f8ba0b6725e3ca6561c5029945ee2040cab..4ecc38a537f67b9f11c77d4988fa7fd7b034a716 100644
--- a/network/test/src/block_import.rs
+++ b/network/test/src/block_import.rs
@@ -29,7 +29,7 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock)
let mut client = polkadot_test_runtime_client::new();
let mut builder = client.new_block(Default::default()).unwrap();
- let extrinsics = polkadot_test_runtime_client::needed_extrinsics(vec![]);
+ let extrinsics = polkadot_test_runtime_client::needed_extrinsics(0);
for extrinsic in &extrinsics {
builder.push(extrinsic.clone()).unwrap();
@@ -60,7 +60,7 @@ fn import_single_good_block_works() {
let mut expected_aux = ImportedAux::default();
expected_aux.is_new_best = true;
- match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
+ match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number as u32 && *aux == expected_aux && *org == Some(peer_id) => {}
r @ _ => panic!("{:?}", r)
@@ -70,7 +70,7 @@ fn import_single_good_block_works() {
#[test]
fn import_single_good_known_block_is_ignored() {
let (mut client, _hash, number, _, block) = prepare_good_block();
- match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
+ match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number as u32 => {}
_ => panic!()
}
@@ -80,7 +80,7 @@ fn import_single_good_known_block_is_ignored() {
fn import_single_good_block_without_header_fails() {
let (_, _, _, peer_id, mut block) = prepare_good_block();
block.header = None;
- match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
+ match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {}
_ => panic!()
}
@@ -88,10 +88,10 @@ fn import_single_good_block_without_header_fails() {
#[test]
fn async_import_queue_drops() {
- let executor = sp_core::testing::SpawnBlockingExecutor::new();
+ let executor = sp_core::testing::TaskExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
- let verifier = PassThroughVerifier(true);
+ let verifier = PassThroughVerifier::new(true);
let queue = BasicQueue::new(
verifier,
diff --git a/network/test/src/lib.rs b/network/test/src/lib.rs
index c48bfabdfbaa74222b5b89452d3874c787d8145d..4978db5834c85e41b1b8c46bb156ac628e5a8ad9 100644
--- a/network/test/src/lib.rs
+++ b/network/test/src/lib.rs
@@ -573,7 +573,7 @@ pub trait TestNetFactory: Sized {
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
- &sp_core::testing::SpawnBlockingExecutor::new(),
+ &sp_core::testing::TaskExecutor::new(),
None,
));
@@ -600,7 +600,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
- block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
+ block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
@@ -650,7 +650,7 @@ pub trait TestNetFactory: Sized {
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
- &sp_core::testing::SpawnBlockingExecutor::new(),
+ &sp_core::testing::TaskExecutor::new(),
None,
));
@@ -677,7 +677,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
- block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
+ block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
@@ -758,13 +758,13 @@ pub trait TestNetFactory: Sized {
futures::executor::block_on(futures::future::poll_fn::<(), _>(|cx| self.poll_until_idle(cx)));
}
- /// Polls the testnet. Processes all the pending actions and returns `NotReady`.
+ /// Polls the testnet. Processes all the pending actions.
fn poll(&mut self, cx: &mut FutureContext) {
self.mut_peers(|peers| {
for peer in peers {
trace!(target: "sync", "-- Polling {}", peer.id());
- if let Poll::Ready(res) = Pin::new(&mut peer.network).poll(cx) {
- res.unwrap();
+ if let Poll::Ready(()) = peer.network.poll_unpin(cx) {
+ panic!("NetworkWorker has terminated unexpectedly.")
}
trace!(target: "sync", "-- Polling complete {}", peer.id());
@@ -804,7 +804,7 @@ impl TestNetFactory for TestNet {
fn make_verifier(&self, _client: PeersClient, _config: &ProtocolConfig, _peer_data: &())
-> Self::Verifier
{
- PassThroughVerifier(false)
+ PassThroughVerifier::new(false)
}
fn peer(&mut self, i: usize) -> &mut Peer<()> {
diff --git a/node/collation-generation/Cargo.toml b/node/collation-generation/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..f7d5e7f162efc0c8230cd8d6714035a97b2fc33c
--- /dev/null
+++ b/node/collation-generation/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "polkadot-node-collation-generation"
+version = "0.1.0"
+authors = ["Parity Technologies "]
+edition = "2018"
+
+[dependencies]
+derive_more = "0.99.9"
+futures = "0.3.5"
+log = "0.4.8"
+polkadot-erasure-coding = { path = "../../erasure-coding" }
+polkadot-node-primitives = { path = "../primitives" }
+polkadot-node-subsystem = { path = "../subsystem" }
+polkadot-node-subsystem-util = { path = "../subsystem-util" }
+polkadot-primitives = { path = "../../primitives" }
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+
+[dev-dependencies]
+polkadot-node-subsystem-test-helpers = { path = "../subsystem-test-helpers" }
diff --git a/node/collation-generation/src/lib.rs b/node/collation-generation/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..c2f6f9bc2ceb74ce21de9648be0ebf2768f63992
--- /dev/null
+++ b/node/collation-generation/src/lib.rs
@@ -0,0 +1,672 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! The collation generation subsystem is the interface between polkadot and the collators.
+
+#![deny(missing_docs)]
+
+use futures::{
+ channel::{mpsc, oneshot},
+ future::FutureExt,
+ join,
+ select,
+ sink::SinkExt,
+ stream::StreamExt,
+};
+use polkadot_node_primitives::CollationGenerationConfig;
+use polkadot_node_subsystem::{
+ errors::RuntimeApiError,
+ messages::{AllMessages, CollationGenerationMessage, CollatorProtocolMessage},
+ FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError, SubsystemResult,
+ metrics::{self, prometheus},
+};
+use polkadot_node_subsystem_util::{
+ self as util, request_availability_cores_ctx, request_full_validation_data_ctx,
+ request_validators_ctx,
+};
+use polkadot_primitives::v1::{
+ collator_signature_payload, AvailableData, CandidateCommitments,
+ CandidateDescriptor, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
+ PersistedValidationData, PoV,
+};
+use sp_core::crypto::Pair;
+use std::sync::Arc;
+
+/// Collation Generation Subsystem
+pub struct CollationGenerationSubsystem {
+ config: Option>,
+ metrics: Metrics,
+}
+
+impl CollationGenerationSubsystem {
+ /// Create a new instance of the `CollationGenerationSubsystem`.
+ pub fn new(metrics: Metrics) -> Self {
+ Self {
+ config: None,
+ metrics,
+ }
+ }
+
+ /// Run this subsystem
+ ///
+ /// Conceptually, this is very simple: it just loops forever.
+ ///
+ /// - On incoming overseer messages, it starts or stops jobs as appropriate.
+ /// - On other incoming messages, if they can be converted into Job::ToJob and
+ /// include a hash, then they're forwarded to the appropriate individual job.
+ /// - On outgoing messages from the jobs, it forwards them to the overseer.
+ ///
+ /// If `err_tx` is not `None`, errors are forwarded onto that channel as they occur.
+ /// Otherwise, most are logged and then discarded.
+ async fn run(mut self, mut ctx: Context)
+ where
+ Context: SubsystemContext,
+ {
+ // when we activate new leaves, we spawn a bunch of sub-tasks, each of which is
+ // expected to generate precisely one message. We don't want to block the main loop
+ // at any point waiting for them all, so instead, we create a channel on which they can
+ // send those messages. We can then just monitor the channel and forward messages on it
+ // to the overseer here, via the context.
+ let (sender, mut receiver) = mpsc::channel(0);
+
+ loop {
+ select! {
+ incoming = ctx.recv().fuse() => {
+ if self.handle_incoming::(incoming, &mut ctx, &sender).await {
+ break;
+ }
+ },
+ msg = receiver.next().fuse() => {
+ if let Some(msg) = msg {
+ if let Err(err) = ctx.send_message(msg).await {
+ log::warn!(target: "collation_generation", "failed to forward message to overseer: {:?}", err);
+ break;
+ }
+ }
+ },
+ }
+ }
+ }
+
+ // handle an incoming message. return true if we should break afterwards.
+ // note: this doesn't strictly need to be a separate function; it's more an administrative function
+ // so that we don't clutter the run loop. It could in principle be inlined directly into there.
+ // it should hopefully therefore be ok that it's an async function mutably borrowing self.
+ async fn handle_incoming(
+ &mut self,
+ incoming: SubsystemResult>,
+ ctx: &mut Context,
+ sender: &mpsc::Sender,
+ ) -> bool
+ where
+ Context: SubsystemContext,
+ {
+ use polkadot_node_subsystem::ActiveLeavesUpdate;
+ use polkadot_node_subsystem::FromOverseer::{Communication, Signal};
+ use polkadot_node_subsystem::OverseerSignal::{ActiveLeaves, BlockFinalized, Conclude};
+
+ match incoming {
+ Ok(Signal(ActiveLeaves(ActiveLeavesUpdate { activated, .. }))) => {
+ // follow the procedure from the guide
+ if let Some(config) = &self.config {
+ let metrics = self.metrics.clone();
+ if let Err(err) =
+ handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
+ {
+ log::warn!(target: "collation_generation", "failed to handle new activations: {:?}", err);
+ return true;
+ };
+ }
+ false
+ }
+ Ok(Signal(Conclude)) => true,
+ Ok(Communication {
+ msg: CollationGenerationMessage::Initialize(config),
+ }) => {
+ if self.config.is_some() {
+ log::warn!(target: "collation_generation", "double initialization");
+ true
+ } else {
+ self.config = Some(Arc::new(config));
+ false
+ }
+ }
+ Ok(Signal(BlockFinalized(_))) => false,
+ Err(err) => {
+ log::error!(target: "collation_generation", "error receiving message from subsystem context: {:?}", err);
+ true
+ }
+ }
+ }
+}
+
+impl Subsystem for CollationGenerationSubsystem
+where
+ Context: SubsystemContext,
+{
+ type Metrics = Metrics;
+
+ fn start(self, ctx: Context) -> SpawnedSubsystem {
+ let future = Box::pin(self.run(ctx));
+
+ SpawnedSubsystem {
+ name: "collation-generation-subsystem",
+ future,
+ }
+ }
+}
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+ #[from]
+ Subsystem(SubsystemError),
+ #[from]
+ OneshotRecv(oneshot::Canceled),
+ #[from]
+ Runtime(RuntimeApiError),
+ #[from]
+ Util(util::Error),
+ #[from]
+ Erasure(polkadot_erasure_coding::Error),
+}
+
+type Result = std::result::Result;
+
+async fn handle_new_activations(
+ config: Arc,
+ activated: &[Hash],
+ ctx: &mut Context,
+ metrics: Metrics,
+ sender: &mpsc::Sender,
+) -> Result<()> {
+ // follow the procedure from the guide:
+ // https://w3f.github.io/parachain-implementers-guide/node/collators/collation-generation.html
+
+ for relay_parent in activated.iter().copied() {
+ // double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
+ // returns a receiver. The second layer of requests actually polls those receivers to completion.
+ let (availability_cores, validators) = join!(
+ request_availability_cores_ctx(relay_parent, ctx).await?,
+ request_validators_ctx(relay_parent, ctx).await?,
+ );
+
+ let availability_cores = availability_cores??;
+ let n_validators = validators??.len();
+
+ for core in availability_cores {
+ let (scheduled_core, assumption) = match core {
+ CoreState::Scheduled(scheduled_core) => {
+ (scheduled_core, OccupiedCoreAssumption::Free)
+ }
+ CoreState::Occupied(_occupied_core) => {
+ // TODO: https://github.com/paritytech/polkadot/issues/1573
+ continue;
+ }
+ _ => continue,
+ };
+
+ if scheduled_core.para_id != config.para_id {
+ continue;
+ }
+
+ // we get validation data synchronously for each core instead of
+ // within the subtask loop, because we have only a single mutable handle to the
+ // context, so the work can't really be distributed
+ let validation_data = match request_full_validation_data_ctx(
+ relay_parent,
+ scheduled_core.para_id,
+ assumption,
+ ctx,
+ )
+ .await?
+ .await??
+ {
+ Some(v) => v,
+ None => continue,
+ };
+
+ let task_config = config.clone();
+ let mut task_sender = sender.clone();
+ let metrics = metrics.clone();
+ ctx.spawn("collation generation collation builder", Box::pin(async move {
+ let persisted_validation_data_hash = validation_data.persisted.hash();
+
+ let collation = (task_config.collator)(&validation_data).await;
+
+ let pov_hash = collation.proof_of_validity.hash();
+
+ let signature_payload = collator_signature_payload(
+ &relay_parent,
+ &scheduled_core.para_id,
+ &persisted_validation_data_hash,
+ &pov_hash,
+ );
+
+ let erasure_root = match erasure_root(
+ n_validators,
+ validation_data.persisted,
+ collation.proof_of_validity.clone(),
+ ) {
+ Ok(erasure_root) => erasure_root,
+ Err(err) => {
+ log::error!(target: "collation_generation", "failed to calculate erasure root for para_id {}: {:?}", scheduled_core.para_id, err);
+ return
+ }
+ };
+
+ let commitments = CandidateCommitments {
+ fees: collation.fees,
+ upward_messages: collation.upward_messages,
+ new_validation_code: collation.new_validation_code,
+ head_data: collation.head_data,
+ erasure_root,
+ };
+
+ let ccr = CandidateReceipt {
+ commitments_hash: commitments.hash(),
+ descriptor: CandidateDescriptor {
+ signature: task_config.key.sign(&signature_payload),
+ para_id: scheduled_core.para_id,
+ relay_parent,
+ collator: task_config.key.public(),
+ persisted_validation_data_hash,
+ pov_hash,
+ },
+ };
+
+ metrics.on_collation_generated();
+
+ if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
+ CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
+ )).await {
+ log::warn!(target: "collation_generation", "failed to send collation result for para_id {}: {:?}", scheduled_core.para_id, err);
+ }
+ })).await?;
+ }
+ }
+
+ Ok(())
+}
+
+fn erasure_root(
+ n_validators: usize,
+ persisted_validation: PersistedValidationData,
+ pov: PoV,
+) -> Result {
+ let available_data = AvailableData {
+ validation_data: persisted_validation,
+ pov,
+ };
+
+ let chunks = polkadot_erasure_coding::obtain_chunks_v1(n_validators, &available_data)?;
+ Ok(polkadot_erasure_coding::branches(&chunks).root())
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+ collations_generated_total: prometheus::Counter,
+}
+
+/// CollationGenerationSubsystem metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option);
+
+impl Metrics {
+ fn on_collation_generated(&self) {
+ if let Some(metrics) = &self.0 {
+ metrics.collations_generated_total.inc();
+ }
+ }
+}
+
+impl metrics::Metrics for Metrics {
+ fn try_register(registry: &prometheus::Registry) -> std::result::Result {
+ let metrics = MetricsInner {
+ collations_generated_total: prometheus::register(
+ prometheus::Counter::new(
+ "parachain_collations_generated_total",
+ "Number of collations generated."
+ )?,
+ registry,
+ )?,
+ };
+ Ok(Metrics(Some(metrics)))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ mod handle_new_activations {
+ use super::super::*;
+ use futures::{
+ lock::Mutex,
+ task::{Context as FuturesContext, Poll},
+ Future,
+ };
+ use polkadot_node_primitives::Collation;
+ use polkadot_node_subsystem::messages::{
+ AllMessages, RuntimeApiMessage, RuntimeApiRequest,
+ };
+ use polkadot_node_subsystem_test_helpers::{
+ subsystem_test_harness, TestSubsystemContextHandle,
+ };
+ use polkadot_primitives::v1::{
+ BlockData, BlockNumber, CollatorPair, Id as ParaId,
+ PersistedValidationData, PoV, ScheduledCore, ValidationData,
+ };
+ use std::pin::Pin;
+
+ fn test_collation() -> Collation {
+ Collation {
+ fees: Default::default(),
+ upward_messages: Default::default(),
+ new_validation_code: Default::default(),
+ head_data: Default::default(),
+ proof_of_validity: PoV {
+ block_data: BlockData(Vec::new()),
+ },
+ }
+ }
+
+ // Box + Unpin + Send
+ struct TestCollator;
+
+ impl Future for TestCollator {
+ type Output = Collation;
+
+ fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll {
+ Poll::Ready(test_collation())
+ }
+ }
+
+ impl Unpin for TestCollator {}
+
+ fn test_config>(para_id: Id) -> Arc {
+ Arc::new(CollationGenerationConfig {
+ key: CollatorPair::generate().0,
+ collator: Box::new(|_vd: &ValidationData| {
+ Box::new(TestCollator)
+ }),
+ para_id: para_id.into(),
+ })
+ }
+
+ fn scheduled_core_for>(para_id: Id) -> ScheduledCore {
+ ScheduledCore {
+ para_id: para_id.into(),
+ collator: None,
+ }
+ }
+
+ #[test]
+ fn requests_availability_per_relay_parent() {
+ let activated_hashes: Vec = vec![
+ [1; 32].into(),
+ [4; 32].into(),
+ [9; 32].into(),
+ [16; 32].into(),
+ ];
+
+ let requested_availability_cores = Arc::new(Mutex::new(Vec::new()));
+
+ let overseer_requested_availability_cores = requested_availability_cores.clone();
+ let overseer = |mut handle: TestSubsystemContextHandle| async move {
+ loop {
+ match handle.try_recv().await {
+ None => break,
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, RuntimeApiRequest::AvailabilityCores(tx)))) => {
+ overseer_requested_availability_cores.lock().await.push(hash);
+ tx.send(Ok(vec![])).unwrap();
+ }
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(_hash, RuntimeApiRequest::Validators(tx)))) => {
+ tx.send(Ok(vec![Default::default(); 3])).unwrap();
+ }
+ Some(msg) => panic!("didn't expect any other overseer requests given no availability cores; got {:?}", msg),
+ }
+ }
+ };
+
+ let (tx, _rx) = mpsc::channel(0);
+
+ let subsystem_activated_hashes = activated_hashes.clone();
+ subsystem_test_harness(overseer, |mut ctx| async move {
+ handle_new_activations(
+ test_config(123u32),
+ &subsystem_activated_hashes,
+ &mut ctx,
+ Metrics(None),
+ &tx,
+ )
+ .await
+ .unwrap();
+ });
+
+ let mut requested_availability_cores = Arc::try_unwrap(requested_availability_cores)
+ .expect("overseer should have shut down by now")
+ .into_inner();
+ requested_availability_cores.sort();
+
+ assert_eq!(requested_availability_cores, activated_hashes);
+ }
+
+ #[test]
+ fn requests_validation_data_for_scheduled_matches() {
+ let activated_hashes: Vec = vec![
+ Hash::repeat_byte(1),
+ Hash::repeat_byte(4),
+ Hash::repeat_byte(9),
+ Hash::repeat_byte(16),
+ ];
+
+ let requested_full_validation_data = Arc::new(Mutex::new(Vec::new()));
+
+ let overseer_requested_full_validation_data = requested_full_validation_data.clone();
+ let overseer = |mut handle: TestSubsystemContextHandle| async move {
+ loop {
+ match handle.try_recv().await {
+ None => break,
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ hash,
+ RuntimeApiRequest::AvailabilityCores(tx),
+ ))) => {
+ tx.send(Ok(vec![
+ CoreState::Free,
+ // this is weird, see explanation below
+ CoreState::Scheduled(scheduled_core_for(
+ (hash.as_fixed_bytes()[0] * 4) as u32,
+ )),
+ CoreState::Scheduled(scheduled_core_for(
+ (hash.as_fixed_bytes()[0] * 5) as u32,
+ )),
+ ]))
+ .unwrap();
+ }
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ hash,
+ RuntimeApiRequest::FullValidationData(
+ _para_id,
+ _occupied_core_assumption,
+ tx,
+ ),
+ ))) => {
+ overseer_requested_full_validation_data
+ .lock()
+ .await
+ .push(hash);
+ tx.send(Ok(Default::default())).unwrap();
+ }
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ _hash,
+ RuntimeApiRequest::Validators(tx),
+ ))) => {
+ tx.send(Ok(vec![Default::default(); 3])).unwrap();
+ }
+ Some(msg) => {
+ panic!("didn't expect any other overseer requests; got {:?}", msg)
+ }
+ }
+ }
+ };
+
+ let (tx, _rx) = mpsc::channel(0);
+
+ subsystem_test_harness(overseer, |mut ctx| async move {
+ handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx)
+ .await
+ .unwrap();
+ });
+
+ let requested_full_validation_data = Arc::try_unwrap(requested_full_validation_data)
+ .expect("overseer should have shut down by now")
+ .into_inner();
+
+ // the only activated hash should be from the 4 hash:
+ // each activated hash generates two scheduled cores: one with its value * 4, one with its value * 5
+ // given that the test configuration has a para_id of 16, there's only one way to get that value: with the 4
+ // hash.
+ assert_eq!(requested_full_validation_data, vec![[4; 32].into()]);
+ }
+
+ #[test]
+ fn sends_distribute_collation_message() {
+ let activated_hashes: Vec = vec![
+ Hash::repeat_byte(1),
+ Hash::repeat_byte(4),
+ Hash::repeat_byte(9),
+ Hash::repeat_byte(16),
+ ];
+
+ let overseer = |mut handle: TestSubsystemContextHandle| async move {
+ loop {
+ match handle.try_recv().await {
+ None => break,
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ hash,
+ RuntimeApiRequest::AvailabilityCores(tx),
+ ))) => {
+ tx.send(Ok(vec![
+ CoreState::Free,
+ // this is weird, see explanation below
+ CoreState::Scheduled(scheduled_core_for(
+ (hash.as_fixed_bytes()[0] * 4) as u32,
+ )),
+ CoreState::Scheduled(scheduled_core_for(
+ (hash.as_fixed_bytes()[0] * 5) as u32,
+ )),
+ ]))
+ .unwrap();
+ }
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ _hash,
+ RuntimeApiRequest::FullValidationData(
+ _para_id,
+ _occupied_core_assumption,
+ tx,
+ ),
+ ))) => {
+ tx.send(Ok(Some(Default::default()))).unwrap();
+ }
+ Some(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
+ _hash,
+ RuntimeApiRequest::Validators(tx),
+ ))) => {
+ tx.send(Ok(vec![Default::default(); 3])).unwrap();
+ }
+ Some(msg) => {
+ panic!("didn't expect any other overseer requests; got {:?}", msg)
+ }
+ }
+ }
+ };
+
+ let config = test_config(16);
+ let subsystem_config = config.clone();
+
+ let (tx, rx) = mpsc::channel(0);
+
+ // empty vec doesn't allocate on the heap, so it's ok we throw it away
+ let sent_messages = Arc::new(Mutex::new(Vec::new()));
+ let subsystem_sent_messages = sent_messages.clone();
+ subsystem_test_harness(overseer, |mut ctx| async move {
+ handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx)
+ .await
+ .unwrap();
+
+ std::mem::drop(tx);
+
+ // collect all sent messages
+ *subsystem_sent_messages.lock().await = rx.collect().await;
+ });
+
+ let sent_messages = Arc::try_unwrap(sent_messages)
+ .expect("subsystem should have shut down by now")
+ .into_inner();
+
+ // we expect a single message to be sent, containing a candidate receipt.
+ // we don't care too much about the commitments_hash right now, but let's ensure that we've calculated the
+ // correct descriptor
+ let expect_pov_hash = test_collation().proof_of_validity.hash();
+ let expect_validation_data_hash
+ = PersistedValidationData::::default().hash();
+ let expect_relay_parent = Hash::repeat_byte(4);
+ let expect_payload = collator_signature_payload(
+ &expect_relay_parent,
+ &config.para_id,
+ &expect_validation_data_hash,
+ &expect_pov_hash,
+ );
+ let expect_descriptor = CandidateDescriptor {
+ signature: config.key.sign(&expect_payload),
+ para_id: config.para_id,
+ relay_parent: expect_relay_parent,
+ collator: config.key.public(),
+ persisted_validation_data_hash: expect_validation_data_hash,
+ pov_hash: expect_pov_hash,
+ };
+
+ assert_eq!(sent_messages.len(), 1);
+ match &sent_messages[0] {
+ AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
+ CandidateReceipt { descriptor, .. },
+ _pov,
+ )) => {
+ // signature generation is non-deterministic, so we can't just assert that the
+ // expected descriptor is correct. What we can do is validate that the produced
+ // descriptor has a valid signature, then just copy in the generated signature
+ // and check the rest of the fields for equality.
+ assert!(CollatorPair::verify(
+ &descriptor.signature,
+ &collator_signature_payload(
+ &descriptor.relay_parent,
+ &descriptor.para_id,
+ &descriptor.persisted_validation_data_hash,
+ &descriptor.pov_hash,
+ )
+ .as_ref(),
+ &descriptor.collator,
+ ));
+ let expect_descriptor = {
+ let mut expect_descriptor = expect_descriptor;
+ expect_descriptor.signature = descriptor.signature.clone();
+ expect_descriptor
+ };
+ assert_eq!(descriptor, &expect_descriptor);
+ }
+ _ => panic!("received wrong message type"),
+ }
+ }
+ }
+}
diff --git a/node/core/README.md b/node/core/README.md
index a53faa966a73869690b3bff4df2d5f100560d456..1656bb569fe404634cacc09a449f7ed89a045d0e 100644
--- a/node/core/README.md
+++ b/node/core/README.md
@@ -1 +1 @@
-Stub - This folder will hold core subsystem implementations, each with their own crate.
+This folder contains core subsystems, each with their own crate.
diff --git a/node/core/av-store/Cargo.toml b/node/core/av-store/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..68137b6ae77dd45db34357f973fb6adb9b0077fe
--- /dev/null
+++ b/node/core/av-store/Cargo.toml
@@ -0,0 +1,24 @@
+[package]
+name = "polkadot-node-core-av-store"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+futures = "0.3.5"
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+polkadot-overseer = { path = "../../overseer" }
+polkadot-primitives = { path = "../../../primitives" }
+erasure = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
+kvdb = "0.7.0"
+kvdb-rocksdb = "0.9.0"
+codec = { package = "parity-scale-codec", version = "1.3.1", features = ["derive"] }
+log = "0.4.8"
+derive_more = "0.99.9"
+
+[dev-dependencies]
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+futures = { version = "0.3.5", features = ["thread-pool"] }
+polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
+kvdb-memorydb = "0.7.0"
+assert_matches = "1.3.0"
diff --git a/node/core/av-store/src/lib.rs b/node/core/av-store/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..5837377dd5f975bb8e6e75fec1115fe78c1dbd68
--- /dev/null
+++ b/node/core/av-store/src/lib.rs
@@ -0,0 +1,568 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Implements a `AvailabilityStoreSubsystem`.
+
+#![recursion_limit="256"]
+#![warn(missing_docs)]
+
+use std::collections::HashMap;
+use std::io;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use codec::{Encode, Decode};
+use futures::{select, channel::oneshot, FutureExt};
+use kvdb_rocksdb::{Database, DatabaseConfig};
+use kvdb::{KeyValueDB, DBTransaction};
+
+use polkadot_primitives::v1::{
+ Hash, AvailableData, ErasureChunk, ValidatorIndex,
+};
+use polkadot_subsystem::{
+ FromOverseer, SubsystemError, Subsystem, SubsystemContext, SpawnedSubsystem,
+ metrics::{self, prometheus},
+};
+use polkadot_subsystem::messages::AvailabilityStoreMessage;
+
+const LOG_TARGET: &str = "availability";
+
+mod columns {
+ pub const DATA: u32 = 0;
+ pub const NUM_COLUMNS: u32 = 1;
+}
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+ #[from]
+ Erasure(erasure::Error),
+ #[from]
+ Io(io::Error),
+ #[from]
+ Oneshot(oneshot::Canceled),
+ #[from]
+ Subsystem(SubsystemError),
+}
+
+/// An implementation of the Availability Store subsystem.
+pub struct AvailabilityStoreSubsystem {
+ inner: Arc<dyn KeyValueDB>,
+ metrics: Metrics,
+}
+
+fn available_data_key(candidate_hash: &Hash) -> Vec<u8> {
+ (candidate_hash, 0i8).encode()
+}
+
+fn erasure_chunk_key(candidate_hash: &Hash, index: u32) -> Vec<u8> {
+ (candidate_hash, index, 0i8).encode()
+}
+
+#[derive(Encode, Decode)]
+struct StoredAvailableData {
+ data: AvailableData,
+ n_validators: u32,
+}
+
+/// Configuration for the availability store.
+pub struct Config {
+ /// Total cache size in megabytes. If `None` the default (128 MiB per column) is used.
+ pub cache_size: Option<usize>,
+ /// Path to the database.
+ pub path: PathBuf,
+}
+
+impl AvailabilityStoreSubsystem {
+ /// Create a new `AvailabilityStoreSubsystem` with a given config on disk.
+ pub fn new_on_disk(config: Config, metrics: Metrics) -> io::Result<Self> {
+ let mut db_config = DatabaseConfig::with_columns(columns::NUM_COLUMNS);
+
+ if let Some(cache_size) = config.cache_size {
+ let mut memory_budget = HashMap::new();
+
+ for i in 0..columns::NUM_COLUMNS {
+ memory_budget.insert(i, cache_size / columns::NUM_COLUMNS as usize);
+ }
+ db_config.memory_budget = memory_budget;
+ }
+
+ let path = config.path.to_str().ok_or_else(|| io::Error::new(
+ io::ErrorKind::Other,
+ format!("Bad database path: {:?}", config.path),
+ ))?;
+
+ let db = Database::open(&db_config, &path)?;
+
+ Ok(Self {
+ inner: Arc::new(db),
+ metrics,
+ })
+ }
+
+ #[cfg(test)]
+ fn new_in_memory(inner: Arc<dyn KeyValueDB>) -> Self {
+ Self {
+ inner,
+ metrics: Metrics(None),
+ }
+ }
+}
+
+async fn run<Context>(subsystem: AvailabilityStoreSubsystem, mut ctx: Context)
+ -> Result<(), Error>
+where
+ Context: SubsystemContext<Message = AvailabilityStoreMessage>,
+{
+ let ctx = &mut ctx;
+ loop {
+ select! {
+ incoming = ctx.recv().fuse() => {
+ match incoming {
+ Ok(FromOverseer::Signal(Conclude)) => break,
+ Ok(FromOverseer::Signal(_)) => (),
+ Ok(FromOverseer::Communication { msg }) => {
+ process_message(&subsystem.inner, &subsystem.metrics, msg)?;
+ }
+ Err(_) => break,
+ }
+ }
+ complete => break,
+ }
+ }
+
+ Ok(())
+}
+
+fn process_message(db: &Arc<dyn KeyValueDB>, metrics: &Metrics, msg: AvailabilityStoreMessage) -> Result<(), Error> {
+ use AvailabilityStoreMessage::*;
+ match msg {
+ QueryAvailableData(hash, tx) => {
+ tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?;
+ }
+ QueryDataAvailability(hash, tx) => {
+ tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?;
+ }
+ QueryChunk(hash, id, tx) => {
+ tx.send(get_chunk(db, &hash, id, metrics)?).map_err(|_| oneshot::Canceled)?;
+ }
+ QueryChunkAvailability(hash, id, tx) => {
+ tx.send(get_chunk(db, &hash, id, metrics)?.is_some()).map_err(|_| oneshot::Canceled)?;
+ }
+ StoreChunk(hash, id, chunk, tx) => {
+ match store_chunk(db, &hash, id, chunk) {
+ Err(e) => {
+ tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
+ return Err(e);
+ }
+ Ok(()) => {
+ tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
+ }
+ }
+ }
+ StoreAvailableData(hash, id, n_validators, av_data, tx) => {
+ match store_available_data(db, &hash, id, n_validators, av_data, metrics) {
+ Err(e) => {
+ tx.send(Err(())).map_err(|_| oneshot::Canceled)?;
+ return Err(e);
+ }
+ Ok(()) => {
+ tx.send(Ok(())).map_err(|_| oneshot::Canceled)?;
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+fn available_data(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash) -> Option<StoredAvailableData> {
+ query_inner(db, columns::DATA, &available_data_key(candidate_hash))
+}
+
+fn store_available_data(
+ db: &Arc<dyn KeyValueDB>,
+ candidate_hash: &Hash,
+ id: Option<ValidatorIndex>,
+ n_validators: u32,
+ available_data: AvailableData,
+ metrics: &Metrics,
+) -> Result<(), Error> {
+ let mut tx = DBTransaction::new();
+
+ if let Some(index) = id {
+ let chunks = get_chunks(&available_data, n_validators as usize, metrics)?;
+ store_chunk(db, candidate_hash, n_validators, chunks[index as usize].clone())?;
+ }
+
+ let stored_data = StoredAvailableData {
+ data: available_data,
+ n_validators,
+ };
+
+ tx.put_vec(
+ columns::DATA,
+ available_data_key(&candidate_hash).as_slice(),
+ stored_data.encode(),
+ );
+
+ db.write(tx)?;
+
+ Ok(())
+}
+
+fn store_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, _n_validators: u32, chunk: ErasureChunk)
+ -> Result<(), Error>
+{
+ let mut tx = DBTransaction::new();
+
+ let dbkey = erasure_chunk_key(candidate_hash, chunk.index);
+
+ tx.put_vec(columns::DATA, &dbkey, chunk.encode());
+ db.write(tx)?;
+
+ Ok(())
+}
+
+fn get_chunk(db: &Arc<dyn KeyValueDB>, candidate_hash: &Hash, index: u32, metrics: &Metrics)
+ -> Result<Option<ErasureChunk>, Error>
+{
+ if let Some(chunk) = query_inner(
+ db,
+ columns::DATA,
+ &erasure_chunk_key(candidate_hash, index)) {
+ return Ok(Some(chunk));
+ }
+
+ if let Some(data) = available_data(db, candidate_hash) {
+ let mut chunks = get_chunks(&data.data, data.n_validators as usize, metrics)?;
+ let desired_chunk = chunks.get(index as usize).cloned();
+ for chunk in chunks.drain(..) {
+ store_chunk(db, candidate_hash, data.n_validators, chunk)?;
+ }
+ return Ok(desired_chunk);
+ }
+
+ Ok(None)
+}
+
+fn query_inner<D: Decode>(db: &Arc<dyn KeyValueDB>, column: u32, key: &[u8]) -> Option<D> {
+ match db.get(column, key) {
+ Ok(Some(raw)) => {
+ let res = D::decode(&mut &raw[..]).expect("all stored data serialized correctly; qed");
+ Some(res)
+ }
+ Ok(None) => None,
+ Err(e) => {
+ log::warn!(target: LOG_TARGET, "Error reading from the availability store: {:?}", e);
+ None
+ }
+ }
+}
+
+impl<Context> Subsystem<Context> for AvailabilityStoreSubsystem
+ where
+ Context: SubsystemContext<Message = AvailabilityStoreMessage>,
+{
+ type Metrics = Metrics;
+
+ fn start(self, ctx: Context) -> SpawnedSubsystem {
+ let future = Box::pin(async move {
+ if let Err(e) = run(self, ctx).await {
+ log::error!(target: "availabilitystore", "Subsystem exited with an error {:?}", e);
+ }
+ });
+
+ SpawnedSubsystem {
+ name: "availability-store-subsystem",
+ future,
+ }
+ }
+}
+
+fn get_chunks(data: &AvailableData, n_validators: usize, metrics: &Metrics) -> Result<Vec<ErasureChunk>, Error> {
+ let chunks = erasure::obtain_chunks_v1(n_validators, data)?;
+ metrics.on_chunks_received(chunks.len());
+ let branches = erasure::branches(chunks.as_ref());
+
+ Ok(chunks
+ .iter()
+ .zip(branches.map(|(proof, _)| proof))
+ .enumerate()
+ .map(|(index, (chunk, proof))| ErasureChunk {
+ chunk: chunk.clone(),
+ proof,
+ index: index as u32,
+ })
+ .collect()
+ )
+}
+
+#[derive(Clone)]
+struct MetricsInner {
+ received_availability_chunks_total: prometheus::Counter<prometheus::U64>,
+}
+
+/// Availability metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(Option);
+
+impl Metrics {
+ fn on_chunks_received(&self, count: usize) {
+ if let Some(metrics) = &self.0 {
+ use core::convert::TryFrom as _;
+ // assume usize fits into u64
+ let by = u64::try_from(count).unwrap_or_default();
+ metrics.received_availability_chunks_total.inc_by(by);
+ }
+ }
+}
+
+impl metrics::Metrics for Metrics {
+ fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+ let metrics = MetricsInner {
+ received_availability_chunks_total: prometheus::register(
+ prometheus::Counter::new(
+ "parachain_received_availability_chunks_total",
+ "Number of availability chunks received.",
+ )?,
+ registry,
+ )?,
+ };
+ Ok(Metrics(Some(metrics)))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use futures::{
+ future,
+ channel::oneshot,
+ executor,
+ Future,
+ };
+ use std::cell::RefCell;
+ use polkadot_primitives::v1::{
+ AvailableData, BlockData, HeadData, PersistedValidationData, PoV,
+ };
+ use polkadot_node_subsystem_test_helpers as test_helpers;
+
+ struct TestHarness {
+ virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
+ }
+
+ thread_local! {
+ static TIME_NOW: RefCell<Option<u64>> = RefCell::new(None);
+ }
+
+ struct TestState {
+ persisted_validation_data: PersistedValidationData,
+ }
+
+ impl Default for TestState {
+ fn default() -> Self {
+
+ let persisted_validation_data = PersistedValidationData {
+ parent_head: HeadData(vec![7, 8, 9]),
+ block_number: Default::default(),
+ hrmp_mqc_heads: Vec::new(),
+ };
+ Self {
+ persisted_validation_data,
+ }
+ }
+ }
+
+ fn test_harness<T: Future<Output = ()>>(
+ store: Arc<dyn KeyValueDB>,
+ test: impl FnOnce(TestHarness) -> T,
+ ) {
+ let pool = sp_core::testing::TaskExecutor::new();
+ let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
+
+ let subsystem = AvailabilityStoreSubsystem::new_in_memory(store);
+ let subsystem = run(subsystem, context);
+
+ let test_fut = test(TestHarness {
+ virtual_overseer,
+ });
+
+ futures::pin_mut!(test_fut);
+ futures::pin_mut!(subsystem);
+
+ executor::block_on(future::select(test_fut, subsystem));
+ }
+
+ #[test]
+ fn store_chunk_works() {
+ let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
+ test_harness(store.clone(), |test_harness| async move {
+ let TestHarness { mut virtual_overseer } = test_harness;
+ let relay_parent = Hash::from([1; 32]);
+ let validator_index = 5;
+
+ let chunk = ErasureChunk {
+ chunk: vec![1, 2, 3],
+ index: validator_index,
+ proof: vec![vec![3, 4, 5]],
+ };
+
+ let (tx, rx) = oneshot::channel();
+
+ let chunk_msg = AvailabilityStoreMessage::StoreChunk(
+ relay_parent,
+ validator_index,
+ chunk.clone(),
+ tx,
+ );
+
+ virtual_overseer.send(FromOverseer::Communication{ msg: chunk_msg }).await;
+ assert_eq!(rx.await.unwrap(), Ok(()));
+
+ let (tx, rx) = oneshot::channel();
+ let query_chunk = AvailabilityStoreMessage::QueryChunk(
+ relay_parent,
+ validator_index,
+ tx,
+ );
+
+ virtual_overseer.send(FromOverseer::Communication{ msg: query_chunk }).await;
+
+ assert_eq!(rx.await.unwrap().unwrap(), chunk);
+ });
+ }
+
+ #[test]
+ fn store_block_works() {
+ let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
+ let test_state = TestState::default();
+ test_harness(store.clone(), |test_harness| async move {
+ let TestHarness { mut virtual_overseer } = test_harness;
+ let candidate_hash = Hash::from([1; 32]);
+ let validator_index = 5;
+ let n_validators = 10;
+
+ let pov = PoV {
+ block_data: BlockData(vec![4, 5, 6]),
+ };
+
+ let available_data = AvailableData {
+ pov,
+ validation_data: test_state.persisted_validation_data,
+ };
+
+
+ let (tx, rx) = oneshot::channel();
+ let block_msg = AvailabilityStoreMessage::StoreAvailableData(
+ candidate_hash,
+ Some(validator_index),
+ n_validators,
+ available_data.clone(),
+ tx,
+ );
+
+ virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
+ assert_eq!(rx.await.unwrap(), Ok(()));
+
+ let pov = query_available_data(&mut virtual_overseer, candidate_hash).await.unwrap();
+ assert_eq!(pov, available_data);
+
+ let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
+
+ let chunks = erasure::obtain_chunks_v1(10, &available_data).unwrap();
+
+ let mut branches = erasure::branches(chunks.as_ref());
+
+ let branch = branches.nth(5).unwrap();
+ let expected_chunk = ErasureChunk {
+ chunk: branch.1.to_vec(),
+ index: 5,
+ proof: branch.0,
+ };
+
+ assert_eq!(chunk, expected_chunk);
+ });
+ }
+
+
+ #[test]
+ fn store_pov_and_query_chunk_works() {
+ let store = Arc::new(kvdb_memorydb::create(columns::NUM_COLUMNS));
+ let test_state = TestState::default();
+
+ test_harness(store.clone(), |test_harness| async move {
+ let TestHarness { mut virtual_overseer } = test_harness;
+ let candidate_hash = Hash::from([1; 32]);
+ let n_validators = 10;
+
+ let pov = PoV {
+ block_data: BlockData(vec![4, 5, 6]),
+ };
+
+ let available_data = AvailableData {
+ pov,
+ validation_data: test_state.persisted_validation_data,
+ };
+
+ let no_metrics = Metrics(None);
+ let chunks_expected = get_chunks(&available_data, n_validators as usize, &no_metrics).unwrap();
+
+ let (tx, rx) = oneshot::channel();
+ let block_msg = AvailabilityStoreMessage::StoreAvailableData(
+ candidate_hash,
+ None,
+ n_validators,
+ available_data,
+ tx,
+ );
+
+ virtual_overseer.send(FromOverseer::Communication{ msg: block_msg }).await;
+
+ assert_eq!(rx.await.unwrap(), Ok(()));
+
+ for validator_index in 0..n_validators {
+ let chunk = query_chunk(&mut virtual_overseer, candidate_hash, validator_index).await.unwrap();
+
+ assert_eq!(chunk, chunks_expected[validator_index as usize]);
+ }
+ });
+ }
+
+ async fn query_available_data(
+ virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
+ candidate_hash: Hash,
+ ) -> Option<AvailableData> {
+ let (tx, rx) = oneshot::channel();
+
+ let query = AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx);
+ virtual_overseer.send(FromOverseer::Communication{ msg: query }).await;
+
+ rx.await.unwrap()
+ }
+
+ async fn query_chunk(
+ virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityStoreMessage>,
+ candidate_hash: Hash,
+ index: u32,
+ ) -> Option<ErasureChunk> {
+ let (tx, rx) = oneshot::channel();
+
+ let query = AvailabilityStoreMessage::QueryChunk(candidate_hash, index, tx);
+ virtual_overseer.send(FromOverseer::Communication{ msg: query }).await;
+
+ rx.await.unwrap()
+ }
+}
diff --git a/node/core/backing/Cargo.toml b/node/core/backing/Cargo.toml
new file mode 100644
index 0000000000000000000000000000000000000000..9401290f2b5b59b045fd98fe21c5c0dd377ee42c
--- /dev/null
+++ b/node/core/backing/Cargo.toml
@@ -0,0 +1,28 @@
+[package]
+name = "polkadot-node-core-backing"
+version = "0.1.0"
+authors = ["Parity Technologies <admin@parity.io>"]
+edition = "2018"
+
+[dependencies]
+futures = "0.3.5"
+sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
+keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
+polkadot-primitives = { path = "../../../primitives" }
+polkadot-node-primitives = { path = "../../primitives" }
+polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
+polkadot-node-subsystem-util = { path = "../../subsystem-util" }
+erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
+statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" }
+derive_more = "0.99.9"
+bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
+log = "0.4.8"
+
+[dev-dependencies]
+sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
+sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
+futures = { version = "0.3.5", features = ["thread-pool"] }
+assert_matches = "1.3.0"
+polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
diff --git a/node/core/backing/src/lib.rs b/node/core/backing/src/lib.rs
new file mode 100644
index 0000000000000000000000000000000000000000..7e6e64ee47d42ee3fd7a06c7e5a0e60798c366a6
--- /dev/null
+++ b/node/core/backing/src/lib.rs
@@ -0,0 +1,1892 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Implements a `CandidateBackingSubsystem`.
+
+use std::collections::{HashMap, HashSet};
+use std::convert::TryFrom;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use bitvec::vec::BitVec;
+use futures::{
+ channel::{mpsc, oneshot},
+ Future, FutureExt, SinkExt, StreamExt,
+};
+
+use keystore::KeyStorePtr;
+use polkadot_primitives::v1::{
+ CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
+ ValidatorIndex, SigningContext, PoV,
+ CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
+ CandidateCommitments, CoreState, CoreIndex, CollatorId,
+};
+use polkadot_node_primitives::{
+ FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
+ ValidationOutputs, ValidationResult,
+};
+use polkadot_subsystem::{
+ messages::{
+ AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
+ CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
+ ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
+ RuntimeApiRequest,
+ },
+ metrics::{self, prometheus},
+};
+use polkadot_node_subsystem_util::{
+ self as util,
+ request_session_index_for_child,
+ request_validator_groups,
+ request_validators,
+ request_from_runtime,
+ Validator,
+ delegated_subsystem,
+};
+use statement_table::{
+ generic::AttestedCandidate as TableAttestedCandidate,
+ Context as TableContextTrait,
+ Table,
+ v1::{
+ Statement as TableStatement,
+ SignedStatement as TableSignedStatement, Summary as TableSummary,
+ },
+};
+
+#[derive(Debug, derive_more::From)]
+enum Error {
+ CandidateNotFound,
+ InvalidSignature,
+ StoreFailed,
+ #[from]
+ Erasure(erasure_coding::Error),
+ #[from]
+ ValidationFailed(ValidationFailed),
+ #[from]
+ Oneshot(oneshot::Canceled),
+ #[from]
+ Mpsc(mpsc::SendError),
+ #[from]
+ UtilError(util::Error),
+}
+
+/// Holds all data needed for candidate backing job operation.
+struct CandidateBackingJob {
+ /// The hash of the relay parent on top of which this job is doing it's work.
+ parent: Hash,
+ /// Inbound message channel receiving part.
+ rx_to: mpsc::Receiver<ToJob>,
+ /// Outbound message channel sending part.
+ tx_from: mpsc::Sender<FromJob>,
+ /// The `ParaId` assigned to this validator
+ assignment: ParaId,
+ /// The collator required to author the candidate, if any.
+ required_collator: Option,
+ /// We issued `Valid` or `Invalid` statements on about these candidates.
+ issued_statements: HashSet<Hash>,
+ /// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
+ seconded: Option<Hash>,
+ /// We have already reported misbehaviors for these validators.
+ reported_misbehavior_for: HashSet<ValidatorIndex>,
+ table: Table<TableContext>,
+ table_context: TableContext,
+ metrics: Metrics,
+}
+
+const fn group_quorum(n_validators: usize) -> usize {
+ (n_validators / 2) + 1
+}
+
+#[derive(Default)]
+struct TableContext {
+ signing_context: SigningContext,
+ validator: Option