Skip to content
Snippets Groups Projects
Commit 7ba08473 authored by eskimor's avatar eskimor Committed by GitHub
Browse files

Rate limit improvements (#6315)


* We actually don't need to rate limit redundant requests.

Those redundant requests should not actually happen, but still.

* Add some logging.

* Also log message when the receiving side hit the rate limit.

* Update node/network/dispute-distribution/src/sender/mod.rs

Co-authored-by: default avatarAlexandru Vasile <60601340+lexnv@users.noreply.github.com>

Co-authored-by: default avatareskimor <eskimor@no-such-url.com>
Co-authored-by: default avatarAlexandru Vasile <60601340+lexnv@users.noreply.github.com>
parent 1dec2433
No related merge requests found
...@@ -302,6 +302,12 @@ where ...@@ -302,6 +302,12 @@ where
// Queue request: // Queue request:
if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) { if let Err((authority_id, req)) = self.peer_queues.push_req(authority_id, req) {
gum::debug!(
target: LOG_TARGET,
?authority_id,
?peer,
"Peer hit the rate limit - dropping message."
);
req.send_outgoing_response(OutgoingResponse { req.send_outgoing_response(OutgoingResponse {
result: Err(()), result: Err(()),
reputation_changes: vec![COST_APPARENT_FLOOD], reputation_changes: vec![COST_APPARENT_FLOOD],
......
...@@ -108,8 +108,6 @@ impl DisputeSender { ...@@ -108,8 +108,6 @@ impl DisputeSender {
runtime: &mut RuntimeInfo, runtime: &mut RuntimeInfo,
msg: DisputeMessage, msg: DisputeMessage,
) -> Result<()> { ) -> Result<()> {
self.rate_limit.limit().await;
let req: DisputeRequest = msg.into(); let req: DisputeRequest = msg.into();
let candidate_hash = req.0.candidate_receipt.hash(); let candidate_hash = req.0.candidate_receipt.hash();
match self.disputes.entry(candidate_hash) { match self.disputes.entry(candidate_hash) {
...@@ -118,6 +116,8 @@ impl DisputeSender { ...@@ -118,6 +116,8 @@ impl DisputeSender {
return Ok(()) return Ok(())
}, },
Entry::Vacant(vacant) => { Entry::Vacant(vacant) => {
self.rate_limit.limit("in start_sender", candidate_hash).await;
let send_task = SendTask::new( let send_task = SendTask::new(
ctx, ctx,
runtime, runtime,
...@@ -169,10 +169,12 @@ impl DisputeSender { ...@@ -169,10 +169,12 @@ impl DisputeSender {
// Iterates in order of insertion: // Iterates in order of insertion:
let mut should_rate_limit = true; let mut should_rate_limit = true;
for dispute in self.disputes.values_mut() { for (candidate_hash, dispute) in self.disputes.iter_mut() {
if have_new_sessions || dispute.has_failed_sends() { if have_new_sessions || dispute.has_failed_sends() {
if should_rate_limit { if should_rate_limit {
self.rate_limit.limit().await; self.rate_limit
.limit("while going through new sessions/failed sends", *candidate_hash)
.await;
} }
let sends_happened = dispute let sends_happened = dispute
.refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics) .refresh_sends(ctx, runtime, &self.active_sessions, &self.metrics)
...@@ -193,7 +195,7 @@ impl DisputeSender { ...@@ -193,7 +195,7 @@ impl DisputeSender {
// recovered at startup will be relatively "old" anyway and we assume that no more than a // recovered at startup will be relatively "old" anyway and we assume that no more than a
// third of the validators will go offline at any point in time anyway. // third of the validators will go offline at any point in time anyway.
for dispute in unknown_disputes { for dispute in unknown_disputes {
self.rate_limit.limit().await; self.rate_limit.limit("while going through unknown disputes", dispute.1).await;
self.start_send_for_dispute(ctx, runtime, dispute).await?; self.start_send_for_dispute(ctx, runtime, dispute).await?;
} }
Ok(()) Ok(())
...@@ -383,7 +385,9 @@ impl RateLimit { ...@@ -383,7 +385,9 @@ impl RateLimit {
} }
/// Wait until ready and prepare for next call. /// Wait until ready and prepare for next call.
async fn limit(&mut self) { ///
/// String given as occasion and candidate hash are logged in case the rate limit hit.
async fn limit(&mut self, occasion: &'static str, candidate_hash: CandidateHash) {
// Wait for rate limit and add some logging: // Wait for rate limit and add some logging:
poll_fn(|cx| { poll_fn(|cx| {
let old_limit = Pin::new(&mut self.limit); let old_limit = Pin::new(&mut self.limit);
...@@ -391,6 +395,8 @@ impl RateLimit { ...@@ -391,6 +395,8 @@ impl RateLimit {
Poll::Pending => { Poll::Pending => {
gum::debug!( gum::debug!(
target: LOG_TARGET, target: LOG_TARGET,
?occasion,
?candidate_hash,
"Sending rate limit hit, slowing down requests" "Sending rate limit hit, slowing down requests"
); );
Poll::Pending Poll::Pending
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment