// Copyright 2018-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Integration of the GRANDPA finality gadget into substrate.
//!
//! This crate is unstable and the API and usage may change.
//!
//! This crate provides a long-running future that produces finality notifications.
//!
//! # Usage
//!
//! First, create a block-import wrapper with the `block_import` function. The
//! GRANDPA worker needs to be linked together with this block import object, so
//! a `LinkHalf` is returned as well. All blocks imported (from network or
//! consensus or otherwise) must pass through this wrapper, otherwise consensus
//! is likely to break in unexpected ways.
//! Next, use the `LinkHalf` and a local configuration to `run_grandpa_voter`.
//! This requires a `Network` implementation. The returned future should be
//! driven to completion and will finalize blocks in the background.
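//!
//! A rough sketch of that wiring (names like `client`, `select_chain`,
//! `network`, `config`, `inherent_data_providers`, `on_exit` and `executor`
//! stand in for values the node's service builder would normally provide;
//! this is illustrative rather than exact service code):
//!
//! ```ignore
//! // Wrap the node's block import so GRANDPA sees every imported block.
//! let (grandpa_block_import, link_half) = block_import(
//!     client.clone(),
//!     &*client, // the client doubles as the genesis authority set provider
//!     select_chain.clone(),
//! )?;
//!
//! // Feed `grandpa_block_import` into the import pipeline, then spawn the
//! // background voter.
//! let voter = run_grandpa_voter(GrandpaParams {
//!     config,
//!     link: link_half,
//!     network,
//!     inherent_data_providers,
//!     on_exit,
//!     telemetry_on_connect: None,
//!     voting_rule: VotingRulesBuilder::default().build(),
//!     executor,
//! })?;
//! ```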
//!
//! # Changing authority sets
//!
//! The rough idea behind changing authority sets in GRANDPA is that at some point,
//! we obtain agreement for some maximum block height that the current set can
//! finalize, and once a block with that height is finalized the next set will
//! pick up finalization from there.
//!
//! Technically speaking, this would be implemented as a voting rule which says,
//! "if there is a signal for a change in N blocks in block B, only vote on
//! chains with length NUM(B) + N if they contain B". This conditional-inclusion
//! logic is complex to compute because it requires looking arbitrarily far
//! back in the chain.
//!
//! Instead, we keep track of a list of all signals we've seen so far (across
//! all forks), sorted ascending by the block number they would be applied at.
//! We never vote on chains with number higher than the earliest handoff block
//! number (this is num(signal) + N). When finalizing a block, we either apply
//! or prune any signaled changes based on whether the signaling block is
//! included in the newly-finalized chain.
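//!
//! For example, if block B at height 100 signals a change with delay N = 5,
//! the handoff number is 105: the current set keeps voting on (and finalizing)
//! chains up to number 105, and the next set takes over from 106 onwards. If
//! the chain that ends up finalized does not contain B, the pending change is
//! pruned instead. A tiny sketch of the "earliest handoff" computation
//! (illustrative only; `PendingSignal` is a made-up type, the real bookkeeping
//! lives in the `authorities` module):
//!
//! ```ignore
//! struct PendingSignal { signal_number: u64, delay: u64 }
//!
//! /// Highest block number the current authority set is still allowed to vote on.
//! fn vote_cap(pending: &[PendingSignal]) -> Option<u64> {
//!     pending.iter().map(|s| s.signal_number + s.delay).min()
//! }
//! ```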
use futures03::{StreamExt, future::ready};
use log::{debug, error, info};
use futures::prelude::*;
use futures::sync::mpsc;
use sc_client_api::{BlockchainEvents, CallExecutor, backend::{AuxStore, Backend}, ExecutionStrategy};
use sp_blockchain::{HeaderBackend, Error as ClientError};
use sc_client::Client;
use parity_scale_codec::{Decode, Encode};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{NumberFor, Block as BlockT, DigestFor, Zero};
use sc_keystore::KeyStorePtr;
use sp_inherents::InherentDataProviders;
use sp_consensus::SelectChain;
use sp_core::Pair;
use sc_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_WARN};
use finality_grandpa::Error as GrandpaError;
use finality_grandpa::{voter, BlockNumberOps, voter_set::VoterSet};
use std::{fmt, io};
use std::sync::Arc;
use std::time::Duration;
mod authorities;
mod aux_schema;
mod communication;
mod consensus_changes;
mod environment;
mod finality_proof;
mod import;
mod justification;
mod light_import;
mod until_imported;
mod voting_rule;
pub use finality_proof::FinalityProofProvider;
pub use justification::GrandpaJustification;
pub use light_import::light_block_import;
pub use voting_rule::{
BeforeBestBlockBy, ThreeQuartersOfTheUnfinalizedChain, VotingRule, VotingRulesBuilder
};
use aux_schema::PersistentData;
use environment::{Environment, VoterSetState};
use import::GrandpaBlockImport;
use until_imported::UntilGlobalMessageBlocksImported;
use communication::{NetworkBridge, Network as NetworkT};
use sp_finality_grandpa::{AuthorityList, AuthorityPair, AuthoritySignature, SetId};
// Re-export these two because it's just so damn convenient.
pub use sp_finality_grandpa::{AuthorityId, ScheduledChange};
/// A GRANDPA message for a substrate chain.
pub type Message<Block> = finality_grandpa::Message<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A signed message for this chain's block type.
pub type SignedMessage<Block> = finality_grandpa::SignedMessage<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A primary propose message for this chain's block type.
pub type PrimaryPropose<Block> = finality_grandpa::PrimaryPropose<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A prevote message for this chain's block type.
pub type Prevote<Block> = finality_grandpa::Prevote<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A precommit message for this chain's block type.
pub type Precommit<Block> = finality_grandpa::Precommit<<Block as BlockT>::Hash, NumberFor<Block>>;
/// A catch up message for this chain's block type.
pub type CatchUp<Block> = finality_grandpa::CatchUp<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A commit message for this chain's block type.
pub type Commit<Block> = finality_grandpa::Commit<
<Block as BlockT>::Hash,
NumberFor<Block>,
>;
/// A compact commit message for this chain's block type.
pub type CompactCommit<Block> = finality_grandpa::CompactCommit<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthorityId,
>;
/// A global communication input stream for commits and catch up messages. Not
/// exposed publicly, used internally to simplify types in the communication
/// layer.
type CommunicationIn<Block> = finality_grandpa::voter::CommunicationIn<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// Global communication input stream for commits and catch up messages, with
/// the hash type not being derived from the block, useful for forcing the hash
/// to some type (e.g. `H256`) when the compiler can't do the inference.
type CommunicationInH<Block, H> = finality_grandpa::voter::CommunicationIn<
H,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// A global communication sink for commits. Not exposed publicly, used
/// internally to simplify types in the communication layer.
type CommunicationOut<Block> = finality_grandpa::voter::CommunicationOut<
<Block as BlockT>::Hash,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// Global communication sink for commits with the hash type not being derived
/// from the block, useful for forcing the hash to some type (e.g. `H256`) when
/// the compiler can't do the inference.
type CommunicationOutH<Block, H> = finality_grandpa::voter::CommunicationOut<
H,
NumberFor<Block>,
AuthoritySignature,
AuthorityId,
>;
/// Configuration for the GRANDPA service.
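///
/// A rough sketch of constructing one (the values are illustrative
/// placeholders, not recommended settings):
///
/// ```ignore
/// let config = Config {
///     gossip_duration: Duration::from_millis(333),
///     justification_period: 512,
///     observer_enabled: true,
///     is_authority: true,
///     name: Some("alice".into()),
///     keystore: None,
/// };
/// ```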
pub struct Config {
/// The expected duration for a message to be gossiped across the network.
pub gossip_duration: Duration,
/// Justification generation period (in blocks). GRANDPA will try to generate justifications
/// at least every justification_period blocks. There are some other events which might cause
/// justification generation.
pub justification_period: u32,
/// Whether the GRANDPA observer protocol is live on the network and thereby
/// a full-node not running as a validator is running the GRANDPA observer
/// protocol (we will only issue catch-up requests to authorities when the
/// observer protocol is enabled).
pub observer_enabled: bool,
/// Whether the node is running as an authority (i.e. running the full GRANDPA protocol).
pub is_authority: bool,
/// Some local identifier of the voter.
pub name: Option<String>,
/// The keystore that manages the keys of this node.
pub keystore: Option<sc_keystore::KeyStorePtr>,
}
impl Config {
fn name(&self) -> &str {
self.name.as_ref().map(|s| s.as_str()).unwrap_or("<unknown>")
}
}
/// Errors that can occur while voting in GRANDPA.
#[derive(Debug)]
pub enum Error {
/// An error within grandpa.
Grandpa(GrandpaError),
/// A network error.
Network(String),
/// A blockchain error.
Blockchain(String),
/// Could not complete a round on disk.
Client(ClientError),
/// An invariant has been violated (e.g. not finalizing pending change blocks in-order)
Safety(String),
/// A timer failed to fire.
Timer(io::Error),
}
impl From<GrandpaError> for Error {
fn from(e: GrandpaError) -> Self {
Error::Grandpa(e)
}
}
impl From<ClientError> for Error {
fn from(e: ClientError) -> Self {
Error::Client(e)
}
}
/// Something which can determine if a block is known.
pub(crate) trait BlockStatus<Block: BlockT> {
/// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block
/// is definitely known and has been imported.
/// If an unexpected error occurs, return that.
fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error>;
}
impl<B, E, Block: BlockT, RA> BlockStatus<Block> for Arc<Client<B, E, Block, RA>> where
B: Backend<Block>,
E: CallExecutor<Block> + Send + Sync,
RA: Send + Sync,
NumberFor<Block>: BlockNumberOps,
{
fn block_number(&self, hash: Block::Hash) -> Result<Option<NumberFor<Block>>, Error> {
self.block_number_from_id(&BlockId::Hash(hash))
.map_err(|e| Error::Blockchain(format!("{:?}", e)))
}
}
/// Something that one can ask to do a block sync request.
pub(crate) trait BlockSyncRequester<Block: BlockT> {
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}
impl<Block, Network> BlockSyncRequester<Block> for NetworkBridge<Block, Network> where
Block: BlockT,
Network: NetworkT<Block>,
{
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>) {
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
}
}
/// A new authority set along with the canonical block it changed at.
#[derive(Debug)]
pub(crate) struct NewAuthoritySet<H, N> {
pub(crate) canon_number: N,
pub(crate) canon_hash: H,
pub(crate) set_id: SetId,
pub(crate) authorities: AuthorityList,
}
/// Commands issued to the voter.
#[derive(Debug)]
pub(crate) enum VoterCommand<H, N> {
/// Pause the voter for the given reason.
Pause(String),
/// New authorities.
ChangeAuthorities(NewAuthoritySet<H, N>)
}
impl<H, N> fmt::Display for VoterCommand<H, N> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
VoterCommand::Pause(ref reason) => write!(f, "Pausing voter: {}", reason),
VoterCommand::ChangeAuthorities(_) => write!(f, "Changing authorities"),
}
}
}
/// Signals either an early exit of a voter or an error.
#[derive(Debug)]
pub(crate) enum CommandOrError<H, N> {
/// An error occurred.
Error(Error),
/// A command to the voter.
VoterCommand(VoterCommand<H, N>),
}
impl<H, N> From<Error> for CommandOrError<H, N> {
fn from(e: Error) -> Self {
CommandOrError::Error(e)
}
}
impl<H, N> From<ClientError> for CommandOrError<H, N> {
fn from(e: ClientError) -> Self {
CommandOrError::Error(Error::Client(e))
}
}
impl<H, N> From<finality_grandpa::Error> for CommandOrError<H, N> {
fn from(e: finality_grandpa::Error) -> Self {
CommandOrError::Error(Error::from(e))
}
}
impl<H, N> From<VoterCommand<H, N>> for CommandOrError<H, N> {
fn from(e: VoterCommand<H, N>) -> Self {
CommandOrError::VoterCommand(e)
}
}
impl<H: fmt::Debug, N: fmt::Debug> ::std::error::Error for CommandOrError<H, N> { }
impl<H, N> fmt::Display for CommandOrError<H, N> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
CommandOrError::Error(ref e) => write!(f, "{:?}", e),
CommandOrError::VoterCommand(ref cmd) => write!(f, "{}", cmd),
}
}
}
/// Link between the block importer and the background voter.
pub struct LinkHalf<B, E, Block: BlockT, RA, SC> {
client: Arc<Client<B, E, Block, RA>>,
select_chain: SC,
persistent_data: PersistentData<Block>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
}
/// Provider for the Grandpa authority set configured on the genesis block.
pub trait GenesisAuthoritySetProvider<Block: BlockT> {
/// Get the authority set at the genesis block.
fn get(&self) -> Result<AuthorityList, ClientError>;
}
impl<B, E, Block: BlockT, RA> GenesisAuthoritySetProvider<Block> for Client<B, E, Block, RA>
where
B: Backend<Block> + Send + Sync + 'static,
E: CallExecutor<Block> + Send + Sync,
RA: Send + Sync,
{
fn get(&self) -> Result<AuthorityList, ClientError> {
// This implementation uses the Grandpa runtime API instead of reading directly from the
// `GRANDPA_AUTHORITIES_KEY` as the data may have been migrated since the genesis block of
// the chain, whereas the runtime API is backwards compatible.
self.executor()
.call(
&BlockId::Number(Zero::zero()),
"GrandpaApi_grandpa_authorities",
&[],
ExecutionStrategy::NativeElseWasm,
None,
)
.and_then(|call_result| {
Decode::decode(&mut &call_result[..])
.map_err(|err| ClientError::CallResultDecode(
"failed to decode GRANDPA authorities set proof".into(), err
))
})
}
}
/// Make block importer and link half necessary to tie the background voter
/// to it.
pub fn block_import<B, E, Block: BlockT, RA, SC>(
client: Arc<Client<B, E, Block, RA>>,
genesis_authorities_provider: &dyn GenesisAuthoritySetProvider<Block>,
select_chain: SC,
) -> Result<(
GrandpaBlockImport<B, E, Block, RA, SC>,
LinkHalf<B, E, Block, RA, SC>
), ClientError>
where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync,
RA: Send + Sync,
SC: SelectChain<Block>,
Client<B, E, Block, RA>: AuxStore,
{
let chain_info = client.chain_info();
let genesis_hash = chain_info.genesis_hash;
let persistent_data = aux_schema::load_persistent(
&*client,
genesis_hash,
<NumberFor<Block>>::zero(),
|| {
let authorities = genesis_authorities_provider.get()?;
telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities";
"authorities_len" => ?authorities.len()
);
Ok(authorities)
},
)?;
let (voter_commands_tx, voter_commands_rx) = mpsc::unbounded();
Ok((
GrandpaBlockImport::new(
client.clone(),
select_chain.clone(),
persistent_data.authority_set.clone(),
voter_commands_tx,
persistent_data.consensus_changes.clone(),
),
LinkHalf {
client,
select_chain,
persistent_data,
voter_commands_rx,
},
))
}
fn global_communication<Block: BlockT, B, E, N, RA>(
set_id: SetId,
voters: &Arc<VoterSet<AuthorityId>>,
client: &Arc<Client<B, E, Block, RA>>,
network: &NetworkBridge<Block, N>,
keystore: &Option<KeyStorePtr>,
) -> (
impl Stream<
Item = CommunicationInH<Block, Block::Hash>,
Error = CommandOrError<Block::Hash, NumberFor<Block>>,
>,
impl Sink<
SinkItem = CommunicationOutH<Block, Block::Hash>,
SinkError = CommandOrError<Block::Hash, NumberFor<Block>>,
>,
) where
B: Backend<Block>,
E: CallExecutor<Block> + Send + Sync,
N: NetworkT<Block>,
RA: Send + Sync,
NumberFor<Block>: BlockNumberOps,
{
let is_voter = is_voter(voters, keystore).is_some();
let (global_in, global_out) = network.global_communication(
communication::SetId(set_id),
voters.clone(),
is_voter,
);
// block commit and catch up messages until relevant blocks are imported.
let global_in = UntilGlobalMessageBlocksImported::new(
client.import_notification_stream(),
network.clone(),
client.clone(),
global_in,
"global",
);
let global_in = global_in.map_err(CommandOrError::from);
let global_out = global_out.sink_map_err(CommandOrError::from);
(global_in, global_out)
}
/// Register the finality tracker inherent data provider (which is used by
/// GRANDPA), if not registered already.
fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT, RA>(
client: Arc<Client<B, E, Block, RA>>,
inherent_data_providers: &InherentDataProviders,
) -> Result<(), sp_consensus::Error> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
RA: Send + Sync + 'static,
{
if !inherent_data_providers.has_provider(&sp_finality_tracker::INHERENT_IDENTIFIER) {
inherent_data_providers
.register_provider(sp_finality_tracker::InherentDataProvider::new(move || {
let info = client.chain_info();
telemetry!(CONSENSUS_INFO; "afg.finalized";
"finalized_number" => ?info.finalized_number,
"finalized_hash" => ?info.finalized_hash,
);
Ok(info.finalized_number)
}))
.map_err(|err| sp_consensus::Error::InherentData(err.into()))
} else {
Ok(())
}
}
/// Parameters used to run Grandpa.
pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
/// Configuration for the GRANDPA service.
pub config: Config,
/// A link to the block import worker.
pub link: LinkHalf<B, E, Block, RA, SC>,
/// The Network instance.
pub network: N,
/// The inherent data providers.
pub inherent_data_providers: InherentDataProviders,
/// Handle to a future that will resolve on exit.
pub on_exit: X,
/// If supplied, can be used to hook on telemetry connection established events.
pub telemetry_on_connect: Option<futures03::channel::mpsc::UnboundedReceiver<()>>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// How to spawn background tasks.
pub executor: Sp,
}
/// Run a GRANDPA voter as a task. Provide configuration and a link to a
/// block import worker that has already been instantiated with `block_import`.
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures03::task::Spawn + 'static,
{
let GrandpaParams {
config,
link,
network,
inherent_data_providers,
on_exit,
telemetry_on_connect,
voting_rule,
executor,
} = grandpa_params;
let LinkHalf {
client,
select_chain,
persistent_data,
voter_commands_rx,
} = link;
let network = NetworkBridge::new(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
on_exit.clone(),
);
register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
let conf = config.clone();
let telemetry_task = if let Some(telemetry_on_connect) = telemetry_on_connect {
let authorities = persistent_data.authority_set.clone();
let events = telemetry_on_connect
.for_each(move |_| {
let curr = authorities.current_authorities();
let mut auths = curr.voters().into_iter().map(|(p, _)| p);
let maybe_authority_id = authority_id(&mut auths, &conf.keystore)
.unwrap_or(Default::default());
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"authority_id" => maybe_authority_id.to_string(),
"authority_set_id" => ?authorities.set_id(),
"authorities" => {
let authorities: Vec<String> = curr.voters()
.iter().map(|(id, _)| id.to_string()).collect();
serde_json::to_string(&authorities)
.expect("authorities is always at least an empty vector; elements are always of type string")
futures::future::Either::A(events)
} else {
futures::future::Either::B(futures::future::empty())
};
let voter_work = VoterWork::new(
client,
config,
network,
select_chain,
voting_rule,
persistent_data,
voter_commands_rx,
);
let voter_work = voter_work
.map(|_| ())
.map_err(|e| {
error!("GRANDPA Voter failed: {:?}", e);
telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e);
});
// Make sure that `telemetry_task` doesn't accidentally finish and kill grandpa.
let telemetry_task = telemetry_task
.then(|_| futures::future::empty::<(), ()>());
use futures03::{FutureExt, TryFutureExt};
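// Drive the voter until either it stops on its own (treated as an error
// above) or `on_exit` resolves; the telemetry task is selected in as well
// but is kept from ever finishing so it cannot shut the voter down.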
Ok(voter_work.select(on_exit.map(Ok).compat()).select2(telemetry_task).then(|_| Ok(())))
}
/// Future that powers the voter.
#[must_use]
struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
network: futures03::compat::Compat<NetworkBridge<Block, N>>,
}
impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block> + Send + Sync + 'static,
B: Backend<Block> + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
Client<B, E, Block, RA>: AuxStore,
{
fn new(
client: Arc<Client<B, E, Block, RA>>,
config: Config,
network: NetworkBridge<Block, N>,
select_chain: SC,
voting_rule: VR,
persistent_data: PersistentData<Block>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
) -> Self {
let voters = persistent_data.authority_set.current_authorities();
let env = Arc::new(Environment {
client,
select_chain,
voting_rule,
voters: Arc::new(voters),
config,
network: network.clone(),
set_id: persistent_data.authority_set.set_id(),
authority_set: persistent_data.authority_set.clone(),
consensus_changes: persistent_data.consensus_changes.clone(),
voter_set_state: persistent_data.set_state.clone(),
});
let mut work = VoterWork {
// `voter` is set to a temporary value and replaced below when
// calling `rebuild_voter`.
voter: Box::new(futures::empty()) as Box<_>,
env,
voter_commands_rx,
network: futures03::future::TryFutureExt::compat(network),
};
work.rebuild_voter();
work
}
/// Rebuilds the `self.voter` field using the current authority set
/// state. This method should be called when we know that the authority set
/// has changed (e.g. as signalled by a voter command).
fn rebuild_voter(&mut self) {
debug!(target: "afg", "{}: Starting new voter with set ID {}", self.env.config.name(), self.env.set_id);
let authority_id = is_voter(&self.env.voters, &self.env.config.keystore)
.map(|ap| ap.public())
.unwrap_or(Default::default());
telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter";
"name" => ?self.env.config.name(),
"set_id" => ?self.env.set_id,
"authority_id" => authority_id.to_string(),
let chain_info = self.env.client.chain_info();
telemetry!(CONSENSUS_INFO; "afg.authority_set";
"number" => ?chain_info.finalized_number,
"hash" => ?chain_info.finalized_hash,
"authority_id" => authority_id.to_string(),
"authority_set_id" => ?self.env.set_id,
"authorities" => {
let authorities: Vec<String> = self.env.voters.voters()
.iter().map(|(id, _)| id.to_string()).collect();
serde_json::to_string(&authorities)
.expect("authorities is always at least an empty vector; elements are always of type string")
},
);
match &*self.env.voter_set_state.read() {
VoterSetState::Live { completed_rounds, .. } => {
let last_finalized = (
chain_info.finalized_hash,
chain_info.finalized_number,
);
let global_comms = global_communication(
self.env.set_id,
&self.env.voters,
&self.env.client,
&self.env.network,
&self.env.config.keystore,
);
let last_completed_round = completed_rounds.last();
let voter = voter::Voter::new(
self.env.clone(),
(*self.env.voters).clone(),
global_comms,
last_completed_round.number,
last_completed_round.votes.clone(),
last_completed_round.base.clone(),
last_finalized,
);
self.voter = Box::new(voter);
},
VoterSetState::Paused { .. } =>
self.voter = Box::new(futures::empty()),
};
}
fn handle_voter_command(
&mut self,
command: VoterCommand<Block::Hash, NumberFor<Block>>
) -> Result<(), Error> {
match command {
VoterCommand::ChangeAuthorities(new) => {
let voters: Vec<String> = new.authorities.iter().map(move |(a, _)| {
format!("{}", a)
}).collect();
telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities";
"number" => ?new.canon_number,
"hash" => ?new.canon_hash,
"voters" => ?voters,
"set_id" => ?new.set_id,
);
self.env.update_voter_set_state(|_| {
// start the new authority set using the block where the
// set changed (not where the signal happened!) as the base.
let set_state = VoterSetState::live(
new.set_id,
&*self.env.authority_set.inner().read(),
(new.canon_hash, new.canon_number),
);
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
self.env = Arc::new(Environment {
voters: Arc::new(new.authorities.into_iter().collect()),
set_id: new.set_id,
voter_set_state: self.env.voter_set_state.clone(),
// Fields below are simply transferred and not updated.
client: self.env.client.clone(),
select_chain: self.env.select_chain.clone(),
config: self.env.config.clone(),
authority_set: self.env.authority_set.clone(),
consensus_changes: self.env.consensus_changes.clone(),
network: self.env.network.clone(),
voting_rule: self.env.voting_rule.clone(),
});
self.rebuild_voter();
Ok(())
}
VoterCommand::Pause(reason) => {
info!(target: "afg", "Pausing old validator set: {}", reason);
// not racing because old voter is shut down.
self.env.update_voter_set_state(|voter_set_state| {
let completed_rounds = voter_set_state.completed_rounds();
let set_state = VoterSetState::Paused { completed_rounds };
aux_schema::write_voter_set_state(&*self.env.client, &set_state)?;
Ok(Some(set_state))
})?;
self.rebuild_voter();
Ok(())
}
}
}
}
impl<B, E, Block, N, RA, SC, VR> Future for VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block> + Send + Sync + 'static,
B: Backend<Block> + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
Client<B, E, Block, RA>: AuxStore,
{
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
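// Poll the constituent futures in order: the voter itself, any pending
// voter commands, and the network bridge. Handling a command rebuilds the
// voter, so the task is notified again to poll the new one promptly.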
match self.voter.poll() {
Ok(Async::NotReady) => {}
Ok(Async::Ready(())) => {
// voters don't conclude naturally
return Err(Error::Safety("GRANDPA voter has concluded.".into()))
}
Err(CommandOrError::Error(e)) => {
// return inner observer error
return Err(e)
}
Err(CommandOrError::VoterCommand(command)) => {
// some command issued internally
self.handle_voter_command(command)?;
futures::task::current().notify();
}
}
match self.voter_commands_rx.poll() {
Ok(Async::NotReady) => {}
Err(_) => {
// the `voter_commands_rx` stream should not fail.
return Ok(Async::Ready(()))
}
Ok(Async::Ready(None)) => {
// the `voter_commands_rx` stream should never conclude since it's never closed.
return Ok(Async::Ready(()))
}
Ok(Async::Ready(Some(command))) => {
// some command issued externally
self.handle_voter_command(command)?;
futures::task::current().notify();
}
}
match self.network.poll() {
Ok(Async::NotReady) => {},
Ok(Async::Ready(())) => {
// the network bridge future should never conclude.
return Ok(Async::Ready(()))
}
e @ Err(_) => return e,
};
Ok(Async::NotReady)
}
}
#[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")]
pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
) -> sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
RA: Send + Sync + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
X: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures03::task::Spawn + 'static,
{
run_grandpa_voter(grandpa_params)
}
/// When GRANDPA is not initialized we still need to register the finality
/// tracker inherent provider which might be expected by the runtime for block
/// authoring. Additionally, we register a gossip message validator that
/// discards all GRANDPA messages (otherwise, we end up banning nodes that send
/// us a `Neighbor` message, since there is no registered gossip validator for
/// the engine id defined in the message.)
pub fn setup_disabled_grandpa<B, E, Block: BlockT, RA, N>(
client: Arc<Client<B, E, Block, RA>>,
inherent_data_providers: &InherentDataProviders,
network: N,
) -> Result<(), sp_consensus::Error> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Clone + 'static,
{
register_finality_tracker_inherent_data_provider(
client,
inherent_data_providers,
)?;
// We register the GRANDPA protocol so that we don't consider it an anomaly
// to receive GRANDPA messages on the network. We don't process the
// messages.
network.register_notifications_protocol(communication::GRANDPA_ENGINE_ID);
Ok(())
}
/// Checks if this node is a voter in the given voter set.
///
/// Returns the key pair of the node that is being used in the current voter set or `None`.
fn is_voter(
voters: &Arc<VoterSet<AuthorityId>>,
keystore: &Option<KeyStorePtr>,
) -> Option<AuthorityPair> {
match keystore {
Some(keystore) => voters.voters().iter()
.find_map(|(p, _)| keystore.read().key_pair::<AuthorityPair>(&p).ok()),
None => None,
}
}
/// Returns the authority id of this node, if available.
fn authority_id<'a, I>(
authorities: &mut I,
keystore: &Option<KeyStorePtr>,
) -> Option<AuthorityId> where
I: Iterator<Item = &'a AuthorityId>,
{
match keystore {
Some(keystore) => {
authorities
.find_map(|p| {
keystore.read().key_pair::<AuthorityPair>(&p)
.ok()
.map(|ap| ap.public())
})
}
None => None,
}
}