Skip to content
Snippets Groups Projects
Commit 0b39ae43 authored by asynchronous rob, committed by GitHub
Browse files

Revert "collator-protocol: short-term fixes for connectivity (#4640)" (#4914)

* Revert "collator-protocol: fix wrong warning (#4909)"

This reverts commit 128421b5.

* Revert "collator-protocol: short-term fixes for connectivity (#4640)"

This reverts commit aff88a86.

* make the slots great again

Co-authored-by: Andronik <write@reusable.software>
parent 6fdd0260
Branches
No related merge requests found
......@@ -321,14 +321,15 @@ impl State {
/// Distribute a collation.
///
/// If the para is not scheduled on any core, at the relay parent,
/// or the relay parent isn't in our view or we already collated on the relay parent,
/// we ignore the message as it must be invalid in that case -
/// although this indicates a logic error elsewhere in the node.
///
/// Otherwise, start advertising the collation to interested peers.
/// Figure out the core our para is assigned to and the relevant validators.
/// Issue a connection request to these validators.
/// If the para is not scheduled or next up on any core, at the relay-parent,
/// or the relay-parent isn't in the active-leaves set, we ignore the message
/// as it must be invalid in that case - although this indicates a logic error
/// elsewhere in the node.
async fn distribute_collation<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
id: ParaId,
receipt: CandidateReceipt,
......@@ -357,8 +358,32 @@ where
return Ok(())
}
if !state.our_validators_groups.contains_key(&relay_parent) {
tracing::warn!(target: LOG_TARGET, "Could not determine validators assigned to the core.");
// Determine which core the para collated-on is assigned to.
// If it is not scheduled then ignore the message.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => core,
None => {
tracing::warn!(
target: LOG_TARGET,
para_id = %id,
?relay_parent,
"looks like no core is assigned to {} at {}", id, relay_parent,
);
return Ok(())
},
};
// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
if current_validators.validators.is_empty() {
tracing::warn!(
target: LOG_TARGET,
core = ?our_core,
"there are no validators assigned to core",
);
return Ok(())
}
......@@ -369,9 +394,16 @@ where
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
"Accepted collation",
core = ?our_core,
?current_validators,
"Accepted collation, connecting to validators."
);
// Issue a discovery request for the validators of the current group:
connect_to_validators(ctx, current_validators.validators.into_iter().collect()).await;
state.our_validators_groups.insert(relay_parent, ValidatorGroup::new());
if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
}
......@@ -490,7 +522,7 @@ where
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
// ignore address resolution failure
// will reissue a new request on new relay parent
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
ctx.send_message(NetworkBridgeMessage::ConnectToValidators {
validator_ids,
......@@ -601,7 +633,8 @@ where
);
},
Some(id) => {
distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
distribute_collation(ctx, runtime, state, id, receipt, pov, result_sender)
.await?;
},
None => {
tracing::warn!(
......@@ -886,7 +919,7 @@ where
},
OurViewChange(view) => {
tracing::trace!(target: LOG_TARGET, ?view, "Own view change");
handle_our_view_change(ctx, runtime, state, view).await?;
handle_our_view_change(state, view).await?;
},
PeerMessage(remote, msg) => {
handle_incoming_peer_message(ctx, runtime, state, remote, msg).await?;
......@@ -900,16 +933,7 @@ where
}
/// Handles our view changes.
async fn handle_our_view_change<Context>(
ctx: &mut Context,
runtime: &mut RuntimeInfo,
state: &mut State,
view: OurView,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
{
async fn handle_our_view_change(state: &mut State, view: OurView) -> Result<()> {
for removed in state.view.difference(&view) {
tracing::debug!(target: LOG_TARGET, relay_parent = ?removed, "Removing relay parent because our view changed.");
......@@ -943,68 +967,6 @@ where
}
state.view = view;
if state.view.is_empty() {
return Ok(())
}
let id = match state.collating_on {
Some(id) => id,
None => return Ok(()),
};
// all validators assigned to the core
// across all active leaves
// this is typically our current group
// but can also include the previous group at
// rotation boundaries and considering forks
let mut group_validators = HashSet::new();
let mut maybe_core = None;
for relay_parent in state.view.iter().cloned() {
tracing::debug!(
target: LOG_TARGET,
?relay_parent,
para_id = ?id,
"Processing relay parent.",
);
// Determine our assigned core.
// If it is not scheduled then ignore the relay parent.
let (our_core, num_cores) = match determine_core(ctx, id, relay_parent).await? {
Some(core) => {
maybe_core = Some(core);
core
},
None => continue,
};
// Determine the group on that core.
let current_validators =
determine_our_validators(ctx, runtime, our_core, num_cores, relay_parent).await?;
let validators = current_validators.validators;
group_validators.extend(validators);
state.our_validators_groups.entry(relay_parent).or_insert(ValidatorGroup::new());
}
let validators: Vec<_> = group_validators.into_iter().collect();
let no_one_is_assigned = validators.is_empty();
if no_one_is_assigned {
if let Some(core) = maybe_core {
tracing::warn!(target: LOG_TARGET, ?core, "No validators assigned to our core.");
} else {
tracing::debug!(target: LOG_TARGET, "Core is occupied for all active leaves.");
}
return Ok(())
}
tracing::debug!(
target: LOG_TARGET,
?validators,
para_id = ?id,
"Connecting to validators.",
);
connect_to_validators(ctx, validators).await;
Ok(())
}
......
......@@ -29,7 +29,7 @@ use sp_core::crypto::Pair;
use sp_keyring::Sr25519Keyring;
use sp_runtime::traits::AppVerify;
use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view, OurView};
use polkadot_node_network_protocol::{our_view, request_response::IncomingRequest, view};
use polkadot_node_primitives::BlockData;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::{
......@@ -172,7 +172,13 @@ impl TestState {
our_view![self.relay_parent]
};
set_our_view(virtual_overseer, &self, our_view).await;
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
our_view,
)),
)
.await;
}
}
......@@ -272,83 +278,13 @@ async fn setup_system(virtual_overseer: &mut VirtualOverseer, test_state: &TestS
)
.await;
set_our_view(virtual_overseer, test_state, our_view![test_state.relay_parent]).await;
}
/// Check our view change triggers the right messages
async fn set_our_view(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
our_view: OurView,
) {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
our_view.clone(),
our_view![test_state.relay_parent],
)),
)
.await;
for parent in our_view.iter().cloned() {
// obtain the availability cores.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, parent);
tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
}
);
// We don't know precisely what is going to come as session info might be cached:
loop {
match overseer_recv(virtual_overseer).await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, relay_parent);
tx.send(Ok(test_state.current_session_index())).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, parent);
assert_eq!(index, test_state.current_session_index());
tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorGroups(tx),
)) => {
assert_eq!(relay_parent, parent);
tx.send(Ok((
test_state.session_info.validator_groups.clone(),
test_state.group_rotation_info.clone(),
)))
.unwrap();
// This call is mandatory - we are done:
break
},
other => panic!("Unexpected message received: {:?}", other),
}
}
}
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
..
}
) => {}
);
}
/// Result of [`distribute_collation`]
......@@ -361,6 +297,8 @@ struct DistributeCollation {
async fn distribute_collation(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
// whether or not we expect a connection request or not.
should_connect: bool,
) -> DistributeCollation {
// Now we want to distribute a `PoVBlock`
let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) };
......@@ -381,6 +319,67 @@ async fn distribute_collation(
)
.await;
// obtain the availability cores.
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx)
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(vec![test_state.availability_core.clone()])).unwrap();
}
);
// We don't know precisely what is going to come as session info might be cached:
loop {
match overseer_recv(virtual_overseer).await {
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok(test_state.current_session_index())).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(index, test_state.current_session_index());
tx.send(Ok(Some(test_state.session_info.clone()))).unwrap();
},
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorGroups(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
tx.send(Ok((
test_state.session_info.validator_groups.clone(),
test_state.group_rotation_info.clone(),
)))
.unwrap();
// This call is mandatory - we are done:
break
},
other => panic!("Unexpected message received: {:?}", other),
}
}
if should_connect {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ConnectToValidators {
..
}
) => {}
);
}
DistributeCollation { candidate, pov_block }
}
......@@ -509,7 +508,7 @@ fn advertise_and_send_collation() {
setup_system(&mut virtual_overseer, &test_state).await;
let DistributeCollation { candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
for (val, peer) in test_state
.current_group_validator_authority_ids()
......@@ -626,7 +625,7 @@ fn advertise_and_send_collation() {
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
distribute_collation(&mut virtual_overseer, &test_state).await;
distribute_collation(&mut virtual_overseer, &test_state, true).await;
// Send info about peer's view.
overseer_send(
......@@ -714,7 +713,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() {
// And let it tell us that it is has the same view.
send_peer_view_change(virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
distribute_collation(virtual_overseer, &test_state).await;
distribute_collation(virtual_overseer, &test_state, true).await;
expect_advertise_collation_msg(virtual_overseer, &peer2, test_state.relay_parent).await;
......@@ -753,14 +752,14 @@ fn collate_on_two_different_relay_chain_blocks() {
expect_declare_msg(virtual_overseer, &test_state, &peer).await;
expect_declare_msg(virtual_overseer, &test_state, &peer2).await;
distribute_collation(virtual_overseer, &test_state).await;
distribute_collation(virtual_overseer, &test_state, true).await;
let old_relay_parent = test_state.relay_parent;
// Advance to a new round, while informing the subsystem that the old and the new relay parent are active.
test_state.advance_to_new_round(virtual_overseer, true).await;
distribute_collation(virtual_overseer, &test_state).await;
distribute_collation(virtual_overseer, &test_state, true).await;
send_peer_view_change(virtual_overseer, &peer, vec![old_relay_parent]).await;
expect_advertise_collation_msg(virtual_overseer, &peer, old_relay_parent).await;
......@@ -790,7 +789,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() {
connect_peer(virtual_overseer, peer.clone(), Some(validator_id.clone())).await;
expect_declare_msg(virtual_overseer, &test_state, &peer).await;
distribute_collation(virtual_overseer, &test_state).await;
distribute_collation(virtual_overseer, &test_state, true).await;
send_peer_view_change(virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(virtual_overseer, &peer, test_state.relay_parent).await;
......@@ -875,7 +874,7 @@ where
setup_system(virtual_overseer, &test_state).await;
let DistributeCollation { candidate, pov_block } =
distribute_collation(virtual_overseer, &test_state).await;
distribute_collation(virtual_overseer, &test_state, true).await;
for (val, peer) in test_state
.current_group_validator_authority_ids()
......
......@@ -58,7 +58,7 @@ pub struct CollatorEvictionPolicy {
impl Default for CollatorEvictionPolicy {
fn default() -> Self {
CollatorEvictionPolicy {
inactive_collator: Duration::from_secs(5),
inactive_collator: Duration::from_secs(24),
undeclared: Duration::from_secs(1),
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.