Unverified Commit a2244b66 authored by Andronik Ordian, committed by GitHub
Browse files

collator-protocol: do not connect to the next group (#4261)

* collator-protocol: do not connect to the next group

* fmt
parent 9e8b26fe
Pipeline #165650 passed with stages
in 40 minutes and 21 seconds
......@@ -379,11 +379,11 @@ where
},
};
// Determine the group on that core and the next group on that core.
let (current_validators, next_validators) =
// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
if current_validators.validators.is_empty() && next_validators.validators.is_empty() {
if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
......@@ -401,23 +401,14 @@ where
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
?next_validators,
"Accepted collation, connecting to validators."
);
let validator_group: HashSet<_> =
current_validators.validators.iter().map(Clone::clone).collect();
// Issue a discovery request for the validators of the current group and the next group:
connect_to_validators(
ctx,
current_validators
.validators
.into_iter()
.chain(next_validators.validators.into_iter())
.collect(),
)
.await;
// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
state.our_validators_groups.insert(relay_parent, validator_group.into());
......@@ -471,16 +462,16 @@ struct GroupValidators {
validators: Vec<AuthorityDiscoveryId>,
}
/// Figure out current and next group of validators assigned to the para being collated on.
/// Figure out current group of validators assigned to the para being collated on.
///
/// Returns [`ValidatorId`]'s of current and next group as determined based on the `relay_parent`.
/// Returns [`ValidatorId`]'s of current group as determined based on the `relay_parent`.
async fn determine_our_validators<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
core_index: CoreIndex,
cores: usize,
relay_parent: Hash,
) -> Result<(GroupValidators, GroupValidators)>
) -> Result<GroupValidators>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
......@@ -500,22 +491,15 @@ where
.map(|v| v.as_slice())
.unwrap_or_default();
let next_group_idx = (current_group_index.0 as usize + 1) % groups.len();
let next_validators = groups.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
let validators = &info.discovery_keys;
let current_validators =
current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let next_validators =
next_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let current_validators =
GroupValidators { group: current_group_index, validators: current_validators };
let next_validators =
GroupValidators { group: GroupIndex(next_group_idx as u32), validators: next_validators };
Ok((current_validators, next_validators))
Ok(current_validators)
}
/// Issue a `Declare` collation message to the given `peer`.
......
......@@ -332,14 +332,12 @@ impl Default for PeerData {
struct GroupAssignments {
current: Option<ParaId>,
next: Option<ParaId>,
}
#[derive(Default)]
struct ActiveParas {
relay_parent_assignments: HashMap<Hash, GroupAssignments>,
current_assignments: HashMap<ParaId, usize>,
next_assignments: HashMap<ParaId, usize>,
}
impl ActiveParas {
......@@ -384,22 +382,16 @@ impl ActiveParas {
},
};
let (para_now, para_next) =
let para_now =
match polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore)
.await
.and_then(|(_, index)| {
polkadot_node_subsystem_util::find_validator_group(&groups, index)
}) {
Some(group) => {
let next_rotation_info = rotation_info.bump_rotation();
let core_now = rotation_info.core_for_group(group, cores.len());
let core_next = next_rotation_info.core_for_group(group, cores.len());
(
cores.get(core_now.0 as usize).and_then(|c| c.para_id()),
cores.get(core_next.0 as usize).and_then(|c| c.para_id()),
)
cores.get(core_now.0 as usize).and_then(|c| c.para_id())
},
None => {
tracing::trace!(target: LOG_TARGET, ?relay_parent, "Not a validator");
......@@ -429,19 +421,15 @@ impl ActiveParas {
}
}
if let Some(para_next) = para_next {
*self.next_assignments.entry(para_next).or_default() += 1;
}
self.relay_parent_assignments
.insert(relay_parent, GroupAssignments { current: para_now, next: para_next });
.insert(relay_parent, GroupAssignments { current: para_now });
}
}
fn remove_outgoing(&mut self, old_relay_parents: impl IntoIterator<Item = Hash>) {
for old_relay_parent in old_relay_parents {
if let Some(assignments) = self.relay_parent_assignments.remove(&old_relay_parent) {
let GroupAssignments { current, next } = assignments;
let GroupAssignments { current } = assignments;
if let Some(cur) = current {
if let Entry::Occupied(mut occupied) = self.current_assignments.entry(cur) {
......@@ -456,23 +444,10 @@ impl ActiveParas {
}
}
}
if let Some(next) = next {
if let Entry::Occupied(mut occupied) = self.next_assignments.entry(next) {
*occupied.get_mut() -= 1;
if *occupied.get() == 0 {
occupied.remove_entry();
}
}
}
}
}
}
/// Whether the given para is one we are (or will soon be) assigned to:
/// i.e. it appears in the tracked assignments for the current group
/// rotation or the next one.
fn is_current_or_next(&self, id: ParaId) -> bool {
	let assigned_now = self.current_assignments.contains_key(&id);
	let assigned_next = self.next_assignments.contains_key(&id);
	assigned_now || assigned_next
}
/// Whether the given para appears in the tracked assignments for the
/// current group rotation (i.e. we are currently assigned to it).
fn is_current(&self, id: &ParaId) -> bool {
	self.current_assignments.get(id).is_some()
}
......@@ -846,13 +821,13 @@ async fn process_incoming_peer_message<Context>(
return
}
if state.active_paras.is_current_or_next(para_id) {
if state.active_paras.is_current(&para_id) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
?collator_id,
?para_id,
"Declared as collator for current or next para",
"Declared as collator for current para",
);
peer_data.set_collating(collator_id, para_id);
......@@ -895,20 +870,6 @@ async fn process_incoming_peer_message<Context>(
Some(p) => p,
};
if let PeerState::Collating(ref collating_state) = peer_data.state {
let para_id = collating_state.para_id;
if !state.active_paras.is_current(&para_id) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
%para_id,
?relay_parent,
"Received advertise collation, but we are assigned to the next group",
);
return
}
}
match peer_data.insert_advertisement(relay_parent, &state.view) {
Ok((id, para_id)) => {
tracing::debug!(
......@@ -1015,7 +976,7 @@ where
// If the peer hasn't declared yet, they will be disconnected if they do not
// declare.
if let Some(para_id) = peer_data.collating_para() {
if !state.active_paras.is_current_or_next(para_id) {
if !state.active_paras.is_current(&para_id) {
tracing::trace!(target: LOG_TARGET, "Disconnecting peer on view change");
disconnect_peer(ctx, peer_id.clone()).await;
}
......
......@@ -675,7 +675,7 @@ fn fetch_collations_works() {
}
#[test]
fn dont_fetch_collation_if_assigned_to_next_group() {
fn reject_connection_to_next_group() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
......@@ -701,15 +701,19 @@ fn dont_fetch_collation_if_assigned_to_next_group() {
)
.await;
advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
assert!(
overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30))
.await
.is_none(),
"There should be no PoV fetching request.",
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(
peer,
rep,
)) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_UNNEEDED_COLLATOR);
}
);
assert_collator_disconnect(&mut virtual_overseer, peer_b).await;
virtual_overseer
})
}
......
......@@ -47,7 +47,7 @@ digraph {
### Collators
It is assumed that collators are only collating on a single parachain. Collations are generated by the [Collation Generation][CG] subsystem. We will keep up to one local collation per relay-parent, based on `DistributeCollation` messages. If the para is not scheduled or next up on any core, at the relay-parent, or the relay-parent isn't in the active-leaves set, we ignore the message as it must be invalid in that case - although this indicates a logic error elsewhere in the node.
It is assumed that collators are only collating on a single parachain. Collations are generated by the [Collation Generation][CG] subsystem. We will keep up to one local collation per relay-parent, based on `DistributeCollation` messages. If the para is not scheduled on any core, at the relay-parent, or the relay-parent isn't in the active-leaves set, we ignore the message as it must be invalid in that case - although this indicates a logic error elsewhere in the node.
We keep track of the Para ID we are collating on as a collator. This starts as `None`, and is updated with each `CollateOn` message received. If the `ParaId` of a collation requested to be distributed does not match the one we expect, we ignore the message.
......@@ -57,8 +57,8 @@ For the purposes of actually distributing a collation, we need to be connected t
As seen in the [Scheduler Module][SCH] of the runtime, validator groups are fixed for an entire session and their rotations across cores are predictable. Collators will want to do these things when attempting to distribute collations at a given relay-parent:
* Determine which core the para collated-on is assigned to.
* Determine the group on that core and the next group on that core.
* Issue a discovery request for the validators of the current group and the next group with [`NetworkBridgeMessage`][NBM]`::ConnectToValidators`.
* Determine the group on that core.
* Issue a discovery request for the validators of the current group with [`NetworkBridgeMessage`][NBM]`::ConnectToValidators`.
Once connected to the relevant peers for the current group assigned to the core (transitively, the para), advertise the collation to any of them which advertise the relay-parent in their view (as provided by the [Network Bridge][NB]). If any respond with a request for the full collation, provide it. However, we only send one collation at a time per relay parent, other requests need to wait. This is done to reduce the bandwidth requirements of a collator and also increases the chance to fully send the collation to at least one validator. From the point where one validator has received the collation and seconded it, it will also start to share this collation with other validators in its backing group. Upon receiving a view update from any of these peers which includes a relay-parent for which we have a collation that they will find relevant, advertise the collation to them if we haven't already.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment