Unverified Commit 134090c2 authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Request based collation fetching (#2621)

* Introduce collation fetching protocol

also move to mod.rs

* Allow `PeerId`s in requests to network bridge.

* Fix availability distribution tests.

* Move CompressedPoV to primitives.

* Request based collator protocol: validator side

- Missing: tests
- Collator side
- don't connect, if not connected

* Fixes.

* Basic request based collator side.

* Minor fix on collator side.

* Don't connect in requests in collation protocol.

Also some cleanup.

* Fix PoV distribution

* Bump substrate

* Add back metrics + whitespace fixes.

* Add back missing spans.

* More cleanup.

* Guide update.

* Fix tests

* Handle results in tests.

* Fix weird compilation issue.

* Add missing )

* Get rid of dead code.

* Get rid of redundant import.

* Fix runtime build.

* Cleanup.

* Fix wasm build.

* Format fixes.

Thanks @andronik !
parent 402e1c0a
Pipeline #129224 passed with stages
in 36 minutes and 25 seconds
......@@ -96,6 +96,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "always-assert"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbf688625d06217d5b1bb0ea9d9c44a1635fd0ee3534466388d18203174f4d11"
[[package]]
name = "ansi_term"
version = "0.11.0"
......@@ -5322,10 +5328,10 @@ dependencies = [
name = "polkadot-collator-protocol"
version = "0.1.0"
dependencies = [
"always-assert",
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.12",
"futures-timer 3.0.2",
"log",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
......@@ -5649,8 +5655,6 @@ dependencies = [
"polkadot-primitives",
"sc-network",
"strum",
"thiserror",
"zstd",
]
[[package]]
......@@ -5845,6 +5849,8 @@ dependencies = [
"sp-std",
"sp-trie",
"sp-version",
"thiserror",
"zstd",
]
[[package]]
......
......@@ -23,7 +23,7 @@ use futures::{FutureExt, SinkExt};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
request::{OutgoingRequest, RequestError, Requests},
request::{OutgoingRequest, RequestError, Requests, Recipient},
v1::{AvailabilityFetchingRequest, AvailabilityFetchingResponse},
};
use polkadot_primitives::v1::{
......@@ -31,7 +31,7 @@ use polkadot_primitives::v1::{
SessionIndex,
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage,
AllMessages, AvailabilityStoreMessage, NetworkBridgeMessage, IfDisconnected,
};
use polkadot_subsystem::{SubsystemContext, jaeger};
......@@ -330,12 +330,12 @@ impl RunningTask {
validator: &AuthorityDiscoveryId,
) -> std::result::Result<AvailabilityFetchingResponse, TaskError> {
let (full_request, response_recv) =
OutgoingRequest::new(validator.clone(), self.request);
OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request);
let requests = Requests::AvailabilityFetching(full_request);
self.sender
.send(FromFetchTask::Message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(vec![requests]),
NetworkBridgeMessage::SendRequests(vec![requests], IfDisconnected::TryConnect)
)))
.await
.map_err(|_| TaskError::ShuttingDown)?;
......
......@@ -28,6 +28,7 @@ use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::{BlockData, CandidateHash, PoV, ValidatorIndex};
use polkadot_node_network_protocol::request_response::v1;
use polkadot_node_network_protocol::request_response::Recipient;
use polkadot_subsystem::messages::AllMessages;
use crate::metrics::Metrics;
......@@ -56,7 +57,7 @@ fn task_does_not_accept_invalid_chunk() {
chunk_responses: {
let mut m = HashMap::new();
m.insert(
Sr25519Keyring::Alice.public().into(),
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: vec![1,2,3],
......@@ -88,7 +89,7 @@ fn task_stores_valid_chunk() {
chunk_responses: {
let mut m = HashMap::new();
m.insert(
Sr25519Keyring::Alice.public().into(),
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: chunk.chunk.clone(),
......@@ -124,7 +125,7 @@ fn task_does_not_accept_wrongly_indexed_chunk() {
chunk_responses: {
let mut m = HashMap::new();
m.insert(
Sr25519Keyring::Alice.public().into(),
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: chunk.chunk.clone(),
......@@ -163,7 +164,7 @@ fn task_stores_valid_chunk_if_there_is_one() {
chunk_responses: {
let mut m = HashMap::new();
m.insert(
Sr25519Keyring::Alice.public().into(),
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: chunk.chunk.clone(),
......@@ -172,11 +173,11 @@ fn task_stores_valid_chunk_if_there_is_one() {
)
);
m.insert(
Sr25519Keyring::Bob.public().into(),
Recipient::Authority(Sr25519Keyring::Bob.public().into()),
AvailabilityFetchingResponse::NoSuchChunk
);
m.insert(
Sr25519Keyring::Charlie.public().into(),
Recipient::Authority(Sr25519Keyring::Charlie.public().into()),
AvailabilityFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: vec![1,2,3],
......@@ -199,7 +200,7 @@ fn task_stores_valid_chunk_if_there_is_one() {
/// Canned network responses and the set of valid chunks used to drive a fetch-task test.
struct TestRun {
/// Response to deliver for a given request recipient.
/// A recipient without an entry here is answered with `RequestFailure::Refused`.
chunk_responses: HashMap<AuthorityDiscoveryId, AvailabilityFetchingResponse>,
chunk_responses: HashMap<Recipient, AvailabilityFetchingResponse>,
/// Set of chunks that should be considered valid:
valid_chunks: HashSet<Vec<u8>>,
}
......@@ -240,11 +241,12 @@ impl TestRun {
/// end.
async fn handle_message(&self, msg: AllMessages) -> bool {
match msg {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs)) => {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::TryConnect)) => {
let mut valid_responses = 0;
for req in reqs {
let req = match req {
Requests::AvailabilityFetching(req) => req,
_ => panic!("Unexpected request"),
};
let response = self.chunk_responses.get(&req.peer)
.ok_or(network::RequestFailure::Refused);
......
......@@ -29,6 +29,7 @@ use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_keyring::Sr25519Keyring;
use sp_core::{traits::SpawnNamed, testing::TaskExecutor};
use sc_network as network;
use sc_network::IfDisconnected;
use sc_network::config as netconfig;
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, messages::{AllMessages,
......@@ -201,7 +202,7 @@ impl TestState {
tracing::trace!(target: LOG_TARGET, remaining_stores, "Stores left to go");
let msg = overseer_recv(&mut rx).await;
match msg {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs)) => {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::TryConnect)) => {
for req in reqs {
// Forward requests:
let in_req = to_incoming_req(&executor, req);
......@@ -313,5 +314,6 @@ fn to_incoming_req(
tx
)
}
_ => panic!("Unexpected request!"),
}
}
......@@ -24,7 +24,7 @@ use polkadot_node_network_protocol::{
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber};
use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage};
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use sc_network::Event as NetworkEvent;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use polkadot_node_network_protocol::{request_response::Requests, ObservedRole};
......@@ -45,7 +45,7 @@ pub(crate) enum Action {
SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
/// Ask network to send requests.
SendRequests(Vec<Requests>),
SendRequests(Vec<Requests>, IfDisconnected),
/// Ask network to connect to validators.
ConnectToValidators {
......@@ -125,7 +125,7 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
Action::SendCollationMessages(vec![(peers, msg)])
}
NetworkBridgeMessage::SendRequests(reqs) => Action::SendRequests(reqs),
NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => Action::SendRequests(reqs, if_disconnected),
NetworkBridgeMessage::SendValidationMessages(msgs) => {
Action::SendValidationMessages(msgs)
}
......
......@@ -235,11 +235,11 @@ where
}
}
Action::SendRequests(reqs) => {
Action::SendRequests(reqs, if_disconnected) => {
for req in reqs {
bridge
.network_service
.start_request(&mut bridge.authority_discovery_service, req)
.start_request(&mut bridge.authority_discovery_service, req, if_disconnected)
.await;
}
},
......@@ -604,7 +604,7 @@ mod tests {
use parking_lot::Mutex;
use assert_matches::assert_matches;
use sc_network::Event as NetworkEvent;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use polkadot_subsystem::messages::{
......@@ -681,7 +681,7 @@ mod tests {
Box::pin((&mut self.action_tx).sink_map_err(Into::into))
}
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests) {
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
}
}
......
......@@ -136,6 +136,11 @@ fn multiplex_single(
decode_with_peer::<v1::AvailabilityFetchingRequest>(peer, payload)?,
pending_response,
)),
Protocol::CollationFetching => From::from(IncomingRequest::new(
peer,
decode_with_peer::<v1::CollationFetchingRequest>(peer, payload)?,
pending_response,
)),
};
Ok(r)
}
......
......@@ -29,7 +29,7 @@ use sc_network::{IfDisconnected, NetworkService, OutboundFailure, RequestFailure
use polkadot_node_network_protocol::{
peer_set::PeerSet,
request_response::{OutgoingRequest, Requests},
request_response::{OutgoingRequest, Requests, Recipient},
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::v1::{Block, Hash};
......@@ -113,6 +113,7 @@ pub trait Network: Send + 'static {
&self,
authority_discovery: &mut AD,
req: Requests,
if_disconnected: IfDisconnected,
);
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
......@@ -202,6 +203,7 @@ impl Network for Arc<NetworkService<Block, Hash>> {
&self,
authority_discovery: &mut AD,
req: Requests,
if_disconnected: IfDisconnected,
) {
let (
protocol,
......@@ -212,14 +214,18 @@ impl Network for Arc<NetworkService<Block, Hash>> {
},
) = req.encode_request();
let peer_id = authority_discovery
.get_addresses_by_authority_id(peer)
let peer_id = match peer {
Recipient::Peer(peer_id) => Some(peer_id),
Recipient::Authority(authority) =>
authority_discovery
.get_addresses_by_authority_id(authority)
.await
.and_then(|addrs| {
addrs
.into_iter()
.find_map(|addr| peer_id_from_multiaddr(&addr))
});
}),
};
let peer_id = match peer_id {
None => {
......@@ -244,7 +250,7 @@ impl Network for Arc<NetworkService<Block, Hash>> {
protocol.into_protocol_name(),
payload,
pending_response,
IfDisconnected::TryConnect,
if_disconnected,
);
}
}
......@@ -14,12 +14,12 @@ polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
always-assert = "0.1.2"
[dev-dependencies]
log = "0.4.13"
env_logger = "0.8.2"
assert_matches = "1.4.0"
futures-timer = "3.0.2"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
......
......@@ -21,7 +21,9 @@ use super::{LOG_TARGET, Result};
use futures::{select, FutureExt, channel::oneshot};
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash,
CandidateHash, CandidateReceipt, CollatorId, CompressedPoV, CoreIndex,
CoreState, Hash, Id as ParaId,
PoV, ValidatorId
};
use polkadot_subsystem::{
jaeger, PerLeafSpan,
......@@ -29,7 +31,9 @@ use polkadot_subsystem::{
messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage, NetworkBridgeEvent},
};
use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, View, PeerId, RequestId, OurView,
OurView, PeerId, View, peer_set::PeerSet,
request_response::{IncomingRequest, v1::{CollationFetchingRequest, CollationFetchingResponse}},
v1 as protocol_v1
};
use polkadot_node_subsystem_util::{
validator_discovery,
......@@ -562,25 +566,61 @@ async fn process_msg(
);
}
},
CollationFetchingRequest(incoming) => {
let _span = state.span_per_relay_parent.get(&incoming.payload.relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == incoming.payload.para_id {
let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&incoming.payload.relay_parent) {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = %incoming.payload.relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
return Ok(());
};
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(state, incoming, receipt, pov).await;
} else {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %incoming.payload.para_id,
our_para_id = %our_para_id,
"received a `CollationFetchingRequest` for unexpected para_id",
);
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %incoming.payload.para_id,
"received a `RequestCollation` while not collating on any para",
);
}
}
}
}
Ok(())
}
/// Issue a response to a previously requested collation.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(state, pov), fields(subsystem = LOG_TARGET))]
async fn send_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
request_id: RequestId,
origin: PeerId,
request: IncomingRequest<CollationFetchingRequest>,
receipt: CandidateReceipt,
pov: PoV,
) {
let pov = match protocol_v1::CompressedPoV::compress(&pov) {
let pov = match CompressedPoV::compress(&pov) {
Ok(pov) => pov,
Err(error) => {
tracing::debug!(
tracing::error!(
target: LOG_TARGET,
error = ?error,
"Failed to create `CompressedPov`",
......@@ -589,22 +629,18 @@ async fn send_collation(
}
};
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(request_id, receipt, pov);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![origin],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
tracing::warn!(
target: LOG_TARGET,
"Sending collation response failed",
);
}
state.metrics.on_collation_sent();
}
/// A networking messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_peer_message(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
......@@ -624,50 +660,6 @@ async fn handle_incoming_peer_message(
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
}
RequestCollation(request_id, relay_parent, para_id) => {
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("request-collation"));
match state.collating_on {
Some(our_para_id) => {
if our_para_id == para_id {
let (receipt, pov) = if let Some(collation) = state.collations.get_mut(&relay_parent) {
collation.status.advance_to_requested();
(collation.receipt.clone(), collation.pov.clone())
} else {
tracing::warn!(
target: LOG_TARGET,
relay_parent = %relay_parent,
"received a `RequestCollation` for a relay parent we don't have collation stored.",
);
return Ok(());
};
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(ctx, state, request_id, origin, receipt, pov).await;
} else {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
our_para_id = %our_para_id,
"received a `RequestCollation` for unexpected para_id",
);
}
}
None => {
tracing::warn!(
target: LOG_TARGET,
for_para_id = %para_id,
"received a `RequestCollation` while not collating on any para",
);
}
}
}
Collation(_, _, _) => {
tracing::warn!(
target: LOG_TARGET,
"Collation message is not expected on the collator side of the protocol",
);
}
CollationSeconded(statement) => {
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::warn!(
......@@ -759,7 +751,7 @@ async fn handle_network_msg(
handle_our_view_change(state, view).await?;
}
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, state, remote, msg).await?;
handle_incoming_peer_message(state, remote, msg).await?;
}
}
......@@ -861,7 +853,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::{executor, future, Future, channel::mpsc};
use sp_core::crypto::Pair;
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::{
......@@ -872,7 +864,11 @@ mod tests {
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::{view, our_view};
use polkadot_node_network_protocol::{
our_view,
view,
request_response::request::IncomingRequest,
};
#[derive(Default)]
struct TestCandidateBuilder {
......@@ -1380,43 +1376,35 @@ mod tests {
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
let request_id = 42;
// Request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::RequestCollation(
request_id,
test_state.relay_parent,
test_state.para_id,
)
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Wait for the reply.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
) => {
assert_eq!(to, vec![peer]);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Collation(req_id, receipt, pov) => {
assert_eq!(req_id, request_id);
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov.decompress().unwrap(), pov_block);
}
);
}
);
let old_relay_parent = test_state.relay_parent;
test_state.advance_to_new_round(&mut virtual_overseer, false).await;
......@@ -1424,19 +1412,25 @@ mod tests {
let peer = test_state.validator_peer_id[2].clone();
// Re-request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer.clone(),
protocol_v1::CollatorProtocolMessage::RequestCollation(
43,
old_relay_parent,
test_state.para_id,
)
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: old_relay_parent,