Unverified Commit 31acae0e authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

More tests for new request based statement distribution (#2875)

* More test coverage.

* Preserve peer order.

* Better test coverage.

* Even more test coverage.

* Add doc comment to `IndexMap`.

* Fix flaky test.

* Review remarks.

* Review remarks.
parent 76bfd9fa
Pipeline #134141 failed with stages
in 17 minutes and 1 second
......@@ -6507,6 +6507,7 @@ dependencies = [
"arrayvec 0.5.2",
"assert_matches",
"futures 0.3.13",
"futures-timer 3.0.2",
"indexmap",
"parity-scale-codec",
"polkadot-node-network-protocol",
......@@ -6522,6 +6523,7 @@ dependencies = [
"sp-keyring",
"sp-keystore",
"sp-staking",
"sp-tracing",
"tracing",
]
......
......@@ -26,4 +26,6 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
......@@ -53,7 +53,7 @@ use polkadot_node_network_protocol::{
use futures::{channel::mpsc, future::RemoteHandle, prelude::*};
use futures::channel::oneshot;
use indexmap::IndexSet;
use indexmap::{IndexSet, IndexMap, map::Entry as IEntry};
use std::collections::{HashMap, HashSet, hash_map::Entry};
......@@ -508,7 +508,10 @@ enum LargeStatementStatus {
struct FetchingInfo {
/// All peers that sent us a `LargeStatement` or a `Valid` statement for the given
/// `CandidateHash`, together with their originally sent messages.
available_peers: HashMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
///
/// We use an `IndexMap` here to preserve the order in which peers sent us messages. This is
/// desirable because we reward the peers that sent first with reputation.
available_peers: IndexMap<PeerId, Vec<protocol_v1::StatementDistributionMessage>>,
/// Peers left to try in case the background task needs it.
peers_to_try: Vec<PeerId>,
/// Sender for sending fresh peers to the fetching task in case of failure.
......@@ -1057,22 +1060,24 @@ async fn retrieve_statement_from_message<'a>(
match occupied.get_mut() {
LargeStatementStatus::Fetching(info) => {
let is_new_peer = !info.available_peers.contains_key(&peer);
let is_large_statement = message.is_large_statement();
match info.available_peers.entry(peer) {
Entry::Occupied(mut occupied) => {
occupied.get_mut().push(message);
}
Entry::Vacant(vacant) => {
vacant.insert(vec![message]);
}
}
let is_new_peer =
match info.available_peers.entry(peer) {
IEntry::Occupied(mut occupied) => {
occupied.get_mut().push(message);
false
}
IEntry::Vacant(vacant) => {
vacant.insert(vec![message]);
true
}
};
if is_new_peer & is_large_statement {
info.peers_to_try.push(peer);
// Answer any pending request for more peers:
if let Some(sender) = std::mem::take(&mut info.peer_sender) {
if let Some(sender) = info.peer_sender.take() {
let to_send = std::mem::take(&mut info.peers_to_try);
if let Err(peers) = sender.send(to_send) {
// Requester no longer interested for now, might want them
......@@ -1181,7 +1186,7 @@ async fn launch_request(
return None
}
let available_peers = {
let mut m = HashMap::new();
let mut m = IndexMap::new();
m.insert(peer, vec![protocol_v1::StatementDistributionMessage::LargeStatement(meta)]);
m
};
......@@ -1950,6 +1955,7 @@ impl metrics::Metrics for Metrics {
#[cfg(test)]
mod tests {
use std::time::Duration;
use parity_scale_codec::{Decode, Encode};
use super::*;
use std::sync::Arc;
......@@ -1959,9 +1965,10 @@ mod tests {
use polkadot_primitives::v1::{CommittedCandidateReceipt, ValidationCode};
use assert_matches::assert_matches;
use futures::executor::{self, block_on};
use futures_timer::Delay;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_network_protocol::{view, ObservedRole, request_response::Recipient};
use polkadot_subsystem::{jaeger, ActivatedLeaf};
use polkadot_node_network_protocol::request_response::{
Requests,
......@@ -2699,6 +2706,7 @@ mod tests {
#[test]
fn receiving_large_statement_from_one_sends_to_another_and_to_candidate_backing() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
......@@ -2712,6 +2720,8 @@ mod tests {
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let peer_c = PeerId::random();
let peer_bad = PeerId::random();
let validators = vec![
Sr25519Keyring::Alice.public().into(),
......@@ -2780,6 +2790,16 @@ mod tests {
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
......@@ -2792,8 +2812,19 @@ mod tests {
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_c.clone(), view![hash_a])
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_bad.clone(), view![hash_a])
)
}).await;
// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
// receive a seconded statement from peer A, which does not provide the requested data,
// then get that data from peer C. It should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext {
......@@ -2815,7 +2846,7 @@ mod tests {
).await.ok().flatten().expect("should be signed")
};
let metadata =
let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()).get_metadata();
handle.send(FromOverseer::Communication {
......@@ -2842,6 +2873,150 @@ mod tests {
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
}
);
// There is a race between the request handler asking for more peers and the processing of
// the incoming `PeerMessage`s. We want the request handler to ask first here for better
// test coverage:
Delay::new(Duration::from_millis(20)).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_c.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
)
)
}).await;
// Malicious peer:
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_bad.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
)
)
}).await;
// Let c fail once too:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
}
);
// a fails again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);
// Send invalid response (all other peers have been tried now):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_bad));
let bad_candidate = {
let mut bad = candidate.clone();
bad.descriptor.para_id = 0xeadbeaf.into();
bad
};
let response = StatementFetchingResponse::Statement(bad_candidate);
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
);
// Should get punished and never tried again:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_bad && r == COST_WRONG_HASH => {}
);
// a is tried again (retried in reverse order):
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
}
);
// c succeeds now:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
// On retry, we should have reverse order:
assert_eq!(outgoing.peer, Recipient::Peer(peer_c));
let response = StatementFetchingResponse::Statement(candidate.clone());
outgoing.pending_response.send(Ok(response.encode())).unwrap();
}
......@@ -2851,7 +3026,14 @@ mod tests {
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_RESPONSE => {}
) if p == peer_a && r == COST_FETCH_FAIL => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_c && r == BENEFIT_VALID_RESPONSE => {}
);
assert_matches!(
......@@ -2869,17 +3051,18 @@ mod tests {
);
// Now messages should go out:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
recipients,
mut recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::LargeStatement(meta)
),
)
) => {
assert_eq!(recipients, vec![peer_b.clone()]);
assert_eq!(recipients.sort(), vec![peer_b.clone(), peer_c.clone()].sort());
assert_eq!(meta.relay_parent, hash_a);
assert_eq!(meta.candidate_hash, statement.payload().candidate_hash());
assert_eq!(meta.signed_by, statement.validator_index());
......@@ -2889,7 +3072,7 @@ mod tests {
// Now that it has the candidate it should answer requests accordingly (even after a
// failed request):
// Failing request first:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
......@@ -2906,7 +3089,7 @@ mod tests {
response_rx.await.unwrap().result,
Err(()) => {}
);
// Now the working one:
let (pending_response, response_rx) = oneshot::channel();
let inner_req = StatementFetchingRequest {
......
......@@ -178,6 +178,11 @@ pub async fn fetch(
// All our peers failed us - try getting new ones before trying again:
match try_get_new_peers(relay_parent, candidate_hash, &mut sender, &span).await {
Ok(Some(mut peers)) => {
tracing::trace!(
target: LOG_TARGET,
?peers,
"Received new peers."
);
// New arrivals will be tried first:
new_peers.append(&mut peers);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment