Skip to content
message_race_delivery.rs 38.5 KiB
Newer Older
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.

// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

//! Message delivery race delivers proof-of-messages from "lane.source" to "lane.target".
fewensa's avatar
fewensa committed
use std::{collections::VecDeque, marker::PhantomData, ops::RangeInclusive, time::Duration};

use async_trait::async_trait;
use futures::stream::FusedStream;

use bp_messages::{MessageNonce, UnrewardedRelayersState, Weight};
use relay_utils::FailedClient;

hacpy's avatar
hacpy committed
use crate::{
	message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
	message_lane_loop::{
fewensa's avatar
fewensa committed
		MessageDeliveryParams, MessageDetailsMap, MessageProofParameters,
hacpy's avatar
hacpy committed
		SourceClient as MessageLaneSourceClient, SourceClientState,
		TargetClient as MessageLaneTargetClient, TargetClientState,
	},
	message_race_loop::{
		MessageRace, NoncesRange, RaceState, RaceStrategy, SourceClient, SourceClientNonces,
		TargetClient, TargetClientNonces,
	},
fewensa's avatar
fewensa committed
	message_race_strategy::BasicStrategy,
hacpy's avatar
hacpy committed
	metrics::MessageLaneLoopMetrics,
fewensa's avatar
fewensa committed
	relay_strategy::{EnforcementStrategy, RelayMessagesBatchReference, RelayStrategy},

/// Run message delivery race.
fewensa's avatar
fewensa committed
pub async fn run<P: MessageLane, Strategy: RelayStrategy>(
	source_client: impl MessageLaneSourceClient<P>,
	source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
	target_client: impl MessageLaneTargetClient<P>,
	target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
	stall_timeout: Duration,
	metrics_msg: Option<MessageLaneLoopMetrics>,
fewensa's avatar
fewensa committed
	params: MessageDeliveryParams<Strategy>,
) -> Result<(), FailedClient> {
	crate::message_race_loop::run(
		MessageDeliveryRaceSource {
			client: source_client.clone(),
			metrics_msg: metrics_msg.clone(),
			_phantom: Default::default(),
		},
		source_state_updates,
		MessageDeliveryRaceTarget {
			client: target_client.clone(),
			metrics_msg,
			_phantom: Default::default(),
		},
		target_state_updates,
		stall_timeout,
fewensa's avatar
fewensa committed
		MessageDeliveryStrategy::<P, Strategy, _, _> {
			lane_source_client: source_client,
			lane_target_client: target_client,
hacpy's avatar
hacpy committed
			max_unrewarded_relayer_entries_at_target: params
				.max_unrewarded_relayer_entries_at_target,
			max_unconfirmed_nonces_at_target: params.max_unconfirmed_nonces_at_target,
			max_messages_in_single_batch: params.max_messages_in_single_batch,
			max_messages_weight_in_single_batch: params.max_messages_weight_in_single_batch,
			max_messages_size_in_single_batch: params.max_messages_size_in_single_batch,
fewensa's avatar
fewensa committed
			relay_strategy: params.relay_strategy,
			latest_confirmed_nonces_at_source: VecDeque::new(),
			target_nonces: None,
			strategy: BasicStrategy::new(),
	)
	.await
}

/// Message delivery race.
///
/// A zero-sized marker type: it only carries the lane type `P` (through `PhantomData`) so
/// that race-level traits can be implemented per lane.
struct MessageDeliveryRace<P>(PhantomData<P>);

impl<P> MessageRace for MessageDeliveryRace<P>
where
	P: MessageLane,
{
	// Header identifiers are taken from the lane's source and target chains.
	type SourceHeaderId = SourceHeaderIdOf<P>;
	type TargetHeaderId = TargetHeaderIdOf<P>;

	type MessageNonce = MessageNonce;
	// The proof delivered by this race is the lane's messages proof.
	type Proof = P::MessagesProof;

	/// Name of the race source: the lane's `SOURCE_NAME` followed by `::MessagesDelivery`.
	fn source_name() -> String {
		let chain = P::SOURCE_NAME;
		format!("{}::MessagesDelivery", chain)
	}

	/// Name of the race target: the lane's `TARGET_NAME` followed by `::MessagesDelivery`.
	fn target_name() -> String {
		let chain = P::TARGET_NAME;
		format!("{}::MessagesDelivery", chain)
	}
}

/// Message delivery race source, which is a source of the lane.
struct MessageDeliveryRaceSource<P, C>
where
	P: MessageLane,
{
	/// Underlying lane client that the race-level calls are delegated to.
	client: C,
	/// Lane loop metrics to update when nonces are read; `None` means no metrics are reported.
	metrics_msg: Option<MessageLaneLoopMetrics>,
	/// Binds the lane type `P` to the struct without storing a value of it.
	_phantom: PhantomData<P>,
}

impl<P, C> SourceClient<MessageDeliveryRace<P>> for MessageDeliveryRaceSource<P, C>
where
	P: MessageLane,
	C: MessageLaneSourceClient<P>,
{
	type Error = C::Error;
	type NoncesRange = MessageDetailsMap<P::SourceChainBalance>;
	type ProofParameters = MessageProofParameters;
	async fn nonces(
		&self,
		at_block: SourceHeaderIdOf<P>,
		prev_latest_nonce: MessageNonce,
	) -> Result<(SourceHeaderIdOf<P>, SourceClientNonces<Self::NoncesRange>), Self::Error> {
hacpy's avatar
hacpy committed
		let (at_block, latest_generated_nonce) =
			self.client.latest_generated_nonce(at_block).await?;
		let (at_block, latest_confirmed_nonce) =
			self.client.latest_confirmed_received_nonce(at_block).await?;
		if let Some(metrics_msg) = self.metrics_msg.as_ref() {
			metrics_msg.update_source_latest_generated_nonce::<P>(latest_generated_nonce);
			metrics_msg.update_source_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
		let new_nonces = if latest_generated_nonce > prev_latest_nonce {
			self.client
hacpy's avatar
hacpy committed
				.generated_message_details(
					at_block.clone(),
					prev_latest_nonce + 1..=latest_generated_nonce,
				)
			MessageDetailsMap::new()
hacpy's avatar
hacpy committed
			SourceClientNonces { new_nonces, confirmed_nonce: Some(latest_confirmed_nonce) },
	}

	async fn generate_proof(
		&self,
		at_block: SourceHeaderIdOf<P>,
		nonces: RangeInclusive<MessageNonce>,
		proof_parameters: Self::ProofParameters,
hacpy's avatar
hacpy committed
	) -> Result<(SourceHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesProof), Self::Error>
	{
		self.client.prove_messages(at_block, nonces, proof_parameters).await
	}
}

/// Message delivery race target, which is a target of the lane.
struct MessageDeliveryRaceTarget<P, C>
where
	P: MessageLane,
{
	/// Underlying lane client that the race-level calls are delegated to.
	client: C,
	/// Lane loop metrics handle; `None` means no metrics are reported.
	metrics_msg: Option<MessageLaneLoopMetrics>,
	/// Binds the lane type `P` to the struct without storing a value of it.
	_phantom: PhantomData<P>,
}

impl<P, C> TargetClient<MessageDeliveryRace<P>> for MessageDeliveryRaceTarget<P, C>
where
	P: MessageLane,
	C: MessageLaneTargetClient<P>,
{
	type Error = C::Error;
	type TargetNoncesData = DeliveryRaceTargetNoncesData;
	async fn require_source_header(&self, id: SourceHeaderIdOf<P>) {
		self.client.require_source_header_on_target(id).await
	async fn nonces(
		&self,
		at_block: TargetHeaderIdOf<P>,
hacpy's avatar
hacpy committed
	) -> Result<(TargetHeaderIdOf<P>, TargetClientNonces<DeliveryRaceTargetNoncesData>), Self::Error>
	{
		let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
hacpy's avatar
hacpy committed
		let (at_block, latest_confirmed_nonce) =
			self.client.latest_confirmed_received_nonce(at_block).await?;
		let (at_block, unrewarded_relayers) =
			self.client.unrewarded_relayers_state(at_block).await?;
		if update_metrics {
			if let Some(metrics_msg) = self.metrics_msg.as_ref() {
				metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
				metrics_msg.update_target_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
			}
Loading full blame...