Unverified Commit b8905d13 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Collator protocol followup (#1741)



* Metrics

* Dont punish late collations

* Fix metrics

* Update node/network/collator-protocol/src/lib.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Change on_request arg to Result

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
parent c80f7b6b
Pipeline #108767 passed with stages
in 22 minutes and 54 seconds
......@@ -29,6 +29,7 @@ use polkadot_subsystem::{
AllMessages, CollatorProtocolMessage, RuntimeApiMessage, RuntimeApiRequest,
NetworkBridgeMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId,
......@@ -38,6 +39,54 @@ use polkadot_node_subsystem_util::{
request_validator_groups_ctx,
};
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_advertisment_made(&self) {
if let Some(metrics) = &self.0 {
metrics.advertisments_made.inc();
}
}
fn on_collation_sent(&self) {
if let Some(metrics) = &self.0 {
metrics.collations_sent.inc();
}
}
}
#[derive(Clone)]
struct MetricsInner {
advertisments_made: prometheus::Counter<prometheus::U64>,
collations_sent: prometheus::Counter<prometheus::U64>,
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError>
{
let metrics = MetricsInner {
advertisments_made: prometheus::register(
prometheus::Counter::new(
"parachain_advertisments_made_total",
"A number of advertisments sent to validators.",
)?,
registry,
)?,
collations_sent: prometheus::register(
prometheus::Counter::new(
"parachain_collations_sent_total",
"A number of collations sent to validators.",
)?,
registry,
)?,
};
Ok(Metrics(Some(metrics)))
}
}
#[derive(Default)]
struct State {
/// Our id.
......@@ -69,6 +118,9 @@ struct State {
/// Entries in this map will be cleared as validator groups in `our_validator_groups`
/// go out of scope with their respective deactivated leafs.
known_validators: HashMap<PeerId, ValidatorId>,
/// Metrics.
metrics: Metrics,
}
/// Distribute a collation.
......@@ -287,6 +339,8 @@ where
)
)).await?;
state.metrics.on_advertisment_made();
Ok(())
}
......@@ -367,6 +421,7 @@ where
/// Issue a response to a previously requested collation.
async fn send_collation<Context>(
ctx: &mut Context,
state: &mut State,
request_id: RequestId,
origin: PeerId,
receipt: CandidateReceipt,
......@@ -388,6 +443,8 @@ where
)
)).await?;
state.metrics.on_collation_sent();
Ok(())
}
......@@ -421,7 +478,7 @@ where
Some(our_para_id) => {
if our_para_id == para_id {
if let Some(collation) = state.collations.get(&relay_parent).cloned() {
send_collation(ctx, request_id, origin, collation.0, collation.1).await?;
send_collation(ctx, state, request_id, origin, collation.0, collation.1).await?;
}
} else {
warn!(
......@@ -555,14 +612,21 @@ async fn handle_our_view_change(
}
/// The collator protocol collator side main loop.
pub(crate) async fn run<Context>(mut ctx: Context, our_id: CollatorId) -> Result<()>
pub(crate) async fn run<Context>(
mut ctx: Context,
our_id: CollatorId,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use FromOverseer::*;
use OverseerSignal::*;
let mut state = State::default();
let mut state = State {
metrics,
..Default::default()
};
state.our_id = our_id;
......@@ -597,7 +661,7 @@ mod tests {
};
use polkadot_subsystem::ActiveLeavesUpdate;
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers::{self as test_helpers};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::ObservedRole;
#[derive(Default)]
......@@ -719,7 +783,7 @@ mod tests {
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, collator_id);
let subsystem = run(context, collator_id, Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
......
......@@ -53,13 +53,15 @@ enum Error {
RuntimeApi(RuntimeApiError),
#[from]
UtilError(util::Error),
#[from]
Prometheus(prometheus::PrometheusError),
}
type Result<T> = std::result::Result<T, Error>;
enum ProtocolSide {
Validator,
Collator(CollatorId),
Validator(validator_side::Metrics),
Collator(CollatorId, collator_side::Metrics),
}
/// The collator protocol subsystem.
......@@ -71,10 +73,12 @@ impl CollatorProtocolSubsystem {
/// Start the collator protocol.
/// If `id` is `Some` this is a collator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol.
pub fn new(id: Option<CollatorId>) -> Self {
/// Caller must provide a registry for prometheus metrics.
pub fn new(id: Option<CollatorId>, registry: Option<&prometheus::Registry>) -> Self {
use metrics::Metrics;
let protocol_side = match id {
Some(id) => ProtocolSide::Collator(id),
None => ProtocolSide::Validator,
Some(id) => ProtocolSide::Collator(id, collator_side::Metrics::register(registry)),
None => ProtocolSide::Validator(validator_side::Metrics::register(registry)),
};
Self {
......@@ -87,28 +91,26 @@ impl CollatorProtocolSubsystem {
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await,
ProtocolSide::Collator(id) => collator_side::run(ctx, id).await,
ProtocolSide::Validator(metrics) => validator_side::run(
ctx,
REQUEST_TIMEOUT,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
ctx,
id,
metrics,
).await,
}
}
}
/// Collator protocol metrics.
#[derive(Default, Clone)]
pub struct Metrics;
impl metrics::Metrics for Metrics {
fn try_register(_registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError> {
Ok(Metrics)
}
}
impl<Context> Subsystem<Context> for CollatorProtocolSubsystem
where
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
{
type Metrics = Metrics;
// The actual `Metrics` type depends on whether we're on the collator or validator side.
type Metrics = ();
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
......
......@@ -34,6 +34,7 @@ use polkadot_subsystem::{
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
},
metrics::{self, prometheus},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, PeerId, ReputationChange as Rep, RequestId,
......@@ -48,6 +49,46 @@ const COST_REQUEST_TIMED_OUT: Rep = Rep::new(-20, "A collation request has timed
const COST_REPORT_BAD: Rep = Rep::new(-50, "A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::new(50, "A collator was noted good by another subsystem");
#[derive(Clone, Default)]
pub(super) struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
if let Some(metrics) = &self.0 {
match succeeded {
Ok(()) => metrics.collation_requests.with_label_values(&["succeeded"]).inc(),
Err(()) => metrics.collation_requests.with_label_values(&["failed"]).inc(),
}
}
}
}
#[derive(Clone)]
struct MetricsInner {
collation_requests: prometheus::CounterVec<prometheus::U64>,
}
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError>
{
let metrics = MetricsInner {
collation_requests: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"parachain_collation_requests_total",
"Number of collations requested from Collators.",
),
&["succeeded", "failed"],
)?,
registry,
)?
};
Ok(Metrics(Some(metrics)))
}
}
#[derive(Debug)]
enum CollationRequestResult {
Received(RequestId),
......@@ -134,6 +175,14 @@ struct State {
/// Possessed collations.
collations: HashMap<(Hash, ParaId), Vec<(CollatorId, CandidateReceipt, PoV)>>,
/// Leaves have recently moved out of scope.
/// These are looked into when we receive previously requested collations that we
/// are no longer interested in.
recently_removed_heads: HashSet<Hash>,
/// Metrics.
metrics: Metrics,
}
/// Another subsystem has requested to fetch collations on a particular leaf for some para.
......@@ -291,6 +340,7 @@ where
let _ = per_request.received.send(());
if let Some(collator_id) = state.known_collators.get(&origin) {
let _ = per_request.result.send((receipt.clone(), pov.clone()));
state.metrics.on_request(Ok(()));
state.collations
.entry((relay_parent, para_id))
......@@ -300,11 +350,11 @@ where
}
}
} else {
// TODO: https://github.com/paritytech/polkadot/issues/1694
// This is tricky. If our chain has moved on, we have already canceled
// the relevant request and removed it from the map; so and we are not expecting
// this reply although technically it is not a malicious behaviur.
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
// If this collation is not just a delayed one that we were expecting,
// but our view has moved on, in that case modify peer's reputation.
if !state.recently_removed_heads.contains(&relay_parent) {
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await?;
}
}
Ok(())
......@@ -481,7 +531,11 @@ async fn handle_our_view_change(
.cloned()
.collect::<Vec<_>>();
// Update the set of recently removed chain heads.
state.recently_removed_heads.clear();
for removed in removed.into_iter() {
state.recently_removed_heads.insert(removed.clone());
remove_relay_parent(state, removed).await?;
}
......@@ -497,6 +551,8 @@ async fn request_timed_out<Context>(
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
state.metrics.on_request(Err(()));
// We have to go backwards in the map, again.
if let Some(key) = find_val_in_map(&state.requested_collations, &id) {
if let Some(_) = state.requested_collations.remove(&key) {
......@@ -595,7 +651,11 @@ where
}
/// The main run loop.
pub(crate) async fn run<Context>(mut ctx: Context, request_timeout: Duration) -> Result<()>
pub(crate) async fn run<Context>(
mut ctx: Context,
request_timeout: Duration,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
......@@ -604,6 +664,7 @@ where
let mut state = State {
request_timeout,
metrics,
..Default::default()
};
......@@ -707,7 +768,7 @@ mod tests {
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, Duration::from_millis(50));
let subsystem = run(context, Duration::from_millis(50), Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment