From b0b2b6799c269333bc823d630df5881d2b9bacb8 Mon Sep 17 00:00:00 2001
From: Dmitrii Markin <dmitry@markin.tech>
Date: Thu, 13 Oct 2022 12:24:31 +0300
Subject: [PATCH] Punish peers for duplicate GRANDPA neighbor messages (#12462)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Decrease peer reputation for duplicate GRANDPA neighbor messages.

* Fix comparison

* Fix update_peer_state() validity condition

* Add negative test

* Rework update_peer_state() validity condition, add tests

* update_peer_state() validity condition: invert comparison

* Split InvalidViewChange and DuplicateNeighborMessage misbehaviors

* Enforce rate-limiting of duplicate GRANDPA neighbor packets

* Update client/finality-grandpa/src/communication/gossip.rs

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>

* Make rolling clock back in a test safer

Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
---
 .../src/communication/gossip.rs               | 151 +++++++++++++-----
 .../finality-grandpa/src/communication/mod.rs |   8 +-
 .../src/communication/periodic.rs             |  17 +-
 3 files changed, 124 insertions(+), 52 deletions(-)

diff --git a/substrate/client/finality-grandpa/src/communication/gossip.rs b/substrate/client/finality-grandpa/src/communication/gossip.rs
index 1ba5e0da33c..218b4b668c1 100644
--- a/substrate/client/finality-grandpa/src/communication/gossip.rs
+++ b/substrate/client/finality-grandpa/src/communication/gossip.rs
@@ -35,7 +35,8 @@
 //! impolite to send messages about r+1 or later. "future-round" messages can
 //!  be dropped and ignored.
 //!
-//! It is impolite to send a neighbor packet which moves backwards in protocol state.
+//! It is impolite to send a neighbor packet which moves backwards or does not progress
+//! protocol state.
 //!
 //! This is beneficial if it conveys some progress in the protocol state of the peer.
 //!
@@ -97,7 +98,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound
 use sp_finality_grandpa::AuthorityId;
 use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
 
-use super::{benefit, cost, Round, SetId};
+use super::{benefit, cost, Round, SetId, NEIGHBOR_REBROADCAST_PERIOD};
 use crate::{environment, CatchUp, CompactCommit, SignedMessage};
 
 use std::{
@@ -148,14 +149,15 @@ enum Consider {
 /// A view of protocol state.
 #[derive(Debug)]
 struct View<N> {
-	round: Round,           // the current round we are at.
-	set_id: SetId,          // the current voter set id.
-	last_commit: Option<N>, // commit-finalized block height, if any.
+	round: Round,                 // the current round we are at.
+	set_id: SetId,                // the current voter set id.
+	last_commit: Option<N>,       // commit-finalized block height, if any.
+	last_update: Option<Instant>, // last time we heard from peer, used for spamming detection.
 }
 
 impl<N> Default for View<N> {
 	fn default() -> Self {
-		View { round: Round(1), set_id: SetId(0), last_commit: None }
+		View { round: Round(1), set_id: SetId(0), last_commit: None, last_update: None }
 	}
 }
 
@@ -225,7 +227,12 @@ impl<N> LocalView<N> {
 	/// Converts the local view to a `View` discarding round and set id
 	/// information about the last commit.
 	fn as_view(&self) -> View<&N> {
-		View { round: self.round, set_id: self.set_id, last_commit: self.last_commit_height() }
+		View {
+			round: self.round,
+			set_id: self.set_id,
+			last_commit: self.last_commit_height(),
+			last_update: None,
+		}
 	}
 
 	/// Update the set ID. implies a reset to round 1.
@@ -417,6 +424,8 @@ pub(super) struct FullCatchUpMessage<Block: BlockT> {
 pub(super) enum Misbehavior {
 	// invalid neighbor message, considering the last one.
 	InvalidViewChange,
+	// duplicate neighbor message.
+	DuplicateNeighborMessage,
 	// could not decode neighbor message. bytes-length of the packet.
 	UndecodablePacket(i32),
 	// Bad catch up message (invalid signatures).
@@ -438,6 +447,7 @@ impl Misbehavior {
 
 		match *self {
 			InvalidViewChange => cost::INVALID_VIEW_CHANGE,
+			DuplicateNeighborMessage => cost::DUPLICATE_NEIGHBOR_MESSAGE,
 			UndecodablePacket(bytes) => ReputationChange::new(
 				bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE),
 				"Grandpa: Bad packet",
@@ -488,20 +498,22 @@ struct Peers<N> {
 	second_stage_peers: HashSet<PeerId>,
 	/// The randomly picked set of `LUCKY_PEERS` light clients we'll gossip commit messages to.
 	lucky_light_peers: HashSet<PeerId>,
+	/// Neighbor packet rebroadcast period --- we reduce the reputation of peers sending duplicate
+	/// packets too often.
+	neighbor_rebroadcast_period: Duration,
 }
 
-impl<N> Default for Peers<N> {
-	fn default() -> Self {
+impl<N: Ord> Peers<N> {
+	fn new(neighbor_rebroadcast_period: Duration) -> Self {
 		Peers {
 			inner: Default::default(),
 			first_stage_peers: Default::default(),
 			second_stage_peers: Default::default(),
 			lucky_light_peers: Default::default(),
+			neighbor_rebroadcast_period,
 		}
 	}
-}
 
-impl<N: Ord> Peers<N> {
 	fn new_peer(&mut self, who: PeerId, role: ObservedRole) {
 		match role {
 			ObservedRole::Authority if self.first_stage_peers.len() < LUCKY_PEERS => {
@@ -547,10 +559,23 @@ impl<N: Ord> Peers<N> {
 			return Err(Misbehavior::InvalidViewChange)
 		}
 
+		let now = Instant::now();
+		let duplicate_packet = (update.set_id, update.round, Some(&update.commit_finalized_height)) ==
+			(peer.view.set_id, peer.view.round, peer.view.last_commit.as_ref());
+
+		if duplicate_packet {
+			if let Some(last_update) = peer.view.last_update {
+				if now < last_update + self.neighbor_rebroadcast_period / 2 {
+					return Err(Misbehavior::DuplicateNeighborMessage)
+				}
+			}
+		}
+
 		peer.view = View {
 			round: update.round,
 			set_id: update.set_id,
 			last_commit: Some(update.commit_finalized_height),
+			last_update: Some(now),
 		};
 
 		trace!(target: "afg", "Peer {} updated view. Now at {:?}, {:?}",
@@ -748,7 +773,7 @@ impl<Block: BlockT> Inner<Block> {
 
 		Inner {
 			local_view: None,
-			peers: Peers::default(),
+			peers: Peers::new(NEIGHBOR_REBROADCAST_PERIOD),
 			live_topics: KeepTopics::new(),
 			next_rebroadcast: Instant::now() + REBROADCAST_AFTER,
 			authorities: Vec::new(),
@@ -758,13 +783,16 @@ impl<Block: BlockT> Inner<Block> {
 		}
 	}
 
-	/// Note a round in the current set has started.
+	/// Note a round in the current set has started. Does nothing if the last
+	/// call to the function was with the same `round`.
 	fn note_round(&mut self, round: Round) -> MaybeMessage<Block> {
 		{
 			let local_view = match self.local_view {
 				None => return None,
 				Some(ref mut v) =>
 					if v.round == round {
+						// Do not send neighbor packets out if `round` has not changed ---
+						// such behavior is punishable.
 						return None
 					} else {
 						v
@@ -803,6 +831,8 @@ impl<Block: BlockT> Inner<Block> {
 							);
 							self.authorities = authorities;
 						}
+						// Do not send neighbor packets out if the `set_id` has not changed ---
+						// such behavior is punishable.
 						return None
 					} else {
 						v
@@ -816,7 +846,9 @@ impl<Block: BlockT> Inner<Block> {
 		self.multicast_neighbor_packet()
 	}
 
-	/// Note that we've imported a commit finalizing a given block.
+	/// Note that we've imported a commit finalizing a given block. Does nothing if the last
+	/// call to the function was with the same or higher `finalized` number.
+	/// `set_id` & `round` are the ones the commit message is from.
 	fn note_commit_finalized(
 		&mut self,
 		round: Round,
@@ -1357,6 +1389,8 @@ impl<Block: BlockT> GossipValidator<Block> {
 	}
 
 	/// Note that we've imported a commit finalizing a given block.
+	/// `set_id` & `round` are the ones the commit message is from and not necessarily
+	/// the latest set ID & round started.
 	pub(super) fn note_commit_finalized<F>(
 		&self,
 		round: Round,
@@ -1647,11 +1681,12 @@ pub(super) struct PeerReport {
 
 #[cfg(test)]
 mod tests {
-	use super::{environment::SharedVoterSetState, *};
+	use super::{super::NEIGHBOR_REBROADCAST_PERIOD, environment::SharedVoterSetState, *};
 	use crate::communication;
 	use sc_network::config::Role;
 	use sc_network_gossip::Validator as GossipValidatorT;
 	use sp_core::{crypto::UncheckedFrom, H256};
+	use std::time::Instant;
 	use substrate_test_runtime_client::runtime::{Block, Header};
 
 	// some random config (not really needed)
@@ -1684,7 +1719,12 @@ mod tests {
 
 	#[test]
 	fn view_vote_rules() {
-		let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) };
+		let view = View {
+			round: Round(100),
+			set_id: SetId(1),
+			last_commit: Some(1000u64),
+			last_update: None,
+		};
 
 		assert_eq!(view.consider_vote(Round(98), SetId(1)), Consider::RejectPast);
 		assert_eq!(view.consider_vote(Round(1), SetId(0)), Consider::RejectPast);
@@ -1701,7 +1741,12 @@ mod tests {
 
 	#[test]
 	fn view_global_message_rules() {
-		let view = View { round: Round(100), set_id: SetId(2), last_commit: Some(1000u64) };
+		let view = View {
+			round: Round(100),
+			set_id: SetId(2),
+			last_commit: Some(1000u64),
+			last_update: None,
+		};
 
 		assert_eq!(view.consider_global(SetId(3), 1), Consider::RejectFuture);
 		assert_eq!(view.consider_global(SetId(3), 1000), Consider::RejectFuture);
@@ -1719,7 +1764,7 @@ mod tests {
 
 	#[test]
 	fn unknown_peer_cannot_be_updated() {
-		let mut peers = Peers::default();
+		let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
 		let id = PeerId::random();
 
 		let update =
@@ -1750,27 +1795,35 @@ mod tests {
 		let update4 =
 			NeighborPacket { round: Round(3), set_id: SetId(11), commit_finalized_height: 80 };
 
-		let mut peers = Peers::default();
+		// Use shorter rebroadcast period to safely roll the clock back in the last test
+		// and don't hit the system boot time on systems with unsigned time.
+		const SHORT_NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(1);
+		let mut peers = Peers::new(SHORT_NEIGHBOR_REBROADCAST_PERIOD);
 		let id = PeerId::random();
 
 		peers.new_peer(id, ObservedRole::Authority);
 
-		let mut check_update = move |update: NeighborPacket<_>| {
+		let check_update = |peers: &mut Peers<_>, update: NeighborPacket<_>| {
 			let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap();
 			assert_eq!(view.round, update.round);
 			assert_eq!(view.set_id, update.set_id);
 			assert_eq!(view.last_commit, Some(update.commit_finalized_height));
 		};
 
-		check_update(update1);
-		check_update(update2);
-		check_update(update3);
-		check_update(update4);
+		check_update(&mut peers, update1);
+		check_update(&mut peers, update2);
+		check_update(&mut peers, update3);
+		check_update(&mut peers, update4.clone());
+
+		// Allow duplicate neighbor packets if enough time has passed.
+		peers.inner.get_mut(&id).unwrap().view.last_update =
+			Some(Instant::now() - SHORT_NEIGHBOR_REBROADCAST_PERIOD);
+		check_update(&mut peers, update4);
 	}
 
 	#[test]
 	fn invalid_view_change() {
-		let mut peers = Peers::default();
+		let mut peers = Peers::new(NEIGHBOR_REBROADCAST_PERIOD);
 
 		let id = PeerId::random();
 		peers.new_peer(id, ObservedRole::Authority);
@@ -1783,29 +1836,41 @@ mod tests {
 			.unwrap()
 			.unwrap();
 
-		let mut check_update = move |update: NeighborPacket<_>| {
+		let mut check_update = move |update: NeighborPacket<_>, misbehavior| {
 			let err = peers.update_peer_state(&id, update.clone()).unwrap_err();
-			assert_eq!(err, Misbehavior::InvalidViewChange);
+			assert_eq!(err, misbehavior);
 		};
 
 		// round moves backwards.
-		check_update(NeighborPacket {
-			round: Round(9),
-			set_id: SetId(10),
-			commit_finalized_height: 10,
-		});
-		// commit finalized height moves backwards.
-		check_update(NeighborPacket {
-			round: Round(10),
-			set_id: SetId(10),
-			commit_finalized_height: 9,
-		});
+		check_update(
+			NeighborPacket { round: Round(9), set_id: SetId(10), commit_finalized_height: 10 },
+			Misbehavior::InvalidViewChange,
+		);
 		// set ID moves backwards.
-		check_update(NeighborPacket {
-			round: Round(10),
-			set_id: SetId(9),
-			commit_finalized_height: 10,
-		});
+		check_update(
+			NeighborPacket { round: Round(10), set_id: SetId(9), commit_finalized_height: 10 },
+			Misbehavior::InvalidViewChange,
+		);
+		// commit finalized height moves backwards.
+		check_update(
+			NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 9 },
+			Misbehavior::InvalidViewChange,
+		);
+		// duplicate packet without grace period.
+		check_update(
+			NeighborPacket { round: Round(10), set_id: SetId(10), commit_finalized_height: 10 },
+			Misbehavior::DuplicateNeighborMessage,
+		);
+		// commit finalized height moves backwards while round moves forward.
+		check_update(
+			NeighborPacket { round: Round(11), set_id: SetId(10), commit_finalized_height: 9 },
+			Misbehavior::InvalidViewChange,
+		);
+		// commit finalized height moves backwards while set ID moves forward.
+		check_update(
+			NeighborPacket { round: Round(10), set_id: SetId(11), commit_finalized_height: 9 },
+			Misbehavior::InvalidViewChange,
+		);
 	}
 
 	#[test]
diff --git a/substrate/client/finality-grandpa/src/communication/mod.rs b/substrate/client/finality-grandpa/src/communication/mod.rs
index 12cb2601f4c..75a7697812c 100644
--- a/substrate/client/finality-grandpa/src/communication/mod.rs
+++ b/substrate/client/finality-grandpa/src/communication/mod.rs
@@ -37,6 +37,7 @@ use std::{
 	pin::Pin,
 	sync::Arc,
 	task::{Context, Poll},
+	time::Duration,
 };
 
 use finality_grandpa::{
@@ -68,6 +69,9 @@ mod periodic;
 #[cfg(test)]
 pub(crate) mod tests;
 
+// How often to rebroadcast neighbor packets, in cases where no new packets are created.
+pub(crate) const NEIGHBOR_REBROADCAST_PERIOD: Duration = Duration::from_secs(2 * 60);
+
 pub mod grandpa_protocol_name {
 	use sc_chain_spec::ChainSpec;
 	use sc_network_common::protocol::ProtocolName;
@@ -103,6 +107,8 @@ mod cost {
 	pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "Grandpa: Unknown voter");
 
 	pub(super) const INVALID_VIEW_CHANGE: Rep = Rep::new(-500, "Grandpa: Invalid view change");
+	pub(super) const DUPLICATE_NEIGHBOR_MESSAGE: Rep =
+		Rep::new(-500, "Grandpa: Duplicate neighbor message without grace period");
 	pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
 	pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
 	pub(super) const PER_BLOCK_LOADED: i32 = -10;
@@ -279,7 +285,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
 		}
 
 		let (neighbor_packet_worker, neighbor_packet_sender) =
-			periodic::NeighborPacketWorker::new();
+			periodic::NeighborPacketWorker::new(NEIGHBOR_REBROADCAST_PERIOD);
 
 		NetworkBridge {
 			service,
diff --git a/substrate/client/finality-grandpa/src/communication/periodic.rs b/substrate/client/finality-grandpa/src/communication/periodic.rs
index e6d63beafc3..c001796b5ca 100644
--- a/substrate/client/finality-grandpa/src/communication/periodic.rs
+++ b/substrate/client/finality-grandpa/src/communication/periodic.rs
@@ -32,9 +32,6 @@ use super::gossip::{GossipMessage, NeighborPacket};
 use sc_network::PeerId;
 use sp_runtime::traits::{Block as BlockT, NumberFor};
 
-// How often to rebroadcast, in cases where no new packets are created.
-const REBROADCAST_AFTER: Duration = Duration::from_secs(2 * 60);
-
 /// A sender used to send neighbor packets to a background job.
 #[derive(Clone)]
 pub(super) struct NeighborPacketSender<B: BlockT>(
@@ -60,6 +57,7 @@ impl<B: BlockT> NeighborPacketSender<B> {
 /// implementation). Periodically it sends out the last packet in cases where no new ones arrive.
 pub(super) struct NeighborPacketWorker<B: BlockT> {
 	last: Option<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
+	rebroadcast_period: Duration,
 	delay: Delay,
 	rx: TracingUnboundedReceiver<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>,
 }
@@ -67,13 +65,16 @@ pub(super) struct NeighborPacketWorker<B: BlockT> {
 impl<B: BlockT> Unpin for NeighborPacketWorker<B> {}
 
 impl<B: BlockT> NeighborPacketWorker<B> {
-	pub(super) fn new() -> (Self, NeighborPacketSender<B>) {
+	pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
 		let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
 			"mpsc_grandpa_neighbor_packet_worker",
 		);
-		let delay = Delay::new(REBROADCAST_AFTER);
+		let delay = Delay::new(rebroadcast_period);
 
-		(NeighborPacketWorker { last: None, delay, rx }, NeighborPacketSender(tx))
+		(
+			NeighborPacketWorker { last: None, rebroadcast_period, delay, rx },
+			NeighborPacketSender(tx),
+		)
 	}
 }
 
@@ -85,7 +86,7 @@ impl<B: BlockT> Stream for NeighborPacketWorker<B> {
 		match this.rx.poll_next_unpin(cx) {
 			Poll::Ready(None) => return Poll::Ready(None),
 			Poll::Ready(Some((to, packet))) => {
-				this.delay.reset(REBROADCAST_AFTER);
+				this.delay.reset(this.rebroadcast_period);
 				this.last = Some((to.clone(), packet.clone()));
 
 				return Poll::Ready(Some((to, GossipMessage::<B>::from(packet))))
@@ -98,7 +99,7 @@ impl<B: BlockT> Stream for NeighborPacketWorker<B> {
 
 		// Getting this far here implies that the timer fired.
 
-		this.delay.reset(REBROADCAST_AFTER);
+		this.delay.reset(this.rebroadcast_period);
 
 		// Make sure the underlying task is scheduled for wake-up.
 		//
-- 
GitLab