// This file is part of Substrate.

// Copyright (C) 2018-2021 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{sync::Arc, collections::HashMap};

use log::debug;
use parity_scale_codec::Encode;
use parking_lot::RwLockWriteGuard;

use sp_blockchain::{BlockStatus, well_known_cache_keys};
use sc_client_api::{backend::Backend, utils::is_descendent_of};
use sp_utils::mpsc::TracingUnboundedSender;
use sp_api::TransactionFor;

use sp_consensus::{
	BlockImport, Error as ConsensusError,
	BlockCheckParams, BlockImportParams, BlockOrigin, ImportResult, JustificationImport,
	SelectChain,
};
use sp_finality_grandpa::{ConsensusLog, ScheduledChange, SetId, GRANDPA_ENGINE_ID};
use sp_runtime::Justification;
use sp_runtime::generic::{BlockId, OpaqueDigestItemId};
use sp_runtime::traits::{
	Block as BlockT, DigestFor, Header as HeaderT, NumberFor, Zero,
};

use crate::{Error, CommandOrError, NewAuthoritySet, VoterCommand};
use crate::authorities::{AuthoritySet, SharedAuthoritySet, DelayKind, PendingChange};
use crate::environment::finalize_block;
use crate::justification::GrandpaJustification;
use crate::notification::GrandpaJustificationSender;
use std::marker::PhantomData;

/// A block-import handler for GRANDPA.
///
/// This scans each imported block for signals of changing authority set.
/// If the block being imported enacts an authority set change then:
/// - If the current authority set is still live: we import the block
/// - Otherwise, the block must include a valid justification.
///
/// When using GRANDPA, the block import worker should be using this block import
/// object.
pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
	inner: Arc<Client>,
	select_chain: SC,
	authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
	send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
	authority_set_hard_forks: HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>,
	justification_sender: GrandpaJustificationSender<Block>,
	_phantom: PhantomData<Backend>,
}

impl<Backend, Block: BlockT, Client, SC: Clone> Clone for
	GrandpaBlockImport<Backend, Block, Client, SC>
{
	fn clone(&self) -> Self {
		GrandpaBlockImport {
			inner: self.inner.clone(),
			select_chain: self.select_chain.clone(),
			authority_set: self.authority_set.clone(),
			send_voter_commands: self.send_voter_commands.clone(),
			authority_set_hard_forks: self.authority_set_hard_forks.clone(),
			justification_sender: self.justification_sender.clone(),
			_phantom: PhantomData,
		}
	}
}

impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
	for GrandpaBlockImport<BE, Block, Client, SC> where
		NumberFor<Block>: finality_grandpa::BlockNumberOps,
		DigestFor<Block>: Encode,
		BE: Backend<Block>,
		Client: crate::ClientForGrandpa<Block, BE>,
		SC: SelectChain<Block>,
{
	type Error = ConsensusError;

	fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor<Block>)> {
		let mut out = Vec::new();
		let chain_info = self.inner.info();

		// request justifications for all pending changes for which change blocks have already been imported
		let authorities = self.authority_set.inner().read();
		for pending_change in authorities.pending_changes() {
			if pending_change.delay_kind == DelayKind::Finalized &&
				pending_change.effective_number() > chain_info.finalized_number &&
				pending_change.effective_number() <= chain_info.best_number
			{
				let effective_block_hash = if !pending_change.delay.is_zero() {
					self.select_chain.finality_target(
						pending_change.canon_hash,
						Some(pending_change.effective_number()),
					)
				} else {
					Ok(Some(pending_change.canon_hash))
				};

				if let Ok(Some(hash)) = effective_block_hash {
					if let Ok(Some(header)) = self.inner.header(BlockId::Hash(hash)) {
						if *header.number() == pending_change.effective_number() {
							out.push((header.hash(), *header.number()));
						}
					}
				}
			}
		}

		out
	}

	fn import_justification(
		&mut self,
		hash: Block::Hash,
		number: NumberFor<Block>,
		justification: Justification,
	) -> Result<(), Self::Error> {
		// this justification was requested by the sync service, therefore we
		// are not sure if it should enact a change or not. it could have been a
		// request made as part of initial sync but that means the justification
		// wasn't part of the block and was requested asynchronously, probably
		// makes sense to log in that case.
		GrandpaBlockImport::import_justification(self, hash, number, justification, false, false)
	}
}

