Unverified Commit bb91bedf authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Do not send messages twice in bitfield distribution (#2005)

* Do not send messages twice in bitfield distribution

This removes a bug which resulted in sending bitfield messages multiple
times by not checking if we already relayed them. Besides that it also
adds an optimization to not relay a message to a peer that send us
this message.

* Review comments

* Break some lines
parent ede19593
......@@ -4746,11 +4746,9 @@ dependencies = [
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-keystore",
"sp-application-crypto",
"sp-core",
"sp-keystore",
"tempfile",
"tracing",
"tracing-futures",
]
......
......@@ -20,9 +20,7 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
maplit = "1.0.2"
log = "0.4.11"
env_logger = "0.8.2"
assert_matches = "1.4.0"
tempfile = "3.1.0"
......@@ -121,11 +121,8 @@ impl PerRelayParentData {
peer: &PeerId,
validator: &ValidatorId,
) -> bool {
if let Some(set) = self.message_sent_to_peer.get(peer) {
!set.contains(validator)
} else {
false
}
self.message_sent_to_peer.get(peer).map(|v| !v.contains(validator)).unwrap_or(true)
&& self.message_received_from_peer.get(peer).map(|v| !v.contains(validator)).unwrap_or(true)
}
}
......@@ -321,21 +318,24 @@ where
))
.await;
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
// pass on the bitfield distribution to all interested peers
let interested_peers = peer_views
.iter()
.filter_map(|(peer, view)| {
// check interest in the peer in this message's relay parent
if view.contains(&message.relay_parent) {
let message_needed = job_data.message_from_validator_needed_by_peer(&peer, &validator);
// track the message as sent for this peer
message_sent_to_peer
job_data.message_sent_to_peer
.entry(peer.clone())
.or_default()
.insert(validator.clone());
Some(peer.clone())
if message_needed {
Some(peer.clone())
} else {
None
}
} else {
None
}
......@@ -529,11 +529,7 @@ async fn handle_peer_view_change<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let current = state.peer_views.entry(origin.clone()).or_default();
let added: Vec<Hash> = view.difference(&*current).cloned().collect();
*current = view;
let added = state.peer_views.entry(origin.clone()).or_default().replace_difference(view).cloned().collect::<Vec<_>>();
// Send all messages we've seen before and the peer is now interested
// in to that peer.
......@@ -585,8 +581,7 @@ where
return;
};
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
message_sent_to_peer
job_data.message_sent_to_peer
.entry(dest.clone())
.or_default()
.insert(validator.clone());
......@@ -755,7 +750,7 @@ mod test {
use polkadot_node_subsystem_util::TimeoutExt;
use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore};
use sp_application_crypto::AppKey;
use sc_keystore::LocalKeystore;
use sp_keystore::testing::KeyStore;
use std::sync::Arc;
use std::time::Duration;
use assert_matches::assert_matches;
......@@ -767,12 +762,6 @@ mod test {
];
}
macro_rules! peers {
( $( $peer:expr ),* $(,)? ) => [
vec![ $( $peer.clone() ),* ]
];
}
macro_rules! launch {
($fut:expr) => {
$fut
......@@ -816,7 +805,6 @@ mod test {
fn state_with_view(
view: View,
relay_parent: Hash,
keystore_path: &tempfile::TempDir,
) -> (ProtocolState, SigningContext, SyncCryptoStorePtr, ValidatorId) {
let mut state = ProtocolState::default();
......@@ -825,8 +813,7 @@ mod test {
parent_hash: relay_parent.clone(),
};
let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
.expect("Creates keystore"));
let keystore : SyncCryptoStorePtr = Arc::new(KeyStore::new());
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
.expect("generating sr25519 key not to fail");
......@@ -865,18 +852,20 @@ mod test {
};
// another validator not part of the validatorset
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
let keystore : SyncCryptoStorePtr = Arc::new(LocalKeystore::open(keystore_path.path(), None)
.expect("Creates keystore"));
let keystore : SyncCryptoStorePtr = Arc::new(KeyStore::new());
let malicious = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
.expect("Malicious key created");
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
.expect("Malicious key created");
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed =
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &malicious.into()))
.expect("should be signed");
let signed = executor::block_on(Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
0,
&malicious.into(),
)).expect("should be signed");
let msg = BitfieldGossipMessage {
relay_parent: hash_a.clone(),
......@@ -929,17 +918,19 @@ mod test {
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
state.peer_views.insert(peer_b.clone(), view![hash_a]);
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed =
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 42, &validator))
.expect("should be signed");
let signed = executor::block_on(Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
42,
&validator,
)).expect("should be signed");
let msg = BitfieldGossipMessage {
relay_parent: hash_a.clone(),
......@@ -985,16 +976,18 @@ mod test {
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed_bitfield =
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
.expect("should be signed");
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
0,
&validator,
)).expect("should be signed");
let msg = BitfieldGossipMessage {
relay_parent: hash_a.clone(),
......@@ -1085,6 +1078,101 @@ mod test {
});
}
#[test]
fn do_not_relay_message_twice() {
let _ = env_logger::builder()
.filter(None, log::LevelFilter::Trace)
.is_test(true)
.try_init();
let hash = Hash::random();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash.clone());
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
0,
&validator,
)).expect("should be signed");
state.peer_views.insert(peer_b.clone(), view![hash]);
state.peer_views.insert(peer_a.clone(), view![hash]);
let msg = BitfieldGossipMessage {
relay_parent: hash.clone(),
signed_availability: signed_bitfield.clone(),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
executor::block_on(async move {
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
&mut state.peer_views,
validator.clone(),
msg.clone(),
).await;
assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(h, signed)
)) => {
assert_eq!(h, hash);
assert_eq!(signed, signed_bitfield)
}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, send_msg),
) => {
assert_eq!(2, peers.len());
assert!(peers.contains(&peer_a));
assert!(peers.contains(&peer_b));
assert_eq!(send_msg, msg.clone().into_validation_protocol());
}
);
// Relaying the message a second time shouldn't work.
relay_message(
&mut ctx,
state.per_relay_parent.get_mut(&hash).unwrap(),
&mut state.peer_views,
validator.clone(),
msg.clone(),
).await;
assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(h, signed)
)) => {
assert_eq!(h, hash);
assert_eq!(signed, signed_bitfield)
}
);
// There shouldn't be any other message
assert!(handle.recv().timeout(Duration::from_millis(10)).await.is_none());
});
}
#[test]
fn changing_view() {
let _ = env_logger::builder()
......@@ -1099,16 +1187,18 @@ mod test {
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);
let keystore_path = tempfile::tempdir().expect("Creates keystore path");
// validator 0 key pair
let (mut state, signing_context, keystore, validator) =
state_with_view(view![hash_a, hash_b], hash_a.clone(), &keystore_path);
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash_a, hash_b], hash_a.clone());
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed_bitfield =
executor::block_on(Signed::<AvailabilityBitfield>::sign(&keystore, payload, &signing_context, 0, &validator))
.expect("should be signed");
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
0,
&validator,
)).expect("should be signed");
let msg = BitfieldGossipMessage {
relay_parent: hash_a.clone(),
......@@ -1160,17 +1250,6 @@ mod test {
}
);
// gossip to the network
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(NetworkBridgeMessage::SendValidationMessage (
peers, out_msg,
)) => {
assert_eq!(peers, peers![peer_b]);
assert_eq!(out_msg, msg.clone().into_validation_protocol());
}
);
// reputation change for peer B
assert_matches!(
handle.recv().await,
......@@ -1253,4 +1332,88 @@ mod test {
});
}
#[test]
fn do_not_send_message_back_to_origin() {
let _ = env_logger::builder()
.filter(None, log::LevelFilter::Trace)
.is_test(true)
.try_init();
let hash: Hash = [0; 32].into();
let peer_a = PeerId::random();
let peer_b = PeerId::random();
assert_ne!(peer_a, peer_b);
// validator 0 key pair
let (mut state, signing_context, keystore, validator) = state_with_view(view![hash], hash);
// create a signed message by validator 0
let payload = AvailabilityBitfield(bitvec![bitvec::order::Lsb0, u8; 1u8; 32]);
let signed_bitfield = executor::block_on(Signed::<AvailabilityBitfield>::sign(
&keystore,
payload,
&signing_context,
0,
&validator,
)).expect("should be signed");
state.peer_views.insert(peer_b.clone(), view![hash]);
state.peer_views.insert(peer_a.clone(), view![hash]);
let msg = BitfieldGossipMessage {
relay_parent: hash.clone(),
signed_availability: signed_bitfield.clone(),
};
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
executor::block_on(async move {
// send a first message
launch!(handle_network_msg(
&mut ctx,
&mut state,
&Default::default(),
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
msg.clone().into_network_message(),
),
));
assert_matches!(
handle.recv().await,
AllMessages::Provisioner(ProvisionerMessage::ProvisionableData(
_,
ProvisionableData::Bitfield(hash, signed)
)) => {
assert_eq!(hash, hash);
assert_eq!(signed, signed_bitfield)
}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendValidationMessage(peers, send_msg),
) => {
assert_eq!(1, peers.len());
assert!(peers.contains(&peer_a));
assert_eq!(send_msg, msg.clone().into_validation_protocol());
}
);
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
) => {
assert_eq!(peer, peer_b);
assert_eq!(rep, BENEFIT_VALID_MESSAGE_FIRST)
}
);
});
}
}
......@@ -166,6 +166,15 @@ impl<M> NetworkBridgeEvent<M> {
pub struct View(pub Vec<Hash>);
impl View {
/// Replace `self` with `new`.
///
/// Returns an iterator that will yield all elements of `new` that were not part of `self`.
pub fn replace_difference(&mut self, new: View) -> impl Iterator<Item = &Hash> {
let old = std::mem::replace(self, new);
self.0.iter().filter(move |h| !old.contains(h))
}
/// Returns an iterator of the hashes present in `Self` but not in `other`.
pub fn difference<'a>(&'a self, other: &'a View) -> impl Iterator<Item = &'a Hash> + 'a {
self.0.iter().filter(move |h| !other.contains(h))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment