Unverified Commit f560edf6 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Only fetch one collation at a time per relay parent (#3333)

* Only fetch one collation at a time per relay parent

Previously, a validator would fetch every collation that was advertised to
it. This PR changes the behavior to fetch only one collation at a time per
relay parent. If fetching fails, the validator will start fetching one of
the other advertised collations.

* Use enum to be more explicit

* Review comments
parent 95df77e0
Pipeline #143584 canceled with stages
in 24 minutes and 7 seconds
......@@ -164,6 +164,7 @@ struct PerRequest {
span: Option<jaeger::Span>,
}
#[derive(Debug)]
struct CollatingPeerState {
collator_id: CollatorId,
para_id: ParaId,
......@@ -172,6 +173,7 @@ struct CollatingPeerState {
last_active: Instant,
}
#[derive(Debug)]
enum PeerState {
// The peer has connected at the given instant.
Connected(Instant),
......@@ -186,6 +188,7 @@ enum AdvertisementError {
UndeclaredCollator,
}
#[derive(Debug)]
struct PeerData {
view: View,
state: PeerState,
......@@ -465,8 +468,7 @@ struct PendingCollation {
impl PendingCollation {
fn new(relay_parent: Hash, para_id: &ParaId, peer_id: &PeerId) -> Self {
let commitments_hash = None;
Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash }
Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash: None }
}
}
......@@ -477,6 +479,32 @@ type PendingCollationFetch = (
std::result::Result<(CandidateReceipt, PoV), oneshot::Canceled>,
);
/// The status of the collations in [`CollationsPerRelayParent`].
///
/// The status is a plain, fieldless tag, so it is cheap to copy and can be compared with `==`.
/// A freshly created status is [`CollationStatus::Waiting`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CollationStatus {
    /// We are waiting for a collation to be advertised to us.
    Waiting,
    /// We are currently fetching a collation.
    Fetching,
    /// We have seconded a collation.
    Seconded,
}

impl Default for CollationStatus {
    /// Start out in [`CollationStatus::Waiting`].
    fn default() -> Self {
        Self::Waiting
    }
}
/// Information about collations per relay parent.
///
/// One instance is kept per relay parent in `State::collations_per_relay_parent`; it is created
/// with its default value on the first advertisement and dropped when the relay parent is removed.
#[derive(Default)]
struct CollationsPerRelayParent {
/// What is the current status with regard to a collation for this relay parent?
///
/// Starts as `CollationStatus::Waiting` (the `Default` of `CollationStatus`).
status: CollationStatus,
/// Collations that were advertised to us, but that we did not yet fetch.
///
/// An advertisement is pushed here while another fetch for the same relay parent is in
/// progress; when a fetch fails, the most recently pushed entry is popped and tried next.
unfetched_collations: Vec<(PendingCollation, CollatorId)>,
}
/// All state relevant for the validator side of the protocol lives here.
#[derive(Default)]
struct State {
......@@ -503,7 +531,10 @@ struct State {
span_per_relay_parent: HashMap<Hash, PerLeafSpan>,
/// Keep track of all fetch collation requests
collations: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>,
collation_fetches: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>,
/// Information about the collations per relay parent.
collations_per_relay_parent: HashMap<Hash, CollationsPerRelayParent>,
/// Keep track of all pending candidate collations
pending_candidates: HashMap<Hash, CollationEvent>,
......@@ -528,19 +559,20 @@ async fn disconnect_peer(ctx: &mut impl SubsystemContext, peer_id: PeerId) {
}
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
async fn fetch_collation<Context>(
ctx: &mut Context,
async fn fetch_collation(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
pc: PendingCollation,
tx: oneshot::Sender<(CandidateReceipt, PoV)>
)
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
id: CollatorId,
) {
let (tx, rx) = oneshot::channel();
let PendingCollation { relay_parent, para_id, peer_id, .. } = pc;
if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) {
request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await;
}
state.collation_fetches.push(rx.map(|r| ((id, pc), r)).boxed());
}
/// Report a collator for some malicious actions.
......@@ -770,22 +802,26 @@ where
?relay_parent,
"Received advertise collation",
);
let (tx, rx) = oneshot::channel::<(
CandidateReceipt,
PoV,
)>();
let pending_collation = PendingCollation::new(
relay_parent,
&para_id,
&origin,
);
fetch_collation(ctx, state, pending_collation.clone(), tx).await;
let future = rx.map(|r|
((id, pending_collation), r)
);
state.collations.push(Box::pin(future));
let collations = state.collations_per_relay_parent.entry(relay_parent).or_default();
match collations.status {
CollationStatus::Fetching =>
collations.unfetched_collations.push((pending_collation, id)),
CollationStatus::Waiting => {
collations.status = CollationStatus::Fetching;
drop(collations);
fetch_collation(ctx, state, pending_collation.clone(), id).await;
},
CollationStatus::Seconded => {},
}
}
Err(error) => {
tracing::debug!(
......@@ -824,6 +860,8 @@ async fn remove_relay_parent(
state.pending_candidates.retain(|k, _| {
k != &relay_parent
});
state.collations_per_relay_parent.remove(&relay_parent);
Ok(())
}
......@@ -973,6 +1011,10 @@ where
let PendingCollation { relay_parent, peer_id, .. } = pending_collation;
note_good_collation(ctx, &state.peer_data, collator_id).await;
notify_collation_seconded(ctx, peer_id, relay_parent, stmt).await;
if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) {
collations.status = CollationStatus::Seconded;
}
} else {
tracing::debug!(
target: LOG_TARGET,
......@@ -982,12 +1024,11 @@ where
}
}
Invalid(parent, candidate_receipt) => {
if match state.pending_candidates.get(&parent) {
Some(collation_event)
if Some(candidate_receipt.commitments_hash) == collation_event.1.commitments_hash
=> true,
_ => false,
} {
if state.pending_candidates
.get(&parent)
.map(|e| e.1.commitments_hash == Some(candidate_receipt.commitments_hash))
.unwrap_or_default()
{
if let Some((id, _)) = state.pending_candidates.remove(&parent) {
report_collator(ctx, &state.peer_data, id).await;
}
......@@ -1022,7 +1063,6 @@ pub(crate) async fn run<Context>(
let mut state = State {
metrics,
..Default::default()
};
......@@ -1053,53 +1093,9 @@ pub(crate) async fn run<Context>(
_ = next_inactivity_stream.next() => {
disconnect_inactive_peers(&mut ctx, &eviction_policy, &state.peer_data).await;
}
res = state.collations.next() => {
// If no prior collation for this relay parent has been seconded, then
// memoize the collation_event for that relay_parent, such that we may
// notify the collator of their successful second backing
if let Some((relay_parent, collation_event)) = match res {
Some(
(mut collation_event, Ok((candidate_receipt, pov)))
) => {
let relay_parent = &collation_event.1.relay_parent;
// Verify whether this relay_parent has already been seconded
if state.pending_candidates.get(relay_parent).is_none() {
// Forward Candidate Receipt and PoV to candidate backing [CB]
collation_event.1
.commitments_hash = Some(candidate_receipt.commitments_hash);
ctx.send_message(
CandidateBackingMessage::Second(
relay_parent.clone(),
candidate_receipt,
pov,
).into()
).await;
Some((relay_parent.clone(), collation_event))
} else {
tracing::debug!(
target: LOG_TARGET,
relay_parent = ?relay_parent,
collator_id = ?collation_event.0,
"Collation for this relay parent has already been seconded.",
);
None
}
}
Some(
(collation_event, _)
) => {
let (id, pending_collation) = collation_event;
tracing::debug!(
target: LOG_TARGET,
relay_parent = ?pending_collation.relay_parent,
collator_id = ?id,
"Collation fetching has timed out.",
);
None
}
_ => None,
} {
state.pending_candidates.insert(relay_parent, collation_event);
res = state.collation_fetches.next() => {
if let Some(res) = res {
handle_collation_fetched_result(&mut ctx, &mut state, res).await;
}
}
}
......@@ -1119,6 +1115,68 @@ pub(crate) async fn run<Context>(
Ok(())
}
/// Handle a fetched collation result.
///
/// * `ctx` - subsystem context used to send `CandidateBackingMessage::Second`.
/// * `state` - validator side state; `pending_candidates` and `collations_per_relay_parent`
///   are updated here.
/// * `(collation_event, res)` - the `(CollatorId, PendingCollation)` pair the fetch was started
///   for, together with the outcome of the fetch.
///
/// On success the candidate is handed to candidate backing and memoized in
/// `state.pending_candidates`, unless a candidate is already stored for the relay parent, in
/// which case the fetched collation is dropped.
///
/// On failure the next advertised but not yet fetched collation of the same relay parent is
/// requested. If there is none left, the status of the relay parent goes back to
/// `CollationStatus::Waiting`.
async fn handle_collation_fetched_result(
    ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
    state: &mut State,
    (mut collation_event, res): PendingCollationFetch,
) {
    // The hash is `Copy`, take it once so that `collation_event` stays fully usable below.
    let relay_parent = collation_event.1.relay_parent;

    let (candidate_receipt, pov) = match res {
        Ok(res) => res,
        Err(e) => {
            tracing::debug!(
                target: LOG_TARGET,
                relay_parent = ?relay_parent,
                para_id = ?collation_event.1.para_id,
                peer_id = ?collation_event.1.peer_id,
                collator_id = ?collation_event.0,
                error = ?e,
                "Failed to fetch collation.",
            );

            // Without bookkeeping for this relay parent there is nothing to retry.
            let collations = match state.collations_per_relay_parent.get_mut(&relay_parent) {
                Some(collations) => collations,
                None => return,
            };

            let (next_try, id) = match collations.unfetched_collations.pop() {
                Some(next) => next,
                None => {
                    if matches!(collations.status, CollationStatus::Fetching) {
                        // Nothing left to try: allow the next advertisement to start a fetch.
                        collations.status = CollationStatus::Waiting;
                    } else {
                        tracing::error!(
                            target: LOG_TARGET,
                            status = ?collations.status,
                            "Expected status `CollationStatus::Fetching` but got unexpected status."
                        );
                    }
                    return
                },
            };

            // The status stays as it is while we try the next advertised collation.
            fetch_collation(ctx, state, next_try, id).await;
            return
        },
    };

    // If no prior collation for this relay parent has been seconded, then
    // memoize the collation_event for that relay_parent, such that we may
    // notify the collator of their successful second backing
    if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) {
        collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash);
        ctx.send_message(
            CandidateBackingMessage::Second(
                relay_parent,
                candidate_receipt,
                pov,
            ).into()
        ).await;

        entry.insert(collation_event);
    }
}
// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the
// earliest possible point. This does not yet clean up any metadata, as that will be done upon
// receipt of the `PeerDisconnected` event.
......@@ -1145,7 +1203,7 @@ async fn poll_collation_response<Context>(
metrics: &Metrics,
spans: &HashMap<Hash, PerLeafSpan>,
pending_collation: &PendingCollation,
per_req: &mut PerRequest
per_req: &mut PerRequest,
)
-> bool
where
......
......@@ -44,7 +44,7 @@ pub use sc_network::config::RequestResponseConfig;
/// All requests that can be sent to the network bridge.
pub mod request;
pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient, OutgoingResult};
pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient, OutgoingResult, ResponseSender};
///// Multiplexer for incoming requests.
// pub mod multiplexer;
......
......@@ -29,6 +29,9 @@ use crate::UnifiedReputationChange;
use super::{v1, Protocol};
/// Used by the network to send us a response to a request.
///
/// This is the sending half of a oneshot channel. The value sent over it is either the
/// response as bytes (`Vec<u8>`) or a `network::RequestFailure`.
pub type ResponseSender = oneshot::Sender<Result<Vec<u8>, network::RequestFailure>>;
/// Common properties of any `Request`.
pub trait IsRequest {
/// Each request has a corresponding `Response`.
......@@ -109,7 +112,7 @@ pub struct OutgoingRequest<Req> {
/// The actual request to send over the wire.
pub payload: Req,
/// Sender which is used by networking to get us back a response.
pub pending_response: oneshot::Sender<Result<Vec<u8>, network::RequestFailure>>,
pub pending_response: ResponseSender,
}
/// Any error that can occur when sending a request.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment