Unverified Commit ba74791b authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Improve collator side of the collator-protocol (#1955)

* Improve collator side of the collator-protocol

This pr improves the collator-protocol implementation of the collator
side. Besides cleaning up code and rewriting it, the following changed:

- Before on `PeerViewChange` we send an advertisement to every peer, now
this only happens for validators.
- It also adds a check that we send an advertisement message only once
for a connected peer.
- If the same validator was part of the current and next group, we
requested to be connected to this validator two times. This is also
fixed now.
- Instead of having only one connection request, we are now able
to store multiple of them. This is required as we can have multiple
active leaves at any point in time.

* Switch to common `ConnectionRequests`

* Update node/network/collator-protocol/src/collator_side.rs
parent 4a94b71e
Pipeline #114722 passed with stages
in 29 minutes
......@@ -14,15 +14,14 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use super::{LOG_TARGET, Result};
use futures::{StreamExt, task::Poll};
use futures::{StreamExt, select, FutureExt};
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt,
PoV, ValidatorId,
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, SubsystemContext,
......@@ -120,6 +119,60 @@ impl metrics::Metrics for Metrics {
}
}
/// The group of validators that is assigned to our para at a given point of time.
///
/// This structure is responsible for keeping track of which validators belong to a certain group for a para. It also
/// stores a mapping from [`PeerId`] to [`ValidatorId`] as we learn about it over the lifetime of this object. Besides
/// that, it also keeps track of which validators we have already advertised our collation to.
struct ValidatorGroup {
    /// All [`ValidatorId`]'s that are assigned to us in this group.
    validator_ids: HashSet<ValidatorId>,
    /// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s from the
    /// authority discovery. It is not ensured that this will contain *all* validators of this group.
    peer_ids: HashMap<PeerId, ValidatorId>,
    /// All [`ValidatorId`]'s of the current group that we already advertised our collation to.
    advertised_to: HashSet<ValidatorId>,
}
impl ValidatorGroup {
    /// Returns `true` if we should advertise our collation to the given peer.
    ///
    /// This is the case when we know the peer's [`ValidatorId`] and did not yet advertise
    /// our collation to that validator.
    fn should_advertise_to(&self, peer: &PeerId) -> bool {
        self.peer_ids
            .get(peer)
            .map(|validator_id| !self.advertised_to.contains(validator_id))
            .unwrap_or(false)
    }

    /// Should be called after we advertised our collation to the given `peer` to keep track of it.
    ///
    /// A no-op if the [`ValidatorId`] of the peer is unknown to us.
    fn advertised_to_peer(&mut self, peer: &PeerId) {
        if let Some(validator_id) = self.peer_ids.get(peer) {
            self.advertised_to.insert(validator_id.clone());
        }
    }

    /// Add a [`PeerId`] that belongs to the given [`ValidatorId`].
    ///
    /// Returns `true` if the given validator belongs to this group and its [`PeerId`] was stored.
    fn add_peer_id_for_validator(&mut self, peer_id: &PeerId, validator_id: &ValidatorId) -> bool {
        if self.validator_ids.contains(validator_id) {
            self.peer_ids.insert(peer_id.clone(), validator_id.clone());
            true
        } else {
            false
        }
    }
}
impl From<HashSet<ValidatorId>> for ValidatorGroup {
fn from(validator_ids: HashSet<ValidatorId>) -> Self {
Self {
validator_ids,
peer_ids: HashMap::new(),
advertised_to: HashSet::new(),
}
}
}
#[derive(Default)]
struct State {
/// Our id.
......@@ -141,24 +194,26 @@ struct State {
/// We will keep up to one local collation per relay-parent.
collations: HashMap<Hash, (CandidateReceipt, PoV)>,
/// Our validator groups active leafs.
our_validators_groups: HashMap<Hash, Vec<ValidatorId>>,
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,
/// Validators we know about via `ConnectToValidators` message.
///
/// These are the only validators we are interested in talking to and as such
/// all actions from peers not in this map will be ignored.
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
/// go out of scope with their respective deactivated leafs.
known_validators: HashMap<ValidatorId, PeerId>,
/// List of peers where we declared ourself as a collator.
declared_at: HashSet<PeerId>,
/// Use to await for the next validator connection and revoke the request.
last_connection_request: Option<validator_discovery::ConnectionRequest>,
/// The connection requests to validators per relay parent.
connection_requests: validator_discovery::ConnectionRequests,
/// Metrics.
metrics: Metrics,
}
impl State {
    /// Returns `true` if the given `peer` is interested in the leaf that is represented by `relay_parent`.
    ///
    /// Peers we have no recorded view for are treated as not interested.
    fn peer_interested_in_leaf(&self, peer: &PeerId, relay_parent: &Hash) -> bool {
        self.peer_views
            .get(peer)
            .map_or(false, |view| view.contains(relay_parent))
    }
}
/// Distribute a collation.
///
/// Figure out the core our para is assigned to and the relevant validators.
......@@ -168,16 +223,13 @@ struct State {
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn distribute_collation<Context>(
ctx: &mut Context,
async fn distribute_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
pov: PoV,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
let relay_parent = receipt.descriptor.relay_parent;
// This collation is not in the active-leaves set.
......@@ -207,41 +259,28 @@ where
relay_parent = %relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);
return Ok(());
return Ok(())
}
};
// Determine the group on that core and the next group on that core.
let our_validators = match determine_our_validators(ctx, our_core, num_cores, relay_parent).await? {
Some(validators) => validators,
None => {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);
return Ok(());
}
};
let (current_validators, next_validators) = determine_our_validators(ctx, our_core, num_cores, relay_parent).await?;
state.our_validators_groups.insert(relay_parent, our_validators.clone());
if current_validators.is_empty() && next_validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);
// We may be already connected to some of the validators. In that case,
// advertise a collation to them right away.
for validator in our_validators.iter() {
if let Some(peer) = state.known_validators.get(&validator) {
if let Some(view) = state.peer_views.get(peer) {
if view.contains(&relay_parent) {
let peer = peer.clone();
advertise_collation(ctx, state, relay_parent, vec![peer]).await?;
}
}
}
return Ok(())
}
// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(ctx, relay_parent, state, our_validators).await?;
connect_to_validators(ctx, relay_parent, state, current_validators.union(&next_validators).cloned().collect()).await?;
state.our_validators_groups.insert(relay_parent, current_validators.into());
state.collations.insert(relay_parent, (receipt, pov));
......@@ -251,14 +290,11 @@ where
/// Get the Id of the Core that is assigned to the para being collated on if any
/// and the total number of cores.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn determine_core<Context>(
ctx: &mut Context,
async fn determine_core(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
para_id: ParaId,
relay_parent: Hash,
) -> Result<Option<(CoreIndex, usize)>>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<Option<(CoreIndex, usize)>> {
let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
for (idx, core) in cores.iter().enumerate() {
......@@ -272,63 +308,46 @@ where
Ok(None)
}
/// Figure out a group of validators assigned to the para being collated on.
/// Figure out current and next group of validators assigned to the para being collated on.
///
/// This returns validators for the current group and the next group.
/// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn determine_our_validators<Context>(
ctx: &mut Context,
async fn determine_our_validators(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
core_index: CoreIndex,
cores: usize,
relay_parent: Hash,
) -> Result<Option<Vec<ValidatorId>>>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<(HashSet<ValidatorId>, HashSet<ValidatorId>)> {
let groups = request_validator_groups_ctx(relay_parent, ctx).await?;
let groups = groups.await??;
let current_group_index = groups.1.group_for_core(core_index, cores);
let mut connect_to_validators = match groups.0.get(current_group_index.0 as usize) {
Some(group) => group.clone(),
None => return Ok(None),
};
let current_validators = groups.0.get(current_group_index.0 as usize).map(|v| v.as_slice()).unwrap_or_default();
let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len();
let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
if let Some(next_group) = groups.0.get(next_group_idx) {
connect_to_validators.extend_from_slice(&next_group);
}
let validators = request_validators_ctx(relay_parent, ctx).await?;
let validators = validators.await??;
let validators = request_validators_ctx(relay_parent, ctx).await?.await??;
let validators = connect_to_validators
.into_iter()
.map(|idx| validators[idx as usize].clone())
.collect();
let current_validators = current_validators.iter().map(|i| validators[*i as usize].clone()).collect();
let next_validators = next_validators.iter().map(|i| validators[*i as usize].clone()).collect();
Ok(Some(validators))
Ok((current_validators, next_validators))
}
/// Issue a `Declare` collation message to a set of peers.
/// Issue a `Declare` collation message to the given `peer`.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn declare<Context>(
ctx: &mut Context,
async fn declare(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
to: Vec<PeerId>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
peer: PeerId,
) -> Result<()> {
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone());
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
vec![peer],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await?;
......@@ -339,41 +358,34 @@ where
/// Issue a connection request to a set of validators and
/// revoke the previous connection request.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn connect_to_validators<Context>(
ctx: &mut Context,
async fn connect_to_validators(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
relay_parent: Hash,
state: &mut State,
validators: Vec<ValidatorId>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
if let Some(request) = state.last_connection_request.take() {
drop(request);
}
) -> Result<()> {
let request = validator_discovery::connect_to_validators(
ctx,
relay_parent,
validators,
).await?;
state.last_connection_request = Some(request);
state.connection_requests.put(relay_parent, request);
Ok(())
}
/// Advertise collation to a set of relay chain validators.
/// Advertise collation to the given `peer`.
///
/// This will only advertise a collation if there exists one for the given `relay_parent` and the given `peer` is
/// set as validator for our para at the given `relay_parent`.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn advertise_collation<Context>(
ctx: &mut Context,
async fn advertise_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
relay_parent: Hash,
to: Vec<PeerId>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
peer: PeerId,
) -> Result<()> {
let collating_on = match state.collating_on {
Some(collating_on) => collating_on,
None => {
......@@ -381,15 +393,28 @@ where
}
};
let should_advertise = state.our_validators_groups
.get(&relay_parent)
.map(|g| g.should_advertise_to(&peer))
.unwrap_or(false);
if !state.collations.contains_key(&relay_parent) || !should_advertise {
return Ok(())
}
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
vec![peer.clone()],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await?;
if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
validators.advertised_to_peer(&peer);
}
state.metrics.on_advertisment_made();
Ok(())
......@@ -397,14 +422,11 @@ where
/// The main incoming message dispatching switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_msg<Context>(
ctx: &mut Context,
async fn process_msg(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
msg: CollatorProtocolMessage,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
use CollatorProtocolMessage::*;
let _timer = state.metrics.time_process_msg();
......@@ -475,17 +497,14 @@ where
/// Issue a response to a previously requested collation.
#[tracing::instrument(level = "trace", skip(ctx, state, pov), fields(subsystem = LOG_TARGET))]
async fn send_collation<Context>(
ctx: &mut Context,
async fn send_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
request_id: RequestId,
origin: PeerId,
receipt: CandidateReceipt,
pov: PoV,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
let wire_message = protocol_v1::CollatorProtocolMessage::Collation(
request_id,
receipt,
......@@ -506,15 +525,12 @@ where
/// A networking messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_peer_message<Context>(
ctx: &mut Context,
async fn handle_incoming_peer_message(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
use protocol_v1::CollatorProtocolMessage::*;
match msg {
......@@ -568,15 +584,12 @@ where
/// Our view has changed.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change<Context>(
ctx: &mut Context,
async fn handle_peer_view_change(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
peer_id: PeerId,
view: View,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
let current = state.peer_views.entry(peer_id.clone()).or_default();
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
......@@ -584,9 +597,7 @@ where
*current = view;
for added in added.into_iter() {
if state.collations.contains_key(&added) {
advertise_collation(ctx, state, added.clone(), vec![peer_id.clone()]).await?;
}
advertise_collation(ctx, state, added, peer_id.clone()).await?;
}
Ok(())
......@@ -596,22 +607,30 @@ where
///
/// `Declare` that we are a collator with a given `CollatorId`.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_validator_connected<Context>(
ctx: &mut Context,
async fn handle_validator_connected(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
peer_id: PeerId,
validator_id: ValidatorId,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
// Check if the validator is already known or if maybe its peer id chaned(should not happen)
let unknown = state.known_validators.insert(validator_id, peer_id.clone()).map(|o| o != peer_id).unwrap_or(true);
if unknown {
// Only declare the new peers.
declare(ctx, state, vec![peer_id.clone()]).await?;
state.peer_views.insert(peer_id, Default::default());
relay_parent: Hash,
) -> Result<()> {
let not_declared = state.declared_at.insert(peer_id.clone());
if not_declared {
declare(ctx, state, peer_id.clone()).await?;
}
// Store the PeerId and find out if we should advertise to this peer.
//
// If this peer does not belong to the para validators, we also don't need to try to advertise our collation.
let advertise = if let Some(validators) = state.our_validators_groups.get_mut(&relay_parent) {
validators.add_peer_id_for_validator(&peer_id, &validator_id)
} else {
false
};
if advertise && state.peer_interested_in_leaf(&peer_id, &relay_parent) {
advertise_collation(ctx, state, relay_parent, peer_id).await?;
}
Ok(())
......@@ -619,14 +638,11 @@ where
/// Bridge messages switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
async fn handle_network_msg(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
bridge_message: NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
use NetworkBridgeEvent::*;
match bridge_message {
......@@ -638,8 +654,8 @@ where
handle_peer_view_change(ctx, state, peer_id, view).await?;
}
PeerDisconnected(peer_id) => {
state.known_validators.retain(|_, v| *v != peer_id);
state.peer_views.remove(&peer_id);
state.declared_at.remove(&peer_id);
}
OurViewChange(view) => {
handle_our_view_change(state, view).await?;
......@@ -658,59 +674,59 @@ async fn handle_our_view_change(
state: &mut State,
view: View,
) -> Result<()> {
let old_view = std::mem::replace(&mut (state.view), view);
let view = state.view.clone();
let removed = old_view.difference(&view).collect::<Vec<_>>();
for removed in removed.into_iter() {
for removed in state.view.difference(&view) {
state.collations.remove(removed);
state.our_validators_groups.remove(removed);
state.connection_requests.remove(removed);
}
state.view = view;
Ok(())
}
/// The collator protocol collator side main loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
pub(crate) async fn run(
mut ctx: impl SubsystemContext<Message = CollatorProtocolMessage>,
our_id: CollatorId,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
) -> Result<()> {
use FromOverseer::*;
use OverseerSignal::*;
let mut state = State {
metrics,
our_id,
..Default::default()
};
state.our_id = our_id;
loop {
if let Some(mut request) = state.last_connection_request.take() {
let _timer = state.metrics.time_handle_connection_request();
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await {
select! {
res = state.connection_requests.next().fuse() => {
let (relay_parent, validator_id, peer_id) = match res {
Some(res) => res,
// Will never happen, but better to be safe.
None => continue,
};
let _timer = state.metrics.time_handle_connection_request();
if let Err(err) = handle_validator_connected(