// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{
	communication::{
		notification::{
			BeefyBestBlockSender, BeefyBestBlockStream, BeefyVersionedFinalityProofSender,
			BeefyVersionedFinalityProofStream,
		},
		peers::KnownPeers,
		request_response::{
			outgoing_requests_engine::OnDemandJustificationsEngine, BeefyJustifsRequestHandler,
		},
	},
	import::BeefyBlockImport,
	metrics::register_metrics,
	round::Rounds,
	worker::PersistedState,
};
use futures::{stream::Fuse, StreamExt};
use log::{debug, error, info};
use parking_lot::Mutex;
use prometheus::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_consensus::BlockImport;
use sc_network::{NetworkRequest, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{
	Backend as BlockchainBackend, Error as ClientError, HeaderBackend, Result as ClientResult,
};
use sp_consensus::{Error as ConsensusError, SyncOracle};
use sp_consensus_beefy::{
	ecdsa_crypto::AuthorityId, BeefyApi, MmrRootHash, PayloadProvider, ValidatorSet,
	BEEFY_ENGINE_ID,
};
use sp_keystore::KeystorePtr;
use sp_mmr_primitives::MmrApi;
use sp_runtime::traits::{Block, Zero};
use std::{
	collections::{BTreeMap, VecDeque},
	marker::PhantomData,
	sync::Arc,
};

mod aux_schema;
mod error;
mod keystore;
mod metrics;
mod round;
mod worker;

pub mod communication;
pub mod import;
pub mod justification;

pub use communication::beefy_protocol_name::{
	gossip_protocol_name, justifications_protocol_name as justifs_protocol_name,
};

#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "beefy";

/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
/// issue is <https://github.com/rust-lang/rust/issues/41517>.
pub trait Client<B, BE>:
	BlockchainEvents<B> + HeaderBackend<B> + Finalizer<B, BE> + Send + Sync
where
	B: Block,
	BE: Backend<B>,
{
	// empty
}

impl<B, BE, T> Client<B, BE> for T
where
	B: Block,
	BE: Backend<B>,
	T: BlockchainEvents<B>
		+ HeaderBackend<B>
		+ Finalizer<B, BE>
		+ ProvideRuntimeApi<B>
		+ Send
		+ Sync,
{
	// empty
}

/// Links between the block importer, the background voter and the RPC layer,
/// to be used by the voter.
#[derive(Clone)]
pub struct BeefyVoterLinks<B: Block> {
	// BlockImport -> Voter links
	/// Stream of BEEFY signed commitments from block import to voter.
	pub from_block_import_justif_stream: BeefyVersionedFinalityProofStream<B>,

	// Voter -> RPC links
	/// Sends BEEFY signed commitments from voter to RPC.
	pub to_rpc_justif_sender: BeefyVersionedFinalityProofSender<B>,
	/// Sends BEEFY best block hashes from voter to RPC.
	pub to_rpc_best_block_sender: BeefyBestBlockSender<B>,
}

/// Links used by the BEEFY RPC layer, from the BEEFY background voter.
#[derive(Clone)]
pub struct BeefyRPCLinks<B: Block> {
	/// Stream of signed commitments coming from the voter.
	pub from_voter_justif_stream: BeefyVersionedFinalityProofStream<B>,
	/// Stream of BEEFY best block hashes coming from the voter.
	pub from_voter_best_beefy_stream: BeefyBestBlockStream<B>,
}

/// Make block importer and link half necessary to tie the background voter to it.
pub fn beefy_block_import_and_links<B, BE, RuntimeApi, I>(
	wrapped_block_import: I,
	backend: Arc<BE>,
	runtime: Arc<RuntimeApi>,
	prometheus_registry: Option<Registry>,
) -> (BeefyBlockImport<B, BE, RuntimeApi, I>, BeefyVoterLinks<B>, BeefyRPCLinks<B>)
where
	B: Block,
	BE: Backend<B>,
	I: BlockImport<B, Error = ConsensusError> + Send + Sync,
	RuntimeApi: ProvideRuntimeApi<B> + Send + Sync,
	RuntimeApi::Api: BeefyApi<B, AuthorityId>,
{
	// Voter -> RPC links
	let (to_rpc_justif_sender, from_voter_justif_stream) =
		BeefyVersionedFinalityProofStream::<B>::channel();
	let (to_rpc_best_block_sender, from_voter_best_beefy_stream) =
		BeefyBestBlockStream::<B>::channel();

	// BlockImport -> Voter links
	let (to_voter_justif_sender, from_block_import_justif_stream) =
		BeefyVersionedFinalityProofStream::<B>::channel();
	let metrics = register_metrics(prometheus_registry);

	// BlockImport
	let import = BeefyBlockImport::new(
		backend,
		runtime,
		wrapped_block_import,
		to_voter_justif_sender,
		metrics,
	);
	let voter_links = BeefyVoterLinks {
		from_block_import_justif_stream,
		to_rpc_justif_sender,
		to_rpc_best_block_sender,
	};
	let rpc_links = BeefyRPCLinks { from_voter_best_beefy_stream, from_voter_justif_stream };

	(import, voter_links, rpc_links)
}

/// BEEFY gadget network parameters.
pub struct BeefyNetworkParams<B: Block, N, S> {
	/// Network implementing gossip, requests and sync-oracle.
	pub network: Arc<N>,
	/// Syncing service implementing a sync oracle and an event stream for peers.
	pub sync: Arc<S>,
	/// Chain specific BEEFY gossip protocol name. See
	/// [`communication::beefy_protocol_name::gossip_protocol_name`].
	pub gossip_protocol_name: ProtocolName,
	/// Chain specific BEEFY on-demand justifications protocol name. See
	/// [`communication::beefy_protocol_name::justifications_protocol_name`].
	pub justifications_protocol_name: ProtocolName,

	pub _phantom: PhantomData<B>,
}

/// BEEFY gadget initialization parameters.
pub struct BeefyParams<B: Block, BE, C, N, P, R, S> {
	/// BEEFY client
	pub client: Arc<C>,
	/// Client Backend
	pub backend: Arc<BE>,
	/// BEEFY Payload provider
	pub payload_provider: P,
	/// Runtime Api Provider
	pub runtime: Arc<R>,
	/// Local key store
	pub key_store: Option<KeystorePtr>,
	/// BEEFY voter network params
	pub network_params: BeefyNetworkParams<B, N, S>,
	/// Minimal delta between blocks, BEEFY should vote for
	pub min_block_delta: u32,
	/// Prometheus metric registry
	pub prometheus_registry: Option<Registry>,
	/// Links between the block importer, the background voter and the RPC layer.
	pub links: BeefyVoterLinks<B>,
	/// Handler for incoming BEEFY justifications requests from a remote peer.
	pub on_demand_justifications_handler: BeefyJustifsRequestHandler<B, C>,
}

/// Start the BEEFY gadget.
///
/// This is a thin shim around running and awaiting a BEEFY worker.
pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
	beefy_params: BeefyParams<B, BE, C, N, P, R, S>,
) where
	B: Block,
	BE: Backend<B>,
	C: Client<B, BE> + BlockBackend<B>,
	P: PayloadProvider<B> + Clone,
	R: ProvideRuntimeApi<B>,
	R::Api: BeefyApi<B, AuthorityId> + MmrApi<B, MmrRootHash, NumberFor<B>>,
	N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
	S: GossipSyncing<B> + SyncOracle + 'static,
{
	let BeefyParams {
		client,
		backend,
		payload_provider,
		runtime,
		key_store,
		network_params,
		min_block_delta,
		prometheus_registry,
		links,
		mut on_demand_justifications_handler,
	} = beefy_params;

	let BeefyNetworkParams {
		network,
		sync,
		gossip_protocol_name,
		justifications_protocol_name,
		..
	} = network_params;

	let metrics = register_metrics(prometheus_registry.clone());

	// Subscribe to finality notifications and justifications before waiting for runtime pallet and
	// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
	let mut finality_notifications = client.finality_notification_stream().fuse();
	let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
	// Default votes filter is to discard everything.
	// Validator is updated later with correct starting round and set id.
	let (gossip_validator, gossip_report_stream) =
		communication::gossip::GossipValidator::new(known_peers.clone());
	let gossip_validator = Arc::new(gossip_validator);
	let gossip_engine = GossipEngine::new(
		network.clone(),
		sync.clone(),
		gossip_protocol_name.clone(),
		gossip_validator.clone(),
		None,
	);

	// The `GossipValidator` adds and removes known peers based on valid votes and network
	// events.
	let on_demand_justifications = OnDemandJustificationsEngine::new(
		network.clone(),
		justifications_protocol_name.clone(),
		known_peers,
		prometheus_registry.clone(),
	);
	let mut beefy_comms = worker::BeefyComms {
		gossip_engine,
		gossip_validator,
		gossip_report_stream,
		on_demand_justifications,
	};

	// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
	// select recoverable errors.
	loop {
		// Wait for BEEFY pallet to be active before starting voter.
		let persisted_state = match wait_for_runtime_pallet(
			&*runtime,
			&mut beefy_comms.gossip_engine,
			&mut finality_notifications,
		)
		.await
		.and_then(|(beefy_genesis, best_grandpa)| {
			load_or_init_voter_state(
				&*backend,
				&*runtime,
				beefy_genesis,
				best_grandpa,
				min_block_delta,
			)
		}) {
			Ok(state) => state,
			Err(e) => {
				error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
				return
			},
		};
		// Update the gossip validator with the right starting round and set id.
		if let Err(e) = persisted_state
			.gossip_filter_config()
			.map(|f| beefy_comms.gossip_validator.update_filter(f))
		{
			error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
			return
		}

		let worker = worker::BeefyWorker {
			backend: backend.clone(),
			payload_provider: payload_provider.clone(),
			runtime: runtime.clone(),
			sync: sync.clone(),
			key_store: key_store.clone().into(),
			comms: beefy_comms,
			links: links.clone(),
			metrics: metrics.clone(),
			pending_justifications: BTreeMap::new(),
			persisted_state,
		};

		match futures::future::select(
			Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
			Box::pin(on_demand_justifications_handler.run()),
		)
		.await
		{
			// On `ConsensusReset` error, just reinit and restart voter.
			futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
				error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
				beefy_comms = reuse_comms;
				continue
			},
			// On other errors, bring down / finish the task.
			futures::future::Either::Left(((worker_err, _), _)) =>
				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
			futures::future::Either::Right((odj_handler_err, _)) =>
				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
		};
		return
	}
}

fn load_or_init_voter_state<B, BE, R>(
	backend: &BE,
	runtime: &R,
	beefy_genesis: NumberFor<B>,
	best_grandpa: <B as Block>::Header,
	min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
	B: Block,
	BE: Backend<B>,
	R: ProvideRuntimeApi<B>,
	R::Api: BeefyApi<B, AuthorityId>,
{
	// Initialize voter state from AUX DB if compatible.
	crate::aux_schema::load_persistent(backend)?
		// Verify state pallet genesis matches runtime.
		.filter(|state| state.pallet_genesis() == beefy_genesis)
		.and_then(|mut state| {
			// Overwrite persisted state with current best GRANDPA block.
			state.set_best_grandpa(best_grandpa.clone());
			// Overwrite persisted data with newly provided `min_block_delta`.
			state.set_min_block_delta(min_block_delta);
			info!(target: LOG_TARGET, "🥩 Loading BEEFY voter state from db: {:?}.", state);
			Some(Ok(state))
		})
		// No valid voter-state persisted, re-initialize from pallet genesis.
		.unwrap_or_else(|| {
			initialize_voter_state(backend, runtime, beefy_genesis, best_grandpa, min_block_delta)
		})
}

// If no persisted state present, walk back the chain from first GRANDPA notification to either:
//  - latest BEEFY finalized block, or if none found on the way,
//  - BEEFY pallet genesis;
// Enqueue any BEEFY mandatory blocks (session boundaries) found on the way, for voter to finalize.
fn initialize_voter_state<B, BE, R>(
	backend: &BE,
	runtime: &R,
	beefy_genesis: NumberFor<B>,
	best_grandpa: <B as Block>::Header,
	min_block_delta: u32,
) -> ClientResult<PersistedState<B>>
where
	B: Block,
	BE: Backend<B>,
	R: ProvideRuntimeApi<B>,
	R::Api: BeefyApi<B, AuthorityId>,
{
	let beefy_genesis = runtime
		.runtime_api()
		.beefy_genesis(best_grandpa.hash())
		.ok()
		.flatten()
		.filter(|genesis| *genesis == beefy_genesis)
		.ok_or_else(|| ClientError::Backend("BEEFY pallet expected to be active.".into()))?;
	// Walk back the imported blocks and initialize voter either, at the last block with
	// a BEEFY justification, or at pallet genesis block; voter will resume from there.
	let blockchain = backend.blockchain();
	let mut sessions = VecDeque::new();
	let mut header = best_grandpa.clone();
	let state = loop {
		if let Some(true) = blockchain
			.justifications(header.hash())
			.ok()
			.flatten()
			.map(|justifs| justifs.get(BEEFY_ENGINE_ID).is_some())
		{
			info!(
				target: LOG_TARGET,
				"🥩 Initialize BEEFY voter at last BEEFY finalized block: {:?}.",
				*header.number()
			);
			let best_beefy = *header.number();
			// If no session boundaries detected so far, just initialize new rounds here.
			if sessions.is_empty() {
				let active_set = expect_validator_set(runtime, backend, &header)?;
				let mut rounds = Rounds::new(best_beefy, active_set);
				// Mark the round as already finalized.
				rounds.conclude(best_beefy);
				sessions.push_front(rounds);
			}
			let state = PersistedState::checked_new(
				best_grandpa,
				best_beefy,
				sessions,
				min_block_delta,
				beefy_genesis,
			)
			.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?;
			break state
		}

		if *header.number() == beefy_genesis {
			// We've reached BEEFY genesis, initialize voter here.
			let genesis_set = expect_validator_set(runtime, backend, &header)?;
			info!(
				target: LOG_TARGET,
				"🥩 Loading BEEFY voter state from genesis on what appears to be first startup. \
				Starting voting rounds at block {:?}, genesis validator set {:?}.",
				beefy_genesis,
				genesis_set,
			);

			sessions.push_front(Rounds::new(beefy_genesis, genesis_set));
			break PersistedState::checked_new(
				best_grandpa,
				Zero::zero(),
				sessions,
				min_block_delta,
				beefy_genesis,
			)
			.ok_or_else(|| ClientError::Backend("Invalid BEEFY chain".into()))?
		}

		if let Some(active) = worker::find_authorities_change::<B>(&header) {
			info!(
				target: LOG_TARGET,
				"🥩 Marking block {:?} as BEEFY Mandatory.",
				*header.number()
			);
			sessions.push_front(Rounds::new(*header.number(), active));
		}

		// Move up the chain.
		header = blockchain.expect_header(*header.parent_hash())?;
	};

	aux_schema::write_current_version(backend)?;
	aux_schema::write_voter_state(backend, &state)?;
	Ok(state)
}

/// Wait for BEEFY runtime pallet to be available, return active validator set.
/// Should be called only once during worker initialization.
async fn wait_for_runtime_pallet<B, R>(
	runtime: &R,
	mut gossip_engine: &mut GossipEngine<B>,
	finality: &mut Fuse<FinalityNotifications<B>>,
) -> ClientResult<(NumberFor<B>, <B as Block>::Header)>
where
	B: Block,
	R: ProvideRuntimeApi<B>,
	R::Api: BeefyApi<B, AuthorityId>,
{
	info!(target: LOG_TARGET, "🥩 BEEFY gadget waiting for BEEFY pallet to become available...");
	loop {
		futures::select! {
			notif = finality.next() => {
				let notif = match notif {
					Some(notif) => notif,
					None => break
				};
				let at = notif.header.hash();
				if let Some(start) = runtime.runtime_api().beefy_genesis(at).ok().flatten() {
					if *notif.header.number() >= start {
						// Beefy pallet available, return header for best grandpa at the time.
						info!(
							target: LOG_TARGET,
							"🥩 BEEFY pallet available: block {:?} beefy genesis {:?}",
							notif.header.number(), start
						);
						return Ok((start, notif.header))
					}
				}
			},
			_ = gossip_engine => {
				break
			}
		}
	}
	let err_msg = "🥩 Gossip engine has unexpectedly terminated.".into();
	error!(target: LOG_TARGET, "{}", err_msg);
	Err(ClientError::Backend(err_msg))
}

fn expect_validator_set<B, BE, R>(
	runtime: &R,
	backend: &BE,
	at_header: &B::Header,
) -> ClientResult<ValidatorSet<AuthorityId>>
where
	B: Block,
	BE: Backend<B>,
	R: ProvideRuntimeApi<B>,
	R::Api: BeefyApi<B, AuthorityId>,
{
	debug!(target: LOG_TARGET, "🥩 Try to find validator set active at header: {:?}", at_header);
	runtime
		.runtime_api()
		.validator_set(at_header.hash())
		.ok()
		.flatten()
		.or_else(|| {
			// if state unavailable, fallback to walking up the chain looking for the header
			// Digest emitted when validator set active 'at_header' was enacted.
			let blockchain = backend.blockchain();
			let mut header = at_header.clone();
			loop {
				debug!(target: LOG_TARGET, "🥩 look for auth set change digest in header number: {:?}", *header.number());
				match worker::find_authorities_change::<B>(&header) {
					Some(active) => return Some(active),
					// Move up the chain.
					None => header = blockchain.expect_header(*header.parent_hash()).ok()?,
				}
			}
		})
		.ok_or_else(|| ClientError::Backend("Could not find initial validator set".into()))
}