Unverified Commit 17344327 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Add one Jaeger span per relay parent (#2196)



* Add one Jaeger span per relay parent

This adds one Jaeger span per relay parent, instead of always creating
new spans per relay parent. This should improve the UI view, because
subsystems are now grouped below one common span.

* Fix doc tests

* Replace `PerLeaveSpan` with `PerLeafSpan`

* More renaming

* Moare

* Update node/subsystem/src/lib.rs

Co-authored-by: Andronik Ordian <write@reusable.software>

* Skip the spans

* Increase `spec_version`

Co-authored-by: Andronik Ordian <write@reusable.software>
parent 6b688dd5
Pipeline #118710 passed with stages
in 35 minutes and 8 seconds
......@@ -4969,7 +4969,6 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"smallvec 1.5.1",
"sp-core",
"sp-keyring",
"thiserror",
......@@ -5057,7 +5056,6 @@ dependencies = [
"polkadot-overseer",
"polkadot-primitives",
"sc-service",
"smallvec 1.5.1",
"sp-core",
"thiserror",
"tracing",
......@@ -5208,11 +5206,27 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "polkadot-node-jaeger"
version = "0.1.0"
dependencies = [
"async-std",
"lazy_static",
"log",
"mick-jaeger",
"parking_lot 0.11.1",
"polkadot-primitives",
"sc-network",
"sp-core",
"thiserror",
]
[[package]]
name = "polkadot-node-network-protocol"
version = "0.1.0"
dependencies = [
"parity-scale-codec",
"polkadot-node-jaeger",
"polkadot-node-primitives",
"polkadot-primitives",
"sc-network",
......@@ -5247,6 +5261,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.2",
"polkadot-node-jaeger",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
"polkadot-node-subsystem-test-helpers",
......@@ -5297,6 +5312,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.11.1",
"pin-project 1.0.2",
"polkadot-node-jaeger",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......@@ -5368,7 +5384,6 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"smallvec 1.5.1",
"sp-core",
"sp-keyring",
"thiserror",
......
......@@ -65,6 +65,7 @@ members = [
"node/subsystem",
"node/subsystem-test-helpers",
"node/subsystem-util",
"node/jaeger",
"node/test/client",
"node/test/service",
"parachain/test-parachains",
......
......@@ -126,12 +126,17 @@ impl CollationGenerationSubsystem {
// follow the procedure from the guide
if let Some(config) = &self.config {
let metrics = self.metrics.clone();
if let Err(err) =
handle_new_activations(config.clone(), &activated, ctx, metrics, sender).await
{
if let Err(err) = handle_new_activations(
config.clone(),
activated.into_iter().map(|v| v.0),
ctx,
metrics,
sender,
).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to handle new activations");
};
}
}
false
}
Ok(Signal(Conclude)) => true,
......@@ -164,10 +169,10 @@ where
Context: SubsystemContext<Message = CollationGenerationMessage>,
{
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = Box::pin(async move {
let future = async move {
self.run(ctx).await;
Ok(())
});
}.boxed();
SpawnedSubsystem {
name: "collation-generation-subsystem",
......@@ -176,10 +181,10 @@ where
}
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(ctx, metrics, sender, activated), fields(subsystem = LOG_TARGET))]
async fn handle_new_activations<Context: SubsystemContext>(
config: Arc<CollationGenerationConfig>,
activated: &[Hash],
activated: impl IntoIterator<Item = Hash>,
ctx: &mut Context,
metrics: Metrics,
sender: &mpsc::Sender<AllMessages>,
......@@ -189,11 +194,9 @@ async fn handle_new_activations<Context: SubsystemContext>(
let _overall_timer = metrics.time_new_activations();
for relay_parent in activated.iter().copied() {
for relay_parent in activated {
let _relay_parent_timer = metrics.time_new_activations_relay_parent();
// double-future magic happens here: the first layer of requests takes a mutable borrow of the context, and
// returns a receiver. The second layer of requests actually polls those receivers to completion.
let (availability_cores, validators) = join!(
request_availability_cores_ctx(relay_parent, ctx).await?,
request_validators_ctx(relay_parent, ctx).await?,
......@@ -544,7 +547,7 @@ mod tests {
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(
test_config(123u32),
&subsystem_activated_hashes,
subsystem_activated_hashes,
&mut ctx,
Metrics(None),
&tx,
......@@ -623,7 +626,7 @@ mod tests {
let (tx, _rx) = mpsc::channel(0);
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(test_config(16), &activated_hashes, &mut ctx, Metrics(None), &tx)
handle_new_activations(test_config(16), activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
});
......@@ -700,7 +703,7 @@ mod tests {
let sent_messages = Arc::new(Mutex::new(Vec::new()));
let subsystem_sent_messages = sent_messages.clone();
subsystem_test_harness(overseer, |mut ctx| async move {
handle_new_activations(subsystem_config, &activated_hashes, &mut ctx, Metrics(None), &tx)
handle_new_activations(subsystem_config, activated_hashes, &mut ctx, Metrics(None), &tx)
.await
.unwrap();
......
......@@ -26,7 +26,6 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
smallvec = "1.5.1"
kvdb-memorydb = "0.7.0"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
......
......@@ -534,7 +534,7 @@ where
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate { activated, .. })
) => {
for activated in activated.into_iter() {
for (activated, _span) in activated.into_iter() {
process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
}
}
......
......@@ -23,7 +23,6 @@ use futures::{
executor,
Future,
};
use smallvec::smallvec;
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateDescriptor, CandidateReceipt, HeadData,
......@@ -31,7 +30,7 @@ use polkadot_primitives::v1::{
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem::{
ActiveLeavesUpdate, errors::RuntimeApiError,
ActiveLeavesUpdate, errors::RuntimeApiError, JaegerSpan,
};
use polkadot_node_subsystem_test_helpers as test_helpers;
......@@ -182,8 +181,8 @@ fn runtime_api_error_does_not_stop_the_subsystem() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
......@@ -516,8 +515,8 @@ fn stored_data_kept_until_finalized() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
......@@ -620,8 +619,8 @@ fn stored_chunk_kept_until_finalized() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
......@@ -758,8 +757,8 @@ fn forkfullness_works() {
overseer_signal(
&mut virtual_overseer,
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: smallvec![new_leaf_1.clone(), new_leaf_2.clone()],
deactivated: smallvec![],
activated: vec![(new_leaf_1, Arc::new(JaegerSpan::Disabled)), (new_leaf_2, Arc::new(JaegerSpan::Disabled))].into(),
deactivated: vec![].into(),
}),
).await;
......
......@@ -37,7 +37,7 @@ use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport, ValidationResult,
};
use polkadot_subsystem::{
jaeger::{self, JaegerSpan},
JaegerSpan, PerLeafSpan,
messages::{
AllMessages, AvailabilityStoreMessage, CandidateBackingMessage, CandidateSelectionMessage,
CandidateValidationMessage, PoVDistributionMessage, ProvisionableData,
......@@ -923,9 +923,10 @@ impl util::JobTrait for CandidateBackingJob {
const NAME: &'static str = "CandidateBackingJob";
#[tracing::instrument(skip(keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(span, keystore, metrics, rx_to, tx_from), fields(subsystem = LOG_TARGET))]
fn run(
parent: Hash,
span: Arc<JaegerSpan>,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
rx_to: mpsc::Receiver<Self::ToJob>,
......@@ -952,7 +953,7 @@ impl util::JobTrait for CandidateBackingJob {
}
}
let span = jaeger::hash_span(&parent, "run:backing");
let span = PerLeafSpan::new(span, "backing");
let _span = span.child("runtime-apis");
let (validators, groups, session_index, cores) = futures::try_join!(
......@@ -1340,7 +1341,10 @@ mod tests {
) {
// Start work on some new parent.
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(test_state.relay_parent)))
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work(
test_state.relay_parent,
Arc::new(JaegerSpan::Disabled),
)))
).await;
// Check that subsystem job issues a request for a validator set.
......
......@@ -23,7 +23,7 @@
use futures::{channel::{mpsc, oneshot}, lock::Mutex, prelude::*, future, Future};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use polkadot_node_subsystem::{
jaeger,
jaeger, PerLeafSpan, JaegerSpan,
messages::{
AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, RuntimeApiMessage, RuntimeApiRequest,
......@@ -34,7 +34,7 @@ use polkadot_node_subsystem_util::{
self as util, JobManager, JobTrait, Validator, FromJobCommand, metrics::{self, prometheus},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{pin::Pin, time::Duration, iter::FromIterator};
use std::{pin::Pin, time::Duration, iter::FromIterator, sync::Arc};
use wasm_timer::{Delay, Instant};
/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
......@@ -215,9 +215,10 @@ impl JobTrait for BitfieldSigningJob {
const NAME: &'static str = "BitfieldSigningJob";
/// Run a job for the parent block indicated
#[tracing::instrument(skip(keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(span, keystore, metrics, _receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
span: Arc<JaegerSpan>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
......@@ -225,7 +226,7 @@ impl JobTrait for BitfieldSigningJob {
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
let span = jaeger::hash_span(&relay_parent, "run:bitfield-signing");
let span = PerLeafSpan::new(span, "bitfield-signing");
let _span = span.child("delay");
let wait_until = Instant::now() + JOB_DELAY;
......
......@@ -25,7 +25,7 @@ use futures::{
};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem::{
jaeger,
jaeger, JaegerSpan, PerLeafSpan,
errors::ChainApiError,
messages::{
AllMessages, CandidateBackingMessage, CandidateSelectionMessage, CollatorProtocolMessage,
......@@ -39,7 +39,7 @@ use polkadot_node_subsystem_util::{
use polkadot_primitives::v1::{
CandidateReceipt, CollatorId, CoreState, CoreIndex, Hash, Id as ParaId, PoV,
};
use std::pin::Pin;
use std::{pin::Pin, sync::Arc};
use thiserror::Error;
const LOG_TARGET: &'static str = "candidate_selection";
......@@ -95,12 +95,13 @@ impl JobTrait for CandidateSelectionJob {
#[tracing::instrument(skip(keystore, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
span: Arc<JaegerSpan>,
keystore: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<CandidateSelectionMessage>,
mut sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let span = jaeger::hash_span(&relay_parent, "candidate-selection:run");
let span = PerLeafSpan::new(span, "candidate-selection");
async move {
let _span = span.child("query-runtime");
let (groups, cores) = futures::try_join!(
......
......@@ -25,8 +25,7 @@ use futures::{
prelude::*,
};
use polkadot_node_subsystem::{
errors::{ChainApiError, RuntimeApiError},
jaeger,
errors::{ChainApiError, RuntimeApiError}, PerLeafSpan, JaegerSpan,
messages::{
AllMessages, CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData,
ProvisionerMessage,
......@@ -40,7 +39,7 @@ use polkadot_primitives::v1::{
BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption,
SignedAvailabilityBitfield, ValidatorIndex,
};
use std::{pin::Pin, collections::BTreeMap};
use std::{pin::Pin, collections::BTreeMap, sync::Arc};
use thiserror::Error;
use futures_timer::Delay;
......@@ -140,9 +139,10 @@ impl JobTrait for ProvisioningJob {
/// Run a job for the parent block indicated
//
// this function is in charge of creating and executing the job's main loop
#[tracing::instrument(skip(_run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(span, _run_args, metrics, receiver, sender), fields(subsystem = LOG_TARGET))]
fn run(
relay_parent: Hash,
span: Arc<JaegerSpan>,
_run_args: Self::RunArgs,
metrics: Self::Metrics,
receiver: mpsc::Receiver<ProvisionerMessage>,
......@@ -156,11 +156,7 @@ impl JobTrait for ProvisioningJob {
receiver,
);
let span = jaeger::hash_span(&relay_parent, "provisioner");
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
job.run_loop(&span).await
job.run_loop(PerLeafSpan::new(span, "provisioner")).await
}
.boxed()
}
......@@ -186,7 +182,7 @@ impl ProvisioningJob {
}
}
async fn run_loop(mut self, span: &jaeger::JaegerSpan) -> Result<(), Error> {
async fn run_loop(mut self, span: PerLeafSpan) -> Result<(), Error> {
use ProvisionerMessage::{
ProvisionableData, RequestBlockAuthorshipData, RequestInherentData,
};
......
[package]
name = "polkadot-node-jaeger"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Polkadot Jaeger primitives"
[dependencies]
async-std = "1.8.0"
mick-jaeger = "0.1.2"
lazy_static = "1.4"
parking_lot = "0.11.1"
polkadot-primitives = { path = "../../primitives" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
thiserror = "1.0.23"
log = "0.4.11"
......@@ -14,7 +14,11 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Jaeger integration.
//! Polkadot Jaeger related primitives
//!
//! Provides primitives used by Polkadot for interfacing with Jaeger.
//!
//! # Integration
//!
//! See <https://www.jaegertracing.io/> for an introduction.
//!
......@@ -39,15 +43,22 @@
//! -p 9411:9411 \
//! docker.io/jaegertracing/all-in-one:1.21
//! ```
//!
use polkadot_node_primitives::SpawnNamed;
use sp_core::traits::SpawnNamed;
use polkadot_primitives::v1::{Hash, PoV, CandidateHash};
use parking_lot::RwLock;
use std::sync::Arc;
use std::result;
pub use crate::errors::JaegerError;
use std::{sync::Arc, result};
/// Errors reported by the Jaeger integration.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum JaegerError {
/// The collector thread has already been launched.
#[error("Already launched the collector thread")]
AlreadyLaunched,
/// No Jaeger configuration is available.
#[error("Missing jaeger configuration")]
MissingConfiguration,
}
lazy_static::lazy_static! {
static ref INSTANCE: RwLock<Jaeger> = RwLock::new(Jaeger::None);
......@@ -102,6 +113,50 @@ impl JaegerConfigBuilder {
}
}
/// A special "per leaf span".
///
/// Essentially this span wraps two spans:
///
/// 1. The span that is created per leaf in the overseer.
/// 2. Some child span of the per-leaf span.
///
/// This is just an auxiliary structure that makes it easy to store both together.
#[derive(Debug)]
pub struct PerLeafSpan {
/// The per-leaf span this wrapper was constructed from, shared behind an `Arc`.
leaf_span: Arc<JaegerSpan>,
/// The child span that is opened below `leaf_span` when the wrapper is constructed.
span: JaegerSpan,
}
impl PerLeafSpan {
    /// Builds a `PerLeafSpan` on top of an existing per-leaf span.
    ///
    /// `leaf_span` is the span the overseer created for the leaf; `name` is the name given to
    /// the child span that is opened below it. Both spans are kept in the returned object. The
    /// child span is what you reach through the [`Deref`](std::ops::Deref) implementation,
    /// while the leaf span is available via [`leaf_span`](Self::leaf_span).
    pub fn new(leaf_span: Arc<JaegerSpan>, name: impl Into<String>) -> Self {
        Self {
            // The child is derived from a borrow of `leaf_span` before the `Arc` itself is
            // moved into the struct on the next line.
            span: leaf_span.child(name),
            leaf_span,
        }
    }

    /// Gives access to the wrapped per-leaf span.
    pub fn leaf_span(&self) -> &Arc<JaegerSpan> {
        let Self { leaf_span, .. } = self;
        leaf_span
    }
}
/// Dereferences to the child span, not to the per-leaf span.
impl std::ops::Deref for PerLeafSpan {
    type Target = JaegerSpan;

    fn deref(&self) -> &Self::Target {
        &self.span
    }
}
/// A wrapper type for a span.
///
/// Handles running with and without jaeger.
......@@ -120,6 +175,7 @@ impl JaegerSpan {
Self::Disabled => Self::Disabled,
}
}
/// Add an additional tag to the span.
pub fn add_string_tag(&mut self, tag: &str, value: &str) {
match self {
......
......@@ -32,7 +32,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View,
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View, OurView,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
......@@ -45,10 +45,8 @@ use polkadot_subsystem::messages::{
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
};
use polkadot_subsystem::{
jaeger,
errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemError,
jaeger, errors::{ChainApiError, RuntimeApiError},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
};
use std::collections::{HashMap, HashSet};
use std::collections::hash_map::Entry;
......@@ -128,7 +126,7 @@ struct ProtocolState {
peer_views: HashMap<PeerId, View>,
/// Our own view.
view: View,
view: OurView,
/// Caches a mapping of relay parents or ancestor to live candidate hashes.
/// Allows fast intersection of live candidates with views and consecutive unioning.
......@@ -278,8 +276,8 @@ impl ProtocolState {
}
}
// Removes all entries from live_under which aren't referenced in the ancestry of
// one of our live relay-chain heads.
/// Removes all entries from live_under which aren't referenced in the ancestry of
/// one of our live relay-chain heads.
fn clean_up_live_under_cache(&mut self) {
let extended_view: HashSet<_> = self.per_relay_parent.iter()
.map(|(r_hash, v)| v.ancestors.iter().cloned().chain(iter::once(*r_hash)))
......@@ -353,7 +351,7 @@ async fn handle_our_view_change<Context>(
ctx: &mut Context,
keystore: &SyncCryptoStorePtr,
state: &mut ProtocolState,
view: View,
view: OurView,
metrics: &Metrics,
) -> Result<()>
where
......@@ -845,11 +843,11 @@ where
}
}
// Metadata about a candidate that is part of the live_candidates set.
//
// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
// information is propagated to the higher level where it can be used to create data entries. Cached candidates
// already have entries associated with them, and thus don't need this metadata to be fetched.
/// Metadata about a candidate that is part of the live_candidates set.
///
/// Those which were not present in a cache are "fresh" and have their candidate descriptor attached. This
/// information is propagated to the higher level where it can be used to create data entries. Cached candidates
/// already have entries associated with them, and thus don't need this metadata to be fetched.
#[derive(Debug)]
enum FetchedLiveCandidate {
Cached,
......
......@@ -17,7 +17,7 @@
use super::*;
use assert_matches::assert_matches;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_network_protocol::{view, ObservedRole, our_view};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
......@@ -398,7 +398,7 @@ async fn expect_chunks_network_message(
async fn change_our_view(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
view: View,
view: OurView,
validator_public: &[ValidatorId],
ancestors: Vec<Hash>,
session_per_relay_parent: HashMap<Hash, SessionIndex>,
......@@ -574,7 +574,7 @@ fn check_views() {
let genesis = Hash::repeat_byte(0xAA);
change_our_view(
&mut virtual_overseer,
view![current],
our_view![current],