Unverified commit 0c2a0fd9 authored by Robert Klotzner, committed by GitHub

Log info about low connectivity and unreachable validators (#3916)

* Attempt to add log stats to gossip-support.

* WIP: Keep track of connected validators.

* Clarify metric.

* WIP: Make gossip support report connectivity.

* WIP: Fixing tests.

* Fix network bridge + integrate in overseer.

* Consistent naming.

* Fix logic error

* cargo fmt

* Pretty logs.

* cargo fmt

* Use `Delay` to trigger periodic checks.

* fmt

* Fix warning for authority set size of 1.

* More correct ratio report if there are no resolved validators.

* Prettier rendering of empty set.

* Fix typo.

* Another typo.

* Don't check on every leaf update.

* Make compatible with older rustc.

* Fix tests.

* Demote warning.
parent 0d7ebdd1
Pipeline #159400 passed with stages in 44 minutes and 29 seconds
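One of the bullets above mentions using `Delay` to trigger periodic checks. As a minimal, self-contained sketch of that pattern (an illustration only, not the subsystem code; the `work` channel, durations, and printouts are made up), a re-armed timer is raced against the regular message stream with `select!`:

use std::time::Duration;

use futures::{channel::mpsc, select, FutureExt as _, SinkExt as _, StreamExt as _};
use futures_timer::Delay;

// Race a periodic timer against an incoming message stream: when the timer
// fires, run the check and re-arm it; otherwise handle the next message.
async fn run(mut work: mpsc::Receiver<u32>) {
    let mut next_check = Delay::new(Duration::from_millis(50)).fuse();
    loop {
        select! {
            _ = next_check => {
                println!("periodic connectivity check");
                next_check = Delay::new(Duration::from_millis(50)).fuse();
            },
            msg = work.next() => match msg {
                Some(m) => println!("got message {}", m),
                None => return, // all senders dropped, shut down
            },
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let (mut tx, rx) = mpsc::channel(4);
        tx.send(1u32).await.unwrap();
        drop(tx);
        run(rx).await;
    });
}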
......@@ -6063,7 +6063,10 @@ name = "polkadot-gossip-support"
version = "0.9.9"
dependencies = [
"assert_matches",
"async-trait",
"futures 0.3.17",
"futures-timer 3.0.2",
"lazy_static",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......@@ -6071,11 +6074,13 @@ dependencies = [
"polkadot-primitives",
"rand 0.8.4",
"rand_chacha 0.3.1",
"sc-network",
"sp-application-crypto",
"sp-consensus-babe",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-tracing",
"tracing",
]
......
......@@ -198,7 +198,7 @@ impl metrics::Metrics for Metrics {
prometheus::GaugeVec::new(
prometheus::Opts::new(
"parachain_desired_peer_count",
"The number of peers that the local node is expected to connect to on a parachain-related peer-set",
"The number of peers that the local node is expected to connect to on a parachain-related peer-set (either including or not including unresolvable authorities, depending on whether `ConnectToValidators` or `ConnectToValidatorsResolved` was used.)",
),
&["protocol"]
)?,
......@@ -552,6 +552,27 @@ where
network_service = ns;
authority_discovery_service = ads;
}
NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
peer_set,
} => {
tracing::trace!(
target: LOG_TARGET,
action = "ConnectToPeers",
peer_set = ?peer_set,
?validator_addrs,
"Received a resolved validator connection request",
);
metrics.note_desired_peer_count(peer_set, validator_addrs.len());
let all_addrs = validator_addrs.into_iter().flatten().collect();
network_service = validator_discovery.on_resolved_request(
all_addrs,
peer_set,
network_service,
).await;
}
NetworkBridgeMessage::NewGossipTopology {
our_neighbors,
} => {
......
......@@ -37,7 +37,8 @@ use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_subsystem::{
jaeger,
messages::{
-ApprovalDistributionMessage, BitfieldDistributionMessage, StatementDistributionMessage,
+ApprovalDistributionMessage, BitfieldDistributionMessage, GossipSupportMessage,
+StatementDistributionMessage,
},
ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal,
};
......@@ -337,6 +338,13 @@ async fn assert_sends_validation_event_to_all(
ApprovalDistributionMessage::NetworkBridgeUpdateV1(e)
) if e == event.focus().expect("could not focus message")
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::GossipSupport(
GossipSupportMessage::NetworkBridgeUpdateV1(e)
) if e == event.focus().expect("could not focus message")
);
}
async fn assert_sends_collation_event_to_all(
......@@ -1189,7 +1197,7 @@ fn send_messages_to_peers() {
fn spread_event_to_subsystems_is_up_to_date() {
// Number of subsystems expected to be interested in a network event,
// and hence the network event broadcasted to.
-const EXPECTED_COUNT: usize = 3;
+const EXPECTED_COUNT: usize = 4;
let mut cnt = 0_usize;
for msg in AllMessages::dispatch_iter(NetworkBridgeEvent::PeerDisconnected(PeerId::random())) {
......@@ -1219,7 +1227,9 @@ fn spread_event_to_subsystems_is_up_to_date() {
AllMessages::ApprovalDistribution(_) => {
cnt += 1;
},
-AllMessages::GossipSupport(_) => unreachable!("Not interested in network events"),
+AllMessages::GossipSupport(_) => {
+cnt += 1;
+},
AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"),
AllMessages::DisputeParticipation(_) =>
unreachable!("Not interested in network events"),
......
......@@ -47,6 +47,44 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
Self { state: Default::default(), _phantom: PhantomData }
}
/// Connect to already resolved addresses.
pub async fn on_resolved_request(
&mut self,
newly_requested: HashSet<Multiaddr>,
peer_set: PeerSet,
mut network_service: N,
) -> N {
let state = &mut self.state[peer_set];
// clean up revoked requests
let multiaddr_to_remove: HashSet<_> =
state.previously_requested.difference(&newly_requested).cloned().collect();
let multiaddr_to_add: HashSet<_> =
newly_requested.difference(&state.previously_requested).cloned().collect();
state.previously_requested = newly_requested;
tracing::debug!(
target: LOG_TARGET,
?peer_set,
added = multiaddr_to_add.len(),
removed = multiaddr_to_remove.len(),
"New ConnectToValidators resolved request",
);
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service
.add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service
.remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove)
.await;
network_service
}
/// On a new connection request, a peer set update will be issued.
/// It will ask the network to connect to the validators and not disconnect
/// from them at least until the next request is issued for the same peer set.
......@@ -59,7 +97,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
validator_ids: Vec<AuthorityDiscoveryId>,
peer_set: PeerSet,
failed: oneshot::Sender<usize>,
-mut network_service: N,
+network_service: N,
mut authority_discovery_service: AD,
) -> (N, AD) {
// collect multiaddress of validators
......@@ -82,39 +120,19 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
}
-let state = &mut self.state[peer_set];
-// clean up revoked requests
-let multiaddr_to_remove: HashSet<_> =
-state.previously_requested.difference(&newly_requested).cloned().collect();
-let multiaddr_to_add: HashSet<_> =
-newly_requested.difference(&state.previously_requested).cloned().collect();
-state.previously_requested = newly_requested;
tracing::debug!(
target: LOG_TARGET,
?peer_set,
?requested,
-added = multiaddr_to_add.len(),
-removed = multiaddr_to_remove.len(),
?failed_to_resolve,
"New ConnectToValidators request",
);
-// ask the network to connect to these nodes and not disconnect
-// from them until removed from the set
-if let Err(e) = network_service
-.add_to_peers_set(peer_set.into_protocol_name(), multiaddr_to_add)
-.await
-{
-tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
-}
-// the addresses are known to be valid
-let _ = network_service
-.remove_from_peers_set(peer_set.into_protocol_name(), multiaddr_to_remove)
-.await;
+let r = self.on_resolved_request(newly_requested, peer_set, network_service).await;
let _ = failed.send(failed_to_resolve);
-(network_service, authority_discovery_service)
+(r, authority_discovery_service)
}
}
......
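The reserved-peer-set update in `on_resolved_request` above boils down to a set reconciliation: addresses that are newly requested get added, addresses that are no longer requested get removed. A self-contained sketch of just that step (using `String` in place of `Multiaddr` so it runs without the networking crates):

use std::collections::HashSet;

// Derive the reserved-peer-set delta from the previously requested and the
// newly requested address sets, as `on_resolved_request` does.
fn reconcile(previous: &HashSet<String>, new: &HashSet<String>) -> (HashSet<String>, HashSet<String>) {
    let to_add: HashSet<_> = new.difference(previous).cloned().collect();
    let to_remove: HashSet<_> = previous.difference(new).cloned().collect();
    (to_add, to_remove)
}

fn main() {
    let previous: HashSet<String> =
        ["/dns/val-a/tcp/30333", "/dns/val-b/tcp/30333"].iter().map(|s| s.to_string()).collect();
    let new: HashSet<String> =
        ["/dns/val-b/tcp/30333", "/dns/val-c/tcp/30333"].iter().map(|s| s.to_string()).collect();

    let (to_add, to_remove) = reconcile(&previous, &new);
    assert!(to_add.contains("/dns/val-c/tcp/30333")); // newly requested
    assert!(to_remove.contains("/dns/val-a/tcp/30333")); // revoked
    assert_eq!((to_add.len(), to_remove.len()), (1, 1));
}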
......@@ -8,6 +8,7 @@ edition = "2018"
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-network-protocol = { path = "../protocol" }
polkadot-node-subsystem = { path = "../../subsystem" }
......@@ -15,6 +16,7 @@ polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-primitives = { path = "../../../primitives" }
futures = "0.3.17"
futures-timer = "3.0.2"
rand = { version = "0.8.3", default-features = false }
rand_chacha = { version = "0.3.1", default-features = false }
tracing = "0.1.28"
......@@ -22,7 +24,10 @@ tracing = "0.1.28"
[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
assert_matches = "1.4.0"
async-trait = "0.1.51"
lazy_static = "1.4.0"
......@@ -24,20 +24,35 @@
//! in this graph will be forwarded to the network bridge with
//! the `NetworkBridgeMessage::NewGossipTopology` message.
-use futures::{channel::oneshot, FutureExt as _};
-use polkadot_node_network_protocol::peer_set::PeerSet;
+use std::{
+collections::HashMap,
+fmt,
+time::{Duration, Instant},
+};
+use futures::{channel::oneshot, select, FutureExt as _};
+use futures_timer::Delay;
+use rand::{seq::SliceRandom as _, SeedableRng};
+use rand_chacha::ChaCha20Rng;
+use sc_network::Multiaddr;
+use sp_application_crypto::{AppKey, Public};
+use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
+use polkadot_node_network_protocol::{
+authority_discovery::AuthorityDiscovery, peer_set::PeerSet, v1::GossipSuppportNetworkMessage,
+PeerId,
+};
use polkadot_node_subsystem::{
-messages::{GossipSupportMessage, NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest},
+messages::{
+GossipSupportMessage, NetworkBridgeEvent, NetworkBridgeMessage, RuntimeApiMessage,
+RuntimeApiRequest,
+},
overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext,
SubsystemError,
};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Hash, SessionIndex};
-use rand::{seq::SliceRandom as _, SeedableRng};
-use rand_chacha::ChaCha20Rng;
-use sp_application_crypto::{AppKey, Public};
-use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
-use std::time::{Duration, Instant};
#[cfg(test)]
mod tests;
......@@ -56,13 +71,13 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
/// https://github.com/paritytech/substrate/blob/fc49802f263529160635471c8a17888846035f5d/client/authority-discovery/src/lib.rs#L88
const LOW_CONNECTIVITY_WARN_DELAY: Duration = Duration::from_secs(600);
/// If connectivity (in percent) is lower than this threshold, issue a warning in the logs.
const LOW_CONNECTIVITY_WARN_THRESHOLD: usize = 90;
/// The Gossip Support subsystem.
-pub struct GossipSupport {
+pub struct GossipSupport<AD> {
keystore: SyncCryptoStorePtr,
-}
-#[derive(Default)]
-struct State {
last_session_index: Option<SessionIndex>,
// Some(timestamp) if we failed to resolve
// at least a third of authorities the last time.
......@@ -75,31 +90,58 @@ struct State {
/// potential sequence of failed attempts. It will be cleared once we reach >2/3
/// connectivity.
failure_start: Option<Instant>,
/// Authorities whose addresses were successfully resolved,
/// waiting for an actual connection.
resolved_authorities: HashMap<AuthorityDiscoveryId, Vec<Multiaddr>>,
/// Actually connected authorities.
connected_authorities: HashMap<AuthorityDiscoveryId, PeerId>,
/// Connected authorities, indexed by `PeerId`.
///
/// Needed for efficient handling of disconnect events.
connected_authorities_by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
/// Authority discovery service.
authority_discovery: AD,
}
-impl GossipSupport {
+impl<AD> GossipSupport<AD>
+where
+AD: AuthorityDiscovery,
+{
/// Create a new instance of the [`GossipSupport`] subsystem.
-pub fn new(keystore: SyncCryptoStorePtr) -> Self {
-Self { keystore }
+pub fn new(keystore: SyncCryptoStorePtr, authority_discovery: AD) -> Self {
+Self {
+keystore,
+last_session_index: None,
+last_failure: None,
+failure_start: None,
+resolved_authorities: HashMap::new(),
+connected_authorities: HashMap::new(),
+connected_authorities_by_peer_id: HashMap::new(),
+authority_discovery,
+}
}
-async fn run<Context>(self, ctx: Context)
-where
-Context: SubsystemContext<Message = GossipSupportMessage>,
-Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
-{
-let mut state = State::default();
-self.run_inner(ctx, &mut state).await;
-}
-async fn run_inner<Context>(self, mut ctx: Context, state: &mut State)
+async fn run<Context>(mut self, mut ctx: Context) -> Self
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
-let Self { keystore } = self;
+fn get_connectivity_check_delay() -> Delay {
+Delay::new(LOW_CONNECTIVITY_WARN_DELAY)
+}
+let mut next_connectivity_check = get_connectivity_check_delay().fuse();
loop {
-let message = match ctx.recv().await {
+let message = select!(
+_ = next_connectivity_check => {
+self.check_connectivity();
+next_connectivity_check = get_connectivity_check_delay().fuse();
+continue
+}
+result = ctx.recv().fuse() =>
+match result {
Ok(message) => message,
Err(e) => {
tracing::debug!(
......@@ -107,11 +149,14 @@ impl GossipSupport {
err = ?e,
"Failed to receive a message from Overseer, exiting",
);
-return
+return self
},
-};
+}
+);
match message {
-FromOverseer::Communication { .. } => {},
+FromOverseer::Communication {
+msg: GossipSupportMessage::NetworkBridgeUpdateV1(ev),
+} => self.handle_connect_disconnect(ev),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated,
..
......@@ -119,14 +164,190 @@ impl GossipSupport {
tracing::trace!(target: LOG_TARGET, "active leaves signal");
let leaves = activated.into_iter().map(|a| a.hash);
-if let Err(e) = state.handle_active_leaves(&mut ctx, &keystore, leaves).await {
+if let Err(e) = self.handle_active_leaves(&mut ctx, leaves).await {
tracing::debug!(target: LOG_TARGET, error = ?e);
}
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {},
-FromOverseer::Signal(OverseerSignal::Conclude) => return,
+FromOverseer::Signal(OverseerSignal::Conclude) => return self,
}
}
}
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
/// and issue a connection request.
async fn handle_active_leaves<Context>(
&mut self,
ctx: &mut Context,
leaves: impl Iterator<Item = Hash>,
) -> Result<(), util::Error>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
for leaf in leaves {
let current_index =
util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let force_request = since_failure >= BACKOFF_DURATION;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
_ => leaf_session,
};
let maybe_issue_connection =
if force_request { leaf_session } else { maybe_new_session };
if let Some((session_index, relay_parent)) = maybe_issue_connection {
let is_new_session = maybe_new_session.is_some();
if is_new_session {
tracing::debug!(
target: LOG_TARGET,
%session_index,
"New session detected",
);
}
let all_authorities = determine_relevant_authorities(ctx, relay_parent).await?;
let our_index = ensure_i_am_an_authority(&self.keystore, &all_authorities).await?;
let other_authorities = {
let mut authorities = all_authorities.clone();
authorities.swap_remove(our_index);
authorities
};
self.issue_connection_request(ctx, other_authorities).await?;
if is_new_session {
self.last_session_index = Some(session_index);
update_gossip_topology(ctx, our_index, all_authorities, relay_parent).await?;
}
}
}
Ok(())
}
async fn issue_connection_request<Context>(
&mut self,
ctx: &mut Context,
authorities: Vec<AuthorityDiscoveryId>,
) -> Result<(), util::Error>
where
Context: SubsystemContext<Message = GossipSupportMessage>,
Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
{
let num = authorities.len();
let mut validator_addrs = Vec::with_capacity(authorities.len());
let mut failures = 0;
let mut resolved = HashMap::with_capacity(authorities.len());
for authority in authorities {
if let Some(addrs) =
self.authority_discovery.get_addresses_by_authority_id(authority.clone()).await
{
validator_addrs.push(addrs.clone());
resolved.insert(authority, addrs);
} else {
failures += 1;
tracing::debug!(
target: LOG_TARGET,
"Couldn't resolve addresses of authority: {:?}",
authority
);
}
}
self.resolved_authorities = resolved;
tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
ctx.send_message(NetworkBridgeMessage::ConnectToResolvedValidators {
validator_addrs,
peer_set: PeerSet::Validation,
})
.await;
// issue another request for the same session
// if at least a third of the authorities were not resolved.
if 3 * failures >= num {
let timestamp = Instant::now();
match self.failure_start {
None => self.failure_start = Some(timestamp),
Some(first) if first.elapsed() >= LOW_CONNECTIVITY_WARN_DELAY => {
tracing::warn!(
target: LOG_TARGET,
connected = ?(num - failures),
target = ?num,
"Low connectivity - authority lookup failed for too many validators."
);
},
Some(_) => {
tracing::debug!(
target: LOG_TARGET,
connected = ?(num - failures),
target = ?num,
"Low connectivity (due to authority lookup failures) - expected on startup."
);
},
}
self.last_failure = Some(timestamp);
} else {
self.last_failure = None;
self.failure_start = None;
};
Ok(())
}
fn handle_connect_disconnect(&mut self, ev: NetworkBridgeEvent<GossipSuppportNetworkMessage>) {
match ev {
NetworkBridgeEvent::PeerConnected(peer_id, _, o_authority) => {
if let Some(authority) = o_authority {
self.connected_authorities.insert(authority.clone(), peer_id);
self.connected_authorities_by_peer_id.insert(peer_id, authority);
}
},
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
if let Some(authority) = self.connected_authorities_by_peer_id.remove(&peer_id) {
self.connected_authorities.remove(&authority);
}
},
NetworkBridgeEvent::OurViewChange(_) => {},
NetworkBridgeEvent::PeerViewChange(_, _) => {},
NetworkBridgeEvent::NewGossipTopology(_) => {},
NetworkBridgeEvent::PeerMessage(_, v) => {
// `v` is an uninhabited type: no gossip-support network messages
// are defined, so this arm can never be reached.
match v {};
},
}
}
/// Check connectivity and report on it in logs.
fn check_connectivity(&mut self) {
let absolute_connected = self.connected_authorities.len();
let absolute_resolved = self.resolved_authorities.len();
let connected_ratio =
(100 * absolute_connected).checked_div(absolute_resolved).unwrap_or(100);
let unconnected_authorities = self
.resolved_authorities
.iter()
.filter(|(a, _)| !self.connected_authorities.contains_key(a));
// TODO: Make this a warning once connectivity issues are fixed (no point in
// warning if we already know it is broken):
// https://github.com/paritytech/polkadot/issues/3921
if connected_ratio <= LOW_CONNECTIVITY_WARN_THRESHOLD {
tracing::debug!(
target: LOG_TARGET,
"Connectivity seems low, we are only connected to {}% of available validators (see debug logs for details)", connected_ratio
);
}
tracing::debug!(
target: LOG_TARGET,
?connected_ratio,
?absolute_connected,
?absolute_resolved,
unconnected_authorities = %PrettyAuthorities(unconnected_authorities),
"Connectivity Report"
);
}
}
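To isolate the ratio computation used by `check_connectivity` above: `checked_div` makes the empty-resolved-set case report full connectivity instead of panicking on a division by zero (cf. the "More correct ratio report if there are no resolved validators" bullet in the commit message). A runnable sketch:

// Percentage of resolved authorities we are actually connected to.
// An empty resolved set yields 100 rather than a divide-by-zero panic.
fn connected_ratio(connected: usize, resolved: usize) -> usize {
    (100 * connected).checked_div(resolved).unwrap_or(100)
}

fn main() {
    assert_eq!(connected_ratio(45, 50), 90); // exactly at LOW_CONNECTIVITY_WARN_THRESHOLD
    assert_eq!(connected_ratio(0, 0), 100); // nothing resolved yet, nothing to warn about
}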
......@@ -161,22 +382,6 @@ async fn ensure_i_am_an_authority(
Err(util::Error::NotAValidator)
}
-/// A helper function for making a `ConnectToValidators` request.
-async fn connect_to_authorities<Context>(
-ctx: &mut Context,
-validator_ids: Vec<AuthorityDiscoveryId>,
-peer_set: PeerSet,
-) -> oneshot::Receiver<usize>
-where
-Context: SubsystemContext<Message = GossipSupportMessage>,
-Context: overseer::SubsystemContext<Message = GossipSupportMessage>,
-{
-let (failed, failed_rx) = oneshot::channel();
-ctx.send_message(NetworkBridgeMessage::ConnectToValidators { validator_ids, peer_set, failed })
-.await;
-failed_rx
-}
/// We partition the list of all sorted `authorities` into `sqrt(len)` groups of `sqrt(len)` size
/// and form a matrix where each validator is connected to all validators in its row and column.
/// This is similar to `[web3]` research proposed topology, except for the groups are not parachain
......@@ -253,123 +458,40 @@ fn matrix_neighbors(our_index: usize, len: usize) -> impl Iterator<Item = usize>
row_neighbors.chain(column_neighbors).filter(move |i| *i != our_index)
}
-impl State {
-/// 1. Determine if the current session index has changed.
-/// 2. If it has, determine relevant validators
-/// and issue a connection request.
-async fn handle_active_leaves<Context>(
-&mut self,