Unverified Commit 11797c73 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Compress the PoV block before sending it over the network (#2288)



* Compress the PoV block before sending it over the network

This pr changes the way we send PoV blocks over the network. We now
compress the PoV block before it is send over the network. This should
reduce the size significant for PoVs which contain the runtime WASM for
example.

* Preallocate 1KB

* Try something..

* Switch to zstd and some renamings

* Make compression/decompression fail in browsers

* Use some sane maximum value

* Update roadmap/implementers-guide/src/types/network.md
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Fix and add test

* add
Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent 8387af2b
Pipeline #121322 canceled with stages
in 19 minutes and 8 seconds
......@@ -5268,6 +5268,8 @@ dependencies = [
"polkadot-primitives",
"sc-network",
"strum 0.20.0",
"thiserror",
"zstd",
]
[[package]]
......
......@@ -501,11 +501,19 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(
request_id,
receipt,
pov,
);
let pov = match protocol_v1::CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
);
return
}
};
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
......@@ -1280,7 +1288,7 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => {
assert_eq!(req_id, request_id);
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
assert_eq!(pov.decompress().unwrap(), pov_block);
}
);
}
......
......@@ -353,7 +353,7 @@ async fn received_collation<Context>(
origin: PeerId,
request_id: RequestId,
receipt: CandidateReceipt,
pov: PoV,
pov: protocol_v1::CompressedPoV,
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
......@@ -368,6 +368,21 @@ where
if let Some(per_request) = state.requests_info.remove(&id) {
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let pov = match pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
%request_id,
?error,
"Failed to extract PoV",
);
return;
}
};
let _span = jaeger::pov_span(&pov, "received-collation");
tracing::debug!(
target: LOG_TARGET,
%request_id,
......@@ -529,9 +544,8 @@ where
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
Collation(request_id, receipt, pov) => {
let _span1 = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
let _span = state.span_per_relay_parent.get(&receipt.descriptor.relay_parent)
.map(|s| s.child("received-collation"));
let _span2 = jaeger::pov_span(&pov, "received-collation");
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
}
......@@ -1295,9 +1309,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_a.clone(),
PoV {
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![]),
},
}).unwrap(),
)
)
)
......@@ -1333,9 +1347,9 @@ mod tests {
protocol_v1::CollatorProtocolMessage::Collation(
request_id,
candidate_b.clone(),
PoV {
protocol_v1::CompressedPoV::compress(&PoV {
block_data: BlockData(vec![1, 2, 3]),
},
}).unwrap(),
)
)
)
......
......@@ -106,7 +106,7 @@ struct State {
}
struct BlockBasedState {
known: HashMap<Hash, Arc<PoV>>,
known: HashMap<Hash, (Arc<PoV>, protocol_v1::CompressedPoV)>,
/// All the PoVs we are or were fetching, coupled with channels expecting the data.
///
......@@ -131,11 +131,13 @@ fn awaiting_message(relay_parent: Hash, awaiting: Vec<Hash>)
)
}
fn send_pov_message(relay_parent: Hash, pov_hash: Hash, pov: PoV)
-> protocol_v1::ValidationProtocol
{
fn send_pov_message(
relay_parent: Hash,
pov_hash: Hash,
pov: &protocol_v1::CompressedPoV,
) -> protocol_v1::ValidationProtocol {
protocol_v1::ValidationProtocol::PoVDistribution(
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov)
protocol_v1::PoVDistributionMessage::SendPoV(relay_parent, pov_hash, pov.clone())
)
}
......@@ -267,7 +269,7 @@ async fn distribute_to_awaiting(
metrics: &Metrics,
relay_parent: Hash,
pov_hash: Hash,
pov: &PoV,
pov: &protocol_v1::CompressedPoV,
) {
// Send to all peers who are awaiting the PoV and have that relay-parent in their view.
//
......@@ -284,7 +286,7 @@ async fn distribute_to_awaiting(
if peers_to_send.is_empty() { return; }
let payload = send_pov_message(relay_parent, pov_hash, pov.clone());
let payload = send_pov_message(relay_parent, pov_hash, pov);
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
peers_to_send,
......@@ -379,7 +381,7 @@ async fn handle_fetch(
None => return,
};
if let Some(pov) = relay_parent_state.known.get(&descriptor.pov_hash) {
if let Some((pov, _)) = relay_parent_state.known.get(&descriptor.pov_hash) {
let _ = response_sender.send(pov.clone());
return;
}
......@@ -468,7 +470,17 @@ async fn handle_distribute(
}
}
relay_parent_state.known.insert(descriptor.pov_hash, pov.clone());
let encoded_pov = match protocol_v1::CompressedPoV::compress(&*pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`."
);
return
}
};
distribute_to_awaiting(
&mut state.peer_state,
......@@ -476,8 +488,10 @@ async fn handle_distribute(
&state.metrics,
relay_parent,
descriptor.pov_hash,
&*pov,
).await
&encoded_pov,
).await;
relay_parent_state.known.insert(descriptor.pov_hash, (pov, encoded_pov));
}
/// Report a reputation change for a peer.
......@@ -527,8 +541,9 @@ async fn handle_awaiting(
for pov_hash in pov_hashes {
// For all requested PoV hashes, if we have it, we complete the request immediately.
// Otherwise, we note that the peer is awaiting the PoV.
if let Some(pov) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, (&**pov).clone());
if let Some((_, ref pov)) = relay_parent_state.known.get(&pov_hash) {
let payload = send_pov_message(relay_parent, pov_hash, pov);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(vec![peer.clone()], payload)
)).await;
......@@ -544,23 +559,35 @@ async fn handle_awaiting(
/// Handle an incoming PoV from our peer. Reports them if unexpected, rewards them if not.
///
/// Completes any requests awaiting that PoV.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, state, encoded_pov), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_pov(
state: &mut State,
ctx: &mut impl SubsystemContext<Message = PoVDistributionMessage>,
peer: PeerId,
relay_parent: Hash,
pov_hash: Hash,
pov: PoV,
encoded_pov: protocol_v1::CompressedPoV,
) {
let relay_parent_state = match state.relay_parent_state.get_mut(&relay_parent) {
None => {
None => {
report_peer(ctx, peer, COST_UNEXPECTED_POV).await;
return;
},
Some(r) => r,
};
let pov = match encoded_pov.decompress() {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
error = ?error,
"Could not extract PoV",
);
return;
}
};
let pov = {
// Do validity checks and complete all senders awaiting this PoV.
let fetching = match relay_parent_state.fetching.get_mut(&pov_hash) {
......@@ -607,8 +634,10 @@ async fn handle_incoming_pov(
&state.metrics,
relay_parent,
pov_hash,
&*pov,
).await
&encoded_pov,
).await;
relay_parent_state.known.insert(pov_hash, (pov, encoded_pov));
}
/// Handles a newly connected validator in the context of some relay leaf.
......
......@@ -396,7 +396,11 @@ fn ask_validators_for_povs() {
PoVDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
test_state.validator_peer_id[2].clone(),
protocol_v1::PoVDistributionMessage::SendPoV(current, pov_hash, pov_block.clone()),
protocol_v1::PoVDistributionMessage::SendPoV(
current,
pov_hash,
protocol_v1::CompressedPoV::compress(&pov_block).unwrap(),
),
)
)
).await;
......@@ -631,7 +635,7 @@ fn distributes_to_those_awaiting_and_completes_local() {
assert_eq!(peers, vec![peer_a.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
);
}
)
......@@ -943,7 +947,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;
......@@ -952,7 +956,7 @@ fn peer_complete_fetch_and_is_rewarded() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;
......@@ -1033,7 +1037,7 @@ fn peer_punished_for_sending_bad_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, bad_pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&bad_pov).unwrap()),
).focus().unwrap(),
).await;
......@@ -1098,7 +1102,7 @@ fn peer_punished_for_sending_unexpected_pov() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;
......@@ -1161,7 +1165,7 @@ fn peer_punished_for_sending_pov_out_of_our_view() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_b, pov_hash, pov.clone()),
send_pov_message(hash_b, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;
......@@ -1450,7 +1454,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;
......@@ -1474,7 +1478,7 @@ fn peer_complete_fetch_leads_to_us_completing_others() {
assert_eq!(peers, vec![peer_b.clone()]);
assert_eq!(
message,
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
);
}
);
......@@ -1534,7 +1538,7 @@ fn peer_completing_request_no_longer_awaiting() {
&mut ctx,
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
send_pov_message(hash_a, pov_hash, pov.clone()),
send_pov_message(hash_a, pov_hash, &protocol_v1::CompressedPoV::compress(&pov).unwrap()),
).focus().unwrap(),
).await;
......
......@@ -12,3 +12,7 @@ polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "1.3.6", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
strum = { version = "0.20", features = ["derive"] }
thiserror = "1.0.23"
[target.'cfg(not(target_os = "unknown"))'.dependencies]
zstd = "0.5.0"
......@@ -16,7 +16,7 @@
//! Network protocol types for parachains.
#![deny(unused_crate_dependencies, unused_results)]
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
use polkadot_primitives::v1::{Hash, BlockNumber};
......@@ -286,8 +286,8 @@ pub mod v1 {
};
use polkadot_node_primitives::SignedFullStatement;
use parity_scale_codec::{Encode, Decode};
use std::convert::TryFrom;
use super::RequestId;
use std::convert::TryFrom;
/// Network messages used by the availability distribution subsystem
#[derive(Debug, Clone, Encode, Decode, PartialEq)]
......@@ -323,9 +323,9 @@ pub mod v1 {
#[codec(index = "0")]
Awaiting(Hash, Vec<Hash>),
/// Notification of an awaited PoV, in a given relay-parent context.
/// (relay_parent, pov_hash, pov)
/// (relay_parent, pov_hash, compressed_pov)
#[codec(index = "1")]
SendPoV(Hash, Hash, PoV),
SendPoV(Hash, Hash, CompressedPoV),
}
/// Network messages used by the statement distribution subsystem.
......@@ -336,6 +336,67 @@ pub mod v1 {
Statement(Hash, SignedFullStatement)
}
#[derive(Debug, Clone, Copy, PartialEq, thiserror::Error)]
#[allow(missing_docs)]
pub enum CompressedPoVError {
#[error("Failed to compress a PoV")]
Compress,
#[error("Failed to decompress a PoV")]
Decompress,
#[error("Failed to decode the uncompressed PoV")]
Decode,
#[error("Architecture is not supported")]
NotSupported,
}
/// SCALE and Zstd encoded [`PoV`].
#[derive(Debug, Clone, Encode, Decode, PartialEq)]
pub struct CompressedPoV(Vec<u8>);
impl CompressedPoV {
/// Compress the given [`PoV`] and returns a [`CompressedPoV`].
#[cfg(not(target_os = "unknown"))]
pub fn compress(pov: &PoV) -> Result<Self, CompressedPoVError> {
zstd::encode_all(pov.encode().as_slice(), 3).map_err(|_| CompressedPoVError::Compress).map(Self)
}
/// Compress the given [`PoV`] and returns a [`CompressedPoV`].
#[cfg(target_os = "unknown")]
pub fn compress(_: &PoV) -> Result<Self, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}
/// Decompress `self` and returns the [`PoV`] on success.
#[cfg(not(target_os = "unknown"))]
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
use std::io::Read;
const MAX_POV_BLOCK_SIZE: usize = 32 * 1024 * 1024;
struct InputDecoder<'a, T: std::io::BufRead>(&'a mut zstd::Decoder<T>, usize);
impl<'a, T: std::io::BufRead> parity_scale_codec::Input for InputDecoder<'a, T> {
fn read(&mut self, into: &mut [u8]) -> Result<(), parity_scale_codec::Error> {
self.1 = self.1.saturating_add(into.len());
if self.1 > MAX_POV_BLOCK_SIZE {
return Err("pov block too big".into())
}
self.0.read_exact(into).map_err(Into::into)
}
fn remaining_len(&mut self) -> Result<Option<usize>, parity_scale_codec::Error> {
Ok(None)
}
}
let mut decoder = zstd::Decoder::new(self.0.as_slice()).map_err(|_| CompressedPoVError::Decompress)?;
PoV::decode(&mut InputDecoder(&mut decoder, 0)).map_err(|_| CompressedPoVError::Decode)
}
/// Decompress `self` and returns the [`PoV`] on success.
#[cfg(target_os = "unknown")]
pub fn decompress(&self) -> Result<PoV, CompressedPoVError> {
Err(CompressedPoVError::NotSupported)
}
}
/// Network messages used by the collator protocol subsystem
#[derive(Debug, Clone, Encode, Decode, PartialEq)]
pub enum CollatorProtocolMessage {
......@@ -351,7 +412,7 @@ pub mod v1 {
RequestCollation(RequestId, Hash, ParaId),
/// A requested collation.
#[codec(index = "3")]
Collation(RequestId, CandidateReceipt, PoV),
Collation(RequestId, CandidateReceipt, CompressedPoV),
}
/// All network messages on the validation peer-set.
......@@ -389,3 +450,17 @@ pub mod v1 {
impl_try_from!(CollationProtocol, CollatorProtocol, CollatorProtocolMessage);
}
#[cfg(test)]
mod tests {
use polkadot_primitives::v1::PoV;
use super::v1::{CompressedPoV, CompressedPoVError};
#[test]
fn decompress_huge_pov_block_fails() {
let pov = PoV { block_data: vec![0; 63 * 1024 * 1024].into() };
let compressed = CompressedPoV::compress(&pov).unwrap();
assert_eq!(CompressedPoVError::Decode, compressed.decompress().unwrap_err());
}
}
......@@ -19,6 +19,9 @@ enum ObservedRole {
Full,
Light,
}
/// SCALE and zstd encoded `PoV`.
struct CompressedPoV(Vec<u8>);
```
## V1 Network Subsystem Message Types
......@@ -75,8 +78,8 @@ enum PoVDistributionV1Message {
/// specific relay-parent hash.
Awaiting(Hash, Vec<Hash>),
/// Notification of an awaited PoV, in a given relay-parent context.
/// (relay_parent, pov_hash, pov)
SendPoV(Hash, Hash, PoV),
/// (relay_parent, pov_hash, compressed_pov)
SendPoV(Hash, Hash, CompressedPoV),
}
```
......@@ -101,7 +104,7 @@ enum CollatorProtocolV1Message {
/// Request the advertised collation at that relay-parent.
RequestCollation(RequestId, Hash, ParaId),
/// A requested collation.
Collation(RequestId, CandidateReceipt, PoV),
Collation(RequestId, CandidateReceipt, CompressedPoV),
}
```
......
#!/usr/bin/env bash
set -e
#shellcheck source=lib.sh
source "$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )/lib.sh"
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/polkadot/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path runtime/kusama/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path erasure-coding/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path parachain/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path primitives/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path rpc/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path statement-table/Cargo.toml
time cargo build --locked --target=wasm32-unknown-unknown --manifest-path cli/Cargo.toml --no-default-features --features browser
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment