Unverified Commit 46e39860 authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Flood protection for large statements. (#2984)

* Flood protection for large statements.

* Add test for flood protection.

* Doc improvements.
parent 79b37d30
Pipeline #137276 failed with stages
in 29 minutes and 59 seconds
......@@ -93,6 +93,9 @@ const VC_THRESHOLD: usize = 2;
const LOG_TARGET: &str = "parachain::statement-distribution";
/// Large statements should be rare.
const MAX_LARGE_STATEMENTS_PER_SENDER: usize = 20;
/// The statement distribution subsystem.
pub struct StatementDistribution {
/// Pointer to a keystore, which is required for determining this nodes validator index.
......@@ -194,6 +197,29 @@ struct PeerRelayParentKnowledge {
seconded_counts: HashMap<ValidatorIndex, VcPerPeerTracker>,
/// How many statements we've received for each candidate that we're aware of.
received_message_count: HashMap<CandidateHash, usize>,
/// How many large statements this peer already sent us.
///
/// Flood protection for large statements is rather hard and as soon as we get
/// https://github.com/paritytech/polkadot/issues/2979 implemented also no longer necessary.
/// Reason: We keep messages around until we fetched the payload, but if a node makes up
/// statements and never provides the data, we will keep it around for the slot duration. Not
/// even signature checking would help, as the sender, if a validator, can just sign arbitrary
/// invalid statements and will not face any consequences as long as it won't provide the
/// payload.
///
/// Quick and temporary fix, only accept `MAX_LARGE_STATEMENTS_PER_SENDER` per connected node.
///
/// Large statements should be rare, if they were not, we would run into problems anyways, as
/// we would not be able to distribute them in a timely manner. Therefore
/// `MAX_LARGE_STATEMENTS_PER_SENDER` can be set to a relatively small number. It is also not
/// per candidate hash, but in total as candidate hashes can be made up, as illustrated above.
///
/// An attacker could still try to fill up our memory, by repeatedly disconnecting and
/// connecting again with new peer ids, but we assume that the resulting effective bandwidth
/// for such an attack would be too low.
large_statement_count: usize,
}
impl PeerRelayParentKnowledge {
......@@ -318,6 +344,15 @@ impl PeerRelayParentKnowledge {
Ok(self.received_candidates.insert(candidate_hash.clone()))
}
/// Note a received large statement metadata.
fn receive_large_statement(&mut self) -> std::result::Result<(), Rep> {
if self.large_statement_count >= MAX_LARGE_STATEMENTS_PER_SENDER {
return Err(COST_APPARENT_FLOOD);
}
self.large_statement_count += 1;
Ok(())
}
/// This method does the same checks as `receive` without modifying the internal state.
/// Returns an error if the peer should not have sent us this message according to protocol
/// rules for flood protection.
......@@ -458,6 +493,17 @@ impl PeerData {
.ok_or(COST_UNEXPECTED_STATEMENT)?
.check_can_receive(fingerprint, max_message_count)
}
/// Basic flood protection for large statements.
fn receive_large_statement(
&mut self,
relay_parent: &Hash,
) -> std::result::Result<(), Rep> {
self.view_knowledge
.get_mut(relay_parent)
.ok_or(COST_UNEXPECTED_STATEMENT)?
.receive_large_statement()
}
}
// A statement stored while a relay chain head is active.
......@@ -1278,6 +1324,20 @@ async fn handle_incoming_message<'a>(
}
};
if let protocol_v1::StatementDistributionMessage::LargeStatement(_) = message {
if let Err(rep) = peer_data.receive_large_statement(&relay_parent) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?message,
?rep,
"Unexpected large statement.",
);
report_peer(ctx, peer, rep).await;
return None;
}
}
let fingerprint = message.get_fingerprint();
let candidate_hash = fingerprint.0.candidate_hash().clone();
let handle_incoming_span = active_head.span.child("handle-incoming")
......@@ -3471,6 +3531,176 @@ mod tests {
executor::block_on(future::join(test_fut, bg));
}
#[test]
fn peer_cant_flood_with_large_statements() {
sp_tracing::try_init_simple();
let hash_a = Hash::repeat_byte(1);
let candidate = {
let mut c = CommittedCandidateReceipt::default();
c.descriptor.relay_parent = hash_a;
c.descriptor.para_id = 1.into();
c.commitments.new_validation_code = Some(ValidationCode(vec![1,2,3]));
c
};
let peer_a = PeerId::random(); // Alice
let validators = vec![
Sr25519Keyring::Alice.pair(),
Sr25519Keyring::Bob.pair(),
Sr25519Keyring::Charlie.pair(),
// other group
Sr25519Keyring::Dave.pair(),
// We:
Sr25519Keyring::Ferdie.pair(),
];
let first_group = vec![0,1,2,4];
let session_info = make_session_info(
validators,
vec![first_group, vec![3]]
);
let session_index = 1;
let pool = sp_core::testing::TaskExecutor::new();
let (ctx, mut handle) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let bg = async move {
let s = StatementDistribution { metrics: Default::default(), keystore: make_ferdie_keystore()};
s.run(ctx).await.unwrap();
};
let (_, rx_reqs) = mpsc::channel(1);
let test_fut = async move {
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::StatementFetchingReceiver(rx_reqs)
}).await;
// register our active heads.
handle.send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: vec![ActivatedLeaf {
hash: hash_a,
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}].into(),
deactivated: vec![].into(),
}))).await;
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionIndexForChild(tx))
)
if r == hash_a
=> {
let _ = tx.send(Ok(session_index));
}
);
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(r, RuntimeApiRequest::SessionInfo(sess_index, tx))
)
if r == hash_a && sess_index == session_index
=> {
let _ = tx.send(Ok(Some(session_info)));
}
);
// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_a.clone(),
ObservedRole::Full,
Some(Sr25519Keyring::Alice.public().into())
)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerViewChange(peer_a.clone(), view![hash_a])
)
}).await;
// receive a seconded statement from peer A.
let statement = {
let signing_context = SigningContext {
parent_hash: hash_a,
session_index,
};
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let alice_public = CryptoStore::sr25519_generate_new(
&*keystore, ValidatorId::ID, Some(&Sr25519Keyring::Alice.to_seed())
).await.unwrap();
SignedFullStatement::sign(
&keystore,
Statement::Seconded(candidate.clone()),
&signing_context,
ValidatorIndex(0),
&alice_public.into(),
).await.ok().flatten().expect("should be signed")
};
let metadata =
protocol_v1::StatementDistributionMessage::Statement(hash_a, statement.clone().into()).get_metadata();
for _ in 0..MAX_LARGE_STATEMENTS_PER_SENDER + 1 {
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_a.clone(),
protocol_v1::StatementDistributionMessage::LargeStatement(metadata.clone()),
)
)
}).await;
}
// We should try to fetch the data:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::SendRequests(
mut reqs, IfDisconnected::ImmediateError
)
) => {
let reqs = reqs.pop().unwrap();
let outgoing = match reqs {
Requests::StatementFetching(outgoing) => outgoing,
_ => panic!("Unexpected request"),
};
let req = outgoing.payload;
assert_eq!(req.relay_parent, metadata.relay_parent);
assert_eq!(req.candidate_hash, metadata.candidate_hash);
assert_eq!(outgoing.peer, Recipient::Peer(peer_a));
// Just drop request - should trigger error.
}
);
// Then we should punish peer:
assert_matches!(
handle.recv().await,
AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(p, r)
) if p == peer_a && r == COST_APPARENT_FLOOD => {}
);
handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::pin_mut!(test_fut);
futures::pin_mut!(bg);
executor::block_on(future::join(test_fut, bg));
}
fn make_session_info(validators: Vec<Pair>, groups: Vec<Vec<u32>>) -> SessionInfo {
let validator_groups: Vec<Vec<ValidatorIndex>> = groups
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment