Unverified Commit 734eda87 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

past-session validator discovery APIs (#2009)



* guide: fix formatting for SessionInfo module

* primitives: SessionInfo type

* punt on approval keys

* ah, revert the type alias

* session info runtime module skeleton

* update the guide

* runtime/configuration: sync with the guide

* runtime/configuration: setters for newly added fields

* runtime/configuration: set codec indexes

* runtime/configuration: update test

* primitives: fix SessionInfo definition

* runtime/session_info: initial impl

* runtime/session_info: use initializer for session handling (wip)

* runtime/session_info: mock authority discovery trait

* guide: update the initializer's order

* runtime/session_info: tests skeleton

* runtime/session_info: store n_delay_tranches in Configuration

* runtime/session_info: punt on approval keys

* runtime/session_info: add some basic tests

* Update primitives/src/v1.rs

* small fixes

* remove codec index annotation on structs

* fix off-by-one error

* validator_discovery: accept a session index

* runtime: replace validator_discovery api with session_info

* Update runtime/parachains/src/session_info.rs

Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>

* runtime/session_info: add a comment about missing entries

* runtime/session_info: define the keys

* util: expose connect_to_past_session_validators

* util: allow session_info requests for jobs

* runtime-api: add mock test for session_info

* collator-protocol: add session_index to test state

* util: fix error message for runtime error

* fix compilation

* fix tests after merge with master

Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>
parent 9a32ab1d
Pipeline #115139 passed with stages
in 21 minutes and 47 seconds
......@@ -134,7 +134,7 @@ fn make_runtime_api_request<Client>(
Request::CandidatePendingAvailability(para, sender) =>
query!(candidate_pending_availability(para), sender),
Request::CandidateEvents(sender) => query!(candidate_events(), sender),
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
Request::SessionInfo(index, sender) => query!(session_info(index), sender),
Request::DmqContents(id, sender) => query!(dmq_contents(id), sender),
Request::InboundHrmpChannelsContents(id, sender) => query!(inbound_hrmp_channels_contents(id), sender),
}
......@@ -201,8 +201,8 @@ mod tests {
use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage,
CommittedCandidateReceipt, CandidateEvent, InboundDownwardMessage,
BlockNumber, InboundHrmpMessage, SessionInfo,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
......@@ -216,6 +216,7 @@ mod tests {
availability_cores: Vec<CoreState>,
validation_data: HashMap<ParaId, ValidationData>,
session_index_for_child: SessionIndex,
session_info: HashMap<SessionIndex, SessionInfo>,
validation_code: HashMap<ParaId, ValidationCode>,
historical_validation_code: HashMap<ParaId, Vec<(BlockNumber, ValidationCode)>>,
validation_outputs_results: HashMap<ParaId, bool>,
......@@ -289,6 +290,10 @@ mod tests {
self.session_index_for_child.clone()
}
fn session_info(&self, index: SessionIndex) -> Option<SessionInfo> {
self.session_info.get(&index).cloned()
}
fn validation_code(
&self,
para: ParaId,
......@@ -321,10 +326,6 @@ mod tests {
self.candidate_events.clone()
}
fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
vec![None; ids.len()]
}
fn dmq_contents(
&self,
recipient: ParaId,
......@@ -569,6 +570,33 @@ mod tests {
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_session_info() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let session_index = 1;
runtime_api.session_info.insert(session_index, Default::default());
let runtime_api = Arc::new(runtime_api);
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::SessionInfo(session_index, tx))
}).await;
assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default()));
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
......
......@@ -738,6 +738,7 @@ mod tests {
use polkadot_primitives::v1::{
BlockData, CandidateDescriptor, CollatorPair, ScheduledCore,
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
SessionIndex, SessionInfo,
};
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}};
use polkadot_node_subsystem_util::TimeoutExt;
......@@ -776,6 +777,7 @@ mod tests {
relay_parent: Hash,
availability_core: CoreState,
our_collator_pair: CollatorPair,
session_index: SessionIndex,
}
fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec<ValidatorId> {
......@@ -832,6 +834,7 @@ mod tests {
relay_parent,
availability_core,
our_collator_pair,
session_index: 1,
}
}
}
......@@ -841,6 +844,10 @@ mod tests {
&self.validator_groups.0[0]
}
fn current_session_index(&self) -> SessionIndex {
self.session_index
}
fn current_group_validator_peer_ids(&self) -> Vec<PeerId> {
self.current_group_validator_indices().iter().map(|i| self.validator_peer_id[*i as usize].clone()).collect()
}
......@@ -870,20 +877,6 @@ mod tests {
.collect()
}
fn next_group_validator_ids(&self) -> Vec<ValidatorId> {
self.next_group_validator_indices()
.iter()
.map(|i| self.validator_public[*i as usize].clone())
.collect()
}
/// Returns the unique count of validators in the current and next group.
fn current_and_next_group_unique_validator_count(&self) -> usize {
let mut indices = self.next_group_validator_indices().iter().collect::<HashSet<_>>();
indices.extend(self.current_group_validator_indices());
indices.len()
}
/// Generate a new relay parent and inform the subsystem about the new view.
///
/// If `merge_views == true` it means the subsystem will be informed that we working on the old `relay_parent`
......@@ -1090,20 +1083,33 @@ mod tests {
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(validators.len(), test_state.current_and_next_group_unique_validator_count());
tx.send(Ok(test_state.current_session_index())).unwrap();
}
);
let current_validators = test_state.current_group_validator_ids();
let next_validators = test_state.next_group_validator_ids();
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(index, test_state.current_session_index());
assert!(validators.iter().all(|v| current_validators.contains(&v) || next_validators.contains(&v)));
let validators = test_state.current_group_validator_ids();
let current_discovery_keys = test_state.current_group_validator_authority_ids();
let next_discovery_keys = test_state.next_group_validator_authority_ids();
let current_validators = test_state.current_group_validator_authority_ids();
let next_validators = test_state.next_group_validator_authority_ids();
let discovery_keys = [&current_discovery_keys[..], &next_discovery_keys[..]].concat();
tx.send(Ok(current_validators.into_iter().chain(next_validators).map(Some).collect())).unwrap();
tx.send(Ok(Some(SessionInfo {
validators,
discovery_keys,
..Default::default()
}))).unwrap();
}
);
......
......@@ -60,16 +60,6 @@ enum Error {
Prometheus(#[from] prometheus::PrometheusError),
}
impl From<util::validator_discovery::Error> for Error {
fn from(me: util::validator_discovery::Error) -> Self {
match me {
util::validator_discovery::Error::Subsystem(s) => Error::Subsystem(s),
util::validator_discovery::Error::RuntimeApi(ra) => Error::RuntimeApi(ra),
util::validator_discovery::Error::Oneshot(c) => Error::Oneshot(c),
}
}
}
type Result<T> = std::result::Result<T, Error>;
/// What side of the collator protocol is being engaged
......
......@@ -27,8 +27,6 @@ pub enum Error {
#[error(transparent)]
Runtime(#[from] polkadot_subsystem::errors::RuntimeApiError),
#[error(transparent)]
ValidatorDiscovery(#[from] polkadot_node_subsystem_util::validator_discovery::Error),
#[error(transparent)]
Util(#[from] polkadot_node_subsystem_util::Error),
}
......
......@@ -11,7 +11,7 @@ use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlockData, CoreState, GroupRotationInfo, Id as ParaId,
ScheduledCore, ValidatorIndex,
ScheduledCore, ValidatorIndex, SessionIndex, SessionInfo,
};
use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest};
use polkadot_node_subsystem_test_helpers as test_helpers;
......@@ -37,8 +37,10 @@ fn validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec<AuthorityDiscoveryI
val_ids.iter().map(|v| v.public().into()).collect()
}
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>;
struct TestHarness {
virtual_overseer: test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
virtual_overseer: VirtualOverseer,
}
fn test_harness<T: Future<Output = ()>>(
......@@ -75,7 +77,7 @@ fn test_harness<T: Future<Output = ()>>(
const TIMEOUT: Duration = Duration::from_millis(100);
async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
overseer: &mut VirtualOverseer,
msg: PoVDistributionMessage,
) {
trace!("Sending message:\n{:?}", &msg);
......@@ -87,7 +89,7 @@ async fn overseer_send(
}
async fn overseer_recv(
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
overseer: &mut VirtualOverseer,
) -> AllMessages {
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
.await
......@@ -99,7 +101,7 @@ async fn overseer_recv(
}
async fn overseer_recv_with_timeout(
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
overseer: &mut VirtualOverseer,
timeout: Duration,
) -> Option<AllMessages> {
trace!("Waiting for message...");
......@@ -110,7 +112,7 @@ async fn overseer_recv_with_timeout(
}
async fn overseer_signal(
overseer: &mut test_helpers::TestSubsystemContextHandle<PoVDistributionMessage>,
overseer: &mut VirtualOverseer,
signal: OverseerSignal,
) {
overseer
......@@ -130,6 +132,7 @@ struct TestState {
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
relay_parent: Hash,
availability_cores: Vec<CoreState>,
session_index: SessionIndex,
}
impl Default for TestState {
......@@ -184,10 +187,56 @@ impl Default for TestState {
validator_groups,
relay_parent,
availability_cores,
session_index: 1,
}
}
}
async fn test_validator_discovery(
virtual_overseer: &mut VirtualOverseer,
expected_relay_parent: Hash,
session_index: SessionIndex,
validator_ids: &[ValidatorId],
discovery_ids: &[AuthorityDiscoveryId],
validator_group: &[ValidatorIndex],
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(tx),
)) => {
assert_eq!(relay_parent, expected_relay_parent);
tx.send(Ok(session_index)).unwrap();
}
);
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionInfo(index, tx),
)) => {
assert_eq!(relay_parent, expected_relay_parent);
assert_eq!(index, session_index);
let validators = validator_group.iter()
.map(|idx| validator_ids[*idx as usize].clone())
.collect();
let discovery_keys = validator_group.iter()
.map(|idx| discovery_ids[*idx as usize].clone())
.collect();
tx.send(Ok(Some(SessionInfo {
validators,
discovery_keys,
..Default::default()
}))).unwrap();
}
);
}
#[test]
fn ask_validators_for_povs() {
let test_state = TestState::default();
......@@ -271,25 +320,14 @@ fn ask_validators_for_povs() {
}
);
// obtain the validator_id to authority_id mapping
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
)) => {
assert_eq!(relay_parent, current);
assert_eq!(validators.len(), 3);
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
Some(test_state.validator_authority_id[0].clone()),
Some(test_state.validator_authority_id[4].clone()),
];
tx.send(Ok(result)).unwrap();
}
);
test_validator_discovery(
&mut virtual_overseer,
current,
test_state.session_index,
&test_state.validator_public,
&test_state.validator_authority_id,
&test_state.validator_groups.0[0],
).await;
// We now should connect to our validator group.
assert_matches!(
......@@ -448,24 +486,14 @@ fn ask_validators_for_povs() {
);
// obtain the validator_id to authority_id mapping
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators, tx),
)) => {
assert_eq!(relay_parent, next_leaf);
assert_eq!(validators.len(), 3);
assert!(validators.iter().all(|v| test_state.validator_public.contains(&v)));
let result = vec![
Some(test_state.validator_authority_id[2].clone()),
Some(test_state.validator_authority_id[0].clone()),
Some(test_state.validator_authority_id[4].clone()),
];
tx.send(Ok(result)).unwrap();
}
);
test_validator_discovery(
&mut virtual_overseer,
next_leaf,
test_state.session_index,
&test_state.validator_public,
&test_state.validator_authority_id,
&test_state.validator_groups.0[0],
).await;
// We now should connect to our validator group.
assert_matches!(
......@@ -716,7 +744,7 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
RuntimeApiRequest::ValidatorGroups(tx)
)) => {
assert_eq!(relay_parent, hash_a);
tx.send(Ok(validator_groups)).unwrap();
tx.send(Ok(validator_groups.clone())).unwrap();
}
);
......@@ -731,25 +759,14 @@ fn we_inform_peers_with_same_view_we_are_awaiting() {
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators_res, tx),
)) => {
assert_eq!(relay_parent, hash_a);
assert_eq!(validators_res.len(), 3);
assert!(validators_res.iter().all(|v| validators.contains(&v)));
let result = vec![
Some(validator_authority_id[2].clone()),
Some(validator_authority_id[0].clone()),
Some(validator_authority_id[4].clone()),
];
tx.send(Ok(result)).unwrap();
}
);
test_validator_discovery(
&mut handle,
hash_a,
1,
&validators,
&validator_authority_id,
&validator_groups.0[0],
).await;
assert_matches!(
handle.recv().await,
......
......@@ -37,6 +37,7 @@ use polkadot_primitives::v1::{
CandidateEvent, CommittedCandidateReceipt, CoreState, EncodeAs, PersistedValidationData,
GroupRotationInfo, Hash, Id as ParaId, ValidationData, OccupiedCoreAssumption,
SessionIndex, Signed, SigningContext, ValidationCode, ValidatorId, ValidatorIndex,
SessionInfo,
};
use sp_core::{
traits::SpawnNamed,
......@@ -193,6 +194,7 @@ specialize_requests! {
fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}
/// Request some data from the `RuntimeApi` via a SubsystemContext.
......@@ -274,6 +276,7 @@ specialize_requests_ctx! {
fn request_validation_code_ctx(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
fn request_candidate_pending_availability_ctx(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
fn request_candidate_events_ctx() -> Vec<CandidateEvent>; CandidateEvents;
fn request_session_info_ctx(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
}
/// From the given set of validators, find the first key we can sign with, if any.
......
......@@ -20,34 +20,20 @@ use std::collections::HashMap;
use std::pin::Pin;
use futures::{
channel::{mpsc, oneshot},
channel::mpsc,
task::{Poll, self},
stream,
};
use streamunordered::{StreamUnordered, StreamYield};
use thiserror::Error;
use polkadot_node_subsystem::{
errors::RuntimeApiError, SubsystemError,
messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeMessage},
errors::RuntimeApiError,
messages::{AllMessages, NetworkBridgeMessage},
SubsystemContext,
};
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId};
use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId, SessionIndex};
use sc_network::PeerId;
/// Error when making a request to connect to validators.
#[derive(Debug, Error)]
pub enum Error {
/// Attempted to send or receive on a oneshot channel which had been canceled
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
/// A subsystem error.
#[error(transparent)]
Subsystem(#[from] SubsystemError),
/// An error in the Runtime API.
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
}
use crate::Error;
/// Utility function to make it easier to connect to validators.
pub async fn connect_to_validators<Context: SubsystemContext>(
......@@ -55,17 +41,42 @@ pub async fn connect_to_validators<Context: SubsystemContext>(
relay_parent: Hash,
validators: Vec<ValidatorId>,
) -> Result<ConnectionRequest, Error> {
// ValidatorId -> AuthorityDiscoveryId
let (tx, rx) = oneshot::channel();
let current_index = crate::request_session_index_for_child_ctx(relay_parent, ctx).await?.await??;
connect_to_past_session_validators(ctx, relay_parent, validators, current_index).await
}
ctx.send_message(AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
/// Utility function to make it easier to connect to validators in the past sessions.
pub async fn connect_to_past_session_validators<Context: SubsystemContext>(
ctx: &mut Context,
relay_parent: Hash,
validators: Vec<ValidatorId>,
session_index: SessionIndex,
) -> Result<ConnectionRequest, Error> {
let session_info = crate::request_session_info_ctx(
relay_parent,
RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx),
)
)).await;
session_index,
ctx,
).await?.await??;
let (session_validators, discovery_keys) = match session_info {
Some(info) => (info.validators, info.discovery_keys),
None => return Err(RuntimeApiError::from(
format!("No SessionInfo found for the index {}", session_index)
).into()),
};
let id_to_index = session_validators.iter()
.zip(0usize..)
.collect::<HashMap<_, _>>();
// We assume the same ordering in authorities as in validators so we can do an index search
let maybe_authorities: Vec<_> = validators.iter()
.map(|id| {
let validator_index = id_to_index.get(&id);
validator_index.and_then(|i| discovery_keys.get(*i).cloned())
})
.collect();
let maybe_authorities = rx.await??;
let authorities: Vec<_> = maybe_authorities.iter()
.cloned()
.filter_map(|id| id)
......
......@@ -31,7 +31,7 @@ use polkadot_node_primitives::{
CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult,
};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber,
AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber, SessionInfo,
Header as BlockHeader, CandidateDescriptor, CandidateEvent, CandidateReceipt,
CollatorId, CommittedCandidateReceipt, CoreState, ErasureChunk,
GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption,
......@@ -434,14 +434,8 @@ pub enum RuntimeApiRequest {
/// Get all events concerning candidates (backing, inclusion, time-out) in the parent of
/// the block in whose state this request is executed.
CandidateEvents(RuntimeApiSender<Vec<CandidateEvent>>),
/// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s.
/// Currently this request is limited to validators in the current session.
///
/// Returns `None` for validators not found in the current session.
ValidatorDiscovery(
Vec<ValidatorId>,
RuntimeApiSender<Vec<Option<AuthorityDiscoveryId>>>,
),
/// Get the session info for the given session, if stored.
SessionInfo(SessionIndex, RuntimeApiSender<Option<SessionInfo>>),
/// Get all the pending inbound messages in the downward message queue for a para.
DmqContents(
ParaId,
......
......@@ -25,6 +25,7 @@ use primitives::RuntimeDebug;
use runtime_primitives::traits::AppVerify;
use inherents::InherentIdentifier;
use sp_arithmetic::traits::{BaseArithmetic, Saturating, Zero};
use application_crypto::KeyTypeId;
pub use runtime_primitives::traits::{BlakeTwo256, Hash as HashT};
......@@ -57,6 +58,34 @@ pub use sp_authority_discovery::AuthorityId as AuthorityDiscoveryId;
/// Unique identifier for the Inclusion Inherent
pub const INCLUSION_INHERENT_IDENTIFIER: InherentIdentifier = *b"inclusn0";
/// The key type ID for a parachain approval voting key.
pub const APPROVAL_KEY_TYPE_ID: KeyTypeId = KeyTypeId(*b"aprv");
mod approval_app {
use application_crypto::{app_crypto, sr25519};
app_crypto!(sr25519, super::APPROVAL_KEY_TYPE_ID);
}