Unverified Commit 7ed4cb9c authored by asynchronous rob, committed by GitHub

Add new Runtime API messages and make runtime API request fallible (#1485)

* polkadot-subsystem: update runtime API message types

* update all networking subsystems to use fallible runtime APIs

* fix bitfield-signing and make it use new runtime APIs

* port candidate-backing to handle runtime API errors and new types

* remove old runtime API messages

* remove unused imports

* fix grumbles

* fix backing tests
parent ea74f81d · Pipeline #101941 passed in 20 minutes and 57 seconds
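The unifying change across these commits: every `RuntimeApiRequest` variant now answers over a channel carrying a `Result`, so requesters must handle failure instead of assuming the runtime call succeeds. A minimal sketch of the pattern introduced in the subsystem messages (the `demo` function is illustrative only, not part of the diff):

```rust
use futures::channel::oneshot;

/// A description of an error causing the runtime API request to be unservable.
#[derive(Debug, Clone)]
pub struct RuntimeApiError(String);

/// A sender for the result of a runtime API request.
pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, RuntimeApiError>>;

async fn demo() {
    let (tx, rx): (RuntimeApiSender<u32>, _) = oneshot::channel();

    // The runtime API subsystem now reports failure explicitly...
    tx.send(Err(RuntimeApiError("state pruned".into()))).unwrap();

    // ...and the requester decides how to degrade, rather than unwrapping.
    match rx.await {
        Ok(Ok(value)) => println!("runtime returned {}", value),
        Ok(Err(err)) => println!("runtime API error, skipping work: {:?}", err),
        Err(_cancelled) => println!("the runtime API subsystem dropped the sender"),
    }
}
```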
@@ -4606,6 +4606,7 @@ dependencies = [
"bitvec",
"derive_more 0.99.9",
"futures 0.3.5",
"log 0.4.8",
"polkadot-erasure-coding",
"polkadot-node-primitives",
"polkadot-node-subsystem",
@@ -17,6 +17,7 @@ erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure
statement-table = { package = "polkadot-statement-table", path = "../../../statement-table" }
derive_more = "0.99.9"
bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
log = "0.4.8"
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
@@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV, OmittedValidationData,
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments,
CandidateCommitments, CoreState, CoreIndex,
};
use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
@@ -44,12 +44,14 @@ use polkadot_subsystem::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, NewBackedCandidate, PoVDistributionMessage, ProvisionableData,
ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, ValidationFailed,
RuntimeApiRequest,
},
util::{
self,
request_signing_context,
request_session_index_for_child,
request_validator_groups,
request_validators,
request_from_runtime,
Validator,
},
};
@@ -680,19 +682,56 @@ impl util::JobTrait for CandidateBackingJob {
mut tx_from: mpsc::Sender<Self::FromJob>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let (validators, roster, signing_context) = futures::try_join!(
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(e) => {
log::warn!(
target: "candidate_backing",
"Failed to fetch runtime API data for job: {:?}",
e,
);
// We can't do candidate validation work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(());
}
}
}
}
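// All four runtime API requests below are sent concurrently; each response
// is a `Result`, unwrapped with `try_runtime_api!` so that a failed request
// ends this job quietly instead of taking down the subsystem.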
let (validators, groups, session_index, cores) = futures::try_join!(
request_validators(parent, &mut tx_from).await?,
request_validator_groups(parent, &mut tx_from).await?,
request_signing_context(parent, &mut tx_from).await?,
request_session_index_for_child(parent, &mut tx_from).await?,
request_from_runtime(
parent,
&mut tx_from,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await?,
)?;
let validators = try_runtime_api!(validators);
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let session_index = try_runtime_api!(session_index);
let cores = try_runtime_api!(cores);
let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
let mut groups = HashMap::new();
for assignment in roster.scheduled {
if let Some(g) = roster.validator_groups.get(assignment.group_idx.0 as usize) {
groups.insert(assignment.para_id, g.clone());
let n_cores = cores.len();
for (idx, core) in cores.into_iter().enumerate() {
// Ignore prospective assignments on occupied cores for the time being.
if let CoreState::Scheduled(scheduled) = core {
let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
groups.insert(scheduled.para_id, g.clone());
}
}
}
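For context on `group_for_core` above: validator group assignments rotate across cores as blocks pass. A simplified sketch of that mapping, assuming plain modular rotation (the authoritative `GroupRotationInfo` lives in `polkadot-primitives` and may differ in detail):

```rust
struct GroupRotationInfo {
    session_start_block: u32,
    group_rotation_frequency: u32,
    now: u32,
}

impl GroupRotationInfo {
    /// Map a core index to a group index: every `group_rotation_frequency`
    /// blocks the assignments shift by one core, wrapping around.
    fn group_for_core(&self, core_index: usize, n_cores: usize) -> usize {
        if n_cores == 0 || self.group_rotation_frequency == 0 {
            return 0;
        }
        let blocks_elapsed = self.now.saturating_sub(self.session_start_block);
        let rotations = (blocks_elapsed / self.group_rotation_frequency) as usize;
        (core_index + rotations) % n_cores
    }
}

fn main() {
    // Mirrors the test fixture below: rotation every 100 blocks, now = 1,
    // so no rotation has happened yet and core i maps to group i.
    let info = GroupRotationInfo { session_start_block: 0, group_rotation_frequency: 100, now: 1 };
    assert_eq!(info.group_for_core(2, 3), 2);
}
```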
@@ -779,12 +818,12 @@ mod tests {
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use polkadot_primitives::v1::{
AssignmentKind, BlockData, CandidateCommitments, CollatorId, CoreAssignment, CoreIndex,
LocalValidationData, GlobalValidationData, GroupIndex, HeadData,
ValidatorPair, ValidityAttestation,
ScheduledCore, BlockData, CandidateCommitments, CollatorId,
LocalValidationData, GlobalValidationData, HeadData,
ValidatorPair, ValidityAttestation, GroupRotationInfo,
};
use polkadot_subsystem::{
messages::{RuntimeApiRequest, SchedulerRoster},
messages::RuntimeApiRequest,
ActiveLeavesUpdate, FromOverseer, OverseerSignal,
};
use sp_keyring::Sr25519Keyring;
@@ -801,7 +840,8 @@ mod tests {
validator_public: Vec<ValidatorId>,
global_validation_data: GlobalValidationData,
local_validation_data: LocalValidationData,
roster: SchedulerRoster,
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
availability_cores: Vec<CoreState>,
head_data: HashMap<ParaId, HeadData>,
signing_context: SigningContext,
relay_parent: Hash,
@@ -830,53 +870,39 @@ mod tests {
let validator_public = validator_pubkeys(&validators);
let chain_a_assignment = CoreAssignment {
core: CoreIndex::from(0),
para_id: chain_a,
kind: AssignmentKind::Parachain,
group_idx: GroupIndex::from(0),
};
let chain_b_assignment = CoreAssignment {
core: CoreIndex::from(1),
para_id: chain_b,
kind: AssignmentKind::Parachain,
group_idx: GroupIndex::from(1),
let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];
let group_rotation_info = GroupRotationInfo {
session_start_block: 0,
group_rotation_frequency: 100,
now: 1,
};
let thread_collator: CollatorId = Sr25519Keyring::Two.public().into();
let availability_cores = vec![
CoreState::Scheduled(ScheduledCore {
para_id: chain_a,
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: chain_b,
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: thread_a,
collator: Some(thread_collator.clone()),
}),
];
let thread_a_assignment = CoreAssignment {
core: CoreIndex::from(2),
para_id: thread_a,
kind: AssignmentKind::Parathread(thread_collator.clone(), 0),
group_idx: GroupIndex::from(2),
};
let mut head_data = HashMap::new();
head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
let validator_groups = vec![vec![2, 0, 3], vec![1], vec![4]];
let relay_parent = Hash::from([5; 32]);
let parent_hash_1 = [1; 32].into();
let roster = SchedulerRoster {
validator_groups,
scheduled: vec![
chain_a_assignment,
chain_b_assignment,
thread_a_assignment,
],
upcoming: vec![],
availability_cores: vec![],
};
let signing_context = SigningContext {
session_index: 1,
parent_hash: parent_hash_1,
parent_hash: relay_parent,
};
let mut head_data = HashMap::new();
head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
let relay_parent = Hash::from([5; 32]);
let local_validation_data = LocalValidationData {
parent_head: HeadData(vec![7, 8, 9]),
balance: Default::default(),
@@ -895,7 +921,8 @@ mod tests {
keystore,
validators,
validator_public,
roster,
validator_groups: (validator_groups, group_rotation_info),
availability_cores,
head_data,
local_validation_data,
global_validation_data,
@@ -984,7 +1011,7 @@ mod tests {
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::Validators(tx))
) if parent == test_state.relay_parent => {
tx.send(test_state.validator_public.clone()).unwrap();
tx.send(Ok(test_state.validator_public.clone())).unwrap();
}
);
@@ -994,19 +1021,29 @@ mod tests {
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::ValidatorGroups(tx))
) if parent == test_state.relay_parent => {
tx.send(test_state.roster.clone()).unwrap();
tx.send(Ok(test_state.validator_groups.clone())).unwrap();
}
);
// Check that subsystem job issues a request for the signing context.
// Check that subsystem job issues a request for the session index for child.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::SigningContext(tx))
RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionIndexForChild(tx))
) if parent == test_state.relay_parent => {
tx.send(test_state.signing_context.clone()).unwrap();
tx.send(Ok(test_state.signing_context.session_index)).unwrap();
}
);
// Check that subsystem job issues a request for the availability cores.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
}
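Since every mocked response is now wrapped in `Ok(..)`, the failure path becomes testable as well. A hypothetical fragment, not part of this commit; it assumes `RuntimeApiError` can be constructed from a `String` here (e.g. via a `From` impl), since its field is private as declared:

```rust
// Hypothetical: answer the validators request with an error and expect the
// backing job to log and return cleanly (via `try_runtime_api!`) rather
// than surface a subsystem failure.
assert_matches!(
    virtual_overseer.recv().await,
    AllMessages::RuntimeApi(
        RuntimeApiMessage::Request(_, RuntimeApiRequest::Validators(tx))
    ) => {
        tx.send(Err(String::from("state pruned").into())).unwrap();
    }
);
```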
// Test that a `CandidateBackingMessage::Second` issues validation work
@@ -30,7 +30,7 @@ use polkadot_node_subsystem::{
},
util::{self, JobManager, JobTrait, ToJobTrait, Validator},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreOccupied, Hash};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash};
use std::{convert::TryFrom, pin::Pin, time::Duration};
use wasm_timer::{Delay, Instant};
@@ -130,8 +130,7 @@ pub enum Error {
// this function exists mainly to collect a bunch of potential error points into one.
async fn get_core_availability(
relay_parent: Hash,
idx: usize,
core: Option<CoreOccupied>,
core: CoreState,
sender: &mpsc::Sender<FromJob>,
) -> Result<bool, Error> {
use messages::{
@@ -144,18 +143,23 @@ async fn get_core_availability(
// we have to (cheaply) clone this sender so we can mutate it to actually send anything
let mut sender = sender.clone();
// REVIEW: is it safe to ignore parathreads here, or do they also figure in the availability mapping?
if let Some(CoreOccupied::Parachain) = core {
if let CoreState::Occupied(core) = core {
let (tx, rx) = oneshot::channel();
sender
.send(RuntimeApi(Request(
relay_parent,
CandidatePendingAvailability(idx.into(), tx),
CandidatePendingAvailability(core.para_id, tx),
)))
.await?;
let committed_candidate_receipt = match rx.await? {
Some(ccr) => ccr,
None => return Ok(false),
Ok(Some(ccr)) => ccr,
Ok(None) => return Ok(false),
Err(e) => {
// Don't take down the node on runtime API errors.
log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e);
return Ok(false);
}
};
let (tx, rx) = oneshot::channel();
sender
@@ -171,35 +175,41 @@ async fn get_core_availability(
// the way this function works is not intuitive:
//
// - get the scheduler roster so we have a list of cores, in order.
// - get the availability cores so we have a list of cores, in order.
// - for each occupied core, fetch `candidate_pending_availability` from runtime
// - from there, we can get the `CandidateDescriptor`
// - from there, we can send an `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_)
async fn construct_availability_bitfield(
relay_parent: Hash,
sender: &mut mpsc::Sender<FromJob>,
) -> Result<AvailabilityBitfield, Error> {
) -> Result<Option<AvailabilityBitfield>, Error> {
use futures::lock::Mutex;
use messages::RuntimeApiRequest::ValidatorGroups;
use messages::RuntimeApiRequest::AvailabilityCores;
use FromJob::RuntimeApi;
use RuntimeApiMessage::Request;
// request the validator groups so we can get the scheduler roster
// request the availability cores metadata from runtime.
let (tx, rx) = oneshot::channel();
sender
.send(RuntimeApi(Request(relay_parent, ValidatorGroups(tx))))
.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx))))
.await?;
// we now need sender to be immutable so we can copy the reference to multiple concurrent closures
let sender = &*sender;
// wait for the scheduler roster
let scheduler_roster = rx.await?;
// wait for the cores
let availability_cores = match rx.await? {
Ok(a) => a,
Err(e) => {
log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e);
return Ok(None);
}
};
// prepare outputs
let out =
Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; scheduler_roster.availability_cores.len()));
Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
// any need to ever wait to lock this mutex.
@@ -213,9 +223,9 @@ async fn construct_availability_bitfield(
//
// In principle, this work is all concurrent, not parallel. In practice, we can't guarantee it, which is why
// we need the mutexes and explicit references above.
stream::iter(scheduler_roster.availability_cores.into_iter().enumerate())
stream::iter(availability_cores.into_iter().enumerate())
.for_each_concurrent(None, |(idx, core)| async move {
let availability = match get_core_availability(relay_parent, idx, core, sender).await {
let availability = match get_core_availability(relay_parent, core, sender).await {
Ok(availability) => availability,
Err(err) => {
errs_ref.lock().await.push(err);
@@ -228,7 +238,7 @@ async fn construct_availability_bitfield(
let errs = errs.into_inner();
if errs.is_empty() {
Ok(out.into_inner().into())
Ok(Some(out.into_inner().into()))
} else {
Err(errs.into())
}
@@ -264,7 +274,13 @@ impl JobTrait for BitfieldSigningJob {
// wait a bit before doing anything else
Delay::new_at(wait_until).await?;
let bitfield = construct_availability_bitfield(relay_parent, &mut sender).await?;
let bitfield =
match construct_availability_bitfield(relay_parent, &mut sender).await?
{
None => return Ok(()),
Some(b) => b,
};
let signed_bitfield = validator.sign(bitfield);
// make an anonymous scope to contain some use statements to simplify creating the outbound message
@@ -161,17 +161,23 @@ impl BitfieldDistribution {
for relay_parent in activated {
trace!(target: "bitd", "Start {:?}", relay_parent);
// query basic system parameters once
let (validator_set, signing_context) =
query_basics(&mut ctx, relay_parent).await?;
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
);
if let Some((validator_set, signing_context)) =
query_basics(&mut ctx, relay_parent).await?
{
// If our runtime API fails, we don't take down the node,
// but we might alter peers' reputations erroneously as a result
// of not having the correct bookkeeping. If we have lost a race
// with state pruning, it is unlikely that peers will be sending
// us anything to do with this relay-parent anyway.
let _ = state.per_relay_parent.insert(
relay_parent,
PerRelayParentData {
signing_context,
validator_set,
..Default::default()
},
);
}
}
for relay_parent in deactivated {
@@ -562,12 +568,12 @@ where
async fn query_basics<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> SubsystemResult<(Vec<ValidatorId>, SigningContext)>
) -> SubsystemResult<Option<(Vec<ValidatorId>, SigningContext)>>
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let (validators_tx, validators_rx) = oneshot::channel();
let (signing_tx, signing_rx) = oneshot::channel();
let (session_tx, session_rx) = oneshot::channel();
let query_validators = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent.clone(),
@@ -576,13 +582,22 @@ where
let query_signing = AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent.clone(),
RuntimeApiRequest::SigningContext(signing_tx),
RuntimeApiRequest::SessionIndexForChild(session_tx),
));
ctx.send_messages(std::iter::once(query_validators).chain(std::iter::once(query_signing)))
.await?;
Ok((validators_rx.await?, signing_rx.await?))
match (validators_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => Ok(Some((
v,
SigningContext { parent_hash: relay_parent, session_index: s },
))),
(Err(e), _) | (_, Err(e)) => {
warn!(target: "bitd", "Failed to fetch basics from runtime API: {:?}", e);
Ok(None)
}
}
}
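The join-then-match shape in `query_basics` recurs almost verbatim in statement distribution further down; it could be factored into a small helper. A sketch of that refactor (the `join_fallible` helper is hypothetical, not part of this diff):

```rust
use futures::channel::oneshot;

/// Await two fallible runtime API responses, yielding `None` (after logging)
/// if either failed: the same policy `query_basics` applies inline.
async fn join_fallible<A, B, E: std::fmt::Debug>(
    a: oneshot::Receiver<Result<A, E>>,
    b: oneshot::Receiver<Result<B, E>>,
) -> Result<Option<(A, B)>, oneshot::Canceled> {
    match (a.await?, b.await?) {
        (Ok(a), Ok(b)) => Ok(Some((a, b))),
        (Err(e), _) | (_, Err(e)) => {
            log::warn!("runtime API request failed: {:?}", e);
            Ok(None)
        }
    }
}
```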
#[cfg(test)]
@@ -115,10 +115,27 @@ async fn handle_signal(
RuntimeApiRequest::Validators(vals_tx),
))).await?;
let n_validators = match vals_rx.await? {
Ok(v) => v.len(),
Err(e) => {
log::warn!(target: "pov_distribution",
"Error fetching validators from runtime API for active leaf: {:?}",
e
);
// Not adding bookkeeping here might make us behave funny, but we
// shouldn't take down the node on spurious runtime API errors.
//
// "Behave funny" here means being bad at our job, not doing anything
// slashable or otherwise security-relevant.
continue;
}
};
state.relay_parent_state.insert(relay_parent, BlockBasedState {
known: HashMap::new(),
fetching: HashMap::new(),
n_validators: vals_rx.await?.len(),
n_validators,
});
}
@@ -847,17 +847,37 @@ async fn run(
let (session_tx, session_rx) = oneshot::channel();
let val_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Validators(val_tx)),
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(val_tx),
),
);
let session_message = AllMessages::RuntimeApi(
RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::SigningContext(session_tx)),
RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::SessionIndexForChild(session_tx),
),
);
ctx.send_messages(
std::iter::once(val_message).chain(std::iter::once(session_message))
).await?;
(val_rx.await?, session_rx.await?.session_index)
match (val_rx.await?, session_rx.await?) {
(Ok(v), Ok(s)) => (v, s),
(Err(e), _) | (_, Err(e)) => {
log::warn!(
target: "statement_distribution",
"Failed to fetch runtime API data for active leaf: {:?}",
e,
);
// Lacking this bookkeeping might make us behave funny, although
// not in any slashable way. But we shouldn't take down the node
// on what are likely spurious runtime API errors.
continue;
}
}
};
active_heads.entry(relay_parent)
@@ -25,11 +25,13 @@
use futures::channel::{mpsc, oneshot};
use polkadot_primitives::v1::{
BlockNumber, Hash, CommittedCandidateReceipt,
Hash, CommittedCandidateReceipt,
CandidateReceipt, PoV, ErasureChunk, BackedCandidate, Id as ParaId,
SignedAvailabilityBitfield, SigningContext, ValidatorId, ValidationCode, ValidatorIndex,
CoreAssignment, CoreOccupied, HeadData, CandidateDescriptor,
ValidatorSignature, OmittedValidationData, AvailableData,
SignedAvailabilityBitfield, ValidatorId, ValidationCode, ValidatorIndex,
CoreAssignment, CoreOccupied, CandidateDescriptor,
ValidatorSignature, OmittedValidationData, AvailableData, GroupRotationInfo,
CoreState, LocalValidationData, GlobalValidationData, OccupiedCoreAssumption,
CandidateEvent, SessionIndex,
};
use polkadot_node_primitives::{
MisbehaviorReport, SignedFullStatement, View, ProtocolId, ValidationResult,
@@ -286,23 +288,39 @@ pub struct SchedulerRoster {
pub availability_cores: Vec<Option<CoreOccupied>>,
}
/// A description of an error causing the runtime API request to be unservable.
#[derive(Debug, Clone)]
pub struct RuntimeApiError(String);
/// A sender for the result of a runtime API request.
pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, RuntimeApiError>>;
/// A request to the Runtime API subsystem.
#[derive(Debug)]
pub enum RuntimeApiRequest {
/// Get the current validator set.
Validators(oneshot::Sender<Vec<ValidatorId>>),
/// Get the assignments of validators to cores.
ValidatorGroups(oneshot::Sender<SchedulerRoster>),
/// Get a signing context for bitfields and statements.
SigningContext(oneshot::Sender<SigningContext>),
/// Get the validation code for a specific para, assuming execution under given block number, and
/// an optional block number representing an intermediate parablock executed in the context of
/// that block.
ValidationCode(ParaId, BlockNumber, Option<BlockNumber>, oneshot::Sender<ValidationCode>),
/// Get head data for a specific para.
HeadData(ParaId, oneshot::Sender<HeadData>),
Validators(RuntimeApiSender<Vec<ValidatorId>>),
/// Get the validator groups and group rotation info.
ValidatorGroups(RuntimeApiSender<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo)>),
/// Get information on all availability cores.
AvailabilityCores(RuntimeApiSender<Vec<CoreState>>),
/// Get the global validation data.
GlobalValidationData(RuntimeApiSender<GlobalValidationData>),
/// Get the local validation data for a particular para, taking the given
/// `OccupiedCoreAssumption`, which will inform on how the validation data should be computed
/// if the para currently occupies a core.
LocalValidationData(
ParaId,
OccupiedCoreAssumption,
RuntimeApiSender<Option<LocalValidationData>>,
),
/// Get the session index that a child of the block will have.
SessionIndexForChild(RuntimeApiSender<SessionIndex>),
/// Get the candidate pending availability for a particular parachain.
CandidatePendingAvailability(ParaId, oneshot::Sender<Option<CommittedCandidateReceipt>>),
CandidatePendingAvailability(ParaId, RuntimeApiSender<Option<CommittedCandidateReceipt>>),
/// Get all
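End to end, a requester-side round trip with the new message set looks roughly like the sketch below. It assumes a subsystem `Context` with a `send_message` method plus the types above; `fetch_validators` itself is illustrative, not an API from this diff:

```rust
async fn fetch_validators<Context>(
    ctx: &mut Context,
    relay_parent: Hash,
) -> SubsystemResult<Option<Vec<ValidatorId>>>
where
    Context: SubsystemContext,
{
    let (tx, rx) = oneshot::channel();
    ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
        relay_parent,
        RuntimeApiRequest::Validators(tx),
    ))).await?;

    match rx.await? {
        Ok(validators) => Ok(Some(validators)),
        Err(e) => {
            // Per the policy throughout this change: a spurious runtime API
            // error (e.g. losing a race with state pruning) must not take
            // down the node; callers treat `None` as "skip this leaf".
            log::warn!(target: "example", "runtime API error: {:?}", e);
            Ok(None)
        }
    }
}
```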