// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! # I'm online Module
//!
//! If the local node is a validator (i.e. contains an authority key), this module
//! gossips a heartbeat transaction with each new session. The heartbeat functions
//! as a simple mechanism to signal that the node is online in the current era.
//!
//! Received heartbeats are tracked for one era and reset with each new era. The
//! module exposes two public functions to query if a heartbeat has been received
//! in the current era or session.
//!
//! The heartbeat is a signed transaction, which was signed using the session key
//! and includes the recent best block number of the local validators chain as well
//! as the [NetworkState](../../client/offchain/struct.NetworkState.html).
//! It is submitted as an Unsigned Transaction via off-chain workers.
//!
//! - [`im_online::Trait`](./trait.Trait.html)
//! - [`Call`](./enum.Call.html)
//! - [`Module`](./struct.Module.html)
//!
//! ## Interface
//!
//! ### Public Functions
//!
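//! - `is_online` - True if the validator sent a heartbeat or authored a block in the current session.
//! - `received_heartbeat_in_current_session` - True if the validator sent a heartbeat in the current session.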
//!
//! ## Usage
//!
//! ```
//! use frame_support::{decl_module, dispatch};
//! use frame_system::{self as system, ensure_signed};
//! use pallet_im_online::{self as im_online};
//!
//! pub trait Trait: im_online::Trait {}
//!
//! decl_module! {
//! pub struct Module<T: Trait> for enum Call where origin: T::Origin {
//! #[weight = 0]
//! pub fn is_online(origin, authority_index: u32) -> dispatch::DispatchResult {
//! let _sender = ensure_signed(origin)?;
//! let _is_online = <im_online::Module<T>>::is_online(authority_index);
//! Ok(())
//! }
//! }
//! }
//! # fn main() { }
//! ```
//!
//! ## Dependencies
//!
//! This module depends on the [Session module](../pallet_session/index.html).
// Ensure we're `no_std` when compiling for Wasm.
#![cfg_attr(not(feature = "std"), no_std)]
mod mock;
mod tests;
mod benchmarking;
use sp_application_crypto::RuntimeAppPublic;
use codec::{Encode, Decode};
use sp_core::offchain::OpaqueNetworkState;
use sp_std::prelude::*;
use sp_std::convert::TryInto;
use pallet_session::historical::IdentificationTuple;
use sp_runtime::{
offchain::storage::StorageValueRef,
RuntimeDebug,
traits::{Convert, Member, Saturating, AtLeast32Bit}, Perbill,
transaction_validity::{
TransactionValidity, ValidTransaction, InvalidTransaction, TransactionSource,
TransactionPriority,
},
};
use sp_staking::{
SessionIndex,
offence::{ReportOffence, Offence, Kind},
};
use frame_support::{
decl_module, decl_event, decl_storage, Parameter, debug, decl_error,
traits::Get,
weights::Weight,
};
use frame_system::{self as system, ensure_none};
use frame_system::offchain::{
SendTransactionTypes,
SubmitTransaction,
};
/// Application-crypto types for i'm online keys that use sr25519.
pub mod sr25519 {
    mod app_sr25519 {
        use sp_application_crypto::app_crypto;
        use sp_application_crypto::key_types::IM_ONLINE;
        use sp_application_crypto::sr25519;

        app_crypto!(sr25519, IM_ONLINE);
    }

    /// An i'm online identifier using sr25519 as its crypto.
    pub type AuthorityId = app_sr25519::Public;

    /// An i'm online signature using sr25519 as its crypto.
    pub type AuthoritySignature = app_sr25519::Signature;

    sp_application_crypto::with_pair! {
        /// An i'm online keypair using sr25519 as its crypto.
        pub type AuthorityPair = app_sr25519::Pair;
    }
}
/// Application-crypto types for i'm online keys that use ed25519.
pub mod ed25519 {
    mod app_ed25519 {
        use sp_application_crypto::app_crypto;
        use sp_application_crypto::ed25519;
        use sp_application_crypto::key_types::IM_ONLINE;

        app_crypto!(ed25519, IM_ONLINE);
    }

    /// An i'm online identifier using ed25519 as its crypto.
    pub type AuthorityId = app_ed25519::Public;

    /// An i'm online signature using ed25519 as its crypto.
    pub type AuthoritySignature = app_ed25519::Signature;

    sp_application_crypto::with_pair! {
        /// An i'm online keypair using ed25519 as its crypto.
        pub type AuthorityPair = app_ed25519::Pair;
    }
}
/// Prefix of the persistent offchain-storage key used by the heartbeat lock.
///
/// The SCALE-encoded `authority_index` is appended to this prefix, so every
/// authority index gets its own storage entry.
const DB_PREFIX: &[u8] = b"parity/im-online-heartbeat/";
/// How many blocks do we wait for heartbeat transaction to be included
/// before sending another one.
const INCLUDE_THRESHOLD: u32 = 3;
/// Status of the offchain worker code.
///
/// This stores the block number at which heartbeat was requested and when the worker
/// has actually managed to produce it.
/// Note we store such status for every `authority_index` separately.
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
struct HeartbeatStatus {
/// An index of the session that we are supposed to send heartbeat for.
pub session_index: SessionIndex,
/// A block number at which the heartbeat for that session has been actually sent.
///
/// It may be 0 in case the sending failed. In such case we should just retry
/// as soon as possible (i.e. in a worker running for the next block).
pub sent_at: BlockNumber,
}
impl HeartbeatStatus {
/// Returns true if heartbeat has been recently sent.
///
/// Parameters:
/// `session_index` - index of current session.
/// `now` - block at which the offchain worker is running.
///
/// This function will return `true` iff:
/// 1. the session index is the same (we don't care if it went up or down)
/// 2. the heartbeat has been sent recently (within the threshold)
///
/// The reasoning for 1. is that it's better to send an extra heartbeat than
/// to stall or not send one in case of a bug.
fn is_recent(&self, session_index: SessionIndex, now: BlockNumber) -> bool {
self.session_index == session_index && self.sent_at + INCLUDE_THRESHOLD.into() > now
}
}
/// Error which may occur while executing the off-chain code.
#[cfg_attr(test, derive(PartialEq))]
enum OffchainErr {
TooEarly(BlockNumber),
WaitingForInclusion(BlockNumber),
AlreadyOnline(u32),
FailedSigning,
FailedToAcquireLock,
NetworkState,
SubmitTransaction,
}
impl sp_std::fmt::Debug for OffchainErr {
fn fmt(&self, fmt: &mut sp_std::fmt::Formatter) -> sp_std::fmt::Result {
match *self {
OffchainErr::TooEarly(ref block) =>
write!(fmt, "Too early to send heartbeat, next expected at {:?}", block),
OffchainErr::WaitingForInclusion(ref block) =>
write!(fmt, "Heartbeat already sent at {:?}. Waiting for inclusion.", block),
OffchainErr::AlreadyOnline(auth_idx) =>
write!(fmt, "Authority {} is already online", auth_idx),
OffchainErr::FailedSigning => write!(fmt, "Failed to sign heartbeat"),
OffchainErr::FailedToAcquireLock => write!(fmt, "Failed to acquire lock"),
OffchainErr::NetworkState => write!(fmt, "Failed to fetch network state"),
OffchainErr::SubmitTransaction => write!(fmt, "Failed to submit transaction"),
}
}
}
/// Index of an authority on the list of validators.
pub type AuthIndex = u32;
/// Heartbeat which is sent/received.
#[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)]
pub struct Heartbeat<BlockNumber>
    where BlockNumber: PartialEq + Eq + Decode + Encode,
{
    /// Block number at the time heartbeat is created.
    pub block_number: BlockNumber,
    /// A state of local network (peer id and external addresses)
    pub network_state: OpaqueNetworkState,
    /// Index of the current session.
    pub session_index: SessionIndex,
    /// An index of the authority on the list of validators.
    pub authority_index: AuthIndex,
    /// The length of session validator set
    pub validators_len: u32,
}
pub trait Trait: SendTransactionTypes> + pallet_session::historical::Trait {
/// The identifier type for an authority.
type AuthorityId: Member + Parameter + RuntimeAppPublic + Default + Ord;
/// The overarching event type.
type Event: From> + Into<::Event>;
/// An expected duration of the session.
///
/// This parameter is used to determine the longevity of `heartbeat` transaction
/// and a rough time when we should start considering sending heartbeats,
/// since the workers avoids sending them at the very beginning of the session, assuming
/// there is a chance the authority will produce a block and they won't be necessary.
type SessionDuration: Get;
/// A type that gives us the ability to submit unresponsiveness offence reports.
type ReportUnresponsiveness:
ReportOffence<
Self::AccountId,
IdentificationTuple,
UnresponsivenessOffence>,
>;
/// A configuration for base priority of unsigned transactions.
///
/// This is exposed so that it can be tuned for particular runtime, when
/// multiple pallets send unsigned transactions.
type UnsignedPriority: Get;
}
decl_event!(
    /// Events deposited by the im-online module.
    pub enum Event<T> where
        <T as Trait>::AuthorityId,
        IdentificationTuple = IdentificationTuple<T>,
    {
        /// A new heartbeat was received from `AuthorityId`
        HeartbeatReceived(AuthorityId),
        /// At the end of the session, no offence was committed.
        AllGood,
        /// At the end of the session, at least one validator was found to be offline.
        SomeOffline(Vec<IdentificationTuple>),
    }
);
decl_storage! {
    trait Store for Module<T: Trait> as ImOnline {
        /// The block number after which it's ok to send heartbeats in current session.
        ///
        /// At the beginning of each session we set this to a value that should
        /// fall roughly in the middle of the session duration.
        /// The idea is to first wait for the validators to produce a block
        /// in the current session, so that the heartbeat later on will not be necessary.
        HeartbeatAfter get(fn heartbeat_after): T::BlockNumber;

        /// The current set of keys that may issue a heartbeat.
        Keys get(fn keys): Vec<T::AuthorityId>;

        /// For each session index, we keep a mapping of `AuthIndex` to
        /// `offchain::OpaqueNetworkState`.
        ReceivedHeartbeats get(fn received_heartbeats):
            double_map hasher(twox_64_concat) SessionIndex, hasher(twox_64_concat) AuthIndex
            => Option<Vec<u8>>;

        /// For each session index, we keep a mapping of `T::ValidatorId` to the
        /// number of blocks authored by the given authority.
        AuthoredBlocks get(fn authored_blocks):
            double_map hasher(twox_64_concat) SessionIndex, hasher(twox_64_concat) T::ValidatorId
            => u32;
    }
    add_extra_genesis {
        config(keys): Vec<T::AuthorityId>;
        build(|config| Module::<T>::initialize_keys(&config.keys))
    }
}
decl_error! {
    /// Error for the im-online module.
    pub enum Error for Module<T: Trait> {
        /// Non existent public key.
        InvalidKey,
        /// Duplicated heartbeat.
        DuplicatedHeartbeat,
    }
}
decl_module! {
pub struct Module for enum Call where origin: T::Origin {
type Error = Error;
fn deposit_event() = default;
/// #
/// - Complexity: `O(K + E)` where K is length of `Keys` and E is length of
/// `Heartbeat.network_state.external_address`
///
/// - `O(K)`: decoding of length `K`
/// - `O(E)`: decoding/encoding of length `E`
/// - DbReads: pallet_session `Validators`, pallet_session `CurrentIndex`, `Keys`,
/// `ReceivedHeartbeats`
/// - DbWrites: `ReceivedHeartbeats`
/// #
// NOTE: the weight include cost of validate_unsigned as it is part of the cost to import
// block with such an extrinsic.
#[weight = (310_000_000 + T::DbWeight::get().reads_writes(4, 1))
.saturating_add(750_000.saturating_mul(heartbeat.validators_len as Weight))
.saturating_add(
1_200_000.saturating_mul(heartbeat.network_state.external_addresses.len() as Weight)
)
]
fn heartbeat(
origin,
heartbeat: Heartbeat,
// since signature verification is done in `validate_unsigned`
// we can skip doing it here again.
_signature: ::Signature,
) {
ensure_none(origin)?;
let current_session = >::current_index();
let exists = ::contains_key(
¤t_session,
&heartbeat.authority_index
);
let keys = Keys::::get();
let public = keys.get(heartbeat.authority_index as usize);
if let (false, Some(public)) = (exists, public) {
Self::deposit_event(Event::::HeartbeatReceived(public.clone()));
let network_state = heartbeat.network_state.encode();
::insert(
¤t_session,
&heartbeat.authority_index,
&network_state
);
} else if exists {
Err(Error::::DuplicatedHeartbeat)?
} else {
Err(Error::::InvalidKey)?
}
}
// Runs after every block.
fn offchain_worker(now: T::BlockNumber) {
// Only send messages if we are a potential validator.
if sp_io::offchain::is_validator() {
for res in Self::send_heartbeats(now).into_iter().flatten() {
if let Err(e) = res {
debug::debug!(
target: "imonline",
"Skipping heartbeat at {:?}: {:?}",
now,
e,
)
}
}
} else {
debug::trace!(
target: "imonline",
"Skipping heartbeat at {:?}. Not a validator.",
now,
)
}
}
}
}
type OffchainResult = Result::BlockNumber>>;
/// Keep track of number of authored blocks per authority, uncles are counted as
/// well since they're a valid proof of being online.
impl pallet_authorship::EventHandler for Module {
fn note_author(author: T::ValidatorId) {
Self::note_authorship(author);
}
fn note_uncle(author: T::ValidatorId, _age: T::BlockNumber) {
Self::note_authorship(author);
}
}
impl Module {
/// Returns `true` if a heartbeat has been received for the authority at
/// `authority_index` in the authorities series or if the authority has
/// authored at least one block, during the current session. Otherwise
/// `false`.
pub fn is_online(authority_index: AuthIndex) -> bool {
let current_validators = >::validators();
if authority_index >= current_validators.len() as u32 {
return false;
}
let authority = ¤t_validators[authority_index as usize];
Self::is_online_aux(authority_index, authority)
}
fn is_online_aux(authority_index: AuthIndex, authority: &T::ValidatorId) -> bool {
let current_session = >::current_index();
::contains_key(¤t_session, &authority_index) ||
>::get(
¤t_session,
authority,
) != 0
}
/// Returns `true` if a heartbeat has been received for the authority at `authority_index` in
/// the authorities series, during the current session. Otherwise `false`.
pub fn received_heartbeat_in_current_session(authority_index: AuthIndex) -> bool {
let current_session = >::current_index();
::contains_key(¤t_session, &authority_index)
}
/// Note that the given authority has authored a block in the current session.
fn note_authorship(author: T::ValidatorId) {
let current_session = >::current_index();
>::mutate(
¤t_session,
author,
|authored| *authored += 1,
);
}
pub(crate) fn send_heartbeats(block_number: T::BlockNumber)
-> OffchainResult>>
{
let heartbeat_after = >::get();
if block_number < heartbeat_after {
return Err(OffchainErr::TooEarly(heartbeat_after))
}
let session_index = >::current_index();
let validators_len = >::validators().len() as u32;
Ok(Self::local_authority_keys()
.map(move |(authority_index, key)|
Self::send_single_heartbeat(
authority_index,
key,
session_index,
block_number,
validators_len,
)
))
}
fn send_single_heartbeat(
authority_index: u32,
key: T::AuthorityId,
session_index: SessionIndex,
block_number: T::BlockNumber,
validators_len: u32,
) -> OffchainResult {
// A helper function to prepare heartbeat call.
let prepare_heartbeat = || -> OffchainResult> {
let network_state = sp_io::offchain::network_state()
.map_err(|_| OffchainErr::NetworkState)?;
let heartbeat_data = Heartbeat {
block_number,
network_state,
session_index,
authority_index,
validators_len,
};
let signature = key.sign(&heartbeat_data.encode()).ok_or(OffchainErr::FailedSigning)?;
Ok(Call::heartbeat(heartbeat_data, signature))
};
if Self::is_online(authority_index) {
return Err(OffchainErr::AlreadyOnline(authority_index));
}
// acquire lock for that authority at current heartbeat to make sure we don't
// send concurrent heartbeats.
Self::with_heartbeat_lock(
authority_index,
session_index,
block_number,
|| {
let call = prepare_heartbeat()?;
debug::info!(
target: "imonline",
"[index: {:?}] Reporting im-online at block: {:?} (session: {:?}): {:?}",
authority_index,
block_number,
session_index,
call,
);
SubmitTransaction::>::submit_unsigned_transaction(call.into())
.map_err(|_| OffchainErr::SubmitTransaction)?;
Ok(())
},
)
}
fn local_authority_keys() -> impl Iterator- {
// on-chain storage
//
// At index `idx`:
// 1. A (ImOnline) public key to be used by a validator at index `idx` to send im-online
// heartbeats.
let authorities = Keys::::get();
// local keystore
//
// All `ImOnline` public (+private) keys currently in the local keystore.
let mut local_keys = T::AuthorityId::all();
local_keys.sort();
authorities.into_iter()
.enumerate()
.filter_map(move |(index, authority)| {
local_keys.binary_search(&authority)
.ok()
.map(|location| (index as u32, local_keys[location].clone()))
})
}
fn with_heartbeat_lock(
authority_index: u32,
session_index: SessionIndex,
now: T::BlockNumber,
f: impl FnOnce() -> OffchainResult,
) -> OffchainResult {
let key = {
let mut key = DB_PREFIX.to_vec();
key.extend(authority_index.encode());
key
};
let storage = StorageValueRef::persistent(&key);
let res = storage.mutate(|status: Option