Unverified Commit 5ea2527e authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Batch messages to network bridge and introduce a timeout to...


Batch messages to network bridge and introduce a timeout to `SubsystemContext::send_message` (#2197)

* guide: batch network messages

* bridge: batch

* av-dist: batch outgoing messages

* time-out message sends in subsystem context

* Update node/subsystem/src/messages.rs

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>

* Revert "time-out message sends in subsystem context"

This reverts commit d49be625

.

* Update node/network/availability-distribution/src/lib.rs

Co-authored-by: default avatarBastian Köcher <bkchr@users.noreply.github.com>
parent 9060c1e3
Pipeline #119231 passed with stages
in 27 minutes and 36 seconds
......@@ -394,6 +394,7 @@ where
}
// handle all candidates
let mut messages_out = Vec::new();
for candidate_hash in state.cached_live_candidates_unioned(view.difference(&old_view)) {
// If we are not a validator for this candidate, let's skip it.
match state.per_candidate.get(&candidate_hash) {
......@@ -475,13 +476,16 @@ where
.cloned()
.collect::<Vec<_>>();
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, peers, iter::once(message)).await;
add_tracked_messages_to_batch(&mut messages_out, per_candidate, metrics, peers, iter::once(message));
}
// traces are better if we wait until the loop is done to drop.
per_candidate.drop_span_after_own_availability();
}
// send all batched messages out.
send_batch_to_network(ctx, messages_out).await;
// cleanup the removed relay parents and their states
old_view.difference(&view).for_each(|r| state.remove_relay_parent(r));
state.clean_up_live_under_cache();
......@@ -489,17 +493,15 @@ where
Ok(())
}
#[tracing::instrument(level = "trace", skip(ctx, metrics, message_iter), fields(subsystem = LOG_TARGET))]
async fn send_tracked_gossip_messages_to_peers<Context>(
ctx: &mut Context,
// After this function is invoked, the state reflects the messages as having been sent to a peer.
#[tracing::instrument(level = "trace", skip(batch, metrics, message_iter), fields(subsystem = LOG_TARGET))]
fn add_tracked_messages_to_batch(
batch: &mut Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
per_candidate: &mut PerCandidate,
metrics: &Metrics,
peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
) {
for message in message_iter {
for peer in peers.iter() {
per_candidate
......@@ -510,16 +512,25 @@ where
}
if !peers.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessage(
batch.push((
peers.clone(),
protocol_v1::ValidationProtocol::AvailabilityDistribution(message.into()),
).into()).await;
));
metrics.on_chunk_distributed();
}
}
}
async fn send_batch_to_network(
ctx: &mut impl SubsystemContext,
batch: Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
) {
if !batch.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessages(batch).into()).await
}
}
// Send the difference between two views which were not sent
// to that particular peer.
#[tracing::instrument(level = "trace", skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
......@@ -544,6 +555,7 @@ where
let added_candidates = state.cached_live_candidates_unioned(added.iter());
// Send all messages we've seen before and the peer is now interested in.
let mut batch = Vec::new();
for candidate_hash in added_candidates {
let per_candidate = match state.per_candidate.get_mut(&candidate_hash) {
Some(p) => p,
......@@ -564,8 +576,10 @@ where
.cloned()
.collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peers(ctx, per_candidate, metrics, vec![origin.clone()], messages).await;
add_tracked_messages_to_batch(&mut batch, per_candidate, metrics, vec![origin.clone()], messages);
}
send_batch_to_network(ctx, batch).await;
}
/// Obtain the first key which has a signing key.
......@@ -753,7 +767,9 @@ where
drop(span);
// gossip that message to interested peers
send_tracked_gossip_messages_to_peers(ctx, candidate_entry, metrics, peers, iter::once(message)).await;
let mut batch = Vec::new();
add_tracked_messages_to_batch(&mut batch, candidate_entry, metrics, peers, iter::once(message));
send_batch_to_network(ctx, batch).await;
Ok(())
}
......
......@@ -372,28 +372,32 @@ fn derive_erasure_chunks_with_proofs(
async fn expect_chunks_network_message(
virtual_overseer: &mut test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
peers: &[PeerId],
peers: &[Vec<PeerId>],
candidates: &[CandidateHash],
chunks: &[ErasureChunk],
) {
for _ in 0..chunks.len() {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(
send_peers,
if chunks.is_empty() { return }
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessages(msgs)
) => {
assert_eq!(msgs.len(), chunks.len());
for (send_peers, msg) in msgs {
assert_matches!(
msg,
protocol_v1::ValidationProtocol::AvailabilityDistribution(
protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk),
),
)
) => {
assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate));
assert!(chunks.iter().any(|c| c == &send_chunk), format!("Could not find chunk: {:?}", send_chunk));
assert_eq!(peers.len(), send_peers.len());
assert!(peers.iter().all(|p| send_peers.contains(p)));
protocol_v1::AvailabilityDistributionMessage::Chunk(send_candidate, send_chunk)
) => {
let i = chunks.iter().position(|c| c == &send_chunk).unwrap();
assert!(candidates.contains(&send_candidate), format!("Could not find candidate: {:?}", send_candidate));
assert_eq!(&peers[i], &send_peers);
}
);
}
);
}
}
)
}
async fn change_our_view(
......@@ -464,6 +468,9 @@ async fn change_our_view(
);
}
let mut send_peers = Vec::new();
let mut send_chunks = Vec::new();
let mut candidates = Vec::new();
for _ in 0..data_availability.len() {
let (available, candidate_hash) = assert_matches!(
overseer_recv(virtual_overseer).await,
......@@ -485,6 +492,7 @@ async fn change_our_view(
continue;
}
candidates.push(candidate_hash);
if let Some((pov, persisted)) = chunk_data_per_candidate.get(&candidate_hash) {
let chunks = make_erasure_chunks(persisted.clone(), validator_public.len(), pov.clone());
......@@ -506,11 +514,15 @@ async fn change_our_view(
);
if let Some(peers) = send_chunks_to.get(&candidate_hash) {
expect_chunks_network_message(virtual_overseer, &peers, &[candidate_hash], &[chunk]).await;
send_peers.push(peers.clone());
send_chunks.push(chunk);
}
}
}
}
expect_chunks_network_message(virtual_overseer, &send_peers, &candidates, &send_chunks).await;
}
async fn setup_peer_with_view(
......@@ -725,17 +737,19 @@ fn reputation_verification() {
// Both peers send us this chunk already
chunks.remove(2);
expect_chunks_network_message(&mut virtual_overseer, &[peer_a.clone()], &[candidates[0].hash()], &chunks).await;
let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;
overseer_send(&mut virtual_overseer, NetworkBridgeEvent::PeerViewChange(peer_b.clone(), view![current])).await;
expect_chunks_network_message(&mut virtual_overseer, &[peer_b.clone()], &[candidates[0].hash()], &chunks).await;
let send_peers = chunks.iter().map(|_| vec![peer_b.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(&mut virtual_overseer, &send_peers, &[candidates[0].hash()], &chunks).await;
peer_send_message(&mut virtual_overseer, peer_a.clone(), valid.clone(), BENEFIT_VALID_MESSAGE_FIRST).await;
expect_chunks_network_message(
&mut virtual_overseer,
&[peer_b.clone()],
&[vec![peer_b.clone()]],
&[candidates[1].hash()],
&[valid.erasure_chunk.clone()],
).await;
......@@ -901,9 +915,10 @@ fn candidate_chunks_are_put_into_message_vault_when_candidate_is_first_seen() {
validator_public.len(),
pov_blocks[0].clone(),
);
let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(
&mut virtual_overseer,
&[peer_a],
&send_peers,
&[candidates[0].hash()],
&chunks,
).await;
......@@ -1253,9 +1268,11 @@ fn new_peer_gets_all_chunks_send() {
chunks.push(valid.erasure_chunk);
let send_peers = chunks.iter().map(|_| vec![peer_a.clone()]).collect::<Vec<_>>();
expect_chunks_network_message(
&mut virtual_overseer,
&[peer_a],
&send_peers,
&[candidates[0].hash(), candidates[1].hash()],
&chunks,
).await;
......
......@@ -260,8 +260,8 @@ struct PeerData {
#[derive(Debug)]
enum Action {
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),
SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>),
SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
ConnectToValidators {
validator_ids: Vec<AuthorityDiscoveryId>,
connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
......@@ -296,9 +296,13 @@ fn action_from_overseer_message(
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
NetworkBridgeMessage::SendValidationMessage(peers, msg)
=> Action::SendValidationMessage(peers, msg),
=> Action::SendValidationMessages(vec![(peers, msg)]),
NetworkBridgeMessage::SendCollationMessage(peers, msg)
=> Action::SendCollationMessage(peers, msg),
=> Action::SendCollationMessages(vec![(peers, msg)]),
NetworkBridgeMessage::SendValidationMessages(msgs)
=> Action::SendValidationMessages(msgs),
NetworkBridgeMessage::SendCollationMessages(msgs)
=> Action::SendCollationMessages(msgs),
NetworkBridgeMessage::ConnectToValidators { validator_ids, connected }
=> Action::ConnectToValidators { validator_ids, connected },
},
......@@ -623,19 +627,27 @@ where
Action::Nop => {}
Action::Abort => return Ok(()),
Action::SendValidationMessage(peers, msg) => send_message(
&mut network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
).await?,
Action::SendValidationMessages(msgs) => {
for (peers, msg) in msgs {
send_message(
&mut network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
).await?
}
}
Action::SendCollationMessage(peers, msg) => send_message(
&mut network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
).await?,
Action::SendCollationMessages(msgs) => {
for (peers, msg) in msgs {
send_message(
&mut network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
).await?
}
}
Action::ConnectToValidators {
validator_ids,
......
......@@ -203,6 +203,12 @@ pub enum NetworkBridgeMessage {
/// Send a message to one or more peers on the collation peer-set.
SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),
/// Send a batch of validation messages.
SendValidationMessages(Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>),
/// Send a batch of collation messages.
SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
/// Connect to peers who represent the given `validator_ids`.
///
/// Also ask the network to stay connected to these peers at least
......@@ -225,6 +231,8 @@ impl NetworkBridgeMessage {
Self::ReportPeer(_, _) => None,
Self::SendValidationMessage(_, _) => None,
Self::SendCollationMessage(_, _) => None,
Self::SendValidationMessages(_) => None,
Self::SendCollationMessages(_) => None,
Self::ConnectToValidators { .. } => None,
}
}
......
......@@ -86,11 +86,11 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on
- Adjust peer reputation according to cost or benefit provided
### SendValidationMessage
### SendValidationMessage / SendValidationMessages
- Issue a corresponding `ProtocolMessage` to each listed peer on the validation peer-set.
### SendCollationMessage
### SendCollationMessage / SendCollationMessages
- Issue a corresponding `ProtocolMessage` to each listed peer on the collation peer-set.
......
......@@ -320,7 +320,11 @@ enum NetworkBridgeMessage {
/// Send a message to one or more peers on the validation peerset.
SendValidationMessage([PeerId], ValidationProtocolV1),
/// Send a message to one or more peers on the collation peerset.
SendCollationMessage([PeerId], ValidationProtocolV1),
SendCollationMessage([PeerId], CollationProtocolV1),
/// Send multiple validation messages.
SendValidationMessages([([PeerId, ValidationProtocolV1])]),
/// Send multiple collation messages.
SendCollationMessages([([PeerId, ValidationProtocolV1])]),
/// Connect to peers who represent the given `validator_ids`.
///
/// Also ask the network to stay connected to these peers at least
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment