From 5f6dc6bc6a378d227efc154076aea202aa911077 Mon Sep 17 00:00:00 2001
From: Robert Habermeier <rphmeier@gmail.com>
Date: Fri, 26 Oct 2018 19:19:12 +0200
Subject: [PATCH] apply authority set changes

---
 .../core/finality-grandpa/src/authorities.rs  | 87 +++++++++++++++--
 substrate/core/finality-grandpa/src/lib.rs    | 95 +++++++++++++++----
 2 files changed, 157 insertions(+), 25 deletions(-)

diff --git a/substrate/core/finality-grandpa/src/authorities.rs b/substrate/core/finality-grandpa/src/authorities.rs
index 0ed2bcb36c0..37712a7615f 100644
--- a/substrate/core/finality-grandpa/src/authorities.rs
+++ b/substrate/core/finality-grandpa/src/authorities.rs
@@ -54,12 +54,14 @@ impl<H, N> SharedAuthoritySet<H, N> {
 	}
 }
 
-impl<H, N: Add<Output=N> + Ord + Clone> SharedAuthoritySet<H, N> {
+impl<H: Eq, N: Add<Output=N> + Ord + Clone> SharedAuthoritySet<H, N> {
 	/// Note an upcoming pending transition.
 	pub(crate) fn add_pending_change(&self, pending: PendingChange<H, N>) {
+		// ordered first by effective number and then by signal-block number.
 		let mut inner = self.inner.write();
+		let key = (pending.effective_number(), pending.canon_height);
 		let idx = inner.pending_changes
-			.binary_search_by_key(&pending.effective_number(), |change| change.effective_number())
+			.binary_search_by_key(&key, |change| (change.effective_number(), change.canon_height))
 			.unwrap_or_else(|i| i);
 
 		inner.pending_changes.insert(idx, pending);
@@ -67,7 +69,17 @@ impl<H, N: Add<Output=N> + Ord + Clone> SharedAuthoritySet<H, N> {
 
 	/// Get the earliest limit-block number, if any.
 	pub(crate) fn current_limit(&self) -> Option<N> {
-		self.inner.read().pending_changes.get(0).map(|change| change.effective_number().clone())
+		self.inner.read().current_limit()
+	}
+
+	/// Get the current set ID. This is incremented every time the set changes.
+	pub(crate) fn set_id(&self) -> u64 {
+		self.inner.read().set_id
+	}
+
+	/// Execute a closure with the inner set mutably.
+	pub(crate) fn with_mut<F, U>(&self, f: F) -> U where F: FnOnce(&mut AuthoritySet<H, N>) -> U {
+		f(&mut *self.inner.write())
 	}
 }
 
@@ -78,18 +90,81 @@ impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> {
 }
 
 /// A set of authorities.
-#[derive(Encode, Decode)]
+#[derive(Debug, Clone, Encode, Decode)]
 pub(crate) struct AuthoritySet<H, N> {
 	current_authorities: Vec<(AuthorityId, u64)>,
 	set_id: u64,
 	pending_changes: Vec<PendingChange<H, N>>,
 }
 
+impl<H, N> AuthoritySet<H, N> {
+	/// Get the earliest limit-block number, if any.
+	pub(crate) fn current_limit(&self) -> Option<N> {
+		self.pending_changes.get(0).map(|change| change.effective_number().clone())
+	}
+
+	/// Get the set identifier.
+	pub(crate) fn set_id(&self) -> u64 {
+		self.set_id
+	}
+
+	/// Get the current set id and a reference to the current authority set.
+	pub(crate) fn current(&self) -> (u64, &[(AuthorityId, u64)]) {
+		(self.set_id, &self.current_authorities[..])
+	}
+}
+
+impl<H: Eq, N: Ord + Debug> AuthoritySet<H, N> {
+	/// Apply or prune any pending transitions. Provide a closure that can be used to check for the
+	/// finalized block with given number.
+	///
+	/// Returns true when the set's representation has changed.
+	pub(crate) fn apply_changes<F, E>(&mut self, just_finalized: N, canonical: F) -> Result<bool, E>
+		where F: FnMut(N) -> Result<H, E>
+	{
+		let mut changed = false;
+		loop {
+			let remove_up_to = match self.pending_changes.first() {
+				None => break,
+				Some(change) => {
+					let effective_number = change.effective_number();
+					if effective_number > just_finalized { break }
+
+					// check if the block that signalled the change is canonical in
+					// our chain.
+					if canonical(change.canon_height)? == change.canon_hash {
+						// apply this change: make the set canonical
+						info!(target: "finality", "Applying authority set change scheduled at block #{:?}",
+							change.canon_height);
+
+						self.current_authorities = change.next_authorities.clone();
+						self.set_id += 1;
+
+						// discard any signalled changes
+						// that happened before or equal to the effective number of the change.
+						self.pending_changes.iter()
+							.take_while(|c| c.canon_height <= effective_number)
+							.count()
+					} else {
+						1 // prune out this entry; it's no longer relevant.
+					}
+				}
+			};
+
+			let remove_up_to = ::std::cmp::min(remove_up_to, self.pending_changes.len());
+			self.pending_changes.drain(..remove_up_to);
+			changed = true; // always changed because we strip at least the first change.
+		}
+
+		Ok(changed)
+	}
+}
+
 /// A pending change to the authority set.
 ///
 /// This will be applied when the announcing block is at some depth within
 /// the finalized chain.
-#[derive(Encode, Decode)]
+#[derive(Debug, Clone, Encode, Decode)]
 pub(crate) struct PendingChange<H, N> {
 	/// The new authorities and weights to apply.
 	pub(crate) next_authorities: Vec<(AuthorityId, u64)>,
@@ -103,7 +178,7 @@ pub(crate) struct PendingChange<H, N> {
 }
 
 impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> {
-	/// Returns the effective number.
+	/// Returns the effective number this change will be applied at.
 	fn effective_number(&self) -> N {
 		self.canon_height.clone() + self.finalization_depth.clone()
 	}
diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs
index faf1611e1a3..2e907bf9fa0 100644
--- a/substrate/core/finality-grandpa/src/lib.rs
+++ b/substrate/core/finality-grandpa/src/lib.rs
@@ -47,11 +47,11 @@ extern crate parity_codec_derive;
 use futures::prelude::*;
 use futures::stream::Fuse;
 use futures::sync::mpsc;
-use client::{Client, ImportNotifications, backend::Backend, CallExecutor};
+use client::{Client, error::Error as ClientError, ImportNotifications, backend::Backend, CallExecutor};
 use codec::{Encode, Decode};
 use consensus_common::BlockImport;
 use runtime_primitives::traits::{
-	As, NumberFor, Block as BlockT, Header as HeaderT, DigestItemFor,
+	NumberFor, Block as BlockT, Header as HeaderT, DigestItemFor,
 };
 use runtime_primitives::{generic::BlockId, Justification};
 use substrate_primitives::{ed25519, AuthorityId, Blake2Hasher};
@@ -108,7 +108,7 @@ pub enum Error {
 	/// A blockchain error.
 	Blockchain(String),
 	/// Could not complete a round on disk.
-	CouldNotCompleteRound(::client::error::Error),
+	CouldNotCompleteRound(ClientError),
 	/// A timer failed to fire.
 	Timer(::tokio::timer::Error),
 }
@@ -421,6 +421,7 @@ pub struct Environment<B, E, Block: BlockT, N: Network> {
 	config: Config,
 	authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
 	network: N,
+	set_id: u64,
 }
 
 impl<Block: BlockT, B, E, N> grandpa::Chain<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N> where
@@ -459,13 +460,20 @@ impl<Block: BlockT, B, E, N> grandpa::Chain<Block::Hash, NumberFor<Block>> for E
 	}
 
 	fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor<Block>)> {
-		match self.inner.best_containing(block, None) {
+		// we refuse to vote beyond the current limit number where transitions are scheduled to
+		// occur.
+		// once blocks are finalized that make that transition irrelevant or activate it,
+		// we will proceed onwards. most of the time there will be no pending transition.
+		let limit = self.authority_set.current_limit();
+		match self.inner.best_containing(block, limit) {
 			Ok(Some(hash)) => {
 				let header = self.inner.header(&BlockId::Hash(hash)).ok()?
 					.expect("Header known to exist after `best_containing` call; qed");
 
 				Some((hash, header.number().clone()))
 			}
+			// Ok(None) can be returned when `block` is after `limit`. That might cause issues.
+			// might be better to return the header itself in this (rare) case.
 			Ok(None) => None,
 			Err(e) => {
 				debug!(target: "afg", "Encountered error finding best chain containing {:?}: {:?}", block, e);
@@ -486,7 +494,8 @@ pub struct ScheduledChange<N> {
 
 /// A GRANDPA-compatible DigestItem. This can describe when GRANDPA set changes
 /// are scheduled.
-// TODO: with specialization, do a blanket implementation so this trait
+//
+// With specialization, could do a blanket implementation so this trait
 // doesn't have to be implemented by users.
 pub trait CompatibleDigestItem<N> {
 	/// If this digest item notes a GRANDPA set change, return information about
@@ -494,6 +503,33 @@ pub trait CompatibleDigestItem<N> {
 	fn scheduled_change(&self) -> Option<ScheduledChange<N>> { None }
 }
 
+/// Signals either an early exit of a voter or an error.
+#[derive(Debug)]
+pub enum ExitOrError {
+	/// An error occurred.
+	Error(Error),
+	/// Early exit of the voter: the new set ID and the new authorities along with respective weights.
+	AuthoritiesChanged(u64, Vec<(AuthorityId, u64)>),
+}
+
+impl From<Error> for ExitOrError {
+	fn from(e: Error) -> Self {
+		ExitOrError::Error(e)
+	}
+}
+
+impl From<ClientError> for ExitOrError {
+	fn from(e: ClientError) -> Self {
+		ExitOrError::Error(Error::from(e))
+	}
+}
+
+impl From<grandpa::Error> for ExitOrError {
+	fn from(e: grandpa::Error) -> Self {
+		ExitOrError::Error(Error::from(e))
+	}
+}
+
 impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N> where
 	Block: 'static,
 	B: Backend<Block, Blake2Hasher> + 'static,
@@ -514,7 +550,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
 		SinkItem = ::grandpa::Message<Block::Hash, NumberFor<Block>>,
 		SinkError = Self::Error,
 	>>;
-	type Error = Error;
+	type Error = ExitOrError;
 
 	#[allow(unreachable_code)]
 	fn round_data(
@@ -528,24 +564,21 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
 		let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
 		let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
 
-		// TODO [now]: Get from shared authority set.
-		let set_id = unimplemented!();
-
 		// TODO: dispatch this with `mpsc::spawn`.
 		let incoming = checked_message_stream::<Block, _>(
 			round,
-			set_id,
+			self.set_id,
 			self.network.messages_for(round),
 			self.config.genesis_voters.clone(),
 		);
 
 		let (out_rx, outgoing) = outgoing_messages::<Block, _>(
 			round,
-			set_id,
+			self.set_id,
 			self.config.local_key.clone(),
 			self.config.genesis_voters.clone(),
 			self.network.clone(),
-		);
+		).sink_map_err(Into::into);
 
 		// schedule incoming messages from the network to be held until
 		// corresponding blocks are imported.
@@ -556,7 +589,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
 		);
 
 		// join incoming network messages with locally originating ones.
-		let incoming = Box::new(incoming.select(out_rx));
+		let incoming = Box::new(incoming.select(out_rx).map_err(Into::into));
 
 		// schedule network message cleanup when sink drops.
 		let outgoing = Box::new(ClearOnDrop {
@@ -580,21 +613,44 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f
 			.insert_aux(&[(LAST_COMPLETED_KEY, &encoded_state[..])], &[])
 		{
 			warn!(target: "afg", "Shutting down voter due to error bookkeeping last completed round in DB: {:?}", e);
-			Err(Error::CouldNotCompleteRound(e))
+			Err(Error::CouldNotCompleteRound(e).into())
 		} else {
 			Ok(())
 		}
 	}
 
 	fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>) -> Result<(), Self::Error> {
-		// TODO: don't unconditionally notify.
+		// ideally some handle to a synchronization oracle would be used
+		// to avoid unconditionally notifying.
 		if let Err(e) = self.inner.finalize_block(BlockId::Hash(hash), true) {
 			warn!(target: "afg", "Error applying finality to block {:?}: {:?}", (hash, number), e);
+
+			// we return without error because not being able to finalize (temporarily) is
+			// non-fatal.
+			return Ok(());
 		}
 
-		// we return without error in all cases because not being able to finalize is
-		// non-fatal.
-		Ok(())
+		self.authority_set.with_mut(|authority_set| {
+			let client = &self.inner;
+			let prior_id = authority_set.set_id();
+			let has_changed = authority_set.apply_changes(number, |canon_number| {
+				client.block_hash_from_id(&BlockId::number(canon_number))
+					.map(|h| h.expect("given number always less than newly-finalized number; \
+						thus there is a block with that number finalized already; qed"))
+			})?;
+
+			if has_changed {
+				// TODO [now]: write to disk. if it fails, exit the node.
+			}
+
+			let (new_id, set_ref) = authority_set.current();
+			if new_id != prior_id {
+				// the authority set has changed.
+				return Err(ExitOrError::AuthoritiesChanged(new_id, set_ref.to_vec()));
+			}
+
+			Ok(())
+		})
 	}
 
 	fn prevote_equivocation(
@@ -692,7 +748,7 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
 			))?
 	};
 
-	// TODO: attempt to load from disk.
+	// TODO [now]: attempt to load from disk.
 	let authority_set = SharedAuthoritySet::genesis(
 		voters.iter().map(|(&id, &weight)| (id, weight)).collect(),
 	);
@@ -707,6 +763,7 @@ pub fn run_grandpa<B, E, Block: BlockT, N>(
 		config,
 		voters,
 		network,
+		set_id: authority_set.set_id(),
 		authority_set,
 	});
 
-- 
GitLab