Unverified Commit 532f0e01 authored by asynchronous rob, committed by GitHub

Remove legacy network code (#860)

* expunge legacy code from polkadot-network

* mostly rip out old legacy protocol from service

* ensure validation work is spawned by incoming messages

* decouple availability store from network logic; clean up data flow

* av_store: test helpers and use futures-abort

* update polkadot-validation to pass n_validators when submitting chunks

* fallible erasure-chunk fetching

* implement `ErasureNetworking` for the new network protocol

* API for registering availability store in network

* fully integrate new network service into service

* fix validation tests

* scaffolding for porting collator over to new network

* track connected validators' peer IDs and distribute collators' collations

* helper in network for fetching all checked statements

* fix adder-collator

* actually register notifications protocol

* Update service/src/lib.rs

* merge with master
parent dab60d5b
Pipeline #81771 passed with stages in 18 minutes and 57 seconds
@@ -32,7 +32,7 @@ use polkadot_primitives::{
ParachainHost, AvailableData, OmittedValidationData,
},
};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, HashFor};
use sp_runtime::traits::HashFor;
use sp_blockchain::{Result as ClientResult};
use client::{
BlockchainEvents, BlockBody,
@@ -55,7 +55,7 @@ pub use worker::AvailabilityBlockImport;
pub use store::AwaitedFrontierEntry;
use worker::{
Worker, WorkerHandle, Chunks, IncludedParachainBlocks, WorkerMsg, MakeAvailable,
Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks
};
use store::{Store as InnerStore};
@@ -70,23 +70,7 @@ pub struct Config {
pub path: PathBuf,
}
/// Compute gossip topic for the erasure chunk messages given the relay parent,
/// root and the chunk index.
///
/// Since at this point we are not able to use [`network`] directly, but both
/// of them need to compute these topics, this lives here and not there.
///
/// [`network`]: ../polkadot_network/index.html
pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32) -> Hash {
let mut v = relay_parent.as_ref().to_vec();
v.extend(erasure_root.as_ref());
v.extend(&index.to_le_bytes()[..]);
v.extend(b"erasure_chunks");
BlakeTwo256::hash(&v[..])
}
/// A trait that provides a shim for the [`NetworkService`] trait.
/// An abstraction around networking for the availability-store.
///
/// Currently it is not possible to use the networking code in the availability store
/// core directly due to a number of cyclic dependencies it would require:
@@ -95,26 +79,25 @@ pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32)
///
/// `availability-store` -> `network` -> `validation` -> `availability-store`
///
/// So we provide this shim trait that gets implemented for a wrapper newtype in
/// the [`network`] module.
/// So we provide this trait that gets implemented for a type in
/// the [`network`] module or a mock in tests.
///
/// [`NetworkService`]: ../polkadot_network/trait.NetworkService.html
/// [`network`]: ../polkadot_network/index.html
pub trait ProvideGossipMessages {
/// Get a stream of gossip erasure chunk messages for a given topic.
///
/// Each item is a tuple (relay_parent, candidate_hash, erasure_chunk)
fn gossip_messages_for(
pub trait ErasureNetworking {
/// Errors that can occur when fetching erasure chunks.
type Error: std::fmt::Debug + 'static;
/// Fetch an erasure chunk from the networking service.
fn fetch_erasure_chunk(
&self,
topic: Hash,
) -> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>;
candidate_hash: &Hash,
index: u32,
) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>>;
/// Gossip an erasure chunk message.
fn gossip_erasure_chunk(
/// Distributes an erasure chunk to the correct validator node.
fn distribute_erasure_chunk(
&self,
relay_parent: Hash,
candidate_hash: Hash,
erasure_root: Hash,
chunk: ErasureChunk,
);
}
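Because `Store` is generic over `ErasureNetworking` (see the `Store::new` hunk below), tests can supply a trivial mock instead of the real network service. A minimal sketch of such a mock, assuming `distribute_erasure_chunk` keeps only the candidate hash and the chunk (the unmarked diff above does not show which of the old gossip parameters survive); `MockNetworking` is a hypothetical name, not part of this change:

    use std::pin::Pin;
    use futures::{future, Future};

    /// Hypothetical test double: fetches never resolve, distributed chunks
    /// are silently discarded.
    #[derive(Clone)]
    struct MockNetworking;

    impl ErasureNetworking for MockNetworking {
        type Error = String;

        fn fetch_erasure_chunk(
            &self,
            _candidate_hash: &Hash,
            _index: u32,
        ) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
            // A real implementation would request the chunk from peers;
            // the mock simply stays pending forever.
            Box::pin(future::pending())
        }

        fn distribute_erasure_chunk(&self, _candidate_hash: Hash, _chunk: ErasureChunk) {
            // Deliberately a no-op in tests.
        }
    }

With this in place a test can construct the store as `Store::new_in_memory(MockNetworking)` without touching real networking.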
@@ -148,11 +131,11 @@ impl Store {
/// Creating a store among other things starts a background worker thread which
/// handles most of the write operations to the storage.
#[cfg(not(target_os = "unknown"))]
pub fn new<PGM>(config: Config, gossip: PGM) -> io::Result<Self>
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
pub fn new<EN>(config: Config, network: EN) -> io::Result<Self>
where EN: ErasureNetworking + Send + Sync + Clone + 'static
{
let inner = InnerStore::new(config)?;
let worker = Arc::new(Worker::start(inner.clone(), gossip));
let worker = Arc::new(Worker::start(inner.clone(), network));
let to_worker = worker.to_worker().clone();
Ok(Self {
@@ -166,11 +149,11 @@ impl Store {
///
/// Creating a store among other things starts a background worker thread
/// which handles most of the write operations to the storage.
pub fn new_in_memory<PGM>(gossip: PGM) -> Self
where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static
pub fn new_in_memory<EN>(network: EN) -> Self
where EN: ErasureNetworking + Send + Sync + Clone + 'static
{
let inner = InnerStore::new_in_memory();
let worker = Arc::new(Worker::start(inner.clone(), gossip));
let worker = Arc::new(Worker::start(inner.clone(), network));
let to_worker = worker.to_worker().clone();
Self {
@@ -204,7 +187,6 @@ impl Store {
let to_worker = self.to_worker.clone();
let import = AvailabilityBlockImport::new(
self.inner.clone(),
client,
wrapped_block_import,
spawner,
@@ -261,35 +243,38 @@ impl Store {
pub async fn add_erasure_chunk(
&self,
candidate: AbridgedCandidateReceipt,
n_validators: u32,
chunk: ErasureChunk,
) -> io::Result<()> {
self.add_erasure_chunks(candidate, vec![chunk]).await
self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await
}
/// Adds a set of erasure chunks to storage.
///
/// The chunks should be checked for validity against the root of encoding
/// and it's proof prior to calling this.
/// and its proof prior to calling this.
///
/// This method will send the chunks to the background worker, allowing the caller to
/// asynchronously wait for the result.
pub async fn add_erasure_chunks<I>(
&self,
candidate: AbridgedCandidateReceipt,
n_validators: u32,
chunks: I,
) -> io::Result<()>
where I: IntoIterator<Item = ErasureChunk>
{
let candidate_hash = candidate.hash();
let relay_parent = candidate.relay_parent;
self.add_candidate(candidate).await?;
let (s, r) = oneshot::channel();
let chunks = chunks.into_iter().collect();
let msg = WorkerMsg::Chunks(Chunks {
relay_parent,
candidate_hash,
chunks,
n_validators,
result: s,
});
......
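The truncated hunk above shows the store's general request pattern: each public async method packs a oneshot result sender into a `WorkerMsg`, pushes the message to the background worker, and awaits the receiver. A condensed, self-contained sketch of that round trip, with illustrative names (`Msg`, `request`, `worker_loop`) standing in for `WorkerMsg::Chunks` and its handler:

    use futures::channel::{mpsc, oneshot};
    use futures::{SinkExt, StreamExt};

    // Stand-in for `WorkerMsg::Chunks { .., result }`.
    enum Msg {
        Work { input: u32, result: oneshot::Sender<std::io::Result<u32>> },
    }

    // Caller side: send the request, then park on the oneshot receiver.
    async fn request(mut to_worker: mpsc::Sender<Msg>, input: u32) -> std::io::Result<u32> {
        let broken = || std::io::Error::new(std::io::ErrorKind::BrokenPipe, "worker is gone");
        let (s, r) = oneshot::channel();
        to_worker.send(Msg::Work { input, result: s }).await.map_err(|_| broken())?;
        r.await.map_err(|_| broken())?
    }

    // Worker side: serialize all storage writes in one task and report back.
    async fn worker_loop(mut rx: mpsc::Receiver<Msg>) {
        while let Some(Msg::Work { input, result }) = rx.next().await {
            let _ = result.send(Ok(input * 2)); // real write work happens here
        }
    }

This keeps every write serialized through the worker while callers stay fully asynchronous.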
@@ -60,33 +60,31 @@ fn candidate_key(candidate_hash: &Hash) -> Vec<u8> {
(candidate_hash, 2i8).encode()
}
fn available_chunks_key(relay_parent: &Hash, erasure_root: &Hash) -> Vec<u8> {
(relay_parent, erasure_root, 3i8).encode()
}
fn candidates_with_relay_parent_key(relay_block: &Hash) -> Vec<u8> {
(relay_block, 4i8).encode()
}
// meta keys
fn awaited_chunks_key() -> [u8; 14] {
*b"awaited_chunks"
}
const AWAITED_CHUNKS_KEY: [u8; 14] = *b"awaited_chunks";
fn validator_index_and_n_validators_key(relay_parent: &Hash) -> Vec<u8> {
(relay_parent, 1i8).encode()
}
fn available_chunks_key(candidate_hash: &Hash) -> Vec<u8> {
(candidate_hash, 2i8).encode()
}
/// An entry in the awaited frontier of chunks we are interested in.
#[derive(Encode, Decode, Debug, Hash, PartialEq, Eq, Clone)]
pub struct AwaitedFrontierEntry {
/// The relay-chain parent block hash.
/// The hash of the candidate for which we want to fetch a chunk.
/// There will be duplicate entries in the case of multiple candidates with
/// the same erasure-root, but this is unlikely.
pub candidate_hash: Hash,
/// Although the relay-parent is implicitly referenced by the candidate hash,
/// we include it here as well for convenience in pruning the set.
pub relay_parent: Hash,
/// The erasure-chunk trie root we are comparing against.
///
/// We index by erasure-root because there may be multiple candidates
/// with the same erasure root.
pub erasure_root: Hash,
/// The index of the validator we represent.
pub validator_index: u32,
}
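All of the key helpers above share one scheme: SCALE-encode a tuple of the identifying hash plus a small `i8` discriminant, so different record kinds for the same hash map to distinct keys. Note that `candidate_key` and the new `available_chunks_key` produce identical bytes, which is safe only because they are used in different database columns (DATA vs. META). A standalone sketch of the scheme using `parity-scale-codec`, with the hash simplified to a byte array:

    use parity_scale_codec::Encode;

    type Hash = [u8; 32];

    // Same hash, different trailing discriminant => different key bytes.
    fn validator_index_and_n_validators_key(relay_parent: &Hash) -> Vec<u8> {
        (relay_parent, 1i8).encode()
    }

    fn candidate_key(candidate_hash: &Hash) -> Vec<u8> {
        (candidate_hash, 2i8).encode()
    }

    fn main() {
        let h = [7u8; 32];
        let (a, b) = (validator_index_and_n_validators_key(&h), candidate_key(&h));
        assert_eq!(&a[..32], &b[..32]); // shared 32-byte hash prefix
        assert_ne!(a, b);               // distinguished by the final byte
    }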
@@ -153,7 +151,7 @@ impl Store {
/// Get a set of all chunks we are waiting for.
pub fn awaited_chunks(&self) -> Option<HashSet<AwaitedFrontierEntry>> {
self.query_inner(columns::META, &awaited_chunks_key()).map(|vec: Vec<AwaitedFrontierEntry>| {
self.query_inner(columns::META, &AWAITED_CHUNKS_KEY).map(|vec: Vec<AwaitedFrontierEntry>| {
HashSet::from_iter(vec.into_iter())
})
}
@@ -183,21 +181,21 @@ impl Store {
if let Some((validator_index, _)) = self.get_validator_index_and_n_validators(relay_parent) {
let candidates = candidates.clone();
let awaited_frontier: Vec<AwaitedFrontierEntry> = self
.query_inner(columns::META, &awaited_chunks_key())
.query_inner(columns::META, &AWAITED_CHUNKS_KEY)
.unwrap_or_else(|| Vec::new());
let mut awaited_frontier: HashSet<AwaitedFrontierEntry> =
HashSet::from_iter(awaited_frontier.into_iter());
awaited_frontier.extend(candidates.iter().filter_map(|candidate| {
self.get_candidate(&candidate).map(|receipt| AwaitedFrontierEntry {
awaited_frontier.extend(candidates.iter().cloned().map(|candidate_hash| {
AwaitedFrontierEntry {
relay_parent: relay_parent.clone(),
erasure_root: receipt.commitments.erasure_root,
candidate_hash,
validator_index,
})
}
}));
let awaited_frontier = Vec::from_iter(awaited_frontier.into_iter());
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}
let mut descendent_candidates = self.get_candidates_with_relay_parent(relay_parent);
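Note the read-merge-write shape of the frontier update above: the stored `Vec` is decoded, lifted into a `HashSet` to deduplicate, extended with the new entries, and written back in the same transaction. A distilled sketch of that step against a plain `HashMap` standing in for the kvdb column (the `merge_entries` helper is illustrative):

    use std::collections::{HashMap, HashSet};
    use std::iter::FromIterator;
    use parity_scale_codec::{Decode, Encode};

    fn merge_entries<T: Encode + Decode + Eq + std::hash::Hash>(
        db: &mut HashMap<Vec<u8>, Vec<u8>>,
        key: &[u8],
        new_entries: impl IntoIterator<Item = T>,
    ) {
        // Decode the currently stored Vec, treating a missing key as empty.
        let mut set: HashSet<T> = db
            .get(key)
            .and_then(|raw| Vec::<T>::decode(&mut &raw[..]).ok())
            .map(HashSet::from_iter)
            .unwrap_or_default();
        // Merge, deduplicating against entries already awaited.
        set.extend(new_entries);
        // Re-encode as a Vec for storage.
        db.insert(key.to_vec(), Vec::from_iter(set).encode());
    }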
@@ -246,15 +244,12 @@ impl Store {
let mut v = self.query_inner(columns::DATA, &dbkey).unwrap_or(Vec::new());
let av_chunks_key = available_chunks_key(
&receipt.relay_parent,
&receipt.commitments.erasure_root,
);
let av_chunks_key = available_chunks_key(candidate_hash);
let mut have_chunks = self.query_inner(columns::META, &av_chunks_key).unwrap_or(Vec::new());
let awaited_frontier: Option<Vec<AwaitedFrontierEntry>> = self.query_inner(
columns::META,
&awaited_chunks_key(),
&AWAITED_CHUNKS_KEY,
);
for chunk in chunks.into_iter() {
@@ -268,19 +263,21 @@ impl Store {
awaited_frontier.retain(|entry| {
!(
entry.relay_parent == receipt.relay_parent &&
entry.erasure_root == receipt.commitments.erasure_root &&
&entry.candidate_hash == candidate_hash &&
have_chunks.contains(&entry.validator_index)
)
});
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}
// If therea are no block data in the store at this point,
// If there are no block data in the store at this point,
// check that they can be reconstructed now and add them to store if they can.
if self.execution_data(&candidate_hash).is_none() {
if let Ok(available_data) = erasure::reconstruct(
n_validators as usize,
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize))) {
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize)),
)
{
self.make_available(*candidate_hash, available_data)?;
}
}
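The reconstruction call relies on the erasure-coding guarantee that any sufficiently large subset of the `n_validators` chunks recovers the full data. A rough round-trip sketch, assuming an `obtain_chunks`/`reconstruct` pair as exported by the `polkadot-erasure-coding` crate (signatures inferred from the usage above, so treat this as illustrative):

    use polkadot_erasure_coding as erasure;

    fn round_trip(n_validators: usize, available_data: &AvailableData) -> AvailableData {
        // Encode into one chunk per validator.
        let chunks: Vec<Vec<u8>> = erasure::obtain_chunks(n_validators, available_data)
            .expect("valid validator count");

        // Feed back only the first two thirds of the (chunk, index) pairs;
        // roughly a third suffices, so this is comfortably above threshold.
        let take = chunks.len() - chunks.len() / 3;
        erasure::reconstruct(
            n_validators,
            chunks.iter().take(take).enumerate().map(|(i, c)| (&c[..], i)),
        ).expect("enough chunks supplied")
    }

If too few chunks are present, `reconstruct` fails and the store simply leaves the block data absent until more chunks arrive.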
@@ -339,11 +336,11 @@ impl Store {
let mut tx = DBTransaction::new();
let awaited_frontier: Option<Vec<AwaitedFrontierEntry>> = self
.query_inner(columns::META, &awaited_chunks_key());
.query_inner(columns::META, &AWAITED_CHUNKS_KEY);
if let Some(mut awaited_frontier) = awaited_frontier {
awaited_frontier.retain(|entry| entry.relay_parent != relay_parent);
tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode());
tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode());
}
let candidates = self.get_candidates_with_relay_parent(&relay_parent);
@@ -354,6 +351,8 @@ impl Store {
tx.delete(columns::DATA, execution_data_key(&candidate).as_slice());
tx.delete(columns::DATA, &erasure_chunks_key(&candidate));
tx.delete(columns::DATA, &candidate_key(&candidate));
tx.delete(columns::META, &available_chunks_key(&candidate));
}
self.inner.write(tx)
@@ -576,7 +575,6 @@ mod tests {
proof: Vec::new(),
};
let candidates = vec![receipt_1_hash, receipt_2_hash];
let erasure_roots = vec![erasure_root_1, erasure_root_2];
let store = Store::new_in_memory();
@@ -596,10 +594,9 @@
let expected: HashSet<_> = candidates
.clone()
.into_iter()
.zip(erasure_roots.iter())
.map(|(_c, &e)| AwaitedFrontierEntry {
.map(|c| AwaitedFrontierEntry {
relay_parent,
erasure_root: e,
candidate_hash: c,
validator_index,
})
.collect();
@@ -612,7 +609,7 @@
// Now we wait for the other chunk that we haven't received yet.
let expected: HashSet<_> = vec![AwaitedFrontierEntry {
relay_parent,
erasure_root: erasure_roots[1],
candidate_hash: receipt_2_hash,
validator_index,
}].into_iter().collect();
......
@@ -127,12 +127,13 @@ where
service::Roles::LIGHT =>
sc_cli::run_service_until_exit(
config,
|config| service::new_light::<R, D, E>(config, None),
|config| service::new_light::<R, D, E>(config),
),
_ =>
sc_cli::run_service_until_exit(
config,
|config| service::new_full::<R, D, E>(config, None, None, authority_discovery_enabled, 6000),
|config| service::new_full::<R, D, E>(config, None, None, authority_discovery_enabled, 6000)
.map(|(s, _)| s),
),
}
}
......
@@ -48,6 +48,7 @@ use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::warn;
@@ -63,10 +64,9 @@ use polkadot_primitives::{
};
use polkadot_cli::{
ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama,
service::{self, Roles, SelectChain}
service::{self, Roles}
};
use polkadot_network::legacy::validation::ValidationNetwork;
use polkadot_network::PolkadotProtocol;
pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration};
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
@@ -77,30 +77,17 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
/// An abstraction over the `Network` with useful functions for a `Collator`.
pub trait Network: Send + Sync {
/// Convert the given `CollatorId` to a `PeerId`.
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Send>;
/// Create a `Stream` of checked statements for the given `relay_parent`.
///
/// The returned stream will not terminate, so the caller must ensure the stream is
/// dropped once it is no longer needed. Otherwise, it will stay in memory
/// indefinitely.
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>>;
fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>>;
}
impl<P, SP> Network for ValidationNetwork<P, SP> where
P: 'static + Send + Sync,
SP: 'static + Spawn + Clone + Send + Sync,
{
fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Send>
{
Box::new(Self::collator_id_to_peer_id(self, collator_id))
}
fn checked_statements(&self, relay_parent: Hash) -> Box<dyn Stream<Item=SignedStatement>> {
Box::new(Self::checked_statements(self, relay_parent))
impl Network for polkadot_network::protocol::Service {
fn checked_statements(&self, relay_parent: Hash) -> Pin<Box<dyn Stream<Item=SignedStatement>>> {
polkadot_network::protocol::Service::checked_statements(self, relay_parent)
}
}
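Since `checked_statements` now returns a pinned, boxed stream, a consumer can drive it directly with `StreamExt`; because the stream never terminates, the consumer is responsible for dropping it. A hypothetical consumer sketch:

    use futures::StreamExt;

    // Take the first checked statement for `relay_parent`, then let the
    // stream drop; holding it longer than needed would keep it in memory
    // forever, since it never ends on its own.
    async fn first_statement(network: &dyn Network, relay_parent: Hash) -> Option<SignedStatement> {
        network.checked_statements(relay_parent).next().await
    }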
@@ -139,7 +126,7 @@ pub trait BuildParachainContext {
self,
client: Arc<PolkadotClient<B, E, R>>,
spawner: SP,
network: Arc<dyn Network>,
network: impl Network + Clone + 'static,
) -> Result<Self::ParachainContext, ()>
where
PolkadotClient<B, E, R>: ProvideRuntimeApi<Block>,
@@ -219,13 +206,13 @@ pub async fn collate<P>(
}
fn build_collator_service<S, P, Extrinsic>(
service: S,
service: (S, polkadot_service::FullNodeHandles),
para_id: ParaId,
key: Arc<CollatorPair>,
build_parachain_context: P,
) -> Result<S, polkadot_service::Error>
where
S: AbstractService<Block = service::Block, NetworkSpecialization = service::PolkadotProtocol>,
S: AbstractService<Block = service::Block, NetworkSpecialization = PolkadotProtocol>,
sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi>: ProvideRuntimeApi<Block>,
<sc_client::Client<S::Backend, S::CallExecutor, service::Block, S::RuntimeApi> as ProvideRuntimeApi<Block>>::Api:
RuntimeApiCollection<
@@ -247,52 +234,22 @@ fn build_collator_service<S, P, Extrinsic>(
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
Extrinsic: service::Codec + Send + Sync + 'static,
{
let (service, handles) = service;
let spawner = service.spawn_task_handle();
let client = service.client();
let network = service.network();
let known_oracle = client.clone();
let select_chain = if let Some(select_chain) = service.select_chain() {
select_chain
} else {
return Err("The node cannot work because it can't select chain.".into())
};
let is_known = move |block_hash: &Hash| {
use consensus_common::BlockStatus;
use polkadot_network::legacy::gossip::Known;
match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) =>
match select_chain.leaves() {
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
}
let polkadot_network = match handles.polkadot_network {
None => return Err(
"Collator cannot run when Polkadot-specific networking has not been started".into()
),
Some(n) => n,
};
let message_validator = polkadot_network::legacy::gossip::register_validator(
network.clone(),
(is_known, client.clone()),
&spawner,
);
let validation_network = Arc::new(ValidationNetwork::new(
message_validator,
client.clone(),
spawner.clone(),
));
let client = service.client();
let parachain_context = match build_parachain_context.build(
client.clone(),
spawner,
validation_network.clone(),
polkadot_network.clone(),
) {
Ok(ctx) => ctx,
Err(()) => {
@@ -318,7 +275,7 @@ fn build_collator_service<S, P, Extrinsic>(
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);
let network = network.clone();
let network = polkadot_network.clone();
let client = client.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
@@ -345,14 +302,7 @@ fn build_collator_service<S, P, Extrinsic>(
parachain_context,
key,
).map_ok(move |collation| {
network.with_spec(move |spec, ctx| {
spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
);
})
network.distribute_collation(targets, collation)
});
future::Either::Right(collation_work)
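Given `COLLATION_TIMEOUT` above, long-running collation work is bounded by racing it against a timer before it is spawned. A sketch of that bounding pattern using `futures-timer`'s `Delay` (illustrative; the hunk does not show the exact timer mechanism used here):

    use std::time::Duration;
    use futures::future::{self, Either, Future};
    use futures_timer::Delay;

    /// Race `work` against a 30-second deadline, mirroring COLLATION_TIMEOUT.
    async fn with_timeout<F: Future<Output = ()> + Unpin>(work: F) {
        match future::select(work, Delay::new(Duration::from_secs(30))).await {
            Either::Left(((), _timer)) => {} // finished in time
            Either::Right(((), _work)) => log::warn!("collation timed out"),
        }
    }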
@@ -395,13 +345,9 @@
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
match (config.expect_chain_spec().is_kusama(), config.roles) {
(true, Roles::LIGHT) =>
build_collator_service(
service::kusama_new_light(config, Some((key.public(), para_id)))?,
para_id,
key,
build_parachain_context,
)?.await,
(_, Roles::LIGHT) => return Err(
polkadot_service::Error::Other("light nodes are unsupported as collator".into())
).into(),
(true, _) =>
build_collator_service(
service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
@@ -409,13 +355,6 @@
key,
build_parachain_context,
)?.await,
(false, Roles::LIGHT) =>
build_collator_service(
service::polkadot_new_light(config, Some((key.public(), para_id)))?,
para_id,
key,
build_parachain_context,
)?.await,
(false, _) =>
build_collator_service(
service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?,
@@ -451,15 +390,9 @@ pub fn run_collator<P>(
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
match (config.expect_chain_spec().is_kusama(), config.roles) {
(true, Roles::LIGHT) =>
sc_cli::run_service_until_exit(config, |config| {
build_collator_service(
service::kusama_new_light(config, Some((key.public(), para_id)))?,
para_id,
key,
build_parachain_context,
)
}),
(_, Roles::LIGHT) => return Err(
polkadot_cli::Error::Input("light nodes are unsupported as collator".into())
).into(),
(true, _) =>
sc_cli::run_service_until_exit(config, |config| {
build_collator_service(
@@ -469,15 +402,6 @@
build_parachain_context,
)
}),
(false, Roles::LIGHT) =>
sc_cli::run_service_until_exit(config, |config| {
build_collator_service(
service::polkadot_new_light(config, Some((key.public(), para_id)))?,
para_id,
key,
build_parachain_context,
)
}),
(false, _) =>
sc_cli::run_service_until_exit(config, |config| {
build_collator_service(
......
@@ -38,10 +38,10 @@ use polkadot_primitives::Hash;
use std::collections::{HashMap, HashSet};
use log::warn;
use crate::legacy::router::attestation_topic;
use super::{cost, benefit, MAX_CHAIN_HEADS, LeavesVec,
ChainContext, Known, MessageValidationData, GossipStatement
use super::{
cost, benefit, attestation_topic, MAX_CHAIN_HEADS, LeavesVec,
ChainContext, Known, MessageValidationData, GossipStatement,
};
// knowledge about attestations on a single parent-hash.
......
@@ -51,7 +51,7 @@
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};
use sp_blockchain::Error as ClientError;
use sc_network::{config::Roles, Context, PeerId, ReputationChange};
use sc_network::{config::Roles, PeerId, ReputationChange};