Skip to content
lib.rs 46.8 KiB
Newer Older
					Event::Dht(e) => Some(e),
					_ => None,
				}
			});
		let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config(
			sc_authority_discovery::WorkerConfig {
				publish_non_global_ips: auth_disc_publish_non_global_ips,
				// Require that authority discovery records are signed.
				strict_record_validation: true,
				..Default::default()
			},
			client.clone(),
			network.clone(),
			Box::pin(dht_event_stream),
			authority_discovery_role,
			prometheus_registry.clone(),
		);
		task_manager.spawn_handle().spawn(
			"authority-discovery-worker",
			Some("authority-discovery"),
			Box::pin(worker.run()),
		);
		gum::info!("Cannot run as validator without local keystore.");
	let maybe_params =
		local_keystore.and_then(move |k| authority_discovery_service.map(|a| (a, k)));
ordian's avatar
ordian committed
	let overseer_handle = if let Some((authority_discovery_service, keystore)) = maybe_params {
		let (overseer, overseer_handle) = overseer_gen
			.generate::<service::SpawnTaskHandle, FullClient<RuntimeApi, ExecutorDispatch>>(
				OverseerGenArgs {
					leaves: active_leaves,
					keystore,
					runtime_client: overseer_client.clone(),
					parachains_db,
					network_service: network.clone(),
					authority_discovery_service,
					pov_req_receiver,
					chunk_req_receiver,
					collation_req_receiver,
					available_data_req_receiver,
					statement_req_receiver,
					dispute_req_receiver,
					registry: prometheus_registry.as_ref(),
					spawner,
					is_collator,
					approval_voting_config,
					availability_config,
					candidate_validation_config,
					chain_selection_config,
					dispute_coordinator_config,
					pvf_checker_enabled,
					overseer_message_channel_capacity_override,
				gum::error!("Failed to init overseer: {}", e);
		let handle = Handle::new(overseer_handle.clone());

		{
			let handle = handle.clone();
			task_manager.spawn_essential_handle().spawn_blocking(
				"overseer",
				Box::pin(async move {
					use futures::{pin_mut, select, FutureExt};

					let forward = polkadot_overseer::forward_events(overseer_client, handle);

					let forward = forward.fuse();
					let overseer_fut = overseer.run().fuse();

					pin_mut!(overseer_fut);
					pin_mut!(forward);

					select! {
						_ = forward => (),
						_ = overseer_fut => (),
						complete => (),
					}
				}),
			);
ordian's avatar
ordian committed
		}
		Some(handle)
		assert!(
			!requires_overseer_for_chain_sel,
			"Precondition congruence (false) is guaranteed by manual checking. qed"
		);

	if role.is_authority() {
		let can_author_with =
			consensus_common::CanAuthorWithNativeVersion::new(client.executor().clone());

		let proposer = sc_basic_authorship::ProposerFactory::new(
			task_manager.spawn_handle(),
			client.clone(),
			transaction_pool,
			prometheus_registry.as_ref(),
			telemetry.as_ref().map(|x| x.handle()),
		let client_clone = client.clone();
		let overseer_handle =
			overseer_handle.as_ref().ok_or(Error::AuthoritiesRequireRealOverseer)?.clone();
		let slot_duration = babe_link.config().slot_duration();
		let babe_config = babe::BabeParams {
			keystore: keystore_container.sync_keystore(),
			client: client.clone(),
			select_chain,
			block_import,
			env: proposer,
			sync_oracle: network.clone(),
			justification_sync_link: network.clone(),
			create_inherent_data_providers: move |parent, ()| {
				let client_clone = client_clone.clone();
ordian's avatar
ordian committed
				let overseer_handle = overseer_handle.clone();
				async move {
					let parachain = polkadot_node_core_parachains_inherent::ParachainsInherentDataProvider::create(
						&*client_clone,
ordian's avatar
ordian committed
						overseer_handle,
						parent,
					).await.map_err(|e| Box::new(e))?;

					let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

					let slot =
						sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
							*timestamp,
							slot_duration,
						);

					Ok((timestamp, slot, parachain))
			backoff_authoring_blocks,
			babe_link,
			can_author_with,
			block_proposal_slot_portion: babe::SlotProportion::new(2f32 / 3f32),
			max_block_proposal_slot_portion: None,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
		let babe = babe::start_babe(babe_config)?;
		task_manager.spawn_essential_handle().spawn_blocking("babe", None, babe);
	// if the node isn't actively participating in consensus then it doesn't
	// need a keystore, regardless of which protocol we use below.
	let keystore_opt =
		if role.is_authority() { Some(keystore_container.sync_keystore()) } else { None };
Andreas Doerr's avatar
Andreas Doerr committed
		let beefy_params = beefy_gadget::BeefyParams {
			client: client.clone(),
			backend: backend.clone(),
Andreas Doerr's avatar
Andreas Doerr committed
			key_store: keystore_opt.clone(),
			network: network.clone(),
			min_block_delta: if chain_spec.is_wococo() { 4 } else { 8 },
			prometheus_registry: prometheus_registry.clone(),
			protocol_name: beefy_protocol_name,
		let gadget = beefy_gadget::start_beefy_gadget::<_, _, _, _, _>(beefy_params);
		// Wococo's purpose is to be a testbed for BEEFY, so if it fails we'll
		// bring the node down with it to make sure it is noticed.
		if chain_spec.is_wococo() {
			task_manager
				.spawn_essential_handle()
				.spawn_blocking("beefy-gadget", None, gadget);
		} else {
			task_manager.spawn_handle().spawn_blocking("beefy-gadget", None, gadget);
	// Reduce grandpa load on Kusama and test networks. This will slow down finality by
	// approximately one slot duration, but will reduce load. We would like to see the impact on
	// Kusama, see: https://github.com/paritytech/polkadot/issues/5464
	let gossip_duration = if chain_spec.is_versi() ||
		chain_spec.is_wococo() ||
		chain_spec.is_rococo() ||
		chain_spec.is_kusama()
	{
		Duration::from_millis(2000)
	} else {
		Duration::from_millis(1000)
	};

	let config = grandpa::Config {
		// FIXME substrate#1578 make this available through chainspec
		justification_period: 512,
		name: Some(name),
		observer_enabled: false,
		keystore: keystore_opt,
		telemetry: telemetry.as_ref().map(|x| x.handle()),
		protocol_name: grandpa_protocol_name,
	let enable_grandpa = !disable_grandpa;
	if enable_grandpa {
		// start the full GRANDPA voter
		// NOTE: unlike in substrate we are currently running the full
		// GRANDPA voter protocol for all full nodes (regardless of whether
		// they're validators or not). at this point the full voter should
		// provide better guarantees of block and vote data availability than
		// the observer.

		// add a custom voting rule to temporarily stop voting for new blocks
		// after the given pause block is finalized and restarting after the
		// given delay.
		let mut builder = grandpa::VotingRulesBuilder::default();

		#[cfg(not(feature = "malus"))]
		let _malus_finality_delay = None;

		if let Some(delay) = _malus_finality_delay {
			info!(?delay, "Enabling malus finality delay",);
			builder = builder.add(grandpa::BeforeBestBlockBy(delay));
		};
		let voting_rule = match grandpa_pause {
			Some((block, delay)) => {
				info!(
					block_number = %block,
					delay = %delay,
					"GRANDPA scheduled voting pause set for block #{} with a duration of {} blocks.",
				builder.add(grandpa_support::PauseAfterBlockFor(block, delay)).build()
			},
		let grandpa_config = grandpa::GrandpaParams {
			config,
			link: link_half,
			network: network.clone(),
			voting_rule,
			prometheus_registry: prometheus_registry.clone(),
			shared_voter_state,
			telemetry: telemetry.as_ref().map(|x| x.handle()),
		task_manager.spawn_essential_handle().spawn_blocking(
			"grandpa-voter",
			None,
			grandpa::run_grandpa_voter(grandpa_config)?,
		);
	Ok(NewFull { task_manager, client, overseer_handle, network, rpc_handlers, backend })
#[cfg(feature = "full-node")]
macro_rules! chain_ops {
	($config:expr, $jaeger_agent:expr, $telemetry_worker_handle:expr; $scope:ident, $executor:ident, $variant:ident) => {{
		let telemetry_worker_handle = $telemetry_worker_handle;
		let jaeger_agent = $jaeger_agent;
		let mut config = $config;
		let basics = new_partial_basics::<$scope::RuntimeApi, $executor>(
			config,
			jaeger_agent,
			telemetry_worker_handle,
		)?;

		use ::sc_consensus::LongestChain;
		// use the longest chain selection, since there is no overseer available
		let chain_selection = LongestChain::new(basics.backend.clone());

		let service::PartialComponents { client, backend, import_queue, task_manager, .. } =
			new_partial::<$scope::RuntimeApi, $executor, LongestChain<_, Block>>(
				&mut config,
				basics,
				chain_selection,
			)?;
		Ok((Arc::new(Client::$variant(client)), backend, import_queue, task_manager))
	}};
}

/// Builds a new object suitable for chain operations.
#[cfg(feature = "full-node")]
pub fn new_chain_ops(
	mut config: &mut Configuration,
	jaeger_agent: Option<std::net::SocketAddr>,
) -> Result<
		sc_consensus::BasicQueue<Block, PrefixedMemoryDB<BlakeTwo256>>,
	Error,
> {
	config.keystore = service::config::KeystoreConfig::InMemory;
	let telemetry_worker_handle = None;

	#[cfg(feature = "rococo-native")]
	if config.chain_spec.is_rococo() ||
		config.chain_spec.is_wococo() ||
		config.chain_spec.is_versi()
	{
		return chain_ops!(config, jaeger_agent, telemetry_worker_handle; rococo_runtime, RococoExecutorDispatch, Rococo)
	}

	#[cfg(feature = "kusama-native")]
	if config.chain_spec.is_kusama() {
		return chain_ops!(config, jaeger_agent, telemetry_worker_handle; kusama_runtime, KusamaExecutorDispatch, Kusama)
	}

	#[cfg(feature = "westend-native")]
	if config.chain_spec.is_westend() {
		return chain_ops!(config, jaeger_agent, telemetry_worker_handle; westend_runtime, WestendExecutorDispatch, Westend)
	#[cfg(feature = "polkadot-native")]
	{
		return chain_ops!(config, jaeger_agent, telemetry_worker_handle; polkadot_runtime, PolkadotExecutorDispatch, Polkadot)
	}
	#[cfg(not(feature = "polkadot-native"))]
	Err(Error::NoRuntime)
/// Build a full node.
///
/// The actual "flavor", aka if it will use `Polkadot`, `Rococo` or `Kusama` is determined based on
/// [`IdentifyVariant`] using the chain spec.
///
/// `overseer_enable_anyways` always enables the overseer, based on the provided `OverseerGenerator`,
/// regardless of the role the node has. The relay chain selection (longest or disputes-aware) is
/// still determined based on the role of the node. Likewise for authority discovery.
#[cfg(feature = "full-node")]
	config: Configuration,
	grandpa_pause: Option<(u32, u32)>,
	enable_beefy: bool,
	jaeger_agent: Option<std::net::SocketAddr>,
	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
	overseer_enable_anyways: bool,
	overseer_gen: impl OverseerGen,
	overseer_message_channel_override: Option<usize>,
	malus_finality_delay: Option<u32>,
	hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<NewFull<Client>, Error> {
	#[cfg(feature = "rococo-native")]
	if config.chain_spec.is_rococo() ||
		config.chain_spec.is_wococo() ||
		config.chain_spec.is_versi()
	{
		return new_full::<rococo_runtime::RuntimeApi, RococoExecutorDispatch, _>(
			enable_beefy,
			jaeger_agent,
			telemetry_worker_handle,
			overseer_enable_anyways,
			overseer_message_channel_override,
			malus_finality_delay,
		)
		.map(|full| full.with_client(Client::Rococo))
	}

	#[cfg(feature = "kusama-native")]
	if config.chain_spec.is_kusama() {
		return new_full::<kusama_runtime::RuntimeApi, KusamaExecutorDispatch, _>(
			enable_beefy,
			jaeger_agent,
			telemetry_worker_handle,
			overseer_enable_anyways,
			overseer_message_channel_override,
			malus_finality_delay,
		)
		.map(|full| full.with_client(Client::Kusama))
	}

	#[cfg(feature = "westend-native")]
	if config.chain_spec.is_westend() {
		return new_full::<westend_runtime::RuntimeApi, WestendExecutorDispatch, _>(
			enable_beefy,
			jaeger_agent,
			telemetry_worker_handle,
			overseer_enable_anyways,
			overseer_message_channel_override,
			malus_finality_delay,
		)
		.map(|full| full.with_client(Client::Westend))
	#[cfg(feature = "polkadot-native")]
	{
		return new_full::<polkadot_runtime::RuntimeApi, PolkadotExecutorDispatch, _>(
			config,
			is_collator,
			grandpa_pause,
			enable_beefy,
			jaeger_agent,
			telemetry_worker_handle,
			None,
			overseer_enable_anyways,
			overseer_gen,
			overseer_message_channel_override.map(|capacity| {
				gum::warn!("Channel capacity should _never_ be tampered with on polkadot!");
				capacity
			}),
			malus_finality_delay,
		)
		.map(|full| full.with_client(Client::Polkadot))
	}

	#[cfg(not(feature = "polkadot-native"))]
	Err(Error::NoRuntime)

/// Reverts the node state down to at most the last finalized block.
///
/// In particular this reverts:
/// - `ApprovalVotingSubsystem` data in the parachains-db;
/// - `ChainSelectionSubsystem` data in the parachains-db;
/// - Low level Babe and Grandpa consensus data.
#[cfg(feature = "full-node")]
pub fn revert_backend(
	client: Arc<Client>,
	backend: Arc<FullBackend>,
	blocks: BlockNumber,
	config: Configuration,
) -> Result<(), Error> {
	let best_number = client.info().best_number;
	let finalized = client.info().finalized_number;
	let revertible = blocks.min(best_number - finalized);

	if revertible == 0 {
		return Ok(())
	}

	let number = best_number - revertible;
	let hash = client.block_hash_from_id(&BlockId::Number(number))?.ok_or(
		sp_blockchain::Error::Backend(format!(
			"Unexpected hash lookup failure for block number: {}",
			number
		)),
	)?;

	let parachains_db = open_database(&config.database)
		.map_err(|err| sp_blockchain::Error::Backend(err.to_string()))?;

	revert_approval_voting(parachains_db.clone(), hash)?;
	revert_chain_selection(parachains_db, hash)?;
	// Revert Substrate consensus related components
	client.execute_with(RevertConsensus { blocks, backend })?;

	Ok(())
}

fn revert_chain_selection(db: Arc<dyn Database>, hash: Hash) -> sp_blockchain::Result<()> {
	let config = chain_selection_subsystem::Config {
		col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
		stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
		stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
	let chain_selection = chain_selection_subsystem::ChainSelectionSubsystem::new(config, db);
		.revert_to(hash)
		.map_err(|err| sp_blockchain::Error::Backend(err.to_string()))
}
fn revert_approval_voting(db: Arc<dyn Database>, hash: Hash) -> sp_blockchain::Result<()> {
	let config = approval_voting_subsystem::Config {
		col_data: parachains_db::REAL_COLUMNS.col_approval_data,
		slot_duration_millis: Default::default(),
	};
	let approval_voting = approval_voting_subsystem::ApprovalVotingSubsystem::with_config(
		config,
		db,
		Arc::new(sc_keystore::LocalKeystore::in_memory()),
		Box::new(consensus_common::NoNetwork),
		approval_voting_subsystem::Metrics::default(),
	);

	approval_voting
		.revert_to(hash)
		.map_err(|err| sp_blockchain::Error::Backend(err.to_string()))
}

struct RevertConsensus {
	blocks: BlockNumber,
	backend: Arc<FullBackend>,
}

impl ExecuteWithClient for RevertConsensus {
	type Output = sp_blockchain::Result<()>;

	fn execute_with_client<Client, Api, Backend>(self, client: Arc<Client>) -> Self::Output
	where
		<Api as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
		Backend: sc_client_api::Backend<Block> + 'static,
		Backend::State: sp_api::StateBackend<BlakeTwo256>,
		Api: polkadot_client::RuntimeApiCollection<StateBackend = Backend::State>,
		Client: AbstractClient<Block, Backend, Api = Api> + 'static,
	{
		// Revert consensus-related components.
		// The operations are not correlated, thus call order is not relevant.
		babe::revert(client.clone(), self.backend, self.blocks)?;
		grandpa::revert(client, self.blocks)?;
		Ok(())
	}