Unverified Commit 23f6d27f authored by Bastian Köcher, committed by GitHub

Do not send empty view updates to peers (#2233)



* Do not send empty view updates to peers

It happened that we sent empty view updates to our peers because we had
only updated our finalized block. This could lead to situations where we
overwhelmed subsystems with too many messages. On Rococo this led to
constant restarts of our nodes, because some node was apparently
finalizing a lot of blocks.

To prevent this, the PR does the following (a condensed sketch follows
the list):

1. If a peer sends us an empty view, we report that peer and decrease
its reputation.

2. We ensure that we only send a view update when the `heads` changed,
not when only the `finalized_number` changed.

3. We do not send empty `ActiveLeavesUpdate`s from the overseer, as
sending these empty updates makes no sense. If a subsystem relies on
the finalized block, it needs to listen for the corresponding overseer
signal.

* Update node/network/bridge/src/lib.rs

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Don't work if there are no added heads

* Fix test

* Ahhh

* More fixes

Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
parent 6cfab385
Pipeline #119402 passed with stages in 25 minutes and 47 seconds
@@ -550,6 +550,10 @@ where
*current = view;
if added.is_empty() {
return
}
// only contains the intersection of what we are interested in and
// the union of all relay parents' candidates.
let added_candidates = state.cached_live_candidates_unioned(added.iter());
......
@@ -59,12 +59,10 @@ pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1";
/// The protocol name for the collation peer-set.
pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1";
const MALFORMED_MESSAGE_COST: ReputationChange
= ReputationChange::new(-500, "Malformed Network-bridge message");
const UNCONNECTED_PEERSET_COST: ReputationChange
= ReputationChange::new(-50, "Message sent to un-connected peer-set");
const MALFORMED_VIEW_COST: ReputationChange
= ReputationChange::new(-500, "Malformed view");
const MALFORMED_MESSAGE_COST: ReputationChange = ReputationChange::new(-500, "Malformed Network-bridge message");
const UNCONNECTED_PEERSET_COST: ReputationChange = ReputationChange::new(-50, "Message sent to un-connected peer-set");
const MALFORMED_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Malformed view");
const EMPTY_VIEW_COST: ReputationChange = ReputationChange::new(-500, "Peer sent us an empty view");
// network bridge log target
const LOG_TARGET: &'static str = "network_bridge";
@@ -388,7 +386,11 @@ async fn update_our_view(
collation_peers: &HashMap<PeerId, PeerData>,
) -> SubsystemResult<()> {
let new_view = construct_view(live_heads.iter().map(|v| v.0), finalized_number);
if *local_view == new_view { return Ok(()) }
// We only want to send a view update when the heads changed, not when only the finalized block changed.
if local_view.heads == new_view.heads {
return Ok(())
}
*local_view = new_view.clone();
@@ -441,6 +443,13 @@ async fn handle_peer_messages<M>(
MALFORMED_VIEW_COST,
).await?;
continue
} else if new_view.heads.is_empty() {
net.report_peer(
peer.clone(),
EMPTY_VIEW_COST,
).await?;
continue
} else if new_view == peer_data.view {
continue
@@ -923,10 +932,11 @@ mod tests {
}
}
// network actions are sensitive to ordering of `PeerId`s within a `HashMap`, so
// we need to use this to prevent fragile reliance on peer ordering.
fn network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) -> bool {
actions.iter().find(|&x| x == action).is_some()
/// Assert that the given actions contain the given `action`.
fn assert_network_actions_contains(actions: &[NetworkAction], action: &NetworkAction) {
if !actions.iter().any(|x| x == action) {
panic!("Could not find `{:?}` in `{:?}`", action, actions);
}
}
struct TestHarness {
@@ -1035,23 +1045,85 @@ mod tests {
view![hash_a]
).encode();
assert!(network_actions_contains(
assert_network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer_a,
PeerSet::Validation,
wire_message.clone(),
),
);
assert_network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer_b,
PeerSet::Validation,
wire_message.clone(),
),
);
});
}
#[test]
fn do_not_send_view_update_when_only_finalized_block_changed() {
test_harness(|test_harness| async move {
let TestHarness { mut network_handle, mut virtual_overseer } = test_harness;
let peer_a = PeerId::random();
let peer_b = PeerId::random();
network_handle.connect_peer(
peer_a.clone(),
PeerSet::Validation,
ObservedRole::Full,
).await;
network_handle.connect_peer(
peer_b.clone(),
PeerSet::Validation,
ObservedRole::Full,
).await;
let hash_a = Hash::repeat_byte(1);
virtual_overseer.send(FromOverseer::Signal(OverseerSignal::BlockFinalized(Hash::random(), 5))).await;
// Send some empty active leaves update
//
// This should not trigger a view update to our peers.
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default()))
).await;
// This should trigger the view update to our peers.
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(
ActiveLeavesUpdate::start_work(hash_a, Arc::new(JaegerSpan::Disabled)),
))
).await;
let actions = network_handle.next_network_actions(2).await;
let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
View { heads: vec![hash_a], finalized_number: 5 }
).encode();
assert_network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer_a,
PeerSet::Validation,
wire_message.clone(),
),
));
);
assert!(network_actions_contains(
assert_network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer_b,
PeerSet::Validation,
wire_message.clone(),
),
));
);
});
}
@@ -1225,14 +1297,14 @@ mod tests {
view![hash_a]
).encode();
assert!(network_actions_contains(
assert_network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer.clone(),
PeerSet::Collation,
wire_message.clone(),
),
));
);
});
}
@@ -1292,13 +1364,13 @@ mod tests {
).await;
let actions = network_handle.next_network_actions(1).await;
assert!(network_actions_contains(
assert_network_actions_contains(
&actions,
&NetworkAction::ReputationChange(
peer_a.clone(),
UNCONNECTED_PEERSET_COST,
),
));
);
// peer B has the message relayed.
@@ -1402,7 +1474,6 @@ mod tests {
let hash_a = Hash::repeat_byte(1);
let hash_b = Hash::repeat_byte(2);
let hash_c = Hash::repeat_byte(3);
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_a, 1))
@@ -1421,39 +1492,14 @@ mod tests {
}
).encode();
assert!(network_actions_contains(
assert_network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer_a.clone(),
PeerSet::Validation,
wire_message.clone(),
),
));
// view updates are issued even when `ActiveLeavesUpdate` is empty
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash_c, 3))
).await;
virtual_overseer.send(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::default()))
).await;
let actions = network_handle.next_network_actions(1).await;
let wire_message = WireMessage::<protocol_v1::ValidationProtocol>::ViewUpdate(
View {
heads: vec![hash_b],
finalized_number: 3,
}
).encode();
assert!(network_actions_contains(
&actions,
&NetworkAction::WriteNotification(
peer_a,
PeerSet::Validation,
wire_message.clone(),
),
));
);
});
}
......
@@ -137,7 +137,7 @@ enum ToOverseer {
/// This structure exists solely for the purposes of decoupling
/// `Overseer` code from the client code and the necessity to call
/// `HeaderBackend::block_number_from_id()`.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BlockInfo {
/// hash of the block.
pub hash: Hash,
@@ -1514,7 +1514,9 @@ where
update.activated.push((hash, span));
}
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
loop {
select! {
@@ -1620,10 +1622,14 @@ where
self.on_head_deactivated(deactivated)
}
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash, block.number)).await?;
// broadcast `ActiveLeavesUpdate` even if empty to issue view updates
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
// If there are no leaves being deactivated, we don't need to send an update.
//
// Our peers will be informed about our finalized block the next time we activate/deactivate some leaf.
if !update.is_empty() {
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
}
Ok(())
}
@@ -1898,9 +1904,9 @@ mod tests {
}
}
struct TestSubsystem4;
struct ReturnOnStart;
impl<C> Subsystem<C> for TestSubsystem4
impl<C> Subsystem<C> for ReturnOnStart
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, mut _ctx: C) -> SpawnedSubsystem {
@@ -2043,29 +2049,22 @@ mod tests {
// Spawn a subsystem that immediately exits.
//
// Should immediately conclude the overseer itself with an error.
// Should immediately conclude the overseer itself.
#[test]
fn overseer_panics_on_subsystem_exit() {
fn overseer_ends_on_subsystem_exit() {
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let (s1_tx, _) = mpsc::channel(64);
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_validation(TestSubsystem1(s1_tx))
.replace_candidate_backing(TestSubsystem4);
.replace_candidate_backing(ReturnOnStart);
let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
select! {
res = overseer_fut => assert!(res.is_err()),
complete => (),
}
overseer.run().await.unwrap();
})
}
@@ -2309,9 +2308,8 @@ mod tests {
complete => break,
}
if ss5_results.len() == expected_heartbeats.len() &&
ss6_results.len() == expected_heartbeats.len() {
handler.stop().await;
if ss5_results.len() == expected_heartbeats.len() && ss6_results.len() == expected_heartbeats.len() {
handler.stop().await;
}
}
@@ -2327,6 +2325,79 @@ mod tests {
});
}
#[test]
fn do_not_send_empty_leaves_update_on_block_finalization() {
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let imported_block = BlockInfo {
hash: Hash::random(),
parent_hash: Hash::random(),
number: 1,
};
let finalized_block = BlockInfo {
hash: Hash::random(),
parent_hash: Hash::random(),
number: 1,
};
let (tx_5, mut rx_5) = mpsc::channel(64);
let all_subsystems = AllSubsystems::<()>::dummy()
.replace_candidate_backing(TestSubsystem6(tx_5));
let (overseer, mut handler) = Overseer::new(
Vec::new(),
all_subsystems,
None,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
pin_mut!(overseer_fut);
let mut ss5_results = Vec::new();
handler.block_finalized(finalized_block.clone()).await;
handler.block_imported(imported_block.clone()).await;
let expected_heartbeats = vec![
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: [
(imported_block.hash, Arc::new(JaegerSpan::Disabled)),
].as_ref().into(),
..Default::default()
}),
OverseerSignal::BlockFinalized(finalized_block.hash, 1),
];
loop {
select! {
res = overseer_fut => {
assert!(res.is_ok());
break;
},
res = rx_5.next() => {
if let Some(res) = dbg!(res) {
ss5_results.push(res);
}
}
}
if ss5_results.len() == expected_heartbeats.len() {
handler.stop().await;
}
}
assert_eq!(ss5_results.len(), expected_heartbeats.len());
for expected in expected_heartbeats {
assert!(ss5_results.contains(&expected));
}
});
}
#[derive(Clone)]
struct CounterSubsystem {
stop_signals_received: Arc<atomic::AtomicUsize>,
@@ -2542,7 +2613,7 @@ mod tests {
assert_eq!(stop_signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
// x2 because of broadcast_signal on startup
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), 2 * NUM_SUBSYSTEMS);
assert_eq!(signals_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS);
// -1 for BitfieldSigning
assert_eq!(msgs_received.load(atomic::Ordering::SeqCst), NUM_SUBSYSTEMS - 1);
......