Unverified Commit f17bc16a authored by Bastian Köcher, committed by GitHub

Only send one collation per relay parent at a time to validators (#3360)

* Only send one collation per relay parent at a time to validators

This changes the way we send collations to validators. Before, we
answered every collation request immediately. Now we only answer one
PoV request at a time per relay parent. This should bring down the
bandwidth requirements and help parachains include bigger blocks
more easily.

* Guide updates

* Review feedback.
parent b1036a4b
Pipeline #144526 passed with stages in 36 minutes and 28 seconds
@@ -14,12 +14,15 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::collections::{HashMap, HashSet};
use std::{collections::{HashMap, HashSet, VecDeque}, pin::Pin};
use futures::{FutureExt, channel::oneshot};
use futures::{FutureExt, StreamExt, channel::oneshot, stream::FuturesUnordered, select, Future};
use sp_core::Pair;
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash, Id as ParaId};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateHash, CandidateReceipt, CollatorPair, CoreIndex, CoreState, GroupIndex, Hash,
Id as ParaId,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, jaeger,
messages::{
@@ -27,13 +30,11 @@ use polkadot_subsystem::{
},
};
use polkadot_node_network_protocol::{
OurView, PeerId, View, peer_set::PeerSet,
OurView, PeerId, UnifiedReputationChange as Rep, View, peer_set::PeerSet,
request_response::{
IncomingRequest,
v1::{CollationFetchingRequest, CollationFetchingResponse},
IncomingRequest, request::OutgoingResponse, v1::{CollationFetchingRequest, CollationFetchingResponse}
},
v1 as protocol_v1,
UnifiedReputationChange as Rep,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
@@ -59,6 +60,12 @@ impl Metrics {
}
}
fn on_collation_sent_requested(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_send_requested.inc();
}
}
fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
@@ -75,6 +82,7 @@ impl Metrics {
struct MetricsInner {
advertisements_made: prometheus::Counter<prometheus::U64>,
collations_sent: prometheus::Counter<prometheus::U64>,
collations_send_requested: prometheus::Counter<prometheus::U64>,
process_msg: prometheus::Histogram,
}
@@ -90,6 +98,13 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
collations_send_requested: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_requested_total",
"A number of collations requested to be sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_total",
@@ -185,6 +200,17 @@ struct Collation {
status: CollationStatus,
}
/// Stores the state for waiting collation fetches.
#[derive(Default)]
struct WaitingCollationFetches {
/// Is there currently a collation getting fetched?
collation_fetch_active: bool,
/// The collation fetches waiting to be fulfilled.
waiting: VecDeque<IncomingRequest<CollationFetchingRequest>>,
}
type ActiveCollationFetches = FuturesUnordered<Pin<Box<dyn Future<Output = Hash> + Send + 'static>>>;
struct State {
/// Our network peer id.
local_peer_id: PeerId,
@@ -217,11 +243,23 @@ struct State {
/// Our validator groups per active leaf.
our_validators_groups: HashMap<Hash, ValidatorGroup>,
/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s by `PeerConnected` events.
/// The mapping from [`PeerId`] to [`ValidatorId`]. This is filled over time as we learn the [`PeerId`]'s
/// by `PeerConnected` events.
peer_ids: HashMap<PeerId, AuthorityDiscoveryId>,
/// Metrics.
metrics: Metrics,
/// All collation fetching requests that are still waiting to be answered.
///
/// They are stored per relay parent; when our view changes and the relay parent moves out, we will cancel the fetch
/// request.
waiting_collation_fetches: HashMap<Hash, WaitingCollationFetches>,
/// Active collation fetches.
///
/// Each future returns the relay parent of the finished collation fetch.
active_collation_fetches: ActiveCollationFetches,
}
impl State {
@@ -240,6 +278,8 @@ impl State {
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
peer_ids: Default::default(),
waiting_collation_fetches: Default::default(),
active_collation_fetches: Default::default(),
}
}
@@ -349,8 +389,9 @@ async fn distribute_collation(
state.collations.insert(relay_parent, Collation { receipt, pov, status: CollationStatus::Created });
let interested = state.peers_interested_in_leaf(&relay_parent);
// Make sure already connected peers get collations:
for peer_id in state.peers_interested_in_leaf(&relay_parent) {
for peer_id in interested {
advertise_collation(ctx, state, relay_parent, peer_id).await;
}
@@ -373,6 +414,7 @@ async fn determine_core(
}
}
}
Ok(None)
}
@@ -455,7 +497,7 @@ async fn declare(
async fn connect_to_validators(
ctx: &mut impl SubsystemContext,
validator_ids: Vec<AuthorityDiscoveryId>,
) {
) {
// ignore address resolution failure
// will reissue a new request on new collation
let (failed, _) = oneshot::channel();
@@ -607,8 +649,18 @@ async fn process_msg(
return Ok(());
};
state.metrics.on_collation_sent_requested();
let _span = _span.as_ref().map(|s| s.child("sending"));
send_collation(state, incoming, receipt, pov).await;
let waiting = state.waiting_collation_fetches.entry(incoming.payload.relay_parent).or_default();
if waiting.collation_fetch_active {
waiting.waiting.push_back(incoming);
} else {
waiting.collation_fetch_active = true;
send_collation(state, incoming, receipt, pov).await;
}
} else {
tracing::warn!(
target: LOG_TARGET,
@@ -640,12 +692,28 @@ async fn send_collation(
receipt: CandidateReceipt,
pov: PoV,
) {
if let Err(_) = request.send_response(CollationFetchingResponse::Collation(receipt, pov)) {
let (tx, rx) = oneshot::channel();
let relay_parent = request.payload.relay_parent;
let response = OutgoingResponse {
result: Ok(CollationFetchingResponse::Collation(receipt, pov)),
reputation_changes: Vec::new(),
sent_feedback: Some(tx),
};
if let Err(_) = request.send_outgoing_response(response) {
tracing::warn!(
target: LOG_TARGET,
"Sending collation response failed",
);
}
state.active_collation_fetches.push(async move {
let _ = rx.await;
relay_parent
}.boxed());
state.metrics.on_collation_sent();
}
@@ -840,6 +908,7 @@ async fn handle_our_view_change(
}
state.our_validators_groups.remove(removed);
state.span_per_relay_parent.remove(removed);
state.waiting_collation_fetches.remove(removed);
}
state.view = view;
@@ -861,17 +930,38 @@ pub(crate) async fn run(
let mut runtime = RuntimeInfo::new(None);
loop {
let msg = ctx.recv().fuse().await.map_err(Fatal::SubsystemReceive)?;
match msg {
Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
select! {
msg = ctx.recv().fuse() => match msg.map_err(Fatal::SubsystemReceive)? {
Communication { msg } => {
log_error(
process_msg(&mut ctx, &mut runtime, &mut state, msg).await,
"Failed to process message"
)?;
},
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(..)) => {}
Signal(Conclude) => return Ok(()),
},
Signal(ActiveLeaves(_update)) => {}
Signal(BlockFinalized(..)) => {}
Signal(Conclude) => return Ok(()),
relay_parent = state.active_collation_fetches.select_next_some() => {
let next = if let Some(waiting) = state.waiting_collation_fetches.get_mut(&relay_parent) {
if let Some(next) = waiting.waiting.pop_front() {
next
} else {
waiting.collation_fetch_active = false;
continue
}
} else {
// No waiting collation fetches means we already removed the relay parent from our view.
continue
};
if let Some(collation) = state.collations.get(&relay_parent) {
let receipt = collation.receipt.clone();
let pov = collation.pov.clone();
send_collation(&mut state, next, receipt, pov).await;
}
}
}
}
}
@@ -20,6 +20,7 @@ use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use futures_timer::Delay;
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
@@ -31,7 +32,10 @@ use polkadot_node_network_protocol::{
request_response::request::IncomingRequest,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex, SessionInfo, ValidatorId, ValidatorIndex};
use polkadot_primitives::v1::{
AuthorityDiscoveryId, CandidateDescriptor, CollatorPair, GroupRotationInfo, ScheduledCore, SessionIndex,
SessionInfo, ValidatorId, ValidatorIndex,
};
use polkadot_node_primitives::BlockData;
use polkadot_subsystem::{
jaeger,
@@ -196,6 +200,18 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
collator_pair: CollatorPair,
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_collator_protocol"),
log::LevelFilter::Trace,
)
.filter(
Some(LOG_TARGET),
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
@@ -580,10 +596,7 @@ fn advertise_and_send_collation() {
)
).await;
// Re-requesting collation should fail:
assert_matches!(
rx.await,
Err(_) => {}
);
rx.await.unwrap_err();
assert!(overseer_recv_with_timeout(&mut virtual_overseer, TIMEOUT).await.is_none());
@@ -605,6 +618,126 @@ fn advertise_and_send_collation() {
});
}
#[test]
fn send_only_one_collation_per_relay_parent_at_a_time() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
setup_system(&mut virtual_overseer, &test_state).await;
let DistributeCollation { candidate, pov_block } =
distribute_collation(&mut virtual_overseer, &test_state, true).await;
for (val, peer) in test_state.current_group_validator_authority_ids()
.into_iter()
.zip(test_state.current_group_validator_peer_ids())
{
connect_peer(&mut virtual_overseer, peer.clone(), Some(val.clone())).await;
}
// We declare to the connected validators that we are a collator.
// We need to catch all `Declare` messages to the validators we've
// previously connected to.
for peer_id in test_state.current_group_validator_peer_ids() {
expect_declare_msg(&mut virtual_overseer, &test_state, &peer_id).await;
}
let validator_0 = test_state.current_group_validator_peer_ids()[0].clone();
let validator_1 = test_state.current_group_validator_peer_ids()[1].clone();
// Send info about peer's view.
send_peer_view_change(&mut virtual_overseer, &validator_0, vec![test_state.relay_parent]).await;
send_peer_view_change(&mut virtual_overseer, &validator_1, vec![test_state.relay_parent]).await;
// The peer is interested in a leaf that we have a collation for;
// advertise it.
expect_advertise_collation_msg(&mut virtual_overseer, &validator_0, test_state.relay_parent).await;
expect_advertise_collation_msg(&mut virtual_overseer, &validator_1, test_state.relay_parent).await;
// Request a collation.
let (tx, rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
validator_0,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
// Keep the feedback channel alive because we need to use it to inform about the finished transfer.
let feedback_tx = assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
full_response.sent_feedback.expect("Feedback channel is always set")
}
);
// Let the second validator request the collation.
let (tx, mut rx) = oneshot::channel();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::CollationFetchingRequest(
IncomingRequest::new(
validator_1,
CollationFetchingRequest {
relay_parent: test_state.relay_parent,
para_id: test_state.para_id,
},
tx,
)
)
).await;
Delay::new(Duration::from_millis(500)).await;
assert!(
rx.try_recv().unwrap().is_none(),
"We should not have sent the collation yet to the second validator",
);
// Signal that the collation fetch is finished
feedback_tx.send(()).expect("Sending collation fetch finished");
// Now we should send it to the second validator
assert_matches!(
rx.await,
Ok(full_response) => {
let CollationFetchingResponse::Collation(receipt, pov): CollationFetchingResponse
= CollationFetchingResponse::decode(
&mut full_response.result
.expect("We should have a proper answer").as_ref()
)
.expect("Decoding should work");
assert_eq!(receipt, candidate);
assert_eq!(pov, pov_block);
full_response.sent_feedback.expect("Feedback channel is always set")
}
);
virtual_overseer
});
}
#[test]
fn collators_declare_to_connected_peers() {
let test_state = TestState::default();
@@ -60,7 +60,7 @@ As seen in the [Scheduler Module][SCH] of the runtime, validator groups are fixe
* Determine the group on that core and the next group on that core.
* Issue a discovery request for the validators of the current group and the next group with [`NetworkBridgeMessage`][NBM]`::ConnectToValidators`.
Once connected to the relevant peers for the current group assigned to the core (transitively, the para), advertise the collation to any of them which advertise the relay-parent in their view (as provided by the [Network Bridge][NB]). If any respond with a request for the full collation, provide it. Upon receiving a view update from any of these peers which includes a relay-parent for which we have a collation that they will find relevant, advertise the collation to them if we haven't already.
Once connected to the relevant peers for the current group assigned to the core (transitively, the para), advertise the collation to any of them which advertise the relay-parent in their view (as provided by the [Network Bridge][NB]). If any respond with a request for the full collation, provide it. However, we only send one collation at a time per relay parent; other requests have to wait. This reduces the bandwidth requirements of a collator and also increases the chance of fully sending the collation to at least one validator. Once one validator has received the collation and seconded it, it will also start to share this collation with other validators in its backing group. Upon receiving a view update from any of these peers which includes a relay-parent for which we have a collation that they will find relevant, advertise the collation to them if we haven't already.
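The one-collation-in-flight rule above can be modeled as a small queue per relay parent. This is a minimal, hypothetical sketch (with `u64`/`u32` stand-ins for the real `Hash` and request types, not the actual subsystem code): a request is either served immediately or queued, and when the in-flight transfer reports completion via the feedback channel, the next queued request is served.

```rust
// Hypothetical model of the collator-side queueing: at most one collation
// transfer is active per relay parent; further requests wait in a FIFO queue.
use std::collections::{HashMap, VecDeque};

type RelayParent = u64; // stand-in for the real `Hash`
type RequestId = u32;   // stand-in for a real `IncomingRequest`

#[derive(Default)]
struct WaitingFetches {
    active: bool,
    waiting: VecDeque<RequestId>,
}

#[derive(Default)]
struct Collator {
    per_relay_parent: HashMap<RelayParent, WaitingFetches>,
}

impl Collator {
    /// Returns `Some(request)` if the request should be served now,
    /// or `None` if it was queued behind an active transfer.
    fn on_request(&mut self, rp: RelayParent, req: RequestId) -> Option<RequestId> {
        let entry = self.per_relay_parent.entry(rp).or_default();
        if entry.active {
            entry.waiting.push_back(req);
            None
        } else {
            entry.active = true;
            Some(req)
        }
    }

    /// Called once the in-flight transfer finished (the feedback channel
    /// fired); returns the next queued request to serve, if any.
    fn on_transfer_finished(&mut self, rp: RelayParent) -> Option<RequestId> {
        let entry = self.per_relay_parent.get_mut(&rp)?;
        match entry.waiting.pop_front() {
            Some(next) => Some(next), // slot stays occupied by `next`
            None => {
                entry.active = false; // queue drained, free the slot
                None
            }
        }
    }
}

fn main() {
    let mut c = Collator::default();
    assert_eq!(c.on_request(1, 10), Some(10)); // served immediately
    assert_eq!(c.on_request(1, 11), None);     // queued behind request 10
    assert_eq!(c.on_transfer_finished(1), Some(11)); // next in line is served
    assert_eq!(c.on_transfer_finished(1), None);     // queue empty, slot freed
    println!("one collation in flight per relay parent");
}
```

Note that requests for *different* relay parents do not block each other; only requests for the same relay parent are serialized.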
### Validators
@@ -104,7 +104,7 @@ The protocol tracks advertisements received and the source of the advertisement.
As a validator, we will handle requests from other subsystems to fetch a collation on a specific `ParaId` and relay-parent. These requests are made with the request response protocol `CollationFetchingRequest` request. To do so, we need to first check if we have already gathered a collation on that `ParaId` and relay-parent. If not, we need to select one of the advertisements and issue a request for it. If we've already issued a request, we shouldn't issue another one until the first has returned.
When acting on an advertisement, we issue a `Requests::CollationFetching`. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators.
When acting on an advertisement, we issue a `Requests::CollationFetching`. However, we only request one collation at a time per relay parent. This reduces the bandwidth requirements, and since we can second only one candidate per relay parent, the other collations are probably not required anyway. If the request times out, we need to note the collator as being unreliable and reduce its priority relative to other collators.
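The validator-side rule can be sketched the same way. This is a hypothetical model (simplified `u64`/`u32` types, not the actual protocol code): a fetch is only issued when no fetch for that relay parent is already in flight, and the slot is released when the fetch completes or times out.

```rust
// Hypothetical model of the validator-side rule: act on at most one
// collation advertisement per relay parent at a time.
use std::collections::HashMap;

type RelayParent = u64; // stand-in for the real `Hash`
type CollatorId = u32;  // stand-in for a real collator identifier

#[derive(Default)]
struct FetchTracker {
    in_flight: HashMap<RelayParent, CollatorId>,
}

impl FetchTracker {
    /// Returns `true` if we should issue a `CollationFetching` request now,
    /// or `false` if a fetch for this relay parent is already in flight.
    fn try_begin(&mut self, rp: RelayParent, who: CollatorId) -> bool {
        if self.in_flight.contains_key(&rp) {
            return false;
        }
        self.in_flight.insert(rp, who);
        true
    }

    /// Called when the fetch completed or timed out, freeing the slot
    /// so another advertisement for this relay parent can be acted on.
    fn finish(&mut self, rp: RelayParent) {
        self.in_flight.remove(&rp);
    }
}

fn main() {
    let mut t = FetchTracker::default();
    assert!(t.try_begin(7, 1));  // first advertisement: fetch issued
    assert!(!t.try_begin(7, 2)); // second one waits, fetch already in flight
    t.finish(7);                 // e.g. the first request timed out
    assert!(t.try_begin(7, 2));  // now the second collator can be tried
    println!("one fetch in flight per relay parent");
}
```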
As a validator, once the collation has been fetched some other subsystem will inspect and do deeper validation of the collation. The subsystem will report to this subsystem with a [`CollatorProtocolMessage`][CPM]`::ReportCollator`. In that case, if we are connected directly to the collator, we apply a cost to the `PeerId` associated with the collator and potentially disconnect or blacklist it. If the collation is seconded, we notify the collator and apply a benefit to the `PeerId` associated with the collator.