enum AppliedChanges<H, N> {
	Standard(bool), // true if the change is ready to be applied (i.e. it's a root)
	Forced(NewAuthoritySet<H, N>),
	None,
}

impl<H, N> AppliedChanges<H, N> {
	fn needs_justification(&self) -> bool {
		match *self {
			AppliedChanges::Standard(_) => true,
			AppliedChanges::Forced(_) | AppliedChanges::None => false,
		}
	}
}

struct PendingSetChanges<'a, Block: 'a + BlockT> {
	just_in_case: Option<(
		AuthoritySet<Block::Hash, NumberFor<Block>>,
		RwLockWriteGuard<'a, AuthoritySet<Block::Hash, NumberFor<Block>>>,
	)>,
	applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
	do_pause: bool,
}

impl<'a, Block: 'a + BlockT> PendingSetChanges<'a, Block> {
	// revert the pending set change explicitly.
	fn revert(self) { }

	fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
		self.just_in_case = None;
		let applied_changes = ::std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
		(applied_changes, self.do_pause)
	}
}

impl<'a, Block: 'a + BlockT> Drop for PendingSetChanges<'a, Block> {
	fn drop(&mut self) {
		if let Some((old_set, mut authorities)) = self.just_in_case.take() {
			*authorities = old_set;
		}
	}
}

/// Checks the given header for a consensus digest signalling a **standard** scheduled change and
/// extracts it.
pub fn find_scheduled_change<B: BlockT>(
	header: &B::Header,
) -> Option<ScheduledChange<NumberFor<B>>> {
	let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);

	let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
		ConsensusLog::ScheduledChange(change) => Some(change),
		_ => None,
	};

	// find the first consensus digest with the right ID which converts to
	// the right kind of consensus log.
	header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}

/// Checks the given header for a consensus digest signalling a **forced** scheduled change and
/// extracts it.
pub fn find_forced_change<B: BlockT>(
	header: &B::Header,
) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
	let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);

	let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
		ConsensusLog::ForcedChange(delay, change) => Some((delay, change)),
		_ => None,
	};

	// find the first consensus digest with the right ID which converts to
	// the right kind of consensus log.
	header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
}

impl<BE, Block: BlockT, Client, SC>
	GrandpaBlockImport<BE, Block, Client, SC>
where
	NumberFor<Block>: finality_grandpa::BlockNumberOps,
	DigestFor<Block>: Encode,
	BE: Backend<Block>,
	Client: crate::ClientForGrandpa<Block, BE>,
{
	// check for a new authority set change.
	fn check_new_change(
		&self,
		header: &Block::Header,
		hash: Block::Hash,
	) -> Option<PendingChange<Block::Hash, NumberFor<Block>>> {
		// check for forced authority set hard forks
		if let Some(change) = self.authority_set_hard_forks.get(&hash) {
			return Some(change.clone());
		}

		// check for forced change.
		if let Some((median_last_finalized, change)) = find_forced_change::<Block>(header) {
			return Some(PendingChange {
				next_authorities: change.next_authorities,
				delay: change.delay,
				canon_height: *header.number(),
				canon_hash: hash,
				delay_kind: DelayKind::Best { median_last_finalized },
			});
		}

		// check normal scheduled change.
		let change = find_scheduled_change::<Block>(header)?;
		Some(PendingChange {
			next_authorities: change.next_authorities,
			delay: change.delay,
			canon_height: *header.number(),
			canon_hash: hash,
			delay_kind: DelayKind::Finalized,
		})
	}

	fn make_authorities_changes(
		&self,
		block: &mut BlockImportParams<Block, TransactionFor<Client, Block>>,
		hash: Block::Hash,
		initial_sync: bool,
	) -> Result<PendingSetChanges<Block>, ConsensusError> {
		// when we update the authorities, we need to hold the lock
		// until the block is written to prevent a race if we need to restore
		// the old authority set on error or panic.
		struct InnerGuard<'a, T: 'a> {
			old: Option<T>,
			guard: Option<RwLockWriteGuard<'a, T>>,
		}

		impl<'a, T: 'a> InnerGuard<'a, T> {
			fn as_mut(&mut self) -> &mut T {
				&mut **self.guard.as_mut().expect("only taken on deconstruction; qed")
			}

			fn set_old(&mut self, old: T) {
				if self.old.is_none() {
					// ignore "newer" old changes.
					self.old = Some(old);
				}
			}

			fn consume(mut self) -> Option<(T, RwLockWriteGuard<'a, T>)> {
				if let Some(old) = self.old.take() {
					Some((old, self.guard.take().expect("only taken on deconstruction; qed")))
				} else {
					None
				}
			}
		}

		impl<'a, T: 'a> Drop for InnerGuard<'a, T> {
			fn drop(&mut self) {
				if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
					*guard = old;
				}
			}
		}

		let number = *(block.header.number());
		let maybe_change = self.check_new_change(
			&block.header,
			hash,
		);

		// returns a function for checking whether a block is a descendent of another
		// consistent with querying client directly after importing the block.
		let parent_hash = *block.header.parent_hash();
		let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));

		let mut guard = InnerGuard {
			guard: Some(self.authority_set.inner().write()),
			old: None,
		};

		// whether to pause the old authority set -- happens after import
		// of a forced change block.
		let mut do_pause = false;

		// add any pending changes.
		if let Some(change) = maybe_change {
			let old = guard.as_mut().clone();
			guard.set_old(old);

			if let DelayKind::Best { .. } = change.delay_kind {
				do_pause = true;
			}

			guard.as_mut().add_pending_change(
				change,
				&is_descendent_of,
			).map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
		}

		let applied_changes = {
			let forced_change_set = guard
				.as_mut()
				.apply_forced_changes(hash, number, &is_descendent_of, initial_sync)
				.map_err(|e| ConsensusError::ClientImport(e.to_string()))
				.map_err(ConsensusError::from)?;

			if let Some((median_last_finalized_number, new_set)) = forced_change_set {
				let new_authorities = {
					let (set_id, new_authorities) = new_set.current();

					// we will use the median last finalized number as a hint
					// for the canon block the new authority set should start
					// with. we use the minimum between the median and the local
					// best finalized block.
					let best_finalized_number = self.inner.info().finalized_number;
					let canon_number = best_finalized_number.min(median_last_finalized_number);
					let canon_hash =
						self.inner.header(BlockId::Number(canon_number))
							.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
							.expect("the given block number is less or equal than the current best finalized number; \
									 current best finalized number must exist in chain; qed.")
							.hash();

					NewAuthoritySet {
						canon_number,
						canon_hash,
						set_id,
						authorities: new_authorities.to_vec(),
					}
				};
				let old = ::std::mem::replace(guard.as_mut(), new_set);
				guard.set_old(old);

				AppliedChanges::Forced(new_authorities)
			} else {
				let did_standard = guard.as_mut().enacts_standard_change(hash, number, &is_descendent_of)
					.map_err(|e| ConsensusError::ClientImport(e.to_string()))
					.map_err(ConsensusError::from)?;

				if let Some(root) = did_standard {
					AppliedChanges::Standard(root)
				} else {
					AppliedChanges::None
				}
			}
		};

		// consume the guard safely and write necessary changes.
		let just_in_case = guard.consume();
		if let Some((_, ref authorities)) = just_in_case {
			let authorities_change = match applied_changes {
				AppliedChanges::Forced(ref new) => Some(new),
				AppliedChanges::Standard(_) => None, // the change isn't actually applied yet.
				AppliedChanges::None => None,
			};

			crate::aux_schema::update_authority_set::<Block, _, _>(
				authorities,
				authorities_change,
				|insert| block.auxiliary.extend(
					insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
				)
			);
		}

		Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
	}
}

impl<BE, Block: BlockT, Client, SC> BlockImport<Block>
	for GrandpaBlockImport<BE, Block, Client, SC> where
		NumberFor<Block>: finality_grandpa::BlockNumberOps,
		DigestFor<Block>: Encode,
		BE: Backend<Block>,
		Client: crate::ClientForGrandpa<Block, BE>,
		for<'a> &'a Client:
			BlockImport<Block, Error = ConsensusError, Transaction = TransactionFor<Client, Block>>,
{
	type Error = ConsensusError;
	type Transaction = TransactionFor<Client, Block>;

	fn import_block(
		&mut self,
		mut block: BlockImportParams<Block, Self::Transaction>,
		new_cache: HashMap<well_known_cache_keys::Id, Vec<u8>>,
	) -> Result<ImportResult, Self::Error> {
		let hash = block.post_hash();
		let number = *block.header.number();

		// early exit if block already in chain, otherwise the check for
		// authority changes will error when trying to re-import a change block
		match self.inner.status(BlockId::Hash(hash)) {
			Ok(BlockStatus::InChain) => return Ok(ImportResult::AlreadyInChain),
			Ok(BlockStatus::Unknown) => {},
			Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
		}

		// on initial sync we will restrict logging under info to avoid spam.
		let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;

		let pending_changes = self.make_authorities_changes(&mut block, hash, initial_sync)?;

		// we don't want to finalize on `inner.import_block`
		let mut justification = block.justification.take();
		let import_result = (&*self.inner).import_block(block, new_cache);

		let mut imported_aux = {
			match import_result {
				Ok(ImportResult::Imported(aux)) => aux,
				Ok(r) => {
					debug!(
						target: "afg",
						"Restoring old authority set after block import result: {:?}",
						r,
					);
					pending_changes.revert();
					return Ok(r);
				},
				Err(e) => {
					debug!(
						target: "afg",
						"Restoring old authority set after block import error: {:?}",
						e,
					);
					pending_changes.revert();
					return Err(ConsensusError::ClientImport(e.to_string()));
				},
			}
		};

		let (applied_changes, do_pause) = pending_changes.defuse();

		// Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message.
		if do_pause {
			let _ = self.send_voter_commands.unbounded_send(
				VoterCommand::Pause("Forced change scheduled after inactivity".to_string())
			);
		}

		let needs_justification = applied_changes.needs_justification();

		match applied_changes {
			AppliedChanges::Forced(new) => {
				// NOTE: when we do a force change we are "discrediting" the old set so we
				// ignore any justifications from them. this block may contain a justification
				// which should be checked and imported below against the new authority
				// triggered by this forced change. the new grandpa voter will start at the
				// last median finalized block (which is before the block that enacts the
				// change), full nodes syncing the chain will not be able to successfully
				// import justifications for those blocks since their local authority set view
				// is still of the set before the forced change was enacted, still after #1867
				// they should import the block and discard the justification, and they will
				// then request a justification from sync if it's necessary (which they should
				// then be able to successfully validate).
				let _ = self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));

				// we must clear all pending justifications requests, presumably they won't be
				// finalized hence why this forced changes was triggered
				imported_aux.clear_justification_requests = true;
			},
			AppliedChanges::Standard(false) => {
				// we can't apply this change yet since there are other dependent changes that we
				// need to apply first, drop any justification that might have been provided with
				// the block to make sure we request them from `sync` which will ensure they'll be
				// applied in-order.
				justification.take();
			},
			_ => {},
		}

		match justification {
			Some(justification) => {
				let import_res = self.import_justification(
					hash,
					number,
					justification,
					needs_justification,
					initial_sync,
				);

				import_res.unwrap_or_else(|err| {
					if needs_justification {
						debug!(target: "afg", "Imported block #{} that enacts authority set change with \
							invalid justification: {:?}, requesting justification from peers.", number, err);
						imported_aux.bad_justification = true;
						imported_aux.needs_justification = true;
					}
				});
			},
			None => {
				if needs_justification {
					debug!(
						target: "afg",
						"Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
						number,
					);

					imported_aux.needs_justification = true;
				}
			}
		}

		Ok(ImportResult::Imported(imported_aux))
	}

	fn check_block(
		&mut self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.check_block(block)
	}
}

impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Client, SC> {
	pub(crate) fn new(
		inner: Arc<Client>,
		select_chain: SC,
		authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
		send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
		authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
		justification_sender: GrandpaJustificationSender<Block>,
	) -> GrandpaBlockImport<Backend, Block, Client, SC> {
		// check for and apply any forced authority set hard fork that applies
		// to the *current* authority set.
		if let Some((_, change)) = authority_set_hard_forks
			.iter()
			.find(|(set_id, _)| *set_id == authority_set.set_id())
		{
			let mut authority_set = authority_set.inner().write();
			authority_set.current_authorities = change.next_authorities.clone();
		}

		// index authority set hard forks by block hash so that they can be used
		// by any node syncing the chain and importing a block hard fork
		// authority set changes.
		let authority_set_hard_forks = authority_set_hard_forks
			.into_iter()
			.map(|(_, change)| (change.canon_hash, change))
			.collect::<HashMap<_, _>>();

		// check for and apply any forced authority set hard fork that apply to
		// any *pending* standard changes, checking by the block hash at which
		// they were announced.
		{
			let mut authority_set = authority_set.inner().write();

			authority_set.pending_standard_changes = authority_set
				.pending_standard_changes
				.clone()
				.map(&mut |hash, _, original| {
					authority_set_hard_forks
						.get(&hash)
						.cloned()
						.unwrap_or(original)
				});
		}

		GrandpaBlockImport {
			inner,
			select_chain,
			authority_set,
			send_voter_commands,
			authority_set_hard_forks,
			justification_sender,
			_phantom: PhantomData,
		}
	}
}

impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
where
	BE: Backend<Block>,
	Client: crate::ClientForGrandpa<Block, BE>,
	NumberFor<Block>: finality_grandpa::BlockNumberOps,
{
	/// Import a block justification and finalize the block.
	///
	/// If `enacts_change` is set to true, then finalizing this block *must*
	/// enact an authority set change, the function will panic otherwise.
	fn import_justification(
		&mut self,
		hash: Block::Hash,
		number: NumberFor<Block>,
		justification: Justification,
		enacts_change: bool,
		initial_sync: bool,
	) -> Result<(), ConsensusError> {
		let justification = GrandpaJustification::decode_and_verify_finalizes(
			&justification,
			(hash, number),
			self.authority_set.set_id(),
			&self.authority_set.current_authorities(),
		);

		let justification = match justification {
			Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
			Ok(justification) => justification,
		};

		let result = finalize_block(
			self.inner.clone(),
			&self.authority_set,
			None,
			hash,
			number,
			justification.into(),
			initial_sync,
			Some(&self.justification_sender),
		);

		match result {
			Err(CommandOrError::VoterCommand(command)) => {
				afg_log!(initial_sync,
					"👴 Imported justification for block #{} that triggers \
					command {}, signaling voter.",
					number,
					command,
				);

				// send the command to the voter
				let _ = self.send_voter_commands.unbounded_send(command);
			},
			Err(CommandOrError::Error(e)) => {
				return Err(match e {
					Error::Grandpa(error) => ConsensusError::ClientImport(error.to_string()),
					Error::Network(error) => ConsensusError::ClientImport(error),
					Error::Blockchain(error) => ConsensusError::ClientImport(error),
					Error::Client(error) => ConsensusError::ClientImport(error.to_string()),
					Error::Safety(error) => ConsensusError::ClientImport(error),
					Error::Signing(error) => ConsensusError::ClientImport(error),
					Error::Timer(error) => ConsensusError::ClientImport(error.to_string()),
					Error::RuntimeApi(error) => ConsensusError::ClientImport(error.to_string()),
				});
			},
			Ok(_) => {
				assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;");
			},
		}

		Ok(())
	}
}