Newer
Older
metrics.statements_distributed.inc();
}
}
/// Start a timer for `active_leaves_update`.
///
/// Yields `None` when this handle carries no registered metrics (`self.0` is `None`);
/// otherwise the returned timer observes when it is dropped.
fn time_active_leaves_update(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    let inner = self.0.as_ref()?;
    Some(inner.active_leaves_update.start_timer())
}
/// Start a timer for `share`.
///
/// Yields `None` when this handle carries no registered metrics (`self.0` is `None`);
/// otherwise the returned timer observes when it is dropped.
fn time_share(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    match self.0.as_ref() {
        Some(inner) => Some(inner.share.start_timer()),
        None => None,
    }
}
/// Start a timer for `network_bridge_update_v1`.
///
/// Yields `None` when this handle carries no registered metrics (`self.0` is `None`);
/// otherwise the returned timer observes when it is dropped.
fn time_network_bridge_update_v1(&self) -> Option<metrics::prometheus::prometheus::HistogramTimer> {
    self.0
        .as_ref()
        .and_then(|inner| Some(inner.network_bridge_update_v1.start_timer()))
}
}
impl metrics::Metrics for Metrics {
    /// Create every statement-distribution metric and register it with `registry`.
    ///
    /// Registers one counter (`parachain_statements_distributed_total`) and three timing
    /// histograms, in that order. The first failure, whether from constructing a metric or
    /// from registering it, is returned as the error and no `Metrics` value is produced.
    fn try_register(registry: &prometheus::Registry) -> std::result::Result<Self, prometheus::PrometheusError> {
        // The three histograms differ only in name and help text, so build and register
        // them through one helper instead of repeating the nested constructor calls.
        let histogram = |name: &str, help: &str|
            -> std::result::Result<prometheus::Histogram, prometheus::PrometheusError>
        {
            prometheus::register(
                prometheus::Histogram::with_opts(prometheus::HistogramOpts::new(name, help))?,
                registry,
            )
        };

        let metrics = MetricsInner {
            statements_distributed: prometheus::register(
                prometheus::Counter::new(
                    "parachain_statements_distributed_total",
                    "Number of candidate validity statements distributed to other peers."
                )?,
                registry,
            )?,
            active_leaves_update: histogram(
                "parachain_statement_distribution_active_leaves_update",
                "Time spent within `statement_distribution::active_leaves_update`",
            )?,
            share: histogram(
                "parachain_statement_distribution_share",
                "Time spent within `statement_distribution::share`",
            )?,
            network_bridge_update_v1: histogram(
                "parachain_statement_distribution_network_bridge_update_v1",
                "Time spent within `statement_distribution::network_bridge_update_v1`",
            )?,
        };
        Ok(Metrics(Some(metrics)))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use sp_keyring::Sr25519Keyring;
use sp_application_crypto::AppKey;
use node_primitives::Statement;
use polkadot_primitives::v1::CommittedCandidateReceipt;
use assert_matches::assert_matches;
use futures::executor::{self, block_on};
use sp_keystore::{CryptoStore, SyncCryptoStorePtr, SyncCryptoStore};
use sc_keystore::LocalKeystore;
/// A single validator may have at most two distinct `Seconded` statements noted in an
/// `ActiveHeadData`; a third is reported as `NotUseful`, a repeat as `UsefulButKnown`,
/// and a different validator gets its own allowance.
#[test]
fn active_head_accepts_only_2_seconded_per_validator() {
    let parent_hash: Hash = [1; 32].into();
    let session_index = 1;
    let signing_context = SigningContext {
        parent_hash,
        session_index,
    };

    // Three candidates on the same relay parent, distinguished only by para id.
    let candidate_for = |para: u32| {
        let mut receipt = CommittedCandidateReceipt::default();
        receipt.descriptor.relay_parent = parent_hash;
        receipt.descriptor.para_id = para.into();
        receipt
    };
    let candidate_a = candidate_for(1);
    let candidate_b = candidate_for(2);
    let candidate_c = candidate_for(3);

    let validators = vec![
        Sr25519Keyring::Alice.public().into(),
        Sr25519Keyring::Bob.public().into(),
        Sr25519Keyring::Charlie.public().into(),
    ];
    let mut head_data = ActiveHeadData::new(validators, session_index);

    let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
    let alice: ValidatorId = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
    ).unwrap().into();
    let bob: ValidatorId = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Bob.to_seed())
    ).unwrap().into();

    // Sign a `Seconded` statement for `candidate` as the validator at `validator_index`.
    let seconded = |candidate: &CommittedCandidateReceipt, validator_index, key: &ValidatorId| {
        block_on(SignedFullStatement::sign(
            &keystore,
            Statement::Seconded(candidate.clone()),
            &signing_context,
            validator_index,
            key,
        )).expect("should be signed")
    };

    // note A
    let a_by_alice = seconded(&candidate_a, 0, &alice);
    assert_matches!(head_data.note_statement(a_by_alice.clone()), NotedStatement::Fresh(_));

    // note A (duplicate)
    assert_matches!(head_data.note_statement(a_by_alice), NotedStatement::UsefulButKnown);

    // note B
    let b_by_alice = seconded(&candidate_b, 0, &alice);
    assert_matches!(head_data.note_statement(b_by_alice), NotedStatement::Fresh(_));

    // note C (beyond 2 - ignored)
    let c_by_alice = seconded(&candidate_c, 0, &alice);
    assert_matches!(head_data.note_statement(c_by_alice), NotedStatement::NotUseful);

    // note B (new validator)
    let b_by_bob = seconded(&candidate_b, 1, &bob);
    assert_matches!(head_data.note_statement(b_by_bob), NotedStatement::Fresh(_));

    // note C (new validator)
    let c_by_bob = seconded(&candidate_c, 1, &bob);
    assert_matches!(head_data.note_statement(c_by_bob), NotedStatement::Fresh(_));
}
/// Hashes passed to `note_local` show up in `local_observed` and never in
/// `remote_observed`.
#[test]
fn note_local_works() {
    let hashes = [
        CandidateHash([1; 32].into()),
        CandidateHash([2; 32].into()),
    ];
    let mut tracker = VcPerPeerTracker::default();

    for hash in hashes.iter() {
        tracker.note_local(hash.clone());
    }

    assert!(hashes.iter().all(|hash| tracker.local_observed.contains(hash)));
    assert!(hashes.iter().all(|hash| !tracker.remote_observed.contains(hash)));
}
/// `note_remote` accepts the first two hashes and rejects the third; accepted hashes
/// are stored in `remote_observed` and nothing leaks into `local_observed`.
#[test]
fn note_remote_works() {
    // (candidate hash, whether `note_remote` is expected to accept it)
    let cases = [
        (CandidateHash([1; 32].into()), true),
        (CandidateHash([2; 32].into()), true),
        (CandidateHash([3; 32].into()), false),
    ];
    let mut tracker = VcPerPeerTracker::default();

    for (hash, accepted) in cases.iter() {
        assert_eq!(tracker.note_remote(hash.clone()), *accepted);
    }
    for (hash, accepted) in cases.iter() {
        assert_eq!(tracker.remote_observed.contains(hash), *accepted);
    }
    for (hash, _) in cases.iter() {
        assert!(!tracker.local_observed.contains(hash));
    }
}
/// Exercises `PeerRelayParentKnowledge::send`.
///
/// A `Valid` statement for a candidate is refused (`None`, no state touched) until a
/// `Candidate` statement for the same hash has been sent. The first `Candidate` send
/// returns `Some(true)`; every later accepted send for that hash returns `Some(false)`.
/// Sending never touches the receive-side bookkeeping.
#[test]
fn per_peer_relay_parent_knowledge_send() {
    let mut knowledge = PeerRelayParentKnowledge::default();
    let hash_a = CandidateHash([1; 32].into());

    // Sending an un-pinned statement should not work and should have no effect.
    assert!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)).is_none());
    assert!(!knowledge.known_candidates.contains(&hash_a));
    assert!(knowledge.sent_statements.is_empty());
    assert!(knowledge.received_statements.is_empty());
    assert!(knowledge.seconded_counts.is_empty());
    assert!(knowledge.received_message_count.is_empty());

    // Make the peer aware of the candidate.
    assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 0)), Some(true));
    assert_eq!(knowledge.send(&(CompactStatement::Candidate(hash_a), 1)), Some(false));
    assert!(knowledge.known_candidates.contains(&hash_a));
    assert_eq!(knowledge.sent_statements.len(), 2);
    assert!(knowledge.received_statements.is_empty());
    assert_eq!(knowledge.seconded_counts.len(), 2);
    assert!(knowledge.received_message_count.get(&hash_a).is_none());

    // And now it should accept the dependent message.
    assert_eq!(knowledge.send(&(CompactStatement::Valid(hash_a), 0)), Some(false));
    assert!(knowledge.known_candidates.contains(&hash_a));
    assert_eq!(knowledge.sent_statements.len(), 3);
    assert!(knowledge.received_statements.is_empty());
    assert_eq!(knowledge.seconded_counts.len(), 2);
    assert!(knowledge.received_message_count.get(&hash_a).is_none());
}
/// Once a statement has been received from the peer, sending that same statement back
/// is refused (`send` returns `None`).
#[test]
fn cant_send_after_receiving() {
    let candidate = CandidateHash([1; 32].into());
    let fingerprint = (CompactStatement::Candidate(candidate), 0);
    let mut knowledge = PeerRelayParentKnowledge::default();

    let received = knowledge.receive(&fingerprint, 3);
    assert!(received.unwrap());

    let sent = knowledge.send(&fingerprint);
    assert!(sent.is_none());
}
/// Exercises `PeerRelayParentKnowledge::receive`.
///
/// Covers, in order: a `Valid` statement for an unknown candidate is rejected as
/// unexpected; statements are accepted until the per-candidate message count reaches
/// the limit and are then rejected as a flood; a validator's `Candidate` statements are
/// capped; and repeats of already-received statements are rejected as duplicates.
///
/// NOTE(review): the trailing `3` passed to every `receive` call appears to be the
/// per-candidate message limit (the count stops at 3 below) — confirm against `receive`.
#[test]
fn per_peer_relay_parent_knowledge_receive() {
    let mut knowledge = PeerRelayParentKnowledge::default();
    let hash_a = CandidateHash([1; 32].into());

    // A dependent statement for a candidate the peer never announced is unexpected.
    assert_eq!(
        knowledge.receive(&(CompactStatement::Valid(hash_a), 0), 3),
        Err(COST_UNEXPECTED_STATEMENT),
    );
    assert_eq!(
        knowledge.receive(&(CompactStatement::Candidate(hash_a), 0), 3),
        Ok(true),
    );

    // Push statements up to the flood limit.
    assert_eq!(
        knowledge.receive(&(CompactStatement::Valid(hash_a), 1), 3),
        Ok(false),
    );
    assert!(knowledge.known_candidates.contains(&hash_a));
    assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 2);
    assert_eq!(
        knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3),
        Ok(false),
    );
    assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);
    assert_eq!(
        knowledge.receive(&(CompactStatement::Valid(hash_a), 7), 3),
        Err(COST_APPARENT_FLOOD),
    );
    // The rejected message must not have been counted or stored.
    assert_eq!(*knowledge.received_message_count.get(&hash_a).unwrap(), 3);
    assert_eq!(knowledge.received_statements.len(), 3); // number of prior `Ok`s.

    // Now make sure that the seconding limit is respected.
    let hash_b = CandidateHash([2; 32].into());
    let hash_c = CandidateHash([3; 32].into());
    assert_eq!(
        knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3),
        Ok(true),
    );
    assert_eq!(
        knowledge.receive(&(CompactStatement::Candidate(hash_c), 0), 3),
        Err(COST_UNEXPECTED_STATEMENT),
    );

    // Last, make sure that already-known statements are disregarded.
    assert_eq!(
        knowledge.receive(&(CompactStatement::Valid(hash_a), 2), 3),
        Err(COST_DUPLICATE_STATEMENT),
    );
    assert_eq!(
        knowledge.receive(&(CompactStatement::Candidate(hash_b), 0), 3),
        Err(COST_DUPLICATE_STATEMENT),
    );
}
/// Moves a peer from view `{a, b}` to view `{b, c}` and checks the outcome of
/// `update_peer_view_and_send_unlocked`.
///
/// Knowledge for `a` is dropped, knowledge for `b` is kept, and the three statements
/// stored in the active head for `c` are both recorded as sent in the peer's knowledge
/// and emitted as `NetworkBridgeMessage::SendValidationMessage` messages.
#[test]
fn peer_view_update_sends_messages() {
    let hash_a = [1; 32].into();
    let hash_b = [2; 32].into();
    let hash_c = [3; 32].into();

    let candidate = {
        let mut c = CommittedCandidateReceipt::default();
        c.descriptor.relay_parent = hash_c;
        c.descriptor.para_id = 1.into();
        c
    };
    let candidate_hash = candidate.hash();

    let old_view = View(vec![hash_a, hash_b]);
    let new_view = View(vec![hash_b, hash_c]);

    let mut active_heads = HashMap::new();
    let validators = vec![
        Sr25519Keyring::Alice.public().into(),
        Sr25519Keyring::Bob.public().into(),
        Sr25519Keyring::Charlie.public().into(),
    ];

    let session_index = 1;
    let signing_context = SigningContext {
        parent_hash: hash_c,
        session_index,
    };

    let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
    let alice_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
    ).unwrap();
    let bob_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Bob.to_seed())
    ).unwrap();
    let charlie_public = SyncCryptoStore::sr25519_generate_new(
        &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Charlie.to_seed())
    ).unwrap();

    // Active head for `hash_c`: Alice seconds the candidate, Bob and Charlie mark it valid.
    let new_head_data = {
        let mut data = ActiveHeadData::new(validators, session_index);

        let noted = data.note_statement(block_on(SignedFullStatement::sign(
            &keystore,
            Statement::Seconded(candidate.clone()),
            &signing_context,
            0,
            &alice_public.into(),
        )).expect("should be signed"));
        assert_matches!(noted, NotedStatement::Fresh(_));

        let noted = data.note_statement(block_on(SignedFullStatement::sign(
            &keystore,
            Statement::Valid(candidate_hash),
            &signing_context,
            1,
            &bob_public.into(),
        )).expect("should be signed"));
        assert_matches!(noted, NotedStatement::Fresh(_));

        let noted = data.note_statement(block_on(SignedFullStatement::sign(
            &keystore,
            Statement::Valid(candidate_hash),
            &signing_context,
            2,
            &charlie_public.into(),
        )).expect("should be signed"));
        assert_matches!(noted, NotedStatement::Fresh(_));

        data
    };
    active_heads.insert(hash_c, new_head_data);

    // The peer starts out with empty knowledge for both heads of its old view.
    let mut peer_data = PeerData {
        view: old_view,
        view_knowledge: {
            let mut k = HashMap::new();
            k.insert(hash_a, Default::default());
            k.insert(hash_b, Default::default());
            k
        },
    };

    let pool = sp_core::testing::TaskExecutor::new();
    let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
    let peer = PeerId::random();

    executor::block_on(async move {
        // NOTE(review): the argument list is reproduced exactly as it appeared in the
        // reviewed text; confirm it matches the current signature of
        // `update_peer_view_and_send_unlocked`.
        update_peer_view_and_send_unlocked(
            peer.clone(),
            &mut peer_data,
            &mut ctx,
            &active_heads,
            new_view.clone(),
        ).await.unwrap();

        assert_eq!(peer_data.view, new_view);
        assert!(!peer_data.view_knowledge.contains_key(&hash_a));
        assert!(peer_data.view_knowledge.contains_key(&hash_b));

        let c_knowledge = peer_data.view_knowledge.get(&hash_c).unwrap();
        assert!(c_knowledge.known_candidates.contains(&candidate_hash));
        assert!(c_knowledge.sent_statements.contains(
            &(CompactStatement::Candidate(candidate_hash), 0)
        ));
        assert!(c_knowledge.sent_statements.contains(
            &(CompactStatement::Valid(candidate_hash), 1)
        ));
        assert!(c_knowledge.sent_statements.contains(
            &(CompactStatement::Valid(candidate_hash), 2)
        ));

        // now see if we got the 3 messages from the active head data.
        let active_head = active_heads.get(&hash_c).unwrap();

        // semi-fragile because hashmap iterator ordering is undefined, but in practice
        // it will not change between runs of the program.
        for statement in active_head.statements_about(candidate_hash) {
            let message = handle.recv().await;
            let expected_to = vec![peer.clone()];
            let expected_payload
                = statement_message(hash_c, statement.statement.clone());

            assert_matches!(
                message,
                AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
                    to,
                    payload,
                )) => {
                    assert_eq!(to, expected_to);
                    assert_eq!(payload, expected_payload)
                }
            )
        }
    });
}
/// Circulates one `Seconded` statement for relay parent `hash_b` among three peers and
/// checks that exactly the peers whose view contains `hash_b` (`peer_b` and `peer_c`)
/// are returned by `circulate_statement`, have the statement recorded as sent, and are
/// the recipients of the single `SendValidationMessage` that is emitted.
#[test]
fn circulated_statement_goes_to_all_peers_with_view() {
    let hash_a = [1; 32].into();
    let hash_b = [2; 32].into();
    let hash_c = [3; 32].into();

    let candidate = {
        let mut c = CommittedCandidateReceipt::default();
        c.descriptor.relay_parent = hash_b;
        c.descriptor.para_id = 1.into();
        c
    };

    let peer_a = PeerId::random();
    let peer_b = PeerId::random();
    let peer_c = PeerId::random();

    // Only `peer_b` and `peer_c` have `hash_b` in their view.
    let peer_a_view = View(vec![hash_a]);
    let peer_b_view = View(vec![hash_a, hash_b]);
    let peer_c_view = View(vec![hash_b, hash_c]);

    let session_index = 1;

    // Every head in a peer's view starts with default (empty) knowledge.
    let peer_data_from_view = |view: View| PeerData {
        view: view.clone(),
        view_knowledge: view.0.iter().map(|v| (v.clone(), Default::default())).collect(),
    };

    let mut peer_data: HashMap<_, _> = vec![
        (peer_a.clone(), peer_data_from_view(peer_a_view)),
        (peer_b.clone(), peer_data_from_view(peer_b_view)),
        (peer_c.clone(), peer_data_from_view(peer_c_view)),
    ].into_iter().collect();

    let pool = sp_core::testing::TaskExecutor::new();
    let (mut ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);

    executor::block_on(async move {
        // Alice (validator index 0) seconds the candidate on `hash_b`.
        let statement = {
            let signing_context = SigningContext {
                parent_hash: hash_b,
                session_index,
            };

            let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
            let alice_public = CryptoStore::sr25519_generate_new(
                &*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
            ).await.unwrap();

            let statement = SignedFullStatement::sign(
                &keystore,
                Statement::Seconded(candidate),
                &signing_context,
                0,
                &alice_public.into(),
            ).await.expect("should be signed");

            StoredStatement {
                comparator: StoredStatementComparator {
                    compact: statement.payload().to_compact(),
                    validator_index: 0,
                    signature: statement.signature().clone()
                },
                statement,
            }
        };

        let needs_dependents = circulate_statement(
            &mut peer_data,
            &mut ctx,
            hash_b,
            &statement,
        ).await.unwrap();

        {
            assert_eq!(needs_dependents.len(), 2);
            assert!(needs_dependents.contains(&peer_b));
            assert!(needs_dependents.contains(&peer_c));
        }

        let fingerprint = (statement.compact().clone(), 0);

        assert!(
            peer_data.get(&peer_b).unwrap()
                .view_knowledge.get(&hash_b).unwrap()
                .sent_statements.contains(&fingerprint),
        );
        assert!(
            peer_data.get(&peer_c).unwrap()
                .view_knowledge.get(&hash_b).unwrap()
                .sent_statements.contains(&fingerprint),
        );

        let message = handle.recv().await;
        assert_matches!(
            message,
            AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage(
                to,
                payload,
            )) => {
                assert_eq!(to.len(), 2);
                assert!(to.contains(&peer_b));
                assert!(to.contains(&peer_c));
                assert_eq!(
                    payload,
                    statement_message(hash_b, statement.statement.clone()),
                );
            }
        )
    });
}
}