Unverified Commit b40c3566 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Rework `ConnectionsRequests` (#2081)

* Rework `ConnectionsRequests`

Instead of implementing the `Stream` trait, this struct now provides a
function `next()`. This enables us to encode into the type system that
it will always return a value or block indefinitely.

* Review feedback
parent 305d9414
Pipeline #116306 passed with stages
in 28 minutes and 31 seconds
......@@ -18,7 +18,7 @@ use std::collections::{HashMap, HashSet};
use super::{LOG_TARGET, Result};
use futures::{StreamExt, select, FutureExt};
use futures::{select, FutureExt};
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId,
......@@ -691,21 +691,15 @@ pub(crate) async fn run(
loop {
select! {
res = state.connection_requests.next() => {
let (relay_parent, validator_id, peer_id) = match res {
Some(res) => res,
// Will never happen, but better to be safe.
None => return Ok(()),
};
res = state.connection_requests.next().fuse() => {
let _timer = state.metrics.time_handle_connection_request();
handle_validator_connected(
&mut ctx,
&mut state,
peer_id,
validator_id,
relay_parent,
res.peer_id,
res.validator_id,
res.relay_parent,
).await;
},
msg = ctx.recv().fuse() => match msg? {
......
......@@ -700,14 +700,7 @@ impl PoVDistribution {
// peer view update messages may be racy and we want connection notifications
// first.
futures::select_biased! {
v = state.connection_requests.next() => {
match v {
Some((_relay_parent, _validator_id, peer_id)) => {
handle_validator_connected(&mut state, peer_id);
}
None => break,
}
}
v = state.connection_requests.next().fuse() => handle_validator_connected(&mut state, v.peer_id),
v = ctx.recv().fuse() => {
match v? {
FromOverseer::Signal(signal) => if handle_signal(
......@@ -743,10 +736,8 @@ impl PoVDistribution {
}
}
}
};
}
}
Ok(())
}
}
......
......@@ -23,6 +23,7 @@ use futures::{
channel::mpsc,
task::{Poll, self},
stream,
StreamExt,
};
use streamunordered::{StreamUnordered, StreamYield};
......@@ -113,33 +114,60 @@ async fn connect_to_authorities<Context: SubsystemContext>(
connected_rx
}
/// Represents a discovered validator.
///
/// Result of [`ConnectionRequests::next`].
#[derive(Debug, PartialEq)]
pub struct DiscoveredValidator {
/// The relay parent associated with the connection request that returned a result.
pub relay_parent: Hash,
/// The [`ValidatorId`] that was resolved.
pub validator_id: ValidatorId,
/// The [`PeerId`] associated to the validator id.
pub peer_id: PeerId,
}
/// Used by [`ConnectionRequests::requests`] to map a [`ConnectionRequest`] item to a [`DiscoveredValidator`].
struct ConnectionRequestForRelayParent {
request: ConnectionRequest,
relay_parent: Hash,
}
impl stream::Stream for ConnectionRequestForRelayParent {
type Item = DiscoveredValidator;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
self.request
.poll_next_unpin(cx)
.map(|r| r.map(|(validator_id, peer_id)| DiscoveredValidator {
validator_id,
peer_id,
relay_parent: self.relay_parent,
}))
}
}
/// A struct that assists performing multiple concurrent connection requests.
///
/// This allows concurrent connections to validator sets at different `relay_parents`
/// and multiplexes their results into a single `Stream`.
/// This allows concurrent connections to validator sets at different `relay_parents`.
/// Use [`ConnectionRequests::next`] to wait for results of the added connection requests.
#[derive(Default)]
pub struct ConnectionRequests {
// added connection requests relay_parent -> StreamUnordered token
/// Connection requests relay_parent -> StreamUnordered token
id_map: HashMap<Hash, usize>,
// Connection requests themselves.
requests: StreamUnordered<ConnectionRequest>,
}
impl stream::FusedStream for ConnectionRequests {
fn is_terminated(&self) -> bool {
false
}
/// Connection requests themselves.
requests: StreamUnordered<ConnectionRequestForRelayParent>,
}
impl ConnectionRequests {
/// Insert a new connection request.
///
/// If a `ConnectionRequest` under a given `relay_parent` already exists it will
/// be revoked and substituted with a new one.
/// be revoked and substituted with the given one.
pub fn put(&mut self, relay_parent: Hash, request: ConnectionRequest) {
self.remove(&relay_parent);
let token = self.requests.push(request);
let token = self.requests.push(ConnectionRequestForRelayParent { relay_parent, request });
self.id_map.insert(relay_parent, token);
}
......@@ -155,39 +183,23 @@ impl ConnectionRequests {
pub fn contains_request(&self, relay_parent: &Hash) -> bool {
self.id_map.contains_key(relay_parent)
}
}
impl stream::Stream for ConnectionRequests {
/// (relay_parent, validator_id, peer_id).
type Item = (Hash, ValidatorId, PeerId);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
// If there are currently no requests going on, pend instead of
// polling `StreamUnordered` which would lead to it terminating
// and returning `Poll::Ready(None)`.
if self.requests.is_empty() {
return Poll::Pending;
}
match Pin::new(&mut self.requests).poll_next(cx) {
Poll::Ready(Some((yielded, token))) => {
match yielded {
StreamYield::Item(item) => {
if let Some((relay_parent, _)) = self.id_map.iter()
.find(|(_, &val)| val == token)
{
return Poll::Ready(Some((*relay_parent, item.0, item.1)));
}
}
StreamYield::Finished(_) => {
// `ConnectionRequest` is fullfilled, but not revoked
}
}
},
_ => {},
/// Returns the next available connection request result.
///
/// # Note
///
/// When there are no active requests this will wait indefinitely, like an always pending future.
pub async fn next(&mut self) -> DiscoveredValidator {
loop {
match self.requests.next().await {
Some((StreamYield::Item(item), _)) => {
return item
},
// Ignore finished requests, they are required to be removed.
Some((StreamYield::Finished(_), _)) => (),
None => futures::pending!(),
}
}
Poll::Pending
}
}
......@@ -231,14 +243,20 @@ mod tests {
use polkadot_primitives::v1::ValidatorPair;
use sp_core::{Pair, Public};
use futures::{executor, poll, StreamExt, SinkExt};
use futures::{executor, poll, SinkExt};
async fn check_next_is_pending(connection_requests: &mut ConnectionRequests) {
let next = connection_requests.next();
futures::pin_mut!(next);
assert_eq!(poll!(next), Poll::Pending);
}
#[test]
fn adding_a_connection_request_works() {
let mut connection_requests = ConnectionRequests::default();
executor::block_on(async move {
assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending);
check_next_is_pending(&mut connection_requests).await;
let validator_1 = ValidatorPair::generate().0.public();
let validator_2 = ValidatorPair::generate().0.public();
......@@ -267,16 +285,19 @@ mod tests {
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq1_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
let res = Pin::new(&mut connection_requests).next().await.unwrap();
assert_eq!(res, (relay_parent_1, validator_1, peer_id_1));
let res = Pin::new(&mut connection_requests).next().await.unwrap();
assert_eq!(res, (relay_parent_1, validator_2, peer_id_2));
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
);
let res = connection_requests.next().await;
assert_eq!(
poll!(Pin::new(&mut connection_requests).next()),
Poll::Pending,
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_2, peer_id: peer_id_2 },
);
check_next_is_pending(&mut connection_requests).await;
});
}
......@@ -285,7 +306,7 @@ mod tests {
let mut connection_requests = ConnectionRequests::default();
executor::block_on(async move {
assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending);
check_next_is_pending(&mut connection_requests).await;
let validator_1 = ValidatorPair::generate().0.public();
let validator_2 = ValidatorPair::generate().0.public();
......@@ -325,16 +346,19 @@ mod tests {
rq1_tx.send((auth_1, peer_id_1.clone())).await.unwrap();
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
let res = Pin::new(&mut connection_requests).next().await.unwrap();
assert_eq!(res, (relay_parent_1, validator_1, peer_id_1));
let res = Pin::new(&mut connection_requests).next().await.unwrap();
assert_eq!(res, (relay_parent_2, validator_2, peer_id_2));
let res = connection_requests.next().await;
assert_eq!(
res,
DiscoveredValidator { relay_parent: relay_parent_1, validator_id: validator_1, peer_id: peer_id_1 },
);
let res = connection_requests.next().await;
assert_eq!(
poll!(Pin::new(&mut connection_requests).next()),
Poll::Pending,
res,
DiscoveredValidator { relay_parent: relay_parent_2, validator_id: validator_2, peer_id: peer_id_2 },
);
check_next_is_pending(&mut connection_requests).await;
});
}
......@@ -343,7 +367,7 @@ mod tests {
let mut connection_requests = ConnectionRequests::default();
executor::block_on(async move {
assert_eq!(poll!(Pin::new(&mut connection_requests).next()), Poll::Pending);
check_next_is_pending(&mut connection_requests).await;
let validator_1 = ValidatorPair::generate().0.public();
let validator_2 = ValidatorPair::generate().0.public();
......@@ -380,8 +404,8 @@ mod tests {
rq1_tx.send((auth_1.clone(), peer_id_1.clone())).await.unwrap();
let res = Pin::new(&mut connection_requests).next().await.unwrap();
assert_eq!(res, (relay_parent, validator_1, peer_id_1.clone()));
let res = connection_requests.next().await;
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_1, peer_id: peer_id_1.clone() });
connection_requests.put(relay_parent.clone(), connection_request_2);
......@@ -389,13 +413,10 @@ mod tests {
rq2_tx.send((auth_2, peer_id_2.clone())).await.unwrap();
let res = Pin::new(&mut connection_requests).next().await.unwrap();
assert_eq!(res, (relay_parent, validator_2, peer_id_2));
let res = connection_requests.next().await;
assert_eq!(res, DiscoveredValidator { relay_parent, validator_id: validator_2, peer_id: peer_id_2 });
assert_eq!(
poll!(Pin::new(&mut connection_requests).next()),
Poll::Pending,
);
check_next_is_pending(&mut connection_requests).await;
});
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment