diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 1c9aefab9bf7ccbb2bcd24e1fb741c2630add695..882a38a033b01da5d7f4bee9fd1ed28b43e4b856 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -122,7 +122,7 @@ struct State { /// all actions from peers not in this map will be ignored. /// Entries in this map will be cleared as validator groups in `our_validator_groups` /// go out of scope with their respective deactivated leafs. - known_validators: HashMap<PeerId, ValidatorId>, + known_validators: HashMap<ValidatorId, PeerId>, /// Use to await for the next validator connection and revoke the request. last_connection_request: Option<validator_discovery::ConnectionRequest>, @@ -195,6 +195,19 @@ where state.our_validators_groups.insert(relay_parent, our_validators.clone()); + // We may be already connected to some of the validators. In that case, + // advertise a collation to them right away. + for validator in our_validators.iter() { + if let Some(peer) = state.known_validators.get(&validator) { + if let Some(view) = state.peer_views.get(peer) { + if view.contains(&relay_parent) { + let peer = peer.clone(); + advertise_collation(ctx, state, relay_parent, vec![peer]).await?; + } + } + } + } + // Issue a discovery request for the validators of the current group and the next group. connect_to_validators(ctx, relay_parent, state, our_validators).await?; @@ -554,9 +567,11 @@ async fn handle_validator_connected<Context>( where Context: SubsystemContext<Message = CollatorProtocolMessage> { - state.peer_views.entry(peer_id.clone()).or_default(); - - declare(ctx, state, vec![peer_id]).await?; + if !state.peer_views.contains_key(&peer_id) { + // Only declare the new peers. + declare(ctx, state, vec![peer_id.clone()]).await?; + state.peer_views.insert(peer_id, Default::default()); + } Ok(()) } @@ -574,12 +589,14 @@ where match bridge_message { PeerConnected(_peer_id, _observed_role) => { - // validators first connection is handled by `handle_validator_connected` + // If it is possible that a disconnected validator would attempt a reconnect + // it should be handled here. } PeerViewChange(peer_id, view) => { handle_peer_view_change(ctx, state, peer_id, view).await?; } PeerDisconnected(peer_id) => { + state.known_validators.retain(|_, v| *v != peer_id); state.peer_views.remove(&peer_id); } OurViewChange(view) => { @@ -606,9 +623,7 @@ async fn handle_our_view_change( for removed in removed.into_iter() { state.collations.remove(removed); - if let Some(group) = state.our_validators_groups.remove(removed) { - state.known_validators.retain(|_, v| !group.contains(v)); - } + state.our_validators_groups.remove(removed); } Ok(()) @@ -636,7 +651,7 @@ where loop { if let Some(mut request) = state.last_connection_request.take() { while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) { - state.known_validators.insert(peer_id.clone(), validator_id); + state.known_validators.insert(validator_id, peer_id.clone()); if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await { warn!( target: TARGET, @@ -1129,6 +1144,127 @@ mod tests { ).await; assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none()); + + let pov_block = PoV { + block_data: BlockData(vec![45, 46, 47]), + }; + + let pov_hash = pov_block.hash(); + let current = Hash::repeat_byte(33); + + let candidate = TestCandidateBuilder { + para_id: test_state.chain_ids[0], + relay_parent: current, + pov_hash, + ..Default::default() + }.build(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(View(vec![current])), + ), + ).await; + + // Send info about peer's view. + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerViewChange( + test_state.validator_peer_id[2].clone(), + View(vec![current]), + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + ).await; + + // obtain the availability cores. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::AvailabilityCores(tx) + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(test_state.availability_cores.clone())).unwrap(); + } + ); + + // Obtain the validator groups + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorGroups(tx) + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(test_state.validator_groups.clone())).unwrap(); + } + ); + + // obtain the validators per relay parent + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => { + assert_eq!(relay_parent, current); + tx.send(Ok(test_state.validator_public.clone())).unwrap(); + } + ); + + // The peer is interested in a leaf that we have a collation for; + // advertise it. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + relay_parent, + collating_on, + ) => { + assert_eq!(relay_parent, current); + assert_eq!(collating_on, test_state.chain_ids[0]); + } + ); + } + ); + + // obtain the validator_id to authority_id mapping + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators, tx), + )) => { + assert_eq!(relay_parent, current); + assert_eq!(validators.len(), 4); + assert!(validators.contains(&test_state.validator_public[2])); + assert!(validators.contains(&test_state.validator_public[0])); + assert!(validators.contains(&test_state.validator_public[4])); + assert!(validators.contains(&test_state.validator_public[1])); + + let result = vec![ + Some(test_state.validator_authority_id[2].clone()), + Some(test_state.validator_authority_id[0].clone()), + Some(test_state.validator_authority_id[4].clone()), + Some(test_state.validator_authority_id[1].clone()), + ]; + tx.send(Ok(result)).unwrap(); + } + ); }); } }