// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Substrate. // Substrate is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Substrate is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . //! # I'm online Module //! //! If the local node is a validator (i.e. contains an authority key), this module //! gossips a heartbeat transaction with each new session. The heartbeat functions //! as a simple mechanism to signal that the node is online in the current era. //! //! Received heartbeats are tracked for one era and reset with each new era. The //! module exposes two public functions to query if a heartbeat has been received //! in the current era or session. //! //! The heartbeat is a signed transaction, which was signed using the session key //! and includes the recent best block number of the local validators chain as well //! as the [NetworkState](../../core/offchain/struct.NetworkState.html). //! It is submitted as an Unsigned Transaction via off-chain workers. //! //! - [`im_online::Trait`](./trait.Trait.html) //! - [`Call`](./enum.Call.html) //! - [`Module`](./struct.Module.html) //! //! ## Interface //! //! ### Public Functions //! //! - `is_online_in_current_session` - True if the validator sent a heartbeat in the current session. //! //! ## Usage //! //! ``` //! use support::{decl_module, dispatch::Result}; //! use system::ensure_signed; //! use srml_im_online::{self as im_online}; //! //! pub trait Trait: im_online::Trait {} //! //! decl_module! { //! pub struct Module for enum Call where origin: T::Origin { //! pub fn is_online(origin, authority_index: u32) -> Result { //! let _sender = ensure_signed(origin)?; //! let _is_online = >::is_online_in_current_session(authority_index); //! Ok(()) //! } //! } //! } //! # fn main() { } //! ``` //! //! ## Dependencies //! //! This module depends on the [Session module](../srml_session/index.html). // Ensure we're `no_std` when compiling for Wasm. #![cfg_attr(not(feature = "std"), no_std)] mod mock; mod tests; use app_crypto::RuntimeAppPublic; use codec::{Encode, Decode}; use primitives::offchain::{OpaqueNetworkState, StorageKind}; use rstd::prelude::*; use session::historical::IdentificationTuple; use sr_primitives::{ RuntimeDebug, traits::{Convert, Member, Printable, Saturating}, Perbill, transaction_validity::{ TransactionValidity, TransactionLongevity, ValidTransaction, InvalidTransaction, TransactionPriority, }, }; use sr_staking_primitives::{ SessionIndex, offence::{ReportOffence, Offence, Kind}, }; use support::{ decl_module, decl_event, decl_storage, print, ensure, Parameter, debug }; use system::ensure_none; use system::offchain::SubmitUnsignedTransaction; pub mod sr25519 { mod app_sr25519 { use app_crypto::{app_crypto, key_types::IM_ONLINE, sr25519}; app_crypto!(sr25519, IM_ONLINE); } /// An i'm online keypair using sr25519 as its crypto. #[cfg(feature = "std")] pub type AuthorityPair = app_sr25519::Pair; /// An i'm online signature using sr25519 as its crypto. pub type AuthoritySignature = app_sr25519::Signature; /// An i'm online identifier using sr25519 as its crypto. pub type AuthorityId = app_sr25519::Public; } pub mod ed25519 { mod app_ed25519 { use app_crypto::{app_crypto, key_types::IM_ONLINE, ed25519}; app_crypto!(ed25519, IM_ONLINE); } /// An i'm online keypair using ed25519 as its crypto. #[cfg(feature = "std")] pub type AuthorityPair = app_ed25519::Pair; /// An i'm online signature using ed25519 as its crypto. pub type AuthoritySignature = app_ed25519::Signature; /// An i'm online identifier using ed25519 as its crypto. pub type AuthorityId = app_ed25519::Public; } /// The local storage database key under which the worker progress status /// is tracked. const DB_KEY: &[u8] = b"srml/im-online-worker-status"; /// It's important to persist the worker state, since e.g. the /// server could be restarted while starting the gossip process, but before /// finishing it. With every execution of the off-chain worker we check /// if we need to recover and resume gossipping or if there is already /// another off-chain worker in the process of gossipping. #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] struct WorkerStatus { done: bool, gossipping_at: BlockNumber, } /// Error which may occur while executing the off-chain code. #[derive(RuntimeDebug)] enum OffchainErr { DecodeWorkerStatus, FailedSigning, NetworkState, SubmitTransaction, } impl Printable for OffchainErr { fn print(&self) { match self { OffchainErr::DecodeWorkerStatus => print("Offchain error: decoding WorkerStatus failed!"), OffchainErr::FailedSigning => print("Offchain error: signing failed!"), OffchainErr::NetworkState => print("Offchain error: fetching network state failed!"), OffchainErr::SubmitTransaction => print("Offchain error: submitting transaction failed!"), } } } pub type AuthIndex = u32; /// Heartbeat which is sent/received. #[derive(Encode, Decode, Clone, PartialEq, Eq, RuntimeDebug)] pub struct Heartbeat where BlockNumber: PartialEq + Eq + Decode + Encode, { block_number: BlockNumber, network_state: OpaqueNetworkState, session_index: SessionIndex, authority_index: AuthIndex, } pub trait Trait: system::Trait + session::historical::Trait { /// The identifier type for an authority. type AuthorityId: Member + Parameter + RuntimeAppPublic + Default + Ord; /// The overarching event type. type Event: From> + Into<::Event>; /// A dispatchable call type. type Call: From>; /// A transaction submitter. type SubmitTransaction: SubmitUnsignedTransaction::Call>; /// A type that gives us the ability to submit unresponsiveness offence reports. type ReportUnresponsiveness: ReportOffence< Self::AccountId, IdentificationTuple, UnresponsivenessOffence>, >; } decl_event!( pub enum Event where ::AuthorityId, { /// A new heartbeat was received from `AuthorityId` HeartbeatReceived(AuthorityId), } ); decl_storage! { trait Store for Module as ImOnline { /// The block number when we should gossip. GossipAt get(fn gossip_at): T::BlockNumber; /// The current set of keys that may issue a heartbeat. Keys get(fn keys): Vec; /// For each session index we keep a mapping of `AuthorityId` /// to `offchain::OpaqueNetworkState`. ReceivedHeartbeats get(fn received_heartbeats): double_map SessionIndex, blake2_256(AuthIndex) => Vec; } add_extra_genesis { config(keys): Vec; build(|config| Module::::initialize_keys(&config.keys)) } } decl_module! { pub struct Module for enum Call where origin: T::Origin { fn deposit_event() = default; fn heartbeat( origin, heartbeat: Heartbeat, signature: ::Signature ) { ensure_none(origin)?; let current_session = >::current_index(); ensure!(current_session == heartbeat.session_index, "Outdated heartbeat received."); let exists = ::exists( ¤t_session, &heartbeat.authority_index ); let keys = Keys::::get(); let public = keys.get(heartbeat.authority_index as usize); if let (true, Some(public)) = (!exists, public) { let signature_valid = heartbeat.using_encoded(|encoded_heartbeat| { public.verify(&encoded_heartbeat, &signature) }); ensure!(signature_valid, "Invalid heartbeat signature."); Self::deposit_event(Event::::HeartbeatReceived(public.clone())); let network_state = heartbeat.network_state.encode(); ::insert( ¤t_session, &heartbeat.authority_index, &network_state ); } else if exists { Err("Duplicated heartbeat.")? } else { Err("Non existent public key.")? } } // Runs after every block. fn offchain_worker(now: T::BlockNumber) { debug::RuntimeLogger::init(); // Only send messages if we are a potential validator. if runtime_io::is_validator() { Self::offchain(now); } } } } impl Module { /// Returns `true` if a heartbeat has been received for the authority at `authority_index` in /// the authorities series, during the current session. Otherwise `false`. pub fn is_online_in_current_session(authority_index: AuthIndex) -> bool { let current_session = >::current_index(); ::exists(¤t_session, &authority_index) } pub(crate) fn offchain(now: T::BlockNumber) { let next_gossip = >::get(); let check = Self::check_not_yet_gossipped(now, next_gossip); let (curr_worker_status, not_yet_gossipped) = match check { Ok((s, v)) => (s, v), Err(err) => { print(err); return; }, }; if next_gossip < now && not_yet_gossipped { let value_set = Self::compare_and_set_worker_status(now, false, curr_worker_status); if !value_set { // value could not be set in local storage, since the value was // different from `curr_worker_status`. this indicates that // another worker was running in parallel. return; } match Self::do_gossip_at(now) { Ok(_) => {}, Err(err) => print(err), } } else { debug::native::trace!( target: "imonline", "Skipping gossip at: {:?} >= {:?} || {:?}", next_gossip, now, if not_yet_gossipped { "not gossipped" } else { "gossipped" } ); } } fn do_gossip_at(block_number: T::BlockNumber) -> Result<(), OffchainErr> { // we run only when a local authority key is configured let authorities = Keys::::get(); let mut local_keys = T::AuthorityId::all(); local_keys.sort(); for (authority_index, key) in authorities.into_iter() .enumerate() .filter_map(|(index, authority)| { local_keys.binary_search(&authority) .ok() .map(|location| (index as u32, &local_keys[location])) }) { let network_state = runtime_io::network_state().map_err(|_| OffchainErr::NetworkState)?; let heartbeat_data = Heartbeat { block_number, network_state, session_index: >::current_index(), authority_index, }; let signature = key.sign(&heartbeat_data.encode()).ok_or(OffchainErr::FailedSigning)?; let call = Call::heartbeat(heartbeat_data, signature); debug::info!( target: "imonline", "[index: {:?}] Reporting im-online at block: {:?}", authority_index, block_number ); T::SubmitTransaction::submit_unsigned(call) .map_err(|_| OffchainErr::SubmitTransaction)?; // once finished we set the worker status without comparing // if the existing value changed in the meantime. this is // because at this point the heartbeat was definitely submitted. Self::set_worker_status(block_number, true); } Ok(()) } fn compare_and_set_worker_status( gossipping_at: T::BlockNumber, done: bool, curr_worker_status: Option>, ) -> bool { let enc = WorkerStatus { done, gossipping_at, }; runtime_io::local_storage_compare_and_set( StorageKind::PERSISTENT, DB_KEY, curr_worker_status.as_ref().map(Vec::as_slice), &enc.encode() ) } fn set_worker_status( gossipping_at: T::BlockNumber, done: bool, ) { let enc = WorkerStatus { done, gossipping_at, }; runtime_io::local_storage_set( StorageKind::PERSISTENT, DB_KEY, &enc.encode()); } // Checks if a heartbeat gossip already occurred at this block number. // Returns a tuple of `(current worker status, bool)`, whereby the bool // is true if not yet gossipped. fn check_not_yet_gossipped( now: T::BlockNumber, next_gossip: T::BlockNumber, ) -> Result<(Option>, bool), OffchainErr> { let last_gossip = runtime_io::local_storage_get(StorageKind::PERSISTENT, DB_KEY); match last_gossip { Some(last) => { let worker_status: WorkerStatus = Decode::decode(&mut &last[..]) .map_err(|_| OffchainErr::DecodeWorkerStatus)?; let was_aborted = !worker_status.done && worker_status.gossipping_at < now; // another off-chain worker is currently in the process of submitting let already_submitting = !worker_status.done && worker_status.gossipping_at == now; let not_yet_gossipped = worker_status.done && worker_status.gossipping_at < next_gossip; let ret = (was_aborted && !already_submitting) || not_yet_gossipped; Ok((Some(last), ret)) }, None => Ok((None, true)), } } fn initialize_keys(keys: &[T::AuthorityId]) { if !keys.is_empty() { assert!(Keys::::get().is_empty(), "Keys are already initialized!"); Keys::::put(keys); } } } impl sr_primitives::BoundToRuntimeAppPublic for Module { type Public = T::AuthorityId; } impl session::OneSessionHandler for Module { type Key = T::AuthorityId; fn on_genesis_session<'a, I: 'a>(validators: I) where I: Iterator { let keys = validators.map(|x| x.1).collect::>(); Self::initialize_keys(&keys); } fn on_new_session<'a, I: 'a>(_changed: bool, validators: I, _queued_validators: I) where I: Iterator { // Tell the offchain worker to start making the next session's heartbeats. >::put(>::block_number()); // Remember who the authorities are for the new session. Keys::::put(validators.map(|x| x.1).collect::>()); } fn on_before_session_ending() { let mut unresponsive = vec![]; let current_session = >::current_index(); let keys = Keys::::get(); let current_validators = >::validators(); for (auth_idx, validator_id) in current_validators.into_iter().enumerate() { let auth_idx = auth_idx as u32; let exists = ::exists(¤t_session, &auth_idx); if !exists { let full_identification = T::FullIdentificationOf::convert(validator_id.clone()) .expect( "we got the validator_id from current_validators; current_validators is set of currently acting validators; the mapping between the validator id and its full identification should be valid; thus `FullIdentificationOf::convert` can't return `None`; qed", ); unresponsive.push((validator_id, full_identification)); } } if unresponsive.is_empty() { return; } let validator_set_count = keys.len() as u32; let offence = UnresponsivenessOffence { session_index: current_session, validator_set_count, offenders: unresponsive, }; T::ReportUnresponsiveness::report_offence(vec![], offence); // Remove all received heartbeats from the current session, they have // already been processed and won't be needed anymore. ::remove_prefix(&>::current_index()); } fn on_disabled(_i: usize) { // ignore } } impl support::unsigned::ValidateUnsigned for Module { type Call = Call; fn validate_unsigned(call: &Self::Call) -> TransactionValidity { if let Call::heartbeat(heartbeat, signature) = call { if >::is_online_in_current_session(heartbeat.authority_index) { // we already received a heartbeat for this authority return InvalidTransaction::Stale.into(); } // check if session index from heartbeat is recent let current_session = >::current_index(); if heartbeat.session_index != current_session { return InvalidTransaction::Stale.into(); } // verify that the incoming (unverified) pubkey is actually an authority id let keys = Keys::::get(); let authority_id = match keys.get(heartbeat.authority_index as usize) { Some(id) => id, None => return InvalidTransaction::BadProof.into(), }; // check signature (this is expensive so we do it last). let signature_valid = heartbeat.using_encoded(|encoded_heartbeat| { authority_id.verify(&encoded_heartbeat, &signature) }); if !signature_valid { return InvalidTransaction::BadProof.into(); } Ok(ValidTransaction { priority: TransactionPriority::max_value(), requires: vec![], provides: vec![(current_session, authority_id).encode()], longevity: TransactionLongevity::max_value(), propagate: true, }) } else { InvalidTransaction::Call.into() } } } /// An offence that is filed if a validator didn't send a heartbeat message. #[derive(RuntimeDebug)] #[cfg_attr(feature = "std", derive(Clone, PartialEq, Eq))] pub struct UnresponsivenessOffence { /// The current session index in which we report the unresponsive validators. /// /// It acts as a time measure for unresponsiveness reports and effectively will always point /// at the end of the session. session_index: SessionIndex, /// The size of the validator set in current session/era. validator_set_count: u32, /// Authorities that were unresponsive during the current era. offenders: Vec, } impl Offence for UnresponsivenessOffence { const ID: Kind = *b"im-online:offlin"; type TimeSlot = SessionIndex; fn offenders(&self) -> Vec { self.offenders.clone() } fn session_index(&self) -> SessionIndex { self.session_index } fn validator_set_count(&self) -> u32 { self.validator_set_count } fn time_slot(&self) -> Self::TimeSlot { self.session_index } fn slash_fraction(offenders: u32, validator_set_count: u32) -> Perbill { // the formula is min((3 * (k - 1)) / n, 1) * 0.05 let x = Perbill::from_rational_approximation(3 * (offenders - 1), validator_set_count); x.saturating_mul(Perbill::from_percent(5)) } }