Unverified Commit 1d478329 authored by Robert Klotzner, committed by GitHub

DoS protection on the collator protocol (#3446)



* Move on to next validator after timeout.

* Better naming.

* Wrong implementation of validator fetch timeouts.

* Validator side: Move on to next collator if download takes too long.

* Drop multiple requests from same validator.

* Add test that next response is sent after timeout.

* Multiple requests by same validator should get dropped.

* Test that another collator is tried after exclusive download time.

* Add dep.

* Cleanup.

* Merge fix.

* Review remarks.

* Fixes.

* Add log targets to trace logs.

Co-authored-by: Andronik Ordian <write@reusable.software>
parent 567cfb99
Pipeline #146867 passed with stages in 47 minutes
@@ -5924,6 +5924,7 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-network",
"sp-core",
"sp-keyring",
"sp-keystore",
@@ -28,5 +28,6 @@ assert_matches = "1.4.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin};
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin, time::Duration};
use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future};
use sp_core::Pair;
@@ -38,6 +38,7 @@ use polkadot_node_network_protocol::{
v1 as protocol_v1,
};
use polkadot_node_subsystem_util::{
TimeoutExt,
metrics::{self, prometheus},
runtime::{RuntimeInfo, get_availability_cores, get_group_rotation_info}
};
@@ -50,6 +51,19 @@ use super::{LOG_TARGET, Result};
mod tests;
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
const COST_APPARENT_FLOOD: Rep = Rep::CostMinor("Message received when previous one was still being processed");
/// Time after starting an upload to a validator after which we will start another upload to the
/// next validator, even if the first upload has not finished yet.
///
/// This is to protect against a single slow validator preventing collations from happening.
///
/// With a collation size of 5 MB and a bandwidth of 500 Mbit/s (the requirement for Kusama
/// validators), the transfer should complete within 5 MB * 8 / 500 Mbit/s = 0.08 seconds.
/// 400 milliseconds should therefore be plenty, and low enough for later validators to still
/// be able to finish on time.
///
/// There is debug logging output, so we can adjust this value based on production results.
const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(400);
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
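As a back-of-envelope check on the 5 MB / 500 Mbit/s figures behind `MAX_UNSHARED_UPLOAD_TIME`, here is a standalone sketch (not part of the diff):

use std::time::Duration;

fn main() {
    // Figures from the doc comment above: 5 MB collation, 500 Mbit/s bandwidth.
    let collation_bits = 5.0 * 8.0 * 1_000_000.0;
    let bandwidth_bits_per_sec = 500.0 * 1_000_000.0;
    let transfer = Duration::from_secs_f64(collation_bits / bandwidth_bits_per_sec);
    // 0.08 s, comfortably under the 400 ms exclusive upload budget.
    assert!(transfer < Duration::from_millis(400));
    println!("expected transfer time: {transfer:?}");
}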
@@ -208,9 +222,14 @@ struct WaitingCollationFetches {
collation_fetch_active: bool,
/// The collation fetches waiting to be fulfilled.
waiting: VecDeque<IncomingRequest<CollationFetchingRequest>>,
/// All peers that are waiting or actively uploading.
///
/// We will not accept multiple requests from the same peer, otherwise our DoS protection of
/// moving on to the next peer after `MAX_UNSHARED_UPLOAD_TIME` would be pointless.
waiting_peers: HashSet<PeerId>,
}
type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = Hash> + Send + 'static>>>;
type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = (Hash, PeerId)> + Send + 'static>>>;
struct State {
/// Our network peer id.
@@ -684,6 +703,17 @@ where
let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default();
if !waiting.waiting_peers.insert(incoming.peer) {
tracing::debug!(
target: LOG_TARGET,
"Dropping incoming request as peer has a request in flight already."
);
ctx.send_message(
NetworkBridgeMessage::ReportPeer(incoming.peer, COST_APPARENT_FLOOD)
).await;
return Ok(())
}
if waiting.collation_fetch_active {
waiting.waiting.push_back(incoming);
} else {
@@ -724,6 +754,7 @@ async fn send_collation(
let (tx, rx) = oneshot::channel();
let relay_parent = request.payload.relay_parent;
let peer_id = request.peer;
let response = OutgoingResponse {
result: Ok(CollationFetchingResponse::Collation(receipt, pov)),
@@ -739,8 +770,16 @@
}
state.active_collation_fetches.push(async move {
let _ = rx.await;
relay_parent
let r = rx.timeout(MAX_UNSHARED_UPLOAD_TIME).await;
if r.is_none() {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
?peer_id,
"Sending collation to validator timed out, carrying on with next validator."
);
}
(relay_parent, peer_id)
}.boxed());
state.metrics.on_collation_sent();
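The `timeout` call above comes from the `TimeoutExt` import added earlier. A minimal sketch of its assumed semantics (resolve to `Some(output)` if the future completes in time, `None` otherwise), using only the `futures` and `futures-timer` crates; this is not the actual `polkadot-node-subsystem-util` implementation:

use futures::{pin_mut, select, FutureExt};
use futures_timer::Delay;
use std::{future::Future, time::Duration};

// Race a future against a timer: `Some(out)` if it finishes in time, `None` on timeout.
async fn timeout<F: Future>(fut: F, duration: Duration) -> Option<F::Output> {
    let fut = fut.fuse();
    let timer = Delay::new(duration).fuse();
    pin_mut!(fut, timer);
    select! {
        out = fut => Some(out),
        _ = timer => None,
    }
}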
@@ -986,8 +1025,9 @@ where
FromOverseer::Signal(BlockFinalized(..)) => {}
FromOverseer::Signal(Conclude) => return Ok(()),
},
relay_parent = state.active_collation_fetches.select_next_some() => {
(relay_parent, peer_id) = state.active_collation_fetches.select_next_some() => {
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
waiting.waiting_peers.remove(&peer_id);
if let Some(next) = waiting.waiting.pop_front() {
next
} else {
@@ -560,6 +560,34 @@ fn advertise_and_send_collation() {
)
)
).await;
// Second request by same validator should get dropped and peer reported:
{
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
peer,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(bad_peer, _)) => {
assert_eq!(bad_peer, peer);
}
);
assert_matches!(
rx.await,
Err(_),
"Multiple concurrent requests by the same validator should get dropped."
);
}
assert_matches!(
rx.await,
@@ -620,124 +648,30 @@
#[test]
fn send_only_one_collation_per_relay_parent_at_a_time() {
    test_validator_send_sequence(|mut second_response_receiver, feedback_first_tx| async move {
        Delay::new(Duration::from_millis(100)).await;
        assert!(
            second_response_receiver.try_recv().unwrap().is_none(),
            "We should not have sent the collation to the second validator yet",
        );

        // Signal that the collation fetch is finished.
        feedback_first_tx.send(()).expect("Sending collation fetch finished");
        second_response_receiver
    });
}

#[test]
fn send_next_collation_after_max_unshared_upload_time() {
    test_validator_send_sequence(|second_response_receiver, _| async move {
        Delay::new(MAX_UNSHARED_UPLOAD_TIME + Duration::from_millis(50)).await;
        second_response_receiver
    });
}
#[test]
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();
@@ -924,3 +858,127 @@ fn collators_reject_declare_messages() {
virtual_overseer
})
}
/// Run tests on the validator response sequence.
///
/// Once the first response is done, the passed-in lambda is called with the receiver for the
/// next response and a sender for giving feedback on the first transmission. After the lambda
/// has completed, the second response is assumed to be sent, which is checked by this function.
///
/// The lambda can trigger the events that should cause the second response to be sent, such as
/// a timeout or the successful completion of the first transfer.
fn test_validator_send_sequence<T, F>(handle_first_response: T)
where
T: FnOnce(oneshot::Receiver<sc_network::config::OutgoingResponse>, oneshot::Sender<()>) -> F,
F: Future<Output=oneshot::Receiver<sc_network::config::OutgoingResponse>>
{
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
setup_system(&mut virtual_overseer, &test_state).await;
let DistributeCollation { candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state, true).await;
for (val, peer) in test_state.current_group_validator_authority_ids()
.into_iter()
.zip(test_state.current_group_validator_peer_ids())
{
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
}
// We declare to the connected validators that we are a collator.
// We need to catch all `Declare` messages to the validators we've
// previously connected to.
for peer_id in test_state.current_group_validator_peer_ids() {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
}
let validator_0 = test_state.current_group_validator_peer_ids()[0].clone();
let validator_1 = test_state.current_group_validator_peer_ids()[1].clone();
// Send info about peer's view.
send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await;
send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await;
// The peer is interested in a leaf that we have a collation for;
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await;
// Request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
validator_0,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Keep the feedback channel alive because we need to use it to inform about the finished transfer.
let feedback_tx = assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
full_response.sent_feedback.expect("Feedback channel is always set")
}
);
// Let the second validator request the collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
validator_1,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
let rx = handle_first_response(rx, feedback_tx).await;
// Now we should send it to the second validator
assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
full_response.sent_feedback.expect("Feedback channel is always set")
}
);
virtual_overseer
});
}
@@ -66,6 +66,19 @@ const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for
const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
/// Time after starting a collation download from a collator after which we will start another
/// download from the next collator, even if the first download has not finished yet.
///
/// This is to protect against a single slow collator preventing collations from happening.
///
/// With a collation size of 5 MB and a bandwidth of 500 Mbit/s (the requirement for Kusama
/// validators), the transfer should complete within 0.1 seconds. 400 milliseconds should
/// therefore be plenty, even with multiple heads, and low enough for later collators to
/// still be able to finish on time.
///
/// There is debug logging output, so we can adjust this value based on production results.
const MAX_UNSHARED_DOWNLOAD_TIME: Duration = Duration::from_millis(400);
// How often to check all peers with activity.
#[cfg(not(test))]
const ACTIVITY_POLL: Duration = Duration::from_secs(1);
@@ -178,7 +191,7 @@ struct CollatingPeerState {
enum PeerState {
// The peer has connected at the given instant.
Connected(Instant),
// Thepe
// Peer is collating.
Collating(CollatingPeerState),
}
@@ -514,6 +527,11 @@ impl CollationStatus {
struct CollationsPerRelayParent {
/// What is the current status in regards to a collation for this relay parent?
status: CollationStatus,
/// Collation currently being fetched.
///
/// This is the most recently started fetch, which has not yet exceeded
/// `MAX_UNSHARED_DOWNLOAD_TIME`.
waiting_collation: Option<CollatorId>,
/// Collations that were advertised to us, but that we have not fetched yet.
unfetched_collations: Vec<(PendingCollation, CollatorId)>,
}
@@ -523,14 +541,33 @@ impl CollationsPerRelayParent {
///
/// This will reset the status back to `Waiting` using [`CollationStatus::back_to_waiting`].
///
/// Returns `Some(_)` if there is any collation to fetch and the `status` is not `Seconded`.
pub fn get_next_collation_to_fetch(&mut self) -> Option<(PendingCollation, CollatorId)> {
/// Returns `Some(_)` if there is any collation to fetch, the `status` is not `Seconded`, and
/// the passed-in `finished_one` matches the current `waiting_collation`.
pub fn get_next_collation_to_fetch(
&mut self,
finished_one: Option<CollatorId>,
) -> Option<(PendingCollation, CollatorId)> {
// If the finished fetch does not match `waiting_collation`, then we already dequeued another
// fetch to replace it.
if self.waiting_collation != finished_one {
tracing::trace!(
target: LOG_TARGET,
waiting_collation = ?self.waiting_collation,
?finished_one,
"Not proceeding to the next collation - has already been done."
);
return None
}
self.status.back_to_waiting();
match self.status {
// We don't need to fetch any other collation when we already have seconded one.
CollationStatus::Seconded => None,
CollationStatus::Waiting => self.unfetched_collations.pop(),
CollationStatus::Waiting => {
let next = self.unfetched_collations.pop();
self.waiting_collation = next.as_ref().map(|(_, collator_id)| collator_id.clone());
next
}
CollationStatus::WaitingOnValidation | CollationStatus::Fetching =>
unreachable!("We have reset the status above!"),
}
@@ -565,6 +602,13 @@ struct State {
/// Keep track of all fetch collation requests
collation_fetches: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>,
/// When a timer in this `FuturesUnordered` triggers, we should dequeue the next request
/// attempt in the corresponding `collations_per_relay_parent`.
///
/// A triggering timer means that the fetching took too long for our taste and we should give
/// another collator the chance to be faster (dequeue next fetch request as well).
collation_fetch_timeouts: FuturesUnordered<BoxFuture<'static, (CollatorId, Hash)>>,
/// Information about the collations per relay parent.
collations_per_relay_parent: HashMap<Hash, CollationsPerRelayParent>,
@@ -608,6 +652,13 @@ where
let (tx, rx) = oneshot::channel();
let PendingCollation { relay_parent, para_id, peer_id, .. } = pc;
let timeout = |collator_id, relay_parent| async move {
Delay::new(MAX_UNSHARED_DOWNLOAD_TIME).await;
(collator_id, relay_parent)
};
state.collation_fetch_timeouts.push(timeout(id.clone(), relay_parent.clone()).boxed());
if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) {
request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await;
}
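The timers pushed above are consumed in the subsystem's main select loop (shown further below). As a self-contained sketch of this racing-timer pattern, with a string and an integer standing in for `CollatorId` and `Hash` (assuming only the `futures` and `futures-timer` crates):

use futures::{executor::block_on, future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use futures_timer::Delay;
use std::time::Duration;

fn main() {
    block_on(async {
        // One timer per started fetch, pushed like `state.collation_fetch_timeouts` above.
        let mut timeouts: FuturesUnordered<BoxFuture<'static, (&'static str, u64)>> =
            FuturesUnordered::new();
        timeouts.push(async { Delay::new(Duration::from_millis(400)).await; ("collator-a", 1) }.boxed());
        timeouts.push(async { Delay::new(Duration::from_millis(100)).await; ("collator-b", 2) }.boxed());

        // `collator-b`'s timer fires first; in the subsystem this is the point where
        // `dequeue_next_collation_and_fetch` starts the next download in parallel.
        let (collator, relay_parent) = timeouts.select_next_some().await;
        println!("fetch from {collator} for relay parent {relay_parent} took too long");
    });
}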
@@ -815,6 +866,7 @@ where
);
modify_reputation(ctx, origin.clone(), COST_UNNEEDED_COLLATOR).await;
tracing::trace!(target: LOG_TARGET, "Disconnecting unneeded collator");
disconnect_peer(ctx, origin).await;
}
}
@@ -863,7 +915,7 @@ where
collations.unfetched_collations.push((pending_collation, id)),
CollationStatus::Waiting => {
collations.status = CollationStatus::Fetching;
drop(collations);
collations.waiting_collation = Some(id.clone());
fetch_collation(ctx, state, pending_collation.clone(), id).await;
},
@@ -959,6 +1011,7 @@ where
// declare.
if let Some(para_id) = peer_data.collating_para() {
if !state.active_paras.is_current_or_next(para_id) {
tracing::trace!(target: LOG_TARGET, "Disconnecting peer on view change");
disconnect_peer(ctx, peer_id.clone()).await;
}
}
@@ -1092,14 +1145,9 @@ where
Entry::Vacant(_) => return,
};
report_collator(ctx, &state.peer_data, id).await;
report_collator(ctx, &state.peer_data, id.clone()).await;
if let Some((next, id)) = state.collations_per_relay_parent
.get_mut(&parent)
.and_then(|c| c.get_next_collation_to_fetch())
{
fetch_collation(ctx, state, next, id).await;
}
dequeue_next_collation_and_fetch(ctx, state, parent, id).await;
}
}
}
@@ -1164,6 +1212,15 @@ where
res = state.collation_fetches.select_next_some() => {
handle_collation_fetched_result(&mut ctx, &mut state, res).await;
}
res = state.collation_fetch_timeouts.select_next_some() => {
let (collator_id, relay_parent) = res;
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
"Fetch for collation took too long, starting parallel download for next collator as well."
);
dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await;
}
}
let mut retained_requested = HashSet::new();
@@ -1181,6 +1238,22 @@
Ok(())
}
/// Dequeue another collation and fetch.
async fn dequeue_next_collation_and_fetch(