lib.rs 14 KiB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate.  If not, see <http://www.gnu.org/licenses/>.

//! # I'm online Module
//!
//! If the local node is a validator (i.e. contains an authority key), this module
//! gossips a heartbeat transaction with each new session. The heartbeat functions
//! as a simple mechanism to signal that the node is online in the current era.
//!
//! Received heartbeats are tracked for one era and reset with each new era. The
//! module exposes two public functions to query if a heartbeat has been received
//! in the current era or session.
//!
//! The heartbeat is a message signed with the session key. It includes the recent
//! best block number of the local validator's chain as well as the
//! [NetworkState](../../core/offchain/struct.NetworkState.html).
//! It is submitted as an Unsigned Transaction via off-chain workers.
//!
//! - [`im_online::Trait`](./trait.Trait.html)
//! - [`Call`](./enum.Call.html)
//! - [`Module`](./struct.Module.html)
//!
//! ## Interface
//!
//! ### Public Functions
//!
//! - `is_online_in_current_era` - True if the validator sent a heartbeat in the current era.
//! - `is_online_in_current_session` - True if the validator sent a heartbeat in the current session.
//!
//! ## Usage
//!
//! ```
//! use srml_support::{decl_module, dispatch::Result};
//! use system::ensure_signed;
//! use srml_im_online::{self as im_online};
//!
//! pub trait Trait: im_online::Trait {}
//!
//! decl_module! {
//! 	pub struct Module<T: Trait> for enum Call where origin: T::Origin {
//! 		pub fn is_online(origin, authority_id: T::AuthorityId) -> Result {
//! 			let _sender = ensure_signed(origin)?;
//! 			let _is_online = <im_online::Module<T>>::is_online_in_current_era(&authority_id);
//! 			Ok(())
//! 		}
//! 	}
//! }
//! # fn main() { }
//! ```
//!
//! ## Dependencies
//!
//! This module depends on the [Session module](../srml_session/index.html).

// Ensure we're `no_std` when compiling for Wasm.
#![cfg_attr(not(feature = "std"), no_std)]

use substrate_primitives::{
	crypto::TypedKey, offchain::CryptoKind,
	crypto::key_types,
	offchain::OpaqueNetworkState,
	offchain::StorageKind,
	sr25519, ed25519,
};
use parity_codec::{Encode, Decode};
use primitives::{
	ApplyError, traits::{Member, IsMember, Extrinsic as ExtrinsicT},
	transaction_validity::{TransactionValidity, TransactionLongevity},
};
use rstd::prelude::*;
use session::SessionIndex;
use sr_io::Printable;
use srml_support::{
	Parameter, StorageValue, decl_module, decl_event, decl_storage,
	traits::Get, StorageDoubleMap, print,
};
use system::ensure_none;

// The local storage database key under which the worker progress status
// is tracked. The value stored under this key is an encoded `WorkerStatus`;
// it is written by `set_worker_status` and read by `was_not_yet_gossipped`.
const DB_KEY: &[u8] = b"srml/im-online-worker-status";

// Progress marker of the off-chain worker, persisted in local storage.
//
// It's important to persist the worker state, since e.g. the
// server could be restarted while starting the gossip process, but before
// finishing it. With every execution of the off-chain worker we check
// if we need to recover and resume gossiping or if there is already
// another off-chain worker in the process of gossiping.
#[derive(Encode, Decode, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
struct WorkerStatus<BlockNumber> {
	// `false` is written right before a gossip attempt starts, `true` once the
	// heartbeat transaction has been submitted successfully.
	done: bool,
	// The block number the off-chain worker was running at when it wrote
	// this status.
	gossipping_at: BlockNumber,
}

// Error which may occur while executing the off-chain code.
//
// The errors are never returned to a caller outside of the off-chain worker;
// they are only printed (see the `Printable` implementation).
enum OffchainErr {
	// The local authority public key could not be decoded as `AuthorityId`.
	DecodeAuthorityId,
	// The persisted worker status could not be decoded as `WorkerStatus`.
	DecodeWorkerStatus,
	// Building the unsigned extrinsic from the heartbeat call failed.
	ExtrinsicCreation,
	// Signing the encoded heartbeat failed.
	FailedSigning,
	// Fetching the opaque network state failed.
	NetworkState,
	// Submitting the heartbeat extrinsic failed.
	SubmitTransaction,
	// The key type of `AuthorityId` is neither sr25519 nor ed25519.
	UnknownCryptoKind,
}

impl Printable for OffchainErr {
	fn print(self) {
		match self {
			OffchainErr::DecodeAuthorityId => print("Offchain error: decoding AuthorityId failed!"),
			OffchainErr::DecodeWorkerStatus => print("Offchain error: decoding WorkerStatus failed!"),
			OffchainErr::ExtrinsicCreation => print("Offchain error: extrinsic creation failed!"),
			OffchainErr::FailedSigning => print("Offchain error: signing failed!"),
			OffchainErr::NetworkState => print("Offchain error: fetching network state failed!"),
			OffchainErr::SubmitTransaction => print("Offchain error: submitting transaction failed!"),
			OffchainErr::UnknownCryptoKind => print("Offchain error: the CryptoKind is unknown!"),
		}
	}
}

/// Heartbeat which is sent/received.
///
/// The off-chain worker builds this structure, signs its encoding and submits
/// both as the arguments of the `heartbeat` call. `validate_unsigned` verifies
/// the signature against the same encoding.
#[derive(Encode, Decode, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Debug))]
pub struct Heartbeat<BlockNumber, AuthorityId>
	where BlockNumber: PartialEq + Eq + Decode + Encode,
{
	/// Block number the off-chain worker was running at when it created the heartbeat.
	block_number: BlockNumber,
	/// Opaque network state of the sending node.
	network_state: OpaqueNetworkState,
	/// Session index at the time the heartbeat was created.
	session_index: session::SessionIndex,
	/// Public key of the authority sending the heartbeat; the signature is checked against it.
	authority_id: AuthorityId,
}

/// Configuration trait of the I'm online module.
///
/// A runtime using this module must also provide the `system` and `session`
/// module configurations.
pub trait Trait: system::Trait + session::Trait {
	/// The overarching event type.
	type Event: From<Event<Self>> + Into<<Self as system::Trait>::Event>;

	/// The function call.
	///
	/// The off-chain worker converts this module's `Call` into it in order to
	/// build the heartbeat extrinsic.
	type Call: From<Call<Self>>;

	/// An extrinsic right from the external world. This is unchecked and so
	/// can contain a signature.
	type UncheckedExtrinsic: ExtrinsicT<Call=Self::Call> + Encode + Decode;

	/// The identifier type for an authority.
	///
	/// Its `TypedKey::KEY_TYPE` selects the signature scheme (sr25519 or
	/// ed25519) and its byte representation is used as the public key when
	/// verifying heartbeats.
	type AuthorityId: Member + Parameter + Default + TypedKey + Decode + Encode + AsRef<[u8]>;

	/// Number of sessions per era.
	type SessionsPerEra: Get<SessionIndex>;

	/// Determine if an `AuthorityId` is a valid authority.
	type IsValidAuthorityId: IsMember<Self::AuthorityId>;
}

// Events emitted by this module. `HeartbeatReceived` is deposited by the
// `heartbeat` call for the first heartbeat of an authority in a session; the
// block number is the one of the block that executes the call.
decl_event!(
	pub enum Event<T> where
		<T as system::Trait>::BlockNumber,
		<T as Trait>::AuthorityId
	{
		/// A new heartbeat was received at this `BlockNumber` from `AuthorityId`
		HeartbeatReceived(BlockNumber, AuthorityId),
	}
);

decl_storage! {
	trait Store for Module<T: Trait> as ImOnline {
		// The block number when we should gossip. It is set to the current
		// block number on every session change; the off-chain worker only
		// gossips at blocks strictly after it.
		GossipAt get(gossip_at) config(): T::BlockNumber;

		// The session index when the last new era started. `None` until the
		// first session change is observed (unless configured at genesis).
		LastNewEraStart get(last_new_era_start) config(): Option<session::SessionIndex>;

		// For each session index we keep a mapping of `AuthorityId` to the
		// encoded `offchain::OpaqueNetworkState` taken from the heartbeat.
		// The mere existence of an entry marks the authority as online in
		// that session.
		ReceivedHeartbeats get(received_heartbeats): double_map session::SessionIndex,
			blake2_256(T::AuthorityId) => Vec<u8>;
	}
}

decl_module! {
	pub struct Module<T: Trait> for enum Call where origin: T::Origin {
		/// Number of sessions per era.
		const SessionsPerEra: session::SessionIndex = T::SessionsPerEra::get();

		fn deposit_event<T>() = default;

		// Records a heartbeat for `heartbeat.authority_id` in the current session.
		//
		// The call must come with the `None` origin, i.e. as an unsigned
		// transaction. `_signature` is not inspected here; it is checked by
		// `validate_unsigned`. Only the first heartbeat of an authority
		// within a session deposits an event and is stored; any further one is
		// a no-op.
		fn heartbeat(
			origin,
			heartbeat: Heartbeat<T::BlockNumber, T::AuthorityId>,
			_signature: Vec<u8>
		) {
			ensure_none(origin)?;

			// The heartbeat is filed under the session the chain is currently in,
			// not under `heartbeat.session_index`.
			let current_session = <session::Module<T>>::current_index();
			let exists = <ReceivedHeartbeats<T>>::exists(current_session, &heartbeat.authority_id);
			if !exists {
				let now = <system::Module<T>>::block_number();
				Self::deposit_event(RawEvent::HeartbeatReceived(now, heartbeat.authority_id.clone()));

				let network_state = heartbeat.network_state.encode();
				<ReceivedHeartbeats<T>>::insert(current_session, &heartbeat.authority_id, network_state);
			}
		}

		// Runs after every block.
		//
		// Submits a heartbeat once per gossip round (`GossipAt` is bumped on every
		// session change) and uses the persisted `WorkerStatus` to avoid duplicate
		// submissions and to recover from interrupted ones.
		fn offchain_worker(now: T::BlockNumber) {
			// Builds, signs and submits the heartbeat for `block_number`, then marks
			// the worker status as done. Does nothing (and returns `Ok`) when no
			// local authority key is available.
			fn gossip_at<T: Trait>(block_number: T::BlockNumber) -> Result<(), OffchainErr> {
				// Map the key type of `AuthorityId` to the crypto kind used for
				// fetching the public key and for signing.
				let kind = match <T::AuthorityId as TypedKey>::KEY_TYPE {
					key_types::SR25519 => CryptoKind::Sr25519,
					key_types::ED25519 => CryptoKind::Ed25519,
					_ => return Err(OffchainErr::UnknownCryptoKind),
				};

				// we run only when a local authority key is configured
				if let Ok(key) = sr_io::authority_pubkey(kind) {
					let authority_id = <T as Trait>::AuthorityId::decode(&mut &key[..])
						.ok_or(OffchainErr::DecodeAuthorityId)?;
					let network_state =
						sr_io::network_state().map_err(|_| OffchainErr::NetworkState)?;
					let heartbeat_data = Heartbeat {
						block_number,
						network_state,
						session_index: <session::Module<T>>::current_index(),
						authority_id,
					};

					// The signature covers the encoded heartbeat; `validate_unsigned`
					// verifies it against the same encoding.
					let signature = sr_io::sign(None, kind, &heartbeat_data.encode())
						.map_err(|_| OffchainErr::FailedSigning)?;
					let call = Call::heartbeat(heartbeat_data, signature);
					let ex = T::UncheckedExtrinsic::new_unsigned(call.into())
						.ok_or(OffchainErr::ExtrinsicCreation)?;
					sr_io::submit_transaction(&ex)
						.map_err(|_| OffchainErr::SubmitTransaction)?;
					// Only reached when the submission succeeded; on any error above
					// the status stays at "not done".
					set_worker_status::<T>(block_number, true);
				}
				Ok(())
			}

			// Persists the worker status under `DB_KEY` in the persistent local storage.
			fn set_worker_status<T: Trait>(gossipping_at: T::BlockNumber, done: bool) {
				let enc = WorkerStatus {
					done,
					gossipping_at,
				};
				sr_io::local_storage_set(StorageKind::PERSISTENT, DB_KEY, &enc.encode());
			}

			// Decides from the persisted worker status whether a heartbeat still has
			// to be sent for the gossip round starting at `next_gossip`.
			//
			// Returns `Ok(true)` if no status was persisted yet, if an earlier
			// attempt was started but never finished, or if the last finished
			// attempt predates `next_gossip`. Fails if the persisted status cannot
			// be decoded.
			fn was_not_yet_gossipped<T: Trait>(
				now: T::BlockNumber,
				next_gossip: T::BlockNumber,
			) -> Result<bool, OffchainErr> {
				let last_gossip = sr_io::local_storage_get(StorageKind::PERSISTENT, DB_KEY);
				match last_gossip {
					Some(l) => {
						let worker_status: WorkerStatus<T::BlockNumber> = Decode::decode(&mut &l[..])
							.ok_or(OffchainErr::DecodeWorkerStatus)?;

						// an attempt was started at an earlier block and never completed
						let was_aborted = !worker_status.done && worker_status.gossipping_at < now;

						// another off-chain worker is currently in the process of submitting
						let already_submitting =
							!worker_status.done && worker_status.gossipping_at == now;

						// the last completed attempt belongs to a previous gossip round
						let not_yet_gossipped =
							worker_status.done && worker_status.gossipping_at < next_gossip;

						let ret = (was_aborted && !already_submitting) || not_yet_gossipped;
						Ok(ret)
					},
					// nothing was persisted so far, hence nothing was gossiped
					None => Ok(true),
				}
			}

			let next_gossip = <GossipAt<T>>::get();
			let not_yet_gossipped = match was_not_yet_gossipped::<T>(now, next_gossip) {
				Ok(v) => v,
				Err(err) => {
					print(err);
					return;
				},
			};
			// Gossip only at blocks strictly after `GossipAt`.
			if next_gossip < now && not_yet_gossipped {
				// Mark the attempt as in progress before doing any work, so that an
				// interruption can be detected on a later run.
				set_worker_status::<T>(now, false);

				match gossip_at::<T>(now) {
					Ok(_) => {},
					Err(err) => print(err),
				}
			}
		}
	}
}

impl<T: Trait> Module<T> {
	/// Returns `true` if a heartbeat has been received for `AuthorityId`
	/// during the current era. Otherwise `false`.
	///
	/// The era covers every session from `LastNewEraStart` up to and including
	/// the current session. If no era start has been recorded yet, only the
	/// current session is inspected.
	pub fn is_online_in_current_era(authority_id: &T::AuthorityId) -> bool {
		let curr = <session::Module<T>>::current_index();
		match LastNewEraStart::get() {
			// The range is inclusive: heartbeats are stored under the session in
			// which they arrive, so the running session belongs to the era as
			// well. An exclusive range would miss them and would always be empty
			// right after a new era started (`start == curr`).
			Some(start) => (start..=curr)
				.any(|index| <ReceivedHeartbeats<T>>::exists(index, authority_id)),
			None => <ReceivedHeartbeats<T>>::exists(curr, authority_id),
		}
	}

	/// Returns `true` if a heartbeat has been received for `AuthorityId`
	/// during the current session. Otherwise `false`.
	pub fn is_online_in_current_session(authority_id: &T::AuthorityId) -> bool {
		let current_session = <session::Module<T>>::current_index();
		<ReceivedHeartbeats<T>>::exists(current_session, authority_id)
	}

	/// Session has just changed.
	///
	/// Schedules the next gossip round by storing the current block number in
	/// `GossipAt`. If more than `SessionsPerEra` sessions have passed since the
	/// recorded era start, the heartbeats of the finished era are removed and
	/// the current session becomes the new era start.
	fn new_session() {
		let now = <system::Module<T>>::block_number();
		<GossipAt<T>>::put(now);

		let current_session = <session::Module<T>>::current_index();

		match LastNewEraStart::get() {
			Some(last_new_era_start) => {
				let sessions_per_era = T::SessionsPerEra::get();

				// `checked_sub` avoids an arithmetic underflow should the recorded
				// era start ever lie ahead of the current session (e.g. through a
				// genesis configuration). Such a marker is treated as outdated.
				let new_era = current_session
					.checked_sub(last_new_era_start)
					.map_or(true, |elapsed| elapsed > sessions_per_era);
				if new_era {
					// Purge first: `remove_heartbeats` derives the sessions to clear
					// from `LastNewEraStart`, so the marker must still point at the
					// era that just ended.
					Self::remove_heartbeats();
					LastNewEraStart::put(current_session);
				}
			},
			None => LastNewEraStart::put(current_session),
		};
	}

	// Remove all stored heartbeats of the era starting at `LastNewEraStart`,
	// i.e. of every session from the recorded era start up to (but excluding)
	// the current session. Must be called before `LastNewEraStart` is moved
	// forward. If no era start is recorded, the heartbeats of the current
	// session are removed instead.
	fn remove_heartbeats() {
		let curr = <session::Module<T>>::current_index();
		match LastNewEraStart::get() {
			Some(start) => {
				for index in start..curr {
					<ReceivedHeartbeats<T>>::remove_prefix(index);
				}
			},
			None => <ReceivedHeartbeats<T>>::remove_prefix(curr),
		}
	}
}

impl<T: Trait> session::OneSessionHandler<T::AccountId> for Module<T> {
	type Key = <T as Trait>::AuthorityId;

	// Disabled validators are of no interest to this module.
	fn on_disabled(_i: usize) {}

	// Every session change is forwarded to `new_session`; neither the
	// `_changed` flag nor the validator set is looked at.
	fn on_new_session<'a, I: 'a>(_changed: bool, _validators: I) {
		<Module<T>>::new_session();
	}
}

impl<T: Trait> srml_support::unsigned::ValidateUnsigned for Module<T> {
	type Call = Call<T>;

	/// Validates an unsigned `heartbeat` call before it enters the transaction pool.
	///
	/// A heartbeat is accepted only if
	/// - its `authority_id` is a valid authority,
	/// - no heartbeat of that authority was recorded in the current session,
	/// - its `session_index` equals the current session index, and
	/// - the 64 byte signature is a valid sr25519/ed25519 signature (depending on
	///   the key type of `AuthorityId`) of the encoded heartbeat.
	///
	/// Every violation is reported as `Invalid(ApplyError::BadSignature)`. Any call
	/// other than `heartbeat` is rejected with `Invalid(0)`.
	fn validate_unsigned(call: &Self::Call) -> srml_support::unsigned::TransactionValidity {
		if let Call::heartbeat(heartbeat, signature) = call {
			// verify that the incoming (unverified) pubkey is actually an authority id
			let is_authority = T::IsValidAuthorityId::is_member(&heartbeat.authority_id);
			if !is_authority {
				return TransactionValidity::Invalid(ApplyError::BadSignature as i8);
			}

			if <Module<T>>::is_online_in_current_session(&heartbeat.authority_id) {
				// we already received a heartbeat for this authority
				return TransactionValidity::Invalid(ApplyError::BadSignature as i8);
			}

			// The heartbeat must have been created for the session we are in: it is
			// recorded under the current session, so neither an outdated nor a
			// not-yet-started session index is acceptable. This check is cheap and
			// therefore done before the signature verification.
			let current_session = <session::Module<T>>::current_index();
			if heartbeat.session_index != current_session {
				return TransactionValidity::Invalid(ApplyError::BadSignature as i8);
			}

			// `copy_from_slice` panics on a length mismatch, hence the explicit check.
			if signature.len() != 64 {
				return TransactionValidity::Invalid(ApplyError::BadSignature as i8);
			}
			let mut raw_signature = [0; 64];
			raw_signature.copy_from_slice(signature);

			let encoded_heartbeat = heartbeat.encode();

			// The key type of `AuthorityId` decides which scheme verifies the signature.
			let signature_valid = match <T::AuthorityId as TypedKey>::KEY_TYPE {
				ed25519::Public::KEY_TYPE =>
					sr_io::ed25519_verify(&raw_signature, &encoded_heartbeat, &heartbeat.authority_id),
				sr25519::Public::KEY_TYPE =>
					sr_io::sr25519_verify(&raw_signature, &encoded_heartbeat, &heartbeat.authority_id),
				_ => return TransactionValidity::Invalid(ApplyError::BadSignature as i8),
			};

			if !signature_valid {
				return TransactionValidity::Invalid(ApplyError::BadSignature as i8);
			}

			return srml_support::unsigned::TransactionValidity::Valid {
				priority: 0,
				requires: vec![],
				provides: vec![encoded_heartbeat],
				longevity: TransactionLongevity::max_value(),
				propagate: true,
			};
		}

		// Not a heartbeat: this module validates no other unsigned call.
		TransactionValidity::Invalid(0)
	}
}