diff --git a/substrate/core/finality-grandpa/src/authorities.rs b/substrate/core/finality-grandpa/src/authorities.rs index 4e435f542094e52995909a156c6a7ae1ac489cd2..dd6a433995e8b99e6e67292b551917c833cee3e6 100644 --- a/substrate/core/finality-grandpa/src/authorities.rs +++ b/substrate/core/finality-grandpa/src/authorities.rs @@ -85,6 +85,15 @@ impl<H, N> From<AuthoritySet<H, N>> for SharedAuthoritySet<H, N> { } } +/// Status of the set after changes were applied. +pub(crate) struct Status<H, N> { + /// Whether internal changes were made. + pub(crate) changed: bool, + /// `Some` when underlying authority set has changed, containign the + /// block where that set changed. + pub(crate) new_set_block: Option<(H, N)>, +} + /// A set of authorities. #[derive(Debug, Clone, Encode, Decode)] pub(crate) struct AuthoritySet<H, N> { @@ -129,12 +138,16 @@ impl<H: Eq, N> AuthoritySet<H, N> /// Apply or prune any pending transitions. Provide a closure that can be used to check for the /// finalized block with given number. /// - /// Returns true when the set's representation has changed. + /// When the set has changed, the return value will be `Ok(Some((H, N)))` which is the cnaonical + /// block where the set last changed. pub(crate) fn apply_changes<F, E>(&mut self, just_finalized: N, mut canonical: F) - -> Result<bool, E> + -> Result<Status<H, N>, E> where F: FnMut(N) -> Result<H, E> { - let mut changed = false; + let mut status = Status { + changed: false, + new_set_block: None, + }; loop { let remove_up_to = match self.pending_changes.first() { None => break, @@ -152,6 +165,11 @@ impl<H: Eq, N> AuthoritySet<H, N> self.current_authorities = change.next_authorities.clone(); self.set_id += 1; + status.new_set_block = Some(( + canonical(effective_number.clone())?, + effective_number.clone(), + )); + // discard any signalled changes // that happened before or equal to the effective number of the change. self.pending_changes.iter() @@ -165,10 +183,10 @@ impl<H: Eq, N> AuthoritySet<H, N> let remove_up_to = ::std::cmp::min(remove_up_to, self.pending_changes.len()); self.pending_changes.drain(..remove_up_to); - changed = true; // always changed because we strip at least the first change. + status.changed = true; // always changed because we strip at least the first change. } - Ok(changed) + Ok(status) } } @@ -264,16 +282,18 @@ mod tests { authorities.add_pending_change(change_a.clone()); authorities.add_pending_change(change_b.clone()); - authorities.apply_changes::<_, ()>(10, |_| panic!()).unwrap(); + authorities.apply_changes(10, |_| Err(())).unwrap(); assert!(authorities.current_authorities.is_empty()); - authorities.apply_changes::<_, ()>( - 15, - |n| if n == 5 { Ok("hash_a") } else { panic!() } - ).unwrap(); + authorities.apply_changes(15, |n| match n { + 5 => Ok("hash_a"), + 15 => Ok("hash_15_canon"), + _ => Err(()), + }).unwrap(); assert_eq!(authorities.current_authorities, set_a); assert_eq!(authorities.set_id, 1); + assert!(authorities.pending_changes.is_empty()); } #[test] @@ -318,10 +338,12 @@ mod tests { 5 => Ok("hash_a"), 15 => Ok("hash_b"), 16 => Ok("hash_c"), + 26 => Ok("hash_26"), _ => Err(()), }).unwrap(); assert_eq!(authorities.current_authorities, set_c); assert_eq!(authorities.set_id, 2); // has been bumped only twice + assert!(authorities.pending_changes.is_empty()); } } diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index f5379a8113a80a6e289297425cc9e73d2f033174..8b45ab56411e1a000839d1a107340f900436148c 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -67,8 +67,8 @@ mod authorities; const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round"; const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters"; -/// round-number, round-state, set indicator -type LastCompleted<H, N> = (u64, RoundState<H, N>, u64); +/// round-number, round-state +type LastCompleted<H, N> = (u64, RoundState<H, N>); /// A GRANDPA message for a substrate chain. pub type Message<Block> = grandpa::Message<<Block as BlockT>::Hash, NumberFor<Block>>; @@ -85,11 +85,10 @@ pub type Prevote<Block> = grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Bl pub type Precommit<Block> = grandpa::Precommit<<Block as BlockT>::Hash, NumberFor<Block>>; /// Configuration for the GRANDPA service. +#[derive(Clone)] pub struct Config { /// The expected duration for a message to be gossiped across the network. pub gossip_duration: Duration, - /// The voters. - pub genesis_voters: Vec<AuthorityId>, /// The local signing key. pub local_key: Option<Arc<ed25519::Pair>>, } @@ -331,7 +330,7 @@ fn checked_message_stream<Block: BlockT, S>( round: u64, set_id: u64, inner: S, - voters: Vec<AuthorityId>, + voters: Arc<HashMap<AuthorityId, u64>>, ) -> impl Stream<Item=SignedMessage<Block>,Error=Error> where S: Stream<Item=Vec<u8>,Error=()> @@ -346,7 +345,7 @@ fn checked_message_stream<Block: BlockT, S>( }) .and_then(move |msg| { // check signature. - if !voters.contains(&msg.id) { + if !voters.contains_key(&msg.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.id); return Ok(None); } @@ -368,7 +367,7 @@ fn outgoing_messages<Block: BlockT, N: Network>( round: u64, set_id: u64, local_key: Option<Arc<ed25519::Pair>>, - voters: Vec<AuthorityId>, + voters: Arc<HashMap<AuthorityId, u64>>, network: N, ) -> ( impl Stream<Item=SignedMessage<Block>,Error=Error>, @@ -376,7 +375,12 @@ fn outgoing_messages<Block: BlockT, N: Network>( ) { let locals = local_key.and_then(|pair| { let public = pair.public(); - voters.iter().find(|id| id.0 == public.0).map(move |id| (pair, id.clone())) + let id = AuthorityId(public.0); + if voters.contains_key(&id) { + Some((pair, id)) + } else { + None + } }); let (tx, rx) = mpsc::unbounded(); @@ -413,7 +417,7 @@ fn outgoing_messages<Block: BlockT, N: Network>( /// The environment we run GRANDPA in. pub struct Environment<B, E, Block: BlockT, N: Network> { inner: Arc<Client<B, E, Block>>, - voters: HashMap<AuthorityId, u64>, + voters: Arc<HashMap<AuthorityId, u64>>, config: Config, authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>, network: N, @@ -499,28 +503,37 @@ pub trait CompatibleDigestItem<N> { fn scheduled_change(&self) -> Option<ScheduledChange<N>> { None } } +/// A new authority set along with the canonical block it changed at. +#[derive(Debug)] +pub struct NewAuthoritySet<H, N> { + canon_number: N, + canon_hash: H, + set_id: u64, + authorities: Vec<(AuthorityId, u64)>, +} + /// Signals either an early exit of a voter or an error. #[derive(Debug)] -pub enum ExitOrError { +pub enum ExitOrError<H, N> { /// An error occurred. Error(Error), /// Early exit of the voter: the new set ID and the new authorities along with respective weights. - AuthoritiesChanged(u64, Vec<(AuthorityId, u64)>), + AuthoritiesChanged(NewAuthoritySet<H, N>), } -impl From<Error> for ExitOrError { +impl<H, N> From<Error> for ExitOrError<H, N> { fn from(e: Error) -> Self { ExitOrError::Error(e) } } -impl From<ClientError> for ExitOrError { +impl<H, N> From<ClientError> for ExitOrError<H, N> { fn from(e: ClientError) -> Self { ExitOrError::Error(Error::Client(e)) } } -impl From<grandpa::Error> for ExitOrError { +impl<H, N> From<grandpa::Error> for ExitOrError<H, N> { fn from(e: grandpa::Error) -> Self { ExitOrError::Error(Error::from(e)) } @@ -546,7 +559,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f SinkItem = ::grandpa::Message<Block::Hash, NumberFor<Block>>, SinkError = Self::Error, >>; - type Error = ExitOrError; + type Error = ExitOrError<Block::Hash, NumberFor<Block>>; #[allow(unreachable_code)] fn round_data( @@ -565,14 +578,14 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f round, self.set_id, self.network.messages_for(round), - self.config.genesis_voters.clone(), + self.voters.clone(), ); let (out_rx, outgoing) = outgoing_messages::<Block, _>( round, self.set_id, self.config.local_key.clone(), - self.config.genesis_voters.clone(), + self.voters.clone(), self.network.clone(), ); @@ -597,7 +610,7 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f voter::RoundData { prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())), precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())), - voters: self.voters.clone(), + voters: (&*self.voters).clone(), incoming, outgoing, } @@ -628,21 +641,30 @@ impl<B, E, Block: BlockT, N> voter::Environment<Block::Hash, NumberFor<Block>> f self.authority_set.with_mut(|authority_set| { let client = &self.inner; - let prior_id = authority_set.set_id(); - let has_changed = authority_set.apply_changes(number, |canon_number| { + let status = authority_set.apply_changes(number, |canon_number| { client.block_hash_from_id(&BlockId::number(canon_number)) .map(|h| h.expect("given number always less than newly-finalized number; \ thus there is a block with that number finalized already; qed")) })?; - if has_changed { + if status.changed { // TODO [now]: write to disk. if it fails, exit the node. + // write `authorities.encode()` + + if let Some((ref canon_hash, ref canon_number)) = status.new_set_block { + // write `LastFinalized` entry with `RoundState::genesis(canon)`. + } } - let (new_id, set_ref) = authority_set.current(); - if new_id != prior_id { + if let Some((canon_hash, canon_number)) = status.new_set_block { // the authority set has changed. - return Err(ExitOrError::AuthoritiesChanged(new_id, set_ref.to_vec())); + let (new_id, set_ref) = authority_set.current(); + return Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { + canon_hash, + canon_number, + set_id: new_id, + authorities: set_ref.to_vec(), + })); } Ok(()) @@ -727,22 +749,12 @@ pub fn run_grandpa<B, E, Block: BlockT, N>( NumberFor<Block>: BlockNumberOps, DigestItemFor<Block>: CompatibleDigestItem<NumberFor<Block>>, { + use futures::future::{self, Loop as FutureLoop}; + use runtime_primitives::traits::Zero; let chain_info = client.info()?; let genesis_hash = chain_info.chain.genesis_hash; - let last_finalized = ( - chain_info.chain.finalized_hash, - chain_info.chain.finalized_number, - ); - - let (last_round_number, last_state) = match client.backend().get_aux(LAST_COMPLETED_KEY)? { - None => (0, RoundState::genesis((genesis_hash, <NumberFor<Block>>::zero()))), - Some(raw) => <(u64, RoundState<Block::Hash, NumberFor<Block>>)>::decode(&mut &raw[..]) - .ok_or_else(|| ::client::error::ErrorKind::Backend( - format!("Last GRANDPA round state kept in invalid format") - ))? - }; // TODO [now]: attempt to load from disk. let authority_set = SharedAuthoritySet::genesis( @@ -754,23 +766,66 @@ pub fn run_grandpa<B, E, Block: BlockT, N>( authority_set: authority_set.clone(), }; - let environment = Arc::new(Environment { - inner: client, - config, - voters, - network, + let (last_round_number, last_state) = match client.backend().get_aux(LAST_COMPLETED_KEY)? { + None => (0, RoundState::genesis((genesis_hash, <NumberFor<Block>>::zero()))), + Some(raw) => LastCompleted::decode(&mut &raw[..]) + .ok_or_else(|| ::client::error::ErrorKind::Backend( + format!("Last GRANDPA round state kept in invalid format") + ))? + }; + + let initial_environment = Arc::new(Environment { + inner: client.clone(), + config: config.clone(), + voters: Arc::new(voters), + network: network.clone(), set_id: authority_set.set_id(), - authority_set, + authority_set: authority_set.clone(), }); - let voter = voter::Voter::new( - environment, - last_round_number, - last_state, - last_finalized, - ); + let voters = future::loop_fn((initial_environment, last_round_number, last_state), move |params| { + let (env, last_round_number, last_state) = params; + let chain_info = match client.info() { + Ok(i) => i, + Err(e) => return future::Either::B(future::err(Error::Client(e))), + }; + + let last_finalized = ( + chain_info.chain.finalized_hash, + chain_info.chain.finalized_number, + ); + + let voter = voter::Voter::new(env, last_round_number, last_state, last_finalized); + let client = client.clone(); + let config = config.clone(); + let network = network.clone(); + let authority_set = authority_set.clone(); + future::Either::A(voter.then(move |res| match res { + // voters don't conclude naturally; this could reasonably be an error. + Ok(()) => Ok(FutureLoop::Break(())), + Err(ExitOrError::Error(e)) => Err(e), + Err(ExitOrError::AuthoritiesChanged(new)) => { + let env = Arc::new(Environment { + inner: client, + config, + voters: Arc::new(new.authorities.into_iter().collect()), + set_id: new.set_id, + network, + authority_set, + }); + + // start the new authority set using the block where the + // set changed (not where the signal happened!) as the base. + Ok(FutureLoop::Continue(( + env, + 0, // always start at round 0 when changing sets. + RoundState::genesis((new.canon_hash, new.canon_number)), + ))) + } + })) + }); - let work = voter.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); + let work = voters.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e)); Ok((work, block_import)) } @@ -870,7 +925,6 @@ mod tests { let (voter, _) = run_grandpa( Config { gossip_duration: TEST_GOSSIP_DURATION, - genesis_voters: voters.clone(), local_key: Some(Arc::new(key.clone().into())), }, client, @@ -929,7 +983,6 @@ mod tests { let (voter, _) = run_grandpa( Config { gossip_duration: TEST_GOSSIP_DURATION, - genesis_voters: voters.keys().cloned().collect(), local_key, }, client,