Unverified Commit a0541ce7 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

PoV Distribution optimization (#1990)

* Initial commit

* Remove unnecessary struct

* Some review nits

* Update node/network/pov-distribution/src/lib.rs

* Update parachain/test-parachains/adder/collator/tests/integration.rs

* Review nits

* notify_all_we_are_awaiting

* Both ways of peers connections should work the same

* Add mod-level docs to error.rs

* Avoid multiple connection requests at same parent

* Don't bail on errors

* FusedStream for ConnectionRequests

* Fix build after merge

* Improve error handling

* Remove whitespace formatting
parent 029c8a2a
Pipeline #114983 canceled with stages
in 5 minutes and 47 seconds
......@@ -5202,13 +5202,18 @@ name = "polkadot-pov-distribution"
version = "0.1.0"
dependencies = [
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.8",
"log",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"smallvec 1.5.0",
"sp-core",
"sp-keyring",
"thiserror",
"tracing",
"tracing-futures",
]
......
......@@ -6,8 +6,10 @@ edition = "2018"
[dependencies]
futures = "0.3.8"
thiserror = "1.0.21"
tracing = "0.1.22"
tracing-futures = "0.2.4"
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
......@@ -15,5 +17,11 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
[dev-dependencies]
assert_matches = "1.4.0"
env_logger = "0.8.1"
log = "0.4.11"
smallvec = "1.4.2"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The `Error` and `Result` types used by the subsystem.
use thiserror::Error;
/// All fatal errors this subsystem can produce.
///
/// Every variant wraps a lower-layer error transparently:
/// `#[error(transparent)]` forwards `Display` and `source()` straight to the
/// inner error, and `#[from]` lets `?` convert it automatically.
#[derive(Debug, Error)]
pub enum Error {
    /// An error propagated from the subsystem framework.
    #[error(transparent)]
    Subsystem(#[from] polkadot_subsystem::SubsystemError),
    /// A oneshot response channel was dropped before a reply was sent.
    #[error(transparent)]
    OneshotRecv(#[from] futures::channel::oneshot::Canceled),
    /// A runtime API request failed.
    #[error(transparent)]
    Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
    /// A validator-discovery (connection request) error.
    #[error(transparent)]
    ValidatorDiscovery(#[from] polkadot_node_subsystem_util::validator_discovery::Error),
    /// An error from the subsystem utility crate.
    #[error(transparent)]
    Util(#[from] polkadot_node_subsystem_util::Error),
}

/// Convenience alias: `Result` with this subsystem's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
......@@ -22,16 +22,26 @@
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use polkadot_primitives::v1::{Hash, PoV, CandidateDescriptor};
use polkadot_primitives::v1::{
Hash, PoV, CandidateDescriptor, ValidatorId, Id as ParaId, CoreIndex, CoreState,
};
use polkadot_subsystem::{
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, Subsystem, SubsystemResult, SubsystemError,
ActiveLeavesUpdate, OverseerSignal, SubsystemContext, SubsystemResult, SubsystemError, Subsystem,
FromOverseer, SpawnedSubsystem,
messages::{
PoVDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, AllMessages, NetworkBridgeMessage,
PoVDistributionMessage, AllMessages, NetworkBridgeMessage,
},
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_network_protocol::{v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
request_validator_groups_ctx,
request_availability_cores_ctx,
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, ReputationChange as Rep, NetworkBridgeEvent, PeerId, View,
};
use futures::prelude::*;
use futures::channel::oneshot;
......@@ -39,6 +49,8 @@ use futures::channel::oneshot;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::sync::Arc;
mod error;
#[cfg(test)]
mod tests;
......@@ -75,20 +87,33 @@ impl<C> Subsystem<C> for PoVDistribution
}
}
/// The full mutable state of the PoV distribution subsystem, owned by the
/// main `run` loop and threaded through every handler.
#[derive(Default)]
struct State {
    /// A state of things going on on a per-relay-parent basis.
    relay_parent_state: HashMap<Hash, BlockBasedState>,
    /// Info on peers.
    peer_state: HashMap<PeerId, PeerState>,
    /// Our own view.
    our_view: View,
    /// Connect to relevant groups of validators at different relay parents.
    connection_requests: validator_discovery::ConnectionRequests,
    /// Metrics.
    metrics: Metrics,
}
/// Per-relay-parent bookkeeping for PoVs we hold or are fetching.
struct BlockBasedState {
    // PoVs we already hold, keyed by hash (presumably the PoV hash —
    // TODO confirm against the insertion sites).
    known: HashMap<Hash, Arc<PoV>>,

    /// All the PoVs we are or were fetching, coupled with channels expecting the data.
    ///
    /// This may be an empty list, which indicates that we were once awaiting this PoV but have
    /// received it already.
    fetching: HashMap<Hash, Vec<oneshot::Sender<Arc<PoV>>>>,

    // Validator-set size at this relay parent, as fetched from the runtime
    // when the leaf was activated.
    n_validators: usize,
}
......@@ -128,38 +153,45 @@ async fn handle_signal(
let _timer = state.metrics.time_handle_signal();
for relay_parent in activated {
let (vals_tx, vals_rx) = oneshot::channel();
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(vals_tx),
))).await;
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
match request_validators_ctx(relay_parent.clone(), ctx).await {
Ok(vals_rx) => {
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error fetching validators from runtime API for active leaf",
);
// Not adding bookkeeping here might make us behave funny, but we
// shouldn't take down the node on spurious runtime API errors.
//
// and this is "behave funny" as in be bad at our job, but not in any
// slashable or security-related way.
continue;
}
};
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators,
});
}
Err(e) => {
// continue here also as above.
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Error fetching validators from runtime API for active leaf",
);
// Not adding bookkeeping here might make us behave funny, but we
// shouldn't take down the node on spurious runtime API errors.
//
// and this is "behave funny" as in be bad at our job, but not in any
// slashable or security-related way.
continue;
}
};
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: n_validators,
});
}
}
for relay_parent in deactivated {
state.connection_requests.remove(&relay_parent);
state.relay_parent_state.remove(&relay_parent);
}
......@@ -197,7 +229,7 @@ async fn notify_all_we_are_awaiting(
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
payload,
))).await
))).await;
}
/// Notify one peer about everything we're awaiting at a given relay-parent.
......@@ -224,7 +256,7 @@ async fn notify_one_we_are_awaiting_many(
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
vec![peer.clone()],
payload,
))).await
))).await;
}
/// Distribute a PoV to peers who are awaiting it.
......@@ -262,6 +294,75 @@ async fn distribute_to_awaiting(
metrics.on_pov_distributed();
}
/// Determine the core (and the total number of cores) to which `para_id` is
/// currently assigned at `relay_parent`, if any.
///
/// Returns `Ok(None)` when no core is scheduled for this para.
async fn determine_core(
    ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
    para_id: ParaId,
    relay_parent: Hash,
) -> error::Result<Option<(CoreIndex, usize)>> {
    // Fetch the availability cores from the runtime at this relay parent.
    let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
    let num_cores = cores.len();

    // Find the first core scheduled for our para; the index doubles as the
    // `CoreIndex`.
    let assignment = cores.iter().enumerate().find_map(|(idx, core)| match core {
        CoreState::Scheduled(scheduled) if scheduled.para_id == para_id =>
            Some(((idx as u32).into(), num_cores)),
        _ => None,
    });

    Ok(assignment)
}
/// Resolve the validator group responsible for `core_index` into the full
/// `ValidatorId`s of its members.
///
/// Returns `Ok(None)` when the computed group index is out of range.
async fn determine_validators_for_core(
    ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
    core_index: CoreIndex,
    num_cores: usize,
    relay_parent: Hash,
) -> error::Result<Option<Vec<ValidatorId>>> {
    // Fetch the validator groups along with the group rotation info.
    let (all_groups, rotation_info) =
        request_validator_groups_ctx(relay_parent, ctx).await?.await??;

    let group_index = rotation_info.group_for_core(core_index, num_cores);
    let group = match all_groups.get(group_index.0 as usize) {
        Some(g) => g.clone(),
        None => return Ok(None),
    };

    // Translate the group's validator indices into validator ids.
    let all_validators = request_validators_ctx(relay_parent, ctx).await?.await??;
    let ids = group
        .into_iter()
        .map(|idx| all_validators[idx as usize].clone())
        .collect();

    Ok(Some(ids))
}
/// Figure out which validators are relevant for `para_id` at `relay_parent`:
/// the members of the validator group assigned to the para's core.
///
/// Returns `Ok(None)` (after logging a warning) when no core is assigned to
/// the para.
async fn determine_relevant_validators(
    ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
    relay_parent: Hash,
    para_id: ParaId,
) -> error::Result<Option<Vec<ValidatorId>>> {
    // First determine which core (if any) the para is assigned to.
    if let Some((core, num_cores)) = determine_core(ctx, para_id, relay_parent).await? {
        determine_validators_for_core(ctx, core, num_cores, relay_parent).await
    } else {
        tracing::warn!(
            target: LOG_TARGET,
            "Looks like no core is assigned to {:?} at {:?}",
            para_id,
            relay_parent,
        );
        Ok(None)
    }
}
/// Handles a `FetchPoV` message.
#[tracing::instrument(level = "trace", skip(ctx, state, response_sender), fields(subsystem = LOG_TARGET))]
async fn handle_fetch(
......@@ -291,7 +392,35 @@ async fn handle_fetch(
return;
}
Entry::Vacant(e) => {
e.insert(vec![response_sender]);
if let Ok(Some(relevant_validators)) = determine_relevant_validators(
ctx,
relay_parent,
descriptor.para_id,
).await {
// We only need one connection request per (relay_parent, para_id)
// so here we take this shortcut to avoid calling `connect_to_validators`
// more than once.
if !state.connection_requests.contains_request(&relay_parent) {
match validator_discovery::connect_to_validators(
ctx,
relay_parent,
relevant_validators.clone(),
).await {
Ok(new_connection_request) => {
state.connection_requests.put(relay_parent, new_connection_request);
}
Err(e) => {
tracing::debug!(
target: LOG_TARGET,
"Failed to create a validator connection request {:?}",
e,
);
}
}
}
e.insert(vec![response_sender]);
}
}
}
}
......@@ -482,6 +611,11 @@ async fn handle_incoming_pov(
).await
}
/// Register a freshly connected validator peer, creating default peer state
/// for it unless we already track this peer.
fn handle_validator_connected(state: &mut State, peer_id: PeerId) {
    state.peer_state.entry(peer_id).or_insert_with(Default::default);
}
/// Handles a network bridge update.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
......@@ -493,7 +627,7 @@ async fn handle_network_update(
match update {
NetworkBridgeEvent::PeerConnected(peer, _observed_role) => {
state.peer_state.insert(peer, PeerState { awaited: HashMap::new() });
handle_validator_connected(state, peer);
}
NetworkBridgeEvent::PeerDisconnected(peer) => {
state.peer_state.remove(&peer);
......@@ -558,44 +692,61 @@ impl PoVDistribution {
self,
mut ctx: impl SubsystemContext<Message = PoVDistributionMessage>,
) -> SubsystemResult<()> {
let mut state = State {
relay_parent_state: HashMap::new(),
peer_state: HashMap::new(),
our_view: View(Vec::new()),
metrics: self.metrics,
};
let mut state = State::default();
state.metrics = self.metrics;
loop {
match ctx.recv().await? {
FromOverseer::Signal(signal) => if handle_signal(&mut state, &mut ctx, signal).await? {
return Ok(());
},
FromOverseer::Communication { msg } => match msg {
PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
handle_fetch(
&mut state,
&mut ctx,
relay_parent,
descriptor,
response_sender,
).await,
PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
handle_distribute(
&mut state,
&mut ctx,
relay_parent,
descriptor,
pov,
).await,
PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
// `select_biased` is used since receiving connection notifications and
// peer view update messages may be racy and we want connection notifications
// first.
futures::select_biased! {
v = state.connection_requests.next() => {
match v {
Some((_relay_parent, _validator_id, peer_id)) => {
handle_validator_connected(&mut state, peer_id);
}
None => break,
}
}
v = ctx.recv().fuse() => {
match v? {
FromOverseer::Signal(signal) => if handle_signal(
&mut state,
&mut ctx,
event,
).await,
},
}
signal,
).await? {
return Ok(());
}
FromOverseer::Communication { msg } => match msg {
PoVDistributionMessage::FetchPoV(relay_parent, descriptor, response_sender) =>
handle_fetch(
&mut state,
&mut ctx,
relay_parent,
descriptor,
response_sender,
).await,
PoVDistributionMessage::DistributePoV(relay_parent, descriptor, pov) =>
handle_distribute(
&mut state,
&mut ctx,
relay_parent,
descriptor,
pov,
).await,
PoVDistributionMessage::NetworkBridgeUpdateV1(event) =>
handle_network_update(
&mut state,
&mut ctx,
event,
).await,
}
}
}
};
}
Ok(())
}
}
......
This diff is collapsed.
......@@ -115,6 +115,12 @@ pub struct ConnectionRequests {
requests: StreamUnordered<ConnectionRequest>,
}
/// `FusedStream` is required so `ConnectionRequests` can be polled inside
/// `select!`/`select_biased!` loops.
impl stream::FusedStream for ConnectionRequests {
    fn is_terminated(&self) -> bool {
        // Never report termination: new requests can be inserted at any time
        // (via `put`), so callers should keep polling this stream even while
        // it is momentarily empty.
        // NOTE(review): the underlying `StreamUnordered` may still yield
        // `None` while empty — consumers must handle that; confirm this
        // matches the intended contract.
        false
    }
}
impl ConnectionRequests {
/// Insert a new connection request.
///
......@@ -133,6 +139,11 @@ impl ConnectionRequests {
Pin::new(&mut self.requests).remove(token);
}
}
/// Returns `true` if a connection request keyed by this relay parent is
/// already present.
///
/// Used to avoid issuing more than one connection request for the same
/// relay parent.
pub fn contains_request(&self, relay_parent: &Hash) -> bool {
    self.id_map.contains_key(relay_parent)
}
}
impl stream::Stream for ConnectionRequests {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment