Unverified Commit 4685d8f4 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Fix statement distribution: forward statements to other peers. (#2146)



* add candidate hash statement circulation span

* add relay-parent to hash-span

* Some typos and misspellings in docs I found, during my studies. (#2144)

* Fix stale link to overseer docs

* Some typos and mispellings in docs/comments

I found during studying how Polkadot works.

* Rococo V1 (#2141)

* Update to latest master and use 30 minutes sessions

* add bootnodes to chainspec

* Update Substrate

* Update chain-spec

* Update Cargo.lock

* GENESIS

* Change session length to one hour

* Bump spec_version to not fuck anything up ;)

Co-authored-by: Erin Grasmick's avatarErin Grasmick <erin@parity.io>

* avoid creating duplicate unbacked spans when we see extra statements (#2145)

* improve jaeger spans for statement distribution

* tweak and add failing test for repropagation

* make a change that gets the test passing

* guide: clarify

* remove semicolon

Co-authored-by: default avatarRobert Klotzner <eskimor@users.noreply.github.com>
Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Erin Grasmick's avatarErin Grasmick <erin@parity.io>
parent 3d9ae5e8
Pipeline #117787 passed with stages
in 20 minutes and 27 seconds
...@@ -389,16 +389,23 @@ struct ActiveHeadData { ...@@ -389,16 +389,23 @@ struct ActiveHeadData {
session_index: sp_staking::SessionIndex, session_index: sp_staking::SessionIndex,
/// How many `Seconded` statements we've seen per validator. /// How many `Seconded` statements we've seen per validator.
seconded_counts: HashMap<ValidatorIndex, usize>, seconded_counts: HashMap<ValidatorIndex, usize>,
/// A Jaeger span for this head, so we can attach data to it.
span: jaeger::JaegerSpan,
} }
impl ActiveHeadData { impl ActiveHeadData {
fn new(validators: Vec<ValidatorId>, session_index: sp_staking::SessionIndex) -> Self { fn new(
validators: Vec<ValidatorId>,
session_index: sp_staking::SessionIndex,
relay_parent: &Hash,
) -> Self {
ActiveHeadData { ActiveHeadData {
candidates: Default::default(), candidates: Default::default(),
statements: Default::default(), statements: Default::default(),
validators, validators,
session_index, session_index,
seconded_counts: Default::default(), seconded_counts: Default::default(),
span: jaeger::hash_span(&relay_parent, "statement-dist-active"),
} }
} }
...@@ -532,6 +539,15 @@ async fn circulate_statement_and_dependents( ...@@ -532,6 +539,15 @@ async fn circulate_statement_and_dependents(
None => return, None => return,
}; };
let _span = {
let mut span = active_head.span.child("circulate-statement");
span.add_string_tag(
"candidate-hash",
&format!("{:?}", statement.payload().candidate_hash().0),
);
span
};
// First circulate the statement directly to all peers needing it. // First circulate the statement directly to all peers needing it.
// The borrow of `active_head` needs to encompass only this (Rust) statement. // The borrow of `active_head` needs to encompass only this (Rust) statement.
let outputs: Option<(CandidateHash, Vec<PeerId>)> = { let outputs: Option<(CandidateHash, Vec<PeerId>)> = {
...@@ -674,7 +690,7 @@ async fn report_peer( ...@@ -674,7 +690,7 @@ async fn report_peer(
// if we were not already aware of it, along with the corresponding relay-parent. // if we were not already aware of it, along with the corresponding relay-parent.
// //
// This function checks the signature and ensures the statement is compatible with our // This function checks the signature and ensures the statement is compatible with our
// view. // view. It also notifies candidate backing if the statement was previously unknown.
#[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(peer_data, ctx, active_heads, metrics), fields(subsystem = LOG_TARGET))]
async fn handle_incoming_message<'a>( async fn handle_incoming_message<'a>(
peer: PeerId, peer: PeerId,
...@@ -708,6 +724,16 @@ async fn handle_incoming_message<'a>( ...@@ -708,6 +724,16 @@ async fn handle_incoming_message<'a>(
} }
}; };
let candidate_hash = statement.payload().candidate_hash();
let handle_incoming_span = {
let mut span = active_head.span.child("handle-incoming");
span.add_string_tag(
"candidate-hash",
&format!("{:?}", candidate_hash.0),
);
span
};
// check the signature on the statement. // check the signature on the statement.
if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) { if let Err(()) = check_statement_signature(&active_head, relay_parent, &statement) {
report_peer(ctx, peer, COST_INVALID_SIGNATURE).await; report_peer(ctx, peer, COST_INVALID_SIGNATURE).await;
...@@ -733,7 +759,7 @@ async fn handle_incoming_message<'a>( ...@@ -733,7 +759,7 @@ async fn handle_incoming_message<'a>(
peer_data, peer_data,
ctx, ctx,
relay_parent, relay_parent,
fingerprint.0.candidate_hash().clone(), candidate_hash,
&*active_head, &*active_head,
metrics, metrics,
).await; ).await;
...@@ -753,6 +779,16 @@ async fn handle_incoming_message<'a>( ...@@ -753,6 +779,16 @@ async fn handle_incoming_message<'a>(
} }
NotedStatement::Fresh(statement) => { NotedStatement::Fresh(statement) => {
report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await; report_peer(ctx, peer, BENEFIT_VALID_STATEMENT_FIRST).await;
let mut _span = handle_incoming_span.child("notify-backing");
// When we receive a new message from a peer, we forward it to the
// candidate backing subsystem.
let message = AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(relay_parent, statement.statement.clone())
);
ctx.send_message(message).await;
Some((relay_parent, statement)) Some((relay_parent, statement))
} }
} }
...@@ -815,9 +851,9 @@ async fn handle_network_update( ...@@ -815,9 +851,9 @@ async fn handle_network_update(
peers.remove(&peer); peers.remove(&peer);
} }
NetworkBridgeEvent::PeerMessage(peer, message) => { NetworkBridgeEvent::PeerMessage(peer, message) => {
match peers.get_mut(&peer) { let handled_incoming = match peers.get_mut(&peer) {
Some(data) => { Some(data) => {
let new_stored = handle_incoming_message( handle_incoming_message(
peer, peer,
data, data,
&*our_view, &*our_view,
...@@ -826,21 +862,27 @@ async fn handle_network_update( ...@@ -826,21 +862,27 @@ async fn handle_network_update(
message, message,
metrics, metrics,
statement_listeners, statement_listeners,
).await; ).await
if let Some((relay_parent, new)) = new_stored {
let mut _span = jaeger::hash_span(&relay_parent, "sending-statement");
// When we receive a new message from a peer, we forward it to the
// candidate backing subsystem.
let message = AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(relay_parent, new.statement.clone())
);
ctx.send_message(message).await;
}
}
None => (),
} }
None => None,
};
// if we got a fresh message, we need to circulate it to all peers.
if let Some((relay_parent, statement)) = handled_incoming {
// we can ignore the set of peers who this function returns as now expecting
// dependent statements.
//
// we have the invariant in this subsystem that we never store a `Valid` or `Invalid`
// statement before a `Seconded` statement. `Seconded` statements are the only ones
// that require dependents. Thus, if this is a `Seconded` statement for a candidate we
// were not aware of before, we cannot have any dependent statements from the candidate.
let _ = circulate_statement(
peers,
ctx,
relay_parent,
statement,
).await;
}
} }
NetworkBridgeEvent::PeerViewChange(peer, view) => { NetworkBridgeEvent::PeerViewChange(peer, view) => {
match peers.get_mut(&peer) { match peers.get_mut(&peer) {
...@@ -935,7 +977,7 @@ impl StatementDistribution { ...@@ -935,7 +977,7 @@ impl StatementDistribution {
}; };
active_heads.entry(relay_parent) active_heads.entry(relay_parent)
.or_insert(ActiveHeadData::new(validators, session_index)); .or_insert(ActiveHeadData::new(validators, session_index, &relay_parent));
} }
} }
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => { FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {
...@@ -945,7 +987,6 @@ impl StatementDistribution { ...@@ -945,7 +987,6 @@ impl StatementDistribution {
FromOverseer::Communication { msg } => match msg { FromOverseer::Communication { msg } => match msg {
StatementDistributionMessage::Share(relay_parent, statement) => { StatementDistributionMessage::Share(relay_parent, statement) => {
let _timer = metrics.time_share(); let _timer = metrics.time_share();
let mut _span = jaeger::hash_span(&relay_parent, "circulate-statement");
inform_statement_listeners( inform_statement_listeners(
&statement, &statement,
...@@ -1072,7 +1113,7 @@ mod tests { ...@@ -1072,7 +1113,7 @@ mod tests {
use futures::executor::{self, block_on}; use futures::executor::{self, block_on};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore}; use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore; use sc_keystore::LocalKeystore;
use polkadot_node_network_protocol::view; use polkadot_node_network_protocol::{view, ObservedRole};
#[test] #[test]
fn active_head_accepts_only_2_seconded_per_validator() { fn active_head_accepts_only_2_seconded_per_validator() {
...@@ -1110,7 +1151,7 @@ mod tests { ...@@ -1110,7 +1151,7 @@ mod tests {
c c
}; };
let mut head_data = ActiveHeadData::new(validators, session_index); let mut head_data = ActiveHeadData::new(validators, session_index, &parent_hash);
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory()); let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = SyncCryptoStore::sr25519_generate_new( let alice_public = SyncCryptoStore::sr25519_generate_new(
...@@ -1368,7 +1409,7 @@ mod tests { ...@@ -1368,7 +1409,7 @@ mod tests {
).unwrap(); ).unwrap();
let new_head_data = { let new_head_data = {
let mut data = ActiveHeadData::new(validators, session_index); let mut data = ActiveHeadData::new(validators, session_index, &hash_c);
let noted = data.note_statement(block_on(SignedFullStatement::sign( let noted = data.note_statement(block_on(SignedFullStatement::sign(
&keystore, &keystore,
...@@ -1586,4 +1627,162 @@ mod tests { ...@@ -1586,4 +1627,162 @@ mod tests {
) )
}); });
} }
#[test]
fn receiving_from_one_sends_to_another_and_to_candidate_backing() {
let hash_a = Hash::repeat_byte(1);
let candidate = {
let mut c = CommittedCandidateReceipt::default();
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c
};
let peer_a = PeerId::random();
let peer_b = PeerId::random();
let validators = vec![
Sr25519Keyring::Alice.public().into(),
Sr25519Keyring::Bob.public().into(),
Sr25519Keyring::Charlie.public().into(),
];
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let bg = async move {
let s = StatementDistribution { metrics: Default::default() };
s.run(ctx).await.unwrap();
};
let test_fut = async move {
// register our active heads.
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![hash_a].into(),
deactivated: vec![].into(),
}))).await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::Validators(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(validators));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![hash_a])
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(view![hash_a])
)
}).await;
// receive a seconded statement from peer A. it should be propagated onwards to peer B and to
// candidate backing.
let statement = {
let signing_context = SigningContext {
parent_hash: hash_a,
session_index,
};
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
).await.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate),
&signing_context,
0,
&alice_public.into(),
).await.expect("should be signed")
};
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone()),
)
)
}).await;
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == BENEFIT_VALID_STATEMENT_FIRST => {}
);
assert_matches!(
handle.recv().await,
AllMessages::CandidateBacking(
CandidateBackingMessage::Statement(r, s)
) if r == hash_a && s == statement => {}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
recipients,
protocol_v1::ValidationProtocol::StatementDistribution(
protocol_v1::StatementDistributionMessage::Statement(r, s)
),
)
) => {
assert_eq!(recipients, vec![peer_b.clone()]);
assert_eq!(r, hash_a);
assert_eq!(s, statement);
}
);
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::select(test_fut, bg));
}
} }
...@@ -159,9 +159,13 @@ pub fn pov_span(pov: &PoV, span_name: impl Into<String>) -> JaegerSpan { ...@@ -159,9 +159,13 @@ pub fn pov_span(pov: &PoV, span_name: impl Into<String>) -> JaegerSpan {
/// Creates a `Span` referring to the given hash. All spans created with [`hash_span`] with the /// Creates a `Span` referring to the given hash. All spans created with [`hash_span`] with the
/// same hash (even from multiple different nodes) will be visible in the same view on Jaeger. /// same hash (even from multiple different nodes) will be visible in the same view on Jaeger.
///
/// This span automatically has the `relay-parent` tag set.
#[inline(always)] #[inline(always)]
pub fn hash_span(hash: &Hash, span_name: impl Into<String>) -> JaegerSpan { pub fn hash_span(hash: &Hash, span_name: impl Into<String>) -> JaegerSpan {
INSTANCE.read_recursive().span(|| { *hash }, span_name).into() let mut span: JaegerSpan = INSTANCE.read_recursive().span(|| { *hash }, span_name).into();
span.add_string_tag("relay-parent", &format!("{:?}", hash));
span
} }
/// Stateful convenience wrapper around [`mick_jaeger`]. /// Stateful convenience wrapper around [`mick_jaeger`].
......
...@@ -38,7 +38,7 @@ There is a very simple state machine which governs which messages we are willing ...@@ -38,7 +38,7 @@ There is a very simple state machine which governs which messages we are willing
A: Initial State. Receive `SignedFullStatement(Statement::Second)`: extract `Statement`, forward to Candidate Backing and PoV Distribution, proceed to B. Receive any other `SignedFullStatement` variant: drop it. A: Initial State. Receive `SignedFullStatement(Statement::Second)`: extract `Statement`, forward to Candidate Backing and PoV Distribution, proceed to B. Receive any other `SignedFullStatement` variant: drop it.
B: Receive any `SignedFullStatement`: check signature, forward to Candidate Backing. Receive `OverseerMessage::StopWork`: proceed to C. B: Receive any `SignedFullStatement`: check signature and determine whether the statement is new to us. if new, forward to Candidate Backing and circulate to other peers. Receive `OverseerMessage::StopWork`: proceed to C.
C: Receive any message for this block: drop it. C: Receive any message for this block: drop it.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment