diff --git a/substrate/core/finality-grandpa/src/lib.rs b/substrate/core/finality-grandpa/src/lib.rs index 5c6835e3bb4df92a0082d0e6327c164eeb49c398..46b561906837c8ee9f7ed68188e328cd918025b3 100644 --- a/substrate/core/finality-grandpa/src/lib.rs +++ b/substrate/core/finality-grandpa/src/lib.rs @@ -512,8 +512,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( telemetry_on_connect, } = grandpa_params; - use futures::future::{self, Loop as FutureLoop}; - let LinkHalf { client, select_chain, @@ -521,19 +519,17 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( voter_commands_rx, } = link; - let PersistentData { authority_set, set_state, consensus_changes } = persistent_data; - let (network, network_startup) = NetworkBridge::new( network, config.clone(), - set_state.clone(), + persistent_data.set_state.clone(), on_exit.clone(), ); register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; let telemetry_task = if let Some(telemetry_on_connect) = telemetry_on_connect { - let authorities = authority_set.clone(); + let authorities = persistent_data.authority_set.clone(); let events = telemetry_on_connect.telemetry_connection_sinks .for_each(move |_| { telemetry!(CONSENSUS_INFO; "afg.authority_set"; @@ -555,30 +551,95 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( futures::future::Either::B(futures::future::empty()) }; - let voters = authority_set.current_authorities(); - let initial_environment = Arc::new(Environment { - inner: client.clone(), - config: config.clone(), - select_chain: select_chain.clone(), - voters: Arc::new(voters), - network: network.clone(), - set_id: authority_set.set_id(), - authority_set: authority_set.clone(), - consensus_changes: consensus_changes.clone(), - voter_set_state: set_state.clone(), - }); - - let initial_state = (initial_environment, voter_commands_rx.into_future()); - let voter_work = future::loop_fn(initial_state, move |params| { - let (env, voter_commands_rx) = params; - debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); + let voter_work = VoterWork::new( + client, + config, + network, + select_chain, + persistent_data, + voter_commands_rx + ); + + let voter_work = voter_work + .map(|_| ()) + .map_err(|e| { + error!("GRANDPA Voter failed: {:?}", e); + telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); + }); + + let voter_work = network_startup.and_then(move |()| voter_work); + + // Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa. + let telemetry_task = telemetry_task + .then(|_| futures::future::empty::<(), ()>()); + + Ok(voter_work.select(on_exit).select2(telemetry_task).then(|_| Ok(()))) +} + +/// Future that powers the voter. +#[must_use] +struct VoterWork<B, E, Block: BlockT, N: Network<Block>, RA, SC> { + voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>, + env: Arc<Environment<B, E, Block, N, RA, SC>>, + voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>, +} + +impl<B, E, Block, N, RA, SC> VoterWork<B, E, Block, N, RA, SC> +where + Block: BlockT<Hash=H256>, + N: Network<Block> + Sync, + N::In: Send + 'static, + NumberFor<Block>: BlockNumberOps, + RA: 'static + Send + Sync, + E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static, + B: Backend<Block, Blake2Hasher> + 'static, + SC: SelectChain<Block> + 'static, +{ + fn new( + client: Arc<Client<B, E, Block, RA>>, + config: Config, + network: NetworkBridge<Block, N>, + select_chain: SC, + persistent_data: PersistentData<Block>, + voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>, + ) -> Self { + + let voters = persistent_data.authority_set.current_authorities(); + let env = Arc::new(Environment { + inner: client, + select_chain, + voters: Arc::new(voters), + config, + network, + set_id: persistent_data.authority_set.set_id(), + authority_set: persistent_data.authority_set.clone(), + consensus_changes: persistent_data.consensus_changes.clone(), + voter_set_state: persistent_data.set_state.clone(), + }); + + let mut work = VoterWork { + // `voter` is set to a temporary value and replaced below when + // calling `rebuild_voter`. + voter: Box::new(futures::empty()) as Box<_>, + env, + voter_commands_rx, + }; + work.rebuild_voter(); + work + } + + /// Rebuilds the `self.voter` field using the current authority set + /// state. This method should be called when we know that the authority set + /// has changed (e.g. as signalled by a voter command). + fn rebuild_voter(&mut self) { + debug!(target: "afg", "{}: Starting new voter with set ID {}", self.env.config.name(), self.env.set_id); telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter"; - "name" => ?config.name(), "set_id" => ?env.set_id + "name" => ?self.env.config.name(), "set_id" => ?self.env.set_id ); - let mut maybe_voter = match &*env.voter_set_state.read() { + match &*self.env.voter_set_state.read() { VoterSetState::Live { completed_rounds, .. } => { - let chain_info = client.info(); + let chain_info = self.env.inner.info(); let last_finalized = ( chain_info.chain.finalized_hash, @@ -586,140 +647,147 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>( ); let global_comms = global_communication( - env.set_id, - &env.voters, - &client, - &network, - &config.keystore, + self.env.set_id, + &self.env.voters, + &self.env.inner, + &self.env.network, + &self.env.config.keystore, ); let last_completed_round = completed_rounds.last(); - Some(voter::Voter::new( - env.clone(), - (*env.voters).clone(), + let voter = voter::Voter::new( + self.env.clone(), + (*self.env.voters).clone(), global_comms, last_completed_round.number, last_completed_round.state.clone(), last_finalized, - )) + ); + + self.voter = Box::new(voter); }, - VoterSetState::Paused { .. } => None, + VoterSetState::Paused { .. } => + self.voter = Box::new(futures::empty()), }; + } - // needs to be combined with another future otherwise it can deadlock. - let poll_voter = future::poll_fn(move || match maybe_voter { - Some(ref mut voter) => voter.poll(), - None => Ok(Async::NotReady), - }); + fn handle_voter_command( + &mut self, + command: VoterCommand<Block::Hash, NumberFor<Block>> + ) -> Result<(), Error> { + match command { + VoterCommand::ChangeAuthorities(new) => { + let voters: Vec<String> = new.authorities.iter().map(move |(a, _)| { + format!("{}", a) + }).collect(); + telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities"; + "number" => ?new.canon_number, + "hash" => ?new.canon_hash, + "voters" => ?voters, + "set_id" => ?new.set_id, + ); - let client = client.clone(); - let config = config.clone(); - let network = network.clone(); - let select_chain = select_chain.clone(); - let authority_set = authority_set.clone(); - let consensus_changes = consensus_changes.clone(); - - let handle_voter_command = move |command: VoterCommand<_, _>, voter_commands_rx| { - match command { - VoterCommand::ChangeAuthorities(new) => { - let voters: Vec<String> = new.authorities.iter().map(move |(a, _)| { - format!("{}", a) - }).collect(); - telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities"; - "number" => ?new.canon_number, - "hash" => ?new.canon_hash, - "voters" => ?voters, - "set_id" => ?new.set_id, - ); + // start the new authority set using the block where the + // set changed (not where the signal happened!) as the base. + let set_state = VoterSetState::live( + new.set_id, + &*self.env.authority_set.inner().read(), + (new.canon_hash, new.canon_number), + ); - // start the new authority set using the block where the - // set changed (not where the signal happened!) as the base. - let set_state = VoterSetState::live( - new.set_id, - &*authority_set.inner().read(), - (new.canon_hash, new.canon_number), - ); + #[allow(deprecated)] + aux_schema::write_voter_set_state(&**self.env.inner.backend(), &set_state)?; + + let set_state: SharedVoterSetState<_> = set_state.into(); + + self.env = Arc::new(Environment { + voters: Arc::new(new.authorities.into_iter().collect()), + set_id: new.set_id, + voter_set_state: set_state, + // Fields below are simply transferred and not updated. + inner: self.env.inner.clone(), + select_chain: self.env.select_chain.clone(), + config: self.env.config.clone(), + authority_set: self.env.authority_set.clone(), + consensus_changes: self.env.consensus_changes.clone(), + network: self.env.network.clone(), + }); + + self.rebuild_voter(); + Ok(()) + } + VoterCommand::Pause(reason) => { + info!(target: "afg", "Pausing old validator set: {}", reason); + + // not racing because old voter is shut down. + self.env.update_voter_set_state(|voter_set_state| { + let completed_rounds = voter_set_state.completed_rounds(); + let set_state = VoterSetState::Paused { completed_rounds }; #[allow(deprecated)] - aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; - - let set_state: SharedVoterSetState<_> = set_state.into(); - - let env = Arc::new(Environment { - inner: client, - select_chain, - config, - voters: Arc::new(new.authorities.into_iter().collect()), - set_id: new.set_id, - network, - authority_set, - consensus_changes, - voter_set_state: set_state, - }); - - Ok(FutureLoop::Continue((env, voter_commands_rx))) - } - VoterCommand::Pause(reason) => { - info!(target: "afg", "Pausing old validator set: {}", reason); + aux_schema::write_voter_set_state(&**self.env.inner.backend(), &set_state)?; + Ok(Some(set_state)) + })?; - // not racing because old voter is shut down. - env.update_voter_set_state(|voter_set_state| { - let completed_rounds = voter_set_state.completed_rounds(); - let set_state = VoterSetState::Paused { completed_rounds }; + self.rebuild_voter(); + Ok(()) + } + } + } +} - #[allow(deprecated)] - aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; - Ok(Some(set_state)) - })?; +impl<B, E, Block, N, RA, SC> Future for VoterWork<B, E, Block, N, RA, SC> +where + Block: BlockT<Hash=H256>, + N: Network<Block> + Sync, + N::In: Send + 'static, + NumberFor<Block>: BlockNumberOps, + RA: 'static + Send + Sync, + E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static, + B: Backend<Block, Blake2Hasher> + 'static, + SC: SelectChain<Block> + 'static, +{ + type Item = (); + type Error = Error; - Ok(FutureLoop::Continue((env, voter_commands_rx))) - }, + fn poll(&mut self) -> Poll<Self::Item, Self::Error> { + match self.voter.poll() { + Ok(Async::NotReady) => {} + Ok(Async::Ready(())) => { + // voters don't conclude naturally + return Err(Error::Safety("GRANDPA voter has concluded.".into())) } - }; + Err(CommandOrError::Error(e)) => { + // return inner observer error + return Err(e) + } + Err(CommandOrError::VoterCommand(command)) => { + // some command issued internally + self.handle_voter_command(command)?; + futures::task::current().notify(); + } + } - poll_voter.select2(voter_commands_rx).then(move |res| match res { - Ok(future::Either::A(((), _))) => { - // voters don't conclude naturally - Err(Error::Safety("GRANDPA voter has concluded.".into())) - }, - Err(future::Either::B(_)) => { + match self.voter_commands_rx.poll() { + Ok(Async::NotReady) => {} + Err(_) => { // the `voter_commands_rx` stream should not fail. - Ok(FutureLoop::Break(())) - }, - Ok(future::Either::B(((None, _), _))) => { + return Ok(Async::Ready(())) + } + Ok(Async::Ready(None)) => { // the `voter_commands_rx` stream should never conclude since it's never closed. - Ok(FutureLoop::Break(())) - }, - Err(future::Either::A((CommandOrError::Error(e), _))) => { - // return inner voter error - Err(e) + return Ok(Async::Ready(())) } - Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => { - // some command issued externally. - handle_voter_command(command, voter_commands_rx.into_future()) + Ok(Async::Ready(Some(command))) => { + // some command issued externally + self.handle_voter_command(command)?; + futures::task::current().notify(); } - Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => { - // some command issued internally. - handle_voter_command(command, voter_commands_rx) - }, - }) - }); - - let voter_work = voter_work - .map(|_| ()) - .map_err(|e| { - error!("GRANDPA Voter failed: {:?}", e); - telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); - }); - - let voter_work = network_startup.and_then(move |()| voter_work); - - // Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa. - let telemetry_task = telemetry_task - .then(|_| futures::future::empty::<(), ()>()); + } - Ok(voter_work.select(on_exit).select2(telemetry_task).then(|_| Ok(()))) + Ok(Async::NotReady) + } } #[deprecated(since = "1.1", note = "Please switch to run_grandpa_voter.")]