Skip to content
Snippets Groups Projects
Commit 9d611827 authored by Pierre Krieger's avatar Pierre Krieger Committed by Gavin Wood
Browse files

Rewrite the GRANDPA voter to not use a loop (#3393)

parent 60582079
Branches
No related merge requests found
......@@ -512,8 +512,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
telemetry_on_connect,
} = grandpa_params;
use futures::future::{self, Loop as FutureLoop};
let LinkHalf {
client,
select_chain,
......@@ -521,19 +519,17 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
voter_commands_rx,
} = link;
let PersistentData { authority_set, set_state, consensus_changes } = persistent_data;
let (network, network_startup) = NetworkBridge::new(
network,
config.clone(),
set_state.clone(),
persistent_data.set_state.clone(),
on_exit.clone(),
);
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
let telemetry_task = if let Some(telemetry_on_connect) = telemetry_on_connect {
let authorities = authority_set.clone();
let authorities = persistent_data.authority_set.clone();
let events = telemetry_on_connect.telemetry_connection_sinks
.for_each(move |_| {
telemetry!(CONSENSUS_INFO; "afg.authority_set";
......@@ -555,30 +551,95 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
futures::future::Either::B(futures::future::empty())
};
let voters = authority_set.current_authorities();
let initial_environment = Arc::new(Environment {
inner: client.clone(),
config: config.clone(),
select_chain: select_chain.clone(),
voters: Arc::new(voters),
network: network.clone(),
set_id: authority_set.set_id(),
authority_set: authority_set.clone(),
consensus_changes: consensus_changes.clone(),
voter_set_state: set_state.clone(),
});
let initial_state = (initial_environment, voter_commands_rx.into_future());
let voter_work = future::loop_fn(initial_state, move |params| {
let (env, voter_commands_rx) = params;
debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id);
let voter_work = VoterWork::new(
client,
config,
network,
select_chain,
persistent_data,
voter_commands_rx
);
let voter_work = voter_work
.map(|_| ())
.map_err(|e| {
error!("GRANDPA Voter failed: {:?}", e);
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
let voter_work = network_startup.and_then(move |()| voter_work);
// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
let telemetry_task = telemetry_task
.then(|_| futures::future::empty::<(), ()>());
Ok(voter_work.select(on_exit).select2(telemetry_task).then(|_| Ok(())))
}
/// Future that powers the voter.
#[must_use]
struct VoterWork<B, E, Block: BlockT, N: Network<Block>, RA, SC> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
env: Arc<Environment<B, E, Block, N, RA, SC>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
}
impl<B, E, Block, N, RA, SC> VoterWork<B, E, Block, N, RA, SC>
where
Block: BlockT<Hash=H256>,
N: Network<Block> + Sync,
N::In: Send + 'static,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
B: Backend<Block, Blake2Hasher> + 'static,
SC: SelectChain<Block> + 'static,
{
fn new(
client: Arc<Client<B, E, Block, RA>>,
config: Config,
network: NetworkBridge<Block, N>,
select_chain: SC,
persistent_data: PersistentData<Block>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
) -> Self {
let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
inner: client,
select_chain,
voters: Arc::new(voters),
config,
network,
set_id: persistent_data.authority_set.set_id(),
authority_set: persistent_data.authority_set.clone(),
consensus_changes: persistent_data.consensus_changes.clone(),
voter_set_state: persistent_data.set_state.clone(),
});
let mut work = VoterWork {
// `voter` is set to a temporary value and replaced below when
// calling `rebuild_voter`.
voter: Box::new(futures::empty()) as Box<_>,
env,
voter_commands_rx,
};
work.rebuild_voter();
work
}
/// Rebuilds the `self.voter` field using the current authority set
/// state. This method should be called when we know that the authority set
/// has changed (e.g. as signalled by a voter command).
fn rebuild_voter(&mut self) {
debug!(target: "afg", "{}: Starting new voter with set ID {}", self.env.config.name(), self.env.set_id);
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
"name" => ?config.name(), "set_id" => ?env.set_id
"name" => ?self.env.config.name(), "set_id" => ?self.env.set_id
);
let mut maybe_voter = match &*env.voter_set_state.read() {
match &*self.env.voter_set_state.read() {
VoterSetState::Live { completed_rounds, .. } => {
let chain_info = client.info();
let chain_info = self.env.inner.info();
let last_finalized = (
chain_info.chain.finalized_hash,
......@@ -586,140 +647,147 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, X>(
);
let global_comms = global_communication(
env.set_id,
&env.voters,
&client,
&network,
&config.keystore,
self.env.set_id,
&self.env.voters,
&self.env.inner,
&self.env.network,
&self.env.config.keystore,
);
let last_completed_round = completed_rounds.last();
Some(voter::Voter::new(
env.clone(),
(*env.voters).clone(),
let voter = voter::Voter::new(
self.env.clone(),
(*self.env.voters).clone(),
global_comms,
last_completed_round.number,
last_completed_round.state.clone(),
last_finalized,
))
);
self.voter = Box::new(voter);
},
VoterSetState::Paused { .. } => None,
VoterSetState::Paused { .. } =>
self.voter = Box::new(futures::empty()),
};
}
// needs to be combined with another future otherwise it can deadlock.
let poll_voter = future::poll_fn(move || match maybe_voter {
Some(ref mut voter) => voter.poll(),
None => Ok(Async::NotReady),
});
fn handle_voter_command(
&mut self,
command: VoterCommand<Block::Hash, NumberFor<Block>>
) -> Result<(), Error> {
match command {
VoterCommand::ChangeAuthorities(new) => {
let voters: Vec<String> = new.authorities.iter().map(move |(a, _)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities";
"number" => ?new.canon_number,
"hash" => ?new.canon_hash,
"voters" => ?voters,
"set_id" => ?new.set_id,
);
let client = client.clone();
let config = config.clone();
let network = network.clone();
let select_chain = select_chain.clone();
let authority_set = authority_set.clone();
let consensus_changes = consensus_changes.clone();
let handle_voter_command = move |command: VoterCommand<_, _>, voter_commands_rx| {
match command {
VoterCommand::ChangeAuthorities(new) => {
let voters: Vec<String> = new.authorities.iter().map(move |(a, _)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities";
"number" => ?new.canon_number,
"hash" => ?new.canon_hash,
"voters" => ?voters,
"set_id" => ?new.set_id,
);
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*self.env.authority_set.inner().read(),
(new.canon_hash, new.canon_number),
);
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*authority_set.inner().read(),
(new.canon_hash, new.canon_number),
);
#[allow(deprecated)]
aux_schema::write_voter_set_state(&**self.env.inner.backend(), &set_state)?;
let set_state: SharedVoterSetState<_> = set_state.into();
self.env = Arc::new(Environment {
voters: Arc::new(new.authorities.into_iter().collect()),
set_id: new.set_id,
voter_set_state: set_state,
// Fields below are simply transferred and not updated.
inner: self.env.inner.clone(),
select_chain: self.env.select_chain.clone(),
config: self.env.config.clone(),
authority_set: self.env.authority_set.clone(),
consensus_changes: self.env.consensus_changes.clone(),
network: self.env.network.clone(),
});
self.rebuild_voter();
Ok(())
}
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
// not racing because old voter is shut down.
self.env.update_voter_set_state(|voter_set_state| {
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
#[allow(deprecated)]
aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
let set_state: SharedVoterSetState<_> = set_state.into();
let env = Arc::new(Environment {
inner: client,
select_chain,
config,
voters: Arc::new(new.authorities.into_iter().collect()),
set_id: new.set_id,
network,
authority_set,
consensus_changes,
voter_set_state: set_state,
});
Ok(FutureLoop::Continue((env, voter_commands_rx)))
}
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
aux_schema::write_voter_set_state(&**self.env.inner.backend(), &set_state)?;
Ok(Some(set_state))
})?;
// not racing because old voter is shut down.
env.update_voter_set_state(|voter_set_state| {
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
self.rebuild_voter();
Ok(())
}
}
}
}
#[allow(deprecated)]
aux_schema::write_voter_set_state(&**client.backend(), &set_state)?;
Ok(Some(set_state))
})?;
impl<B, E, Block, N, RA, SC> Future for VoterWork<B, E, Block, N, RA, SC>
where
Block: BlockT<Hash=H256>,
N: Network<Block> + Sync,
N::In: Send + 'static,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
B: Backend<Block, Blake2Hasher> + 'static,
SC: SelectChain<Block> + 'static,
{
type Item = ();
type Error = Error;
Ok(FutureLoop::Continue((env, voter_commands_rx)))
},
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.voter.poll() {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) => {
// voters don't conclude naturally
return Err(Error::Safety("GRANDPA voter has concluded.".into()))
}
};
Err(CommandOrError::Error(e)) => {
// return inner observer error
return Err(e)
}
Err(CommandOrError::VoterCommand(command)) => {
// some command issued internally
self.handle_voter_command(command)?;
futures::task::current().notify();
}
}
poll_voter.select2(voter_commands_rx).then(move |res| match res {
Ok(future::Either::A(((), _))) => {
// voters don't conclude naturally
Err(Error::Safety("GRANDPA voter has concluded.".into()))
},
Err(future::Either::B(_)) => {
match self.voter_commands_rx.poll() {
Ok(Async::NotReady) => {}
Err(_) => {
// the `voter_commands_rx` stream should not fail.
Ok(FutureLoop::Break(()))
},
Ok(future::Either::B(((None, _), _))) => {
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => {
// the `voter_commands_rx` stream should never conclude since it's never closed.
Ok(FutureLoop::Break(()))
},
Err(future::Either::A((CommandOrError::Error(e), _))) => {
// return inner voter error
Err(e)
return Ok(Async::Ready(()))
}
Ok(future::Either::B(((Some(command), voter_commands_rx), _))) => {
// some command issued externally.
handle_voter_command(command, voter_commands_rx.into_future())
Ok(Async::Ready(Some(command))) => {
// some command issued externally
self.handle_voter_command(command)?;
futures::task::current().notify();
}
Err(future::Either::A((CommandOrError::VoterCommand(command), voter_commands_rx))) => {
// some command issued internally.
handle_voter_command(command, voter_commands_rx)
},
})
});
let voter_work = voter_work
.map(|_| ())
.map_err(|e| {
error!("GRANDPA Voter failed: {:?}", e);
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
let voter_work = network_startup.and_then(move |()| voter_work);
// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
let telemetry_task = telemetry_task
.then(|_| futures::future::empty::<(), ()>());
}
Ok(voter_work.select(on_exit).select2(telemetry_task).then(|_| Ok(())))
Ok(Async::NotReady)
}
}
#[deprecated(since = "1.1", note = "Please switch to run_grandpa_voter.")]
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment