Unverified Commit 83c4c905 authored by asynchronous rob, committed by GitHub

Port availability recovery to use req/res (#2694)



* add AvailableDataFetchingRequest

* rename AvailabilityFetchingRequest to ChunkFetchingRequest

* rename AvailabilityFetchingResponse to Chunk_

* add AvailableDataFetching request

* add available data fetching request to availability recovery message

* remove availability recovery message

* fix

* update network bridge

* port availability recovery to request/response

* use validators.len(), not shuffling

* fix availability recovery tests

* update guide

* Update node/network/availability-recovery/src/lib.rs

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>

* Update node/network/availability-recovery/src/lib.rs

Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>

* remove println

Co-authored-by: Bernhard Schuster <bernhard@ahoi.io>
Co-authored-by: Arkadiy Paronyan <arkady.paronyan@gmail.com>
parent b1af78f0
Pipeline #130579 canceled with stages in 11 minutes and 39 seconds
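
For orientation before the diff: the commit renames the v1 `AvailabilityFetching*` request/response pair to `ChunkFetching*` and moves availability recovery off gossiped `AvailabilityRecoveryMessage`s onto the same request/response protocol. A minimal sketch of the outgoing path, assembled from the hunks below; only the free-function wrapper `request_chunk` is invented for illustration:

use polkadot_node_network_protocol::request_response::{
    request::{OutgoingRequest, Recipient, Requests},
    v1::ChunkFetchingRequest,
};
use polkadot_primitives::v1::AuthorityDiscoveryId;

fn request_chunk(validator: &AuthorityDiscoveryId, request: ChunkFetchingRequest) -> Requests {
    // Address the request to the validator's authority-discovery key. The
    // second return value is the response receiver; the real code in
    // `RunningTask::do_request` keeps and awaits it rather than dropping it.
    let (full_request, _response_recv) =
        OutgoingRequest::new(Recipient::Authority(validator.clone()), request);
    // Wrapped this way, the request is dispatched via
    // `NetworkBridgeMessage::SendRequests(.., IfDisconnected::TryConnect)`.
    Requests::ChunkFetching(full_request)
}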
......@@ -5407,6 +5407,7 @@ dependencies = [
"futures-timer 3.0.2",
"log",
"lru",
"parity-scale-codec",
"polkadot-erasure-coding",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
......@@ -5414,11 +5415,11 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"rand 0.8.3",
"sc-network",
"smallvec 1.6.1",
"sp-application-crypto",
"sp-core",
"sp-keyring",
"streamunordered",
"thiserror",
"tracing",
]
......
......@@ -121,7 +121,7 @@ impl AvailabilityDistributionSubsystem {
return Ok(());
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::AvailabilityFetchingRequest(req),
msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
} => {
answer_request_log(&mut ctx, req, &self.metrics).await
}
......
......@@ -24,7 +24,7 @@ use futures::{FutureExt, SinkExt};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::request_response::{
request::{OutgoingRequest, RequestError, Requests, Recipient},
v1::{AvailabilityFetchingRequest, AvailabilityFetchingResponse},
v1::{ChunkFetchingRequest, ChunkFetchingResponse},
};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlakeTwo256, ErasureChunk, GroupIndex, Hash, HashT, OccupiedCore,
......@@ -106,7 +106,7 @@ struct RunningTask {
group: Vec<AuthorityDiscoveryId>,
/// The request to send.
request: AvailabilityFetchingRequest,
request: ChunkFetchingRequest,
/// Root hash, for verifying the chunks' validity.
erasure_root: Hash,
......@@ -154,7 +154,7 @@ impl FetchTaskConfig {
group: session_info.validator_groups.get(core.group_responsible.0 as usize)
.expect("The responsible group of a candidate should be available in the corresponding session. qed.")
.clone(),
request: AvailabilityFetchingRequest {
request: ChunkFetchingRequest {
candidate_hash: core.candidate_hash,
index: session_info.our_index,
},
......@@ -292,10 +292,10 @@ impl RunningTask {
}
};
let chunk = match resp {
AvailabilityFetchingResponse::Chunk(resp) => {
ChunkFetchingResponse::Chunk(resp) => {
resp.recombine_into_chunk(&self.request)
}
AvailabilityFetchingResponse::NoSuchChunk => {
ChunkFetchingResponse::NoSuchChunk => {
tracing::debug!(
target: LOG_TARGET,
validator = ?validator,
......@@ -327,10 +327,10 @@ impl RunningTask {
async fn do_request(
&mut self,
validator: &AuthorityDiscoveryId,
) -> std::result::Result<AvailabilityFetchingResponse, TaskError> {
) -> std::result::Result<ChunkFetchingResponse, TaskError> {
let (full_request, response_recv) =
OutgoingRequest::new(Recipient::Authority(validator.clone()), self.request);
let requests = Requests::AvailabilityFetching(full_request);
let requests = Requests::ChunkFetching(full_request);
self.sender
.send(FromFetchTask::Message(AllMessages::NetworkBridge(
......
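
On the receiving side, the compact wire response omits what the requester already knows; a sketch of consuming it, assuming the v1 types above (the function `handle_response` is illustrative):

use polkadot_node_network_protocol::request_response::v1::{
    ChunkFetchingRequest, ChunkFetchingResponse,
};

fn handle_response(request: &ChunkFetchingRequest, response: ChunkFetchingResponse) {
    match response {
        ChunkFetchingResponse::Chunk(compact) => {
            // `ChunkResponse` carries only the chunk data and proof; the
            // validator index is restored from our own request.
            let _erasure_chunk = compact.recombine_into_chunk(request);
        }
        ChunkFetchingResponse::NoSuchChunk => {
            // Peer does not hold the chunk; the fetch task moves on to the
            // next validator in the group.
        }
    }
}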
......@@ -58,7 +58,7 @@ fn task_does_not_accept_invalid_chunk() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
ChunkFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: vec![1,2,3],
proof: vec![vec![9,8,2], vec![2,3,4]],
......@@ -90,7 +90,7 @@ fn task_stores_valid_chunk() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
ChunkFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: chunk.chunk.clone(),
proof: chunk.proof,
......@@ -126,7 +126,7 @@ fn task_does_not_accept_wrongly_indexed_chunk() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
ChunkFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: chunk.chunk.clone(),
proof: chunk.proof,
......@@ -165,7 +165,7 @@ fn task_stores_valid_chunk_if_there_is_one() {
let mut m = HashMap::new();
m.insert(
Recipient::Authority(Sr25519Keyring::Alice.public().into()),
AvailabilityFetchingResponse::Chunk(
ChunkFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: chunk.chunk.clone(),
proof: chunk.proof,
......@@ -174,11 +174,11 @@ fn task_stores_valid_chunk_if_there_is_one() {
);
m.insert(
Recipient::Authority(Sr25519Keyring::Bob.public().into()),
AvailabilityFetchingResponse::NoSuchChunk
ChunkFetchingResponse::NoSuchChunk
);
m.insert(
Recipient::Authority(Sr25519Keyring::Charlie.public().into()),
AvailabilityFetchingResponse::Chunk(
ChunkFetchingResponse::Chunk(
v1::ChunkResponse {
chunk: vec![1,2,3],
proof: vec![vec![9,8,2], vec![2,3,4]],
......@@ -200,7 +200,7 @@ fn task_stores_valid_chunk_if_there_is_one() {
struct TestRun {
/// Response to deliver for a given validator index.
/// `None` means: answer with `NetworkError`.
chunk_responses: HashMap<Recipient, AvailabilityFetchingResponse>,
chunk_responses: HashMap<Recipient, ChunkFetchingResponse>,
/// Set of chunks that should be considered valid:
valid_chunks: HashSet<Vec<u8>>,
}
......@@ -227,7 +227,7 @@ impl TestRun {
);
match msg {
FromFetchTask::Concluded(_) => break,
FromFetchTask::Message(msg) =>
end_ok = self.handle_message(msg).await,
}
}
......@@ -245,13 +245,13 @@ impl TestRun {
let mut valid_responses = 0;
for req in reqs {
let req = match req {
Requests::AvailabilityFetching(req) => req,
Requests::ChunkFetching(req) => req,
_ => panic!("Unexpected request"),
};
let response = self.chunk_responses.get(&req.peer)
.ok_or(network::RequestFailure::Refused);
if let Ok(AvailabilityFetchingResponse::Chunk(resp)) = &response {
if let Ok(ChunkFetchingResponse::Chunk(resp)) = &response {
if self.valid_chunks.contains(&resp.chunk) {
valid_responses += 1;
}
......@@ -285,7 +285,7 @@ fn get_test_running_task() -> (RunningTask, mpsc::Receiver<FromFetchTask>) {
session_index: 0,
group_index: GroupIndex(0),
group: Vec::new(),
request: AvailabilityFetchingRequest {
request: ChunkFetchingRequest {
candidate_hash: CandidateHash([43u8;32].into()),
index: ValidatorIndex(0),
},
......
......@@ -33,7 +33,7 @@ use crate::{LOG_TARGET, metrics::{Metrics, SUCCEEDED, FAILED, NOT_FOUND}};
/// Any errors of `answer_request` will simply be logged.
pub async fn answer_request_log<Context>(
ctx: &mut Context,
req: IncomingRequest<v1::AvailabilityFetchingRequest>,
req: IncomingRequest<v1::ChunkFetchingRequest>,
metrics: &Metrics,
) -> ()
where
......@@ -59,7 +59,7 @@ where
/// Returns: Ok(true) if chunk was found and served.
pub async fn answer_request<Context>(
ctx: &mut Context,
req: IncomingRequest<v1::AvailabilityFetchingRequest>,
req: IncomingRequest<v1::ChunkFetchingRequest>,
) -> Result<bool>
where
Context: SubsystemContext,
......@@ -84,8 +84,8 @@ where
);
let response = match chunk {
None => v1::AvailabilityFetchingResponse::NoSuchChunk,
Some(chunk) => v1::AvailabilityFetchingResponse::Chunk(chunk.into()),
None => v1::ChunkFetchingResponse::NoSuchChunk,
Some(chunk) => v1::ChunkFetchingResponse::Chunk(chunk.into()),
};
req.send_response(response).map_err(|_| Error::SendResponse)?;
......
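
The answering side mirrors this; a sketch, assuming `IncomingRequest` lives next to the other request/response types and that the chunk has already been fetched from the availability store (the real `answer_request` above does that via the overseer):

use polkadot_node_network_protocol::request_response::{request::IncomingRequest, v1};
use polkadot_primitives::v1::ErasureChunk;

fn answer(req: IncomingRequest<v1::ChunkFetchingRequest>, chunk: Option<ErasureChunk>) {
    let response = match chunk {
        None => v1::ChunkFetchingResponse::NoSuchChunk,
        // `.into()` converts the full `ErasureChunk` into the compact
        // wire-format `ChunkResponse`.
        Some(c) => v1::ChunkFetchingResponse::Chunk(c.into()),
    };
    // Sending fails only if the network side dropped the response channel.
    let _ = req.send_response(response);
}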
......@@ -96,7 +96,7 @@ impl Default for TestState {
let mut cores = HashMap::new();
let mut chunks = HashMap::new();
cores.insert(relay_chain[0],
vec![
CoreState::Scheduled(ScheduledCore {
para_id: chain_ids[0],
......@@ -148,7 +148,7 @@ impl Default for TestState {
}
impl TestState {
/// Run, but fail after some timeout.
pub async fn run(self, harness: TestHarness) {
// Make sure test won't run forever.
......@@ -178,7 +178,7 @@ impl TestState {
//
// Test will fail if this does not happen until timeout.
let mut remaining_stores = self.valid_chunks.len();
let TestSubsystemContextHandle { tx, mut rx } = virtual_overseer;
// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
......@@ -210,7 +210,7 @@ impl TestState {
executor.spawn("Request forwarding",
overseer_send(
tx.clone(),
AvailabilityDistributionMessage::AvailabilityFetchingRequest(in_req)
AvailabilityDistributionMessage::ChunkFetchingRequest(in_req)
).boxed()
);
}
......@@ -294,9 +294,9 @@ async fn overseer_recv(
fn to_incoming_req(
executor: &TaskExecutor,
outgoing: Requests
) -> IncomingRequest<v1::AvailabilityFetchingRequest> {
) -> IncomingRequest<v1::ChunkFetchingRequest> {
match outgoing {
Requests::AvailabilityFetching(OutgoingRequest { payload, pending_response, .. }) => {
Requests::ChunkFetching(OutgoingRequest { payload, pending_response, .. }) => {
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>)
= oneshot::channel();
executor.spawn("Message forwarding", async {
......
......@@ -16,8 +16,7 @@ polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
futures-timer = "3.0.2"
streamunordered = "0.5.1"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
[dev-dependencies]
assert_matches = "1.4.0"
......@@ -29,5 +28,6 @@ smallvec = "1.5.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
......@@ -25,9 +25,6 @@ pub enum Error {
#[error(transparent)]
Subsystem(#[from] polkadot_subsystem::SubsystemError),
#[error("failed to query a chunk from store")]
CanceledQueryChunk(#[source] oneshot::Canceled),
#[error("failed to query full data from store")]
CanceledQueryFullData(#[source] oneshot::Canceled),
......
......@@ -22,6 +22,8 @@ use futures_timer::Delay;
use assert_matches::assert_matches;
use smallvec::smallvec;
use parity_scale_codec::Encode;
use super::*;
use polkadot_primitives::v1::{
......@@ -30,7 +32,7 @@ use polkadot_primitives::v1::{
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, jaeger};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
......@@ -139,11 +141,10 @@ async fn overseer_recv(
use sp_keyring::Sr25519Keyring;
#[derive(Debug, Clone)]
enum HasAvailableData {
enum Has {
No,
Yes,
Timeout,
Other(AvailableData),
}
#[derive(Clone)]
......@@ -151,12 +152,10 @@ struct TestState {
validators: Vec<Sr25519Keyring>,
validator_public: Vec<ValidatorId>,
validator_authority_id: Vec<AuthorityDiscoveryId>,
validator_peer_id: Vec<PeerId>,
current: Hash,
candidate: CandidateReceipt,
session_index: SessionIndex,
persisted_validation_data: PersistedValidationData,
available_data: AvailableData,
......@@ -164,6 +163,26 @@ struct TestState {
}
impl TestState {
fn threshold(&self) -> usize {
recovery_threshold(self.validators.len()).unwrap()
}
fn impossibility_threshold(&self) -> usize {
self.validators.len() - self.threshold() + 1
}
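// Worked example (a sketch; assumes `recovery_threshold(n)` is the
// erasure-coding threshold f + 1 with f = (n - 1) / 3, as in
// polkadot-erasure-coding): with 10 validators the threshold is 4 chunks,
// so recovery becomes impossible once 10 - 4 + 1 = 7 validators fail to
// provide theirs.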
fn all_have(&self) -> Vec<Has> {
(0..self.validators.len()).map(|_| Has::Yes).collect()
}
fn all_dont_have(&self) -> Vec<Has> {
(0..self.validators.len()).map(|_| Has::No).collect()
}
fn all_timeout(&self) -> Vec<Has> {
(0..self.validators.len()).map(|_| Has::Timeout).collect()
}
async fn test_runtime_api(
&self,
virtual_overseer: &mut VirtualOverseer,
......@@ -191,238 +210,104 @@ impl TestState {
);
}
async fn test_connect_to_all_validators(
&self,
virtual_overseer: &mut VirtualOverseer,
) {
self.test_connect_to_validators(virtual_overseer, self.validator_public.len()).await;
}
async fn test_connect_to_validators(
&self,
virtual_overseer: &mut VirtualOverseer,
n: usize,
) {
// Channels by AuthorityDiscoveryId to send results to.
// Gather them here and send in batch after the loop not to race.
let mut results = HashMap::new();
for _ in 0..n {
// Connect to shuffled validators one by one.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
..
}
) => {
for validator_id in validator_ids {
let idx = self.validator_authority_id
.iter()
.position(|x| *x == validator_id)
.unwrap();
results.insert(
(
self.validator_authority_id[idx].clone(),
self.validator_peer_id[idx].clone(),
),
connected.clone(),
);
}
}
);
}
for (k, mut v) in results.into_iter() {
v.send(k).await.unwrap();
}
}
async fn test_chunk_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
n: usize,
who_has: &[Has],
) {
for _ in 0..self.validator_public.len() {
// arbitrary order.
for _ in 0..n {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
_peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
NetworkBridgeMessage::SendRequests(
mut requests,
IfDisconnected::TryConnect,
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash_recvd,
validator_index,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
(request_id, validator_index)
}
);
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index.0 as usize].clone(),
protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
Some(self.chunks[validator_index.0 as usize].clone()),
)
)
)
).await;
}
);
}
}
assert_eq!(requests.len(), 1);
assert_matches!(
requests.pop().unwrap(),
Requests::ChunkFetching(req) => {
assert_eq!(req.payload.candidate_hash, candidate_hash);
let validator_index = req.payload.index.0 as usize;
let available_data = match who_has[validator_index] {
Has::No => Ok(None),
Has::Yes => Ok(Some(self.chunks[validator_index].clone().into())),
Has::Timeout => {
Err(sc_network::RequestFailure::Network(
sc_network::OutboundFailure::Timeout
))
}
};
async fn test_faulty_chunk_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
faulty: &[bool],
) {
for _ in 0..self.validator_public.len() {
// Receive a request for a chunk.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
_peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
request_id,
candidate_hash_recvd,
validator_index,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
(request_id, validator_index)
}
);
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index.0 as usize].clone(),
protocol_v1::AvailabilityRecoveryMessage::Chunk(
request_id,
Some(self.chunks[validator_index.0 as usize].clone()),
let _ = req.pending_response.send(
available_data.map(|r|
req_res::v1::ChunkFetchingResponse::from(r).encode()
)
);
}
)
}
);
}
for i in 0..self.validator_public.len() {
if faulty[i] {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(
peer,
rep,
)
) => {
assert_eq!(rep, COST_MERKLE_PROOF_INVALID);
// These may arrive in any order since the interaction implementation
// uses `FuturesUnordered`.
assert!(self.validator_peer_id.iter().find(|p| **p == peer).is_some());
}
);
}
}
}
async fn test_full_data_requests(
&self,
candidate_hash: CandidateHash,
virtual_overseer: &mut VirtualOverseer,
who_has: &[HasAvailableData],
who_has: &[Has],
) {
for _ in 0..self.validator_public.len() {
self.test_connect_to_validators(virtual_overseer, 1).await;
for _ in 0..self.validators.len() {
// Receive a request for the full available data.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
peers,
protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
NetworkBridgeMessage::SendRequests(
mut requests,
IfDisconnected::TryConnect,
)
) => {
let (request_id, validator_index) = assert_matches!(
wire_message,
protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
request_id,
candidate_hash_recvd,
) => {
assert_eq!(candidate_hash_recvd, candidate_hash);
assert_eq!(peers.len(), 1);
let validator_index = self.validator_peer_id.iter().position(|p| p == &peers[0]).unwrap();
(request_id, validator_index)
}
);
let available_data = match who_has[validator_index] {
HasAvailableData::No => Some(None),
HasAvailableData::Yes => Some(Some(self.available_data.clone())),
HasAvailableData::Timeout => None,
HasAvailableData::Other(ref other) => Some(Some(other.clone())),
};
if let Some(maybe_data) = available_data {
overseer_send(
virtual_overseer,
AvailabilityRecoveryMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
self.validator_peer_id[validator_index].clone(),
protocol_v1::AvailabilityRecoveryMessage::FullData(