Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The provisioner is responsible for assembling a relay chain block
//! from a set of available parachain candidates of its choice.
#![deny(missing_docs, unused_crate_dependencies, unused_results)]
use futures::{
channel::{mpsc, oneshot},
prelude::*,
};
use polkadot_node_subsystem::{
errors::ChainApiError,
messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
},
};
use polkadot_node_subsystem_util::{
self as util, delegated_subsystem, JobTrait, FromJobCommand, metrics::{self, prometheus},
use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV};
const LOG_TARGET: &'static str = "candidate_selection";
struct CandidateSelectionJob {
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
metrics: Metrics,
seconded_candidate: Option<CollatorId>,
}
#[error(transparent)]
Sending(#[from] mpsc::SendError),
#[error(transparent)]
Util(#[from] util::Error),
#[error(transparent)]
OneshotRecv(#[from] oneshot::Canceled),
#[error(transparent)]
ChainApi(#[from] ChainApiError),
}
impl JobTrait for CandidateSelectionJob {
type Error = Error;
type RunArgs = ();
type Metrics = Metrics;
const NAME: &'static str = "CandidateSelectionJob";
#[tracing::instrument(skip(_relay_parent, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
_relay_parent: Hash,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
CandidateSelectionJob::new(metrics, sender, receiver).run_loop().await
}.boxed()
}
}
impl CandidateSelectionJob {
pub fn new(
metrics: Metrics,
sender: mpsc::Sender<FromJobCommand>,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
) -> Self {
Self {
sender,
receiver,
metrics,
seconded_candidate: None,
}
}
async fn run_loop(&mut self) -> Result<(), Error> {
loop {
match self.receiver.next().await {
Some(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator_id,
)) => {
self.handle_collation(relay_parent, para_id, collator_id).await;
Some(CandidateSelectionMessage::Invalid(
_,
candidate_receipt,
)) => {
self.handle_invalid(candidate_receipt).await;
}
}
}
// closing the sender here means that we don't deadlock in tests
self.sender.close_channel();
Ok(())
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_collation(
&mut self,
relay_parent: Hash,
para_id: ParaId,
collator_id: CollatorId,
) {
let _timer = self.metrics.time_handle_collation();
if self.seconded_candidate.is_none() {
let (candidate_receipt, pov) =
match get_collation(
relay_parent,
para_id,
collator_id.clone(),
self.sender.clone(),
).await {
Ok(response) => response,
Err(err) => {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to get collation from collator protocol subsystem",
);
return;
}
};
match second_candidate(
relay_parent,
candidate_receipt,
pov,
&mut self.sender,
&self.metrics,
)
.await
{
Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"),
Ok(()) => self.seconded_candidate = Some(collator_id),
}
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
async fn handle_invalid(&mut self, candidate_receipt: CandidateReceipt) {
let _timer = self.metrics.time_handle_invalid();
let received_from = match &self.seconded_candidate {
Some(peer) => peer,
None => {
tracing::warn!(
target: LOG_TARGET,
"received invalidity notice for a candidate we don't remember seconding"
);
return;
}
};
tracing::info!(
target: LOG_TARGET,
candidate_receipt = ?candidate_receipt,
"received invalidity note for candidate",
if let Err(err) = forward_invalidity_note(received_from, &mut self.sender).await {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"failed to forward invalidity note",
self.metrics.on_invalid_selection(result);
}
}
// get a collation from the Collator Protocol subsystem
//
// note that this gets an owned clone of the sender; that's becuase unlike `forward_invalidity_note`, it's expected to take a while longer
#[tracing::instrument(level = "trace", skip(sender), fields(subsystem = LOG_TARGET))]
async fn get_collation(
relay_parent: Hash,
para_id: ParaId,
mut sender: mpsc::Sender<FromJobCommand>,
) -> Result<(CandidateReceipt, PoV), Error> {
let (tx, rx) = oneshot::channel();
sender
.send(AllMessages::from(CollatorProtocolMessage::FetchCollation(
para_id,
tx,
.await?;
rx.await.map_err(Into::into)
}
async fn second_candidate(
relay_parent: Hash,
candidate_receipt: CandidateReceipt,
pov: PoV,
sender: &mut mpsc::Sender<FromJobCommand>,
metrics: &Metrics,
) -> Result<(), Error> {
match sender
.send(AllMessages::from(CandidateBackingMessage::Second(
relay_parent,
candidate_receipt,
pov,
.await
{
Err(err) => {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to send a seconding message");
metrics.on_second(Err(()));
Err(err.into())
}
Ok(_) => {
metrics.on_second(Ok(()));
Ok(())
}
}
}
async fn forward_invalidity_note(
received_from: &CollatorId,
sender: &mut mpsc::Sender<FromJobCommand>,
) -> Result<(), Error> {
sender
.send(AllMessages::from(CollatorProtocolMessage::ReportCollator(
received_from.clone(),
.await
.map_err(Into::into)
}
#[derive(Clone)]
struct MetricsInner {
seconds: prometheus::CounterVec<prometheus::U64>,
invalid_selections: prometheus::CounterVec<prometheus::U64>,
handle_collation: prometheus::Histogram,
handle_invalid: prometheus::Histogram,
/// Candidate selection metrics.
#[derive(Default, Clone)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_second(&self, result: Result<(), ()>) {
if let Some(metrics) = &self.0 {
let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.seconds.with_label_values(&[label]).inc();
}
}
fn on_invalid_selection(&self, result: Result<(), ()>) {
if let Some(metrics) = &self.0 {
let label = if result.is_ok() { "succeeded" } else { "failed" };
metrics.invalid_selections.with_label_values(&[label]).inc();
}
}
/// Provide a timer for `handle_collation` which observes on drop.
fn time_handle_collation(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_collation.start_timer())
}
/// Provide a timer for `handle_invalid` which observes on drop.
fn time_handle_invalid(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_invalid.start_timer())
}
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
seconds: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"candidate_selection_seconds_total",
"Number of Candidate Selection subsystem seconding events.",
&["success"],
)?,
registry,
)?,
invalid_selections: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"candidate_selection_invalid_selections_total",
"Number of Candidate Selection subsystem seconding selections which proved to be invalid.",
),
&["success"],
)?,
registry,
)?,
handle_collation: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_selection_handle_collation",
"Time spent within `candidate_selection::handle_collation`",
)
)?,
registry,
)?,
handle_invalid: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_candidate_selection:handle_invalid",
"Time spent within `candidate_selection::handle_invalid`",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
delegated_subsystem!(CandidateSelectionJob((), Metrics) <- CandidateSelectionMessage as CandidateSelectionSubsystem);
#[cfg(test)]
mod tests {
use super::*;
use futures::lock::Mutex;
use polkadot_primitives::v1::BlockData;
use sp_core::crypto::Public;
use std::sync::Arc;
fn test_harness<Preconditions, TestBuilder, Test, Postconditions>(
preconditions: Preconditions,
test: TestBuilder,
postconditions: Postconditions,
) where
Preconditions: FnOnce(&mut CandidateSelectionJob),
TestBuilder: FnOnce(mpsc::Sender<CandidateSelectionMessage>, mpsc::Receiver<FromJobCommand>) -> Test,
Test: Future<Output = ()>,
Postconditions: FnOnce(CandidateSelectionJob, Result<(), Error>),
{
let (to_job_tx, to_job_rx) = mpsc::channel(0);
let (from_job_tx, from_job_rx) = mpsc::channel(0);
let mut job = CandidateSelectionJob {
sender: from_job_tx,
receiver: to_job_rx,
metrics: Default::default(),
seconded_candidate: None,
};
preconditions(&mut job);
let (_, job_result) = futures::executor::block_on(future::join(
test(to_job_tx, from_job_rx),
));
postconditions(job, job_result);
}
/// when nothing is seconded so far, the collation is fetched and seconded
#[test]
fn fetches_and_seconds_a_collation() {
let relay_parent = Hash::random();
let para_id: ParaId = 123.into();
let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
let collator_id_clone = collator_id.clone();
let candidate_receipt = CandidateReceipt::default();
let pov = PoV {
block_data: BlockData((0..32).cycle().take(256).collect()),
};
let was_seconded = Arc::new(Mutex::new(false));
let was_seconded_clone = was_seconded.clone();
test_harness(
|_job| {},
|mut to_job, mut from_job| async move {
to_job
.send(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator_id_clone.clone(),
))
.await
.unwrap();
std::mem::drop(to_job);
while let Some(msg) = from_job.next().await {
match msg {
FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::FetchCollation(
got_relay_parent,
got_para_id,
return_sender,
assert_eq!(got_relay_parent, relay_parent);
assert_eq!(got_para_id, para_id);
assert_eq!(collator_id, collator_id_clone);
return_sender
.send((candidate_receipt.clone(), pov.clone()))
.unwrap();
}
FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second(
got_relay_parent,
got_candidate_receipt,
got_pov,
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
assert_eq!(got_relay_parent, relay_parent);
assert_eq!(got_candidate_receipt, candidate_receipt);
assert_eq!(got_pov, pov);
*was_seconded_clone.lock().await = true;
}
other => panic!("unexpected message from job: {:?}", other),
}
}
},
|job, job_result| {
assert!(job_result.is_ok());
assert_eq!(job.seconded_candidate.unwrap(), collator_id);
},
);
assert!(Arc::try_unwrap(was_seconded).unwrap().into_inner());
}
/// when something has been seconded, further collation notifications are ignored
#[test]
fn ignores_collation_notifications_after_the_first() {
let relay_parent = Hash::random();
let para_id: ParaId = 123.into();
let prev_collator_id = CollatorId::from_slice(&(0..32).rev().collect::<Vec<u8>>());
let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
let collator_id_clone = collator_id.clone();
let was_seconded = Arc::new(Mutex::new(false));
let was_seconded_clone = was_seconded.clone();
test_harness(
|job| job.seconded_candidate = Some(prev_collator_id.clone()),
|mut to_job, mut from_job| async move {
to_job
.send(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator_id_clone,
))
.await
.unwrap();
std::mem::drop(to_job);
while let Some(msg) = from_job.next().await {
match msg {
FromJobCommand::SendMessage(AllMessages::CandidateBacking(CandidateBackingMessage::Second(
_got_relay_parent,
_got_candidate_receipt,
_got_pov,
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
*was_seconded_clone.lock().await = true;
}
other => panic!("unexpected message from job: {:?}", other),
}
}
},
|job, job_result| {
assert!(job_result.is_ok());
assert_eq!(job.seconded_candidate.unwrap(), prev_collator_id);
},
);
assert!(!Arc::try_unwrap(was_seconded).unwrap().into_inner());
}
/// reports of invalidity from candidate backing are propagated
#[test]
fn propagates_invalidity_reports() {
let relay_parent = Hash::random();
let collator_id = CollatorId::from_slice(&(0..32).collect::<Vec<u8>>());
let collator_id_clone = collator_id.clone();
let candidate_receipt = CandidateReceipt::default();
let sent_report = Arc::new(Mutex::new(false));
let sent_report_clone = sent_report.clone();
test_harness(
|job| job.seconded_candidate = Some(collator_id.clone()),
|mut to_job, mut from_job| async move {
to_job
.send(CandidateSelectionMessage::Invalid(relay_parent, candidate_receipt))
.await
.unwrap();
std::mem::drop(to_job);
while let Some(msg) = from_job.next().await {
match msg {
FromJobCommand::SendMessage(AllMessages::CollatorProtocol(CollatorProtocolMessage::ReportCollator(
got_collator_id,
assert_eq!(got_collator_id, collator_id_clone);
*sent_report_clone.lock().await = true;
}
other => panic!("unexpected message from job: {:?}", other),
}
}
},
|job, job_result| {
assert!(job_result.is_ok());
assert_eq!(job.seconded_candidate.unwrap(), collator_id);
},
);
assert!(Arc::try_unwrap(sent_report).unwrap().into_inner());
}
}