Unverified Commit 76b4c689 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

validator_discovery: simplification (#3009)

* validator_discovery: simplification

* compilation fixes

* compilation fixes II

* compilation fixes III

* compilation fixes IV
parent 483d8f6d
Pipeline #137976 failed with stages
in 30 minutes and 13 seconds
......@@ -16,12 +16,12 @@
//! PoV requester takes care of requesting PoVs from validators of a backing group.
use futures::{FutureExt, channel::{mpsc, oneshot}, future::BoxFuture};
use futures::{FutureExt, channel::oneshot, future::BoxFuture};
use lru::LruCache;
use polkadot_subsystem::jaeger;
use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet,
peer_set::PeerSet,
request_response::{OutgoingRequest, Recipient, request::{RequestError, Requests},
v1::{PoVFetchingRequest, PoVFetchingResponse}}
};
......@@ -46,7 +46,7 @@ pub struct PoVRequester {
///
/// So we keep an LRU for managing connection requests of size 2.
/// Cache will contain `None` if we are not a validator in that session.
connected_validators: LruCache<SessionIndex, Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>,
connected_validators: LruCache<SessionIndex, Option<oneshot::Sender<()>>>,
}
impl PoVRequester {
......@@ -78,8 +78,8 @@ impl PoVRequester {
if self.connected_validators.contains(&session_index) {
continue
}
let rx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
self.connected_validators.put(session_index, rx);
let tx = connect_to_relevant_validators(ctx, runtime, parent, session_index).await?;
self.connected_validators.put(session_index, tx);
}
Ok(())
}
......@@ -190,17 +190,16 @@ async fn connect_to_relevant_validators<Context>(
parent: Hash,
session: SessionIndex
)
-> super::Result<Option<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>>
-> super::Result<Option<oneshot::Sender<()>>>
where
Context: SubsystemContext,
{
if let Some(validator_ids) = determine_relevant_validators(ctx, runtime, parent, session).await? {
// We don't actually care about `PeerId`s; we only hold on to our end of the channel so the request is not revoked and we stay connected:
let (tx, rx) = mpsc::channel(0);
let (tx, keep_alive) = oneshot::channel();
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ConnectToValidators {
validator_ids, peer_set: PeerSet::Validation, connected: tx
validator_ids, peer_set: PeerSet::Validation, keep_alive
})).await;
Ok(Some(rx))
Ok(Some(tx))
} else {
Ok(None)
}
......
......@@ -23,7 +23,6 @@
use parity_scale_codec::{Encode, Decode};
use parking_lot::Mutex;
use futures::prelude::*;
use futures::channel::mpsc;
use sc_network::Event as NetworkEvent;
use sp_consensus::SyncOracle;
......@@ -36,7 +35,7 @@ use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
CollatorProtocolMessage, NetworkBridgeEvent,
};
use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId};
use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
ObservedRole,
......@@ -314,12 +313,6 @@ impl From<SubsystemError> for UnexpectedAbort {
}
}
// Notifications to be passed through to the validator discovery worker.
//
// They are produced while handling network events and consumed by the subsystem
// message loop, which forwards them to the validator discovery service.
enum ValidatorDiscoveryNotification {
// A peer connected on the given peer set. The `Option` holds the peer's authority
// discovery id when the authority discovery service could resolve one.
PeerConnected(PeerId, PeerSet, Option<AuthorityDiscoveryId>),
// A peer disconnected from the given peer set.
PeerDisconnected(PeerId, PeerSet),
}
#[derive(Default, Clone)]
struct Shared(Arc<Mutex<SharedInner>>);
......@@ -339,7 +332,6 @@ async fn handle_subsystem_messages<Context, N, AD>(
mut ctx: Context,
mut network_service: N,
mut authority_discovery_service: AD,
validator_discovery_notifications: mpsc::Receiver<ValidatorDiscoveryNotification>,
shared: Shared,
sync_oracle: Box<dyn SyncOracle + Send>,
metrics: Metrics,
......@@ -356,8 +348,6 @@ where
let mut mode = Mode::Syncing(sync_oracle);
let mut validator_discovery_notifications = validator_discovery_notifications.fuse();
loop {
futures::select! {
msg = ctx.recv().fuse() => match msg {
......@@ -514,7 +504,7 @@ where
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
peer_set,
connected,
keep_alive,
} => {
tracing::trace!(
target: LOG_TARGET,
......@@ -527,7 +517,7 @@ where
let (ns, ads) = validator_discovery.on_request(
validator_ids,
peer_set,
connected,
keep_alive,
network_service,
authority_discovery_service,
).await;
......@@ -538,19 +528,6 @@ where
}
Err(e) => return Err(e.into()),
},
notification = validator_discovery_notifications.next().fuse() => match notification {
None => return Ok(()),
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => {
validator_discovery.on_peer_connected(
peer.clone(),
peer_set,
maybe_auth,
).await;
}
Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => {
validator_discovery.on_peer_disconnected(&peer, peer_set);
}
},
}
}
}
......@@ -560,7 +537,6 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut network_service: impl Network,
mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer,
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
metrics: Metrics,
shared: Shared,
) -> Result<(), UnexpectedAbort> {
......@@ -612,13 +588,6 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
authority_discovery_service
.get_authority_id_by_peer_id(peer).await;
// Failure here means that the other side of the network bridge
// has concluded and this future will be dropped in due course.
let _ = validator_discovery_notifications.send(
ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone())
).await;
match peer_set {
PeerSet::Validation => {
dispatch_validation_events_to_all(
......@@ -694,12 +663,6 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
w
};
// Failure here means that the other side of the network bridge
// has concluded and this future will be dropped in due course.
let _ = validator_discovery_notifications.send(
ValidatorDiscoveryNotification::PeerDisconnected(peer.clone(), peer_set)
).await;
if was_connected {
match peer_set {
PeerSet::Validation => dispatch_validation_event_to_all(
......@@ -858,14 +821,11 @@ where
.get_statement_fetching()
.expect("Gets initialized, must be `Some` on startup. qed.");
let (validation_worker_tx, validation_worker_rx) = mpsc::channel(1024);
let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
authority_discovery_service.clone(),
request_multiplexer,
validation_worker_tx,
metrics.clone(),
shared.clone(),
).remote_handle();
......@@ -880,7 +840,6 @@ where
ctx,
network_service,
authority_discovery_service,
validation_worker_rx,
shared,
sync_oracle,
metrics,
......
......@@ -22,9 +22,9 @@ use core::marker::PhantomData;
use std::collections::{HashSet, HashMap, hash_map};
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::channel::oneshot;
use sc_network::{config::parse_addr, multiaddr::Multiaddr};
use sc_network::multiaddr::Multiaddr;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::AuthorityDiscoveryId;
......@@ -55,39 +55,24 @@ impl AuthorityDiscovery for AuthorityDiscoveryService {
/// This struct tracks the state for one `ConnectToValidators` request.
struct NonRevokedConnectionRequestState {
requested: Vec<AuthorityDiscoveryId>,
pending: HashSet<AuthorityDiscoveryId>,
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
keep_alive: oneshot::Receiver<()>,
}
impl NonRevokedConnectionRequestState {
/// Create a new instance of `NonRevokedConnectionRequestState`.
pub fn new(
requested: Vec<AuthorityDiscoveryId>,
pending: HashSet<AuthorityDiscoveryId>,
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
keep_alive: oneshot::Receiver<()>,
) -> Self {
Self {
requested,
pending,
sender,
}
}
pub fn on_authority_connected(
&mut self,
authority: &AuthorityDiscoveryId,
peer_id: &PeerId,
) {
if self.pending.remove(authority) {
// an error may happen if the request was revoked or
// the channel's buffer is full, ignoring it is fine
let _ = self.sender.try_send((authority.clone(), peer_id.clone()));
keep_alive,
}
}
/// Returns `true` if the request is revoked.
pub fn is_revoked(&mut self) -> bool {
self.sender.is_closed()
self.keep_alive.try_recv().is_err()
}
pub fn requested(&self) -> &[AuthorityDiscoveryId] {
......@@ -120,8 +105,6 @@ pub(super) struct Service<N, AD> {
#[derive(Default)]
struct StatePerPeerSet {
// Peers that are connected to us and authority ids associated to them.
connected_peers: HashMap<PeerId, HashSet<AuthorityDiscoveryId>>,
// The `u64` counts the number of pending non-revoked requests for this validator
// note: the validators in this map are not necessarily present
// in the `connected_validators` map.
......@@ -138,97 +121,27 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
}
/// Look up which of the given `validator_ids` are already connected on `peer_set`.
///
/// The locally tracked connected peers are consulted first. Authorities not found there
/// are resolved through the authority discovery service; every resolved address that
/// belongs to a currently connected peer is recorded for that authority.
///
/// Returns a [`HashMap`] from each found [`AuthorityDiscoveryId`] to its associated [`PeerId`].
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
async fn find_connected_validators(
	&mut self,
	validator_ids: &[AuthorityDiscoveryId],
	peer_set: PeerSet,
	authority_discovery_service: &mut AD,
) -> HashMap<AuthorityDiscoveryId, PeerId> {
	let state = &mut self.state[peer_set];
	let mut found = HashMap::new();

	for authority in validator_ids {
		// Fast path: some connected peer is already known to hold this authority id.
		let cached_peer = state.connected_peers
			.iter()
			.find(|(_, known_ids)| known_ids.contains(authority))
			.map(|(peer, _)| peer.clone());
		if let Some(peer) = cached_peer {
			found.insert(authority.clone(), peer);
			continue;
		}

		// Slow path: ask the authority discovery for the authority's addresses.
		let addresses = match authority_discovery_service
			.get_addresses_by_authority_id(authority.clone())
			.await
		{
			Some(addresses) => addresses,
			None => continue,
		};

		for address in addresses {
			// Addresses that cannot be parsed into a peer id are skipped.
			let peer_id = match parse_addr(address) {
				Ok((peer_id, _)) => peer_id,
				Err(_) => continue,
			};
			// Only peers we are currently connected to count; remember the association
			// so the next lookup hits the fast path.
			if let Some(known_ids) = state.connected_peers.get_mut(&peer_id) {
				known_ids.insert(authority.clone());
				found.insert(authority.clone(), peer_id);
			}
		}
	}

	found
}
/// On a new connection request, a priority group update will be issued.
/// On a new connection request, a peer set update will be issued.
/// It will ask the network to connect to the validators and not disconnect
/// from them at least until all the pending requests containing them are revoked.
///
/// This method will also clean up all previously revoked requests.
/// It takes `network_service` and `authority_discovery_service` by value
/// and returns them as a workaround for the `Future: Send` requirement imposed by the async fn impl.
#[tracing::instrument(level = "trace", skip(self, connected, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
pub async fn on_request(
&mut self,
validator_ids: Vec<AuthorityDiscoveryId>,
peer_set: PeerSet,
mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
keep_alive: oneshot::Receiver<()>,
mut network_service: N,
mut authority_discovery_service: AD,
) -> (N, AD) {
const MAX_ADDR_PER_PEER: usize = 3;
let already_connected = self.find_connected_validators(
&validator_ids,
peer_set,
&mut authority_discovery_service,
).await;
let state = &mut self.state[peer_set];
// Increment the counter of how many times the validators were requested.
validator_ids.iter().for_each(|id| *state.requested_validators.entry(id.clone()).or_default() += 1);
// try to send already connected peers
for (id, peer) in already_connected.iter() {
match connected.try_send((id.clone(), peer.clone())) {
Err(e) if e.is_disconnected() => {
// the request is already revoked
for peer_id in validator_ids {
let _ = on_revoke(&mut state.requested_validators, peer_id);
}
return (network_service, authority_discovery_service);
}
Err(_) => {
// the channel's buffer is full
// ignore the error, the receiver will miss out some peers
// but that's fine
break;
}
Ok(()) => continue,
}
}
// collect multiaddress of validators
let mut multiaddr_to_add = HashSet::new();
for authority in validator_ids.iter() {
......@@ -292,45 +205,13 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
multiaddr_to_remove.clone()
).await;
let pending = validator_ids.iter()
.cloned()
.filter(|id| !already_connected.contains_key(id))
.collect::<HashSet<_>>();
state.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new(
validator_ids,
pending,
connected,
keep_alive,
));
(network_service, authority_discovery_service)
}
/// Record a newly connected peer; should be called whenever a peer connects.
///
/// If the peer resolved to an authority, every non-revoked request is informed of it and
/// the authority is added to the set tracked for that peer. Otherwise the peer is stored
/// with an empty authority set.
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn on_peer_connected(
	&mut self,
	peer_id: PeerId,
	peer_set: PeerSet,
	maybe_authority: Option<AuthorityDiscoveryId>,
) {
	let state = &mut self.state[peer_set];

	match maybe_authority {
		Some(authority) => {
			// It may be an authority some pending request has been waiting for.
			state.non_revoked_discovery_requests
				.iter_mut()
				.for_each(|request| request.on_authority_connected(&authority, &peer_id));
			state.connected_peers.entry(peer_id).or_default().insert(authority);
		}
		None => {
			state.connected_peers.insert(peer_id, Default::default());
		}
	}
}
/// Forget a peer on `peer_set`; should be called whenever a peer disconnects.
pub fn on_peer_disconnected(&mut self, peer_id: &PeerId, peer_set: PeerSet) {
	let state = &mut self.state[peer_set];
	state.connected_peers.remove(peer_id);
}
}
#[cfg(test)]
......@@ -339,8 +220,7 @@ mod tests {
use crate::network::{Network, NetworkAction};
use std::{borrow::Cow, pin::Pin};
use futures::{sink::Sink, stream::{BoxStream, StreamExt as _}};
use sc_network::multiaddr::Protocol;
use futures::{sink::Sink, stream::BoxStream};
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::request_response::request::Requests;
......@@ -440,132 +320,53 @@ mod tests {
#[test]
fn request_is_revoked_when_the_receiver_is_dropped() {
let (sender, receiver) = mpsc::channel(0);
let (keep_alive_handle, keep_alive) = oneshot::channel();
let mut request = NonRevokedConnectionRequestState::new(
Vec::new(),
HashSet::new(),
sender,
keep_alive,
);
assert!(!request.is_revoked());
drop(receiver);
drop(keep_alive_handle);
assert!(request.is_revoked());
}
// A request naming an authority whose peer is already connected must be answered
// without waiting for any further connection event.
#[test]
fn requests_are_fulfilled_immediately_for_already_connected_peers() {
	let mut service = new_service();
	let (ns, mut ads) = new_network();

	let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
	let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();

	futures::executor::block_on(async move {
		let requested = vec![authority_ids[0].clone(), authority_ids[1].clone()];
		let (sender, mut receiver) = mpsc::channel(2);

		// Connect the first peer before the request is issued.
		let first_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
		service
			.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, first_authority)
			.await;

		let _ = service.on_request(
			requested,
			PeerSet::Validation,
			sender,
			ns,
			ads,
		).await;

		// the results should be immediately available
		let (authority, peer) = receiver.next().await.unwrap();
		assert_eq!(authority, authority_ids[0]);
		assert_eq!(peer, peer_ids[0]);
	});
}
// A pending request must yield one reply per requested authority, as soon as the
// corresponding peer connects.
#[test]
fn requests_are_fulfilled_on_peer_connection() {
	let mut service = new_service();
	let (ns, ads) = new_network();

	let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
	let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();

	futures::executor::block_on(async move {
		let requested = vec![authority_ids[0].clone(), authority_ids[1].clone()];
		let (sender, mut receiver) = mpsc::channel(2);

		// Issue the request first; nobody is connected yet.
		let (_, mut ads) = service.on_request(
			requested,
			PeerSet::Validation,
			sender,
			ns,
			ads,
		).await;

		// Connect the two peers one after the other and expect a reply for each.
		for index in 0..2 {
			let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[index]).await;
			service
				.on_peer_connected(peer_ids[index].clone(), PeerSet::Validation, maybe_authority)
				.await;

			let (authority, peer) = receiver.next().await.unwrap();
			assert_eq!(authority, authority_ids[index]);
			assert_eq!(peer, peer_ids[index]);
		}
	});
}
// Test cleanup works.
#[test]
fn requests_are_removed_on_revoke() {
let mut service = new_service();
let (ns, mut ads) = new_network();
let (ns, ads) = new_network();
let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let (keep_alive_handle, keep_alive) = oneshot::channel();
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone()],
PeerSet::Validation,
sender,
keep_alive,
ns,
ads,
).await;
let _ = receiver.next().await.unwrap();
// revoke the request
drop(receiver);
drop(keep_alive_handle);
let (sender, mut receiver) = mpsc::channel(1);
let (_keep_alive_handle, keep_alive) = oneshot::channel();
let _ = service.on_request(
vec![authority_ids[1].clone()],
PeerSet::Validation,
sender,
keep_alive,
ns,
ads,
).await;
let reply = receiver.next().await.unwrap();
assert_eq!(reply.0, authority_ids[1]);
assert_eq!(reply.1, peer_ids[1]);
let state = &service.state[PeerSet::Validation];
assert_eq!(state.non_revoked_discovery_requests.len(), 1);
});
......@@ -576,104 +377,54 @@ mod tests {
fn revoking_requests_with_overlapping_validator_sets() {
let mut service = new_service();
let (ns, mut ads) = new_network();
let (ns, ads) = new_network();
let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect();
let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect();
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let (keep_alive_handle, keep_alive) = oneshot::channel();
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[2].clone()],
PeerSet::Validation,
sender,
keep_alive,
ns,
ads,
).await;
let _ = receiver.next().await.unwrap();
// revoke the first request
drop(receiver);
drop(keep_alive_handle);
let (sender, mut receiver) = mpsc::channel(1);
let (keep_alive_handle, keep_alive) = oneshot::channel();
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[1].clone()],
PeerSet::Validation,
sender,
keep_alive,
ns,
ads,
).await;
let _ =