Unverified Commit 54e18e65 authored by asynchronous rob, committed by GitHub
Browse files

Collation protocol: stricter validators (#2810)



* guide: declare one para as a collator

* add ParaId to Declare messages and clean up

* fix build

* fix the testerinos

* begin adding keystore to collator-protocol

* remove request_x_ctx

* add core_for_group

* add bump_rotation

* add some more helpers to subsystem-util

* change signing_key API to take ref

* determine current and next para assignments

* disconnect collators who are not on current or next para

* add collator peer count metric

* notes for later

* some fixes

* add data & keystore to test state

* add a test utility for answering runtime API requests

* fix existing collator tests

* add new tests

* remove sc_keystore

* update cargo lock

Co-authored-by: Andronik Ordian <write@reusable.software>
parent 8c7a6d9f
Pipeline #132612 failed with stages
in 32 minutes and 14 seconds
......@@ -5594,6 +5594,7 @@ dependencies = [
"polkadot-primitives",
"sp-core",
"sp-keyring",
"sp-keystore",
"sp-runtime",
"thiserror",
"tracing",
......
......@@ -32,8 +32,8 @@ use polkadot_node_subsystem::{
FromOverseer, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemResult,
};
use polkadot_node_subsystem_util::{
request_availability_cores_ctx, request_persisted_validation_data_ctx,
request_validators_ctx, request_validation_code_ctx,
request_availability_cores, request_persisted_validation_data,
request_validators, request_validation_code,
metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
......@@ -198,8 +198,8 @@ async fn handle_new_activations<Context: SubsystemContext>(
let _relay_parent_timer = metrics.time_new_activations_relay_parent();
let (availability_cores, validators) = join!(
request_availability_cores_ctx(relay_parent, ctx).await?,
request_validators_ctx(relay_parent, ctx).await?,
request_availability_cores(relay_parent, ctx.sender()).await,
request_validators(relay_parent, ctx.sender()).await,
);
let availability_cores = availability_cores??;
......@@ -248,13 +248,13 @@ async fn handle_new_activations<Context: SubsystemContext>(
// within the subtask loop, because we have only a single mutable handle to the
// context, so the work can't really be distributed
let validation_data = match request_persisted_validation_data_ctx(
let validation_data = match request_persisted_validation_data(
relay_parent,
scheduled_core.para_id,
assumption,
ctx,
ctx.sender(),
)
.await?
.await
.await??
{
Some(v) => v,
......@@ -271,13 +271,13 @@ async fn handle_new_activations<Context: SubsystemContext>(
}
};
let validation_code = match request_validation_code_ctx(
let validation_code = match request_validation_code(
relay_parent,
scheduled_core.para_id,
assumption,
ctx,
ctx.sender(),
)
.await?
.await
.await??
{
Some(v) => v,
......
......@@ -294,7 +294,7 @@ impl CandidateSelectionJob {
).await {
Ok(response) => response,
Err(err) => {
tracing::warn!(
tracing::debug!(
target: LOG_TARGET,
err = ?err,
"failed to get collation from collator protocol subsystem",
......
......@@ -109,13 +109,9 @@ impl From<SubsystemError> for Error {
/// Receive a response from a runtime request and convert errors.
pub(crate) async fn recv_runtime<V>(
r: std::result::Result<
oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
UtilError,
>,
r: oneshot::Receiver<std::result::Result<V, RuntimeApiError>>,
) -> std::result::Result<V, Error> {
r.map_err(Error::UtilRequest)?
.await
r.await
.map_err(Error::RuntimeRequestCanceled)?
.map_err(Error::RuntimeRequest)
}
......
......@@ -32,7 +32,7 @@ use futures::{
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem_util::request_availability_cores_ctx;
use polkadot_node_subsystem_util::request_availability_cores;
use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
......@@ -235,7 +235,7 @@ async fn query_occupied_cores<Context>(
where
Context: SubsystemContext,
{
let cores = recv_runtime(request_availability_cores_ctx(relay_parent, ctx).await).await?;
let cores = recv_runtime(request_availability_cores(relay_parent, ctx.sender()).await).await?;
Ok(cores
.into_iter()
......
......@@ -23,7 +23,7 @@ use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem_util::{
request_session_index_for_child_ctx, request_session_info_ctx,
request_session_index_for_child, request_session_info,
};
use polkadot_primitives::v1::{GroupIndex, Hash, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_subsystem::SubsystemContext;
......@@ -93,7 +93,7 @@ impl Runtime {
Some(index) => Ok(*index),
None => {
let index =
recv_runtime(request_session_index_for_child_ctx(parent, ctx).await)
recv_runtime(request_session_index_for_child(parent, ctx.sender()).await)
.await?;
self.session_index_cache.put(parent, index);
Ok(index)
......@@ -117,7 +117,7 @@ impl Runtime {
/// Get `ExtendedSessionInfo` by session index.
///
/// `request_session_info_ctx` still requires the parent to be passed in, so we take the parent
/// `request_session_info` still requires the parent to be passed in, so we take the parent
/// in addition to the `SessionIndex`.
pub async fn get_session_info_by_index<'a, Context>(
&'a mut self,
......@@ -130,7 +130,7 @@ impl Runtime {
{
if !self.session_info_cache.contains(&session_index) {
let session_info =
recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
let validator_info = self.get_validator_info(&session_info).await?;
......
......@@ -24,7 +24,7 @@ use sp_core::crypto::Public;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_node_subsystem_util::{
request_session_index_for_child_ctx, request_session_info_ctx,
request_session_index_for_child, request_session_info,
};
use polkadot_primitives::v1::SessionInfo as GlobalSessionInfo;
use polkadot_primitives::v1::{
......@@ -132,7 +132,7 @@ impl SessionCache {
Some(index) => *index,
None => {
let index =
recv_runtime(request_session_index_for_child_ctx(parent, ctx).await)
recv_runtime(request_session_index_for_child(parent, ctx.sender()).await)
.await?;
self.session_index_cache.put(parent, index);
index
......@@ -210,7 +210,7 @@ impl SessionCache {
/// Query needed information from runtime.
///
/// We need to pass in the relay parent for our call to `request_session_info_ctx`. We should
/// We need to pass in the relay parent for our call to `request_session_info`. We should
/// actually not need that: I suppose it is used for internal caching based on relay parents,
/// which we don't use here. It should not do any harm though.
///
......@@ -229,7 +229,7 @@ impl SessionCache {
discovery_keys,
mut validator_groups,
..
} = recv_runtime(request_session_info_ctx(parent, session_index, ctx).await)
} = recv_runtime(request_session_info(parent, session_index, ctx.sender()).await)
.await?
.ok_or(Error::NoSuchSession(session_index))?;
......
......@@ -111,7 +111,7 @@ fn check_fetch_retry() {
})
.collect();
state.valid_chunks.retain(|(ch, _)| valid_candidate_hashes.contains(ch));
for (_, v) in state.chunks.iter_mut() {
// This should still succeed as cores are still pending availability on next block.
......
......@@ -49,7 +49,7 @@ use polkadot_node_network_protocol::{
request::RequestError,
},
};
use polkadot_node_subsystem_util::request_session_info_ctx;
use polkadot_node_subsystem_util::request_session_info;
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;
......@@ -697,11 +697,11 @@ async fn handle_recover(
}
let _span = span.child("not-cached");
let session_info = request_session_info_ctx(
let session_info = request_session_info(
state.live_block.1,
session_index,
ctx,
).await?.await.map_err(error::Error::CanceledSessionInfo)??;
ctx.sender(),
).await.await.map_err(error::Error::CanceledSessionInfo)??;
let _span = span.child("session-info-ctx-received");
match session_info {
......
......@@ -1794,6 +1794,7 @@ mod tests {
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into(),
Default::default(),
Default::default(),
);
let message = protocol_v1::CollationProtocol::CollatorProtocol(
......@@ -2064,6 +2065,7 @@ mod tests {
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into(),
Default::default(),
Default::default(),
);
let message = protocol_v1::CollationProtocol::CollatorProtocol(
......
......@@ -11,8 +11,9 @@ futures-timer = "3"
thiserror = "1.0.23"
tracing = "0.1.25"
sp-core = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
......
......@@ -37,9 +37,9 @@ use polkadot_node_network_protocol::{
};
use polkadot_node_subsystem_util::{
validator_discovery,
request_validators_ctx,
request_validator_groups_ctx,
request_availability_cores_ctx,
request_validators,
request_validator_groups,
request_availability_cores,
metrics::{self, prometheus},
};
use polkadot_node_primitives::{SignedFullStatement, Statement, PoV, CompressedPoV};
......@@ -380,7 +380,7 @@ async fn determine_core(
para_id: ParaId,
relay_parent: Hash,
) -> Result<Option<(CoreIndex, usize)>> {
let cores = request_availability_cores_ctx(relay_parent, ctx).await?.await??;
let cores = request_availability_cores(relay_parent, ctx.sender()).await.await??;
for (idx, core) in cores.iter().enumerate() {
if let CoreState::Scheduled(occupied) = core {
......@@ -403,7 +403,7 @@ async fn determine_our_validators(
cores: usize,
relay_parent: Hash,
) -> Result<(HashSet<ValidatorId>, HashSet<ValidatorId>)> {
let groups = request_validator_groups_ctx(relay_parent, ctx).await?;
let groups = request_validator_groups(relay_parent, ctx.sender()).await;
let groups = groups.await??;
......@@ -413,7 +413,7 @@ async fn determine_our_validators(
let next_group_idx = (current_group_index.0 as usize + 1) % groups.0.len();
let next_validators = groups.0.get(next_group_idx).map(|v| v.as_slice()).unwrap_or_default();
let validators = request_validators_ctx(relay_parent, ctx).await?.await??;
let validators = request_validators(relay_parent, ctx.sender()).await.await??;
let current_validators = current_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
let next_validators = next_validators.iter().map(|i| validators[i.0 as usize].clone()).collect();
......@@ -430,17 +430,20 @@ async fn declare(
) {
let declare_signature_payload = protocol_v1::declare_signature_payload(&state.local_peer_id);
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
state.collator_pair.sign(&declare_signature_payload),
);
if let Some(para_id) = state.collating_on {
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
para_id,
state.collator_pair.sign(&declare_signature_payload),
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![peer],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![peer],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
}
}
/// Issue a connection request to a set of validators and
......@@ -476,11 +479,6 @@ async fn advertise_collation(
relay_parent: Hash,
peer: PeerId,
) {
let collating_on = match state.collating_on {
Some(collating_on) => collating_on,
None => return,
};
let should_advertise = state.our_validators_groups
.get(&relay_parent)
.map(|g| g.should_advertise_to(&peer))
......@@ -518,7 +516,6 @@ async fn advertise_collation(
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
collating_on,
);
ctx.send_message(AllMessages::NetworkBridge(
......@@ -705,14 +702,14 @@ async fn handle_incoming_peer_message(
use protocol_v1::CollatorProtocolMessage::*;
match msg {
Declare(_, _) => {
Declare(_, _, _) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
"Declare message is not expected on the collator side of the protocol",
);
}
AdvertiseCollation(_, _) => {
AdvertiseCollation(_) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
......@@ -772,6 +769,12 @@ async fn handle_validator_connected(
validator_id: ValidatorId,
relay_parent: Hash,
) {
tracing::trace!(
target: LOG_TARGET,
?validator_id,
"Connected to requested validator"
);
let not_declared = state.declared_at.insert(peer_id.clone());
if not_declared {
......@@ -1382,7 +1385,11 @@ mod tests {
}
/// Check that the next received message is a `Declare` message.
async fn expect_declare_msg(virtual_overseer: &mut VirtualOverseer, test_state: &TestState, peer: &PeerId) {
async fn expect_declare_msg(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
peer: &PeerId,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
......@@ -1394,12 +1401,17 @@ mod tests {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(collator_id, signature) => {
protocol_v1::CollatorProtocolMessage::Declare(
collator_id,
para_id,
signature,
) => {
assert!(signature.verify(
&*protocol_v1::declare_signature_payload(&test_state.local_peer_id),
&collator_id),
);
assert_eq!(collator_id, test_state.collator_pair.public());
assert_eq!(para_id, test_state.para_id);
}
);
}
......@@ -1409,7 +1421,6 @@ mod tests {
/// Check that the next received message is a collation advertisement message.
async fn expect_advertise_collation_msg(
virtual_overseer: &mut VirtualOverseer,
test_state: &TestState,
peer: &PeerId,
expected_relay_parent: Hash,
) {
......@@ -1426,10 +1437,8 @@ mod tests {
wire_message,
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
collating_on,
) => {
assert_eq!(relay_parent, expected_relay_parent);
assert_eq!(collating_on, test_state.para_id);
}
);
}
......@@ -1478,7 +1487,7 @@ mod tests {
// The peer is interested in a leaf that we have a collation for;
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Request a collation.
let (tx, rx) = oneshot::channel();
......@@ -1556,7 +1565,7 @@ mod tests {
)
).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
});
}
......@@ -1619,13 +1628,13 @@ mod tests {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
expect_declare_msg(&mut virtual_overseer, &test_state, &peer2).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
// The other validator announces that it changed its view.
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
// After changing the view we should receive the advertisement
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
})
}
......@@ -1669,10 +1678,10 @@ mod tests {
connected.try_send((validator_id2, peer2.clone())).unwrap();
send_peer_view_change(&mut virtual_overseer, &peer, vec![old_relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, old_relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, old_relay_parent).await;
send_peer_view_change(&mut virtual_overseer, &peer2, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer2, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer2, test_state.relay_parent).await;
})
}
......@@ -1698,7 +1707,7 @@ mod tests {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer).await;
send_peer_view_change(&mut virtual_overseer, &peer, vec![test_state.relay_parent]).await;
expect_advertise_collation_msg(&mut virtual_overseer, &test_state, &peer, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &peer, test_state.relay_parent).await;
// Disconnect and reconnect directly
disconnect_peer(&mut virtual_overseer, peer.clone()).await;
......
......@@ -25,6 +25,8 @@ use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use thiserror::Error;
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange as Rep};
use polkadot_node_subsystem_util::{self as util, metrics::prometheus};
use polkadot_primitives::v1::CollatorPair;
......@@ -57,18 +59,33 @@ type Result<T> = std::result::Result<T, Error>;
/// A collator eviction policy - how fast to evict collators which are inactive.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy(pub Duration);
pub struct CollatorEvictionPolicy {
/// How fast to evict collators who are inactive.
pub inactive_collator: Duration,
/// How fast to evict peers which don't declare their para.
pub undeclared: Duration,
}
impl Default for CollatorEvictionPolicy {
fn default() -> Self {
CollatorEvictionPolicy(Duration::from_secs(24))
CollatorEvictionPolicy {
inactive_collator: Duration::from_secs(24),
undeclared: Duration::from_secs(1),
}
}
}
/// What side of the collator protocol is being engaged
pub enum ProtocolSide {
/// Validators operate on the relay chain.
Validator(CollatorEvictionPolicy, validator_side::Metrics),
Validator {
/// The keystore holding validator keys.
keystore: SyncCryptoStorePtr,
/// An eviction policy for inactive peers or validators.
eviction_policy: CollatorEvictionPolicy,
/// Prometheus metrics for validators.
metrics: validator_side::Metrics,
},
/// Collators operate on a parachain.
Collator(PeerId, CollatorPair, collator_side::Metrics),
}
......@@ -95,9 +112,10 @@ impl CollatorProtocolSubsystem {
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator(policy, metrics) => validator_side::run(
ProtocolSide::Validator { keystore, eviction_policy, metrics } => validator_side::run(
ctx,
policy,
keystore,
eviction_policy,
metrics,
).await,
ProtocolSide::Collator(local_peer_id, collator_pair, metrics) => collator_side::run(
......
......@@ -104,7 +104,7 @@ async fn determine_relevant_authorities(
ctx: &mut impl SubsystemContext,
relay_parent: Hash,
) -> Result<Vec<AuthorityDiscoveryId>, util::Error> {
let authorities = util::request_authorities_ctx(relay_parent, ctx).await?.await??;
let authorities = util::request_authorities(relay_parent, ctx.sender()).await.await??;
Ok(authorities)
}
......@@ -135,7 +135,7 @@ impl State {
leaves: impl Iterator<Item = Hash>,
) -> Result<(), util::Error> {
for leaf in leaves {
let current_index = util::request_session_index_for_child_ctx(leaf, ctx).await?.await??;
let current_index = util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
let maybe_new_session = match self.last_session_index {
Some(i) if i <= current_index => None,
_ => Some((current_index, leaf)),
......
......@@ -335,11 +335,11 @@ pub mod v1 {
/// Declare the intent to advertise collations under a collator ID, attaching a
/// signature of the `PeerId` of the node using the given collator ID key.
#[codec(index = 0)]
Declare(CollatorId, CollatorSignature),
Declare(CollatorId, ParaId, CollatorSignature),
/// Advertise a collation to a validator. Can only be sent once the peer has
/// declared that they are a collator with given ID.
#[codec(index = 1)]
AdvertiseCollation(Hash, ParaId),
AdvertiseCollation(Hash),
/// A collation sent to a validator was seconded.
#[codec(index = 4)]
CollationSeconded(SignedFullStatement),
......
......@@ -533,7 +533,11 @@ where
collator_pair,
Metrics::register(registry)?,
),
IsCollator::No => ProtocolSide::Validator(Default::default(),Metrics::register(registry)?),
IsCollator::No => ProtocolSide::Validator {
keystore: keystore.clone(),
eviction_policy: Default::default(),
metrics: Metrics::register(registry)?,
},
};
CollatorProtocolSubsystem::new(
side,
......
......@@ -39,13 +39,13 @@ use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex, SessionInfo,
AuthorityDiscoveryId,
AuthorityDiscoveryId, GroupIndex,
};
use sp_core::{traits::SpawnNamed, Public};
use sp_application_crypto::AppKey;
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, Error as KeystoreError};
use std::{
collections::{HashMap, hash_map::Entry}, convert::{TryFrom, TryInto}, marker::Unpin, pin::Pin, task::{Poll, Context},
collections::{HashMap, hash_map::Entry}, convert::TryFrom, marker::Unpin, pin::Pin, task::{Poll, Context},
time::Duration, fmt, sync::Arc,
};
use streamunordered::{StreamUnordered, StreamYield};
......@@ -179,98 +179,37 @@ specialize_requests! {
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}
/// Request some data from the `RuntimeApi` via a SubsystemContext.
async fn request_from_runtime_ctx<RequestBuilder, Context, Response>(