Unverified commit a6dd54ea authored by Andronik Ordian, committed by GitHub

collator-protocol/validator_side: a couple of fixes (#4179)

* collator-protocol/validator: do not wait 1s to poll requested collations

* collator-protocol/validator: do not request collation for the next group

* put everything into select

* fmt

* more hacks yay

* a test

* review nits

* remove outdated comment
parent fb730e41
Pipeline #164114 passed with stages in 41 minutes and 5 seconds
@@ -19,7 +19,7 @@ use futures::{
 	channel::oneshot,
 	future::{BoxFuture, Fuse, FusedFuture},
 	select,
-	stream::FuturesUnordered,
+	stream::{FusedStream, FuturesUnordered},
 	FutureExt, StreamExt,
 };
 use futures_timer::Delay;
@@ -94,6 +94,11 @@ const ACTIVITY_POLL: Duration = Duration::from_secs(1);
 #[cfg(test)]
 const ACTIVITY_POLL: Duration = Duration::from_millis(10);
 
+// How often to poll collation responses.
+// This is a hack that should be removed in a refactoring.
+// See https://github.com/paritytech/polkadot/issues/4182
+const CHECK_COLLATIONS_POLL: Duration = Duration::from_millis(5);
+
 #[derive(Clone, Default)]
 pub struct Metrics(Option<MetricsInner>);
@@ -467,6 +472,10 @@ impl ActiveParas {
 	fn is_current_or_next(&self, id: ParaId) -> bool {
 		self.current_assignments.contains_key(&id) || self.next_assignments.contains_key(&id)
 	}
+
+	fn is_current(&self, id: &ParaId) -> bool {
+		self.current_assignments.contains_key(id)
+	}
 }
 
 #[derive(Debug, Clone, Hash, Eq, PartialEq)]
@@ -886,6 +895,20 @@ async fn process_incoming_peer_message<Context>(
				Some(p) => p,
			};
 
+			if let PeerState::Collating(ref collating_state) = peer_data.state {
+				let para_id = collating_state.para_id;
+				if !state.active_paras.is_current(&para_id) {
+					tracing::debug!(
+						target: LOG_TARGET,
+						peer_id = ?origin,
+						%para_id,
+						?relay_parent,
+						"Received advertise collation, but we are assigned to the next group",
+					);
+					return
+				}
+			}
+
			match peer_data.insert_advertisement(relay_parent, &state.view) {
				Ok((id, para_id)) => {
					tracing::debug!(
@@ -1136,6 +1159,13 @@ async fn wait_until_next_check(last_poll: Instant) -> Instant {
 	Instant::now()
 }
 
+fn infinite_stream(every: Duration) -> impl FusedStream<Item = ()> {
+	futures::stream::unfold(Instant::now() + every, |next_check| async move {
+		Some(((), wait_until_next_check(next_check).await))
+	})
+	.fuse()
+}
+
 /// The main run loop.
 pub(crate) async fn run<Context>(
 	mut ctx: Context,
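The new helper is a periodic ticker built from `futures::stream::unfold`: the stream state is the next deadline, and each step waits for it before yielding `()`. A self-contained sketch of the same technique, with simplified deadline bookkeeping (the `ticker`/`tick_demo` names and the 100ms period are made up for illustration, not project code):

use std::time::{Duration, Instant};

use futures::stream::FusedStream;
use futures::{pin_mut, StreamExt};
use futures_timer::Delay;

// The stream state is the next deadline; each step sleeps until it,
// yields `()`, and schedules the following deadline one period later.
fn ticker(every: Duration) -> impl FusedStream<Item = ()> {
	futures::stream::unfold(Instant::now() + every, move |deadline| async move {
		let now = Instant::now();
		if deadline > now {
			Delay::new(deadline - now).await;
		}
		Some(((), Instant::now() + every))
	})
	.fuse()
}

async fn tick_demo() {
	let ticks = ticker(Duration::from_millis(100));
	pin_mut!(ticks);
	for _ in 0..3 {
		ticks.next().await; // resolves roughly every 100ms
	}
}

The `.fuse()` matters: `select!` only accepts fused streams, and `run` below pins two of these tickers next to the overseer message stream.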
@@ -1147,18 +1177,14 @@ where
 	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
 	Context: SubsystemContext<Message = CollatorProtocolMessage>,
 {
-	use OverseerSignal::*;
-
 	let mut state = State { metrics, ..Default::default() };
 
-	let next_inactivity_stream =
-		futures::stream::unfold(Instant::now() + ACTIVITY_POLL, |next_check| async move {
-			Some(((), wait_until_next_check(next_check).await))
-		})
-		.fuse();
+	let next_inactivity_stream = infinite_stream(ACTIVITY_POLL);
 	futures::pin_mut!(next_inactivity_stream);
 
+	let check_collations_stream = infinite_stream(CHECK_COLLATIONS_POLL);
+	futures::pin_mut!(check_collations_stream);
+
 	loop {
 		select! {
 			res = ctx.recv().fuse() => {
@@ -1172,8 +1198,8 @@
 						&mut state,
 					).await;
 				}
-				Ok(FromOverseer::Signal(Conclude)) => break,
-				_ => {},
+				Ok(FromOverseer::Signal(OverseerSignal::Conclude)) | Err(_) => break,
+				Ok(FromOverseer::Signal(_)) => continue,
 			}
 		}
 		_ = next_inactivity_stream.next() => {
@@ -1191,26 +1217,45 @@
 				);
 				dequeue_next_collation_and_fetch(&mut ctx, &mut state, relay_parent, collator_id).await;
 			}
+			_ = check_collations_stream.next() => {
+				let reputation_changes = poll_requests(
+					&mut state.requested_collations,
+					&state.metrics,
+					&state.span_per_relay_parent,
+				).await;
+
+				for (peer_id, rep) in reputation_changes {
+					modify_reputation(&mut ctx, peer_id, rep).await;
+				}
+			},
 		}
-
-		let mut retained_requested = HashSet::new();
-		for (pending_collation, per_req) in state.requested_collations.iter_mut() {
-			// Despite the await, this won't block on the response itself.
-			let finished = poll_collation_response(
-				&mut ctx,
-				&state.metrics,
-				&state.span_per_relay_parent,
-				pending_collation,
-				per_req,
-			)
-			.await;
-			if !finished {
-				retained_requested.insert(pending_collation.clone());
-			}
-		}
-		state.requested_collations.retain(|k, _| retained_requested.contains(k));
 	}
 
 	Ok(())
 }
+
+async fn poll_requests(
+	requested_collations: &mut HashMap<PendingCollation, PerRequest>,
+	metrics: &Metrics,
+	span_per_relay_parent: &HashMap<Hash, PerLeafSpan>,
+) -> Vec<(PeerId, Rep)> {
+	let mut retained_requested = HashSet::new();
+	let mut reputation_changes = Vec::new();
+	for (pending_collation, per_req) in requested_collations.iter_mut() {
+		// Despite the await, this won't block on the response itself.
+		let result =
+			poll_collation_response(metrics, span_per_relay_parent, pending_collation, per_req)
+				.await;
+		if !result.is_ready() {
+			retained_requested.insert(pending_collation.clone());
+		}
+		if let CollationFetchResult::Error(rep) = result {
+			reputation_changes.push((pending_collation.peer_id.clone(), rep));
+		}
+	}
+	requested_collations.retain(|k, _| retained_requested.contains(k));
+	reputation_changes
+}
 
 /// Dequeue another collation and fetch.
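The "Despite the await, this won't block" comment relies on `futures::poll!`, which drives a future a single step inside an async context and returns `Poll::Pending` immediately when it is not ready. A minimal sketch of that pattern (the `oneshot` channel and `poll_once_demo` are illustrative, not project code):

use std::task::Poll;

use futures::channel::oneshot;

async fn poll_once_demo() {
	let (_tx, mut rx) = oneshot::channel::<u32>();
	// Nothing was sent yet, so this returns `Poll::Pending` right away
	// instead of suspending the task.
	match futures::poll!(&mut rx) {
		Poll::Ready(res) => drop(res), // response (or cancellation) arrived
		Poll::Pending => {}            // not ready; sweep again on the next tick
	}
}

This is what lets `poll_requests` sweep every pending request on each `CHECK_COLLATIONS_POLL` tick without ever stalling the main `select!` loop.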
@@ -1314,29 +1359,38 @@ async fn disconnect_inactive_peers<Context>(
 	}
 }
 
+enum CollationFetchResult {
+	/// The collation is still being fetched.
+	Pending,
+	/// The collation was fetched successfully.
+	Success,
+	/// An error occurred when fetching a collation or it was invalid.
+	/// A reputation change should be applied to the peer.
+	Error(Rep),
+}
+
+impl CollationFetchResult {
+	fn is_ready(&self) -> bool {
+		!matches!(self, Self::Pending)
+	}
+}
+
 /// Poll collation response, return immediately if there is none.
 ///
-/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
+/// Ready responses are handled by logging and by
 /// forwarding proper responses to the requester.
-///
-/// Returns: `true` if `from_collator` future was ready.
-async fn poll_collation_response<Context>(
-	ctx: &mut Context,
+async fn poll_collation_response(
 	metrics: &Metrics,
 	spans: &HashMap<Hash, PerLeafSpan>,
 	pending_collation: &PendingCollation,
 	per_req: &mut PerRequest,
-) -> bool
-where
-	Context: overseer::SubsystemContext<Message = CollatorProtocolMessage>,
-	Context: SubsystemContext,
-{
+) -> CollationFetchResult {
 	if never!(per_req.from_collator.is_terminated()) {
 		tracing::error!(
 			target: LOG_TARGET,
 			"We remove pending responses once received, this should not happen."
 		);
-		return true
+		return CollationFetchResult::Success
 	}
 
 	if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) {
@@ -1348,7 +1402,7 @@
 		let mut metrics_result = Err(());
 		let mut success = "false";
 
-		match response {
+		let result = match response {
 			Err(RequestError::InvalidResponse(err)) => {
 				tracing::warn!(
 					target: LOG_TARGET,
@@ -1358,8 +1412,7 @@
 					err = ?err,
 					"Collator provided response that could not be decoded"
 				);
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_CORRUPTED_MESSAGE)
-					.await;
+				CollationFetchResult::Error(COST_CORRUPTED_MESSAGE)
 			},
 			Err(RequestError::NetworkError(err)) => {
 				tracing::debug!(
@@ -1374,7 +1427,7 @@
 				// sensible. In theory this could be exploited, by DoSing this node,
 				// which would result in reduced reputation for proper nodes, but the
 				// same can happen for penalties on timeouts, which we also have.
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_NETWORK_ERROR).await;
+				CollationFetchResult::Error(COST_NETWORK_ERROR)
 			},
 			Err(RequestError::Canceled(_)) => {
 				tracing::debug!(
@@ -1388,8 +1441,7 @@
 				// sensible. In theory this could be exploited, by DoSing this node,
 				// which would result in reduced reputation for proper nodes, but the
 				// same can happen for penalties on timeouts, which we also have.
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_REQUEST_TIMED_OUT)
-					.await;
+				CollationFetchResult::Error(COST_REQUEST_TIMED_OUT)
 			},
 			Ok(CollationFetchingResponse::Collation(receipt, _))
 				if receipt.descriptor().para_id != pending_collation.para_id =>
@@ -1402,7 +1454,7 @@
 					"Got wrong para ID for requested collation."
 				);
 
-				modify_reputation(ctx, pending_collation.peer_id.clone(), COST_WRONG_PARA).await;
+				CollationFetchResult::Error(COST_WRONG_PARA)
 			}
 			Ok(CollationFetchingResponse::Collation(receipt, pov)) => {
 				tracing::debug!(
@@ -1430,12 +1482,15 @@
 					metrics_result = Ok(());
 					success = "true";
 				}
+
+				CollationFetchResult::Success
 			},
 		};
 		metrics.on_request(metrics_result);
 		per_req.span.as_mut().map(|s| s.add_string_tag("success", success));
 
-		true
+		result
 	} else {
-		false
+		CollationFetchResult::Pending
 	}
 }
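Taken together, these hunks apply a common refactoring: `poll_collation_response` now reports what happened instead of performing reputation changes through `ctx`, which is why the `Context` type parameter and its `where` bounds disappear from the signature. A stripped-down sketch of the pattern (every name and type below is a stand-in, not project code):

// Stand-in for the real reputation type.
type Rep = i32;

enum FetchOutcome {
	Pending,
	Success,
	Error(Rep),
}

// Pure polling step: reports the outcome, performs no side effects.
fn poll_one(ready: bool, ok: bool) -> FetchOutcome {
	match (ready, ok) {
		(false, _) => FetchOutcome::Pending,
		(true, true) => FetchOutcome::Success,
		(true, false) => FetchOutcome::Error(-10),
	}
}

// The caller retains unfinished requests and batches the side effects.
fn sweep(requests: Vec<(&'static str, bool, bool)>) -> (Vec<&'static str>, Vec<(&'static str, Rep)>) {
	let mut retained = Vec::new();
	let mut rep_changes = Vec::new();
	for (peer, ready, ok) in requests {
		match poll_one(ready, ok) {
			FetchOutcome::Pending => retained.push(peer),
			FetchOutcome::Success => {},
			FetchOutcome::Error(rep) => rep_changes.push((peer, rep)),
		}
	}
	(retained, rep_changes)
}

Keeping the poll function side-effect free is also what allows the run loop to apply all reputation changes in one place, right after each sweep.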
@@ -674,6 +674,46 @@ fn fetch_collations_works() {
 	});
 }
 
+#[test]
+fn dont_fetch_collation_if_assigned_to_next_group() {
+	let test_state = TestState::default();
+
+	test_harness(|test_harness| async move {
+		let TestHarness { mut virtual_overseer } = test_harness;
+
+		overseer_send(
+			&mut virtual_overseer,
+			CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::OurViewChange(
+				our_view![test_state.relay_parent],
+			)),
+		)
+		.await;
+
+		respond_to_core_info_queries(&mut virtual_overseer, &test_state).await;
+
+		let peer_b = PeerId::random();
+
+		connect_and_declare_collator(
+			&mut virtual_overseer,
+			peer_b.clone(),
+			test_state.collators[0].clone(),
+			test_state.chain_ids[1].clone(), // next, not current para_id
+		)
+		.await;
+
+		advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await;
+
+		assert!(
+			overseer_recv_with_timeout(&mut virtual_overseer, Duration::from_millis(30))
+				.await
+				.is_none(),
+			"There should be no PoV fetching request.",
+		);
+
+		virtual_overseer
+	})
+}
+
 // Ensure that we fetch a second collation, after the first checked collation was found to be invalid.
 #[test]
 fn fetch_next_collation_on_invalid_collation() {
......