Unverified Commit ac5ef00e authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Notify collators about seconded collation (#2430)

* Notify collators about seconded collation

This pr adds functionality to inform a collator that its collation was
seconded by a parachain validator. Before, this signed statement was only
gossiped over the validation substream. Now, we explicitly send the
seconded statement to the collator after it was validated successfully.

Besides that it changes the `CollatorFn` to return an optional result
sender that is informed when the built collation was seconded by a
parachain validator.

* Add test

* Make sure we only send `Seconded` statements

* Make sure we only receive valid statements

* Review feedback
parent 137e6d63
Pipeline #124081 passed with stages
in 30 minutes and 33 seconds
......@@ -5157,6 +5157,7 @@ dependencies = [
"futures-timer 3.0.2",
"log",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
......@@ -5337,6 +5338,7 @@ name = "polkadot-node-core-candidate-selection"
version = "0.1.0"
dependencies = [
"futures 0.3.12",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
......@@ -9286,6 +9288,7 @@ dependencies = [
name = "test-parachain-adder-collator"
version = "0.7.26"
dependencies = [
"assert_matches",
"futures 0.3.12",
"futures-timer 3.0.2",
"log",
......
......@@ -276,8 +276,8 @@ async fn handle_new_activations<Context: SubsystemContext>(
ctx.spawn("collation generation collation builder", Box::pin(async move {
let persisted_validation_data_hash = validation_data.hash();
let collation = match (task_config.collator)(relay_parent, &validation_data).await {
Some(collation) => collation,
let (collation, result_sender) = match (task_config.collator)(relay_parent, &validation_data).await {
Some(collation) => collation.into_inner(),
None => {
tracing::debug!(
target: LOG_TARGET,
......@@ -348,7 +348,7 @@ async fn handle_new_activations<Context: SubsystemContext>(
metrics.on_collation_generated();
if let Err(err) = task_sender.send(AllMessages::CollatorProtocol(
CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity)
CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity, result_sender)
)).await {
tracing::warn!(
target: LOG_TARGET,
......@@ -465,7 +465,7 @@ mod tests {
task::{Context as FuturesContext, Poll},
Future,
};
use polkadot_node_primitives::Collation;
use polkadot_node_primitives::{Collation, CollationResult};
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage, RuntimeApiRequest,
};
......@@ -496,10 +496,10 @@ mod tests {
struct TestCollator;
impl Future for TestCollator {
type Output = Option<Collation>;
type Output = Option<CollationResult>;
fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> {
Poll::Ready(Some(test_collation()))
Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None }))
}
}
......@@ -755,6 +755,7 @@ mod tests {
AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation(
CandidateReceipt { descriptor, .. },
_pov,
..
)) => {
// signature generation is non-deterministic, so we can't just assert that the
// expected descriptor is correct. What we can do is validate that the produced
......
......@@ -528,7 +528,12 @@ impl CandidateBackingJob {
descriptor: candidate.descriptor.clone(),
commitments,
});
self.sign_import_and_distribute_statement(statement, parent_span).await?;
if let Some(stmt) = self.sign_import_and_distribute_statement(
statement,
parent_span,
).await? {
self.issue_candidate_seconded_message(stmt).await?;
}
self.distribute_pov(candidate.descriptor, pov).await?;
}
}
......@@ -586,6 +591,15 @@ impl CandidateBackingJob {
Ok(())
}
/// Forward a freshly signed `Seconded` statement to the candidate-selection
/// subsystem, keyed by the relay parent this backing job is working on.
///
/// Candidate selection uses this to notify the originating collator that its
/// collation was seconded (see `handle_seconded` on the selection side).
/// Errors only if the channel to the overseer is closed.
async fn issue_candidate_seconded_message(
&mut self,
statement: SignedFullStatement,
) -> Result<(), Error> {
self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Seconded(self.parent, statement)).into()).await?;
Ok(())
}
/// Kick off background validation with intent to second.
#[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))]
async fn validate_and_second(
......@@ -631,13 +645,14 @@ impl CandidateBackingJob {
&mut self,
statement: Statement,
parent_span: &JaegerSpan,
) -> Result<(), Error> {
) -> Result<Option<SignedFullStatement>, Error> {
if let Some(signed_statement) = self.sign_statement(statement).await {
self.import_statement(&signed_statement, parent_span).await?;
self.distribute_signed_statement(signed_statement).await?;
self.distribute_signed_statement(signed_statement.clone()).await?;
Ok(Some(signed_statement))
} else {
Ok(None)
}
Ok(())
}
/// Check if there have happened any new misbehaviors and issue necessary messages.
......@@ -1486,6 +1501,14 @@ mod tests {
}
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::CandidateSelection(CandidateSelectionMessage::Seconded(hash, statement)) => {
assert_eq!(test_state.relay_parent, hash);
assert_matches!(statement.payload(), Statement::Seconded(_));
}
);
assert_matches!(
virtual_overseer.recv().await,
AllMessages::PoVDistribution(PoVDistributionMessage::DistributePoV(hash, descriptor, pov_received)) => {
......
......@@ -14,6 +14,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
[dev-dependencies]
......
......@@ -39,6 +39,7 @@ use polkadot_node_subsystem_util::{
use polkadot_primitives::v1::{
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV,
};
use polkadot_node_primitives::SignedFullStatement;
use std::{pin::Pin, sync::Arc};
use thiserror::Error;
......@@ -190,6 +191,10 @@ impl CandidateSelectionJob {
let _span = span.child("handle-invalid");
self.handle_invalid(candidate_receipt).await;
}
Some(CandidateSelectionMessage::Seconded(_, statement)) => {
let _span = span.child("handle-seconded");
self.handle_seconded(statement).await;
}
None => break,
}
}
......@@ -251,9 +256,7 @@ impl CandidateSelectionJob {
pov,
&mut self.sender,
&self.metrics,
)
.await
{
).await {
Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"),
Ok(()) => self.seconded_candidate = Some(collator_id),
}
......@@ -293,6 +296,46 @@ impl CandidateSelectionJob {
};
self.metrics.on_invalid_selection(result);
}
/// Handle a `Seconded` notification from candidate backing: reward the
/// collator we fetched the collation from and relay the signed statement
/// back to it via the collator protocol.
///
/// Best-effort: send failures are logged at debug level and otherwise ignored.
async fn handle_seconded(&mut self, statement: SignedFullStatement) {
// We only remember at most one seconded collator per job
// (`self.seconded_candidate` is set in `handle_collation` after a
// successful second). If it is unset, this notice is unexpected.
let received_from = match &self.seconded_candidate {
Some(peer) => peer,
None => {
tracing::warn!(
target: LOG_TARGET,
"received seconded notice for a candidate we don't remember seconding"
);
return;
}
};
tracing::debug!(
target: LOG_TARGET,
statement = ?statement,
"received seconded note for candidate",
);
// Bump the collator's reputation for providing a good collation.
if let Err(e) = self.sender
.send(AllMessages::from(CollatorProtocolMessage::NoteGoodCollation(received_from.clone())).into()).await
{
tracing::debug!(
target: LOG_TARGET,
error = ?e,
"failed to note good collator"
);
}
// Send the signed `Seconded` statement back to the collator so it can
// observe that its collation was seconded by a validator.
if let Err(e) = self.sender
.send(AllMessages::from(
CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement)
).into()).await
{
tracing::debug!(
target: LOG_TARGET,
error = ?e,
"failed to notify collator about seconded collation"
);
}
}
}
// get a collation from the Collator Protocol subsystem
......
......@@ -10,9 +10,9 @@ tracing = "0.1.22"
tracing-futures = "0.2.4"
thiserror = "1.0.23"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
......
......@@ -18,10 +18,10 @@ use std::collections::{HashMap, HashSet};
use super::{LOG_TARGET, Result};
use futures::{select, FutureExt};
use futures::{select, FutureExt, channel::oneshot};
use polkadot_primitives::v1::{
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId,
CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash,
};
use polkadot_subsystem::{
jaeger, PerLeafSpan,
......@@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::{
request_availability_cores_ctx,
metrics::{self, prometheus},
};
use polkadot_node_primitives::{SignedFullStatement, Statement};
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
......@@ -195,6 +196,9 @@ struct State {
/// We will keep up to one local collation per relay-parent.
collations: HashMap<Hash, (CandidateReceipt, PoV)>,
/// The result senders per collation.
collation_result_senders: HashMap<CandidateHash, oneshot::Sender<SignedFullStatement>>,
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,
......@@ -230,6 +234,7 @@ async fn distribute_collation(
id: ParaId,
receipt: CandidateReceipt,
pov: PoV,
result_sender: Option<oneshot::Sender<SignedFullStatement>>,
) -> Result<()> {
let relay_parent = receipt.descriptor.relay_parent;
......@@ -289,6 +294,10 @@ async fn distribute_collation(
state.our_validators_groups.insert(relay_parent, current_validators.into());
if let Some(result_sender) = result_sender {
state.collation_result_senders.insert(receipt.hash(), result_sender);
}
state.collations.insert(relay_parent, (receipt, pov));
Ok(())
......@@ -438,7 +447,7 @@ async fn process_msg(
CollateOn(id) => {
state.collating_on = Some(id);
}
DistributeCollation(receipt, pov) => {
DistributeCollation(receipt, pov, result_sender) => {
let _span1 = state.span_per_relay_parent
.get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation"));
let _span2 = jaeger::pov_span(&pov, "distributing-collation");
......@@ -454,7 +463,7 @@ async fn process_msg(
);
}
Some(id) => {
distribute_collation(ctx, state, id, receipt, pov).await?;
distribute_collation(ctx, state, id, receipt, pov, result_sender).await?;
}
None => {
tracing::warn!(
......@@ -483,6 +492,12 @@ async fn process_msg(
"NoteGoodCollation message is not expected on the collator side of the protocol",
);
}
NotifyCollationSeconded(_, _) => {
tracing::warn!(
target: LOG_TARGET,
"NotifyCollationSeconded message is not expected on the collator side of the protocol",
);
}
NetworkBridgeUpdateV1(event) => {
if let Err(e) = handle_network_msg(
ctx,
......@@ -591,6 +606,17 @@ async fn handle_incoming_peer_message(
"Collation message is not expected on the collator side of the protocol",
);
}
CollationSeconded(statement) => {
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::warn!(
target: LOG_TARGET,
statement = ?statement,
"Collation seconded message received with none-seconded statement.",
);
} else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) {
let _ = sender.send(statement);
}
}
}
Ok(())
......@@ -685,7 +711,9 @@ async fn handle_our_view_change(
view: OurView,
) -> Result<()> {
for removed in state.view.difference(&view) {
state.collations.remove(removed);
if let Some((receipt, _)) = state.collations.remove(removed) {
state.collation_result_senders.remove(&receipt.hash());
}
state.our_validators_groups.remove(removed);
state.connection_requests.remove_all(removed);
state.span_per_relay_parent.remove(removed);
......@@ -1054,7 +1082,7 @@ mod tests {
overseer_send(
virtual_overseer,
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()),
CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None),
).await;
// obtain the availability cores.
......
......@@ -39,6 +39,7 @@ use polkadot_node_network_protocol::{
v1 as protocol_v1, View, OurView, PeerId, ReputationChange as Rep, RequestId,
};
use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}};
use polkadot_node_primitives::{Statement, SignedFullStatement};
use super::{modify_reputation, LOG_TARGET, Result};
......@@ -200,9 +201,6 @@ struct State {
/// Delay after which a collation request would time out.
request_timeout: Duration,
/// Possessed collations.
collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>,
/// Leaves have recently moved out of scope.
/// These are looked into when we receive previously requested collations that we
/// are no longer interested in.
......@@ -228,35 +226,13 @@ async fn fetch_collation<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
// First take a look if we have already stored some of the relevant collations.
if let Some(collations) = state.collations.get(&(relay_parent, para_id)) {
for collation in collations.iter() {
if collation.0 == collator_id {
if let Err(e) = tx.send((collation.1.clone(), collation.2.clone())) {
// We do not want this to be fatal because the receving subsystem
// may have closed the results channel for some reason.
tracing::trace!(
target: LOG_TARGET,
err = ?e,
"Failed to send collation",
);
}
return;
}
let relevant_advertiser = state.advertisements.iter().find_map(|(k, v)| {
if v.contains(&(para_id, relay_parent)) && state.known_collators.get(k) == Some(&collator_id) {
Some(k.clone())
} else {
None
}
}
// Dodge multiple references to `state`.
let mut relevant_advertiser = None;
// Has the collator in question advertised a relevant collation?
for (k, v) in state.advertisements.iter() {
if v.contains(&(para_id, relay_parent)) {
if state.known_collators.get(k) == Some(&collator_id) {
relevant_advertiser = Some(k.clone());
}
}
}
});
// Request the collation.
// Assume it is `request_collation`'s job to check and ignore duplicate requests.
......@@ -278,10 +254,8 @@ where
// Since we have a one way map of PeerId -> CollatorId we have to
// iterate here. Since a huge amount of peers is not expected this
// is a tolerable thing to do.
for (k, v) in state.known_collators.iter() {
if *v == id {
modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await;
}
for (k, _) in state.known_collators.iter().filter(|d| *d.1 == id) {
modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await;
}
}
......@@ -295,10 +269,41 @@ async fn note_good_collation<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
for (peer_id, collator_id) in state.known_collators.iter() {
if id == *collator_id {
modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await;
}
for (peer_id, _) in state.known_collators.iter().filter(|d| *d.1 == id) {
modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await;
}
}
/// Notify a collator that its collation got seconded.
///
/// Sends `CollationSeconded(statement)` over the collation peer-set to every
/// connected peer known to belong to the given `CollatorId` (the peer-id →
/// collator-id map is one-way, so all matching peers are addressed).
/// The statement must be a `Seconded` statement; anything else is a caller
/// bug and is logged and dropped here rather than put on the wire.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn notify_collation_seconded(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
id: CollatorId,
statement: SignedFullStatement,
) {
// Guard: only `Seconded` statements may be forwarded to collators.
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::error!(
target: LOG_TARGET,
statement = ?statement,
"Notify collation seconded called with a wrong statement.",
);
return;
}
// Collect every peer currently mapped to this collator id.
let peer_ids = state.known_collators.iter()
.filter_map(|(p, c)| if *c == id { Some(p.clone()) } else { None })
.collect::<Vec<_>>();
if !peer_ids.is_empty() {
let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
peer_ids,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
}
}
......@@ -368,7 +373,7 @@ where
if id == request_id {
if let Some(per_request) = state.requests_info.remove(&id) {
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
if state.known_collators.get(&origin).is_some() {
let pov = match pov.decompress() {
Ok(pov) => pov,
Err(error) => {
......@@ -395,11 +400,6 @@ where
let _ = per_request.result.send((receipt.clone(), pov.clone()));
state.metrics.on_request(Ok(()));
state.collations
.entry((relay_parent, para_id))
.or_default()
.push((collator_id.clone(), receipt, pov));
}
}
}
......@@ -558,6 +558,9 @@ where
.map(|s| s.child("received-collation"));
received_collation(ctx, state, origin, request_id, receipt, pov).await;
}
CollationSeconded(_) => {
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
}
}
......@@ -584,8 +587,6 @@ async fn remove_relay_parent(
}
}
state.collations.retain(|k, _| k.0 != relay_parent);
Ok(())
}
......@@ -705,7 +706,7 @@ where
"CollateOn message is not expected on the validator side of the protocol",
);
}
DistributeCollation(_, _) => {
DistributeCollation(_, _, _) => {
tracing::warn!(
target: LOG_TARGET,
"DistributeCollation message is not expected on the validator side of the protocol",
......@@ -721,6 +722,9 @@ where
NoteGoodCollation(id) => {
note_good_collation(ctx, state, id).await;
}
NotifyCollationSeconded(id, statement) => {
notify_collation_seconded(ctx, state, id, statement).await;
}
NetworkBridgeUpdateV1(event) => {
if let Err(e) = handle_network_msg(
ctx,
......
......@@ -396,6 +396,9 @@ pub mod v1 {
/// A requested collation.
#[codec(index = 3)]
Collation(RequestId, CandidateReceipt, CompressedPoV),
/// A collation sent to a validator was seconded.
#[codec(index = 4)]
CollationSeconded(SignedFullStatement),
}
/// All network messages on the validation peer-set.
......
......@@ -40,7 +40,7 @@ use polkadot_node_network_protocol::{
};
use futures::prelude::*;
use futures::channel::{mpsc, oneshot};
use futures::channel::oneshot;
use indexmap::IndexSet;
use std::collections::{HashMap, HashSet};
......@@ -499,27 +499,6 @@ fn check_statement_signature(
.and_then(|v| statement.check_signature(&signing_context, v))
}
type StatementListeners = Vec<mpsc::Sender<SignedFullStatement>>;
/// Informs all registered listeners about a newly received statement.
///
/// Removes all closed listeners.
#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))]
async fn inform_statement_listeners(
statement: &SignedFullStatement,
listeners: &mut StatementListeners,
) {
// Ignore the errors since these will be removed later.
stream::iter(listeners.iter_mut()).for_each_concurrent(
None,
|listener| async move {
let _ = listener.send(statement.clone()).await;
}
).await;
// Remove any closed listeners.
listeners.retain(|tx| !tx.is_closed());
}
/// Places the statement in storage if it is new, and then
/// circulates the statement to all peers who have not seen it yet, and
/// sends all statements dependent on that statement to peers who could previously not receive
......@@ -699,7 +678,6 @@ async fn handle_incoming_message<'a>(
ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>,
message: protocol_v1::StatementDistributionMessage,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) -> Option<(Hash, &'a StoredStatement)> {
let (relay_parent, statement) = match message {
protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s),
......@@ -770,8 +748,6 @@ async fn handle_incoming_message<'a>(
Ok(false) => {}
}
inform_statement_listeners(&statement, statement_listeners).await;
// Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation
// or unpinned to a seconded candidate. So it is safe to place it into the storage.
match active_head.note_statement(statement) {
......@@ -841,7 +817,6 @@ async fn handle_network_update(
our_view: &mut OurView,
update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>,
metrics: &Metrics,
statement_listeners: &mut StatementListeners,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, _role) => {
......@@ -864,7 +839,6 @@ async fn handle_network_update(
ctx,
message,
metrics,
statement_listeners,
).await
}
None => None,
......@@ -931,7 +905,6 @@ impl StatementDistribution {
let mut peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut our_view = OurView::default();
let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new();