diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index 0a037ae7676aa2aea66002fea991390e7d0bf5f5..f204c3e14f2c03858b677b747996b9a63b72c0a5 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5157,6 +5157,7 @@ dependencies = [ "futures-timer 3.0.2", "log", "polkadot-node-network-protocol", + "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", @@ -5337,6 +5338,7 @@ name = "polkadot-node-core-candidate-selection" version = "0.1.0" dependencies = [ "futures 0.3.12", + "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-util", "polkadot-primitives", @@ -9286,6 +9288,7 @@ dependencies = [ name = "test-parachain-adder-collator" version = "0.7.26" dependencies = [ + "assert_matches", "futures 0.3.12", "futures-timer 3.0.2", "log", diff --git a/polkadot/node/collation-generation/src/lib.rs b/polkadot/node/collation-generation/src/lib.rs index 09f3db3fd0b02f8c5d615a09777dcb60fbbde6f0..4a93152f97873df1c549c2b0f2594cdf0e38db27 100644 --- a/polkadot/node/collation-generation/src/lib.rs +++ b/polkadot/node/collation-generation/src/lib.rs @@ -276,8 +276,8 @@ async fn handle_new_activations<Context: SubsystemContext>( ctx.spawn("collation generation collation builder", Box::pin(async move { let persisted_validation_data_hash = validation_data.hash(); - let collation = match (task_config.collator)(relay_parent, &validation_data).await { - Some(collation) => collation, + let (collation, result_sender) = match (task_config.collator)(relay_parent, &validation_data).await { + Some(collation) => collation.into_inner(), None => { tracing::debug!( target: LOG_TARGET, @@ -348,7 +348,7 @@ async fn handle_new_activations<Context: SubsystemContext>( metrics.on_collation_generated(); if let Err(err) = task_sender.send(AllMessages::CollatorProtocol( - CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity) + CollatorProtocolMessage::DistributeCollation(ccr, collation.proof_of_validity, result_sender) )).await { tracing::warn!( target: LOG_TARGET, @@ -465,7 +465,7 @@ mod tests { task::{Context as FuturesContext, Poll}, Future, }; - use polkadot_node_primitives::Collation; + use polkadot_node_primitives::{Collation, CollationResult}; use polkadot_node_subsystem::messages::{ AllMessages, RuntimeApiMessage, RuntimeApiRequest, }; @@ -496,10 +496,10 @@ mod tests { struct TestCollator; impl Future for TestCollator { - type Output = Option<Collation>; + type Output = Option<CollationResult>; fn poll(self: Pin<&mut Self>, _cx: &mut FuturesContext) -> Poll<Self::Output> { - Poll::Ready(Some(test_collation())) + Poll::Ready(Some(CollationResult { collation: test_collation(), result_sender: None })) } } @@ -755,6 +755,7 @@ mod tests { AllMessages::CollatorProtocol(CollatorProtocolMessage::DistributeCollation( CandidateReceipt { descriptor, .. }, _pov, + .. )) => { // signature generation is non-deterministic, so we can't just assert that the // expected descriptor is correct. What we can do is validate that the produced diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 5a8a783695852890548f15eb434285e2f9cb0a26..4551c4feaa486970778c59da1cea1c77a2c7c6e6 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -528,7 +528,12 @@ impl CandidateBackingJob { descriptor: candidate.descriptor.clone(), commitments, }); - self.sign_import_and_distribute_statement(statement, parent_span).await?; + if let Some(stmt) = self.sign_import_and_distribute_statement( + statement, + parent_span, + ).await? { + self.issue_candidate_seconded_message(stmt).await?; + } self.distribute_pov(candidate.descriptor, pov).await?; } } @@ -586,6 +591,15 @@ impl CandidateBackingJob { Ok(()) } + async fn issue_candidate_seconded_message( + &mut self, + statement: SignedFullStatement, + ) -> Result<(), Error> { + self.tx_from.send(AllMessages::from(CandidateSelectionMessage::Seconded(self.parent, statement)).into()).await?; + + Ok(()) + } + /// Kick off background validation with intent to second. #[tracing::instrument(level = "trace", skip(self, parent_span, pov), fields(subsystem = LOG_TARGET))] async fn validate_and_second( @@ -631,13 +645,14 @@ impl CandidateBackingJob { &mut self, statement: Statement, parent_span: &JaegerSpan, - ) -> Result<(), Error> { + ) -> Result<Option<SignedFullStatement>, Error> { if let Some(signed_statement) = self.sign_statement(statement).await { self.import_statement(&signed_statement, parent_span).await?; - self.distribute_signed_statement(signed_statement).await?; + self.distribute_signed_statement(signed_statement.clone()).await?; + Ok(Some(signed_statement)) + } else { + Ok(None) } - - Ok(()) } /// Check if there have happened any new misbehaviors and issue necessary messages. @@ -1486,6 +1501,14 @@ mod tests { } ); + assert_matches!( + virtual_overseer.recv().await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Seconded(hash, statement)) => { + assert_eq!(test_state.relay_parent, hash); + assert_matches!(statement.payload(), Statement::Seconded(_)); + } + ); + assert_matches!( virtual_overseer.recv().await, AllMessages::PoVDistribution(PoVDistributionMessage::DistributePoV(hash, descriptor, pov_received)) => { diff --git a/polkadot/node/core/candidate-selection/Cargo.toml b/polkadot/node/core/candidate-selection/Cargo.toml index c85a0cef3caf2d52c327bdfed018ee6a2c6543f1..f5b473d0beb2681fdc9cb80ab24973ca81d41f80 100644 --- a/polkadot/node/core/candidate-selection/Cargo.toml +++ b/polkadot/node/core/candidate-selection/Cargo.toml @@ -14,6 +14,7 @@ sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "maste polkadot-primitives = { path = "../../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } [dev-dependencies] diff --git a/polkadot/node/core/candidate-selection/src/lib.rs b/polkadot/node/core/candidate-selection/src/lib.rs index 51eaa80a47182e1fad33f0f7688f91b5d2327ce6..daee7d530db38be34b4f932f13e2ab1325bf47a7 100644 --- a/polkadot/node/core/candidate-selection/src/lib.rs +++ b/polkadot/node/core/candidate-selection/src/lib.rs @@ -39,6 +39,7 @@ use polkadot_node_subsystem_util::{ use polkadot_primitives::v1::{ CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV, }; +use polkadot_node_primitives::SignedFullStatement; use std::{pin::Pin, sync::Arc}; use thiserror::Error; @@ -190,6 +191,10 @@ impl CandidateSelectionJob { let _span = span.child("handle-invalid"); self.handle_invalid(candidate_receipt).await; } + Some(CandidateSelectionMessage::Seconded(_, statement)) => { + let _span = span.child("handle-seconded"); + self.handle_seconded(statement).await; + } None => break, } } @@ -251,9 +256,7 @@ impl CandidateSelectionJob { pov, &mut self.sender, &self.metrics, - ) - .await - { + ).await { Err(err) => tracing::warn!(target: LOG_TARGET, err = ?err, "failed to second a candidate"), Ok(()) => self.seconded_candidate = Some(collator_id), } @@ -293,6 +296,46 @@ impl CandidateSelectionJob { }; self.metrics.on_invalid_selection(result); } + + async fn handle_seconded(&mut self, statement: SignedFullStatement) { + let received_from = match &self.seconded_candidate { + Some(peer) => peer, + None => { + tracing::warn!( + target: LOG_TARGET, + "received seconded notice for a candidate we don't remember seconding" + ); + return; + } + }; + tracing::debug!( + target: LOG_TARGET, + statement = ?statement, + "received seconded note for candidate", + ); + + if let Err(e) = self.sender + .send(AllMessages::from(CollatorProtocolMessage::NoteGoodCollation(received_from.clone())).into()).await + { + tracing::debug!( + target: LOG_TARGET, + error = ?e, + "failed to note good collator" + ); + } + + if let Err(e) = self.sender + .send(AllMessages::from( + CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement) + ).into()).await + { + tracing::debug!( + target: LOG_TARGET, + error = ?e, + "failed to notify collator about seconded collation" + ); + } + } } // get a collation from the Collator Protocol subsystem diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index f392924e96a775d56a85f765fc56e466913c0d2b..6129467e9e89f3a44ccdb4b0fa6f891e0aced3a0 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -10,9 +10,9 @@ tracing = "0.1.22" tracing-futures = "0.2.4" thiserror = "1.0.23" - polkadot-primitives = { path = "../../../primitives" } polkadot-node-network-protocol = { path = "../../network/protocol" } +polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/polkadot/node/network/collator-protocol/src/collator_side.rs b/polkadot/node/network/collator-protocol/src/collator_side.rs index 7cbb312498bc6ce0c3a98b2b9c6ec846f524ec5e..57122567dac58aa47b22cb2a23920eef52ea9ac2 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side.rs @@ -18,10 +18,10 @@ use std::collections::{HashMap, HashSet}; use super::{LOG_TARGET, Result}; -use futures::{select, FutureExt}; +use futures::{select, FutureExt, channel::oneshot}; use polkadot_primitives::v1::{ - CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, + CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, CandidateHash, }; use polkadot_subsystem::{ jaeger, PerLeafSpan, @@ -38,6 +38,7 @@ use polkadot_node_subsystem_util::{ request_availability_cores_ctx, metrics::{self, prometheus}, }; +use polkadot_node_primitives::{SignedFullStatement, Statement}; #[derive(Clone, Default)] pub struct Metrics(Option<MetricsInner>); @@ -195,6 +196,9 @@ struct State { /// We will keep up to one local collation per relay-parent. collations: HashMap<Hash, (CandidateReceipt, PoV)>, + /// The result senders per collation. + collation_result_senders: HashMap<CandidateHash, oneshot::Sender<SignedFullStatement>>, + /// Our validator groups per active leaf. our_validators_groups: HashMap<Hash, ValidatorGroup>, @@ -230,6 +234,7 @@ async fn distribute_collation( id: ParaId, receipt: CandidateReceipt, pov: PoV, + result_sender: Option<oneshot::Sender<SignedFullStatement>>, ) -> Result<()> { let relay_parent = receipt.descriptor.relay_parent; @@ -289,6 +294,10 @@ async fn distribute_collation( state.our_validators_groups.insert(relay_parent, current_validators.into()); + if let Some(result_sender) = result_sender { + state.collation_result_senders.insert(receipt.hash(), result_sender); + } + state.collations.insert(relay_parent, (receipt, pov)); Ok(()) @@ -438,7 +447,7 @@ async fn process_msg( CollateOn(id) => { state.collating_on = Some(id); } - DistributeCollation(receipt, pov) => { + DistributeCollation(receipt, pov, result_sender) => { let _span1 = state.span_per_relay_parent .get(&receipt.descriptor.relay_parent).map(|s| s.child("distributing-collation")); let _span2 = jaeger::pov_span(&pov, "distributing-collation"); @@ -454,7 +463,7 @@ async fn process_msg( ); } Some(id) => { - distribute_collation(ctx, state, id, receipt, pov).await?; + distribute_collation(ctx, state, id, receipt, pov, result_sender).await?; } None => { tracing::warn!( @@ -483,6 +492,12 @@ async fn process_msg( "NoteGoodCollation message is not expected on the collator side of the protocol", ); } + NotifyCollationSeconded(_, _) => { + tracing::warn!( + target: LOG_TARGET, + "NotifyCollationSeconded message is not expected on the collator side of the protocol", + ); + } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, @@ -591,6 +606,17 @@ async fn handle_incoming_peer_message( "Collation message is not expected on the collator side of the protocol", ); } + CollationSeconded(statement) => { + if !matches!(statement.payload(), Statement::Seconded(_)) { + tracing::warn!( + target: LOG_TARGET, + statement = ?statement, + "Collation seconded message received with none-seconded statement.", + ); + } else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) { + let _ = sender.send(statement); + } + } } Ok(()) @@ -685,7 +711,9 @@ async fn handle_our_view_change( view: OurView, ) -> Result<()> { for removed in state.view.difference(&view) { - state.collations.remove(removed); + if let Some((receipt, _)) = state.collations.remove(removed) { + state.collation_result_senders.remove(&receipt.hash()); + } state.our_validators_groups.remove(removed); state.connection_requests.remove_all(removed); state.span_per_relay_parent.remove(removed); @@ -1054,7 +1082,7 @@ mod tests { overseer_send( virtual_overseer, - CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone()), + CollatorProtocolMessage::DistributeCollation(candidate.clone(), pov_block.clone(), None), ).await; // obtain the availability cores. diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index a53bdf4a178518e0a9cad6819b3d7e401a8afd25..236e60ca085564a646e542cd4be7010aaf7a2e14 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -39,6 +39,7 @@ use polkadot_node_network_protocol::{ v1 as protocol_v1, View, OurView, PeerId, ReputationChange as Rep, RequestId, }; use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}}; +use polkadot_node_primitives::{Statement, SignedFullStatement}; use super::{modify_reputation, LOG_TARGET, Result}; @@ -200,9 +201,6 @@ struct State { /// Delay after which a collation request would time out. request_timeout: Duration, - /// Possessed collations. - collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>, - /// Leaves have recently moved out of scope. /// These are looked into when we receive previously requested collations that we /// are no longer interested in. @@ -228,35 +226,13 @@ async fn fetch_collation<Context>( where Context: SubsystemContext<Message = CollatorProtocolMessage> { - // First take a look if we have already stored some of the relevant collations. - if let Some(collations) = state.collations.get(&(relay_parent, para_id)) { - for collation in collations.iter() { - if collation.0 == collator_id { - if let Err(e) = tx.send((collation.1.clone(), collation.2.clone())) { - // We do not want this to be fatal because the receving subsystem - // may have closed the results channel for some reason. - tracing::trace!( - target: LOG_TARGET, - err = ?e, - "Failed to send collation", - ); - } - return; - } + let relevant_advertiser = state.advertisements.iter().find_map(|(k, v)| { + if v.contains(&(para_id, relay_parent)) && state.known_collators.get(k) == Some(&collator_id) { + Some(k.clone()) + } else { + None } - } - - // Dodge multiple references to `state`. - let mut relevant_advertiser = None; - - // Has the collator in question advertised a relevant collation? - for (k, v) in state.advertisements.iter() { - if v.contains(&(para_id, relay_parent)) { - if state.known_collators.get(k) == Some(&collator_id) { - relevant_advertiser = Some(k.clone()); - } - } - } + }); // Request the collation. // Assume it is `request_collation`'s job to check and ignore duplicate requests. @@ -278,10 +254,8 @@ where // Since we have a one way map of PeerId -> CollatorId we have to // iterate here. Since a huge amount of peers is not expected this // is a tolerable thing to do. - for (k, v) in state.known_collators.iter() { - if *v == id { - modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await; - } + for (k, _) in state.known_collators.iter().filter(|d| *d.1 == id) { + modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await; } } @@ -295,10 +269,41 @@ async fn note_good_collation<Context>( where Context: SubsystemContext<Message = CollatorProtocolMessage> { - for (peer_id, collator_id) in state.known_collators.iter() { - if id == *collator_id { - modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await; - } + for (peer_id, _) in state.known_collators.iter().filter(|d| *d.1 == id) { + modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await; + } +} + +/// Notify a collator that its collation got seconded. +#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))] +async fn notify_collation_seconded( + ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>, + state: &mut State, + id: CollatorId, + statement: SignedFullStatement, +) { + if !matches!(statement.payload(), Statement::Seconded(_)) { + tracing::error!( + target: LOG_TARGET, + statement = ?statement, + "Notify collation seconded called with a wrong statement.", + ); + return; + } + + let peer_ids = state.known_collators.iter() + .filter_map(|(p, c)| if *c == id { Some(p.clone()) } else { None }) + .collect::<Vec<_>>(); + + if !peer_ids.is_empty() { + let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + peer_ids, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + )).await; } } @@ -368,7 +373,7 @@ where if id == request_id { if let Some(per_request) = state.requests_info.remove(&id) { let _ = per_request.received.send(()); - if let Some(collator_id) = state.known_collators.get(&origin) { + if state.known_collators.get(&origin).is_some() { let pov = match pov.decompress() { Ok(pov) => pov, Err(error) => { @@ -395,11 +400,6 @@ where let _ = per_request.result.send((receipt.clone(), pov.clone())); state.metrics.on_request(Ok(())); - - state.collations - .entry((relay_parent, para_id)) - .or_default() - .push((collator_id.clone(), receipt, pov)); } } } @@ -558,6 +558,9 @@ where .map(|s| s.child("received-collation")); received_collation(ctx, state, origin, request_id, receipt, pov).await; } + CollationSeconded(_) => { + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + } } } @@ -584,8 +587,6 @@ async fn remove_relay_parent( } } - state.collations.retain(|k, _| k.0 != relay_parent); - Ok(()) } @@ -705,7 +706,7 @@ where "CollateOn message is not expected on the validator side of the protocol", ); } - DistributeCollation(_, _) => { + DistributeCollation(_, _, _) => { tracing::warn!( target: LOG_TARGET, "DistributeCollation message is not expected on the validator side of the protocol", @@ -721,6 +722,9 @@ where NoteGoodCollation(id) => { note_good_collation(ctx, state, id).await; } + NotifyCollationSeconded(id, statement) => { + notify_collation_seconded(ctx, state, id, statement).await; + } NetworkBridgeUpdateV1(event) => { if let Err(e) = handle_network_msg( ctx, diff --git a/polkadot/node/network/protocol/src/lib.rs b/polkadot/node/network/protocol/src/lib.rs index 81bd80f5270e1ecb8d01a13d3853925835efdc24..997d63063659cb05d38f8a1a8435d9aa7afe886d 100644 --- a/polkadot/node/network/protocol/src/lib.rs +++ b/polkadot/node/network/protocol/src/lib.rs @@ -396,6 +396,9 @@ pub mod v1 { /// A requested collation. #[codec(index = 3)] Collation(RequestId, CandidateReceipt, CompressedPoV), + /// A collation sent to a validator was seconded. + #[codec(index = 4)] + CollationSeconded(SignedFullStatement), } /// All network messages on the validation peer-set. diff --git a/polkadot/node/network/statement-distribution/src/lib.rs b/polkadot/node/network/statement-distribution/src/lib.rs index 960ff129bacf1be4c2a62dd9adb0d9ff5c0d5373..42e481631591e484597773040b7c03112fcc1210 100644 --- a/polkadot/node/network/statement-distribution/src/lib.rs +++ b/polkadot/node/network/statement-distribution/src/lib.rs @@ -40,7 +40,7 @@ use polkadot_node_network_protocol::{ }; use futures::prelude::*; -use futures::channel::{mpsc, oneshot}; +use futures::channel::oneshot; use indexmap::IndexSet; use std::collections::{HashMap, HashSet}; @@ -499,27 +499,6 @@ fn check_statement_signature( .and_then(|v| statement.check_signature(&signing_context, v)) } -type StatementListeners = Vec<mpsc::Sender<SignedFullStatement>>; - -/// Informs all registered listeners about a newly received statement. -/// -/// Removes all closed listeners. -#[tracing::instrument(level = "trace", skip(listeners), fields(subsystem = LOG_TARGET))] -async fn inform_statement_listeners( - statement: &SignedFullStatement, - listeners: &mut StatementListeners, -) { - // Ignore the errors since these will be removed later. - stream::iter(listeners.iter_mut()).for_each_concurrent( - None, - |listener| async move { - let _ = listener.send(statement.clone()).await; - } - ).await; - // Remove any closed listeners. - listeners.retain(|tx| !tx.is_closed()); -} - /// Places the statement in storage if it is new, and then /// circulates the statement to all peers who have not seen it yet, and /// sends all statements dependent on that statement to peers who could previously not receive @@ -699,7 +678,6 @@ async fn handle_incoming_message<'a>( ctx: &mut impl SubsystemContext<Message = StatementDistributionMessage>, message: protocol_v1::StatementDistributionMessage, metrics: &Metrics, - statement_listeners: &mut StatementListeners, ) -> Option<(Hash, &'a StoredStatement)> { let (relay_parent, statement) = match message { protocol_v1::StatementDistributionMessage::Statement(r, s) => (r, s), @@ -770,8 +748,6 @@ async fn handle_incoming_message<'a>( Ok(false) => {} } - inform_statement_listeners(&statement, statement_listeners).await; - // Note: `peer_data.receive` already ensures that the statement is not an unbounded equivocation // or unpinned to a seconded candidate. So it is safe to place it into the storage. match active_head.note_statement(statement) { @@ -841,7 +817,6 @@ async fn handle_network_update( our_view: &mut OurView, update: NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>, metrics: &Metrics, - statement_listeners: &mut StatementListeners, ) { match update { NetworkBridgeEvent::PeerConnected(peer, _role) => { @@ -864,7 +839,6 @@ async fn handle_network_update( ctx, message, metrics, - statement_listeners, ).await } None => None, @@ -931,7 +905,6 @@ impl StatementDistribution { let mut peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut our_view = OurView::default(); let mut active_heads: HashMap<Hash, ActiveHeadData> = HashMap::new(); - let mut statement_listeners = StatementListeners::new(); let metrics = self.metrics; loop { @@ -993,10 +966,6 @@ impl StatementDistribution { StatementDistributionMessage::Share(relay_parent, statement) => { let _timer = metrics.time_share(); - inform_statement_listeners( - &statement, - &mut statement_listeners, - ).await; circulate_statement_and_dependents( &mut peers, &mut active_heads, @@ -1016,12 +985,8 @@ impl StatementDistribution { &mut our_view, event, &metrics, - &mut statement_listeners, ).await; } - StatementDistributionMessage::RegisterStatementListener(tx) => { - statement_listeners.push(tx); - } } } } diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 8eed3bfc5e5f414c421f036a9bfb4024dfb8a0de..c7f2c36a040d020ecf34da95b2a7499594c8def5 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -1962,7 +1962,7 @@ mod tests { use polkadot_primitives::v1::{BlockData, CollatorPair, PoV, CandidateHash}; use polkadot_subsystem::{messages::RuntimeApiRequest, messages::NetworkBridgeEvent, JaegerSpan}; - use polkadot_node_primitives::{Collation, CollationGenerationConfig}; + use polkadot_node_primitives::{CollationResult, CollationGenerationConfig}; use polkadot_node_network_protocol::{PeerId, ReputationChange}; use polkadot_node_subsystem_util::metered; @@ -2631,7 +2631,7 @@ mod tests { struct TestCollator; impl Future for TestCollator { - type Output = Option<Collation>; + type Output = Option<CollationResult>; fn poll(self: Pin<&mut Self>, _cx: &mut futures::task::Context) -> Poll<Self::Output> { panic!("at the Disco") diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index 1727b1486809ad03b3576bb82ab4f8693ce67b4b..b00df8539d796a39435be1a92a23a9fbc2f84b00 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -157,13 +157,33 @@ pub struct Collation<BlockNumber = polkadot_primitives::v1::BlockNumber> { pub hrmp_watermark: BlockNumber, } +/// Result of the [`CollatorFn`] invocation. +pub struct CollationResult { + /// The collation that was build. + pub collation: Collation, + /// An optional result sender that should be informed about a successfully seconded collation. + /// + /// There is no guarantee that this sender is informed ever about any result, it is completly okay to just drop it. + /// However, if it is called, it should be called with the signed statement of a parachain validator seconding the + /// collation. + pub result_sender: Option<futures::channel::oneshot::Sender<SignedFullStatement>>, +} + +impl CollationResult { + /// Convert into the inner values. + pub fn into_inner(self) -> (Collation, Option<futures::channel::oneshot::Sender<SignedFullStatement>>) { + (self.collation, self.result_sender) + } +} + /// Collation function. /// -/// Will be called with the hash of the relay chain block the parachain -/// block should be build on and the [`ValidationData`] that provides -/// information about the state of the parachain on the relay chain. +/// Will be called with the hash of the relay chain block the parachain block should be build on and the +/// [`ValidationData`] that provides information about the state of the parachain on the relay chain. +/// +/// Returns an optional [`CollationResult`]. pub type CollatorFn = Box< - dyn Fn(Hash, &PersistedValidationData) -> Pin<Box<dyn Future<Output = Option<Collation>> + Send>> + dyn Fn(Hash, &PersistedValidationData) -> Pin<Box<dyn Future<Output = Option<CollationResult>> + Send>> + Send + Sync, >; diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 94cb6e44be87550645a9b6a8266d53d5cd54e46f..f670fdd5afb462482154f1b8c69055553f7098e2 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -63,8 +63,13 @@ pub enum CandidateSelectionMessage { /// A candidate collation can be fetched from a collator and should be considered for seconding. Collation(Hash, ParaId, CollatorId), /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. + /// /// The hash is the relay parent. Invalid(Hash, CandidateReceipt), + /// The candidate we recommended to be seconded was validated successfully. + /// + /// The hash is the relay parent. + Seconded(Hash, SignedFullStatement), } impl BoundToRelayParent for CandidateSelectionMessage { @@ -72,6 +77,7 @@ impl BoundToRelayParent for CandidateSelectionMessage { match self { Self::Collation(hash, ..) => *hash, Self::Invalid(hash, _) => *hash, + Self::Seconded(hash, _) => *hash, } } } @@ -174,8 +180,11 @@ pub enum CollatorProtocolMessage { /// /// This should be sent before any `DistributeCollation` message. CollateOn(ParaId), - /// Provide a collation to distribute to validators. - DistributeCollation(CandidateReceipt, PoV), + /// Provide a collation to distribute to validators with an optional result sender. + /// + /// The result sender should be informed when at least one parachain validator seconded the collation. It is also + /// completely okay to just drop the sender. + DistributeCollation(CandidateReceipt, PoV, Option<oneshot::Sender<SignedFullStatement>>), /// Fetch a collation under the given relay-parent for the given ParaId. FetchCollation(Hash, CollatorId, ParaId, oneshot::Sender<(CandidateReceipt, PoV)>), /// Report a collator as having provided an invalid collation. This should lead to disconnect @@ -183,6 +192,8 @@ pub enum CollatorProtocolMessage { ReportCollator(CollatorId), /// Note a collator as having provided a good collation. NoteGoodCollation(CollatorId), + /// Notify a collator that its collation was seconded. + NotifyCollationSeconded(CollatorId, SignedFullStatement), /// Get a network bridge update. NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>), } @@ -192,11 +203,12 @@ impl CollatorProtocolMessage { pub fn relay_parent(&self) -> Option<Hash> { match self { Self::CollateOn(_) => None, - Self::DistributeCollation(receipt, _) => Some(receipt.descriptor().relay_parent), + Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent), Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent), Self::ReportCollator(_) => None, Self::NoteGoodCollation(_) => None, Self::NetworkBridgeUpdateV1(_) => None, + Self::NotifyCollationSeconded(_, _) => None, } } } @@ -503,8 +515,6 @@ pub enum StatementDistributionMessage { Share(Hash, SignedFullStatement), /// Event from the network bridge. NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::StatementDistributionMessage>), - /// Register a listener for shared statements. - RegisterStatementListener(mpsc::Sender<SignedFullStatement>), } impl StatementDistributionMessage { @@ -513,7 +523,6 @@ impl StatementDistributionMessage { match self { Self::Share(hash, _) => Some(*hash), Self::NetworkBridgeUpdateV1(_) => None, - Self::RegisterStatementListener(_) => None, } } } diff --git a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml index 4068d3c3f9e90bfa73c00f70e4cbcc1ac0f70c24..b869e08fd2d3e0ec5e7814f4bbfcac869175cfd6 100644 --- a/polkadot/parachain/test-parachains/adder/collator/Cargo.toml +++ b/polkadot/parachain/test-parachains/adder/collator/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.12" futures-timer = "3.0.2" log = "0.4.13" structopt = "0.3.21" +assert_matches = "1.4.0" test-parachain-adder = { path = ".." } polkadot-primitives = { path = "../../../../primitives" } diff --git a/polkadot/parachain/test-parachains/adder/collator/src/lib.rs b/polkadot/parachain/test-parachains/adder/collator/src/lib.rs index d8d8d3c1bb467928060411b135c9f7e31b86330d..ce7acd40c93ce34b9210083b3300140808ee8556 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/lib.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/lib.rs @@ -17,16 +17,18 @@ //! Collator for the adder test parachain. use futures_timer::Delay; -use polkadot_node_primitives::{Collation, CollatorFn}; +use polkadot_node_primitives::{Collation, CollatorFn, CollationResult, Statement, SignedFullStatement}; use polkadot_primitives::v1::{CollatorId, CollatorPair, PoV}; use parity_scale_codec::{Encode, Decode}; -use sp_core::Pair; +use sp_core::{Pair, traits::SpawnNamed}; use std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, atomic::{AtomicU32, Ordering}}, time::Duration, }; use test_parachain_adder::{execute, hash_state, BlockData, HeadData}; +use futures::channel::oneshot; +use assert_matches::assert_matches; /// The amount we add when producing a new block. /// @@ -102,6 +104,7 @@ impl State { pub struct Collator { state: Arc<Mutex<State>>, key: CollatorPair, + seconded_collations: Arc<AtomicU32>, } impl Collator { @@ -110,6 +113,7 @@ impl Collator { Self { state: Arc::new(Mutex::new(State::genesis())), key: CollatorPair::generate().0, + seconded_collations: Arc::new(AtomicU32::new(0)), } } @@ -142,10 +146,11 @@ impl Collator { /// Create the collation function. /// /// This collation function can be plugged into the overseer to generate collations for the adder parachain. - pub fn create_collation_function(&self) -> CollatorFn { + pub fn create_collation_function(&self, spawner: impl SpawnNamed + Clone + 'static) -> CollatorFn { use futures::FutureExt as _; let state = self.state.clone(); + let seconded_collations = self.seconded_collations.clone(); Box::new(move |relay_parent, validation_data| { let parent = HeadData::decode(&mut &validation_data.parent_head.0[..]) @@ -159,19 +164,33 @@ impl Collator { block_data, ); + let pov = PoV { block_data: block_data.encode().into() }; + let collation = Collation { upward_messages: Vec::new(), horizontal_messages: Vec::new(), new_validation_code: None, head_data: head_data.encode().into(), - proof_of_validity: PoV { - block_data: block_data.encode().into(), - }, + proof_of_validity: pov.clone(), processed_downward_messages: 0, hrmp_watermark: validation_data.relay_parent_number, }; - async move { Some(collation) }.boxed() + let (result_sender, recv) = oneshot::channel::<SignedFullStatement>(); + let seconded_collations = seconded_collations.clone(); + spawner.spawn("adder-collator-seconded", async move { + if let Ok(res) = recv.await { + assert_matches!( + res.payload(), + Statement::Seconded(s) if s.descriptor.pov_hash == pov.hash(), + "Seconded statement should match our collation!", + ); + + seconded_collations.fetch_add(1, Ordering::Relaxed); + } + }.boxed()); + + async move { Some(CollationResult { collation, result_sender: Some(result_sender) }) }.boxed() }) } @@ -188,6 +207,21 @@ impl Collator { } } } + + /// Wait until `seconded` collations of this collator are seconded by a parachain validator. + /// + /// The internal counter isn't de-duplicating the collations when counting the number of seconded collations. This + /// means when one collation is seconded by X validators, we record X seconded messages. + pub async fn wait_for_seconded_collations(&self, seconded: u32) { + let seconded_collations = self.seconded_collations.clone(); + loop { + Delay::new(Duration::from_secs(1)).await; + + if seconded <= seconded_collations.load(Ordering::Relaxed) { + return; + } + } + } } #[cfg(test)] @@ -200,8 +234,9 @@ mod tests { #[test] fn collator_works() { + let spawner = sp_core::testing::TaskExecutor::new(); let collator = Collator::new(); - let collation_function = collator.create_collation_function(); + let collation_function = collator.create_collation_function(spawner); for i in 0..5 { let parent_head = collator @@ -220,11 +255,15 @@ mod tests { let collation = block_on(collation_function(Default::default(), &validation_data)).unwrap(); - validate_collation(&collator, (*parent_head).clone(), collation); + validate_collation(&collator, (*parent_head).clone(), collation.collation); } } - fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) { + fn validate_collation( + collator: &Collator, + parent_head: HeadData, + collation: Collation, + ) { let ret = polkadot_parachain::wasm_executor::validate_candidate( collator.validation_code(), ValidationParams { diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index 4016e6d7d7f071379ee35aaba4849b339f605157..5ced3673e0cfe6c536a6d609d896b08b733dcace 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -81,7 +81,7 @@ fn main() -> Result<()> { let config = CollationGenerationConfig { key: collator.collator_key(), - collator: collator.create_collation_function(), + collator: collator.create_collation_function(full_node.task_manager.spawn_handle()), para_id, }; overseer_handler diff --git a/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs b/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs index 215e32d7cc0c8e51dbaf91ad47a241ea8dacc446..fbc20867525e496216c709e0a58f80c96ffdfb06 100644 --- a/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs +++ b/polkadot/parachain/test-parachains/adder/collator/tests/integration.rs @@ -63,11 +63,18 @@ async fn collating_using_adder_collator(task_executor: sc_service::TaskExecutor) collator.collator_id(), ); - charlie.register_collator(collator.collator_key(), para_id, collator.create_collation_function()).await; + charlie.register_collator( + collator.collator_key(), + para_id, + collator.create_collation_function(charlie.task_manager.spawn_handle()), + ).await; // Wait until the parachain has 4 blocks produced. collator.wait_for_blocks(4).await; + // Wait until the collator received `12` seconded statements for its collations. + collator.wait_for_seconded_collations(12).await; + join!( alice.task_manager.clean_shutdown(), bob.task_manager.clean_shutdown(), diff --git a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md index 5b9e8d6654b7cbebe49c3fdebd0b9fe44995e214..34be8ea7c139e6a739ab86b51eeb863cbe4bde1b 100644 --- a/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md +++ b/polkadot/roadmap/implementers-guide/src/node/collators/collation-generation.md @@ -32,8 +32,28 @@ pub struct Collation { pub proof_of_validity: PoV, } -type CollatorFn = Box< - dyn Fn(Hash, &PeristedValidationData) -> Pin<Box<dyn Future<Output = Option<Collation>>>> +/// Result of the [`CollatorFn`] invocation. +pub struct CollationResult { + /// The collation that was build. + collation: Collation, + /// An optional result sender that should be informed about a successfully seconded collation. + /// + /// There is no guarantee that this sender is informed ever about any result, it is completly okay to just drop it. + /// However, if it is called, it should be called with the signed statement of a parachain validator seconding the + /// collation. + result_sender: Option<oneshot::Sender<SignedFullStatement>>, +} + +/// Collation function. +/// +/// Will be called with the hash of the relay chain block the parachain block should be build on and the +/// [`ValidationData`] that provides information about the state of the parachain on the relay chain. +/// +/// Returns an optional [`CollationResult`]. +pub type CollatorFn = Box< + dyn Fn(Hash, &PersistedValidationData) -> Pin<Box<dyn Future<Output = Option<CollationResult>> + Send>> + + Send + + Sync, >; struct CollationGenerationConfig { diff --git a/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md b/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md index e792cd35f6e7211bdb60eec4049be18b2fc0ee45..1d0a6e7b7be6e1e243e065c95ad65366457c68a1 100644 --- a/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md +++ b/polkadot/roadmap/implementers-guide/src/node/subsystems-and-jobs.md @@ -257,24 +257,18 @@ with implementing a gossip protocol: sequenceDiagram participant SD as StatementDistribution participant NB as NetworkBridge - participant Listener alt On receipt of a<br/>SignedStatement from CandidateBacking % fn circulate_statement_and_dependents SD ->> NB: SendValidationMessage Note right of NB: Bridge sends validation message to all appropriate peers - else On initialization, from other subsystems: - Listener ->> SD: RegisterStatementListener else On receipt of peer validation message NB ->> SD: NetworkBridgeUpdateV1 % fn handle_incoming_message alt if we aren't already aware of the relay parent for this statement SD ->> NB: ReportPeer - else the statement corresponds to our View - Note over SD,Listener: Forward the statement to each registered listener - SD ->> Listener: SignedFullStatement end % fn circulate_statement diff --git a/polkadot/roadmap/implementers-guide/src/types/network.md b/polkadot/roadmap/implementers-guide/src/types/network.md index eb7bbe7070e9c46087edefdec818d1cc85bf206f..a2da009d7cf78390205a1460118b1d70b7158d63 100644 --- a/polkadot/roadmap/implementers-guide/src/types/network.md +++ b/polkadot/roadmap/implementers-guide/src/types/network.md @@ -105,6 +105,8 @@ enum CollatorProtocolV1Message { RequestCollation(RequestId, Hash, ParaId), /// A requested collation. Collation(RequestId, CandidateReceipt, CompressedPoV), + /// A collation sent to a validator was seconded. + CollationSeconded(SignedFullStatement), } ``` diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 60994961a889e6d48f68c94c449eab501bdb5857..a9aa308983246715de85792986532c0a33562a66 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -234,10 +234,12 @@ These messages are sent to the [Candidate Selection subsystem](../node/backing/c ```rust enum CandidateSelectionMessage { - /// A candidate collation can be fetched from a collator and should be considered for seconding. - Collation(RelayParent, ParaId, CollatorId), - /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. - Invalid(CandidateReceipt), + /// A candidate collation can be fetched from a collator and should be considered for seconding. + Collation(RelayParent, ParaId, CollatorId), + /// We recommended a particular candidate to be seconded, but it was invalid; penalize the collator. + Invalid(RelayParent, CandidateReceipt), + /// The candidate we recommended to be seconded was validated successfully. + Seconded(RelayParent, SignedFullStatement), } ``` @@ -290,15 +292,20 @@ enum CollatorProtocolMessage { /// /// This should be sent before any `DistributeCollation` message. CollateOn(ParaId), - /// Provide a collation to distribute to validators. - DistributeCollation(CandidateReceipt, PoV), + /// Provide a collation to distribute to validators with an optional result sender. + /// + /// The result sender should be informed when at least one parachain validator seconded the collation. It is also + /// completely okay to just drop the sender. + DistributeCollation(CandidateReceipt, PoV, Option<oneshot::Sender<SignedFullStatement>>), /// Fetch a collation under the given relay-parent for the given ParaId. FetchCollation(Hash, ParaId, ResponseChannel<(CandidateReceipt, PoV)>), /// Report a collator as having provided an invalid collation. This should lead to disconnect /// and blacklist of the collator. ReportCollator(CollatorId), /// Note a collator as having provided a good collation. - NoteGoodCollation(CollatorId), + NoteGoodCollation(CollatorId, SignedFullStatement), + /// Notify a collator that its collation was seconded. + NotifyCollationSeconded(CollatorId, SignedFullStatement), } ``` @@ -539,8 +546,6 @@ enum StatementDistributionMessage { /// The statement distribution subsystem assumes that the statement should be correctly /// signed. Share(Hash, SignedFullStatement), - /// Register a listener to be notified on any new statements. - RegisterStatementListener(ResponseChannel<SignedFullStatement>), } ```