From 11b8e4c82187d2d2ca20f47137ad6a3663abc3e5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier <rphmeier@gmail.com> Date: Sat, 3 Apr 2021 21:48:58 +0200 Subject: [PATCH] Collation protocol: stricter validators (#2810) * guide: declare one para as a collator * add ParaId to Declare messages and clean up * fix build * fix the testerinos * begin adding keystore to collator-protocol * remove request_x_ctx * add core_for_group * add bump_rotation * add some more helpers to subsystem-util * change signing_key API to take ref * determine current and next para assignments * disconnect collators who are not on current or next para * add collator peer count metric * notes for later * some fixes * add data & keystore to test state * add a test utility for answering runtime API requests * fix existing collator tests * add new tests * remove sc_keystore * update cargo lock Co-authored-by: Andronik Ordian <write@reusable.software> --- polkadot/Cargo.lock | 1 + polkadot/node/collation-generation/src/lib.rs | 20 +- .../node/core/candidate-selection/src/lib.rs | 2 +- .../availability-distribution/src/error.rs | 8 +- .../src/requester/mod.rs | 4 +- .../availability-distribution/src/runtime.rs | 8 +- .../src/session_cache.rs | 8 +- .../src/tests/mod.rs | 2 +- .../network/availability-recovery/src/lib.rs | 8 +- polkadot/node/network/bridge/src/lib.rs | 2 + .../node/network/collator-protocol/Cargo.toml | 5 +- .../collator-protocol/src/collator_side.rs | 81 +- .../node/network/collator-protocol/src/lib.rs | 28 +- .../collator-protocol/src/validator_side.rs | 1001 ++++++++++++++--- .../node/network/gossip-support/src/lib.rs | 4 +- polkadot/node/network/protocol/src/lib.rs | 4 +- polkadot/node/service/src/lib.rs | 6 +- polkadot/node/subsystem-util/src/lib.rs | 123 +- .../subsystem-util/src/validator_discovery.rs | 8 +- polkadot/primitives/src/v1.rs | 53 + .../src/node/collators/collator-protocol.md | 4 +- .../implementers-guide/src/types/network.md | 10 +- 22 files changed, 1060 
insertions(+), 330 deletions(-) diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 74c0dfe3830..c71c4fa51ce 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5594,6 +5594,7 @@ dependencies = [ "polkadot-primitives", "sp-core", "sp-keyring", + "sp-keystore", "sp-runtime", "thiserror", "tracing", diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 6dba0435ca5..67b758de3a6 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -32,8 +32,8 @@ use polkadot_node_subsystem::{ FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult, }; use polkadot_node_subsystem_util::{ - request_availability_cores_ctx, request_persisted_validation_data_ctx, - request_validators_ctx, request_validation_code_ctx, + request_availability_cores, request_persisted_validation_data, + request_validators, request_validation_code, metrics::{self, prometheus}, }; use polkadot_primitives::v1::{ @@ -198,8 +198,8 @@ async fn handle_new_activations<Context: SubsystemContext>( let _relay_parent_timer = metrics.time_new_activations_relay_parent(); let (availability_cores, validators) = join!( - request_availability_cores_ctx(relay_parent, ctx).await?, - request_validators_ctx(relay_parent, ctx).await?, + request_availability_cores(relay_parent, ctx.sender()).await, + request_validators(relay_parent, ctx.sender()).await, ); let availability_cores = availability_cores??; @@ -248,13 +248,13 @@ async fn handle_new_activations<Context: SubsystemContext>( // within the subtask loop, because we have only a single mutable handle to the // context, so the work can't really be distributed - let validation_data = match request_persisted_validation_data_ctx( + let validation_data = match request_persisted_validation_data( relay_parent, scheduled_core.para_id, assumption, - ctx, + ctx.sender(), ) - .await? + .await .await?? 
{ Some(v) => v, @@ -271,13 +271,13 @@ async fn handle_new_activations<Context: SubsystemContext>( } }; - let validation_code = match request_validation_code_ctx( + let validation_code = match request_validation_code( relay_parent, scheduled_core.para_id, assumption, - ctx, + ctx.sender(), ) - .await? + .await .await?? { Some(v) => v, diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 80cb8cfd3d4..25180f5004b 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -294,7 +294,7 @@ impl CandidateSelectionJob { ).await { Ok(response) => response, Err(err) => { - tracing::warn!( + tracing::debug!( target: LOG_TARGET, err = ?err, "failed to get collation from collator protocol subsystem", diff --git a/polkadot/node/network/availability-distribution/src/error.rs b/polkadot/node/network/availability-distribution/src/error.rs index a4491aac6c1..0b07efdc7a4 100644 --- a/polkadot/node/network/availability-distribution/src/error.rs +++ b/polkadot/node/network/availability-distribution/src/error.rs @@ -109,13 +109,9 @@ impl From<SubsystemError> for Error { /// Receive a response from a runtime request and convert errors. pub(crate) async fn recv_runtime<V>( - r: std::result::Result< - oneshot::Receiver<std::result::Result<V, RuntimeApiError>>, - UtilError, - >, + r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>, ) -> std::result::Result<V, Error> { - r.map_err(Error::UtilRequest)? - .await + r.await .map_err(Error::RuntimeRequestCanceled)? 
.map_err(Error::RuntimeRequest) } diff --git a/polkadot/node/network/availability-distribution/src/requester/mod.rs b/polkadot/node/network/availability-distribution/src/requester/mod.rs index 48819773558..eaef4e5f3cc 100644 --- a/polkadot/node/network/availability-distribution/src/requester/mod.rs +++ b/polkadot/node/network/availability-distribution/src/requester/mod.rs @@ -32,7 +32,7 @@ use futures::{ use sp_keystore::SyncCryptoStorePtr; -use polkadot_node_subsystem_util::request_availability_cores_ctx; +use polkadot_node_subsystem_util::request_availability_cores; use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore}; use polkadot_subsystem::{ messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf, @@ -235,7 +235,7 @@ async fn query_occupied_cores<Context>( where Context: SubsystemContext, { - let cores = recv_runtime(request_availability_cores_ctx(relay_parent, ctx).await).await?; + let cores = recv_runtime(request_availability_cores(relay_parent, ctx.sender()).await).await?; Ok(cores .into_iter() diff --git a/polkadot/node/network/availability-distribution/src/runtime.rs b/polkadot/node/network/availability-distribution/src/runtime.rs index 0bc81d3e70f..39022e82503 100644 --- a/polkadot/node/network/availability-distribution/src/runtime.rs +++ b/polkadot/node/network/availability-distribution/src/runtime.rs @@ -23,7 +23,7 @@ use sp_core::crypto::Public; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; use polkadot_node_subsystem_util::{ - request_session_index_for_child_ctx, request_session_info_ctx, + request_session_index_for_child, request_session_info, }; use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex}; use polkadot_subsystem::SubsystemContext; @@ -93,7 +93,7 @@ impl Runtime { Some(index) => Ok(*index), None => { let index = - recv_runtime(request_session_index_for_child_ctx(parent, ctx).await) + recv_runtime(request_session_index_for_child(parent, 
ctx.sender()).await) .await?; self.session_index_cache.put(parent, index); Ok(index) @@ -117,7 +117,7 @@ impl Runtime { /// Get `ExtendedSessionInfo` by session index. /// - /// `request_session_info_ctx` still requires the parent to be passed in, so we take the parent + /// `request_session_info` still requires the parent to be passed in, so we take the parent /// in addition to the `SessionIndex`. pub async fn get_session_info_by_index<'a, Context>( &'a mut self, @@ -130,7 +130,7 @@ impl Runtime { { if !self.session_info_cache.contains(&session_index) { let session_info = - recv_runtime(request_session_info_ctx(parent, session_index, ctx).await) + recv_runtime(request_session_info(parent, session_index, ctx.sender()).await) .await? .ok_or(Error::NoSuchSession(session_index))?; let validator_info = self.get_validator_info(&session_info).await?; diff --git a/polkadot/node/network/availability-distribution/src/session_cache.rs b/polkadot/node/network/availability-distribution/src/session_cache.rs index e4e2bce41fc..0b2b519bb64 100644 --- a/polkadot/node/network/availability-distribution/src/session_cache.rs +++ b/polkadot/node/network/availability-distribution/src/session_cache.rs @@ -24,7 +24,7 @@ use sp_core::crypto::Public; use sp_keystore::{CryptoStore, SyncCryptoStorePtr}; use polkadot_node_subsystem_util::{ - request_session_index_for_child_ctx, request_session_info_ctx, + request_session_index_for_child, request_session_info, }; use polkadot_primitives::v1::SessionInfo as GlobalSessionInfo; use polkadot_primitives::v1::{ @@ -132,7 +132,7 @@ impl SessionCache { Some(index) => *index, None => { let index = - recv_runtime(request_session_index_for_child_ctx(parent, ctx).await) + recv_runtime(request_session_index_for_child(parent, ctx.sender()).await) .await?; self.session_index_cache.put(parent, index); index @@ -210,7 +210,7 @@ impl SessionCache { /// Query needed information from runtime. 
/// - /// We need to pass in the relay parent for our call to `request_session_info_ctx`. We should + /// We need to pass in the relay parent for our call to `request_session_info`. We should /// actually don't need that: I suppose it is used for internal caching based on relay parents, /// which we don't use here. It should not do any harm though. /// @@ -229,7 +229,7 @@ impl SessionCache { discovery_keys, mut validator_groups, .. - } = recv_runtime(request_session_info_ctx(parent, session_index, ctx).await) + } = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await) .await? .ok_or(Error::NoSuchSession(session_index))?; diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs index 6a2caaca5ff..ed793e18b10 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mod.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs @@ -111,7 +111,7 @@ fn check_fetch_retry() { }) .collect(); state.valid_chunks.retain(|(ch, _)| valid_candidate_hashes.contains(ch)); - + for (_, v) in state.chunks.iter_mut() { // This should still succeed as cores are still pending availability on next block. 
diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 3a8474de8a1..75cd274bba9 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -49,7 +49,7 @@ use polkadot_node_network_protocol::{ request::RequestError, }, }; -use polkadot_node_subsystem_util::request_session_info_ctx; +use polkadot_node_subsystem_util::request_session_info; use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1}; mod error; @@ -697,11 +697,11 @@ async fn handle_recover( } let _span = span.child("not-cached"); - let session_info = request_session_info_ctx( + let session_info = request_session_info( state.live_block.1, session_index, - ctx, - ).await?.await.map_err(error::Error::CanceledSessionInfo)??; + ctx.sender(), + ).await.await.map_err(error::Error::CanceledSessionInfo)??; let _span = span.child("session-info-ctx-received"); match session_info { diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index e9816f4395e..ef1c0ac1fd6 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -1794,6 +1794,7 @@ mod tests { let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare( Sr25519Keyring::Alice.public().into(), Default::default(), + Default::default(), ); let message = protocol_v1::CollationProtocol::CollatorProtocol( @@ -2064,6 +2065,7 @@ mod tests { let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare( Sr25519Keyring::Alice.public().into(), Default::default(), + Default::default(), ); let message = protocol_v1::CollationProtocol::CollatorProtocol( diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index c8820c19551..817d6511b04 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ 
b/polkadot/node/network/collator-protocol/Cargo.toml @@ -11,8 +11,9 @@ futures-timer = "3" thiserror = "1.0.23" tracing = "0.1.25" -sp-core = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" } -sp-runtime = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-primitives = { path = "../../../primitives" } polkadot-node-network-protocol = { path = "../../network/protocol" } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 5a492e87555..88dbe93d4f0 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -37,9 +37,9 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_subsystem_util::{ validator_discovery, - request_validators_ctx, - request_validator_groups_ctx, - request_availability_cores_ctx, + request_validators, + request_validator_groups, + request_availability_cores, metrics::{self, prometheus}, }; use polkadot_node_primitives::{SignedFullStatement, Statement, PoV, CompressedPoV}; @@ -380,7 +380,7 @@ async fn determine_core( para_id: ParaId, relay_parent: Hash, ) -> Result<Option<(CoreIndex, usize)>> { - let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??; + let cores = request_availability_cores(relay_parent, ctx.sender()).await.await??; for (idx, core) in cores.iter().enumerate() { if let CoreState::Scheduled(occupied) = core { @@ -403,7 +403,7 @@ async fn determine_our_validators( cores: usize, relay_parent: Hash, ) -> Result<(HashSet<ValidatorId>, HashSet<ValidatorId>)> { - let groups = 
request_validator_groups_ctx(relay_parent, ctx).await?; + let groups = request_validator_groups(relay_parent, ctx.sender()).await; let groups = groups.await??; @@ -413,7 +413,7 @@ async fn determine_our_validators( let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len(); let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default(); - let validators = request_validators_ctx(relay_parent, ctx).await?.await??; + let validators = request_validators(relay_parent, ctx.sender()).await.await??; let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect(); let next_validators = next_validators.iter().map(|i| validators[i.0 as usize].clone()).collect(); @@ -430,17 +430,20 @@ async fn declare( ) { let declare_signature_payload = protocol_v1::declare_signature_payload(&state.local_peer_id); - let wire_message = protocol_v1::CollatorProtocolMessage::Declare( - state.collator_pair.public(), - state.collator_pair.sign(&declare_signature_payload), - ); + if let Some(para_id) = state.collating_on { + let wire_message = protocol_v1::CollatorProtocolMessage::Declare( + state.collator_pair.public(), + para_id, + state.collator_pair.sign(&declare_signature_payload), + ); - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - vec![peer], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - )).await; + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + vec![peer], + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await; + } } /// Issue a connection request to a set of validators and @@ -476,11 +479,6 @@ async fn advertise_collation( relay_parent: Hash, peer: PeerId, ) { - let collating_on = match state.collating_on { - Some(collating_on) => collating_on, - None => return, - }; - let should_advertise = state.our_validators_groups .get(&relay_parent) .map(|g| 
g.should_advertise_to(&peer)) @@ -518,7 +516,6 @@ async fn advertise_collation( let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation( relay_parent, - collating_on, ); ctx.send_message(AllMessages::NetworkBridge( @@ -705,14 +702,14 @@ async fn handle_incoming_peer_message( use protocol_v1::CollatorProtocolMessage::*; match msg { - Declare(_, _) => { + Declare(_, _, _) => { tracing::warn!( target: LOG_TARGET, ?origin, "Declare message is not expected on the collator side of the protocol", ); } - AdvertiseCollation(_, _) => { + AdvertiseCollation(_) => { tracing::warn!( target: LOG_TARGET, ?origin, @@ -772,6 +769,12 @@ async fn handle_validator_connected( validator_id: ValidatorId, relay_parent: Hash, ) { + tracing::trace!( + target: LOG_TARGET, + ?validator_id, + "Connected to requested validator" + ); + let not_declared = state.declared_at.insert(peer_id.clone()); if not_declared { @@ -1382,7 +1385,11 @@ mod tests { } /// Check that the next received message is a `Declare` message. - async fn expect_declare_msg(virtual_overseer: &mut VirtualOverseer, test_state: &TestState, peer: &PeerId) { + async fn expect_declare_msg( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, + peer: &PeerId, + ) { assert_matches!( overseer_recv(virtual_overseer).await, AllMessages::NetworkBridge( @@ -1394,12 +1401,17 @@ mod tests { assert_eq!(to[0], *peer); assert_matches!( wire_message, - protocol_v1::CollatorProtocolMessage::Declare(collator_id, signature) => { + protocol_v1::CollatorProtocolMessage::Declare( + collator_id, + para_id, + signature, + ) => { assert!(signature.verify( &*protocol_v1::declare_signature_payload(&test_state.local_peer_id), &collator_id), ); assert_eq!(collator_id, test_state.collator_pair.public()); + assert_eq!(para_id, test_state.para_id); } ); } @@ -1409,7 +1421,6 @@ mod tests { /// Check that the next received message is a collation advertisment message. 
async fn expect_advertise_collation_msg( virtual_overseer: &mut VirtualOverseer, - test_state: &TestState, peer: &PeerId, expected_relay_parent: Hash, ) { @@ -1426,10 +1437,8 @@ mod tests { wire_message, protocol_v1::CollatorProtocolMessage::AdvertiseCollation( relay_parent, - collating_on, ) => { assert_eq!(relay_parent, expected_relay_parent); - assert_eq!(collating_on, test_state.para_id); } ); } @@ -1478,7 +1487,7 @@ mod tests { // The peer is interested in a leaf that we have a collation for; // advertise it. - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; // Request a collation. let (tx, rx) = oneshot::channel(); @@ -1556,7 +1565,7 @@ mod tests { ) ).await; - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; }); } @@ -1619,13 +1628,13 @@ mod tests { expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await; - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await; // The other validator announces that it changed its view. 
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; // After changing the view we should receive the advertisement - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; }) } @@ -1669,10 +1678,10 @@ mod tests { connected.try_send((validator_id2, peer2.clone())).unwrap(); send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await; - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, old_relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await; send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await; - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await; }) } @@ -1698,7 +1707,7 @@ mod tests { expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await; send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await; - expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await; + expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await; // Disconnect and reconnect directly disconnect_peer(&mut virtual_overseer, peer.clone()).await; diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs index 62d264a8bbe..2f85f4c0c9e 100644 --- a/polkadot/node/network/collator-protocol/src/lib.rs +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -25,6 +25,8 @@ use std::time::Duration; use futures::{channel::oneshot, FutureExt, TryFutureExt}; use thiserror::Error; +use sp_keystore::SyncCryptoStorePtr; + use 
polkadot_node_network_protocol::{PeerId, UnifiedReputationChange as Rep}; use polkadot_node_subsystem_util::{self as util, metrics::prometheus}; use polkadot_primitives::v1::CollatorPair; @@ -57,18 +59,33 @@ type Result<T> = std::result::Result<T, Error>; /// A collator eviction policy - how fast to evict collators which are inactive. #[derive(Debug, Clone, Copy)] -pub struct CollatorEvictionPolicy(pub Duration); +pub struct CollatorEvictionPolicy { + /// How fast to evict collators who are inactive. + pub inactive_collator: Duration, + /// How fast to evict peers which don't declare their para. + pub undeclared: Duration, +} impl Default for CollatorEvictionPolicy { fn default() -> Self { - CollatorEvictionPolicy(Duration::from_secs(24)) + CollatorEvictionPolicy { + inactive_collator: Duration::from_secs(24), + undeclared: Duration::from_secs(1), + } } } /// What side of the collator protocol is being engaged pub enum ProtocolSide { /// Validators operate on the relay chain. - Validator(CollatorEvictionPolicy, validator_side::Metrics), + Validator { + /// The keystore holding validator keys. + keystore: SyncCryptoStorePtr, + /// An eviction policy for inactive peers or validators. + eviction_policy: CollatorEvictionPolicy, + /// Prometheus metrics for validators. + metrics: validator_side::Metrics, + }, /// Collators operate on a parachain. 
Collator(PeerId, CollatorPair, collator_side::Metrics), } @@ -95,9 +112,10 @@ impl CollatorProtocolSubsystem { Context: SubsystemContext<Message = CollatorProtocolMessage>, { match self.protocol_side { - ProtocolSide::Validator(policy, metrics) => validator_side::run( + ProtocolSide::Validator { keystore, eviction_policy, metrics } => validator_side::run( ctx, - policy, + keystore, + eviction_policy, metrics, ).await, ProtocolSide::Collator(local_peer_id, collator_pair, metrics) => collator_side::run( diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index c489643a192..81a904fe69e 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll}; +use std::collections::hash_map::Entry; use std::time::{Duration, Instant}; use always_assert::never; use futures::{ @@ -22,6 +23,8 @@ use futures::{ }; use futures_timer::Delay; +use sp_keystore::SyncCryptoStorePtr; + use polkadot_node_network_protocol::{ request_response as req_res, v1 as protocol_v1, peer_set::PeerSet, @@ -41,7 +44,7 @@ use polkadot_subsystem::{ AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, IfDisconnected, NetworkBridgeEvent, NetworkBridgeMessage, }, - FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, + FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, SubsystemSender, }; use super::{modify_reputation, Result, LOG_TARGET}; @@ -54,6 +57,8 @@ const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error"); const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out"); const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature"); const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported 
by another subsystem"); +const COST_WRONG_PARA: Rep = Rep::Malicious("A collator provided a collation for the wrong para"); +const COST_UNNEEDED_COLLATOR: Rep = Rep::CostMinor("An unneeded collator connected"); const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem"); // How often to check all peers with activity. @@ -85,6 +90,11 @@ impl Metrics { fn time_handle_collation_request_result(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> { self.0.as_ref().map(|metrics| metrics.handle_collation_request_result.start_timer()) } + + /// Note the current number of collator peers. + fn note_collator_peer_count(&self, collator_peers: usize) { + self.0.as_ref().map(|metrics| metrics.collator_peer_count.set(collator_peers as u64)); + } } #[derive(Clone)] @@ -92,6 +102,7 @@ struct MetricsInner { collation_requests: prometheus::CounterVec<prometheus::U64>, process_msg: prometheus::Histogram, handle_collation_request_result: prometheus::Histogram, + collator_peer_count: prometheus::Gauge<prometheus::U64>, } impl metrics::Metrics for Metrics { @@ -127,6 +138,13 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + collator_peer_count: prometheus::register( + prometheus::Gauge::new( + "parachain_collator_peer_count", + "Amount of collator peers connected", + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) @@ -142,25 +160,132 @@ struct PerRequest { span: Option<jaeger::Span>, } +struct CollatingPeerState { + collator_id: CollatorId, + para_id: ParaId, + // Advertised relay parents. + advertisements: HashSet<Hash>, + last_active: Instant, +} + +enum PeerState { + // The peer has connected at the given instant. 
+ Connected(Instant), + // The peer has declared itself a collator. + Collating(CollatingPeerState), +} + +#[derive(Debug)] +enum AdvertisementError { + Duplicate, + OutOfOurView, + UndeclaredCollator, +} + struct PeerData { view: View, - last_active: Instant, + state: PeerState, } impl PeerData { fn new(view: View) -> Self { PeerData { view, + state: PeerState::Connected(Instant::now()), + } + } + + /// Update the view, clearing all advertisements that are no longer in the + /// current view. + fn update_view(&mut self, new_view: View) { + let old_view = std::mem::replace(&mut self.view, new_view); + if let PeerState::Collating(ref mut peer_state) = self.state { + for removed in old_view.difference(&self.view) { + let _ = peer_state.advertisements.remove(&removed); + } + } + } + + /// Prune old advertisements relative to our view. + fn prune_old_advertisements(&mut self, our_view: &View) { + if let PeerState::Collating(ref mut peer_state) = self.state { + peer_state.advertisements.retain(|a| our_view.contains(a)); + } + } + + /// Note an advertisement by the collator. Returns the collator and para ids if the advertisement + /// was imported successfully. Fails if the advertisement is duplicate, out of view, or the peer has not + /// declared itself a collator. + fn insert_advertisement( + &mut self, + on_relay_parent: Hash, + our_view: &View, + ) + -> std::result::Result<(CollatorId, ParaId), AdvertisementError> + { + match self.state { + PeerState::Connected(_) => Err(AdvertisementError::UndeclaredCollator), + _ if !our_view.contains(&on_relay_parent) => Err(AdvertisementError::OutOfOurView), + PeerState::Collating(ref mut state) => { + if state.advertisements.insert(on_relay_parent) { + state.last_active = Instant::now(); + Ok((state.collator_id.clone(), state.para_id.clone())) + } else { + Err(AdvertisementError::Duplicate) + } + } + } + } + + /// Whether a peer is collating. 
+ fn is_collating(&self) -> bool { + match self.state { + PeerState::Connected(_) => false, + PeerState::Collating(_) => true, + } + } + + /// Note that a peer is now collating with the given collator and para ids. + /// + /// This will overwrite any previous call to `set_collating` and should only be called + /// if `is_collating` is false. + fn set_collating(&mut self, collator_id: CollatorId, para_id: ParaId) { + self.state = PeerState::Collating(CollatingPeerState { + collator_id, + para_id, + advertisements: HashSet::new(), last_active: Instant::now(), + }); + } + + fn collator_id(&self) -> Option<&CollatorId> { + match self.state { + PeerState::Connected(_) => None, + PeerState::Collating(ref state) => Some(&state.collator_id), + } + } + + fn collating_para(&self) -> Option<ParaId> { + match self.state { + PeerState::Connected(_) => None, + PeerState::Collating(ref state) => Some(state.para_id), } } - fn note_active(&mut self) { - self.last_active = Instant::now(); + /// Whether the peer has advertised the given collation. + fn has_advertised(&self, relay_parent: &Hash) -> bool { + match self.state { + PeerState::Connected(_) => false, + PeerState::Collating(ref state) => state.advertisements.contains(relay_parent), + } } - fn active_since(&self, instant: Instant) -> bool { - self.last_active >= instant + /// Whether the peer is now inactive according to the current instant and the eviction policy. 
+ fn is_inactive(&self, now: Instant, policy: &crate::CollatorEvictionPolicy) -> bool { + match self.state { + PeerState::Connected(connected_at) => connected_at + policy.undeclared < now, + PeerState::Collating(ref state) => state.last_active + policy.inactive_collator < now, + } } } @@ -170,22 +295,160 @@ impl Default for PeerData { } } +struct GroupAssignments { + current: Option<ParaId>, + next: Option<ParaId>, +} + +#[derive(Default)] +struct ActiveParas { + relay_parent_assignments: HashMap<Hash, GroupAssignments>, + current_assignments: HashMap<ParaId, usize>, + next_assignments: HashMap<ParaId, usize> +} + +impl ActiveParas { + async fn assign_incoming( + &mut self, + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + new_relay_parents: impl IntoIterator<Item = Hash>, + ) { + for relay_parent in new_relay_parents { + let mv = polkadot_node_subsystem_util::request_validators(relay_parent, sender) + .await + .await + .ok() + .map(|x| x.ok()) + .flatten(); + + let mg = polkadot_node_subsystem_util::request_validator_groups(relay_parent, sender) + .await + .await + .ok() + .map(|x| x.ok()) + .flatten(); + + + let mc = polkadot_node_subsystem_util::request_availability_cores(relay_parent, sender) + .await + .await + .ok() + .map(|x| x.ok()) + .flatten(); + + let (validators, groups, rotation_info, cores) = match (mv, mg, mc) { + (Some(v), Some((g, r)), Some(c)) => (v, g, r, c), + _ => { + tracing::debug!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + "Failed to query runtime API for relay-parent", + ); + + continue + } + }; + + let (para_now, para_next) = match polkadot_node_subsystem_util + ::signing_key_and_index(&validators, keystore) + .await + .and_then(|(_, index)| polkadot_node_subsystem_util::find_validator_group( + &groups, + index, + )) + { + Some(group) => { + let next_rotation_info = rotation_info.bump_rotation(); + + let core_now = rotation_info.core_for_group(group, cores.len()); + let core_next = 
next_rotation_info.core_for_group(group, cores.len()); + + ( + cores.get(core_now.0 as usize).and_then(|c| c.para_id()), + cores.get(core_next.0 as usize).and_then(|c| c.para_id()), + ) + } + None => { + tracing::trace!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + "Not a validator", + ); + + continue + } + }; + + // This code won't work well, if at all for parathreads. For parathreads we'll + // have to be aware of which core the parathread claim is going to be multiplexed + // onto. The parathread claim will also have a known collator, and we should always + // allow an incoming connection from that collator. If not even connecting to them + // directly. + // + // However, this'll work fine for parachains, as each parachain gets a dedicated + // core. + if let Some(para_now) = para_now { + *self.current_assignments.entry(para_now).or_default() += 1; + } + + if let Some(para_next) = para_next { + *self.next_assignments.entry(para_next).or_default() += 1; + } + + self.relay_parent_assignments.insert( + relay_parent, + GroupAssignments { current: para_now, next: para_next }, + ); + } + } + + fn remove_outgoing( + &mut self, + old_relay_parents: impl IntoIterator<Item = Hash>, + ) { + for old_relay_parent in old_relay_parents { + if let Some(assignments) = self.relay_parent_assignments.remove(&old_relay_parent) { + let GroupAssignments { current, next } = assignments; + + if let Some(cur) = current { + if let Entry::Occupied(mut occupied) = self.current_assignments.entry(cur) { + *occupied.get_mut() -= 1; + if *occupied.get() == 0 { + occupied.remove_entry(); + } + } + } + + if let Some(next) = next { + if let Entry::Occupied(mut occupied) = self.next_assignments.entry(next) { + *occupied.get_mut() -= 1; + if *occupied.get() == 0 { + occupied.remove_entry(); + } + } + } + } + } + } + + fn is_current_or_next(&self, id: ParaId) -> bool { + self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id) + } +} + /// All state relevant 
for the validator side of the protocol lives here. #[derive(Default)] struct State { /// Our own view. view: OurView, + /// Active paras based on our view. We only accept collators from these paras. + active_paras: ActiveParas, + /// Track all active collators and their data. peer_data: HashMap<PeerId, PeerData>, - /// Peers that have declared themselves as collators. - known_collators: HashMap<PeerId, CollatorId>, - - /// Advertisements received from collators. We accept one advertisement - /// per collator per source per relay-parent. - advertisements: HashMap<PeerId, HashSet<(ParaId, Hash)>>, - /// The collations we have requested by relay parent and para id. /// /// For each relay parent and para id we may be connected to a number @@ -200,6 +463,24 @@ struct State { span_per_relay_parent: HashMap<Hash, PerLeafSpan>, } +// O(n) search for collator ID by iterating through the peers map. This should be fast enough +// unless a large amount of peers is expected. +fn collator_peer_id( + peer_data: &HashMap<PeerId, PeerData>, + collator_id: &CollatorId, +) -> Option<PeerId> { + peer_data.iter() + .find_map(|(peer, data)| + data.collator_id().filter(|c| c == &collator_id).map(|_| peer.clone()) + ) +} + +async fn disconnect_peer(ctx: &mut impl SubsystemContext, peer_id: PeerId) { + ctx.send_message( + NetworkBridgeMessage::DisconnectPeer(peer_id, PeerSet::Collation).into() + ).await +} + /// Another subsystem has requested to fetch collations on a particular leaf for some para. 
#[tracing::instrument(level = "trace", skip(ctx, state, tx), fields(subsystem = LOG_TARGET))] async fn fetch_collation<Context>( @@ -213,59 +494,51 @@ async fn fetch_collation<Context>( where Context: SubsystemContext<Message = CollatorProtocolMessage> { - let relevant_advertiser = state.advertisements.iter().find_map(|(k, v)| { - if v.contains(&(para_id, relay_parent)) && state.known_collators.get(k) == Some(&collator_id) { - Some(k.clone()) - } else { - None - } - }); + let peer_id = match collator_peer_id(&state.peer_data, &collator_id) { + None => return, + Some(p) => p, + }; - // Request the collation. - // Assume it is `request_collation`'s job to check and ignore duplicate requests. - if let Some(relevant_advertiser) = relevant_advertiser { - request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await; + if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) { + request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await; } } /// Report a collator for some malicious actions. -#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, peer_data), fields(subsystem = LOG_TARGET))] async fn report_collator<Context>( ctx: &mut Context, - state: &mut State, + peer_data: &HashMap<PeerId, PeerData>, id: CollatorId, ) where Context: SubsystemContext<Message = CollatorProtocolMessage> { - // Since we have a one way map of PeerId -> CollatorId we have to - // iterate here. Since a huge amount of peers is not expected this - // is a tolerable thing to do. - for (k, _) in state.known_collators.iter().filter(|d| *d.1 == id) { - modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await; + if let Some(peer_id) = collator_peer_id(peer_data, &id) { + modify_reputation(ctx, peer_id, COST_REPORT_BAD).await; } } /// Some other subsystem has reported a collator as a good one, bump reputation. 
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, peer_data), fields(subsystem = LOG_TARGET))] async fn note_good_collation<Context>( ctx: &mut Context, - state: &mut State, + peer_data: &HashMap<PeerId, PeerData>, id: CollatorId, ) where Context: SubsystemContext<Message = CollatorProtocolMessage> { - for (peer_id, _) in state.known_collators.iter().filter(|d| *d.1 == id) { - modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await; + if let Some(peer_id) = collator_peer_id(peer_data, &id) { + modify_reputation(ctx, peer_id, BENEFIT_NOTIFY_GOOD).await; } } /// Notify a collator that its collation got seconded. -#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, peer_data), fields(subsystem = LOG_TARGET))] async fn notify_collation_seconded( ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>, - state: &mut State, + peer_data: &HashMap<PeerId, PeerData>, id: CollatorId, statement: SignedFullStatement, ) { @@ -278,16 +551,12 @@ async fn notify_collation_seconded( return; } - let peer_ids = state.known_collators.iter() - .filter_map(|(p, c)| if *c == id { Some(p.clone()) } else { None }) - .collect::<Vec<_>>(); - - if !peer_ids.is_empty() { + if let Some(peer_id) = collator_peer_id(peer_data, &id) { let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement); ctx.send_message(AllMessages::NetworkBridge( NetworkBridgeMessage::SendCollationMessage( - peer_ids, + vec![peer_id], protocol_v1::CollationProtocol::CollatorProtocol(wire_message), ) )).await; @@ -303,19 +572,11 @@ async fn handle_peer_view_change( peer_id: PeerId, view: View, ) -> Result<()> { - let current = state.peer_data.entry(peer_id.clone()).or_default(); - - let removed: Vec<_> = current.view.difference(&view).cloned().collect(); + let peer_data = 
state.peer_data.entry(peer_id.clone()).or_default(); - current.view = view; - - if let Some(advertisements) = state.advertisements.get_mut(&peer_id) { - advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent)); - } - - for removed in removed.into_iter() { - state.requested_collations.retain(|k, _| k.0 != removed || k.2 != peer_id); - } + peer_data.update_view(view); + state.requested_collations + .retain(|(rp, _, pid), _| pid != &peer_id || !peer_data.has_advertised(&rp)); Ok(()) } @@ -374,7 +635,6 @@ where s.child("collation-request") .with_para_id(para_id) }), - }; state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), per_request); @@ -426,74 +686,100 @@ where use protocol_v1::CollatorProtocolMessage::*; use sp_runtime::traits::AppVerify; - if let Some(d) = state.peer_data.get_mut(&origin) { - d.note_active(); - } - match msg { - Declare(id, signature) => { - if !signature.verify(&*protocol_v1::declare_signature_payload(&origin), &id) { - modify_reputation(ctx, origin, COST_INVALID_SIGNATURE).await; - return; + Declare(collator_id, para_id, signature) => { + if collator_peer_id(&state.peer_data, &collator_id).is_some() { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return } - tracing::debug!( - target: LOG_TARGET, - peer_id = ?origin, - "Declared as collator", - ); + let peer_data = match state.peer_data.get_mut(&origin) { + Some(p) => p, + None => { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return + } + }; - if state.known_collators.insert(origin.clone(), id).is_some() { - modify_reputation(ctx, origin.clone(), COST_UNEXPECTED_MESSAGE).await; + if peer_data.is_collating() { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return } - } - AdvertiseCollation(relay_parent, para_id) => { - let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation")); - if !state.view.contains(&relay_parent) { + if 
!signature.verify(&*protocol_v1::declare_signature_payload(&origin), &collator_id) { + modify_reputation(ctx, origin, COST_INVALID_SIGNATURE).await; + return + } + + if state.active_paras.is_current_or_next(para_id) { tracing::debug!( target: LOG_TARGET, peer_id = ?origin, - %para_id, - ?relay_parent, - "Advertise collation out of view", + ?collator_id, + ?para_id, + "Declared as collator for current or next para", ); - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; - return; - } - - if !state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)) { + peer_data.set_collating(collator_id, para_id); + } else { tracing::debug!( target: LOG_TARGET, peer_id = ?origin, - %para_id, - ?relay_parent, - "Multiple collations for same relay-parent advertised", + ?collator_id, + ?para_id, + "Declared as collator for unneeded para", ); - modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; - return; + modify_reputation(ctx, origin.clone(), COST_UNNEEDED_COLLATOR).await; + disconnect_peer(ctx, origin).await; } + } + AdvertiseCollation(relay_parent) => { + let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation")); - if let Some(collator) = state.known_collators.get(&origin) { + if !state.view.contains(&relay_parent) { tracing::debug!( target: LOG_TARGET, peer_id = ?origin, - %para_id, ?relay_parent, - "Received advertise collation", + "Advertise collation out of view", ); - notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await; - } else { - tracing::debug!( - target: LOG_TARGET, - peer_id = ?origin, - %para_id, - ?relay_parent, - "Advertise collation received from an unknown collator", - ); + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return; + } + + let peer_data = match state.peer_data.get_mut(&origin) { + None => { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return; + } + Some(p) => p, + }; + + match 
peer_data.insert_advertisement(relay_parent, &state.view) { + Ok((collator_id, para_id)) => { + tracing::debug!( + target: LOG_TARGET, + peer_id = ?origin, + %para_id, + ?relay_parent, + "Received advertise collation", + ); + + notify_candidate_selection(ctx, collator_id, relay_parent, para_id).await; + } + Err(e) => { + tracing::debug!( + target: LOG_TARGET, + peer_id = ?origin, + ?relay_parent, + error = ?e, + "Invalid advertisement", + ); + + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + } } } CollationSeconded(_) => { @@ -522,9 +808,11 @@ async fn remove_relay_parent( } /// Our view has changed. -#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, state, keystore), fields(subsystem = LOG_TARGET))] async fn handle_our_view_change( + ctx: &mut impl SubsystemContext, state: &mut State, + keystore: &SyncCryptoStorePtr, view: OurView, ) -> Result<()> { let old_view = std::mem::replace(&mut state.view, view); @@ -535,28 +823,48 @@ async fn handle_our_view_change( .filter(|v| !old_view.contains(&v.0)) .map(|v| (v.0.clone(), v.1.clone())) .collect(); + added.into_iter().for_each(|(h, s)| { state.span_per_relay_parent.insert(h, PerLeafSpan::new(s, "validator-side")); }); + let added = state.view.difference(&old_view).cloned().collect::<Vec<_>>(); let removed = old_view .difference(&state.view) .cloned() .collect::<Vec<_>>(); - for removed in removed.into_iter() { + for removed in removed.iter().cloned() { remove_relay_parent(state, removed).await?; state.span_per_relay_parent.remove(&removed); } + state.active_paras.assign_incoming(ctx.sender(), keystore, added).await; + state.active_paras.remove_outgoing(removed); + + for (peer_id, peer_data) in state.peer_data.iter_mut() { + peer_data.prune_old_advertisements(&state.view); + + // Disconnect peers who are not relevant to our current or next para. 
+ // + // If the peer hasn't declared yet, they will be disconnected if they do not + // declare. + if let Some(para_id) = peer_data.collating_para() { + if !state.active_paras.is_current_or_next(para_id) { + disconnect_peer(ctx, peer_id.clone()).await; + } + } + } + Ok(()) } /// Bridge event switch. -#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, state, keystore), fields(subsystem = LOG_TARGET))] async fn handle_network_msg<Context>( ctx: &mut Context, state: &mut State, + keystore: &SyncCryptoStorePtr, bridge_message: NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>, ) -> Result<()> where @@ -567,16 +875,17 @@ where match bridge_message { PeerConnected(peer_id, _role) => { state.peer_data.entry(peer_id).or_default(); + state.metrics.note_collator_peer_count(state.peer_data.len()); }, PeerDisconnected(peer_id) => { - state.known_collators.remove(&peer_id); state.peer_data.remove(&peer_id); + state.metrics.note_collator_peer_count(state.peer_data.len()); }, PeerViewChange(peer_id, view) => { handle_peer_view_change(state, peer_id, view).await?; }, OurViewChange(view) => { - handle_our_view_change(state, view).await?; + handle_our_view_change(ctx, state, keystore, view).await?; }, PeerMessage(remote, msg) => { process_incoming_peer_message(ctx, state, remote, msg).await; @@ -587,9 +896,10 @@ where } /// The main message receiver switch. 
-#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(level = "trace", skip(ctx, keystore, state), fields(subsystem = LOG_TARGET))] async fn process_msg<Context>( ctx: &mut Context, + keystore: &SyncCryptoStorePtr, msg: CollatorProtocolMessage, state: &mut State, ) @@ -619,18 +929,19 @@ where fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await; } ReportCollator(id) => { - report_collator(ctx, state, id).await; + report_collator(ctx, &state.peer_data, id).await; } NoteGoodCollation(id) => { - note_good_collation(ctx, state, id).await; + note_good_collation(ctx, &state.peer_data, id).await; } NotifyCollationSeconded(id, statement) => { - notify_collation_seconded(ctx, state, id, statement).await; + notify_collation_seconded(ctx, &state.peer_data, id, statement).await; } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, state, + keystore, event, ).await { tracing::warn!( @@ -662,9 +973,10 @@ async fn wait_until_next_check(last_poll: Instant) -> Instant { } /// The main run loop. 
-#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))] +#[tracing::instrument(skip(ctx, keystore, metrics), fields(subsystem = LOG_TARGET))] pub(crate) async fn run<Context>( mut ctx: Context, + keystore: SyncCryptoStorePtr, eviction_policy: crate::CollatorEvictionPolicy, metrics: Metrics, ) -> Result<()> @@ -704,7 +1016,12 @@ pub(crate) async fn run<Context>( tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message"); match msg { - Communication { msg } => process_msg(&mut ctx, msg, &mut state).await, + Communication { msg } => process_msg( + &mut ctx, + &keystore, + msg, + &mut state, + ).await, Signal(BlockFinalized(..)) => {} Signal(ActiveLeaves(_)) => {} Signal(Conclude) => { break } @@ -713,7 +1030,7 @@ pub(crate) async fn run<Context>( continue } Some(Either::Right(())) => { - disconnect_inactive_peers(&mut ctx, eviction_policy, &state.peer_data).await; + disconnect_inactive_peers(&mut ctx, &eviction_policy, &state.peer_data).await; continue } None => {} @@ -741,19 +1058,13 @@ pub(crate) async fn run<Context>( // receipt of the `PeerDisconnected` event. async fn disconnect_inactive_peers( ctx: &mut impl SubsystemContext, - eviction_policy: crate::CollatorEvictionPolicy, + eviction_policy: &crate::CollatorEvictionPolicy, peers: &HashMap<PeerId, PeerData>, ) { - let cutoff = match Instant::now().checked_sub(eviction_policy.0) { - None => return, - Some(i) => i, - }; - + let now = Instant::now(); for (peer, peer_data) in peers { - if !peer_data.active_since(cutoff) { - ctx.send_message( - NetworkBridgeMessage::DisconnectPeer(peer.clone(), PeerSet::Collation).into() - ).await; + if peer_data.is_inactive(now, &eviction_policy) { + disconnect_peer(ctx, peer.clone()).await; } } } @@ -834,6 +1145,19 @@ where // same can happen for penalities on timeouts, which we also have. 
modify_reputation(ctx, *peer_id, COST_REQUEST_TIMED_OUT).await; } + Ok(CollationFetchingResponse::Collation(receipt, _)) + if receipt.descriptor().para_id != *para_id => + { + tracing::debug!( + target: LOG_TARGET, + expected_para_id = ?para_id, + got_para_id = ?receipt.descriptor().para_id, + peer_id = ?peer_id, + "Got wrong para ID for requested collation." + ); + + modify_reputation(ctx, *peer_id, COST_WRONG_PARA).await; + } Ok(CollationFetchingResponse::Collation(receipt, compressed_pov)) => { match compressed_pov.decompress() { Ok(pov) => { @@ -891,25 +1215,39 @@ where mod tests { use super::*; use std::{iter, time::Duration}; + use std::sync::Arc; use futures::{executor, future, Future}; - use polkadot_node_subsystem_util::TimeoutExt; use sp_core::{crypto::Pair, Encode}; + use sp_keystore::SyncCryptoStore; + use sp_keystore::testing::KeyStore as TestKeyStore; + use sp_keyring::Sr25519Keyring; use assert_matches::assert_matches; - use polkadot_primitives::v1::CollatorPair; + use polkadot_primitives::v1::{ + CollatorPair, ValidatorId, ValidatorIndex, CoreState, CandidateDescriptor, + GroupRotationInfo, ScheduledCore, OccupiedCore, GroupIndex, + }; use polkadot_node_primitives::{BlockData, CompressedPoV}; + use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; + use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest}; use polkadot_node_network_protocol::{our_view, ObservedRole, request_response::Requests }; const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50); + const DECLARE_TIMEOUT: Duration = Duration::from_millis(25); #[derive(Clone)] struct TestState { chain_ids: Vec<ParaId>, relay_parent: Hash, collators: Vec<CollatorPair>, + validators: Vec<Sr25519Keyring>, + validator_public: Vec<ValidatorId>, + validator_groups: Vec<Vec<ValidatorIndex>>, + group_rotation_info: GroupRotationInfo, + cores: Vec<CoreState>, } impl Default for TestState { @@ -924,16 +1262,67 @@ mod tests { .take(4) 
.collect(); + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Eve, + ]; + + let validator_public = validators.iter().map(|k| k.public().into()).collect(); + let validator_groups = vec![ + vec![ValidatorIndex(0), ValidatorIndex(1)], + vec![ValidatorIndex(2), ValidatorIndex(3)], + vec![ValidatorIndex(4)], + ]; + + let group_rotation_info = GroupRotationInfo { + session_start_block: 0, + group_rotation_frequency: 1, + now: 0, + }; + + let cores = vec![ + CoreState::Scheduled(ScheduledCore { + para_id: chain_ids[0], + collator: None, + }), + CoreState::Free, + CoreState::Occupied(OccupiedCore { + next_up_on_available: None, + occupied_since: 0, + time_out_at: 1, + next_up_on_time_out: None, + availability: Default::default(), + group_responsible: GroupIndex(0), + candidate_hash: Default::default(), + candidate_descriptor: { + let mut d = CandidateDescriptor::default(); + d.para_id = chain_ids[1]; + + d + }, + }), + ]; + Self { chain_ids, relay_parent, collators, + validators, + validator_public, + validator_groups, + group_rotation_info, + cores, } } } + type VirtualOverseer = test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>; + struct TestHarness { - virtual_overseer: test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>, + virtual_overseer: VirtualOverseer, } fn test_harness<T: Future<Output = ()>>(test: impl FnOnce(TestHarness) -> T) { @@ -953,9 +1342,19 @@ mod tests { let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); + let keystore = TestKeyStore::new(); + keystore.sr25519_generate_new( + polkadot_primitives::v1::PARACHAIN_KEY_TYPE_ID, + Some(&Sr25519Keyring::Alice.to_seed()), + ).unwrap(); + let subsystem = run( context, - crate::CollatorEvictionPolicy(ACTIVITY_TIMEOUT), + Arc::new(keystore), + crate::CollatorEvictionPolicy { + inactive_collator: ACTIVITY_TIMEOUT, + undeclared: DECLARE_TIMEOUT, + }, 
Metrics::default(), ); @@ -970,7 +1369,7 @@ mod tests { const TIMEOUT: Duration = Duration::from_millis(200); async fn overseer_send( - overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>, + overseer: &mut VirtualOverseer, msg: CollatorProtocolMessage, ) { tracing::trace!("Sending message:\n{:?}", &msg); @@ -982,7 +1381,7 @@ mod tests { } async fn overseer_recv( - overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>, + overseer: &mut VirtualOverseer, ) -> AllMessages { let msg = overseer_recv_with_timeout(overseer, TIMEOUT) .await @@ -994,7 +1393,7 @@ mod tests { } async fn overseer_recv_with_timeout( - overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>, + overseer: &mut VirtualOverseer, timeout: Duration, ) -> Option<AllMessages> { tracing::trace!("Waiting for message..."); @@ -1004,6 +1403,44 @@ mod tests { .await } + async fn respond_to_core_info_queries( + virtual_overseer: &mut VirtualOverseer, + test_state: &TestState, + ) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::Validators(tx), + )) => { + let _ = tx.send(Ok(test_state.validator_public.clone())); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::ValidatorGroups(tx), + )) => { + let _ = tx.send(Ok(( + test_state.validator_groups.clone(), + test_state.group_rotation_info.clone(), + ))); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::AvailabilityCores(tx), + )) => { + let _ = tx.send(Ok(test_state.cores.clone())); + } + ); + } + // As we receive a relevant advertisement act on it and issue a collation request. 
#[test] fn act_on_advertisement() { @@ -1024,8 +1461,20 @@ mod tests { ) ).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + let peer_b = PeerId::random(); + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b, + ObservedRole::Full, + ), + ) + ).await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( @@ -1033,6 +1482,7 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::Declare( pair.public(), + test_state.chain_ids[0], pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), ) ) @@ -1046,7 +1496,6 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::AdvertiseCollation( test_state.relay_parent, - test_state.chain_ids[0], ) ) ) @@ -1084,9 +1533,31 @@ mod tests { ) ).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + let peer_b = PeerId::random(); let peer_c = PeerId::random(); + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b, + ObservedRole::Full, + ), + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_c, + ObservedRole::Full, + ), + ) + ).await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( @@ -1094,6 +1565,7 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::Declare( test_state.collators[0].public(), + test_state.chain_ids[0], test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)), ), ) @@ -1107,6 +1579,7 @@ mod tests { peer_c.clone(), protocol_v1::CollatorProtocolMessage::Declare( test_state.collators[1].public(), + test_state.chain_ids[0], test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)), ), ) @@ -1157,6 +1630,16 @@ mod tests { let peer_b = 
PeerId::random(); + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b, + ObservedRole::Full, + ), + ) + ).await; + // the peer sends a declare message but sign the wrong payload overseer_send( &mut virtual_overseer, @@ -1164,6 +1647,7 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::Declare( test_state.collators[0].public(), + test_state.chain_ids[0], test_state.collators[0].sign(&[42]), ), )), @@ -1208,9 +1692,31 @@ mod tests { ), ).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + let peer_b = PeerId::random(); let peer_c = PeerId::random(); + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b, + ObservedRole::Full, + ), + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_c, + ObservedRole::Full, + ), + ) + ).await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( @@ -1218,6 +1724,7 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::Declare( test_state.collators[0].public(), + test_state.chain_ids[0], test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)), ) ) @@ -1231,6 +1738,7 @@ mod tests { peer_c.clone(), protocol_v1::CollatorProtocolMessage::Declare( test_state.collators[1].public(), + test_state.chain_ids[0], test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)), ) ) @@ -1244,7 +1752,6 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::AdvertiseCollation( test_state.relay_parent, - test_state.chain_ids[0], ) ) ) @@ -1270,7 +1777,6 @@ mod tests { peer_c.clone(), protocol_v1::CollatorProtocolMessage::AdvertiseCollation( test_state.relay_parent, - test_state.chain_ids[0], ) ) ) @@ -1390,7 +1896,6 @@ mod tests { } = 
test_harness; let pair = CollatorPair::generate().0; - tracing::trace!("activating"); let hash_a = test_state.relay_parent; @@ -1401,6 +1906,7 @@ mod tests { ) ).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; let peer_b = PeerId::random(); @@ -1421,6 +1927,7 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::Declare( pair.public(), + test_state.chain_ids[0], pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), ) ) @@ -1434,7 +1941,6 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::AdvertiseCollation( test_state.relay_parent, - test_state.chain_ids[0], ) ) ) @@ -1478,7 +1984,6 @@ mod tests { } = test_harness; let pair = CollatorPair::generate().0; - tracing::trace!("activating"); let hash_a = test_state.relay_parent; let hash_b = Hash::repeat_byte(1); @@ -1491,6 +1996,10 @@ mod tests { ) ).await; + // 3 heads, 3 times. + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; let peer_b = PeerId::random(); @@ -1504,8 +2013,6 @@ mod tests { ) ).await; - Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; - overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( @@ -1513,6 +2020,7 @@ mod tests { peer_b.clone(), protocol_v1::CollatorProtocolMessage::Declare( pair.public(), + test_state.chain_ids[0], pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), ) ) @@ -1527,8 +2035,7 @@ mod tests { NetworkBridgeEvent::PeerMessage( peer_b.clone(), protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - test_state.chain_ids[0], + hash_a, ) ) ) @@ -1555,8 +2062,7 @@ mod tests { NetworkBridgeEvent::PeerMessage( peer_b.clone(), protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - hash_b, - test_state.chain_ids[1], + hash_b ) ) ) @@ -1571,7 +2077,34 @@ mod tests { ) ) => { 
assert_eq!(relay_parent, hash_b); - assert_eq!(para_id, test_state.chain_ids[1]); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, pair.public()); + }); + + Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + hash_c, + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, hash_c); + assert_eq!(para_id, test_state.chain_ids[0]); assert_eq!(collator, pair.public()); }); @@ -1589,4 +2122,188 @@ mod tests { ) }); } + + #[test] + fn disconnect_if_no_declare() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) + ) + ).await; + + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(peer_set, PeerSet::Collation); + } + ) + }) + } + + #[test] + fn disconnect_if_wrong_declare() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let pair = CollatorPair::generate().0; + + 
overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) + ) + ).await; + + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + pair.public(), + ParaId::from(69), + pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_UNNEEDED_COLLATOR); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(peer_set, PeerSet::Collation); + } + ); + }) + } + + #[test] + fn view_change_clears_old_collators() { + let mut test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let pair = CollatorPair::generate().0; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) + ) + ).await; + + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, 
+ ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + pair.public(), + test_state.chain_ids[0], + pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), + ) + ) + ) + ).await; + + let hash_b = Hash::repeat_byte(69); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![hash_b]) + ) + ).await; + + test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(peer_set, PeerSet::Collation); + } + ); + }) + } } diff --git a/polkadot/node/network/gossip-support/src/lib.rs b/polkadot/node/network/gossip-support/src/lib.rs index 1b25e6971e2..3cb1759071b 100644 --- a/polkadot/node/network/gossip-support/src/lib.rs +++ b/polkadot/node/network/gossip-support/src/lib.rs @@ -104,7 +104,7 @@ async fn determine_relevant_authorities( ctx: &mut impl SubsystemContext, relay_parent: Hash, ) -> Result<Vec<AuthorityDiscoveryId>, util::Error> { - let authorities = util::request_authorities_ctx(relay_parent, ctx).await?.await??; + let authorities = util::request_authorities(relay_parent, ctx.sender()).await.await??; Ok(authorities) } @@ -135,7 +135,7 @@ impl State { leaves: impl Iterator<Item = Hash>, ) -> Result<(), util::Error> { for leaf in leaves { - let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??; + let current_index = util::request_session_index_for_child(leaf, ctx.sender()).await.await??; let maybe_new_session = match self.last_session_index { Some(i) if i <= current_index => None, _ => 
Some((current_index, leaf)), diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index d6918df09f7..06304859e6c 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -335,11 +335,11 @@ pub mod v1 { /// Declare the intent to advertise collations under a collator ID, attaching a /// signature of the `PeerId` of the node using the given collator ID key. #[codec(index = 0)] - Declare(CollatorId, CollatorSignature), + Declare(CollatorId, ParaId, CollatorSignature), /// Advertise a collation to a validator. Can only be sent once the peer has /// declared that they are a collator with given ID. #[codec(index = 1)] - AdvertiseCollation(Hash, ParaId), + AdvertiseCollation(Hash), /// A collation sent to a validator was seconded. #[codec(index = 4)] CollationSeconded(SignedFullStatement), diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index f51f204b7c1..6110154c497 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -533,7 +533,11 @@ where collator_pair, Metrics::register(registry)?, ), - IsCollator::No => ProtocolSide::Validator(Default::default(),Metrics::register(registry)?), + IsCollator::No => ProtocolSide::Validator { + keystore: keystore.clone(), + eviction_policy: Default::default(), + metrics: Metrics::register(registry)?, + }, }; CollatorProtocolSubsystem::new( side, diff --git a/polkadot/node/subsystem-util/src/lib.rs b/polkadot/node/subsystem-util/src/lib.rs index fbcc6588bc3..7f51cebd88c 100644 --- a/polkadot/node/subsystem-util/src/lib.rs +++ b/polkadot/node/subsystem-util/src/lib.rs @@ -39,13 +39,13 @@ use polkadot_primitives::v1::{ CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption, SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo, - 
AuthorityDiscoveryId, + AuthorityDiscoveryId, GroupIndex, }; use sp_core::{traits::SpawnNamed, Public}; use sp_application_crypto::AppKey; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError}; use std::{ - collections::{HashMap, hash_map::Entry}, convert::{TryFrom, TryInto}, marker::Unpin, pin::Pin, task::{Poll, Context}, + collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context}, time::Duration, fmt, sync::Arc, }; use streamunordered::{StreamUnordered, StreamYield}; @@ -179,98 +179,37 @@ specialize_requests! { fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo; } -/// Request some data from the `RuntimeApi` via a SubsystemContext. -async fn request_from_runtime_ctx<RequestBuilder, Context, Response>( - parent: Hash, - ctx: &mut Context, - request_builder: RequestBuilder, -) -> Result<RuntimeApiReceiver<Response>, Error> -where - RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest, - Context: SubsystemContext, +/// From the given set of validators, find the first key we can sign with, if any. +pub async fn signing_key(validators: &[ValidatorId], keystore: &SyncCryptoStorePtr) + -> Option<ValidatorId> { - let (tx, rx) = oneshot::channel(); - - ctx.send_message( - AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request_builder(tx))) - .try_into() - .map_err(|err| Error::SenderConversion(format!("{:?}", err)))?, - ).await; - - Ok(rx) + signing_key_and_index(validators, keystore).await.map(|(k, _)| k) } - -/// Construct specialized request functions for the runtime. -/// -/// These would otherwise get pretty repetitive. -macro_rules! 
specialize_requests_ctx { - // expand return type name for documentation purposes - (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => { - specialize_requests_ctx!{ - named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant; - } - }; - - // create a single specialized request function - (named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => { - #[doc = "Request `"] - #[doc = $doc_name] - #[doc = "` from the runtime via a `SubsystemContext`"] - pub async fn $func_name<Context: SubsystemContext>( - parent: Hash, - $( - $param_name: $param_ty, - )* - ctx: &mut Context, - ) -> Result<RuntimeApiReceiver<$return_ty>, Error> { - request_from_runtime_ctx(parent, ctx, |tx| RuntimeApiRequest::$request_variant( - $( $param_name, )* tx - )).await - } - }; - - // recursive decompose - ( - fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident; - $( - fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident; - )+ - ) => { - specialize_requests_ctx!{ - fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ; - } - specialize_requests_ctx!{ - $( - fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ; - )+ - } - }; -} - -specialize_requests_ctx! 
{ - fn request_authorities_ctx() -> Vec<AuthorityDiscoveryId>; Authorities; - fn request_validators_ctx() -> Vec<ValidatorId>; Validators; - fn request_validator_groups_ctx() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups; - fn request_availability_cores_ctx() -> Vec<CoreState>; AvailabilityCores; - fn request_persisted_validation_data_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData; - fn request_session_index_for_child_ctx() -> SessionIndex; SessionIndexForChild; - fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode; - fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability; - fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents; - fn request_session_info_ctx(index: SessionIndex) -> Option<SessionInfo>; SessionInfo; -} - -/// From the given set of validators, find the first key we can sign with, if any. -pub async fn signing_key(validators: &[ValidatorId], keystore: SyncCryptoStorePtr) -> Option<ValidatorId> { - for v in validators.iter() { - if CryptoStore::has_keys(&*keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await { - return Some(v.clone()); +/// From the given set of validators, find the first key we can sign with, if any, and return it +/// along with the validator index. +pub async fn signing_key_and_index(validators: &[ValidatorId], keystore: &SyncCryptoStorePtr) + -> Option<(ValidatorId, ValidatorIndex)> +{ + for (i, v) in validators.iter().enumerate() { + if CryptoStore::has_keys(&**keystore, &[(v.to_raw_vec(), ValidatorId::ID)]).await { + return Some((v.clone(), ValidatorIndex(i as _))); } } None } +/// Find the validator group the given validator index belongs to. 
+pub fn find_validator_group(groups: &[Vec<ValidatorIndex>], index: ValidatorIndex) + -> Option<GroupIndex> +{ + groups.iter().enumerate().find_map(|(i, g)| if g.contains(&index) { + Some(GroupIndex(i as _)) + } else { + None + }) +} + /// Chooses a random subset of sqrt(v.len()), but at least `min` elements. pub fn choose_random_sqrt_subset<T>(mut v: Vec<T>, min: usize) -> Vec<T> { use rand::seq::SliceRandom as _; @@ -299,7 +238,7 @@ impl Validator { pub async fn new( parent: Hash, keystore: SyncCryptoStorePtr, - sender: &mut JobSender<impl SubsystemSender>, + sender: &mut impl SubsystemSender, ) -> Result<Self, Error> { // Note: request_validators and request_session_index_for_child do not and cannot // run concurrently: they both have a mutable handle to the same sender. @@ -327,13 +266,9 @@ impl Validator { signing_context: SigningContext, keystore: SyncCryptoStorePtr, ) -> Result<Self, Error> { - let key = signing_key(validators, keystore).await.ok_or(Error::NotAValidator)?; - let index = validators - .iter() - .enumerate() - .find(|(_, k)| k == &&key) - .map(|(idx, _)| ValidatorIndex(idx as u32)) - .expect("signing_key would have already returned NotAValidator if the item we're searching for isn't in this list; qed"); + let (key, index) = signing_key_and_index(validators, &keystore) + .await + .ok_or(Error::NotAValidator)?; Ok(Validator { signing_context, diff --git a/polkadot/node/subsystem-util/src/validator_discovery.rs b/polkadot/node/subsystem-util/src/validator_discovery.rs index d851ebd6ebd..57a55a9f1c7 100644 --- a/polkadot/node/subsystem-util/src/validator_discovery.rs +++ b/polkadot/node/subsystem-util/src/validator_discovery.rs @@ -46,7 +46,7 @@ pub async fn connect_to_validators<Context: SubsystemContext>( validators: Vec<ValidatorId>, peer_set: PeerSet, ) -> Result<ConnectionRequest, Error> { - let current_index = crate::request_session_index_for_child_ctx(relay_parent, ctx).await?.await??; + let current_index = 
crate::request_session_index_for_child(relay_parent, ctx.sender()).await.await??; connect_to_validators_in_session( ctx, relay_parent, validators, peer_set, current_index, ).await } @@ -64,11 +64,11 @@ pub async fn connect_to_validators_in_session<Context: SubsystemContext>( peer_set: PeerSet, session_index: SessionIndex, ) -> Result<ConnectionRequest, Error> { - let session_info = crate::request_session_info_ctx( + let session_info = crate::request_session_info( relay_parent, session_index, - ctx, - ).await?.await??; + ctx.sender(), + ).await.await??; let (session_validators, discovery_keys) = match session_info { Some(info) => (info.validators, info.discovery_keys), diff --git a/polkadot/primitives/src/v1.rs b/polkadot/primitives/src/v1.rs index 866e6ff69bc..a9ff43fa64e 100644 --- a/polkadot/primitives/src/v1.rs +++ b/polkadot/primitives/src/v1.rs @@ -613,9 +613,42 @@ impl GroupRotationInfo { let blocks_since_start = self.now.saturating_sub(self.session_start_block); let rotations = blocks_since_start / self.group_rotation_frequency; + // g = c + r mod cores + let idx = (core_index.0 as usize + rotations as usize) % cores; GroupIndex(idx as u32) } + + /// Returns the index of the core assigned to the given group. This does no checking of + /// whether the group index is in-bounds. + /// + /// `group_index` should be less than the number of groups; `cores` is capped at u32::max().
+ pub fn core_for_group(&self, group_index: GroupIndex, cores: usize) -> CoreIndex { + if self.group_rotation_frequency == 0 { return CoreIndex(group_index.0) } + if cores == 0 { return CoreIndex(0) } + + let cores = sp_std::cmp::min(cores, u32::max_value() as usize); + let blocks_since_start = self.now.saturating_sub(self.session_start_block); + let rotations = blocks_since_start / self.group_rotation_frequency; + let rotations = rotations % cores as u32; + + // g = c + r mod cores + // c = g - r mod cores + // x = x + cores mod cores + // c = (g + cores) - r mod cores + + let idx = (group_index.0 as usize + cores - rotations as usize) % cores; + CoreIndex(idx as u32) + } + + /// Create a new `GroupRotationInfo` with one further rotation applied. + pub fn bump_rotation(&self) -> Self { + GroupRotationInfo { + session_start_block: self.session_start_block, + group_rotation_frequency: self.group_rotation_frequency, + now: self.next_rotation_at(), + } + } } impl<N: Saturating + BaseArithmetic + Copy> GroupRotationInfo<N> { @@ -1107,6 +1140,26 @@ mod tests { assert_eq!(info.last_rotation_at(), 15); } + #[test] + fn group_for_core_is_core_for_group() { + + for cores in 1..=256 { + for rotations in 0..(cores * 2) { + let info = GroupRotationInfo { + session_start_block: 0u32, + now: rotations, + group_rotation_frequency: 1, + }; + + for core in 0..cores { + let group = info.group_for_core(CoreIndex(core), cores as usize); + assert_eq!(info.core_for_group(group, cores as usize).0, core); + } + } + } + + } + #[test] fn collator_signature_payload_is_valid() { // if this fails, collator signature verification code has to be updated. 
diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md index e0968e7fd06..052729acec4 100644 --- a/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/node/collators/collator-protocol.md @@ -55,8 +55,6 @@ As with most other subsystems, we track the active leaves set by following `Acti For the purposes of actually distributing a collation, we need to be connected to the validators who are interested in collations on that `ParaId` at this point in time. We assume that there is a discovery API for connecting to a set of validators. -> TODO: design & expose the discovery API not just for connecting to such peers but also to determine which of our current peers are validators. - As seen in the [Scheduler Module][SCH] of the runtime, validator groups are fixed for an entire session and their rotations across cores are predictable. Collators will want to do these things when attempting to distribute collations at a given relay-parent: * Determine which core the para collated-on is assigned to. * Determine the group on that core and the next group on that core. @@ -100,7 +98,7 @@ digraph G { } ``` -When peers connect to us, they can `Declare` that they represent a collator with given public key. Once they've declared that, and we checked their signature, they can begin to send advertisements of collations. The peers should not send us any advertisements for collations that are on a relay-parent outside of our view. +When peers connect to us, they can `Declare` that they represent a collator with given public key and intend to collate on a specific para ID. Once they've declared that, and we checked their signature, they can begin to send advertisements of collations. 
The peers should not send us any advertisements for collations that are on a relay-parent outside of our view or for a para outside of the one they've declared. The protocol tracks advertisements received and the source of the advertisement. The advertisement source is the `PeerId` of the peer who sent the message. We accept one advertisement per collator per source per relay-parent. diff --git a/polkadot/roadmap/implementers-guide/src/types/network.md b/polkadot/roadmap/implementers-guide/src/types/network.md index d7404ae446e..0dc9d05699e 100644 --- a/polkadot/roadmap/implementers-guide/src/types/network.md +++ b/polkadot/roadmap/implementers-guide/src/types/network.md @@ -102,16 +102,12 @@ enum StatementDistributionV1Message { ```rust enum CollatorProtocolV1Message { - /// Declare the intent to advertise collations under a collator ID, attaching a + /// Declare the intent to advertise collations under a collator ID and `ParaId`, attaching a /// signature of the `PeerId` of the node using the given collator ID key. - Declare(CollatorId, CollatorSignature), + Declare(CollatorId, ParaId, CollatorSignature), /// Advertise a collation to a validator. Can only be sent once the peer has /// declared that they are a collator with given ID. - AdvertiseCollation(Hash, ParaId), - /// Request the advertised collation at that relay-parent. - RequestCollation(RequestId, Hash, ParaId), - /// A requested collation. - Collation(RequestId, CandidateReceipt, CompressedPoV), + AdvertiseCollation(Hash), /// A collation sent to a validator was seconded. CollationSeconded(SignedFullStatement), } -- GitLab