Unverified Commit 48d5b143 authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Req/res optimization for statement distribution (#2803)

* Wip

* Increase proposer timeout.

* WIP.

* Better timeout values now that we are going to be connected to all nodes. (#2778)

* Better timeout values.

* Fix typo.

* Fix validator bandwidth.

* Fix compilation.

* Better and more consistent sizes.

Most importantly code size is now 5 Meg, which is the limit we currently
want to support in statement distribution.

* Introduce statement fetching request.

* WIP

* Statement cache retrieval logic.

* Review remarks by @rphmeier

* Fixes.

* Better requester logic.

* WIP: Handle requester messages.

* Missing dep.

* Fix request launching logic.

* Finish fetching logic.

* Sending logic.

* Redo code size calculations.

Now that max code size is compressed size.

* Update Cargo.lock (new dep)

* Get request receiver to statement distribution.

* Expose new functionality for responding to requests.

* Cleanup.

* Responder logic.

* Fixes + Cleanup.

* Cargo.lock

* Whitespace.

* Add lost copyright.

* Launch responder task.

* Typo.

* info -> warn

* Typo.

* Fix.

* Fix.

* Update comment.

* Doc fix.

* Better large statement heuristics.

* Fix tests.

* Fix network bridge tests.

* Add test for size estimate.

* Very simple tests that checks we get LargeStatement.

* Basic check, that fetching of large candidates is performed.

* More tests.

* Basic metrics for responder.

* More metrics.

* Use Encode::encoded_size().

* Some useful spans.

* Get rid of redundant metrics.

* Don't add peer on duplicate.

* Properly check hash

instead of relying on signatures alone.

* Preserve ordering + better flood protection.

* Get rid of redundant clone.

* Don't shutdown responder on failed query.

And add test for this.

* Smaller fixes.

* Quotes.

* Better queue size calculation.

* A bit saner response sizes.

* Fixes.
parent 373cf924
Pipeline #133930 failed with stages
in 17 minutes and 45 seconds
......@@ -6497,6 +6497,7 @@ dependencies = [
"assert_matches",
"futures 0.3.13",
"indexmap",
"parity-scale-codec",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
......@@ -6504,6 +6505,7 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-keystore",
"sc-network",
"sp-application-crypto",
"sp-core",
"sp-keyring",
......
......@@ -40,8 +40,12 @@ use sp_transaction_pool::TransactionPool;
use prometheus_endpoint::Registry as PrometheusRegistry;
use std::{fmt, pin::Pin, sync::Arc, time};
/// How long proposal can take before we give up and err out
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500);
/// How long proposing can take, before we give up and err out. We need a relatively large timeout
/// here as long as we have large payload in statement distribution. Assuming we can reach most
/// nodes within two hops, we will take about 2 seconds for transferring statements (data transfer
/// only). If necessary, we could be able to reduce this to 3 seconds. To consider: The lower the
/// riskier that we will not be able to include a candidate.
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(4000);
/// Custom Proposer factory for Polkadot
pub struct ProposerFactory<TxPool, Backend, Client> {
......
......@@ -28,8 +28,9 @@ use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;
use polkadot_subsystem::{
ActiveLeavesUpdate, ActivatedLeaf, Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemError,
SubsystemResult, SubsystemSender, OverseerSignal, FromOverseer,
ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem,
Subsystem, SubsystemContext, SubsystemError, SubsystemResult, SubsystemSender,
messages::StatementDistributionMessage
};
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
......@@ -842,12 +843,16 @@ where
let NetworkBridge {
network_service,
request_multiplexer,
mut request_multiplexer,
authority_discovery_service,
metrics,
sync_oracle,
} = bridge;
let statement_receiver = request_multiplexer
.get_statement_fetching()
.expect("Gets initialized, must be `Some` on startup. qed.");
let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024);
let (remote, network_event_handler) = handle_network_messages(
......@@ -861,6 +866,10 @@ where
ctx.spawn("network-bridge-network-worker", Box::pin(remote)).await?;
ctx.send_message(AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(statement_receiver)
)).await;
let subsystem_event_handler = handle_subsystem_messages(
ctx,
network_service,
......@@ -1777,6 +1786,13 @@ mod tests {
let view = view![Hash::repeat_byte(1)];
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
......@@ -1822,6 +1838,13 @@ mod tests {
ObservedRole::Full,
).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
......@@ -1887,6 +1910,13 @@ mod tests {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
......@@ -1964,6 +1994,13 @@ mod tests {
network_handle.connect_peer(peer_a.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer_b.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
......@@ -2052,6 +2089,13 @@ mod tests {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
......@@ -2205,6 +2249,13 @@ mod tests {
network_handle.connect_peer(peer.clone(), PeerSet::Validation, ObservedRole::Full).await;
network_handle.connect_peer(peer.clone(), PeerSet::Collation, ObservedRole::Full).await;
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
......@@ -2366,6 +2417,13 @@ mod tests {
0,
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::StatementFetchingReceiver(_)
)
);
assert_sends_validation_event_to_all(
NetworkBridgeEvent::OurViewChange(our_view.clone()),
&mut virtual_overseer,
......
......@@ -37,8 +37,11 @@ use polkadot_subsystem::messages::AllMessages;
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
///
/// The resulting stream will end once any of its input ends.
///
/// TODO: Get rid of this: https://github.com/paritytech/polkadot/issues/2842
pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
statement_fetching: Option<mpsc::Receiver<network::IncomingRequest>>,
next_poll: usize,
}
......@@ -58,21 +61,38 @@ impl RequestMultiplexer {
/// `RequestMultiplexer` from it. The returned `RequestResponseConfig`s must be passed to the
/// network implementation.
pub fn new() -> (Self, Vec<RequestResponseConfig>) {
let (receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
let (mut receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()
.map(|p| {
let (rx, cfg) = p.get_config();
((p, rx), cfg)
})
.unzip();
let index = receivers.iter().enumerate().find_map(|(i, (p, _))|
if let Protocol::StatementFetching = p {
Some(i)
} else {
None
}
).expect("Statement fetching must be registered. qed.");
let statement_fetching = Some(receivers.remove(index).1);
(
Self {
receivers,
statement_fetching,
next_poll: 0,
},
cfgs,
)
}
/// Get the receiver for handling statement fetching requests.
///
/// This function will only return `Some` once: the receiver is moved out on the first call and
/// subsequent calls yield `None`.
pub fn get_statement_fetching(&mut self) -> Option<mpsc::Receiver<network::IncomingRequest>> {
	// `Option::take` expresses the move-out-once semantics directly
	// (equivalent to `std::mem::take` on an `Option`, but idiomatic).
	self.statement_fetching.take()
}
}
impl Stream for RequestMultiplexer {
......@@ -151,6 +171,9 @@ fn multiplex_single(
decode_with_peer::<v1::AvailableDataFetchingRequest>(peer, payload)?,
pending_response,
)),
Protocol::StatementFetching => {
panic!("Statement fetching requests are handled directly. qed.");
}
};
Ok(r)
}
......
......@@ -291,10 +291,7 @@ pub mod v1 {
use parity_scale_codec::{Encode, Decode};
use std::convert::TryFrom;
use polkadot_primitives::v1::{
CandidateIndex, CollatorId, Hash, Id as ParaId, SignedAvailabilityBitfield,
CollatorSignature,
};
use polkadot_primitives::v1::{CandidateHash, CandidateIndex, CollatorId, CollatorSignature, CompactStatement, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex, ValidatorSignature};
use polkadot_node_primitives::{
approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
SignedFullStatement,
......@@ -313,7 +310,68 @@ pub mod v1 {
pub enum StatementDistributionMessage {
/// A signed full statement under a given relay-parent.
#[codec(index = 0)]
Statement(Hash, SignedFullStatement)
Statement(Hash, SignedFullStatement),
/// Seconded statement with large payload (e.g. containing a runtime upgrade).
///
/// We only gossip the hash in that case, actual payloads can be fetched from sending node
/// via req/response.
#[codec(index = 1)]
LargeStatement(StatementMetadata),
}
/// Data that makes a statement unique.
#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq, Hash)]
pub struct StatementMetadata {
	/// Relay parent this statement is relevant under.
	pub relay_parent: Hash,
	/// Hash of the candidate that got validated.
	pub candidate_hash: CandidateHash,
	/// Validator that attested the validity.
	pub signed_by: ValidatorIndex,
	/// Signature of seconding validator.
	pub signature: ValidatorSignature,
}
impl StatementDistributionMessage {
	/// Get metadata of the given `StatementDistributionMessage`.
	///
	/// For a full `Statement` the metadata is derived from the signed statement; for a
	/// `LargeStatement` the already-present metadata is cloned.
	pub fn get_metadata(&self) -> StatementMetadata {
		match self {
			Self::Statement(relay_parent, statement) => StatementMetadata {
				relay_parent: *relay_parent,
				candidate_hash: statement.payload().candidate_hash(),
				signed_by: statement.validator_index(),
				signature: statement.signature().clone(),
			},
			Self::LargeStatement(metadata) => metadata.clone(),
		}
	}

	/// Get fingerprint describing the contained statement uniquely.
	///
	/// Note: large statements are always `Seconded` statements.
	pub fn get_fingerprint(&self) -> (CompactStatement, ValidatorIndex) {
		match self {
			Self::Statement(_, statement) =>
				(statement.payload().to_compact(), statement.validator_index()),
			Self::LargeStatement(meta) =>
				(CompactStatement::Seconded(meta.candidate_hash), meta.signed_by),
		}
	}

	/// Get contained relay parent.
	pub fn get_relay_parent(&self) -> Hash {
		match self {
			Self::Statement(r, _) => *r,
			Self::LargeStatement(meta) => meta.relay_parent,
		}
	}

	/// Whether or not this message contains a large statement.
	pub fn is_large_statement(&self) -> bool {
		// `matches!` replaces the manual `if let … { true } else { false }`.
		matches!(self, Self::LargeStatement(_))
	}
}
/// Network messages used by the approval distribution subsystem.
......
......@@ -32,11 +32,12 @@
//!
//! Versioned (v1 module): The actual requests and responses as sent over the network.
use std::borrow::Cow;
use std::{borrow::Cow, u64};
use std::time::Duration;
use futures::channel::mpsc;
use polkadot_node_primitives::MAX_POV_SIZE;
use polkadot_primitives::v1::MAX_CODE_SIZE;
use strum::EnumIter;
pub use sc_network::config as network;
......@@ -64,8 +65,15 @@ pub enum Protocol {
PoVFetching,
/// Protocol for fetching available data.
AvailableDataFetching,
/// Fetching of statements that are too large for gossip.
StatementFetching,
}
/// Minimum bandwidth we expect for validators - 500Mbit/s is the recommendation, so approximately
/// 50Meg bytes per second:
const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024;
/// Default request timeout in seconds.
///
/// When decreasing this value, take into account that the very first request might need to open a
......@@ -78,14 +86,22 @@ const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);
/// peer set as well).
const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1);
/// Minimum bandwidth we expect for validators - 500Mbit/s is the recommendation, so approximately
/// 50Meg bytes per second:
const MIN_BANDWIDTH_BYTES: u64 = 50 * 1024 * 1024;
/// Timeout for PoV like data, 2 times what it should take, assuming we can fully utilize the
/// bandwidth. This amounts to two seconds right now.
const POV_REQUEST_TIMEOUT_CONNECTED: Duration =
Duration::from_millis(2 * 1000 * (MAX_POV_SIZE as u64) / MIN_BANDWIDTH_BYTES);
/// We want timeout statement requests fast, so we don't waste time on slow nodes. Responders will
/// try their best to either serve within that timeout or return an error immediately. (We need to
/// fit statement distribution within a block of 6 seconds.)
const STATEMENTS_TIMEOUT: Duration = Duration::from_secs(1);
/// We don't want a slow peer to slow down all the others, at the same time we want to get out the
/// data quickly in full to at least some peers (as this will reduce load on us as they then can
/// start serving the data). So this value is a tradeoff. 3 seems to be sensible. So we would need
/// to have 3 slow nodes connected, to delay transfer for others by `STATEMENTS_TIMEOUT`.
pub const MAX_PARALLEL_STATEMENT_REQUESTS: u32 = 3;
impl Protocol {
/// Get a configuration for a given Request response protocol.
///
......@@ -105,16 +121,16 @@ impl Protocol {
let cfg = match self {
Protocol::ChunkFetching => RequestResponseConfig {
name: p_name,
max_request_size: 10_000,
max_response_size: 10_000_000,
max_request_size: 1_000,
max_response_size: MAX_POV_SIZE as u64 / 10,
// We are connected to all validators:
request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
},
Protocol::CollationFetching => RequestResponseConfig {
name: p_name,
max_request_size: 10_000,
max_response_size: MAX_POV_SIZE as u64,
max_request_size: 1_000,
max_response_size: MAX_POV_SIZE as u64 + 1000,
// Taken from initial implementation in collator protocol:
request_timeout: POV_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
......@@ -130,10 +146,28 @@ impl Protocol {
name: p_name,
max_request_size: 1_000,
// Available data size is dominated by the PoV size.
max_response_size: MAX_POV_SIZE as u64,
max_response_size: MAX_POV_SIZE as u64 + 1000,
request_timeout: POV_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
},
Protocol::StatementFetching => RequestResponseConfig {
name: p_name,
max_request_size: 1_000,
// Response size is dominated by the code size.
// + 1000 to account for protocol overhead (should be way less).
max_response_size: MAX_CODE_SIZE as u64 + 1000,
// We need statement fetching to be fast and will try our best at the responding
// side to answer requests within that timeout, assuming a bandwidth of 500Mbit/s
// - which is the recommended minimum bandwidth for nodes on Kusama as of April
// 2021.
// Responders will reject requests, if it is unlikely they can serve them within
// the timeout, so the requester can immediately try another node, instead of
// waiting for timeout on an overloaded node. Fetches from slow nodes will likely
// fail, but this is desired, so we can quickly move on to a faster one - we should
// also decrease its reputation.
request_timeout: Duration::from_secs(1),
inbound_queue: Some(tx),
},
};
(rx, cfg)
}
......@@ -154,6 +188,26 @@ impl Protocol {
// Validators are constantly self-selecting to request available data which may lead
// to constant load and occasional burstiness.
Protocol::AvailableDataFetching => 100,
// Our queue size approximation is how many blocks of the size of
// a runtime we can transfer within a statements timeout, minus the requests we handle
// in parallel.
Protocol::StatementFetching => {
// We assume we can utilize up to 70% of the available bandwidth for statements.
// This is just a guess/estimate, with the following considerations: If we are
// faster than that, queue size will stay low anyway, even if not - requesters will
// get an immediate error, but if we are slower, requesters will run in a timeout -
// wasting precious time.
let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10;
let size = u64::saturating_sub(
STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64),
MAX_PARALLEL_STATEMENT_REQUESTS as u64
);
debug_assert!(
size > 0,
"We should have a channel size greater zero, otherwise we won't accept any requests."
);
size as usize
}
}
}
......@@ -169,6 +223,7 @@ impl Protocol {
Protocol::CollationFetching => "/polkadot/req_collation/1",
Protocol::PoVFetching => "/polkadot/req_pov/1",
Protocol::AvailableDataFetching => "/polkadot/req_available_data/1",
Protocol::StatementFetching => "/polkadot/req_statement/1",
}
}
}
......@@ -25,6 +25,8 @@ use sc_network::PeerId;
use polkadot_primitives::v1::AuthorityDiscoveryId;
use crate::UnifiedReputationChange;
use super::{v1, Protocol};
/// Common properties of any `Request`.
......@@ -47,6 +49,8 @@ pub enum Requests {
PoVFetching(OutgoingRequest<v1::PoVFetchingRequest>),
/// Request full available data from a node.
AvailableDataFetching(OutgoingRequest<v1::AvailableDataFetchingRequest>),
/// Requests for fetching large statements as part of statement distribution.
StatementFetching(OutgoingRequest<v1::StatementFetchingRequest>),
}
impl Requests {
......@@ -57,6 +61,7 @@ impl Requests {
Self::CollationFetching(_) => Protocol::CollationFetching,
Self::PoVFetching(_) => Protocol::PoVFetching,
Self::AvailableDataFetching(_) => Protocol::AvailableDataFetching,
Self::StatementFetching(_) => Protocol::StatementFetching,
}
}
......@@ -73,6 +78,7 @@ impl Requests {
Self::CollationFetching(r) => r.encode_request(),
Self::PoVFetching(r) => r.encode_request(),
Self::AvailableDataFetching(r) => r.encode_request(),
Self::StatementFetching(r) => r.encode_request(),
}
}
}
......@@ -199,6 +205,22 @@ pub struct IncomingRequest<Req> {
pending_response: oneshot::Sender<netconfig::OutgoingResponse>,
}
/// Typed variant of [`netconfig::OutgoingResponse`].
///
/// Responses to `IncomingRequest`s. The typed `result` is encoded via SCALE before being
/// handed to the network layer (see `send_outgoing_response`).
pub struct OutgoingResponse<Response> {
	/// The payload of the response.
	///
	/// `Err(())` means no response payload will be sent (e.g. only reputation changes apply).
	pub result: Result<Response, ()>,

	/// Reputation changes accrued while handling the request. To be applied to the reputation of
	/// the peer sending the request.
	pub reputation_changes: Vec<UnifiedReputationChange>,

	/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
	/// peer.
	pub sent_feedback: Option<oneshot::Sender<()>>,
}
impl<Req> IncomingRequest<Req>
where
Req: IsRequest,
......@@ -232,6 +254,31 @@ where
})
.map_err(|_| resp)
}
/// Send response with additional options.
///
/// This variant allows for waiting for the response to be sent out, allows for changing peer's
/// reputation and allows for not sending a response at all (for only changing the peer's
/// reputation).
pub fn send_outgoing_response(self, resp: OutgoingResponse<<Req as IsRequest>::Response>)
	-> Result<(), ()> {
	// Translate the typed response into the untyped network-config variant:
	// SCALE-encode the payload and convert reputation changes to their base representation.
	let response = netconfig::OutgoingResponse {
		result: resp.result.map(|v| v.encode()),
		reputation_changes: resp.reputation_changes
			.into_iter()
			.map(|c| c.into_base_rep())
			.collect(),
		sent_feedback: resp.sent_feedback,
	};
	// A send error means the network side hung up; signal failure via `Err(())`.
	self.pending_response.send(response).map_err(|_| ())
}
}
/// Future for actually receiving a typed response for an OutgoingRequest.
......
......@@ -18,10 +18,7 @@
use parity_scale_codec::{Decode, Encode};
use polkadot_primitives::v1::{
CandidateHash, CandidateReceipt, ValidatorIndex,
Hash,
};
use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, CommittedCandidateReceipt, Hash, ValidatorIndex};
use polkadot_primitives::v1::Id as ParaId;
use polkadot_node_primitives::{AvailableData, PoV, ErasureChunk};
......@@ -169,3 +166,29 @@ impl IsRequest for AvailableDataFetchingRequest {
type Response = AvailableDataFetchingResponse;
const PROTOCOL: Protocol = Protocol::AvailableDataFetching;
}
/// Request for fetching a large statement via request/response.
#[derive(Debug, Clone, Encode, Decode)]
pub struct StatementFetchingRequest {
	/// Data needed to locate and identify the needed statement.
	pub relay_parent: Hash,
	/// Hash of the candidate that was used to create the `CommittedCandidateReceipt`.
	pub candidate_hash: CandidateHash,
}

/// Respond with found full statement.
///
/// In this protocol the requester will only request data it was previously notified about,
/// therefore not having the data is not really an option and would just result in a
/// `RequestFailure`.
#[derive(Debug, Clone, Encode, Decode)]
pub enum StatementFetchingResponse {
	/// Data missing to reconstruct the full signed statement.
	#[codec(index = 0)]
	Statement(CommittedCandidateReceipt),
}

impl IsRequest for StatementFetchingRequest {
	type Response = StatementFetchingResponse;
	const PROTOCOL: Protocol = Protocol::StatementFetching;
}
......@@ -10,12 +10,14 @@ futures = "0.3.12"
tracing = "0.1.25"
polkadot-primitives = { path = "../../../primitives" }
sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
arrayvec = "0.5.2"
indexmap = "1.6.1"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
[dev-dependencies]
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of