Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll};
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture}};
use always_assert::never;
use polkadot_primitives::v1::{
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
};
use polkadot_subsystem::{
FromOverseer, OverseerSignal, SubsystemContext,
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
NetworkBridgeEvent, IfDisconnected,
},
};
use polkadot_node_network_protocol::{
OurView, PeerId, UnifiedReputationChange as Rep, View,
request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1
use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse};
use polkadot_node_network_protocol::request_response as req_res;
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_primitives::{Statement, SignedFullStatement};
use super::{modify_reputation, LOG_TARGET, Result};
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
/// Message could not be decoded properly.
const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
/// Network errors that originated at the remote host should have same cost as timeout.
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out");
const COST_REPORT_BAD: Rep = Rep::CostMajor("A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
if let Some(metrics) = &self.0 {
match succeeded {
Ok(()) => metrics.collation_requests.with_label_values(&["succeeded"]).inc(),
Err(()) => metrics.collation_requests.with_label_values(&["failed"]).inc(),
}
}
}
/// Provide a timer for `process_msg` which observes on drop.
fn time_process_msg(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.process_msg.start_timer())
}
/// Provide a timer for `handle_collation_request_result` which observes on drop.
fn time_handle_collation_request_result(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
self.0.as_ref().map(|metrics| metrics.handle_collation_request_result.start_timer())
}
}
#[derive(Clone)]
struct MetricsInner {
collation_requests: prometheus::CounterVec<prometheus::U64>,
process_msg: prometheus::Histogram,
handle_collation_request_result: prometheus::Histogram,
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError>
{
let metrics = MetricsInner {
collation_requests: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_collation_requests_total",
"Number of collations requested from Collators.",
),
&["success"],
)?,
process_msg: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_collator_protocol_validator_process_msg",
"Time spent within `collator_protocol_validator::process_msg`",
)
)?,
registry,
)?,
handle_collation_request_result: prometheus::register(
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
"parachain_collator_protocol_validator_handle_collation_request_result",
"Time spent within `collator_protocol_validator::handle_collation_request_result`",
)
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
/// Responses from collator.
from_collator: Fuse<BoxFuture<'static, req_res::OutgoingResult<CollationFetchingResponse>>>,
/// Sender to forward to initial requester.
to_requester: oneshot::Sender<(CandidateReceipt, PoV)>,
/// A jaeger span corresponding to the lifetime of the request.
span: Option<jaeger::Span>,
}
/// All state relevant for the validator side of the protocol lives here.
#[derive(Default)]
struct State {
/// Our own view.
/// Track all active collators and their views.
peer_views: HashMap<PeerId, View>,
/// Peers that have declared themselves as collators.
known_collators: HashMap<PeerId, CollatorId>,
/// Advertisements received from collators. We accept one advertisement
/// per collator per source per relay-parent.
advertisements: HashMap<PeerId, HashSet<(ParaId, Hash)>>,
/// The collations we have requested by relay parent and para id.
///
/// For each relay parent and para id we may be connected to a number
/// of collators each of those may have advertised a different collation.
/// So we group such cases here.
requested_collations: HashMap<(Hash, ParaId, PeerId), PerRequest>,
/// Metrics.
metrics: Metrics,
/// Span per relay parent.
span_per_relay_parent: HashMap<Hash, PerLeafSpan>,
}
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
#[tracing::instrument(level = "trace", skip(ctx, state, tx), fields(subsystem = LOG_TARGET))]
async fn fetch_collation<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
collator_id: CollatorId,
para_id: ParaId,
tx: oneshot::Sender<(CandidateReceipt, PoV)>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
let relevant_advertiser = state.advertisements.iter().find_map(|(k, v)| {
if v.contains(&(para_id, relay_parent)) && state.known_collators.get(k) == Some(&collator_id) {
Some(k.clone())
} else {
None
// Request the collation.
// Assume it is `request_collation`'s job to check and ignore duplicate requests.
if let Some(relevant_advertiser) = relevant_advertiser {
request_collation(ctx, state, relay_parent, para_id, relevant_advertiser, tx).await;
}
}
/// Report a collator for some malicious actions.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn report_collator<Context>(
ctx: &mut Context,
state: &mut State,
id: CollatorId,
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
// Since we have a one way map of PeerId -> CollatorId we have to
// iterate here. Since a huge amount of peers is not expected this
// is a tolerable thing to do.
for (k, _) in state.known_collators.iter().filter(|d| *d.1 == id) {
modify_reputation(ctx, k.clone(), COST_REPORT_BAD).await;
}
}
/// Some other subsystem has reported a collator as a good one, bump reputation.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn note_good_collation<Context>(
ctx: &mut Context,
state: &mut State,
id: CollatorId,
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
for (peer_id, _) in state.known_collators.iter().filter(|d| *d.1 == id) {
modify_reputation(ctx, peer_id.clone(), BENEFIT_NOTIFY_GOOD).await;
}
}
/// Notify a collator that its collation got seconded.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn notify_collation_seconded(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
state: &mut State,
id: CollatorId,
statement: SignedFullStatement,
) {
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::error!(
target: LOG_TARGET,
statement = ?statement,
"Notify collation seconded called with a wrong statement.",
);
return;
}
let peer_ids = state.known_collators.iter()
.filter_map(|(p, c)| if *c == id { Some(p.clone()) } else { None })
.collect::<Vec<_>>();
if !peer_ids.is_empty() {
let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
peer_ids,
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
}
}
/// A peer's view has changed. A number of things should be done:
/// - Ongoing collation requests have to be cancelled.
/// - Advertisements by this peer that are no longer relevant have to be removed.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_peer_view_change(
state: &mut State,
peer_id: PeerId,
view: View,
) -> Result<()> {
let current = state.peer_views.entry(peer_id.clone()).or_default();
let removed: Vec<_> = current.difference(&view).cloned().collect();
*current = view;
if let Some(advertisements) = state.advertisements.get_mut(&peer_id) {
advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent));
}
for removed in removed.into_iter() {
state.requested_collations.retain(|k, _| k.0 != removed);
}
Ok(())
}
/// Request a collation from the network.
/// This function will
/// - Check for duplicate requests.
/// - Check if the requested collation is in our view.
/// - Update PerRequest records with the `result` field if necessary.
/// And as such invocations of this function may rely on that.
#[tracing::instrument(level = "trace", skip(ctx, state, result), fields(subsystem = LOG_TARGET))]
async fn request_collation<Context>(
ctx: &mut Context,
state: &mut State,
relay_parent: Hash,
para_id: ParaId,
peer_id: PeerId,
result: oneshot::Sender<(CandidateReceipt, PoV)>,
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
if !state.view.contains(&relay_parent) {
peer_id = %peer_id,
para_id = %para_id,
relay_parent = %relay_parent,
"collation is no longer in view",
}
if state.requested_collations.contains_key(&(relay_parent, para_id.clone(), peer_id.clone())) {
%para_id,
?relay_parent,
"collation has already been requested",
let (full_request, response_recv) =
OutgoingRequest::new(Recipient::Peer(peer_id), CollationFetchingRequest {
relay_parent,
para_id,
});
let requests = Requests::CollationFetching(full_request);
from_collator: response_recv.boxed().fuse(),
to_requester: result,
span: state.span_per_relay_parent.get(&relay_parent).map(|s| {
s.child("collation-request")
.with_para_id(para_id)
}),
state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), per_request);
tracing::debug!(
target: LOG_TARGET,
peer_id = %peer_id,
%para_id,
?relay_parent,
"Requesting collation",
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(vec![requests], IfDisconnected::ImmediateError))
).await;
}
/// Notify `CandidateSelectionSubsystem` that a collation has been advertised.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn notify_candidate_selection<Context>(
ctx: &mut Context,
collator: CollatorId,
relay_parent: Hash,
para_id: ParaId,
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
ctx.send_message(AllMessages::CandidateSelection(
CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator,
)
}
/// Networking message has been received.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_incoming_peer_message<Context>(
ctx: &mut Context,
state: &mut State,
origin: PeerId,
msg: protocol_v1::CollatorProtocolMessage,
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use protocol_v1::CollatorProtocolMessage::*;
match msg {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
"Declared as collator",
);
state.known_collators.insert(origin.clone(), id);
state.peer_views.entry(origin).or_default();
}
AdvertiseCollation(relay_parent, para_id) => {
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation"));
state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent));
if let Some(collator) = state.known_collators.get(&origin) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
%para_id,
?relay_parent,
"Received advertise collation",
);
notify_candidate_selection(ctx, collator.clone(), relay_parent, para_id).await;
} else {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
%para_id,
?relay_parent,
"Advertise collation received from an unknown collator",
CollationSeconded(_) => {
tracing::warn!(
target: LOG_TARGET,
peer_id = ?origin,
"Unexpected `CollationSeconded` message, decreasing reputation",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
}
}
}
/// A leaf has become inactive so we want to
/// - Cancel all ongoing collation requests that are on top of that leaf.
/// - Remove all stored collations relevant to that leaf.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn remove_relay_parent(
state: &mut State,
relay_parent: Hash,
) -> Result<()> {
state.requested_collations.retain(|k, _| {
k.0 != relay_parent
});
Ok(())
}
/// Our view has changed.
#[tracing::instrument(level = "trace", skip(state), fields(subsystem = LOG_TARGET))]
async fn handle_our_view_change(
state: &mut State,
let old_view = std::mem::replace(&mut state.view, view);
let added: HashMap<Hash, Arc<jaeger::Span>> = state.view
.span_per_head()
.iter()
.filter(|v| !old_view.contains(&v.0))
.map(|v| (v.0.clone(), v.1.clone()))
.collect();
added.into_iter().for_each(|(h, s)| {
state.span_per_relay_parent.insert(h, PerLeafSpan::new(s, "validator-side"));
});
let removed = old_view
.difference(&state.view)
.cloned()
.collect::<Vec<_>>();
for removed in removed.into_iter() {
remove_relay_parent(state, removed).await?;
state.span_per_relay_parent.remove(&removed);
}
Ok(())
}
/// Bridge event switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_msg<Context>(
ctx: &mut Context,
state: &mut State,
bridge_message: NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(_id, _role) => {
// A peer has connected. Until it issues a `Declare` message we do not
// want to track it's view or take any other actions.
},
PeerDisconnected(peer_id) => {
state.known_collators.remove(&peer_id);
state.peer_views.remove(&peer_id);
},
PeerViewChange(peer_id, view) => {
handle_peer_view_change(state, peer_id, view).await?;
},
OurViewChange(view) => {
handle_our_view_change(state, view).await?;
},
PeerMessage(remote, msg) => {
process_incoming_peer_message(ctx, state, remote, msg).await;
}
}
Ok(())
}
/// The main message receiver switch.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn process_msg<Context>(
ctx: &mut Context,
msg: CollatorProtocolMessage,
state: &mut State,
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use CollatorProtocolMessage::*;
let _timer = state.metrics.time_process_msg();
para_id = %id,
"CollateOn message is not expected on the validator side of the protocol",
DistributeCollation(_, _, _) => {
"DistributeCollation message is not expected on the validator side of the protocol",
);
}
FetchCollation(relay_parent, collator_id, para_id, tx) => {
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("fetch-collation"));
fetch_collation(ctx, state, relay_parent, collator_id, para_id, tx).await;
report_collator(ctx, state, id).await;
note_good_collation(ctx, state, id).await;
NotifyCollationSeconded(id, statement) => {
notify_collation_seconded(ctx, state, id, statement).await;
}
NetworkBridgeUpdateV1(event) => {
if let Err(e) = handle_network_msg(
ctx,
state,
event,
).await {
err = ?e,
"Failed to handle incoming network message",
CollationFetchingRequest(_) => {
tracing::warn!(
target: LOG_TARGET,
"CollationFetchingRequest message is not expected on the validator side of the protocol",
);
}
}
}
/// The main run loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use FromOverseer::*;
use OverseerSignal::*;
let mut state = State {
..Default::default()
};
loop {
if let Poll::Ready(msg) = futures::poll!(ctx.recv()) {
let msg = msg?;
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await,
Signal(BlockFinalized(..)) => {}
Signal(ActiveLeaves(_)) => {}
Signal(Conclude) => { break }
}
continue;
}
let mut retained_requested = HashSet::new();
for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() {
// Despite the await, this won't block:
let finished = poll_collation_response(
&mut ctx, &state.metrics, &state.span_per_relay_parent,
hash, para_id, peer_id, per_req
).await;
if !finished {
retained_requested.insert((*hash, *para_id, *peer_id));
state.requested_collations.retain(|k, _| retained_requested.contains(k));
futures::pending!();
}
Ok(())
}
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
/// Poll collation response, return immediately if there is none.
///
/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
/// forwarding proper responses to the requester.
///
/// Returns: `true` if `from_collator` future was ready.
async fn poll_collation_response<Context>(
ctx: &mut Context,
metrics: &Metrics,
spans: &HashMap<Hash, PerLeafSpan>,
hash: &Hash,
para_id: &ParaId,
peer_id: &PeerId,
per_req: &mut PerRequest
)
-> bool
where
Context: SubsystemContext
{
if never!(per_req.from_collator.is_terminated()) {
tracing::error!(
target: LOG_TARGET,
"We remove pending responses once received, this should not happen."
);
return true
}
if let Poll::Ready(response) = futures::poll!(&mut per_req.from_collator) {
let _span = spans.get(&hash)
.map(|s| s.child("received-collation"));
let _timer = metrics.time_handle_collation_request_result();
let mut metrics_result = Err(());
let mut success = "false";
match response {
Err(RequestError::InvalidResponse(err)) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
err = ?err,
"Collator provided response that could not be decoded"
);
modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await;
}
Err(RequestError::NetworkError(err)) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
err = ?err,
"Fetching collation failed due to network error"
);
// A minor decrease in reputation for any network failure seems
// sensbile. In theory this could be exploited, by DoSing this node,
// which would result in reduced reputation for proper nodes, but the
// same can happen for penalities on timeouts, which we also have.
modify_reputation(ctx, *peer_id, COST_NETWORK_ERROR).await;
}
Err(RequestError::Canceled(_)) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
"Request timed out"
);
// A minor decrease in reputation for any network failure seems
// sensbile. In theory this could be exploited, by DoSing this node,
// which would result in reduced reputation for proper nodes, but the
// same can happen for penalities on timeouts, which we also have.
modify_reputation(ctx, *peer_id, COST_REQUEST_TIMED_OUT).await;
}
Ok(CollationFetchingResponse::Collation(receipt, compressed_pov)) => {
match compressed_pov.decompress() {
Ok(pov) => {
tracing::debug!(
target: LOG_TARGET,
hash = ?hash,
candidate_hash = ?receipt.hash(),
"Received collation",
);
// Actual sending:
let _span = jaeger::Span::new(&pov, "received-collation");
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
let (mut tx, _) = oneshot::channel();
std::mem::swap(&mut tx, &mut (per_req.to_requester));
let result = tx.send((receipt, pov));
if let Err(_) = result {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
"Sending response back to requester failed (receiving side closed)"
);
} else {
metrics_result = Ok(());
success = "true";
}
}
Err(error) => {
tracing::warn!(
target: LOG_TARGET,
hash = ?hash,
para_id = ?para_id,
peer_id = ?peer_id,
?error,
"Failed to extract PoV",
);
modify_reputation(ctx, *peer_id, COST_CORRUPTED_MESSAGE).await;
}
};
}
};
metrics.on_request(metrics_result);
per_req.span.as_mut().map(|s| s.add_string_tag("success", success));
true
} else {
false
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{iter, time::Duration};
use futures::{executor, future, Future};
use polkadot_node_subsystem_util::TimeoutExt;
use sp_core::{crypto::Pair, Encode};
use assert_matches::assert_matches;
use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::{our_view,
request_response::Requests
};
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
#[derive(Clone)]
struct TestState {
chain_ids: Vec<ParaId>,
relay_parent: Hash,
collators: Vec<CollatorPair>,
}
impl Default for TestState {
fn default() -> Self {
let chain_a = ParaId::from(1);
let chain_b = ParaId::from(2);
let chain_ids = vec![chain_a, chain_b];
let relay_parent = Hash::repeat_byte(0x05);
let collators = iter::repeat(())
.map(|_| CollatorPair::generate().0)
.take(4)
.collect();
Self {
chain_ids,
relay_parent,
collators,
}
}
}
struct TestHarness {
virtual_overseer: test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
}
fn test_harness<T: Future<Output = ()>>(test: impl FnOnce(TestHarness) -> T) {
let _ = env_logger::builder()
.is_test(true)
.filter(
Some("polkadot_collator_protocol"),
log::LevelFilter::Trace,
)
.filter(
log::LevelFilter::Trace,
)
.try_init();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
futures::pin_mut!(test_fut);
futures::pin_mut!(subsystem);
executor::block_on(future::select(test_fut, subsystem));
}
const TIMEOUT: Duration = Duration::from_millis(100);
async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
msg: CollatorProtocolMessage,
) {
tracing::trace!("Sending message:\n{:?}", &msg);
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
.await
.expect(&format!("{:?} is enough for sending messages.", TIMEOUT));
}
async fn overseer_recv(
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
) -> AllMessages {
let msg = overseer_recv_with_timeout(overseer, TIMEOUT)
.await
.expect(&format!("{:?} is enough to receive messages.", TIMEOUT));
tracing::trace!("Received message:\n{:?}", &msg);
msg
}
async fn overseer_recv_with_timeout(
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
timeout: Duration,
) -> Option<AllMessages> {
tracing::trace!("Waiting for message...");
overseer
.recv()
.timeout(timeout)
.await
}
// As we receive a relevant advertisement act on it and issue a collation request.
let test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;
let pair = CollatorPair::generate().0;
tracing::trace!("activating");
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
)
).await;
let peer_b = PeerId::random();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
test_state.relay_parent,
test_state.chain_ids[0],
)
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator,
)
) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(para_id, test_state.chain_ids[0]);
assert_eq!(collator, pair.public());
});
});
}
// Test that other subsystems may modify collators' reputations.
#[test]
fn collator_reporting_works() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent])
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
)
).await;
let peer_b = PeerId::random();
let peer_c = PeerId::random();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[0].public(),
),
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_c.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[1].public(),
),
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::ReportCollator(test_state.collators[0].public()),
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, COST_REPORT_BAD);
}
);
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NoteGoodCollation(test_state.collators[1].public()),
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
) => {
assert_eq!(peer, peer_c);
assert_eq!(rep, BENEFIT_NOTIFY_GOOD);
}
);
});
}
// A test scenario that takes the following steps