diff --git a/substrate/Cargo.lock b/substrate/Cargo.lock index 00d2eb47ef505b378bf5612901fad19c2e05ef5e..7b7cdd902113b6ede422005384128f8bc6840338 100644 --- a/substrate/Cargo.lock +++ b/substrate/Cargo.lock @@ -1940,6 +1940,7 @@ dependencies = [ "srml-democracy 0.1.0", "srml-executive 0.1.0", "srml-fees 0.1.0", + "srml-finality-tracker 0.1.0", "srml-grandpa 0.1.0", "srml-indices 0.1.0", "srml-session 0.1.0", @@ -3212,6 +3213,27 @@ dependencies = [ "substrate-primitives 0.1.0", ] +[[package]] +name = "srml-finality-tracker" +version = "0.1.0" +dependencies = [ + "hex-literal 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec-derive 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.87 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-io 0.1.0", + "sr-primitives 0.1.0", + "sr-std 0.1.0", + "srml-session 0.1.0", + "srml-support 0.1.0", + "srml-system 0.1.0", + "substrate-inherents 0.1.0", + "substrate-primitives 0.1.0", +] + [[package]] name = "srml-grandpa" version = "0.1.0" @@ -3222,6 +3244,7 @@ dependencies = [ "sr-io 0.1.0", "sr-primitives 0.1.0", "sr-std 0.1.0", + "srml-finality-tracker 0.1.0", "srml-session 0.1.0", "srml-support 0.1.0", "srml-system 0.1.0", @@ -3786,9 +3809,11 @@ dependencies = [ "parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", + "srml-finality-tracker 0.1.0", "substrate-client 0.1.0", "substrate-consensus-common 0.1.0", "substrate-finality-grandpa-primitives 0.1.0", + "substrate-inherents 0.1.0", "substrate-keyring 0.1.0", "substrate-network 0.1.0", "substrate-primitives 0.1.0", diff --git a/substrate/Cargo.toml b/substrate/Cargo.toml index e6e6921e15fd9d8c82a49ddf6ad42730dff1be15..699f9a158981f0b98c9e511cb81b95892343da1b 100644 --- a/substrate/Cargo.toml +++ b/substrate/Cargo.toml @@ -67,6 +67,7 @@ members = [ "srml/example", "srml/executive", "srml/fees", + "srml/finality-tracker", "srml/grandpa", "srml/indices", "srml/metadata", diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 8936477e40f2f100b78255d30001233cf81b1315..a170c7e202609a241c326f56a261724d6d8368e8 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -27,7 +27,7 @@ use runtime_primitives::{ }; use consensus::{ Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, - BlockOrigin, ForkChoiceStrategy + BlockOrigin, ForkChoiceStrategy, }; use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, Zero, As, NumberFor, CurrentHeight, BlockNumberToHash, @@ -619,9 +619,10 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where } /// Lock the import lock, and run operations inside. - pub fn lock_import_and_run<R, F: FnOnce(&mut ClientImportOperation<Block, Blake2Hasher, B>) -> error::Result<R>>( - &self, f: F - ) -> error::Result<R> { + pub fn lock_import_and_run<R, Err, F>(&self, f: F) -> Result<R, Err> where + F: FnOnce(&mut ClientImportOperation<Block, Blake2Hasher, B>) -> Result<R, Err>, + Err: From<error::Error>, + { let inner = || { let _import_lock = self.import_lock.lock(); @@ -827,7 +828,7 @@ impl<B, E, Block, RA> Client<B, E, Block, RA> where operation.notify_imported = Some((hash, origin, import_headers.into_post(), is_new_best, storage_changes)); } - Ok(ImportResult::Queued) + Ok(ImportResult::imported()) } fn block_execution( @@ -1391,13 +1392,15 @@ impl<B, E, Block, RA> consensus::BlockImport<Block> for Client<B, E, Block, RA> blockchain::BlockStatus::InChain => {}, blockchain::BlockStatus::Unknown => return Ok(ImportResult::UnknownParent), } + match self.backend.blockchain().status(BlockId::Hash(hash)) .map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))? { blockchain::BlockStatus::InChain => return Ok(ImportResult::AlreadyInChain), blockchain::BlockStatus::Unknown => {}, } - Ok(ImportResult::Queued) + + Ok(ImportResult::imported()) } } @@ -1414,7 +1417,7 @@ impl<B, E, Block, RA> consensus::Authorities<Block> for Client<B, E, Block, RA> impl<B, E, Block, RA> CurrentHeight for Client<B, E, Block, RA> where B: backend::Backend<Block, Blake2Hasher>, - E: CallExecutor<Block, Blake2Hasher> + Clone, + E: CallExecutor<Block, Blake2Hasher>, Block: BlockT<Hash=H256>, { type BlockNumber = <Block::Header as HeaderT>::Number; @@ -1425,7 +1428,7 @@ impl<B, E, Block, RA> CurrentHeight for Client<B, E, Block, RA> where impl<B, E, Block, RA> BlockNumberToHash for Client<B, E, Block, RA> where B: backend::Backend<Block, Blake2Hasher>, - E: CallExecutor<Block, Blake2Hasher> + Clone, + E: CallExecutor<Block, Blake2Hasher>, Block: BlockT<Hash=H256>, { type BlockNumber = <Block::Header as HeaderT>::Number; diff --git a/substrate/core/consensus/common/src/block_import.rs b/substrate/core/consensus/common/src/block_import.rs index 6cc5b329be9dee38b115a1a9782924e8dc2cac6e..0c2b5330b415d0686ebd06f3c6a584dd271874b0 100644 --- a/substrate/core/consensus/common/src/block_import.rs +++ b/substrate/core/consensus/common/src/block_import.rs @@ -23,19 +23,40 @@ use std::borrow::Cow; /// Block import result. #[derive(Debug, PartialEq, Eq)] pub enum ImportResult { - /// Added to the import queue. - Queued, - /// Already in the import queue. - AlreadyQueued, + /// Block imported. + Imported(ImportedAux), /// Already in the blockchain. AlreadyInChain, /// Block or parent is known to be bad. KnownBad, /// Block parent is not in the chain. UnknownParent, - /// Added to the import queue but must be justified - /// (usually required to safely enact consensus changes). - NeedsJustification, +} + +/// Auxiliary data associated with an imported block result. +#[derive(Debug, PartialEq, Eq)] +pub struct ImportedAux { + /// Clear all pending justification requests. + pub clear_justification_requests: bool, + /// Request a justification for the given block. + pub needs_justification: bool, +} + +impl Default for ImportedAux { + fn default() -> ImportedAux { + ImportedAux { + clear_justification_requests: false, + needs_justification: false, + } + } +} + +impl ImportResult { + /// Returns default value for `ImportResult::Imported` with both + /// `clear_justification_requests` and `needs_justification` set to false. + pub fn imported() -> ImportResult { + ImportResult::Imported(ImportedAux::default()) + } } /// Block data origin. diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 38f79804358b4cc8a866e70dc6356a7e6196f597..882fd7228401b64907b4affc2585b6a206fb3af4 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -24,7 +24,9 @@ //! The `BasicQueue` and `BasicVerifier` traits allow serial queues to be //! instantiated simply. -use crate::block_import::{ImportBlock, BlockImport, JustificationImport, ImportResult, BlockOrigin}; +use crate::block_import::{ + BlockImport, BlockOrigin, ImportBlock, ImportedAux, ImportResult, JustificationImport, +}; use crossbeam_channel::{self as channel, Receiver, Sender}; use std::sync::Arc; @@ -307,31 +309,37 @@ impl<B: BlockT> BlockImporter<B> { match result { Ok(BlockImportResult::ImportedKnown(number)) => link.block_imported(&hash, number), - Ok(BlockImportResult::ImportedUnknown(number)) => { - link.block_imported(&hash, number) - } - Ok(BlockImportResult::ImportedUnjustified(number)) => { + Ok(BlockImportResult::ImportedUnknown(number, aux)) => { link.block_imported(&hash, number); - link.request_justification(&hash, number); + + if aux.clear_justification_requests { + trace!(target: "sync", "Block imported clears all pending justification requests {}: {:?}", number, hash); + link.clear_justification_requests(); + } + + if aux.needs_justification { + trace!(target: "sync", "Block imported but requires justification {}: {:?}", number, hash); + link.request_justification(&hash, number); + } }, Err(BlockImportError::IncompleteHeader(who)) => { if let Some(peer) = who { link.note_useless_and_restart_sync(peer, "Sent block with incomplete header to import"); } - } + }, Err(BlockImportError::VerificationFailed(who, e)) => { if let Some(peer) = who { link.note_useless_and_restart_sync(peer, &format!("Verification failed: {}", e)); } - } + }, Err(BlockImportError::BadBlock(who)) => { if let Some(peer) = who { link.note_useless_and_restart_sync(peer, "Sent us a bad block"); } - } + }, Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { - link.restart() - } + link.restart(); + }, }; } if let Some(link) = self.link.as_ref() { @@ -448,13 +456,15 @@ impl<B: BlockT, V: 'static + Verifier<B>> BlockImportWorker<B, V> { /// algorithm. pub trait Link<B: BlockT>: Send { /// Block imported. - fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { } + fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) {} /// Batch of blocks imported, with or without error. fn blocks_processed(&self, _processed_blocks: Vec<B::Hash>, _has_error: bool) {} /// Justification import result. - fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor<B>, _success: bool) { } + fn justification_imported(&self, _who: Origin, _hash: &B::Hash, _number: NumberFor<B>, _success: bool) {} + /// Clear all pending justification requests. + fn clear_justification_requests(&self) {} /// Request a justification for the given block. - fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) { } + fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {} /// Disconnect from peer. fn useless_peer(&self, _who: Origin, _reason: &str) {} /// Disconnect from peer and restart sync. @@ -469,9 +479,7 @@ pub enum BlockImportResult<N: ::std::fmt::Debug + PartialEq> { /// Imported known block. ImportedKnown(N), /// Imported unknown block. - ImportedUnknown(N), - /// Imported unjustified block that requires one. - ImportedUnjustified(N), + ImportedUnknown(N, ImportedAux), } /// Block import error. @@ -520,17 +528,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>( trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); Ok(BlockImportResult::ImportedKnown(number)) }, - Ok(ImportResult::AlreadyQueued) => { - trace!(target: "sync", "Block already queued {}: {:?}", number, hash); - Ok(BlockImportResult::ImportedKnown(number)) - }, - Ok(ImportResult::Queued) => { - Ok(BlockImportResult::ImportedUnknown(number)) - }, - Ok(ImportResult::NeedsJustification) => { - trace!(target: "sync", "Block queued but requires justification {}: {:?}", number, hash); - Ok(BlockImportResult::ImportedUnjustified(number)) - }, + Ok(ImportResult::Imported(aux)) => Ok(BlockImportResult::ImportedUnknown(number, aux)), Ok(ImportResult::UnknownParent) => { debug!(target: "sync", "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent); Err(BlockImportError::UnknownParent) @@ -547,7 +545,7 @@ pub fn import_single_block<B: BlockT, V: Verifier<B>>( }; match import_error(import_handle.check_block(hash, parent))? { - BlockImportResult::ImportedUnknown(_) => (), + BlockImportResult::ImportedUnknown { .. } => (), r @ _ => return Ok(r), // Any other successfull result means that the block is already imported. } @@ -629,7 +627,7 @@ mod tests { assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); // Send an unknown - let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default())), Default::default())]; + let results = vec![(Ok(BlockImportResult::ImportedUnknown(Default::default(), Default::default())), Default::default())]; let _ = result_sender.send(BlockImportWorkerMsg::Imported(results)).ok().unwrap(); assert_eq!(link_port.recv(), Ok(LinkMsg::BlockImported)); diff --git a/substrate/core/consensus/common/src/lib.rs b/substrate/core/consensus/common/src/lib.rs index 53ffe1fec3e4dd36083c83043f362bba11cee3cd..fb777b837a6a6b5ba0ccf87ea36d3fc1cf12b2f5 100644 --- a/substrate/core/consensus/common/src/lib.rs +++ b/substrate/core/consensus/common/src/lib.rs @@ -47,7 +47,9 @@ pub mod evaluation; const MAX_BLOCK_SIZE: usize = 4 * 1024 * 1024 + 512; pub use self::error::{Error, ErrorKind}; -pub use block_import::{BlockImport, JustificationImport, ImportBlock, BlockOrigin, ImportResult, ForkChoiceStrategy}; +pub use block_import::{ + BlockImport, BlockOrigin, ForkChoiceStrategy, ImportedAux, ImportBlock, ImportResult, JustificationImport, +}; /// Trait for getting the authorities at a given block. pub trait Authorities<B: Block> { diff --git a/substrate/core/finality-grandpa/Cargo.toml b/substrate/core/finality-grandpa/Cargo.toml index 9c17a5a77054efd51d88c5ac0dedb0ca66dd53d7..c84e798fe739206a0775bc1d5896c7a49cb21569 100644 --- a/substrate/core/finality-grandpa/Cargo.toml +++ b/substrate/core/finality-grandpa/Cargo.toml @@ -17,8 +17,10 @@ consensus_common = { package = "substrate-consensus-common", path = "../consensu substrate-primitives = { path = "../primitives" } substrate-telemetry = { path = "../telemetry" } client = { package = "substrate-client", path = "../client" } +inherents = { package = "substrate-inherents", path = "../../core/inherents" } network = { package = "substrate-network", path = "../network" } service = { package = "substrate-service", path = "../service", optional = true } +srml-finality-tracker = { path = "../../srml/finality-tracker" } fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "primitives" } grandpa = { package = "finality-grandpa", version = "0.6.0", features = ["derive-codec"] } diff --git a/substrate/core/finality-grandpa/primitives/src/lib.rs b/substrate/core/finality-grandpa/primitives/src/lib.rs index 4878b79efb9a8640805bfa2a667300b58f648b15..1069647f69ca07a487b67fd76a9f94bb68706919 100644 --- a/substrate/core/finality-grandpa/primitives/src/lib.rs +++ b/substrate/core/finality-grandpa/primitives/src/lib.rs @@ -61,6 +61,7 @@ decl_runtime_apis! { /// applied in the runtime after those N blocks have passed. /// /// The consensus protocol will coordinate the handoff externally. + #[api_version(2)] pub trait GrandpaApi { /// Check a digest for pending changes. /// Return `None` if there are no pending changes. @@ -77,6 +78,28 @@ decl_runtime_apis! { fn grandpa_pending_change(digest: &DigestFor<Block>) -> Option<ScheduledChange<NumberFor<Block>>>; + /// Check a digest for forced changes. + /// Return `None` if there are no forced changes. Otherwise, return a + /// tuple containing the pending change and the median last finalized + /// block number at the time the change was signalled. + /// + /// Added in version 2. + /// + /// Forced changes are applied after a delay of _imported_ blocks, + /// while pending changes are applied after a delay of _finalized_ blocks. + /// + /// Precedence towards earlier or later digest items can be given + /// based on the rules of the chain. + /// + /// No change should be scheduled if one is already and the delay has not + /// passed completely. + /// + /// This should be a pure function: i.e. as long as the runtime can interpret + /// the digest type it should return the same result regardless of the current + /// state. + fn grandpa_forced_change(digest: &DigestFor<Block>) + -> Option<(NumberFor<Block>, ScheduledChange<NumberFor<Block>>)>; + /// Get the current GRANDPA authorities and weights. This should not change except /// for when changes are scheduled and the corresponding delay has passed. /// diff --git a/substrate/core/finality-grandpa/src/authorities.rs b/substrate/core/finality-grandpa/src/authorities.rs index b6cc1baed3ee0ec63ad6be529c5cce3b4bf3d810..bb4992ef1d74ecbb2f4640fdff0dda71050cd620 100644 --- a/substrate/core/finality-grandpa/src/authorities.rs +++ b/substrate/core/finality-grandpa/src/authorities.rs @@ -39,17 +39,7 @@ impl<H, N> Clone for SharedAuthoritySet<H, N> { } } -impl<H, N> SharedAuthoritySet<H, N> -where H: PartialEq, - N: Ord, -{ - /// The genesis authority set. - pub(crate) fn genesis(initial: Vec<(Ed25519AuthorityId, u64)>) -> Self { - SharedAuthoritySet { - inner: Arc::new(RwLock::new(AuthoritySet::genesis(initial))) - } - } - +impl<H, N> SharedAuthoritySet<H, N> { /// Acquire a reference to the inner read-write lock. pub(crate) fn inner(&self) -> &RwLock<AuthoritySet<H, N>> { &*self.inner @@ -93,11 +83,18 @@ pub(crate) struct Status<H, N> { } /// A set of authorities. -#[derive(Debug, Clone, Encode, Decode)] +#[derive(Debug, Clone, Encode, Decode, PartialEq)] pub(crate) struct AuthoritySet<H, N> { - current_authorities: Vec<(Ed25519AuthorityId, u64)>, - set_id: u64, - pending_changes: ForkTree<H, N, PendingChange<H, N>>, + pub(crate) current_authorities: Vec<(Ed25519AuthorityId, u64)>, + pub(crate) set_id: u64, + // Tree of pending standard changes across forks. Standard changes are + // enacted on finality and must be enacted (i.e. finalized) in-order across + // a given branch + pub(crate) pending_standard_changes: ForkTree<H, N, PendingChange<H, N>>, + // Pending forced changes across different forks (at most one per fork). + // Forced changes are enacted on block depth (not finality), for this reason + // only one forced change should exist per fork. + pub(crate) pending_forced_changes: Vec<PendingChange<H, N>>, } impl<H, N> AuthoritySet<H, N> @@ -109,7 +106,8 @@ where H: PartialEq, AuthoritySet { current_authorities: initial, set_id: 0, - pending_changes: ForkTree::new(), + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), } } @@ -124,12 +122,7 @@ where N: Add<Output=N> + Ord + Clone + Debug, H: Clone + Debug { - /// Note an upcoming pending transition. Multiple pending changes on the - /// same branch can be added as long as they don't overlap. This method - /// assumes that changes on the same branch will be added in-order. The - /// given function `is_descendent_of` should return `true` if the second - /// hash (target) is a descendent of the first hash (base). - pub(crate) fn add_pending_change<F, E>( + fn add_standard_change<F, E>( &mut self, pending: PendingChange<H, N>, is_descendent_of: &F, @@ -140,65 +133,180 @@ where let hash = pending.canon_hash.clone(); let number = pending.canon_height.clone(); - debug!(target: "afg", "Inserting potential set change signalled at block {:?} \ + debug!(target: "afg", "Inserting potential standard set change signalled at block {:?} \ (delayed by {:?} blocks).", - (&number, &hash), pending.finalization_depth); + (&number, &hash), pending.delay); - self.pending_changes.import( + self.pending_standard_changes.import( hash.clone(), number.clone(), pending, is_descendent_of, )?; - debug!(target: "afg", "There are now {} alternatives for the next pending change (roots), \ - and a total of {} pending changes (across all forks).", - self.pending_changes.roots().count(), - self.pending_changes.iter().count(), + debug!(target: "afg", "There are now {} alternatives for the next pending standard change (roots), \ + and a total of {} pending standard changes (across all forks).", + self.pending_standard_changes.roots().count(), + self.pending_standard_changes.iter().count(), ); Ok(()) } - /// Inspect pending changes. Pending changes in the tree are traversed in pre-order. + fn add_forced_change<F, E>( + &mut self, + pending: PendingChange<H, N>, + is_descendent_of: &F, + ) -> Result<(), fork_tree::Error<E>> where + F: Fn(&H, &H) -> Result<bool, E>, + E: std::error::Error, + { + for change in self.pending_forced_changes.iter() { + if change.canon_hash == pending.canon_hash || + is_descendent_of(&change.canon_hash, &pending.canon_hash)? + { + return Err(fork_tree::Error::UnfinalizedAncestor); + } + } + + // ordered first by effective number and then by signal-block number. + let key = (pending.effective_number(), pending.canon_height.clone()); + let idx = self.pending_forced_changes + .binary_search_by_key(&key, |change| ( + change.effective_number(), + change.canon_height.clone(), + )) + .unwrap_or_else(|i| i); + + debug!(target: "afg", "Inserting potential forced set change at block {:?} \ + (delayed by {:?} blocks).", + (&pending.canon_height, &pending.canon_hash), pending.delay); + + self.pending_forced_changes.insert(idx, pending); + + debug!(target: "afg", "There are now {} pending forced changes.", self.pending_forced_changes.len()); + + Ok(()) + } + + /// Note an upcoming pending transition. Multiple pending standard changes + /// on the same branch can be added as long as they don't overlap. Forced + /// changes are restricted to one per fork. This method assumes that changes + /// on the same branch will be added in-order. The given function + /// `is_descendent_of` should return `true` if the second hash (target) is a + /// descendent of the first hash (base). + pub(crate) fn add_pending_change<F, E>( + &mut self, + pending: PendingChange<H, N>, + is_descendent_of: &F, + ) -> Result<(), fork_tree::Error<E>> where + F: Fn(&H, &H) -> Result<bool, E>, + E: std::error::Error, + { + match pending.delay_kind { + DelayKind::Best { .. } => { + self.add_forced_change(pending, is_descendent_of) + }, + DelayKind::Finalized => { + self.add_standard_change(pending, is_descendent_of) + }, + } + } + + /// Inspect pending changes. Standard pending changes are iterated first, + /// and the changes in the tree are traversed in pre-order, afterwards all + /// forced changes are iterated. pub(crate) fn pending_changes(&self) -> impl Iterator<Item=&PendingChange<H, N>> { - self.pending_changes.iter().map(|(_, _, c)| c) + self.pending_standard_changes.iter().map(|(_, _, c)| c) + .chain(self.pending_forced_changes.iter()) } - /// Get the earliest limit-block number, if any. If there are pending - /// changes across different forks, this method will return the earliest - /// effective number (across the different branches). + /// Get the earliest limit-block number, if any. If there are pending changes across + /// different forks, this method will return the earliest effective number (across the + /// different branches). Only standard changes are taken into account for the current + /// limit, since any existing forced change should preclude the voter from voting. pub(crate) fn current_limit(&self) -> Option<N> { - self.pending_changes.roots() + self.pending_standard_changes.roots() .min_by_key(|&(_, _, c)| c.effective_number()) .map(|(_, _, c)| c.effective_number()) } - /// Apply or prune any pending transitions. This method ensures that if - /// there are multiple changes in the same branch, finalizing this block - /// won't finalize past multiple transitions (i.e. transitions must be - /// finalized in-order). The given function `is_descendent_of` should return - /// `true` if the second hash (target) is a descendent of the first hash - /// (base). + /// Apply or prune any pending transitions based on a best-block trigger. + /// + /// Returns `Ok((median, new_set))` when a forced change has occurred. The + /// median represents the median last finalized block at the time the change + /// was signaled, and it should be used as the canon block when starting the + /// new grandpa voter. Only alters the internal state in this case. + /// + /// These transitions are always forced and do not lead to justifications + /// which light clients can follow. + pub(crate) fn apply_forced_changes<F, E>( + &self, + best_hash: H, + best_number: N, + is_descendent_of: &F, + ) -> Result<Option<(N, Self)>, E> + where F: Fn(&H, &H) -> Result<bool, E>, + { + let mut new_set = None; + + for change in self.pending_forced_changes.iter() + .take_while(|c| c.effective_number() <= best_number) // to prevent iterating too far + .filter(|c| c.effective_number() == best_number) + { + // check if the given best block is in the same branch as the block that signalled the change. + if is_descendent_of(&change.canon_hash, &best_hash)? { + // apply this change: make the set canonical + info!(target: "finality", "Applying authority set change forced at block #{:?}", + change.canon_height); + + let median_last_finalized = match change.delay_kind { + DelayKind::Best { ref median_last_finalized } => median_last_finalized.clone(), + _ => unreachable!("pending_forced_changes only contains forced changes; forced changes have delay kind Best; qed."), + }; + + new_set = Some((median_last_finalized, AuthoritySet { + current_authorities: change.next_authorities.clone(), + set_id: self.set_id + 1, + pending_standard_changes: ForkTree::new(), // new set, new changes. + pending_forced_changes: Vec::new(), + })); + + break; + } + + // we don't wipe forced changes until another change is + // applied + } + + Ok(new_set) + } + + /// Apply or prune any pending transitions based on a finality trigger. This + /// method ensures that if there are multiple changes in the same branch, + /// finalizing this block won't finalize past multiple transitions (i.e. + /// transitions must be finalized in-order). The given function + /// `is_descendent_of` should return `true` if the second hash (target) is a + /// descendent of the first hash (base). /// /// When the set has changed, the return value will be `Ok(Some((H, N)))` /// which is the canonical block where the set last changed (i.e. the given /// hash and number). - pub(crate) fn apply_changes<F, E>( + pub(crate) fn apply_standard_changes<F, E>( &mut self, finalized_hash: H, finalized_number: N, is_descendent_of: &F, ) -> Result<Status<H, N>, fork_tree::Error<E>> - where F: Fn(&H, &H) -> Result<bool, E>, - E: std::error::Error, + where F: Fn(&H, &H) -> Result<bool, E>, + E: std::error::Error, { let mut status = Status { changed: false, new_set_block: None, }; - match self.pending_changes.finalize_with_descendent_if( + match self.pending_standard_changes.finalize_with_descendent_if( &finalized_hash, finalized_number.clone(), is_descendent_of, @@ -207,6 +315,10 @@ where fork_tree::FinalizationResult::Changed(change) => { status.changed = true; + // if we are able to finalize any standard change then we can + // discard all pending forced changes (on different forks) + self.pending_forced_changes.clear(); + if let Some(change) = change { info!(target: "finality", "Applying authority set change scheduled at block #{:?}", change.canon_height); @@ -226,13 +338,13 @@ where Ok(status) } - /// Check whether the given finalized block number enacts any authority set - /// change (without triggering it), ensuring that if there are multiple - /// changes in the same branch, finalizing this block won't finalize past - /// multiple transitions (i.e. transitions must be finalized in-order). The - /// given function `is_descendent_of` should return `true` if the second - /// hash (target) is a descendent of the first hash (base). - pub fn enacts_change<F, E>( + /// Check whether the given finalized block number enacts any standard + /// authority set change (without triggering it), ensuring that if there are + /// multiple changes in the same branch, finalizing this block won't + /// finalize past multiple transitions (i.e. transitions must be finalized + /// in-order). The given function `is_descendent_of` should return `true` if + /// the second hash (target) is a descendent of the first hash (base). + pub fn enacts_standard_change<F, E>( &self, finalized_hash: H, finalized_number: N, @@ -241,7 +353,7 @@ where where F: Fn(&H, &H) -> Result<bool, E>, E: std::error::Error, { - self.pending_changes.finalizes_any_with_descendent_if( + self.pending_standard_changes.finalizes_any_with_descendent_if( &finalized_hash, finalized_number.clone(), is_descendent_of, @@ -250,27 +362,58 @@ where } } +/// Kinds of delays for pending changes. +#[derive(Debug, Clone, Encode, Decode, PartialEq)] +pub(crate) enum DelayKind<N> { + /// Depth in finalized chain. + Finalized, + /// Depth in best chain. The median last finalized block is calculated at the time the + /// change was signalled. + Best { median_last_finalized: N }, +} + /// A pending change to the authority set. /// /// This will be applied when the announcing block is at some depth within -/// the finalized chain. -#[derive(Debug, Clone, Encode, Decode, PartialEq)] +/// the finalized or unfinalized chain. +#[derive(Debug, Clone, Encode, PartialEq)] pub(crate) struct PendingChange<H, N> { /// The new authorities and weights to apply. pub(crate) next_authorities: Vec<(Ed25519AuthorityId, u64)>, - /// How deep in the finalized chain the announcing block must be + /// How deep in the chain the announcing block must be /// before the change is applied. - pub(crate) finalization_depth: N, + pub(crate) delay: N, /// The announcing block's height. pub(crate) canon_height: N, /// The announcing block's hash. pub(crate) canon_hash: H, + /// The delay kind. + pub(crate) delay_kind: DelayKind<N>, +} + +impl<H: Decode, N: Decode> Decode for PendingChange<H, N> { + fn decode<I: parity_codec::Input>(value: &mut I) -> Option<Self> { + let next_authorities = Decode::decode(value)?; + let delay = Decode::decode(value)?; + let canon_height = Decode::decode(value)?; + let canon_hash = Decode::decode(value)?; + + let delay_kind = DelayKind::decode(value).unwrap_or(DelayKind::Finalized); + + Some(PendingChange { + next_authorities, + delay, + canon_height, + canon_hash, + delay_kind, + }) + } } impl<H, N: Add<Output=N> + Clone> PendingChange<H, N> { /// Returns the effective number this change will be applied at. pub fn effective_number(&self) -> N { - self.canon_height.clone() + self.finalization_depth.clone() + self.canon_height.clone() + self.delay.clone() } } @@ -295,28 +438,32 @@ mod tests { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: ForkTree::new(), + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), }; let change_a = PendingChange { next_authorities: Vec::new(), - finalization_depth: 10, + delay: 10, canon_height: 5, canon_hash: "hash_a", + delay_kind: DelayKind::Finalized, }; let change_b = PendingChange { next_authorities: Vec::new(), - finalization_depth: 0, + delay: 0, canon_height: 5, canon_hash: "hash_b", + delay_kind: DelayKind::Finalized, }; let change_c = PendingChange { next_authorities: Vec::new(), - finalization_depth: 5, + delay: 5, canon_height: 10, canon_hash: "hash_c", + delay_kind: DelayKind::Finalized, }; authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(false)).unwrap(); @@ -327,9 +474,29 @@ mod tests { _ => unreachable!(), })).unwrap(); + // forced changes are iterated last + let change_d = PendingChange { + next_authorities: Vec::new(), + delay: 2, + canon_height: 1, + canon_hash: "hash_d", + delay_kind: DelayKind::Best { median_last_finalized: 0 }, + }; + + let change_e = PendingChange { + next_authorities: Vec::new(), + delay: 2, + canon_height: 0, + canon_hash: "hash_e", + delay_kind: DelayKind::Best { median_last_finalized: 0 }, + }; + + authorities.add_pending_change(change_d.clone(), &static_is_descendent_of(false)).unwrap(); + authorities.add_pending_change(change_e.clone(), &static_is_descendent_of(false)).unwrap(); + assert_eq!( authorities.pending_changes().collect::<Vec<_>>(), - vec![&change_b, &change_a, &change_c], + vec![&change_b, &change_a, &change_c, &change_e, &change_d], ); } @@ -338,7 +505,8 @@ mod tests { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: ForkTree::new(), + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), }; let set_a = vec![([1; 32].into(), 5)]; @@ -347,16 +515,18 @@ mod tests { // two competing changes at the same height on different forks let change_a = PendingChange { next_authorities: set_a.clone(), - finalization_depth: 10, + delay: 10, canon_height: 5, canon_hash: "hash_a", + delay_kind: DelayKind::Finalized, }; let change_b = PendingChange { next_authorities: set_b.clone(), - finalization_depth: 10, + delay: 10, canon_height: 5, canon_hash: "hash_b", + delay_kind: DelayKind::Finalized, }; authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(true)).unwrap(); @@ -368,7 +538,7 @@ mod tests { ); // finalizing "hash_c" won't enact the change signalled at "hash_a" but it will prune out "hash_b" - let status = authorities.apply_changes("hash_c", 11, &is_descendent_of(|base, hash| match (*base, *hash) { + let status = authorities.apply_standard_changes("hash_c", 11, &is_descendent_of(|base, hash| match (*base, *hash) { ("hash_a", "hash_c") => true, ("hash_b", "hash_c") => false, _ => unreachable!(), @@ -382,7 +552,7 @@ mod tests { ); // finalizing "hash_d" will enact the change signalled at "hash_a" - let status = authorities.apply_changes("hash_d", 15, &is_descendent_of(|base, hash| match (*base, *hash) { + let status = authorities.apply_standard_changes("hash_d", 15, &is_descendent_of(|base, hash| match (*base, *hash) { ("hash_a", "hash_d") => true, _ => unreachable!(), })).unwrap(); @@ -400,7 +570,8 @@ mod tests { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: ForkTree::new(), + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), }; let set_a = vec![([1; 32].into(), 5)]; @@ -409,16 +580,18 @@ mod tests { // two competing changes at the same height on different forks let change_a = PendingChange { next_authorities: set_a.clone(), - finalization_depth: 10, + delay: 10, canon_height: 5, canon_hash: "hash_a", + delay_kind: DelayKind::Finalized, }; let change_c = PendingChange { next_authorities: set_c.clone(), - finalization_depth: 10, + delay: 10, canon_height: 30, canon_hash: "hash_c", + delay_kind: DelayKind::Finalized, }; authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(true)).unwrap(); @@ -437,12 +610,12 @@ mod tests { }); // trying to finalize past `change_c` without finalizing `change_a` first - match authorities.apply_changes("hash_d", 40, &is_descendent_of) { + match authorities.apply_standard_changes("hash_d", 40, &is_descendent_of) { Err(fork_tree::Error::UnfinalizedAncestor) => {}, _ => unreachable!(), } - let status = authorities.apply_changes("hash_b", 15, &is_descendent_of).unwrap(); + let status = authorities.apply_standard_changes("hash_b", 15, &is_descendent_of).unwrap(); assert!(status.changed); assert_eq!(status.new_set_block, Some(("hash_b", 15))); @@ -450,7 +623,7 @@ mod tests { assert_eq!(authorities.set_id, 1); // after finalizing `change_a` it should be possible to finalize `change_c` - let status = authorities.apply_changes("hash_d", 40, &is_descendent_of).unwrap(); + let status = authorities.apply_standard_changes("hash_d", 40, &is_descendent_of).unwrap(); assert!(status.changed); assert_eq!(status.new_set_block, Some(("hash_d", 40))); @@ -459,20 +632,22 @@ mod tests { } #[test] - fn enacts_change_works() { + fn enacts_standard_change_works() { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: ForkTree::new(), + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), }; let set_a = vec![([1; 32].into(), 5)]; let change_a = PendingChange { next_authorities: set_a.clone(), - finalization_depth: 10, + delay: 10, canon_height: 5, canon_hash: "hash_a", + delay_kind: DelayKind::Finalized, }; authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(false)).unwrap(); @@ -484,12 +659,84 @@ mod tests { }); // "hash_c" won't finalize the existing change since it isn't a descendent - assert!(!authorities.enacts_change("hash_c", 15, &is_descendent_of).unwrap()); - + assert!(!authorities.enacts_standard_change("hash_c", 15, &is_descendent_of).unwrap()); // "hash_b" at depth 14 won't work either - assert!(!authorities.enacts_change("hash_b", 14, &is_descendent_of).unwrap()); + assert!(!authorities.enacts_standard_change("hash_b", 14, &is_descendent_of).unwrap()); // but it should work at depth 15 (change height + depth) - assert!(authorities.enacts_change("hash_b", 15, &is_descendent_of).unwrap()); + assert!(authorities.enacts_standard_change("hash_b", 15, &is_descendent_of).unwrap()); + } + + #[test] + fn forced_changes() { + let mut authorities = AuthoritySet { + current_authorities: Vec::new(), + set_id: 0, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + }; + + let set_a = vec![([1; 32].into(), 5)]; + let set_b = vec![([2; 32].into(), 5)]; + + let change_a = PendingChange { + next_authorities: set_a.clone(), + delay: 10, + canon_height: 5, + canon_hash: "hash_a", + delay_kind: DelayKind::Best { median_last_finalized: 42 }, + }; + + let change_b = PendingChange { + next_authorities: set_b.clone(), + delay: 10, + canon_height: 5, + canon_hash: "hash_b", + delay_kind: DelayKind::Best { median_last_finalized: 0 }, + }; + + authorities.add_pending_change(change_a, &static_is_descendent_of(false)).unwrap(); + authorities.add_pending_change(change_b, &static_is_descendent_of(false)).unwrap(); + + // there's an effective change triggered at block 15 but not a standard one. + // so this should do nothing. + assert!(!authorities.enacts_standard_change("hash_c", 15, &static_is_descendent_of(true)).unwrap()); + + // throw a standard change into the mix to prove that it's discarded + // for being on the same fork. + // + // NOTE: after https://github.com/paritytech/substrate/issues/1861 + // this should still be rejected based on the "span" rule -- it overlaps + // with another change on the same fork. + let change_c = PendingChange { + next_authorities: set_b.clone(), + delay: 3, + canon_height: 8, + canon_hash: "hash_a8", + delay_kind: DelayKind::Best { median_last_finalized: 0 }, + }; + + let is_descendent_of_a = is_descendent_of(|base: &&str, _| { + base.starts_with("hash_a") + }); + + assert!(authorities.add_pending_change(change_c, &is_descendent_of_a).is_err()); + + // too early. + assert!(authorities.apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true)).unwrap().is_none()); + + // too late. + assert!(authorities.apply_forced_changes("hash_a16", 16, &static_is_descendent_of(true)).unwrap().is_none()); + + // on time -- chooses the right change. + assert_eq!( + authorities.apply_forced_changes("hash_a15", 15, &is_descendent_of_a).unwrap().unwrap(), + (42, AuthoritySet { + current_authorities: set_a, + set_id: 1, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + }) + ); } } diff --git a/substrate/core/finality-grandpa/src/aux_schema.rs b/substrate/core/finality-grandpa/src/aux_schema.rs new file mode 100644 index 0000000000000000000000000000000000000000..930329344b8fb4a3413bed94116f887fcb10293f --- /dev/null +++ b/substrate/core/finality-grandpa/src/aux_schema.rs @@ -0,0 +1,275 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! Schema for stuff in the aux-db. + +use parity_codec::{Encode, Decode}; +use client::backend::AuxStore; +use client::error::{Result as ClientResult, Error as ClientError, ErrorKind as ClientErrorKind}; +use fork_tree::ForkTree; +use grandpa::round::State as RoundState; +use substrate_primitives::Ed25519AuthorityId; +use log::{info, warn}; + +use crate::authorities::{AuthoritySet, SharedAuthoritySet, PendingChange, DelayKind}; +use crate::consensus_changes::{SharedConsensusChanges, ConsensusChanges}; +use crate::NewAuthoritySet; + +use std::fmt::Debug; +use std::sync::Arc; + +const VERSION_KEY: &[u8] = b"grandpa_schema_version"; +const SET_STATE_KEY: &[u8] = b"grandpa_completed_round"; +const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters"; +const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes"; + +const CURRENT_VERSION: u32 = 1; + +/// The voter set state. +#[derive(Clone, Encode, Decode)] +pub enum VoterSetState<H, N> { + /// The voter set state, currently paused. + Paused(u64, RoundState<H, N>), + /// The voter set state, currently live. + Live(u64, RoundState<H, N>), +} + +impl<H: Clone, N: Clone> VoterSetState<H, N> { + /// Yields the current state. + pub(crate) fn round(&self) -> (u64, RoundState<H, N>) { + match *self { + VoterSetState::Paused(n, ref s) => (n, s.clone()), + VoterSetState::Live(n, ref s) => (n, s.clone()), + } + } +} + +type V0VoterSetState<H, N> = (u64, RoundState<H, N>); + +#[derive(Debug, Clone, Encode, Decode, PartialEq)] +struct V0PendingChange<H, N> { + next_authorities: Vec<(Ed25519AuthorityId, u64)>, + delay: N, + canon_height: N, + canon_hash: H, +} + +#[derive(Debug, Clone, Encode, Decode, PartialEq)] +struct V0AuthoritySet<H, N> { + current_authorities: Vec<(Ed25519AuthorityId, u64)>, + set_id: u64, + pending_changes: Vec<V0PendingChange<H, N>>, +} + +impl<H, N> Into<AuthoritySet<H, N>> for V0AuthoritySet<H, N> +where H: Clone + Debug + PartialEq, + N: Clone + Debug + Ord, +{ + fn into(self) -> AuthoritySet<H, N> { + let mut pending_standard_changes = ForkTree::new(); + + for old_change in self.pending_changes { + let new_change = PendingChange { + next_authorities: old_change.next_authorities, + delay: old_change.delay, + canon_height: old_change.canon_height, + canon_hash: old_change.canon_hash, + delay_kind: DelayKind::Finalized, + }; + + if let Err(err) = pending_standard_changes.import::<_, ClientError>( + new_change.canon_hash.clone(), + new_change.canon_height.clone(), + new_change, + // previously we only supported at most one pending change per fork + &|_, _| Ok(false), + ) { + warn!(target: "afg", "Error migrating pending authority set change: {:?}.", err); + warn!(target: "afg", "Node is in a potentially inconsistent state."); + } + } + + AuthoritySet { + current_authorities: self.current_authorities, + set_id: self.set_id, + pending_forced_changes: Vec::new(), + pending_standard_changes + } + } +} + +fn load_decode<B: AuxStore, T: Decode>(backend: &B, key: &[u8]) -> ClientResult<Option<T>> { + match backend.get_aux(key)? { + None => Ok(None), + Some(t) => T::decode(&mut &t[..]) + .ok_or_else( + || ClientErrorKind::Backend(format!("GRANDPA DB is corrupted.")).into(), + ) + .map(Some) + } +} + +/// Persistent data kept between runs. +pub(crate) struct PersistentData<H, N> { + pub(crate) authority_set: SharedAuthoritySet<H, N>, + pub(crate) consensus_changes: SharedConsensusChanges<H, N>, + pub(crate) set_state: VoterSetState<H, N>, +} + +/// Load or initialize persistent data from backend. +pub(crate) fn load_persistent<B, H, N, G>( + backend: &B, + genesis_hash: H, + genesis_number: N, + genesis_authorities: G, +) + -> ClientResult<PersistentData<H, N>> + where + B: AuxStore, + H: Debug + Decode + Encode + Clone + PartialEq, + N: Debug + Decode + Encode + Clone + Ord, + G: FnOnce() -> ClientResult<Vec<(Ed25519AuthorityId, u64)>> +{ + let version: Option<u32> = load_decode(backend, VERSION_KEY)?; + let consensus_changes = load_decode(backend, CONSENSUS_CHANGES_KEY)? + .unwrap_or_else(ConsensusChanges::<H, N>::empty); + + let make_genesis_round = move || RoundState::genesis((genesis_hash, genesis_number)); + + match version { + None => { + CURRENT_VERSION.using_encoded(|s| + backend.insert_aux(&[(VERSION_KEY, s)], &[]) + )?; + + if let Some(old_set) = load_decode::<_, V0AuthoritySet<H, N>>(backend, AUTHORITY_SET_KEY)? { + let new_set: AuthoritySet<H, N> = old_set.into(); + backend.insert_aux(&[(AUTHORITY_SET_KEY, new_set.encode().as_slice())], &[])?; + + let set_state = match load_decode::<_, V0VoterSetState<H, N>>(backend, SET_STATE_KEY)? { + Some((number, state)) => VoterSetState::Live(number, state), + None => VoterSetState::Live(0, make_genesis_round()), + }; + + return Ok(PersistentData { + authority_set: new_set.into(), + consensus_changes: Arc::new(consensus_changes.into()), + set_state, + }); + } + } + Some(1) => { + if let Some(set) = load_decode::<_, AuthoritySet<H, N>>(backend, AUTHORITY_SET_KEY)? { + let set_state = match load_decode::<_, VoterSetState<H, N>>(backend, SET_STATE_KEY)? { + Some(state) => state, + None => VoterSetState::Live(0, make_genesis_round()), + }; + + return Ok(PersistentData { + authority_set: set.into(), + consensus_changes: Arc::new(consensus_changes.into()), + set_state, + }); + } + } + Some(other) => return Err(ClientErrorKind::Backend( + format!("Unsupported GRANDPA DB version: {:?}", other) + ).into()), + } + + // genesis. + info!(target: "afg", "Loading GRANDPA authority set \ + from genesis on what appears to be first startup."); + + let genesis_set = AuthoritySet::genesis(genesis_authorities()?); + let genesis_state = VoterSetState::Live(0, make_genesis_round()); + backend.insert_aux( + &[ + (AUTHORITY_SET_KEY, genesis_set.encode().as_slice()), + (SET_STATE_KEY, genesis_state.encode().as_slice()), + ], + &[], + )?; + + Ok(PersistentData { + authority_set: genesis_set.into(), + set_state: genesis_state, + consensus_changes: Arc::new(consensus_changes.into()), + }) +} + +/// Update the authority set on disk after a change. +pub(crate) fn update_authority_set<H, N, F, R>( + set: &AuthoritySet<H, N>, + new_set: Option<&NewAuthoritySet<H, N>>, + write_aux: F +) -> R where + H: Encode + Clone, + N: Encode + Clone, + F: FnOnce(&[(&'static [u8], &[u8])]) -> R, +{ + // write new authority set state to disk. + let encoded_set = set.encode(); + + if let Some(new_set) = new_set { + // we also overwrite the "last completed round" entry with a blank slate + // because from the perspective of the finality gadget, the chain has + // reset. + let round_state = RoundState::genesis(( + new_set.canon_hash.clone(), + new_set.canon_number.clone(), + )); + let set_state = VoterSetState::Live(0, round_state); + let encoded = set_state.encode(); + + write_aux(&[ + (AUTHORITY_SET_KEY, &encoded_set[..]), + (SET_STATE_KEY, &encoded[..]), + ]) + } else { + write_aux(&[(AUTHORITY_SET_KEY, &encoded_set[..])]) + } +} + +/// Write voter set state. +pub(crate) fn write_voter_set_state<B, H, N>(backend: &B, state: &VoterSetState<H, N>) + -> ClientResult<()> + where B: AuxStore, H: Encode, N: Encode +{ + backend.insert_aux( + &[(SET_STATE_KEY, state.encode().as_slice())], + &[] + ) +} + +/// Update the consensus changes. +pub(crate) fn update_consensus_changes<H, N, F, R>( + set: &ConsensusChanges<H, N>, + write_aux: F +) -> R where + H: Encode + Clone, + N: Encode + Clone, + F: FnOnce(&[(&'static [u8], &[u8])]) -> R, +{ + write_aux(&[(CONSENSUS_CHANGES_KEY, set.encode().as_slice())]) +} + +#[cfg(test)] +pub(crate) fn load_authorities<B: AuxStore, H: Decode, N: Decode>(backend: &B) + -> Option<AuthoritySet<H, N>> { + load_decode::<_, AuthoritySet<H, N>>(backend, AUTHORITY_SET_KEY) + .expect("backend error") +} diff --git a/substrate/core/finality-grandpa/src/consensus_changes.rs b/substrate/core/finality-grandpa/src/consensus_changes.rs index 9d701152dd7deda53203b4cf1e60c8ff33cc9231..cbd7b30f8e7a5ebcb2883baf4ea0be20683357d6 100644 --- a/substrate/core/finality-grandpa/src/consensus_changes.rs +++ b/substrate/core/finality-grandpa/src/consensus_changes.rs @@ -23,11 +23,14 @@ pub(crate) struct ConsensusChanges<H, N> { pending_changes: Vec<(N, H)>, } -impl<H: Copy + PartialEq, N: Copy + Ord> ConsensusChanges<H, N> { +impl<H, N> ConsensusChanges<H, N> { /// Create empty consensus changes. pub(crate) fn empty() -> Self { ConsensusChanges { pending_changes: Vec::new(), } } +} + +impl<H: Copy + PartialEq, N: Copy + Ord> ConsensusChanges<H, N> { /// Note unfinalized change of consensus-related data. pub(crate) fn note_change(&mut self, at: (N, H)) { diff --git a/substrate/core/finality-grandpa/src/environment.rs b/substrate/core/finality-grandpa/src/environment.rs index 71424e8be9161089355263d97c8a9e59f9d78496..857d6eafd711326d7d7f8367b0e8decadb3a0fe6 100644 --- a/substrate/core/finality-grandpa/src/environment.rs +++ b/substrate/core/finality-grandpa/src/environment.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see <http://www.gnu.org/licenses/>. -use std::fmt; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -22,6 +21,7 @@ use log::{debug, warn, info}; use parity_codec::Encode; use futures::prelude::*; use tokio::timer::Delay; +use parking_lot::RwLock; use client::{ backend::Backend, BlockchainEvents, CallExecutor, Client, error::Error as ClientError @@ -36,14 +36,42 @@ use runtime_primitives::traits::{ use substrate_primitives::{Blake2Hasher, ed25519,Ed25519AuthorityId, H256}; use crate::{ - AUTHORITY_SET_KEY, CONSENSUS_CHANGES_KEY, LAST_COMPLETED_KEY, - Commit, Config, Error, Network, Precommit, Prevote, LastCompleted, + Commit, Config, Error, Network, Precommit, Prevote, + CommandOrError, NewAuthoritySet, VoterCommand, }; -use crate::authorities::{AuthoritySet, SharedAuthoritySet}; + +use crate::authorities::SharedAuthoritySet; use crate::consensus_changes::SharedConsensusChanges; use crate::justification::GrandpaJustification; use crate::until_imported::UntilVoteTargetImported; +/// Data about a completed round. +pub(crate) type CompletedRound<H, N> = (u64, RoundState<H, N>); + +/// A read-only view of the last completed round. +pub(crate) struct LastCompletedRound<H, N> { + inner: RwLock<CompletedRound<H, N>>, +} + +impl<H: Clone, N: Clone> LastCompletedRound<H, N> { + /// Create a new tracker based on some starting last-completed round. + pub(crate) fn new(round: CompletedRound<H, N>) -> Self { + LastCompletedRound { inner: RwLock::new(round) } + } + + /// Read the last completed round. + pub(crate) fn read(&self) -> CompletedRound<H, N> { + self.inner.read().clone() + } + + // NOTE: not exposed outside of this module intentionally. + fn with<F, R>(&self, f: F) -> R + where F: FnOnce(&mut CompletedRound<H, N>) -> R + { + f(&mut *self.inner.write()) + } +} + /// The environment we run GRANDPA in. pub(crate) struct Environment<B, E, Block: BlockT, N: Network<Block>, RA> { pub(crate) inner: Arc<Client<B, E, Block, RA>>, @@ -53,6 +81,7 @@ pub(crate) struct Environment<B, E, Block: BlockT, N: Network<Block>, RA> { pub(crate) consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>, pub(crate) network: N, pub(crate) set_id: u64, + pub(crate) last_completed: LastCompletedRound<Block::Hash, NumberFor<Block>>, } impl<Block: BlockT<Hash=H256>, B, E, N, RA> grandpa::Chain<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N, RA> where @@ -166,54 +195,6 @@ impl<Block: BlockT<Hash=H256>, B, E, N, RA> grandpa::Chain<Block::Hash, NumberFo } } -/// A new authority set along with the canonical block it changed at. -#[derive(Debug)] -pub(crate) struct NewAuthoritySet<H, N> { - pub(crate) canon_number: N, - pub(crate) canon_hash: H, - pub(crate) set_id: u64, - pub(crate) authorities: Vec<(Ed25519AuthorityId, u64)>, -} - -/// Signals either an early exit of a voter or an error. -#[derive(Debug)] -pub(crate) enum ExitOrError<H, N> { - /// An error occurred. - Error(Error), - /// Early exit of the voter: the new set ID and the new authorities along with respective weights. - AuthoritiesChanged(NewAuthoritySet<H, N>), -} - -impl<H, N> From<Error> for ExitOrError<H, N> { - fn from(e: Error) -> Self { - ExitOrError::Error(e) - } -} - -impl<H, N> From<ClientError> for ExitOrError<H, N> { - fn from(e: ClientError) -> Self { - ExitOrError::Error(Error::Client(e)) - } -} - -impl<H, N> From<grandpa::Error> for ExitOrError<H, N> { - fn from(e: grandpa::Error) -> Self { - ExitOrError::Error(Error::from(e)) - } -} - -impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for ExitOrError<H, N> { } - -impl<H, N> fmt::Display for ExitOrError<H, N> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - ExitOrError::Error(ref e) => write!(f, "{:?}", e), - ExitOrError::AuthoritiesChanged(_) => write!(f, "restarting voter on new authorities"), - } - } -} - - impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, NumberFor<Block>> for Environment<B, E, Block, N, RA> where Block: 'static, B: Backend<Block, Blake2Hasher> + 'static, @@ -237,7 +218,7 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb SinkError = Self::Error, > + Send>; - type Error = ExitOrError<Block::Hash, NumberFor<Block>>; + type Error = CommandOrError<Block::Hash, NumberFor<Block>>; fn round_data( &self, @@ -295,17 +276,32 @@ impl<B, E, Block: BlockT<Hash=H256>, N, RA> voter::Environment<Block::Hash, Numb state.finalized.as_ref().map(|e| e.1), ); - let encoded_state = (round, state).encode(); - let res = Backend::insert_aux(&**self.inner.backend(), &[(LAST_COMPLETED_KEY, &encoded_state[..])], &[]); - if let Err(e) = res { - warn!(target: "afg", "Shutting down voter due to error bookkeeping last completed round in DB: {:?}", e); - Err(Error::Client(e).into()) - } else { + self.last_completed.with(|last_completed| { + let set_state = crate::aux_schema::VoterSetState::Live(round, state.clone()); + crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?; + + *last_completed = (round, state); // after writing to DB successfully. Ok(()) - } + }) } fn finalize_block(&self, hash: Block::Hash, number: NumberFor<Block>, round: u64, commit: Commit<Block>) -> Result<(), Self::Error> { + use client::blockchain::HeaderBackend; + + let status = self.inner.backend().blockchain().info()?; + if number <= status.finalized_number && self.inner.backend().blockchain().hash(number)? == Some(hash) { + // This can happen after a forced change (triggered by the finality tracker when finality is stalled), since + // the voter will be restarted at the median last finalized block, which can be lower than the local best + // finalized block. + warn!(target: "afg", "Re-finalized block #{:?} ({:?}) in the canonical chain, current best finalized is #{:?}", + hash, + number, + status.finalized_number, + ); + + return Ok(()); + } + finalize_block( &*self.inner, &self.authority_set, @@ -375,7 +371,7 @@ pub(crate) fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>( hash: Block::Hash, number: NumberFor<Block>, justification_or_commit: JustificationOrCommit<Block>, -) -> Result<(), ExitOrError<Block::Hash, NumberFor<Block>>> where +) -> Result<(), CommandOrError<Block::Hash, NumberFor<Block>>> where B: Backend<Block, Blake2Hasher>, E: CallExecutor<Block, Blake2Hasher> + Send + Sync, RA: Send + Sync, @@ -385,72 +381,43 @@ pub(crate) fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>( // FIXME #1483: clone only when changed let old_authority_set = authority_set.clone(); - // needed in case there is an authority set change, used for reverting in - // case of error - let mut old_last_completed = None; + // holds the old consensus changes in case it is changed below, needed for + // reverting in case of failure + let mut old_consensus_changes = None; let mut consensus_changes = consensus_changes.lock(); - let status = authority_set.apply_changes( - hash, - number, - &is_descendent_of(client, None), - ).map_err(|e| Error::Safety(e.to_string()))?; - - if status.changed { - // write new authority set state to disk. - let encoded_set = authority_set.encode(); - - let write_result = if let Some((ref canon_hash, ref canon_number)) = status.new_set_block { - // we also overwrite the "last completed round" entry with a blank slate - // because from the perspective of the finality gadget, the chain has - // reset. - let round_state = RoundState::genesis((*canon_hash, *canon_number)); - let last_completed: LastCompleted<_, _> = (0, round_state); - let encoded = last_completed.encode(); - - old_last_completed = Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)?; - - Backend::insert_aux( - &**client.backend(), - &[ - (AUTHORITY_SET_KEY, &encoded_set[..]), - (LAST_COMPLETED_KEY, &encoded[..]), - ], - &[] - ) - } else { - Backend::insert_aux(&**client.backend(), &[(AUTHORITY_SET_KEY, &encoded_set[..])], &[]) - }; + let canon_at_height = |canon_number| { + // "true" because the block is finalized + canonical_at_height(client, (hash, number), true, canon_number) + }; - if let Err(e) = write_result { - warn!(target: "finality", "Failed to write updated authority set to disk. Bailing."); - warn!(target: "finality", "Node is in a potentially inconsistent state."); + let update_res: Result<_, Error> = client.lock_import_and_run(|import_op| { + let status = authority_set.apply_standard_changes( + hash, + number, + &is_descendent_of(client, None), + ).map_err(|e| Error::Safety(e.to_string()))?; - return Err(e.into()); - } - } + // check if this is this is the first finalization of some consensus changes + let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes + .finalize((number, hash), &canon_at_height)?; - // check if this is this is the first finalization of some consensus changes - let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes - .finalize((number, hash), |at_height| canonical_at_height(client, (hash, number), at_height))?; + if alters_consensus_changes { + old_consensus_changes = Some(consensus_changes.clone()); - // holds the old consensus changes in case it is changed below, needed for - // reverting in case of failure - let mut old_consensus_changes = None; - if alters_consensus_changes { - old_consensus_changes = Some(consensus_changes.clone()); + let write_result = crate::aux_schema::update_consensus_changes( + &*consensus_changes, + |insert| client.apply_aux(import_op, insert, &[]), + ); - let encoded = consensus_changes.encode(); - let write_result = Backend::insert_aux(&**client.backend(), &[(CONSENSUS_CHANGES_KEY, &encoded[..])], &[]); - if let Err(e) = write_result { - warn!(target: "finality", "Failed to write updated consensus changes to disk. Bailing."); - warn!(target: "finality", "Node is in a potentially inconsistent state."); + if let Err(e) = write_result { + warn!(target: "finality", "Failed to write updated consensus changes to disk. Bailing."); + warn!(target: "finality", "Node is in a potentially inconsistent state."); - return Err(e.into()); + return Err(e.into()); + } } - } - let aux = |authority_set: &AuthoritySet<Block::Hash, NumberFor<Block>>| { // NOTE: this code assumes that honest voters will never vote past a // transition block, thus we don't have to worry about the case where // we have a transition with `effective_block = N`, but we finalize @@ -496,12 +463,12 @@ pub(crate) fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>( // ideally some handle to a synchronization oracle would be used // to avoid unconditionally notifying. - client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| { + client.apply_finality(import_op, BlockId::Hash(hash), justification, true).map_err(|e| { warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e); e })?; - if let Some((canon_hash, canon_number)) = status.new_set_block { + let new_authorities = if let Some((canon_hash, canon_number)) = status.new_set_block { // the authority set has changed. let (new_id, set_ref) = authority_set.current(); @@ -511,54 +478,46 @@ pub(crate) fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>( info!("Applying GRANDPA set change to new set {:?}", set_ref); } - Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { + Some(NewAuthoritySet { canon_hash, canon_number, set_id: new_id, authorities: set_ref.to_vec(), - })) + }) } else { - Ok(()) - } - }; - - match aux(&authority_set) { - Err(ExitOrError::Error(err)) => { - debug!(target: "afg", "Reverting authority set and/or consensus changes after block finalization error: {:?}", err); - - let mut revert_aux = Vec::new(); - - if status.changed { - revert_aux.push((AUTHORITY_SET_KEY, old_authority_set.encode())); - if let Some(old_last_completed) = old_last_completed { - revert_aux.push((LAST_COMPLETED_KEY, old_last_completed)); - } - - *authority_set = old_authority_set.clone(); - } - - if let Some(old_consensus_changes) = old_consensus_changes { - revert_aux.push((CONSENSUS_CHANGES_KEY, old_consensus_changes.encode())); - - *consensus_changes = old_consensus_changes; - } + None + }; - let write_result = Backend::insert_aux( - &**client.backend(), - revert_aux.iter().map(|(k, v)| (*k, &**v)).collect::<Vec<_>>().iter(), - &[], + if status.changed { + let write_result = crate::aux_schema::update_authority_set( + &authority_set, + new_authorities.as_ref(), + |insert| client.apply_aux(import_op, insert, &[]), ); if let Err(e) = write_result { - warn!(target: "finality", "Failed to revert consensus changes to disk. Bailing."); + warn!(target: "finality", "Failed to write updated authority set to disk. Bailing."); warn!(target: "finality", "Node is in a potentially inconsistent state."); return Err(e.into()); } + } + + Ok(new_authorities.map(VoterCommand::ChangeAuthorities)) + }); + + match update_res { + Ok(Some(command)) => Err(CommandOrError::VoterCommand(command)), + Ok(None) => Ok(()), + Err(e) => { + *authority_set = old_authority_set; + + if let Some(old_consensus_changes) = old_consensus_changes { + *consensus_changes = old_consensus_changes; + } - Err(ExitOrError::Error(err)) - }, - res => res, + Err(CommandOrError::Error(e)) + } } } @@ -567,27 +526,39 @@ pub(crate) fn finalize_block<B, Block: BlockT<Hash=H256>, E, RA>( pub(crate) fn canonical_at_height<B, E, Block: BlockT<Hash=H256>, RA>( client: &Client<B, E, Block, RA>, base: (Block::Hash, NumberFor<Block>), + base_is_canonical: bool, height: NumberFor<Block>, ) -> Result<Option<Block::Hash>, ClientError> where B: Backend<Block, Blake2Hasher>, E: CallExecutor<Block, Blake2Hasher> + Send + Sync, { - use runtime_primitives::traits::{One, Zero}; + use runtime_primitives::traits::{One, Zero, BlockNumberToHash}; if height > base.1 { return Ok(None); } if height == base.1 { - return Ok(Some(base.0)); + if base_is_canonical { + return Ok(Some(base.0)); + } else { + return Ok(client.block_number_to_hash(height)); + } + } else if base_is_canonical { + return Ok(client.block_number_to_hash(height)); } - let mut current = match client.header(&BlockId::Hash(base.0))? { + let one = NumberFor::<Block>::one(); + + // start by getting _canonical_ block with number at parent position and then iterating + // backwards by hash. + let mut current = match client.header(&BlockId::Number(base.1 - one))? { Some(header) => header, _ => return Ok(None), }; - let mut steps = base.1 - height; + // we've already checked that base > height above. + let mut steps = base.1 - height - one; while steps > NumberFor::<Block>::zero() { current = match client.header(&BlockId::Hash(*current.parent_hash()))? { @@ -595,7 +566,7 @@ pub(crate) fn canonical_at_height<B, E, Block: BlockT<Hash=H256>, RA>( _ => return Ok(None), }; - steps -= NumberFor::<Block>::one(); + steps -= one; } Ok(Some(current.hash())) diff --git a/substrate/core/finality-grandpa/src/import.rs b/substrate/core/finality-grandpa/src/import.rs index 11306f72195c5d614cdb24d8af0e2945efcc1e6a..3f4dbb3650a86eb027d472cfbe88ee851c3663d1 100644 --- a/substrate/core/finality-grandpa/src/import.rs +++ b/substrate/core/finality-grandpa/src/import.rs @@ -19,9 +19,12 @@ use std::sync::Arc; use log::{debug, trace, info}; use parity_codec::Encode; use futures::sync::mpsc; +use parking_lot::RwLockWriteGuard; use client::{blockchain, CallExecutor, Client}; +use client::blockchain::HeaderBackend; use client::backend::Backend; +use client::runtime_api::ApiExt; use consensus_common::{ BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult, JustificationImport, @@ -35,10 +38,10 @@ use runtime_primitives::traits::{ }; use substrate_primitives::{H256, Ed25519AuthorityId, Blake2Hasher}; -use crate::{AUTHORITY_SET_KEY, Error}; -use crate::authorities::SharedAuthoritySet; +use crate::{Error, CommandOrError, NewAuthoritySet, VoterCommand}; +use crate::authorities::{AuthoritySet, SharedAuthoritySet, DelayKind, PendingChange}; use crate::consensus_changes::SharedConsensusChanges; -use crate::environment::{finalize_block, is_descendent_of, ExitOrError, NewAuthoritySet}; +use crate::environment::{finalize_block, is_descendent_of}; use crate::justification::GrandpaJustification; /// A block-import handler for GRANDPA. @@ -53,7 +56,7 @@ use crate::justification::GrandpaJustification; pub struct GrandpaBlockImport<B, E, Block: BlockT<Hash=H256>, RA, PRA> { inner: Arc<Client<B, E, Block, RA>>, authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>, - authority_set_change: mpsc::UnboundedSender<NewAuthoritySet<Block::Hash, NumberFor<Block>>>, + send_voter_commands: mpsc::UnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>, consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>, api: Arc<PRA>, } @@ -80,7 +83,8 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> JustificationImport<Block> // request justifications for all pending changes for which change blocks have already been imported let authorities = self.authority_set.inner().read(); for pending_change in authorities.pending_changes() { - if pending_change.effective_number() > chain_info.finalized_number && + if pending_change.delay_kind == DelayKind::Finalized && + pending_change.effective_number() > chain_info.finalized_number && pending_change.effective_number() <= chain_info.best_number { let effective_block_hash = self.inner.best_containing( @@ -109,6 +113,266 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> JustificationImport<Block> } } +enum AppliedChanges<H, N> { + Standard, + Forced(NewAuthoritySet<H, N>), + None, +} + +impl<H, N> AppliedChanges<H, N> { + fn needs_justification(&self) -> bool { + match *self { + AppliedChanges::Standard => true, + AppliedChanges::Forced(_) | AppliedChanges::None => false, + } + } +} + +struct PendingSetChanges<'a, Block: 'a + BlockT> { + just_in_case: Option<( + AuthoritySet<Block::Hash, NumberFor<Block>>, + RwLockWriteGuard<'a, AuthoritySet<Block::Hash, NumberFor<Block>>>, + )>, + applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>, + do_pause: bool, +} + +impl<'a, Block: 'a + BlockT> PendingSetChanges<'a, Block> { + // revert the pending set change explicitly. + fn revert(self) { } + + fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) { + self.just_in_case = None; + let applied_changes = ::std::mem::replace(&mut self.applied_changes, AppliedChanges::None); + (applied_changes, self.do_pause) + } +} + +impl<'a, Block: 'a + BlockT> Drop for PendingSetChanges<'a, Block> { + fn drop(&mut self) { + if let Some((old_set, mut authorities)) = self.just_in_case.take() { + *authorities = old_set; + } + } +} + +impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> GrandpaBlockImport<B, E, Block, RA, PRA> where + NumberFor<Block>: grandpa::BlockNumberOps, + B: Backend<Block, Blake2Hasher> + 'static, + E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync, + DigestFor<Block>: Encode, + DigestItemFor<Block>: DigestItem<AuthorityId=Ed25519AuthorityId>, + RA: Send + Sync, + PRA: ProvideRuntimeApi, + PRA::Api: GrandpaApi<Block>, +{ + // check for a new authority set change. + fn check_new_change(&self, header: &Block::Header, hash: Block::Hash) + -> Result<Option<PendingChange<Block::Hash, NumberFor<Block>>>, ConsensusError> + { + let at = BlockId::hash(*header.parent_hash()); + let digest = header.digest(); + + let api = self.api.runtime_api(); + + // check for forced change. + { + let maybe_change = api.grandpa_forced_change( + &at, + digest, + ); + + match maybe_change { + Err(e) => match api.has_api_with::<GrandpaApi<Block>, _>(&at, |v| v >= 2) { + Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(true) => { + // API version is high enough to support forced changes + // but got error, so it is legitimate. + return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()) + }, + Ok(false) => { + // API version isn't high enough to support forced changes + }, + }, + Ok(None) => {}, + Ok(Some((median_last_finalized, change))) => return Ok(Some(PendingChange { + next_authorities: change.next_authorities, + delay: change.delay, + canon_height: *header.number(), + canon_hash: hash, + delay_kind: DelayKind::Best { median_last_finalized }, + })), + } + } + + // check normal scheduled change. + { + let maybe_change = api.grandpa_pending_change( + &at, + digest, + ); + + match maybe_change { + Err(e) => Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), + Ok(Some(change)) => Ok(Some(PendingChange { + next_authorities: change.next_authorities, + delay: change.delay, + canon_height: *header.number(), + canon_hash: hash, + delay_kind: DelayKind::Finalized, + })), + Ok(None) => Ok(None), + } + } + } + + fn make_authorities_changes<'a>(&'a self, block: &mut ImportBlock<Block>, hash: Block::Hash) + -> Result<PendingSetChanges<'a, Block>, ConsensusError> + { + // when we update the authorities, we need to hold the lock + // until the block is written to prevent a race if we need to restore + // the old authority set on error or panic. + struct InnerGuard<'a, T: 'a> { + old: Option<T>, + guard: Option<RwLockWriteGuard<'a, T>>, + } + + impl<'a, T: 'a> InnerGuard<'a, T> { + fn as_mut(&mut self) -> &mut T { + &mut **self.guard.as_mut().expect("only taken on deconstruction; qed") + } + + fn set_old(&mut self, old: T) { + if self.old.is_none() { + // ignore "newer" old changes. + self.old = Some(old); + } + } + + fn consume(mut self) -> Option<(T, RwLockWriteGuard<'a, T>)> { + if let Some(old) = self.old.take() { + Some((old, self.guard.take().expect("only taken on deconstruction; qed"))) + } else { + None + } + } + } + + impl<'a, T: 'a> Drop for InnerGuard<'a, T> { + fn drop(&mut self) { + if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) { + *guard = old; + } + } + } + + let number = block.header.number().clone(); + let maybe_change = self.check_new_change( + &block.header, + hash, + )?; + + // returns a function for checking whether a block is a descendent of another + // consistent with querying client directly after importing the block. + let parent_hash = *block.header.parent_hash(); + let is_descendent_of = is_descendent_of(&self.inner, Some((&hash, &parent_hash))); + + let mut guard = InnerGuard { + guard: Some(self.authority_set.inner().write()), + old: None, + }; + + // whether to pause the old authority set -- happens after import + // of a forced change block. + let mut do_pause = false; + + // add any pending changes. + if let Some(change) = maybe_change { + let old = guard.as_mut().clone(); + guard.set_old(old); + + if let DelayKind::Best { .. } = change.delay_kind { + do_pause = true; + } + + guard.as_mut().add_pending_change( + change, + &is_descendent_of, + ).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?; + } + + let applied_changes = { + let forced_change_set = guard.as_mut().apply_forced_changes(hash, number, &is_descendent_of) + .map_err(|e| ConsensusErrorKind::ClientImport(e.to_string())) + .map_err(ConsensusError::from)?; + + if let Some((median_last_finalized_number, new_set)) = forced_change_set { + let new_authorities = { + let (set_id, new_authorities) = new_set.current(); + + // we will use the median last finalized number as a hint + // for the canon block the new authority set should start + // with. we use the minimum between the median and the local + // best finalized block. + let best_finalized_number = self.inner.backend().blockchain().info() + .map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()))? + .finalized_number; + + let canon_number = best_finalized_number.min(median_last_finalized_number); + + let canon_hash = + self.inner.backend().blockchain().header(BlockId::Number(canon_number)) + .map_err(|e| ConsensusErrorKind::ClientImport(e.to_string()))? + .expect("the given block number is less or equal than the current best finalized number; \ + current best finalized number must exist in chain; qed.") + .hash(); + + NewAuthoritySet { + canon_number, + canon_hash, + set_id, + authorities: new_authorities.to_vec(), + } + }; + let old = ::std::mem::replace(guard.as_mut(), new_set); + guard.set_old(old); + + AppliedChanges::Forced(new_authorities) + } else { + let did_standard = guard.as_mut().enacts_standard_change(hash, number, &is_descendent_of) + .map_err(|e| ConsensusErrorKind::ClientImport(e.to_string())) + .map_err(ConsensusError::from)?; + + if did_standard { + AppliedChanges::Standard + } else { + AppliedChanges::None + } + } + }; + + // consume the guard safely and write necessary changes. + let just_in_case = guard.consume(); + if let Some((_, ref authorities)) = just_in_case { + let authorities_change = match applied_changes { + AppliedChanges::Forced(ref new) => Some(new), + AppliedChanges::Standard => None, // the change isn't actually applied yet. + AppliedChanges::None => None, + }; + + crate::aux_schema::update_authority_set( + authorities, + authorities_change, + |insert| block.auxiliary.extend( + insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))) + ) + ); + } + + Ok(PendingSetChanges { just_in_case, applied_changes, do_pause }) + } +} + impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block> for GrandpaBlockImport<B, E, Block, RA, PRA> where NumberFor<Block>: grandpa::BlockNumberOps, @@ -125,12 +389,8 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block> fn import_block(&self, mut block: ImportBlock<Block>, new_authorities: Option<Vec<Ed25519AuthorityId>>) -> Result<ImportResult, Self::Error> { - use crate::authorities::PendingChange; - use client::blockchain::HeaderBackend; - let hash = block.post_header().hash(); - let parent_hash = *block.header.parent_hash(); - let number = *block.header.number(); + let number = block.header.number().clone(); // early exit if block already in chain, otherwise the check for // authority changes will error when trying to re-import a change block @@ -140,82 +400,68 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block> Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), } - let maybe_change = self.api.runtime_api().grandpa_pending_change( - &BlockId::hash(parent_hash), - &block.header.digest().clone(), - ); - - let maybe_change = match maybe_change { - Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), - Ok(maybe) => maybe, - }; - - // when we update the authorities, we need to hold the lock - // until the block is written to prevent a race if we need to restore - // the old authority set on error. - let is_descendent_of = is_descendent_of(&self.inner, Some((&hash, &parent_hash))); - let just_in_case = if let Some(change) = maybe_change { - let mut authorities = self.authority_set.inner().write(); - let old_set = authorities.clone(); - - authorities.add_pending_change( - PendingChange { - next_authorities: change.next_authorities, - finalization_depth: change.delay, - canon_height: number, - canon_hash: hash, - }, - &is_descendent_of, - ).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?; - - block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode()))); - Some((old_set, authorities)) - } else { - None - }; + let pending_changes = self.make_authorities_changes(&mut block, hash)?; // we don't want to finalize on `inner.import_block` let justification = block.justification.take(); let enacts_consensus_change = new_authorities.is_some(); let import_result = self.inner.import_block(block, new_authorities); - let import_result = { - // we scope this so that `just_in_case` is dropped eagerly and releases the authorities lock - let revert_authorities = || if let Some((old_set, mut authorities)) = just_in_case { - *authorities = old_set; - }; - + let mut imported_aux = { match import_result { - Ok(ImportResult::Queued) => ImportResult::Queued, + Ok(ImportResult::Imported(aux)) => aux, Ok(r) => { debug!(target: "afg", "Restoring old authority set after block import result: {:?}", r); - revert_authorities(); + pending_changes.revert(); return Ok(r); }, Err(e) => { debug!(target: "afg", "Restoring old authority set after block import error: {:?}", e); - revert_authorities(); + pending_changes.revert(); return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()); }, } }; - let enacts_change = self.authority_set.inner().read().enacts_change( - hash, - number, - &is_descendent_of, - ).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?; + let (applied_changes, do_pause) = pending_changes.defuse(); + + // Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message. + if do_pause { + let _ = self.send_voter_commands.unbounded_send( + VoterCommand::Pause(format!("Forced change scheduled after inactivity")) + ); + } - if !enacts_change && !enacts_consensus_change { - return Ok(import_result); + let needs_justification = applied_changes.needs_justification(); + if let AppliedChanges::Forced(new) = applied_changes { + // NOTE: when we do a force change we are "discrediting" the old set so we + // ignore any justifications from them. this block may contain a justification + // which should be checked and imported below against the new authority + // triggered by this forced change. the new grandpa voter will start at the + // last median finalized block (which is before the block that enacts the + // change), full nodes syncing the chain will not be able to successfully + // import justifications for those blocks since their local authority set view + // is still of the set before the forced change was enacted, still after #1867 + // they should import the block and discard the justification, and they will + // then request a justification from sync if it's necessary (which they should + // then be able to successfully validate). + let _ = self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new)); + + // we must clear all pending justifications requests, presumably they won't be + // finalized hence why this forced changes was triggered + imported_aux.clear_justification_requests = true; + } + + if !needs_justification && !enacts_consensus_change { + return Ok(ImportResult::Imported(imported_aux)); } match justification { Some(justification) => { - self.import_justification(hash, number, justification, enacts_change)?; + self.import_justification(hash, number, justification, needs_justification)?; }, None => { - if enacts_change { + if needs_justification { trace!( target: "finality", "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.", @@ -229,11 +475,11 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> BlockImport<Block> self.consensus_changes.lock().note_change((number, hash)); } - return Ok(ImportResult::NeedsJustification); + imported_aux.needs_justification = true; } } - Ok(import_result) + Ok(ImportResult::Imported(imported_aux)) } fn check_block( @@ -249,22 +495,22 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> GrandpaBlockImport<B, E, Block, RA pub(crate) fn new( inner: Arc<Client<B, E, Block, RA>>, authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>, - authority_set_change: mpsc::UnboundedSender<NewAuthoritySet<Block::Hash, NumberFor<Block>>>, + send_voter_commands: mpsc::UnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>, consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>, api: Arc<PRA>, ) -> GrandpaBlockImport<B, E, Block, RA, PRA> { GrandpaBlockImport { inner, authority_set, - authority_set_change, + send_voter_commands, consensus_changes, api, } } } -impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> - GrandpaBlockImport<B, E, Block, RA, PRA> where +impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> GrandpaBlockImport<B, E, Block, RA, PRA> + where NumberFor<Block>: grandpa::BlockNumberOps, B: Backend<Block, Blake2Hasher> + 'static, E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync, @@ -304,13 +550,15 @@ impl<B, E, Block: BlockT<Hash=H256>, RA, PRA> ); match result { - Err(ExitOrError::AuthoritiesChanged(new)) => { - info!(target: "finality", "Imported justification for block #{} that enacts authority set change, signalling voter.", number); - if let Err(e) = self.authority_set_change.unbounded_send(new) { + Err(CommandOrError::VoterCommand(command)) => { + info!(target: "finality", "Imported justification for block #{} that triggers \ + command {}, signalling voter.", number, command); + + if let Err(e) = self.send_voter_commands.unbounded_send(command) { return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()); } }, - Err(ExitOrError::Error(e)) => { + Err(CommandOrError::Error(e)) => { return Err(match e { Error::Grandpa(error) => ConsensusErrorKind::ClientImport(error.to_string()), Error::Network(error) => ConsensusErrorKind::ClientImport(error), diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 806fc6cfaafb2e375c3fe9ebce50ac6fb93dcf0a..a6b3df13fc565995fa86081b564f874d74327a8a 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -66,21 +66,27 @@ use runtime_primitives::traits::{ DigestItemFor, DigestItem, }; use fg_primitives::GrandpaApi; +use inherents::InherentDataProviders; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, Ed25519AuthorityId, Blake2Hasher}; use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN}; +use srml_finality_tracker; + use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet}; use network::Service as NetworkService; use network::consensus_gossip as network_gossip; + +use std::fmt; use std::sync::Arc; use std::time::Duration; pub use fg_primitives::ScheduledChange; mod authorities; +mod aux_schema; mod communication; mod consensus_changes; mod environment; @@ -94,9 +100,8 @@ mod service_integration; #[cfg(feature="service-integration")] pub use service_integration::{LinkHalfForService, BlockImportForService}; -use authorities::SharedAuthoritySet; -use consensus_changes::{ConsensusChanges, SharedConsensusChanges}; -use environment::{Environment, ExitOrError, NewAuthoritySet}; +use aux_schema::{PersistentData, VoterSetState}; +use environment::Environment; pub use finality_proof::{prove_finality, check_finality_proof}; use import::GrandpaBlockImport; use until_imported::UntilCommitBlocksImported; @@ -104,17 +109,9 @@ use until_imported::UntilCommitBlocksImported; #[cfg(test)] mod tests; -const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round"; -const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters"; -const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes"; - const GRANDPA_ENGINE_ID: network::ConsensusEngineId = [b'a', b'f', b'g', b'1']; - const MESSAGE_ROUND_TOLERANCE: u64 = 2; -/// round-number, round-state -type LastCompleted<H, N> = (u64, RoundState<H, N>); - /// A GRANDPA message for a substrate chain. pub type Message<Block> = grandpa::Message<<Block as BlockT>::Hash, NumberFor<Block>>; /// A signed message. @@ -557,13 +554,81 @@ impl<B, E, Block: BlockT<Hash=H256>, RA> BlockStatus<Block> for Arc<Client<B, E, } } -/// Half of a link between a block-import worker and a the background voter. -// This should remain non-clone. +/// A new authority set along with the canonical block it changed at. +#[derive(Debug)] +pub(crate) struct NewAuthoritySet<H, N> { + pub(crate) canon_number: N, + pub(crate) canon_hash: H, + pub(crate) set_id: u64, + pub(crate) authorities: Vec<(Ed25519AuthorityId, u64)>, +} + +/// Commands issued to the voter. +#[derive(Debug)] +pub(crate) enum VoterCommand<H, N> { + /// Pause the voter for given reason. + Pause(String), + /// New authorities. + ChangeAuthorities(NewAuthoritySet<H, N>) +} + +impl<H, N> fmt::Display for VoterCommand<H, N> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason), + VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"), + } + } +} + +/// Signals either an early exit of a voter or an error. +#[derive(Debug)] +pub(crate) enum CommandOrError<H, N> { + /// An error occurred. + Error(Error), + /// A command to the voter. + VoterCommand(VoterCommand<H, N>), +} + +impl<H, N> From<Error> for CommandOrError<H, N> { + fn from(e: Error) -> Self { + CommandOrError::Error(e) + } +} + +impl<H, N> From<ClientError> for CommandOrError<H, N> { + fn from(e: ClientError) -> Self { + CommandOrError::Error(Error::Client(e)) + } +} + +impl<H, N> From<grandpa::Error> for CommandOrError<H, N> { + fn from(e: grandpa::Error) -> Self { + CommandOrError::Error(Error::from(e)) + } +} + +impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> { + fn from(e: VoterCommand<H, N>) -> Self { + CommandOrError::VoterCommand(e) + } +} + +impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> { } + +impl<H, N> fmt::Display for CommandOrError<H, N> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + CommandOrError::Error(ref e) => write!(f, "{:?}", e), + CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd), + } + } +} + pub struct LinkHalf<B, E, Block: BlockT<Hash=H256>, RA> { client: Arc<Client<B, E, Block, RA>>, - authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>, - authority_set_change: mpsc::UnboundedReceiver<NewAuthoritySet<Block::Hash, NumberFor<Block>>>, - consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>, + persistent_data: PersistentData<Block::Hash, NumberFor<Block>>, + voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>, } /// Make block importer and link half necessary to tie the background voter @@ -577,60 +642,41 @@ pub fn block_import<B, E, Block: BlockT<Hash=H256>, RA, PRA>( E: CallExecutor<Block, Blake2Hasher> + 'static + Clone + Send + Sync, RA: Send + Sync, PRA: ProvideRuntimeApi, - PRA::Api: GrandpaApi<Block> + PRA::Api: GrandpaApi<Block>, { use runtime_primitives::traits::Zero; - let authority_set = match Backend::get_aux(&**client.backend(), AUTHORITY_SET_KEY)? { - None => { - info!(target: "afg", "Loading GRANDPA authorities \ - from genesis on what appears to be first startup."); - - // no authority set on disk: fetch authorities from genesis state. - // if genesis state is not available, we may be a light client, but these - // are unsupported for following GRANDPA directly. + + let chain_info = client.info()?; + let genesis_hash = chain_info.chain.genesis_hash; + + let persistent_data = aux_schema::load_persistent( + &**client.backend(), + genesis_hash, + <NumberFor<Block>>::zero(), + || { let genesis_authorities = api.runtime_api() .grandpa_authorities(&BlockId::number(Zero::zero()))?; telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities"; "authorities_len" => ?genesis_authorities.len() ); - - let authority_set = SharedAuthoritySet::genesis(genesis_authorities); - let encoded = authority_set.inner().read().encode(); - Backend::insert_aux(&**client.backend(), &[(AUTHORITY_SET_KEY, &encoded[..])], &[])?; - - authority_set + Ok(genesis_authorities) } - Some(raw) => crate::authorities::AuthoritySet::decode(&mut &raw[..]) - .ok_or_else(|| ::client::error::ErrorKind::Backend( - format!("GRANDPA authority set kept in invalid format") - ))? - .into(), - }; - - let consensus_changes = Backend::get_aux(&**client.backend(), CONSENSUS_CHANGES_KEY)?; - let consensus_changes = Arc::new(parking_lot::Mutex::new(match consensus_changes { - Some(raw) => ConsensusChanges::decode(&mut &raw[..]) - .ok_or_else(|| ::client::error::ErrorKind::Backend( - format!("GRANDPA consensus changes kept in invalid format") - ))?, - None => ConsensusChanges::empty(), - })); - - let (authority_set_change_tx, authority_set_change_rx) = mpsc::unbounded(); + )?; + + let (voter_commands_tx, voter_commands_rx) = mpsc::unbounded(); Ok(( GrandpaBlockImport::new( client.clone(), - authority_set.clone(), - authority_set_change_tx, - consensus_changes.clone(), + persistent_data.authority_set.clone(), + voter_commands_tx, + persistent_data.consensus_changes.clone(), api, ), LinkHalf { client, - authority_set, - authority_set_change: authority_set_change_rx, - consensus_changes, + persistent_data, + voter_commands_rx, }, )) } @@ -644,11 +690,11 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>( ) -> ( impl Stream< Item = (u64, ::grandpa::CompactCommit<H256, NumberFor<Block>, ed25519::Signature, Ed25519AuthorityId>), - Error = ExitOrError<H256, NumberFor<Block>>, + Error = CommandOrError<H256, NumberFor<Block>>, >, impl Sink< SinkItem = (u64, ::grandpa::Commit<H256, NumberFor<Block>, ed25519::Signature, Ed25519AuthorityId>), - SinkError = ExitOrError<H256, NumberFor<Block>>, + SinkError = CommandOrError<H256, NumberFor<Block>>, >, ) where B: Backend<Block, Blake2Hasher>, @@ -687,12 +733,37 @@ fn committer_communication<Block: BlockT<Hash=H256>, B, E, N, RA>( (commit_in, commit_out) } +/// Register the finality tracker inherent data provider (which is used by +/// GRANDPA), if not registered already. +fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT<Hash=H256>, RA>( + client: Arc<Client<B, E, Block, RA>>, + inherent_data_providers: &InherentDataProviders, +) -> Result<(), consensus_common::Error> where + B: Backend<Block, Blake2Hasher> + 'static, + E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static, + RA: Send + Sync + 'static, +{ + if !inherent_data_providers.has_provider(&srml_finality_tracker::INHERENT_IDENTIFIER) { + inherent_data_providers + .register_provider(srml_finality_tracker::InherentDataProvider::new(move || { + match client.backend().blockchain().info() { + Err(e) => Err(std::borrow::Cow::Owned(e.to_string())), + Ok(info) => Ok(info.finalized_number), + } + })) + .map_err(|err| consensus_common::ErrorKind::InherentData(err.into()).into()) + } else { + Ok(()) + } +} + /// Run a GRANDPA voter as a task. Provide configuration and a link to a /// block import worker that has already been instantiated with `block_import`. pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>( config: Config, link: LinkHalf<B, E, Block, RA>, network: N, + inherent_data_providers: InherentDataProviders, on_exit: impl Future<Item=(),Error=()> + Send + 'static, ) -> ::client::error::Result<impl Future<Item=(),Error=()> + Send + 'static> where Block::Hash: Ord, @@ -706,29 +777,18 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>( RA: Send + Sync + 'static, { use futures::future::{self, Loop as FutureLoop}; - use runtime_primitives::traits::Zero; let LinkHalf { client, - authority_set, - authority_set_change, - consensus_changes, + persistent_data, + voter_commands_rx, } = link; - - let chain_info = client.info()?; - let genesis_hash = chain_info.chain.genesis_hash; - // we shadow network with the wrapping/rebroadcasting network to avoid // accidental reuse. let (broadcast_worker, network) = communication::rebroadcasting_network(network); + let PersistentData { authority_set, set_state, consensus_changes } = persistent_data; - let (last_round_number, last_state) = match Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)? { - None => (0, RoundState::genesis((genesis_hash, <NumberFor<Block>>::zero()))), - Some(raw) => LastCompleted::decode(&mut &raw[..]) - .ok_or_else(|| ::client::error::ErrorKind::Backend( - format!("Last GRANDPA round state kept in invalid format") - ))? - }; + register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; let voters = authority_set.current_authorities(); @@ -740,95 +800,131 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA>( set_id: authority_set.set_id(), authority_set: authority_set.clone(), consensus_changes: consensus_changes.clone(), + last_completed: environment::LastCompletedRound::new(set_state.round()), }); - let initial_state = (initial_environment, last_round_number, last_state, authority_set_change.into_future()); + let initial_state = (initial_environment, set_state, voter_commands_rx.into_future()); let voter_work = future::loop_fn(initial_state, move |params| { - let (env, last_round_number, last_state, authority_set_change) = params; + let (env, set_state, voter_commands_rx) = params; debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter"; "name" => ?config.name(), "set_id" => ?env.set_id ); - let chain_info = match client.info() { - Ok(i) => i, - Err(e) => return future::Either::B(future::err(Error::Client(e))), - }; + let mut maybe_voter = match set_state.clone() { + VoterSetState::Live(last_round_number, last_round_state) => { + let chain_info = match client.info() { + Ok(i) => i, + Err(e) => return future::Either::B(future::err(Error::Client(e))), + }; - let last_finalized = ( - chain_info.chain.finalized_hash, - chain_info.chain.finalized_number, - ); + let last_finalized = ( + chain_info.chain.finalized_hash, + chain_info.chain.finalized_number, + ); - let committer_data = committer_communication( - config.local_key.clone(), - env.set_id, - &env.voters, - &client, - &network, - ); + let committer_data = committer_communication( + config.local_key.clone(), + env.set_id, + &env.voters, + &client, + &network, + ); - let voters = (*env.voters).clone(); + let voters = (*env.voters).clone(); + + Some(voter::Voter::new( + env.clone(), + voters, + committer_data, + last_round_number, + last_round_state, + last_finalized, + )) + } + VoterSetState::Paused(_, _) => None, + }; + + // needs to be combined with another future otherwise it can deadlock. + let poll_voter = future::poll_fn(move || match maybe_voter { + Some(ref mut voter) => voter.poll(), + None => Ok(Async::NotReady), + }); - let voter = voter::Voter::new( - env, - voters, - committer_data, - last_round_number, - last_state, - last_finalized, - ); let client = client.clone(); let config = config.clone(); let network = network.clone(); let authority_set = authority_set.clone(); let consensus_changes = consensus_changes.clone(); - let trigger_authority_set_change = |new: NewAuthoritySet<_, _>, authority_set_change| { - let env = Arc::new(Environment { - inner: client, - config, - voters: Arc::new(new.authorities.into_iter().collect()), - set_id: new.set_id, - network, - authority_set, - consensus_changes, - }); - - // start the new authority set using the block where the - // set changed (not where the signal happened!) as the base. - Ok(FutureLoop::Continue(( - env, - 0, // always start at round 0 when changing sets. - RoundState::genesis((new.canon_hash, new.canon_number)), - authority_set_change, - ))) + let handle_voter_command = move |command: VoterCommand<_, _>, voter_commands_rx| { + match command { + VoterCommand::ChangeAuthorities(new) => { + // start the new authority set using the block where the + // set changed (not where the signal happened!) as the base. + let genesis_state = RoundState::genesis((new.canon_hash, new.canon_number)); + let env = Arc::new(Environment { + inner: client, + config, + voters: Arc::new(new.authorities.into_iter().collect()), + set_id: new.set_id, + network, + authority_set, + consensus_changes, + last_completed: environment::LastCompletedRound::new( + (0, genesis_state.clone()) + ), + }); + + + let set_state = VoterSetState::Live( + 0, // always start at round 0 when changing sets. + genesis_state, + ); + + Ok(FutureLoop::Continue((env, set_state, voter_commands_rx))) + } + VoterCommand::Pause(reason) => { + info!(target: "afg", "Pausing old validator set: {}", reason); + + // not racing because old voter is shut down. + let (last_round_number, last_round_state) = env.last_completed.read(); + let set_state = VoterSetState::Paused( + last_round_number, + last_round_state, + ); + + aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; + + Ok(FutureLoop::Continue((env, set_state, voter_commands_rx))) + }, + } }; - future::Either::A(voter.select2(authority_set_change).then(move |res| match res { + future::Either::A(poll_voter.select2(voter_commands_rx).then(move |res| match res { Ok(future::Either::A(((), _))) => { // voters don't conclude naturally; this could reasonably be an error. Ok(FutureLoop::Break(())) }, Err(future::Either::B(_)) => { - // the `authority_set_change` stream should not fail. + // the `voter_commands_rx` stream should not fail. Ok(FutureLoop::Break(())) }, Ok(future::Either::B(((None, _), _))) => { - // the `authority_set_change` stream should never conclude since it's never closed. + // the `voter_commands_rx` stream should never conclude since it's never closed. Ok(FutureLoop::Break(())) }, - Err(future::Either::A((ExitOrError::Error(e), _))) => { + Err(future::Either::A((CommandOrError::Error(e), _))) => { // return inner voter error Err(e) } - Ok(future::Either::B(((Some(new), authority_set_change), _))) => { - // authority set change triggered externally through the channel - trigger_authority_set_change(new, authority_set_change.into_future()) + Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => { + // some command issued externally. + handle_voter_command(command, voter_commands_rx.into_future()) } - Err(future::Either::A((ExitOrError::AuthoritiesChanged(new), authority_set_change))) => { - // authority set change triggered internally by finalizing a change block - trigger_authority_set_change(new, authority_set_change) + Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => { + // some command issued internally. + handle_voter_command(command, voter_commands_rx) }, })) }); diff --git a/substrate/core/finality-grandpa/src/tests.rs b/substrate/core/finality-grandpa/src/tests.rs index 4821686f38bea64a0b4f967df89dcd381a7cb947..7fba9b654cbdd2c1569a78ce0164f3d9e2fb3000 100644 --- a/substrate/core/finality-grandpa/src/tests.rs +++ b/substrate/core/finality-grandpa/src/tests.rs @@ -29,8 +29,7 @@ use client::{ runtime_api::{Core, RuntimeVersion, ApiExt}, }; use test_client::{self, runtime::BlockNumber}; -use parity_codec::Decode; -use consensus_common::{BlockOrigin, ForkChoiceStrategy, ImportBlock, ImportResult}; +use consensus_common::{BlockOrigin, ForkChoiceStrategy, ImportedAux, ImportBlock, ImportResult}; use consensus_common::import_queue::{SharedBlockImport, SharedJustificationImport}; use std::collections::{HashMap, HashSet}; use std::result; @@ -40,6 +39,7 @@ use runtime_primitives::ExecutionContext; use substrate_primitives::NativeOrEncoded; use authorities::AuthoritySet; +use consensus_changes::ConsensusChanges; type PeerData = Mutex< @@ -240,6 +240,7 @@ impl Network<Block> for MessageRouting { struct TestApi { genesis_authorities: Vec<(Ed25519AuthorityId, u64)>, scheduled_changes: Arc<Mutex<HashMap<Hash, ScheduledChange<BlockNumber>>>>, + forced_changes: Arc<Mutex<HashMap<Hash, (BlockNumber, ScheduledChange<BlockNumber>)>>>, } impl TestApi { @@ -247,6 +248,7 @@ impl TestApi { TestApi { genesis_authorities, scheduled_changes: Arc::new(Mutex::new(HashMap::new())), + forced_changes: Arc::new(Mutex::new(HashMap::new())), } } } @@ -349,6 +351,24 @@ impl GrandpaApi<Block> for RuntimeApi { // extrinsics. Ok(self.inner.scheduled_changes.lock().get(&parent_hash).map(|c| c.clone())).map(NativeOrEncoded::Native) } + + fn grandpa_forced_change_runtime_api_impl( + &self, + at: &BlockId<Block>, + _: ExecutionContext, + _: Option<(&DigestFor<Block>)>, + _: Vec<u8>, + ) + -> Result<NativeOrEncoded<Option<(NumberFor<Block>, ScheduledChange<NumberFor<Block>>)>>> { + let parent_hash = match at { + &BlockId::Hash(at) => at, + _ => panic!("not requested by block hash!!"), + }; + + // we take only scheduled changes at given block number where there are no + // extrinsics. + Ok(self.inner.forced_changes.lock().get(&parent_hash).map(|c| c.clone())).map(NativeOrEncoded::Native) + } } const TEST_GOSSIP_DURATION: Duration = Duration::from_millis(500); @@ -361,7 +381,14 @@ fn make_ids(keys: &[Keyring]) -> Vec<(Ed25519AuthorityId, u64)> { .collect() } -fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Keyring]) -> u64 { +// run the voters to completion. provide a closure to be invoked after +// the voters are spawned but before blocking on them. +fn run_to_completion_with<F: FnOnce()>( + blocks: u64, + net: Arc<Mutex<GrandpaTestNet>>, + peers: &[Keyring], + before_waiting: F, +) -> u64 { use parking_lot::RwLock; let mut finality_notifications = Vec::new(); @@ -402,6 +429,7 @@ fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Keyr }, link, MessageRouting::new(net.clone(), peer_id), + InherentDataProviders::new(), futures::empty(), ).expect("all in order with client and network"); @@ -425,6 +453,8 @@ fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Keyr .map(|_| ()) .map_err(|_| ()); + (before_waiting)(); + runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); let highest_finalized = *highest_finalized.read(); @@ -432,8 +462,13 @@ fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Keyr highest_finalized } +fn run_to_completion(blocks: u64, net: Arc<Mutex<GrandpaTestNet>>, peers: &[Keyring]) -> u64 { + run_to_completion_with(blocks, net, peers, || {}) +} + #[test] fn finalize_3_voters_no_observers() { + let _ = env_logger::try_init(); let peers = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; let voters = make_ids(peers); @@ -495,6 +530,7 @@ fn finalize_3_voters_1_observer() { }, link, MessageRouting::new(net.clone(), peer_id), + InherentDataProviders::new(), futures::empty(), ).expect("all in order with client and network"); @@ -552,8 +588,9 @@ fn transition_3_voters_twice_1_observer() { assert_eq!(peer.client().info().unwrap().chain.best_number, 1, "Peer #{} failed to sync", i); - let set_raw = peer.client().backend().get_aux(crate::AUTHORITY_SET_KEY).unwrap().unwrap(); - let set = AuthoritySet::<Hash, BlockNumber>::decode(&mut &set_raw[..]).unwrap(); + let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities( + &**peer.client().backend() + ).unwrap(); assert_eq!(set.current(), (0, make_ids(peers_a).as_slice())); assert_eq!(set.pending_changes().count(), 0); @@ -638,8 +675,9 @@ fn transition_3_voters_twice_1_observer() { .take_while(|n| Ok(n.header.number() < &30)) .for_each(move |_| Ok(())) .map(move |()| { - let set_raw = client.backend().get_aux(crate::AUTHORITY_SET_KEY).unwrap().unwrap(); - let set = AuthoritySet::<Hash, BlockNumber>::decode(&mut &set_raw[..]).unwrap(); + let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities( + &**client.backend() + ).unwrap(); assert_eq!(set.current(), (2, make_ids(peers_c).as_slice())); assert_eq!(set.pending_changes().count(), 0); @@ -654,6 +692,7 @@ fn transition_3_voters_twice_1_observer() { }, link, MessageRouting::new(net.clone(), peer_id), + InherentDataProviders::new(), futures::empty(), ).expect("all in order with client and network"); @@ -784,7 +823,7 @@ fn sync_justifications_on_change_blocks() { #[test] fn finalizes_multiple_pending_changes_in_order() { - env_logger::init(); + let _ = env_logger::try_init(); let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; let peers_b = &[Keyring::Dave, Keyring::Eve, Keyring::Ferdie]; @@ -865,6 +904,60 @@ fn doesnt_vote_on_the_tip_of_the_chain() { assert_eq!(highest, 75); } +#[test] +fn force_change_to_new_set() { + // two of these guys are offline. + let genesis_authorities = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie, Keyring::One, Keyring::Two]; + let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let api = TestApi::new(make_ids(genesis_authorities)); + + let voters = make_ids(peers_a); + let normal_transitions = api.scheduled_changes.clone(); + let forced_transitions = api.forced_changes.clone(); + let net = GrandpaTestNet::new(api, 3); + let net = Arc::new(Mutex::new(net)); + + let runner_net = net.clone(); + let add_blocks = move || { + net.lock().peer(0).push_blocks(1, false); + + { + // add a forced transition at block 12. + let parent_hash = net.lock().peer(0).client().info().unwrap().chain.best_hash; + forced_transitions.lock().insert(parent_hash, (0, ScheduledChange { + next_authorities: voters.clone(), + delay: 10, + })); + + // add a normal transition too to ensure that forced changes take priority. + normal_transitions.lock().insert(parent_hash, ScheduledChange { + next_authorities: make_ids(genesis_authorities), + delay: 5, + }); + } + + net.lock().peer(0).push_blocks(25, false); + net.lock().sync(); + + for (i, peer) in net.lock().peers().iter().enumerate() { + assert_eq!(peer.client().info().unwrap().chain.best_number, 26, + "Peer #{} failed to sync", i); + + let set: AuthoritySet<Hash, BlockNumber> = crate::aux_schema::load_authorities( + &**peer.client().backend() + ).unwrap(); + + assert_eq!(set.current(), (1, voters.as_slice())); + assert_eq!(set.pending_changes().count(), 0); + } + }; + + // it will only finalize if the forced transition happens. + // we add_blocks after the voters are spawned because otherwise + // the link-halfs have the wrong AuthoritySet + run_to_completion_with(25, runner_net, peers_a, add_blocks); +} + #[test] fn allows_reimporting_change_blocks() { let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; @@ -899,7 +992,7 @@ fn allows_reimporting_change_blocks() { assert_eq!( block_import.import_block(block(), None).unwrap(), - ImportResult::NeedsJustification + ImportResult::Imported(ImportedAux { needs_justification: true, clear_justification_requests: false }), ); assert_eq!( diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 237e06751e3029be6165610ad7329cc85a59ab5a..7c6e973c6991926d44f8fc025ce325412f313feb 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -223,6 +223,8 @@ pub enum ProtocolMsg<B: BlockT, S: NetworkSpecialization<B>> { PropagateExtrinsics, /// Tell protocol that a block was imported (sent by the import-queue). BlockImportedSync(B::Hash, NumberFor<B>), + /// Tell protocol to clear all pending justification requests. + ClearJustificationRequests, /// Tell protocol to request justification for a block. RequestJustification(B::Hash, NumberFor<B>), /// Inform protocol whether a justification was successfully imported. @@ -394,6 +396,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } ProtocolMsg::AnnounceBlock(hash) => self.announce_block(hash), ProtocolMsg::BlockImportedSync(hash, number) => self.sync.block_imported(&hash, number), + ProtocolMsg::ClearJustificationRequests => self.sync.clear_justification_requests(), ProtocolMsg::RequestJustification(hash, number) => { let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 993af60129cb9f147499adb4c99948f26efee1fb..194ea462ab31142542a5df08872d097b900c8738 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -101,6 +101,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> { } } + fn clear_justification_requests(&self) { + let _ = self.protocol_sender.send(ProtocolMsg::ClearJustificationRequests); + } + fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) { let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 6ba6617d7af1cba1a14f97ab4e59ed7e13182797..abc215a597bb402028b0985c1f9af75f56a8ef7f 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -293,6 +293,14 @@ impl<B: BlockT> PendingJustifications<B> { Ok(()) } + + /// Clear all data. + fn clear(&mut self) { + self.justifications = ForkTree::new(); + self.pending_requests.clear(); + self.peer_requests.clear(); + self.previous_requests.clear(); + } } /// Relay chain sync strategy. @@ -691,6 +699,11 @@ impl<B: BlockT> ChainSync<B> { self.justifications.dispatch(&mut self.peers, protocol); } + /// Clears all pending justification requests. + pub fn clear_justification_requests(&mut self) { + self.justifications.clear(); + } + pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) { self.justifications.justification_import_result(hash, number, success); } diff --git a/substrate/core/network/src/test/block_import.rs b/substrate/core/network/src/test/block_import.rs index aca789ce2b8608c78d0a8136b937bf0f0d2c3ae6..c8999419aa936b73071f879f5f5ff3642a0a9104 100644 --- a/substrate/core/network/src/test/block_import.rs +++ b/substrate/core/network/src/test/block_import.rs @@ -48,7 +48,7 @@ fn import_single_good_block_works() { let (_, _hash, number, block) = prepare_good_block(); assert_eq!( import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), - Ok(BlockImportResult::ImportedUnknown(number)) + Ok(BlockImportResult::ImportedUnknown(number, Default::default())) ); } diff --git a/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm b/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm index f4a22553c152bebc5c38eeae6597a2c05a8c76f3..13b55df5d1434798d967fb35a1e13737b982c3fd 100644 Binary files a/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm and b/substrate/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm differ diff --git a/substrate/core/util/fork-tree/src/lib.rs b/substrate/core/util/fork-tree/src/lib.rs index caed51adf67b307ea02969515d72bf91131fc9ad..71c83f57ca48791dcf44770886e881daeadb0246 100644 --- a/substrate/core/util/fork-tree/src/lib.rs +++ b/substrate/core/util/fork-tree/src/lib.rs @@ -79,7 +79,7 @@ pub enum FinalizationResult<V> { /// in order. Each node is uniquely identified by its hash but can be ordered by /// its number. In order to build the tree an external function must be provided /// when interacting with the tree to establish a node's ancestry. -#[derive(Clone, Debug, Decode, Encode)] +#[derive(Clone, Debug, Decode, Encode, PartialEq)] pub struct ForkTree<H, N, V> { roots: Vec<Node<H, N, V>>, best_finalized_number: Option<N>, @@ -353,8 +353,7 @@ impl<H, N, V> ForkTree<H, N, V> where mod node_implementation { use super::*; - #[derive(Clone, Debug, Decode, Encode)] - #[cfg_attr(test, derive(PartialEq))] + #[derive(Clone, Debug, Decode, Encode, PartialEq)] pub struct Node<H, N, V> { pub hash: H, pub number: N, diff --git a/substrate/node/cli/src/service.rs b/substrate/node/cli/src/service.rs index ff241a4c98a33d4fc1ad676cf0b8dd5f3e6fb465..9b1139f838e1dd02ff419efd3d6c72f6f8846db9 100644 --- a/substrate/node/cli/src/service.rs +++ b/substrate/node/cli/src/service.rs @@ -112,6 +112,7 @@ construct_service_factory! { }, link_half, grandpa::NetworkBridge::new(service.network()), + service.config.custom.inherent_data_providers.clone(), service.on_exit(), )?); diff --git a/substrate/node/runtime/Cargo.toml b/substrate/node/runtime/Cargo.toml index ac220ab7a8cfe7258da9d87bfa3796649373bd0f..d009a544d594647837be2c30a6b53c787c1fc52e 100644 --- a/substrate/node/runtime/Cargo.toml +++ b/substrate/node/runtime/Cargo.toml @@ -21,6 +21,7 @@ contract = { package = "srml-contract", path = "../../srml/contract", default-fe council = { package = "srml-council", path = "../../srml/council", default-features = false } democracy = { package = "srml-democracy", path = "../../srml/democracy", default-features = false } executive = { package = "srml-executive", path = "../../srml/executive", default-features = false } +finality-tracker = { package = "srml-finality-tracker", path = "../../srml/finality-tracker", default-features = false } grandpa = { package = "srml-grandpa", path = "../../srml/grandpa", default-features = false } indices = { package = "srml-indices", path = "../../srml/indices", default-features = false } session = { package = "srml-session", path = "../../srml/session", default-features = false } diff --git a/substrate/node/runtime/src/lib.rs b/substrate/node/runtime/src/lib.rs index ecbdbf691e8f5b8bf16eb5a72f0346faf36adc77..d9089f6b0ad5b5cabd50caeb27ab7c47e3725f12 100644 --- a/substrate/node/runtime/src/lib.rs +++ b/substrate/node/runtime/src/lib.rs @@ -60,8 +60,8 @@ pub const VERSION: RuntimeVersion = RuntimeVersion { spec_name: create_runtime_str!("node"), impl_name: create_runtime_str!("substrate-node"), authoring_version: 10, - spec_version: 32, - impl_version: 32, + spec_version: 33, + impl_version: 33, apis: RUNTIME_API_VERSIONS, }; @@ -192,6 +192,10 @@ impl grandpa::Trait for Runtime { type Event = Event; } +impl finality_tracker::Trait for Runtime { + type OnFinalizationStalled = grandpa::SyncedAuthorities<Runtime>; +} + construct_runtime!( pub enum Runtime with Log(InternalLog: DigestItem<Hash, SessionKey>) where Block = Block, @@ -211,6 +215,7 @@ construct_runtime!( CouncilVoting: council_voting, CouncilMotions: council_motions::{Module, Call, Storage, Event<T>, Origin}, CouncilSeats: council_seats::{Config<T>}, + FinalityTracker: finality_tracker::{Module, Call, Inherent}, Grandpa: grandpa::{Module, Call, Storage, Config<T>, Log(), Event<T>}, Treasury: treasury, Contract: contract::{Module, Call, Storage, Config<T>, Event<T>}, @@ -295,7 +300,7 @@ impl_runtime_apis! { { for log in digest.logs.iter().filter_map(|l| match l { Log(InternalLog::grandpa(grandpa_signal)) => Some(grandpa_signal), - _=> None + _ => None }) { if let Some(change) = Grandpa::scrape_digest_change(log) { return Some(change); @@ -304,6 +309,20 @@ impl_runtime_apis! { None } + fn grandpa_forced_change(digest: &DigestFor<Block>) + -> Option<(NumberFor<Block>, ScheduledChange<NumberFor<Block>>)> + { + for log in digest.logs.iter().filter_map(|l| match l { + Log(InternalLog::grandpa(grandpa_signal)) => Some(grandpa_signal), + _ => None + }) { + if let Some(change) = Grandpa::scrape_digest_forced_change(log) { + return Some(change); + } + } + None + } + fn grandpa_authorities() -> Vec<(SessionKey, u64)> { Grandpa::grandpa_authorities() } diff --git a/substrate/node/runtime/wasm/Cargo.lock b/substrate/node/runtime/wasm/Cargo.lock index c4b1e60dc3705088b12c5c1b4569bd7771325f5a..5bb473efb18409c66a083a563df533be9fa110f0 100644 --- a/substrate/node/runtime/wasm/Cargo.lock +++ b/substrate/node/runtime/wasm/Cargo.lock @@ -660,6 +660,7 @@ dependencies = [ "srml-democracy 0.1.0", "srml-executive 0.1.0", "srml-fees 0.1.0", + "srml-finality-tracker 0.1.0", "srml-grandpa 0.1.0", "srml-indices 0.1.0", "srml-session 0.1.0", @@ -1424,6 +1425,23 @@ dependencies = [ "substrate-primitives 0.1.0", ] +[[package]] +name = "srml-finality-tracker" +version = "0.1.0" +dependencies = [ + "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec-derive 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", + "sr-std 0.1.0", + "srml-session 0.1.0", + "srml-support 0.1.0", + "srml-system 0.1.0", + "substrate-inherents 0.1.0", +] + [[package]] name = "srml-grandpa" version = "0.1.0" @@ -1433,6 +1451,7 @@ dependencies = [ "serde_derive 1.0.81 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", "sr-std 0.1.0", + "srml-finality-tracker 0.1.0", "srml-session 0.1.0", "srml-support 0.1.0", "srml-system 0.1.0", diff --git a/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm b/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm index b86fbdea66f2b33f8dd92eaf97bf4bed6d301ac8..7c6e43a51622bdd2c89c0dc1f1c19f0efae79cd4 100644 Binary files a/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm and b/substrate/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm differ diff --git a/substrate/srml/finality-tracker/Cargo.toml b/substrate/srml/finality-tracker/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..246d48d01c732e7fbf4360df72703df1dee325e2 --- /dev/null +++ b/substrate/srml/finality-tracker/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "srml-finality-tracker" +version = "0.1.0" +authors = ["Parity Technologies <admin@parity.io>"] +edition = "2018" + +[dependencies] +hex-literal = "0.1.0" +serde = { version = "1.0", default-features = false } +serde_derive = { version = "1.0", optional = true } +parity-codec = { version = "3.0", default-features = false } +parity-codec-derive = { version = "3.0", default-features = false } +substrate-inherents = { path = "../../core/inherents", default-features = false } +sr-std = { path = "../../core/sr-std", default-features = false } +sr-primitives = { path = "../../core/sr-primitives", default-features = false } +srml-support = { path = "../support", default-features = false } +srml-system = { path = "../system", default-features = false } +srml-session = { path = "../session", default-features = false } + +[dev-dependencies] +substrate-primitives = { path = "../../core/primitives", default-features = false } +sr-io = { path = "../../core/sr-io", default-features = false } +lazy_static = "1.0" +parking_lot = "0.7" + +[features] +default = ["std"] +std = [ + "serde/std", + "serde_derive", + "parity-codec/std", + "sr-std/std", + "srml-support/std", + "sr-primitives/std", + "srml-system/std", + "srml-session/std", + "substrate-inherents/std", +] diff --git a/substrate/srml/finality-tracker/src/lib.rs b/substrate/srml/finality-tracker/src/lib.rs new file mode 100644 index 0000000000000000000000000000000000000000..43e95912cfb2940e235fba2ee66f9bad1cf0f158 --- /dev/null +++ b/substrate/srml/finality-tracker/src/lib.rs @@ -0,0 +1,385 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. + +//! SRML module that tracks the last finalized block, as perceived by block authors. + +#![cfg_attr(not(feature = "std"), no_std)] + +#[macro_use] +extern crate srml_support; + +use substrate_inherents::{ + RuntimeString, InherentIdentifier, ProvideInherent, + InherentData, MakeFatalError, +}; +use srml_support::StorageValue; +use sr_primitives::traits::{As, One, Zero}; +use sr_std::{prelude::*, result, cmp, vec}; +use parity_codec::Decode; +use srml_system::{ensure_inherent, Trait as SystemTrait}; + +#[cfg(feature = "std")] +use parity_codec::Encode; + +const DEFAULT_WINDOW_SIZE: u64 = 101; +const DEFAULT_DELAY: u64 = 1000; + +/// The identifier for the `finalnum` inherent. +pub const INHERENT_IDENTIFIER: InherentIdentifier = *b"finalnum"; + +/// Auxiliary trait to extract finalized inherent data. +pub trait FinalizedInherentData<N: Decode> { + /// Get finalized inherent data. + fn finalized_number(&self) -> Result<N, RuntimeString>; +} + +impl<N: Decode> FinalizedInherentData<N> for InherentData { + fn finalized_number(&self) -> Result<N, RuntimeString> { + self.get_data(&INHERENT_IDENTIFIER) + .and_then(|r| r.ok_or_else(|| "Finalized number inherent data not found".into())) + } +} + +/// Provider for inherent data. +#[cfg(feature = "std")] +pub struct InherentDataProvider<F, N> { + inner: F, + _marker: std::marker::PhantomData<N>, +} + +#[cfg(feature = "std")] +impl<F, N> InherentDataProvider<F, N> { + pub fn new(final_oracle: F) -> Self { + InherentDataProvider { inner: final_oracle, _marker: Default::default() } + } +} + +#[cfg(feature = "std")] +impl<F, N: Encode> substrate_inherents::ProvideInherentData for InherentDataProvider<F, N> + where F: Fn() -> Result<N, RuntimeString> +{ + fn inherent_identifier(&self) -> &'static InherentIdentifier { + &INHERENT_IDENTIFIER + } + + fn provide_inherent_data(&self, inherent_data: &mut InherentData) -> Result<(), RuntimeString> { + (self.inner)() + .and_then(|n| inherent_data.put_data(INHERENT_IDENTIFIER, &n)) + } + + fn error_to_string(&self, _error: &[u8]) -> Option<String> { + Some(format!("no further information")) + } +} + + +pub trait Trait: SystemTrait { + /// Something which can be notified when the timestamp is set. Set this to `()` if not needed. + type OnFinalizationStalled: OnFinalizationStalled<Self::BlockNumber>; +} + +decl_storage! { + trait Store for Module<T: Trait> as Timestamp { + /// Recent hints. + RecentHints get(recent_hints) build(|_| vec![T::BlockNumber::zero()]): Vec<T::BlockNumber>; + /// Ordered recent hints. + OrderedHints get(ordered_hints) build(|_| vec![T::BlockNumber::zero()]): Vec<T::BlockNumber>; + /// The median. + Median get(median) build(|_| T::BlockNumber::zero()): T::BlockNumber; + /// The number of recent samples to keep from this chain. Default is n-100 + pub WindowSize get(window_size) config(window_size): T::BlockNumber = T::BlockNumber::sa(DEFAULT_WINDOW_SIZE); + /// The delay after which point things become suspicious. + pub ReportLatency get(report_latency) config(report_latency): T::BlockNumber = T::BlockNumber::sa(DEFAULT_DELAY); + + /// Final hint to apply in the block. `None` means "same as parent". + Update: Option<T::BlockNumber>; + + // when initialized through config this is set in the beginning. + Initialized get(initialized) build(|_| false): bool; + } +} + +decl_module! { + pub struct Module<T: Trait> for enum Call where origin: T::Origin { + /// Hint that the author of this block thinks the best finalized + /// block is the given number. + fn final_hint(origin, #[compact] hint: T::BlockNumber) { + ensure_inherent(origin)?; + assert!(!<Self as Store>::Update::exists(), "Final hint must be updated only once in the block"); + assert!( + srml_system::Module::<T>::block_number() >= hint, + "Finalized height above block number", + ); + <Self as Store>::Update::put(hint); + } + + fn on_finalise() { + Self::update_hint(<Self as Store>::Update::take()) + } + } +} + +impl<T: Trait> Module<T> { + fn update_hint(hint: Option<T::BlockNumber>) { + if !Self::initialized() { + <Self as Store>::RecentHints::put(vec![T::BlockNumber::zero()]); + <Self as Store>::OrderedHints::put(vec![T::BlockNumber::zero()]); + <Self as Store>::Median::put(T::BlockNumber::zero()); + + <Self as Store>::Initialized::put(true); + } + + let mut recent = Self::recent_hints(); + let mut ordered = Self::ordered_hints(); + let window_size = cmp::max(T::BlockNumber::one(), Self::window_size()); + + let hint = hint.unwrap_or_else(|| recent.last() + .expect("always at least one recent sample; qed").clone() + ); + + // prune off the front of the list -- typically 1 except for when + // the sample size has just been shrunk. + { + // take into account the item we haven't pushed yet. + let to_prune = (recent.len() + 1).saturating_sub(window_size.as_() as usize); + + for drained in recent.drain(..to_prune) { + let idx = ordered.binary_search(&drained) + .expect("recent and ordered contain the same items; qed"); + + ordered.remove(idx); + } + } + + // find the position in the ordered list where the new item goes. + let ordered_idx = ordered.binary_search(&hint) + .unwrap_or_else(|idx| idx); + + ordered.insert(ordered_idx, hint); + recent.push(hint); + + let two = T::BlockNumber::one() + T::BlockNumber::one(); + + let median = { + let len = ordered.len(); + assert!(len > 0, "pruning dictated by window_size which is always saturated at 1; qed"); + + if len % 2 == 0 { + let a = ordered[len / 2]; + let b = ordered[(len / 2) - 1]; + + // compute average. + (a + b) / two + } else { + ordered[len / 2] + } + }; + + let our_window_size = recent.len(); + + <Self as Store>::RecentHints::put(recent); + <Self as Store>::OrderedHints::put(ordered); + <Self as Store>::Median::put(median); + + if T::BlockNumber::sa(our_window_size as u64) == window_size { + let now = srml_system::Module::<T>::block_number(); + let latency = Self::report_latency(); + + // the delay is the latency plus half the window size. + let delay = latency + (window_size / two); + // median may be at most n - delay + if median + delay <= now { + T::OnFinalizationStalled::on_stalled(window_size - T::BlockNumber::one()); + } + } + } +} + +/// Called when finalization stalled at a given number. +pub trait OnFinalizationStalled<N> { + /// The parameter here is how many more blocks to wait before applying + /// changes triggered by finality stalling. + fn on_stalled(further_wait: N); +} + +macro_rules! impl_on_stalled { + () => ( + impl<N> OnFinalizationStalled<N> for () { + fn on_stalled(_: N) {} + } + ); + + ( $($t:ident)* ) => { + impl<NUM: Clone, $($t: OnFinalizationStalled<NUM>),*> OnFinalizationStalled<NUM> for ($($t,)*) { + fn on_stalled(further_wait: NUM) { + $($t::on_stalled(further_wait.clone());)* + } + } + } +} + +for_each_tuple!(impl_on_stalled); + +impl<T: Trait> ProvideInherent for Module<T> { + type Call = Call<T>; + type Error = MakeFatalError<()>; + const INHERENT_IDENTIFIER: InherentIdentifier = INHERENT_IDENTIFIER; + + fn create_inherent(data: &InherentData) -> Option<Self::Call> { + let final_num = + data.finalized_number().expect("Gets and decodes final number inherent data"); + + // make hint only when not same as last to avoid bloat. + Self::recent_hints().last().and_then(|last| if last == &final_num { + None + } else { + Some(Call::final_hint(final_num)) + }) + } + + fn check_inherent(_call: &Self::Call, _data: &InherentData) -> result::Result<(), Self::Error> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use sr_io::{with_externalities, TestExternalities}; + use substrate_primitives::H256; + use sr_primitives::BuildStorage; + use sr_primitives::traits::{BlakeTwo256, IdentityLookup, OnFinalise, Header as HeaderT}; + use sr_primitives::testing::{Digest, DigestItem, Header}; + use srml_support::impl_outer_origin; + use srml_system as system; + use lazy_static::lazy_static; + use parking_lot::Mutex; + + #[derive(Clone, PartialEq, Debug)] + pub struct StallEvent { + at: u64, + further_wait: u64, + } + + macro_rules! make_test_context { + () => { + #[derive(Clone, Eq, PartialEq)] + pub struct Test; + + impl_outer_origin! { + pub enum Origin for Test {} + } + + impl system::Trait for Test { + type Origin = Origin; + type Index = u64; + type BlockNumber = u64; + type Hash = H256; + type Hashing = BlakeTwo256; + type Digest = Digest; + type AccountId = u64; + type Lookup = IdentityLookup<u64>; + type Header = Header; + type Event = (); + type Log = DigestItem; + } + + type System = system::Module<Test>; + + lazy_static! { + static ref NOTIFICATIONS: Mutex<Vec<StallEvent>> = Mutex::new(Vec::new()); + } + + pub struct StallTracker; + impl OnFinalizationStalled<u64> for StallTracker { + fn on_stalled(further_wait: u64) { + let now = System::block_number(); + NOTIFICATIONS.lock().push(StallEvent { at: now, further_wait }); + } + } + + impl Trait for Test { + type OnFinalizationStalled = StallTracker; + } + + type FinalityTracker = Module<Test>; + } + } + + #[test] + fn median_works() { + make_test_context!(); + let t = system::GenesisConfig::<Test>::default().build_storage().unwrap().0; + + with_externalities(&mut TestExternalities::new(t), || { + FinalityTracker::update_hint(Some(500)); + assert_eq!(FinalityTracker::median(), 250); + assert!(NOTIFICATIONS.lock().is_empty()); + }); + } + + #[test] + fn notifies_when_stalled() { + make_test_context!(); + let mut t = system::GenesisConfig::<Test>::default().build_storage().unwrap().0; + t.extend(GenesisConfig::<Test> { + window_size: 11, + report_latency: 100 + }.build_storage().unwrap().0); + + with_externalities(&mut TestExternalities::new(t), || { + let mut parent_hash = System::parent_hash(); + for i in 2..106 { + System::initialise(&i, &parent_hash, &Default::default()); + FinalityTracker::on_finalise(i); + let hdr = System::finalise(); + parent_hash = hdr.hash(); + } + + assert_eq!( + NOTIFICATIONS.lock().to_vec(), + vec![StallEvent { at: 105, further_wait: 10 }] + ) + }); + } + + #[test] + fn recent_notifications_prevent_stalling() { + make_test_context!(); + let mut t = system::GenesisConfig::<Test>::default().build_storage().unwrap().0; + t.extend(GenesisConfig::<Test> { + window_size: 11, + report_latency: 100 + }.build_storage().unwrap().0); + + with_externalities(&mut TestExternalities::new(t), || { + let mut parent_hash = System::parent_hash(); + for i in 2..106 { + System::initialise(&i, &parent_hash, &Default::default()); + assert_ok!(FinalityTracker::dispatch( + Call::final_hint(i-1), + Origin::INHERENT, + )); + FinalityTracker::on_finalise(i); + let hdr = System::finalise(); + parent_hash = hdr.hash(); + } + + assert!(NOTIFICATIONS.lock().is_empty()); + }); + } +} diff --git a/substrate/srml/grandpa/Cargo.toml b/substrate/srml/grandpa/Cargo.toml index c9faeadb5babe973e1458d6987859f9644a43a56..e7a1b701052d333c48813d4b1cdf2343dbc170b1 100644 --- a/substrate/srml/grandpa/Cargo.toml +++ b/substrate/srml/grandpa/Cargo.toml @@ -16,6 +16,7 @@ primitives = { package = "sr-primitives", path = "../../core/sr-primitives", def srml-support = { path = "../support", default-features = false } system = { package = "srml-system", path = "../system", default-features = false } session = { package = "srml-session", path = "../session", default-features = false } +finality-tracker = { package = "srml-finality-tracker", path = "../finality-tracker", default-features = false } [dev-dependencies] runtime_io = { package = "sr-io", path = "../../core/sr-io" } @@ -33,4 +34,5 @@ std = [ "primitives/std", "system/std", "session/std", + "finality-tracker/std", ] diff --git a/substrate/srml/grandpa/src/lib.rs b/substrate/srml/grandpa/src/lib.rs index 16e73540d6aa4a106a61842e4d496857d60547fc..f06c84e23507e942bad6a02d59f75ffb8a453796 100644 --- a/substrate/srml/grandpa/src/lib.rs +++ b/substrate/srml/grandpa/src/lib.rs @@ -64,6 +64,8 @@ pub type Log<T> = RawLog< pub trait GrandpaChangeSignal<N> { /// Try to cast the log entry as a contained signal. fn as_signal(&self) -> Option<ScheduledChange<N>>; + /// Try to cast the log entry as a contained forced signal. + fn as_forced_signal(&self) -> Option<(N, ScheduledChange<N>)>; } /// A logs in this module. @@ -71,15 +73,28 @@ pub trait GrandpaChangeSignal<N> { #[derive(Encode, Decode, PartialEq, Eq, Clone)] pub enum RawLog<N, SessionKey> { /// Authorities set change has been signalled. Contains the new set of authorities - /// and the delay in blocks before applying. + /// and the delay in blocks _to finalize_ before applying. AuthoritiesChangeSignal(N, Vec<(SessionKey, u64)>), + /// A forced authorities set change. Contains in this order: the median last + /// finalized block when the change was signaled, the delay in blocks _to import_ + /// before applying and the new set of authorities. + ForcedAuthoritiesChangeSignal(N, N, Vec<(SessionKey, u64)>), } impl<N: Clone, SessionKey> RawLog<N, SessionKey> { /// Try to cast the log entry as a contained signal. pub fn as_signal(&self) -> Option<(N, &[(SessionKey, u64)])> { match *self { - RawLog::AuthoritiesChangeSignal(ref n, ref signal) => Some((n.clone(), signal)), + RawLog::AuthoritiesChangeSignal(ref delay, ref signal) => Some((delay.clone(), signal)), + RawLog::ForcedAuthoritiesChangeSignal(_, _, _) => None, + } + } + + /// Try to cast the log entry as a contained forced signal. + pub fn as_forced_signal(&self) -> Option<(N, N, &[(SessionKey, u64)])> { + match *self { + RawLog::ForcedAuthoritiesChangeSignal(ref median, ref delay, ref signal) => Some((median.clone(), delay.clone(), signal)), + RawLog::AuthoritiesChangeSignal(_, _) => None, } } } @@ -96,6 +111,16 @@ impl<N, SessionKey> GrandpaChangeSignal<N> for RawLog<N, SessionKey> .collect(), }) } + + fn as_forced_signal(&self) -> Option<(N, ScheduledChange<N>)> { + RawLog::as_forced_signal(self).map(|(median, delay, next_authorities)| (median, ScheduledChange { + delay, + next_authorities: next_authorities.iter() + .cloned() + .map(|(k, w)| (k.into(), w)) + .collect(), + })) + } } pub trait Trait: system::Trait { @@ -109,8 +134,21 @@ pub trait Trait: system::Trait { type Event: From<Event<Self>> + Into<<Self as system::Trait>::Event>; } -/// A stored pending change. +/// A stored pending change, old format. +// TODO: remove shim +// https://github.com/paritytech/substrate/issues/1614 #[derive(Encode, Decode)] +pub struct OldStoredPendingChange<N, SessionKey> { + /// The block number this was scheduled at. + pub scheduled_at: N, + /// The delay in blocks until it will be applied. + pub delay: N, + /// The next authority set. + pub next_authorities: Vec<(SessionKey, u64)>, +} + +/// A stored pending change. +#[derive(Encode)] pub struct StoredPendingChange<N, SessionKey> { /// The block number this was scheduled at. pub scheduled_at: N, @@ -118,6 +156,23 @@ pub struct StoredPendingChange<N, SessionKey> { pub delay: N, /// The next authority set. pub next_authorities: Vec<(SessionKey, u64)>, + /// If defined it means the change was forced and the given block number + /// indicates the median last finalized block when the change was signaled. + pub forced: Option<N>, +} + +impl<N: Decode, SessionKey: Decode> Decode for StoredPendingChange<N, SessionKey> { + fn decode<I: codec::Input>(value: &mut I) -> Option<Self> { + let old = OldStoredPendingChange::decode(value)?; + let forced = <Option<N>>::decode(value).unwrap_or(None); + + Some(StoredPendingChange { + scheduled_at: old.scheduled_at, + delay: old.delay, + next_authorities: old.next_authorities, + forced, + }) + } } /// GRANDPA events. @@ -132,6 +187,8 @@ decl_storage! { trait Store for Module<T: Trait> as GrandpaFinality { // Pending change: (signalled at, scheduled change). PendingChange get(pending_change): Option<StoredPendingChange<T::BlockNumber, T::SessionKey>>; + // next block number where we can force a change. + NextForced get(next_forced): Option<T::BlockNumber>; } add_extra_genesis { config(authorities): Vec<(T::SessionKey, u64)>; @@ -167,10 +224,18 @@ decl_module! { fn on_finalise(block_number: T::BlockNumber) { if let Some(pending_change) = <PendingChange<T>>::get() { if block_number == pending_change.scheduled_at { - Self::deposit_log(RawLog::AuthoritiesChangeSignal( - pending_change.delay, - pending_change.next_authorities.clone(), - )); + if let Some(median) = pending_change.forced { + Self::deposit_log(RawLog::ForcedAuthoritiesChangeSignal( + median, + pending_change.delay, + pending_change.next_authorities.clone(), + )); + } else { + Self::deposit_log(RawLog::AuthoritiesChangeSignal( + pending_change.delay, + pending_change.next_authorities.clone(), + )); + } } if block_number == pending_change.scheduled_at + pending_change.delay { @@ -197,18 +262,39 @@ impl<T: Trait> Module<T> { /// `in_blocks` after the current block. This value may be 0, in which /// case the change is applied at the end of the current block. /// + /// If the `forced` parameter is defined, this indicates that the current + /// set has been synchronously determined to be offline and that after + /// `in_blocks` the given change should be applied. The given block number + /// indicates the median last finalized block number and it should be used + /// as the canon block when starting the new grandpa voter. + /// /// No change should be signalled while any change is pending. Returns /// an error if a change is already pending. pub fn schedule_change( next_authorities: Vec<(T::SessionKey, u64)>, in_blocks: T::BlockNumber, + forced: Option<T::BlockNumber>, ) -> Result { + use primitives::traits::As; + if Self::pending_change().is_none() { let scheduled_at = system::ChainContext::<T>::default().current_height(); + + if let Some(_) = forced { + if Self::next_forced().map_or(false, |next| next > scheduled_at) { + return Err("Cannot signal forced change so soon after last."); + } + + // only allow the next forced change when twice the window has passed since + // this one. + <NextForced<T>>::put(scheduled_at + in_blocks * T::BlockNumber::sa(2)); + } + <PendingChange<T>>::put(StoredPendingChange { delay: in_blocks, scheduled_at, next_authorities, + forced, }); Ok(()) @@ -224,12 +310,19 @@ impl<T: Trait> Module<T> { } impl<T: Trait> Module<T> where Ed25519AuthorityId: core::convert::From<<T as Trait>::SessionKey> { - /// See if the digest contains any scheduled change. + /// See if the digest contains any standard scheduled change. pub fn scrape_digest_change(log: &Log<T>) -> Option<ScheduledChange<T::BlockNumber>> { <Log<T> as GrandpaChangeSignal<T::BlockNumber>>::as_signal(log) } + + /// See if the digest contains any forced scheduled change. + pub fn scrape_digest_forced_change(log: &Log<T>) + -> Option<(T::BlockNumber, ScheduledChange<T::BlockNumber>)> + { + <Log<T> as GrandpaChangeSignal<T::BlockNumber>>::as_forced_signal(log) + } } /// Helper for authorities being synchronized with the general session authorities. @@ -266,7 +359,34 @@ impl<X, T> session::OnSessionChange<X> for SyncedAuthorities<T> where // instant changes let last_authorities = <Module<T>>::grandpa_authorities(); if next_authorities != last_authorities { - let _ = <Module<T>>::schedule_change(next_authorities, Zero::zero()); + let _ = <Module<T>>::schedule_change(next_authorities, Zero::zero(), None); } } } + +impl<T> finality_tracker::OnFinalizationStalled<T::BlockNumber> for SyncedAuthorities<T> where + T: Trait, + T: session::Trait, + T: finality_tracker::Trait, + <T as session::Trait>::ConvertAccountIdToSessionKey: Convert< + <T as system::Trait>::AccountId, + <T as Trait>::SessionKey, + >, +{ + fn on_stalled(further_wait: T::BlockNumber) { + // when we record old authority sets, we can use `finality_tracker::median` + // to figure out _who_ failed. until then, we can't meaningfully guard + // against `next == last` the way that normal session changes do. + + let next_authorities = <session::Module<T>>::validators() + .into_iter() + .map(T::ConvertAccountIdToSessionKey::convert) + .map(|key| (key, 1)) // evenly-weighted. + .collect::<Vec<(<T as Trait>::SessionKey, u64)>>(); + + let median = <finality_tracker::Module<T>>::median(); + + // schedule a change for `further_wait` blocks. + let _ = <Module<T>>::schedule_change(next_authorities, further_wait, Some(median)); + } +} diff --git a/substrate/srml/grandpa/src/tests.rs b/substrate/srml/grandpa/src/tests.rs index 74c998d873db0375cfb36f6d018da7544be23c24..f7e08a20b3b6f597fafb97a21aceb16f9404493e 100644 --- a/substrate/srml/grandpa/src/tests.rs +++ b/substrate/srml/grandpa/src/tests.rs @@ -24,12 +24,14 @@ use runtime_io::with_externalities; use crate::mock::{Grandpa, System, new_test_ext}; use system::{EventRecord, Phase}; use crate::{RawLog, RawEvent}; +use codec::{Decode, Encode}; +use super::*; #[test] fn authorities_change_logged() { with_externalities(&mut new_test_ext(vec![(1, 1), (2, 1), (3, 1)]), || { System::initialise(&1, &Default::default(), &Default::default()); - Grandpa::schedule_change(vec![(4, 1), (5, 1), (6, 1)], 0).unwrap(); + Grandpa::schedule_change(vec![(4, 1), (5, 1), (6, 1)], 0, None).unwrap(); System::note_finished_extrinsics(); Grandpa::on_finalise(1); @@ -54,7 +56,7 @@ fn authorities_change_logged() { fn authorities_change_logged_after_delay() { with_externalities(&mut new_test_ext(vec![(1, 1), (2, 1), (3, 1)]), || { System::initialise(&1, &Default::default(), &Default::default()); - Grandpa::schedule_change(vec![(4, 1), (5, 1), (6, 1)], 1).unwrap(); + Grandpa::schedule_change(vec![(4, 1), (5, 1), (6, 1)], 1, None).unwrap(); Grandpa::on_finalise(1); let header = System::finalise(); assert_eq!(header.digest, testing::Digest { @@ -84,25 +86,112 @@ fn authorities_change_logged_after_delay() { fn cannot_schedule_change_when_one_pending() { with_externalities(&mut new_test_ext(vec![(1, 1), (2, 1), (3, 1)]), || { System::initialise(&1, &Default::default(), &Default::default()); - Grandpa::schedule_change(vec![(4, 1), (5, 1), (6, 1)], 1).unwrap(); + Grandpa::schedule_change(vec![(4, 1), (5, 1), (6, 1)], 1, None).unwrap(); assert!(Grandpa::pending_change().is_some()); - assert!(Grandpa::schedule_change(vec![(5, 1)], 1).is_err()); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, None).is_err()); Grandpa::on_finalise(1); let header = System::finalise(); System::initialise(&2, &header.hash(), &Default::default()); assert!(Grandpa::pending_change().is_some()); - assert!(Grandpa::schedule_change(vec![(5, 1)], 1).is_err()); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, None).is_err()); Grandpa::on_finalise(2); let header = System::finalise(); System::initialise(&3, &header.hash(), &Default::default()); assert!(Grandpa::pending_change().is_none()); - assert!(Grandpa::schedule_change(vec![(5, 1)], 1).is_ok()); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, None).is_ok()); Grandpa::on_finalise(3); let _header = System::finalise(); }); } + +#[test] +fn new_decodes_from_old() { + let old = OldStoredPendingChange { + scheduled_at: 5u32, + delay: 100u32, + next_authorities: vec![(1u64, 5), (2u64, 10), (3u64, 2)], + }; + + let encoded = old.encode(); + let new = StoredPendingChange::<u32, u64>::decode(&mut &encoded[..]).unwrap(); + assert!(new.forced.is_none()); + assert_eq!(new.scheduled_at, old.scheduled_at); + assert_eq!(new.delay, old.delay); + assert_eq!(new.next_authorities, old.next_authorities); +} + +#[test] +fn dispatch_forced_change() { + with_externalities(&mut new_test_ext(vec![(1, 1), (2, 1), (3, 1)]), || { + System::initialise(&1, &Default::default(), &Default::default()); + Grandpa::schedule_change( + vec![(4, 1), (5, 1), (6, 1)], + 5, + Some(0), + ).unwrap(); + + assert!(Grandpa::pending_change().is_some()); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, Some(0)).is_err()); + + Grandpa::on_finalise(1); + let mut header = System::finalise(); + + for i in 2..7 { + System::initialise(&i, &header.hash(), &Default::default()); + assert!(Grandpa::pending_change().unwrap().forced.is_some()); + assert_eq!(Grandpa::next_forced(), Some(11)); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, None).is_err()); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, Some(0)).is_err()); + + Grandpa::on_finalise(i); + header = System::finalise(); + } + + // change has been applied at the end of block 6. + // add a normal change. + { + System::initialise(&7, &header.hash(), &Default::default()); + assert!(Grandpa::pending_change().is_none()); + assert_eq!(Grandpa::grandpa_authorities(), vec![(4, 1), (5, 1), (6, 1)]); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, None).is_ok()); + Grandpa::on_finalise(7); + header = System::finalise(); + } + + // run the normal change. + { + System::initialise(&8, &header.hash(), &Default::default()); + assert!(Grandpa::pending_change().is_some()); + assert_eq!(Grandpa::grandpa_authorities(), vec![(4, 1), (5, 1), (6, 1)]); + assert!(Grandpa::schedule_change(vec![(5, 1)], 1, None).is_err()); + Grandpa::on_finalise(8); + header = System::finalise(); + } + + // normal change applied. but we can't apply a new forced change for some + // time. + for i in 9..11 { + System::initialise(&i, &header.hash(), &Default::default()); + assert!(Grandpa::pending_change().is_none()); + assert_eq!(Grandpa::grandpa_authorities(), vec![(5, 1)]); + assert_eq!(Grandpa::next_forced(), Some(11)); + assert!(Grandpa::schedule_change(vec![(5, 1), (6, 1)], 5, Some(0)).is_err()); + Grandpa::on_finalise(i); + header = System::finalise(); + } + + { + System::initialise(&11, &header.hash(), &Default::default()); + assert!(Grandpa::pending_change().is_none()); + assert!(Grandpa::schedule_change(vec![(5, 1), (6, 1), (7, 1)], 5, Some(0)).is_ok()); + assert_eq!(Grandpa::next_forced(), Some(21)); + Grandpa::on_finalise(11); + header = System::finalise(); + } + }); +}