Unverified Commit 8fd49a52 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Fixes bug that collator wasn't sending `Declare` message (#1895)

* Fixes bug that collator wasn't sending `Declare` message

* Apply suggestions from code review

I'm an idiot
parent 3b1bb87d
Pipeline #112876 passed with stages
in 19 minutes and 38 seconds
......@@ -187,8 +187,7 @@ impl CandidateSelectionJob {
para_id,
collator_id,
)) => {
self.handle_collation(relay_parent, para_id, collator_id)
.await;
self.handle_collation(relay_parent, para_id, collator_id).await;
}
ToJob::CandidateSelection(CandidateSelectionMessage::Invalid(
_,
......
......@@ -563,11 +563,15 @@ async fn handle_validator_connected<Context>(
ctx: &mut Context,
state: &mut State,
peer_id: PeerId,
validator_id: ValidatorId,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
if !state.peer_views.contains_key(&peer_id) {
// Check if the validator is already known or if maybe its peer id chaned(should not happen)
let unknown = state.known_validators.insert(validator_id, peer_id.clone()).map(|o| o != peer_id).unwrap_or(true);
if unknown {
// Only declare the new peers.
declare(ctx, state, vec![peer_id.clone()]).await?;
state.peer_views.insert(peer_id, Default::default());
......@@ -651,8 +655,7 @@ where
loop {
if let Some(mut request) = state.last_connection_request.take() {
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
state.known_validators.insert(validator_id, peer_id.clone());
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await {
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id, validator_id).await {
warn!(
target: TARGET,
"Failed to declare our collator id: {:?}",
......@@ -698,6 +701,7 @@ mod tests {
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::ObservedRole;
#[derive(Default)]
struct TestCandidateBuilder {
......@@ -889,9 +893,7 @@ mod tests {
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
let current = test_state.relay_parent;
let TestHarness {
mut virtual_overseer,
} = test_harness;
let mut virtual_overseer = test_harness.virtual_overseer;
let pov_block = PoV {
block_data: BlockData(vec![42, 43, 44]),
......@@ -976,10 +978,7 @@ mod tests {
)) => {
assert_eq!(relay_parent, current);
assert_eq!(validators.len(), 4);
assert!(validators.contains(&test_state.validator_public[2]));
assert!(validators.contains(&test_state.validator_public[0]));
assert!(validators.contains(&test_state.validator_public[4]));
assert!(validators.contains(&test_state.validator_public[1]));
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
......@@ -1002,10 +1001,7 @@ mod tests {
}
) => {
assert_eq!(validator_ids.len(), 4);
assert!(validator_ids.contains(&test_state.validator_authority_id[2]));
assert!(validator_ids.contains(&test_state.validator_authority_id[0]));
assert!(validator_ids.contains(&test_state.validator_authority_id[4]));
assert!(validator_ids.contains(&test_state.validator_authority_id[1]));
assert!(validator_ids.iter().all(|id| test_state.validator_authority_id.contains(id)));
let result = vec![
(test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()),
......@@ -1014,9 +1010,7 @@ mod tests {
(test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()),
];
for (id, peer_id) in result.into_iter() {
connected.try_send((id, peer_id)).unwrap();
}
result.into_iter().for_each(|r| connected.try_send(r).unwrap());
}
);
......@@ -1251,10 +1245,7 @@ mod tests {
)) => {
assert_eq!(relay_parent, current);
assert_eq!(validators.len(), 4);
assert!(validators.contains(&test_state.validator_public[2]));
assert!(validators.contains(&test_state.validator_public[0]));
assert!(validators.contains(&test_state.validator_public[4]));
assert!(validators.contains(&test_state.validator_public[1]));
assert!(validators.iter().all(|p| test_state.validator_public.contains(p)));
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
......@@ -1267,4 +1258,161 @@ mod tests {
);
});
}
/// This test ensures that we declare a collator at a validator by sending the `Declare` message as soon as the
/// collator is aware of the validator being connected.
#[test]
fn collators_are_registered_correctly_at_validators() {
let test_state = TestState::default();
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.validator_peer_id[0].clone();
let validator_id = test_state.validator_authority_id[0].clone();
// Setup the system correctly
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollateOn(test_state.chain_ids[0]),
).await;
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![test_state.relay_parent],
deactivated: smallvec![],
}),
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(View(vec![test_state.relay_parent])),
),
).await;
// A validator connected to us
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Authority),
),
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer.clone(), View(Default::default())),
),
).await;
// Now we want to distribute a PoVBlock
let pov_block = PoV {
block_data: BlockData(vec![42, 43, 44]),
};
let pov_hash = pov_block.hash();
let candidate = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: test_state.relay_parent,
pov_hash,
..Default::default()
}.build();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()),
).await;
// obtain the availability cores.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
// Obtain the validator groups
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorGroups(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(test_state.validator_groups.clone())).unwrap();
}
);
// obtain the validators per relay parent
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(test_state.validator_public.clone())).unwrap();
}
);
// obtain the validator_id to authority_id mapping
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(validators.len(), 4);
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
Some(test_state.validator_authority_id[0].clone()),
Some(test_state.validator_authority_id[4].clone()),
Some(test_state.validator_authority_id[1].clone()),
];
tx.send(Ok(result)).unwrap();
}
);
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
mut connected,
..
}
) => {
connected.try_send((validator_id, peer.clone())).unwrap();
}
);
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
peer_id,
msg,
)
) => {
assert_matches!(
msg,
protocol_v1::CollationProtocol::CollatorProtocol(
protocol_v1::CollatorProtocolMessage::Declare(collator_id),
) if collator_id == test_state.our_collator_pair.public()
);
assert_eq!(peer, peer_id[0]);
}
);
})
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment