Unverified Commit f4897f74 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

cleanup validator discovery (#1992)

* use snake_case for log targets

* remove unused continue

* validator_discovery: when disconnecting, use all addresses

* validator_discovery: simplify request revokation

* fix a typo
parent e655654e
Pipeline #114654 passed with stages
in 29 minutes and 6 seconds
......@@ -44,7 +44,7 @@ use std::sync::Arc;
use futures::prelude::*;
const LOG_TARGET: &str = "ChainApiSubsystem";
const LOG_TARGET: &str = "chain_api";
/// The Chain API Subsystem implementation.
pub struct ChainApiSubsystem<Client> {
......
......@@ -40,7 +40,7 @@ use sp_api::{ProvideRuntimeApi};
use futures::prelude::*;
const LOG_TARGET: &str = "RuntimeApi";
const LOG_TARGET: &str = "runtime_api";
/// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem<Client> {
......
......@@ -52,7 +52,7 @@ use std::collections::{HashMap, HashSet};
use std::iter;
use thiserror::Error;
const LOG_TARGET: &'static str = "AvailabilityDistribution";
const LOG_TARGET: &'static str = "availability_distribution";
#[derive(Debug, Error)]
enum Error {
......
......@@ -24,7 +24,7 @@ use parity_scale_codec::{Encode, Decode};
use futures::prelude::*;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use sc_network::Event as NetworkEvent;
......@@ -246,7 +246,6 @@ enum Action {
ConnectToValidators {
validator_ids: Vec<AuthorityDiscoveryId>,
connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
},
ReportPeer(PeerId, ReputationChange),
......@@ -278,11 +277,8 @@ fn action_from_overseer_message(
=> Action::SendValidationMessage(peers, msg),
NetworkBridgeMessage::SendCollationMessage(peers, msg)
=> Action::SendCollationMessage(peers, msg),
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
revoke,
} => Action::ConnectToValidators { validator_ids, connected, revoke },
NetworkBridgeMessage::ConnectToValidators { validator_ids, connected }
=> Action::ConnectToValidators { validator_ids, connected },
},
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_)))
=> Action::Nop,
......@@ -627,12 +623,10 @@ where
Action::ConnectToValidators {
validator_ids,
connected,
revoke,
} => {
let (ns, ads) = validator_discovery.on_request(
validator_ids,
connected,
revoke,
network_service,
authority_discovery_service,
).await;
......
......@@ -21,7 +21,7 @@ use std::collections::{HashSet, HashMap, hash_map};
use std::sync::Arc;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::channel::mpsc;
use sc_network::multiaddr::{Multiaddr, Protocol};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
......@@ -29,7 +29,7 @@ use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
const PRIORITY_GROUP: &'static str = "parachain_validators";
const LOG_TARGET: &str = "ValidatorDiscovery";
const LOG_TARGET: &str = "validator_discovery";
/// An abstraction over networking for the purposes of validator discovery service.
#[async_trait]
......@@ -76,7 +76,6 @@ struct NonRevokedConnectionRequestState {
requested: Vec<AuthorityDiscoveryId>,
pending: HashSet<AuthorityDiscoveryId>,
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
}
impl NonRevokedConnectionRequestState {
......@@ -85,13 +84,11 @@ impl NonRevokedConnectionRequestState {
requested: Vec<AuthorityDiscoveryId>,
pending: HashSet<AuthorityDiscoveryId>,
sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
) -> Self {
Self {
requested,
pending,
sender,
revoke,
}
}
......@@ -105,9 +102,7 @@ impl NonRevokedConnectionRequestState {
/// Returns `true` if the request is revoked.
pub fn is_revoked(&mut self) -> bool {
self.revoke
.try_recv()
.map_or(true, |r| r.is_some())
self.sender.is_closed()
}
pub fn requested(&self) -> &[AuthorityDiscoveryId] {
......@@ -187,7 +182,6 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
if let Some(ids) = self.connected_peers.get_mut(&peer_id) {
ids.insert(id.clone());
result.insert(id.clone(), peer_id.clone());
continue;
}
}
}
......@@ -203,12 +197,11 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
/// This method will also clean up all previously revoked requests.
/// it takes `network_service` and `authority_discovery_service` by value
/// and returns them as a workaround for the Future: Send requirement imposed by async fn impl.
#[tracing::instrument(level = "trace", skip(self, connected, revoke, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self, connected, network_service, authority_discovery_service), fields(subsystem = LOG_TARGET))]
pub async fn on_request(
&mut self,
validator_ids: Vec<AuthorityDiscoveryId>,
mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
mut network_service: N,
mut authority_discovery_service: AD,
) -> (N, AD) {
......@@ -276,7 +269,7 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
for id in revoked_validators.into_iter() {
let result = authority_discovery_service.get_addresses_by_authority_id(id).await;
if let Some(addresses) = result {
multiaddr_to_remove.extend(addresses.into_iter().take(MAX_ADDR_PER_PEER));
multiaddr_to_remove.extend(addresses.into_iter());
}
}
......@@ -300,7 +293,6 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
validator_ids,
pending,
connected,
revoke,
));
(network_service, authority_discovery_service)
......@@ -418,39 +410,18 @@ mod tests {
}
#[test]
fn request_is_revoked_on_send() {
let (revoke_tx, revoke_rx) = oneshot::channel();
let (sender, _receiver) = mpsc::channel(0);
fn request_is_revoked_when_the_receiver_is_dropped() {
let (sender, receiver) = mpsc::channel(0);
let mut request = NonRevokedConnectionRequestState::new(
Vec::new(),
HashSet::new(),
sender,
revoke_rx,
);
assert!(!request.is_revoked());
revoke_tx.send(()).unwrap();
assert!(request.is_revoked());
}
#[test]
fn request_is_revoked_when_the_sender_is_dropped() {
let (revoke_tx, revoke_rx) = oneshot::channel();
let (sender, _receiver) = mpsc::channel(0);
let mut request = NonRevokedConnectionRequestState::new(
Vec::new(),
HashSet::new(),
sender,
revoke_rx,
);
assert!(!request.is_revoked());
drop(revoke_tx);
drop(receiver);
assert!(request.is_revoked());
}
......@@ -467,14 +438,12 @@ mod tests {
futures::executor::block_on(async move {
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
let (sender, mut receiver) = mpsc::channel(2);
let (_revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&peer_ids[0], &mut ads).await;
let _ = service.on_request(
req1,
sender,
revoke_rx,
ns,
ads,
).await;
......@@ -499,12 +468,10 @@ mod tests {
futures::executor::block_on(async move {
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
let (sender, mut receiver) = mpsc::channel(2);
let (_revoke_tx, revoke_rx) = oneshot::channel();
let (_, mut ads) = service.on_request(
req1,
sender,
revoke_rx,
ns,
ads,
).await;
......@@ -534,7 +501,6 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let (revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&peer_ids[0], &mut ads).await;
service.on_peer_connected(&peer_ids[1], &mut ads).await;
......@@ -542,22 +508,19 @@ mod tests {
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
let _ = receiver.next().await.unwrap();
// revoke the request
revoke_tx.send(()).unwrap();
drop(receiver);
let (sender, mut receiver) = mpsc::channel(1);
let (_revoke_tx, revoke_rx) = oneshot::channel();
let _ = service.on_request(
vec![authority_ids[1].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
......@@ -581,7 +544,6 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let (revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&peer_ids[0], &mut ads).await;
service.on_peer_connected(&peer_ids[1], &mut ads).await;
......@@ -589,22 +551,19 @@ mod tests {
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[2].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
let _ = receiver.next().await.unwrap();
// revoke the first request
revoke_tx.send(()).unwrap();
drop(receiver);
let (sender, mut receiver) = mpsc::channel(1);
let (revoke_tx, revoke_rx) = oneshot::channel();
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[1].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
......@@ -614,15 +573,13 @@ mod tests {
assert_eq!(ns.priority_group.len(), 2);
// revoke the second request
revoke_tx.send(()).unwrap();
drop(receiver);
let (sender, mut receiver) = mpsc::channel(1);
let (_revoke_tx, revoke_rx) = oneshot::channel();
let (ns, _) = service.on_request(
vec![authority_ids[0].clone()],
sender,
revoke_rx,
ns,
ads,
).await;
......@@ -647,7 +604,6 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
let (_revoke_tx, revoke_rx) = oneshot::channel();
service.on_peer_connected(&validator_peer_id, &mut ads).await;
......@@ -658,7 +614,6 @@ mod tests {
let _ = service.on_request(
vec![validator_id.clone()],
sender,
revoke_rx,
ns,
ads,
).await;
......
......@@ -349,7 +349,7 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
if let Some(request) = state.last_connection_request.take() {
request.revoke();
drop(request);
}
let request = validator_discovery::connect_to_validators(
......
......@@ -76,33 +76,30 @@ pub async fn connect_to_validators<Context: SubsystemContext>(
.filter_map(|(k, v)| v.map(|v| (v, k)))
.collect::<HashMap<AuthorityDiscoveryId, ValidatorId>>();
let (connections, revoke) = connect_to_authorities(ctx, authorities).await?;
let connections = connect_to_authorities(ctx, authorities).await?;
Ok(ConnectionRequest {
validator_map,
connections,
revoke,
})
}
async fn connect_to_authorities<Context: SubsystemContext>(
ctx: &mut Context,
validator_ids: Vec<AuthorityDiscoveryId>,
) -> Result<(mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, oneshot::Sender<()>), Error> {
) -> Result<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, Error> {
const PEERS_CAPACITY: usize = 8;
let (revoke_tx, revoke) = oneshot::channel();
let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
validator_ids,
connected,
revoke,
}
)).await?;
Ok((connected_rx, revoke_tx))
Ok(connected_rx)
}
/// A struct that assists performing multiple concurrent connection requests.
......@@ -176,15 +173,12 @@ impl stream::Stream for ConnectionRequests {
/// This struct implements `Stream` to allow for asynchronous
/// discovery of validator addresses.
///
/// NOTE: you should call `revoke` on this struct
/// when you're no longer interested in the requested validators.
/// NOTE: the request will be revoked on drop.
#[must_use = "dropping a request will result in its immediate revokation"]
pub struct ConnectionRequest {
validator_map: HashMap<AuthorityDiscoveryId, ValidatorId>,
#[must_use = "streams do nothing unless polled"]
connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>,
#[must_use = "a request should be revoked at some point"]
revoke: oneshot::Sender<()>,
}
impl stream::Stream for ConnectionRequest {
......@@ -209,29 +203,13 @@ impl stream::Stream for ConnectionRequest {
}
}
impl ConnectionRequest {
/// By revoking the request the caller allows the network to
/// free some peer slots thus freeing the resources.
/// It doesn't necessarily lead to peers disconnection though.
/// The revokation is enacted on in the next connection request.
///
/// This can be done either by calling this function or dropping the request.
pub fn revoke(self) {
if let Err(_) = self.revoke.send(()) {
tracing::warn!(
"Failed to revoke a validator connection request",
);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use polkadot_primitives::v1::ValidatorPair;
use sp_core::{Pair, Public};
use futures::{executor, poll, channel::{mpsc, oneshot}, StreamExt, SinkExt};
use futures::{executor, poll, StreamExt, SinkExt};
#[test]
fn adding_a_connection_request_works() {
......@@ -251,7 +229,6 @@ mod tests {
validator_map.insert(auth_2.clone(), validator_2.clone());
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
let (revoke_1_tx, _revoke_1_rx) = oneshot::channel();
let peer_id_1 = PeerId::random();
let peer_id_2 = PeerId::random();
......@@ -259,7 +236,6 @@ mod tests {
let connection_request_1 = ConnectionRequest {
validator_map,
connections: rq1_rx,
revoke: revoke_1_tx,
};
let relay_parent_1 = Hash::repeat_byte(1);
......@@ -302,10 +278,8 @@ mod tests {
validator_map_2.insert(auth_2.clone(), validator_2.clone());
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
let (revoke_1_tx, _revoke_1_rx) = oneshot::channel();
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);
let (revoke_2_tx, _revoke_2_rx) = oneshot::channel();
let peer_id_1 = PeerId::random();
let peer_id_2 = PeerId::random();
......@@ -313,13 +287,11 @@ mod tests {
let connection_request_1 = ConnectionRequest {
validator_map: validator_map_1,
connections: rq1_rx,
revoke: revoke_1_tx,
};
let connection_request_2 = ConnectionRequest {
validator_map: validator_map_2,
connections: rq2_rx,
revoke: revoke_2_tx,
};
let relay_parent_1 = Hash::repeat_byte(1);
......@@ -364,10 +336,8 @@ mod tests {
validator_map_2.insert(auth_2.clone(), validator_2.clone());
let (mut rq1_tx, rq1_rx) = mpsc::channel(8);
let (revoke_1_tx, _revoke_1_rx) = oneshot::channel();
let (mut rq2_tx, rq2_rx) = mpsc::channel(8);
let (revoke_2_tx, _revoke_2_rx) = oneshot::channel();
let peer_id_1 = PeerId::random();
let peer_id_2 = PeerId::random();
......@@ -375,13 +345,11 @@ mod tests {
let connection_request_1 = ConnectionRequest {
validator_map: validator_map_1,
connections: rq1_rx,
revoke: revoke_1_tx,
};
let connection_request_2 = ConnectionRequest {
validator_map: validator_map_2,
connections: rq2_rx,
revoke: revoke_2_tx,
};
let relay_parent = Hash::repeat_byte(3);
......
......@@ -208,6 +208,7 @@ pub enum NetworkBridgeMessage {
///
/// Also ask the network to stay connected to these peers at least
/// until the request is revoked.
/// This can be done by dropping the receiver.
ConnectToValidators {
/// Ids of the validators to connect to.
validator_ids: Vec<AuthorityDiscoveryId>,
......@@ -215,13 +216,6 @@ pub enum NetworkBridgeMessage {
/// the validators as they are connected.
/// The response is sent immediately for already connected peers.
connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
/// By revoking the request the caller allows the network to
/// free some peer slots thus freeing the resources.
/// It doesn't necessarily lead to peers disconnection though.
/// The revokation is enacted on in the next connection request.
///
/// This can be done by sending to the channel or dropping the sender.
revoke: oneshot::Receiver<()>,
},
}
......
......@@ -54,8 +54,8 @@ enum ApprovalVotingMessage {
/// Check if the assignment is valid and can be accepted by our view of the protocol.
/// Should not be sent unless the block hash is known.
CheckAndImportAssignment(
Hash,
AssignmentCert,
Hash,
AssignmentCert,
ValidatorIndex,
ResponseChannel<VoteCheckResult>,
),
......@@ -68,11 +68,11 @@ enum ApprovalVotingMessage {
ResponseChannel<VoteCheckResult>,
),
/// Returns the highest possible ancestor hash of the provided block hash which is
/// acceptable to vote on finality for.
/// acceptable to vote on finality for.
/// The `BlockNumber` provided is the number of the block's ancestor which is the
/// earliest possible vote.
///
/// It can also return the same block hash, if that is acceptable to vote upon.
///
/// It can also return the same block hash, if that is acceptable to vote upon.
/// Return `None` if the input hash is unrecognized.
ApprovedAncestor(Hash, BlockNumber, ResponseChannel<Option<Hash>>),
}
......@@ -122,8 +122,8 @@ Messages received by the availability recovery subsystem.
enum AvailabilityRecoveryMessage {
/// Recover available data from validators on the network.
RecoverAvailableData(
CandidateDescriptor,
SessionIndex,
CandidateDescriptor,
SessionIndex,
ResponseChannel<Option<AvailableData>>,
),
}
......@@ -293,6 +293,7 @@ enum NetworkBridgeMessage {
///
/// Also ask the network to stay connected to these peers at least
/// until the request is revoked.
/// This can be done by dropping the receiver.
ConnectToValidators {
/// Ids of the validators to connect to.
validator_ids: Vec<AuthorityDiscoveryId>,
......@@ -300,13 +301,6 @@ enum NetworkBridgeMessage {
/// the validators as they are connected.
/// The response is sent immediately for already connected peers.
connected: ResponseStream<(AuthorityDiscoveryId, PeerId)>,
/// By revoking the request the caller allows the network to
/// free some peer slots thus freeing the resources.
/// It doesn't necessarily lead to peers disconnection though.
/// The revokation is enacted on in the next connection request.
///
/// This can be done by sending to the channel or dropping the sender.
revoke: ReceiverChannel<()>,
},
}
```
......@@ -409,7 +403,7 @@ enum RuntimeApiRequest {
SessionIndex(ResponseChannel<SessionIndex>),
/// Get the validation code for a specific para, using the given occupied core assumption.
ValidationCode(ParaId, OccupiedCoreAssumption, ResponseChannel<Option<ValidationCode>>),
/// Fetch the historical validation code used by a para for candidates executed in
/// Fetch the historical validation code used by a para for candidates executed in
/// the context of a given block height in the current chain.
HistoricalValidationCode(ParaId, BlockNumber, ResponseChannel<Option<ValidationCode>>),
/// with the given occupied core assumption.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment