Unverified Commit f0a352a4 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Advertise to already connected validators (#1790)



* Advertise to already connected validators

* Merge the loops and check the view

* Extend a test to capture new logic

* Fix a comment

* Update node/network/collator-protocol/src/collator_side.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update comment

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent 923c97a4
Pipeline #109800 passed with stages
in 15 minutes and 18 seconds
......@@ -122,7 +122,7 @@ struct State {
/// all actions from peers not in this map will be ignored.
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
/// go out of scope with their respective deactivated leafs.
known_validators: HashMap<PeerId, ValidatorId>,
known_validators: HashMap<ValidatorId, PeerId>,
/// Use to await for the next validator connection and revoke the request.
last_connection_request: Option<validator_discovery::ConnectionRequest>,
......@@ -195,6 +195,19 @@ where
state.our_validators_groups.insert(relay_parent, our_validators.clone());
// We may be already connected to some of the validators. In that case,
// advertise a collation to them right away.
for validator in our_validators.iter() {
if let Some(peer) = state.known_validators.get(&validator) {
if let Some(view) = state.peer_views.get(peer) {
if view.contains(&relay_parent) {
let peer = peer.clone();
advertise_collation(ctx, state, relay_parent, vec![peer]).await?;
}
}
}
}
// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(ctx, relay_parent, state, our_validators).await?;
......@@ -554,9 +567,11 @@ async fn handle_validator_connected<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
state.peer_views.entry(peer_id.clone()).or_default();
declare(ctx, state, vec![peer_id]).await?;
if !state.peer_views.contains_key(&peer_id) {
// Only declare the new peers.
declare(ctx, state, vec![peer_id.clone()]).await?;
state.peer_views.insert(peer_id, Default::default());
}
Ok(())
}
......@@ -574,12 +589,14 @@ where
match bridge_message {
PeerConnected(_peer_id, _observed_role) => {
// validators first connection is handled by `handle_validator_connected`
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
}
PeerViewChange(peer_id, view) => {
handle_peer_view_change(ctx, state, peer_id, view).await?;
}
PeerDisconnected(peer_id) => {
state.known_validators.retain(|_, v| *v != peer_id);
state.peer_views.remove(&peer_id);
}
OurViewChange(view) => {
......@@ -606,9 +623,7 @@ async fn handle_our_view_change(
for removed in removed.into_iter() {
state.collations.remove(removed);
if let Some(group) = state.our_validators_groups.remove(removed) {
state.known_validators.retain(|_, v| !group.contains(v));
}
state.our_validators_groups.remove(removed);
}
Ok(())
......@@ -636,7 +651,7 @@ where
loop {
if let Some(mut request) = state.last_connection_request.take() {
while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) {
state.known_validators.insert(peer_id.clone(), validator_id);
state.known_validators.insert(validator_id, peer_id.clone());
if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await {
warn!(
target: TARGET,
......@@ -1129,6 +1144,127 @@ mod tests {
).await;
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
let pov_block = PoV {
block_data: BlockData(vec![45, 46, 47]),
};
let pov_hash = pov_block.hash();
let current = Hash::repeat_byte(33);
let candidate = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: current,
pov_hash,
..Default::default()
}.build();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(View(vec![current])),
),
).await;
// Send info about peer's view.
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(
test_state.validator_peer_id[2].clone(),
View(vec![current]),
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()),
).await;
// obtain the availability cores.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, current);
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
// Obtain the validator groups
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorGroups(tx)
)) => {
assert_eq!(relay_parent, current);
tx.send(Ok(test_state.validator_groups.clone())).unwrap();
}
);
// obtain the validators per relay parent
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(tx),
)) => {
assert_eq!(relay_parent, current);
tx.send(Ok(test_state.validator_public.clone())).unwrap();
}
);
// The peer is interested in a leaf that we have a collation for;
// advertise it.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
to,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
) => {
assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
collating_on,
) => {
assert_eq!(relay_parent, current);
assert_eq!(collating_on, test_state.chain_ids[0]);
}
);
}
);
// obtain the validator_id to authority_id mapping
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
)) => {
assert_eq!(relay_parent, current);
assert_eq!(validators.len(), 4);
assert!(validators.contains(&test_state.validator_public[2]));
assert!(validators.contains(&test_state.validator_public[0]));
assert!(validators.contains(&test_state.validator_public[4]));
assert!(validators.contains(&test_state.validator_public[1]));
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
Some(test_state.validator_authority_id[0].clone()),
Some(test_state.validator_authority_id[4].clone()),
Some(test_state.validator_authority_id[1].clone()),
];
tx.send(Ok(result)).unwrap();
}
);
});
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment