Unverified Commit 12e02573 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Cadidate selection check assignment (#2042)



* Cadidate selection check assignment

* Apply suggestions from code review

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Review fixes

* Punish collator for wrong announcements

* Update node/core/candidate-selection/src/lib.rs

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
parent b8042dde
Pipeline #115691 passed with stages
in 28 minutes and 24 seconds
......@@ -4960,6 +4960,7 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-core",
"sp-keystore",
"thiserror",
"tracing",
"tracing-futures",
......
......@@ -9,6 +9,9 @@ futures = "0.3.8"
tracing = "0.1.22"
tracing-futures = "0.2.4"
thiserror = "1.0.22"
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
......
......@@ -23,22 +23,28 @@ use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem::{
errors::ChainApiError,
messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
RuntimeApiRequest,
},
};
use polkadot_node_subsystem_util::{
self as util, delegated_subsystem, JobTrait, FromJobCommand, metrics::{self, prometheus},
self as util, request_from_runtime, request_validator_groups, delegated_subsystem,
JobTrait, FromJobCommand, Validator, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV,
};
use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV};
use std::pin::Pin;
use thiserror::Error;
const LOG_TARGET: &'static str = "candidate_selection";
struct CandidateSelectionJob {
assignment: ParaId,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
metrics: Metrics,
......@@ -57,30 +63,92 @@ enum Error {
ChainApi(#[from] ChainApiError),
}
macro_rules! try_runtime_api {
($x: expr) => {
match $x {
Ok(x) => x,
Err(e) => {
tracing::warn!(
target: LOG_TARGET,
err = ?e,
"Failed to fetch runtime API data for job",
);
// We can't do candidate selection work if we don't have the
// requisite runtime API data. But these errors should not take
// down the node.
return Ok(());
}
}
}
}
impl JobTrait for CandidateSelectionJob {
type ToJob = CandidateSelectionMessage;
type Error = Error;
type RunArgs = ();
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;
const NAME: &'static str = "CandidateSelectionJob";
#[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
_relay_parent: Hash,
_run_args: Self::RunArgs,
relay_parent: Hash,
keystore: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
sender: mpsc::Sender<FromJobCommand>,
mut sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
CandidateSelectionJob::new(metrics, sender, receiver).run_loop().await
let (groups, cores) = futures::try_join!(
try_runtime_api!(request_validator_groups(relay_parent, &mut sender).await),
try_runtime_api!(request_from_runtime(
relay_parent,
&mut sender,
|tx| RuntimeApiRequest::AvailabilityCores(tx),
).await),
)?;
let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
let cores = try_runtime_api!(cores);
let n_cores = cores.len();
let validator = match Validator::new(relay_parent, keystore.clone(), sender.clone()).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};
let mut assignment = None;
for (idx, core) in cores.into_iter().enumerate() {
// Ignore prospective assignments on occupied cores for the time being.
if let CoreState::Scheduled(scheduled) = core {
let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if g.contains(&validator.index()) {
assignment = Some(scheduled.para_id);
break;
}
}
}
}
let assignment = match assignment {
Some(assignment) => assignment,
None => return Ok(()),
};
CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop().await
}.boxed()
}
}
impl CandidateSelectionJob {
pub fn new(
assignment: ParaId,
metrics: Metrics,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
......@@ -89,6 +157,7 @@ impl CandidateSelectionJob {
sender,
receiver,
metrics,
assignment,
seconded_candidate: None,
}
}
......@@ -128,6 +197,23 @@ impl CandidateSelectionJob {
) {
let _timer = self.metrics.time_handle_collation();
if self.assignment != para_id {
tracing::info!(
target: LOG_TARGET,
"Collator {:?} sent a collation outside of our assignment {:?}",
collator_id,
para_id,
);
if let Err(err) = forward_invalidity_note(&collator_id, &mut self.sender).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
);
}
return;
}
if self.seconded_candidate.is_none() {
let (candidate_receipt, pov) =
match get_collation(
......@@ -342,7 +428,7 @@ impl metrics::Metrics for Metrics {
}
}
delegated_subsystem!(CandidateSelectionJob((), Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem);
delegated_subsystem!(CandidateSelectionJob(SyncCryptoStorePtr, Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem);
#[cfg(test)]
mod tests {
......@@ -365,6 +451,7 @@ mod tests {
let (to_job_tx, to_job_rx) = mpsc::channel(0);
let (from_job_tx, from_job_rx) = mpsc::channel(0);
let mut job = CandidateSelectionJob {
assignment: 123.into(),
sender: from_job_tx,
receiver: to_job_rx,
metrics: Default::default(),
......
......@@ -367,7 +367,7 @@ where
),
candidate_selection: CandidateSelectionSubsystem::new(
spawner.clone(),
(),
keystore.clone(),
Metrics::register(registry)?,
),
candidate_validation: CandidateValidationSubsystem::new(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment