Skip to content
Snippets Groups Projects
Unverified Commit 5ac32ee2 authored by Alexandru Vasile, committed by GitHub
Browse files

authority-discovery: Set intervals to start when authority keys change (#3764)

The authority-discovery mechanism has implemented a few exponential
timers for:
- publishing the authority records
- goes from 2 seconds (when freshly booted) to 1 hour if the node is
long-running
  - set to 1 hour after successfully publishing the authority record
- discovering other authority records
- goes from 2 seconds (when freshly booted) to 10 minutes if the node is
long-running

This PR resets the exponential publishing and discovery interval to
defaults ensuring that long-running nodes:
- will retry publishing the authority records as aggressively as freshly
booted nodes
- Currently, if a long-running node fails to publish the DHT record when
the keys change (i.e., DhtEvent::ValuePutFailed), it will only retry after
1 hour
- will rediscover other authorities faster (since there is a chance that
other authority keys changed)

The subp2p-explorer has difficulties discovering the authorities when
the authority set changes in the first few hours. This might be entirely
due to the recursive nature of the DHT and the needed time to propagate
the records. However, there is a small chance that the authority
publishing failed and is only retried in 1h.

Let me know if this makes sense :pray:

 

cc @paritytech/networking

---------

Signed-off-by: Alexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: Dmitry Markin <dmitry@markin.tech>
parent feee773d
No related merge requests found
Pipeline #459614 failed with stages
in 45 minutes and 43 seconds
......@@ -28,6 +28,7 @@ use std::{
///
/// Doubles interval duration on each tick until the configured maximum is reached.
pub struct ExpIncInterval {
// Initial tick duration; retained so the interval can be rewound to its
// starting cadence via `set_to_start`.
start: Duration,
// Upper bound that the doubling interval saturates at.
max: Duration,
// Duration the next tick will wait for; doubles on each tick up to `max`.
next: Duration,
// Timer driving the currently pending wait.
delay: Delay,
......@@ -37,14 +38,29 @@ impl ExpIncInterval {
/// Create a new [`ExpIncInterval`].
///
/// The first tick fires after `start`; each subsequent tick doubles the wait
/// until it saturates at `max`.
pub fn new(start: Duration, max: Duration) -> Self {
    let delay = Delay::new(start);
    // Keep `start` around so `set_to_start` can later rewind the interval
    // to its initial cadence.
    Self { start, max, next: start * 2, delay }
}
/// Fast forward the exponentially increasing interval to the configured maximum, if not already
/// set.
pub fn set_to_max(&mut self) {
// Already at the maximum; recreating the `Delay` would needlessly restart
// the in-flight timer.
if self.next == self.max {
return;
}
self.next = self.max;
// Restart the timer so the currently pending wait also uses the maximum.
self.delay = Delay::new(self.next);
}
/// Rewind the exponentially increasing interval back to the configured start,
/// unless it is already there.
pub fn set_to_start(&mut self) {
    let rewound = self.start * 2;
    // Skip the reset when the interval already sits at its initial cadence,
    // leaving the in-flight timer undisturbed.
    if self.next != rewound {
        self.next = rewound;
        self.delay = Delay::new(self.start);
    }
}
}
impl Stream for ExpIncInterval {
......
......@@ -129,6 +129,9 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
/// List of keys onto which addresses have been published at the latest publication.
/// Used to check whether they have changed.
latest_published_keys: HashSet<AuthorityId>,
/// List of the kademlia keys that have been published at the latest publication.
/// Used to associate DHT events with our published records.
latest_published_kad_keys: HashSet<KademliaKey>,
/// Same value as in the configuration.
publish_non_global_ips: bool,
......@@ -265,6 +268,7 @@ where
publish_interval,
publish_if_changed_interval,
latest_published_keys: HashSet::new(),
latest_published_kad_keys: HashSet::new(),
publish_non_global_ips: config.publish_non_global_ips,
public_addresses,
strict_record_validation: config.strict_record_validation,
......@@ -397,8 +401,17 @@ where
self.client.as_ref(),
).await?.into_iter().collect::<HashSet<_>>();
if only_if_changed && keys == self.latest_published_keys {
return Ok(())
if only_if_changed {
// If the authority keys did not change and the `publish_if_changed_interval` was
// triggered then do nothing.
if keys == self.latest_published_keys {
return Ok(())
}
// We have detected a change in the authority keys, reset the timers to
// publish and gather data faster.
self.publish_interval.set_to_start();
self.query_interval.set_to_start();
}
let addresses = serialize_addresses(self.addresses_to_publish());
......@@ -422,6 +435,8 @@ where
keys_vec,
)?;
self.latest_published_kad_keys = kv_pairs.iter().map(|(k, _)| k.clone()).collect();
for (key, value) in kv_pairs.into_iter() {
self.network.put_value(key, value);
}
......@@ -523,6 +538,10 @@ where
}
},
DhtEvent::ValuePut(hash) => {
if !self.latest_published_kad_keys.contains(&hash) {
return;
}
// Fast forward the exponentially increasing interval to the configured maximum. In
// case this was the first successful address publishing there is no need for a
// timely retry.
......@@ -535,6 +554,11 @@ where
debug!(target: LOG_TARGET, "Successfully put hash '{:?}' on Dht.", hash)
},
DhtEvent::ValuePutFailed(hash) => {
if !self.latest_published_kad_keys.contains(&hash) {
// Not a value we have published or received multiple times.
return;
}
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put_failed"]).inc();
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment