// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//! Message receiving race delivers proof-of-messages-delivery from "lane.target" to "lane.source".
use crate::{
message_lane::{MessageLane, SourceHeaderIdOf, TargetHeaderIdOf},
message_lane_loop::{
NoncesSubmitArtifacts, SourceClient as MessageLaneSourceClient, SourceClientState,
TargetClient as MessageLaneTargetClient, TargetClientState,
},
message_race_loop::{
MessageRace, NoncesRange, SourceClient, SourceClientNonces, TargetClient,
TargetClientNonces,
},
message_race_strategy::BasicStrategy,
metrics::MessageLaneLoopMetrics,
};
use async_trait::async_trait;
use bp_messages::MessageNonce;
use futures::stream::FusedStream;
use relay_utils::FailedClient;
use std::{marker::PhantomData, ops::RangeInclusive};
/// Message receiving confirmations delivery strategy.
///
/// This is `BasicStrategy` instantiated for the confirmations race. Note the order of the
/// header types: the lane's *target* header number/hash are passed first and the lane's
/// *source* header number/hash second, which matches `run`, where the lane target client
/// is handed to the race loop as the race source. Nonces are carried as
/// `RangeInclusive<MessageNonce>` and the proof type is the lane's `MessagesReceivingProof`.
type ReceivingConfirmationsBasicStrategy<P> = BasicStrategy<
<P as MessageLane>::TargetHeaderNumber,
<P as MessageLane>::TargetHeaderHash,
<P as MessageLane>::SourceHeaderNumber,
<P as MessageLane>::SourceHeaderHash,
RangeInclusive<MessageNonce>,
<P as MessageLane>::MessagesReceivingProof,
>;
/// Run receiving confirmations race.
///
/// The roles are swapped relative to the lane itself: the lane's *target* client (with its
/// state updates) is passed to the race loop as the race source, and the lane's *source*
/// client (with its state updates) is passed as the race target.
///
/// `metrics_msg`, when present, is handed to both ends of the race so that each of them can
/// report the nonces it observes.
///
/// # Errors
///
/// Returns the `FailedClient` value produced by `crate::message_race_loop::run` unchanged.
pub async fn run<P: MessageLane>(
    source_client: impl MessageLaneSourceClient<P>,
    source_state_updates: impl FusedStream<Item = SourceClientState<P>>,
    target_client: impl MessageLaneTargetClient<P>,
    target_state_updates: impl FusedStream<Item = TargetClientState<P>>,
    metrics_msg: Option<MessageLaneLoopMetrics>,
) -> Result<(), FailedClient> {
    crate::message_race_loop::run(
        ReceivingConfirmationsRaceSource {
            client: target_client,
            // Both race ends declare a `metrics_msg` field, so the race source gets its own
            // copy. NOTE(review): assumes `MessageLaneLoopMetrics: Clone` - confirm.
            metrics_msg: metrics_msg.clone(),
            _phantom: Default::default(),
        },
        target_state_updates,
        ReceivingConfirmationsRaceTarget {
            client: source_client,
            metrics_msg,
            _phantom: Default::default(),
        },
        source_state_updates,
        ReceivingConfirmationsBasicStrategy::<P>::new(),
    )
    .await
}
/// Marker type for the messages receiving confirmations race.
///
/// It stores no data; the lane type `P` is only recorded at the type level.
struct ReceivingConfirmationsRace<P>(PhantomData<P>);
impl<P> MessageRace for ReceivingConfirmationsRace<P>
where
    P: MessageLane,
{
    // The race runs in the opposite direction to the lane, so the race's source header id is
    // the lane's target header id and vice versa.
    type SourceHeaderId = TargetHeaderIdOf<P>;
    type TargetHeaderId = SourceHeaderIdOf<P>;

    type MessageNonce = MessageNonce;
    type Proof = P::MessagesReceivingProof;

    /// Name of the race source, built from the lane's target name.
    fn source_name() -> String {
        format!("{lane_end}::ReceivingConfirmationsDelivery", lane_end = P::TARGET_NAME)
    }

    /// Name of the race target, built from the lane's source name.
    fn target_name() -> String {
        format!("{lane_end}::ReceivingConfirmationsDelivery", lane_end = P::SOURCE_NAME)
    }
}
/// Source end of the receiving confirmations race; it wraps the client of the lane's target.
struct ReceivingConfirmationsRaceSource<P, C>
where
    P: MessageLane,
{
    /// Wrapped client that every request of this race end is forwarded to.
    client: C,
    /// Optional lane loop metrics; nothing is reported when this is `None`.
    metrics_msg: Option<MessageLaneLoopMetrics>,
    /// Records the lane type `P` without storing a value of it.
    _phantom: PhantomData<P>,
}
// `SourceClient` implementation for the source end of the confirmations race. The wrapped
// client `C` is the lane's *target* client (`C: MessageLaneTargetClient<P>`).
//
// NOTE(review): this listing looks truncated. `async_trait` is imported at the top of the file
// but no `#[async_trait]` attribute is visible here, `Self::ProofParameters` is used without a
// visible `type ProofParameters`, the header of the first method is absent and several closing
// delimiters are missing. Confirm against the upstream file before relying on this text.
impl<P, C> SourceClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceSource<P, C>
where
P: MessageLane,
C: MessageLaneTargetClient<P>,
{
// Errors are those of the wrapped client.
type Error = C::Error;
// Nonces are described by inclusive ranges.
type NoncesRange = RangeInclusive<MessageNonce>;
// NOTE(review): the method header (`async fn ...(`) owning the parameters below is missing.
&self,
at_block: TargetHeaderIdOf<P>,
prev_latest_nonce: MessageNonce,
) -> Result<(TargetHeaderIdOf<P>, SourceClientNonces<Self::NoncesRange>), Self::Error> {
// Ask the wrapped client for the latest received nonce at `at_block`; the block id it returns
// shadows the argument and is the one used from here on.
let (at_block, latest_received_nonce) = self.client.latest_received_nonce(at_block).await?;
// Report the nonce that was just read, but only when metrics are configured.
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
metrics_msg.update_target_latest_received_nonce::<P>(latest_received_nonce);
// NOTE(review): the closing `}` of the `if let` and the opening of the returned tuple (which
// the `))` below closes) appear to be missing here.
SourceClientNonces {
// Everything after the previously seen nonce, up to and including the latest received one.
new_nonces: prev_latest_nonce + 1..=latest_received_nonce,
// No confirmed nonce is reported by this race end.
confirmed_nonce: None,
},
))
// NOTE(review): the closing `}` of the method above appears to be missing here.
async fn generate_proof(
&self,
at_block: TargetHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
_proof_parameters: Self::ProofParameters,
// NOTE(review): the start of the return type (presumably `) -> Result<`) is missing here.
(TargetHeaderIdOf<P>, RangeInclusive<MessageNonce>, P::MessagesReceivingProof),
Self::Error,
> {
// The proof is requested for `at_block` only; `nonces` is not sent to the client, it is just
// returned next to the proof and the block id the client reports.
self.client
.prove_messages_receiving(at_block)
.await
.map(|(at_block, proof)| (at_block, nonces, proof))
}
}
/// Target end of the receiving confirmations race; it wraps the client of the lane's source.
struct ReceivingConfirmationsRaceTarget<P, C>
where
    P: MessageLane,
{
    /// Wrapped client that every request of this race end is forwarded to.
    client: C,
    /// Optional lane loop metrics; nothing is reported when this is `None`.
    metrics_msg: Option<MessageLaneLoopMetrics>,
    /// Records the lane type `P` without storing a value of it.
    _phantom: PhantomData<P>,
}
// `TargetClient` implementation for the target end of the confirmations race. The wrapped
// client `C` is the lane's *source* client (`C: MessageLaneSourceClient<P>`).
//
// NOTE(review): this listing looks truncated. `async_trait` is imported at the top of the file
// but no `#[async_trait]` attribute is visible here, the header of the second method is absent,
// several closing delimiters are missing and two pairs of stray non-code lines are interleaved
// with the code. Confirm against the upstream file before relying on this text.
impl<P, C> TargetClient<ReceivingConfirmationsRace<P>> for ReceivingConfirmationsRaceTarget<P, C>
where
P: MessageLane,
C: MessageLaneSourceClient<P>,
{
// Errors and the transaction tracker are those of the wrapped client.
type Error = C::Error;
// No extra data accompanies the nonces on this race end.
type TargetNoncesData = ();
type TransactionTracker = C::TransactionTracker;
// The race's source header is the lane's target header, hence the differently named call on
// the wrapped client.
async fn require_source_header(&self, id: TargetHeaderIdOf<P>) {
self.client.require_target_header_on_source(id).await
// NOTE(review): the closing `}` of the method above and the header (`async fn ...(`) of the
// method owning the parameters below appear to be missing here.
&self,
at_block: SourceHeaderIdOf<P>,
// NOTE(review): the next two lines are not Rust; they look like commit-author residue.
Svyatoslav Nikolsky
committed
update_metrics: bool,
) -> Result<(SourceHeaderIdOf<P>, TargetClientNonces<()>), Self::Error> {
// Ask the wrapped client for the latest confirmed received nonce at `at_block`; the block id
// it returns shadows the argument.
let (at_block, latest_confirmed_nonce) =
self.client.latest_confirmed_received_nonce(at_block).await?;
// NOTE(review): the next two lines are not Rust; they look like commit-author residue.
Svyatoslav Nikolsky
committed
// Metrics are touched only when the caller asks for it and metrics are configured.
if update_metrics {
if let Some(metrics_msg) = self.metrics_msg.as_ref() {
metrics_msg.update_source_latest_confirmed_nonce::<P>(latest_confirmed_nonce);
}
// NOTE(review): the closing `}` of `if update_metrics` appears to be missing here.
Ok((at_block, TargetClientNonces { latest_nonce: latest_confirmed_nonce, nonces_data: () }))
}
async fn submit_proof(
&self,
generated_at_block: TargetHeaderIdOf<P>,
nonces: RangeInclusive<MessageNonce>,
proof: P::MessagesReceivingProof,
) -> Result<NoncesSubmitArtifacts<Self::TransactionTracker>, Self::Error> {
// Only the block id and the proof go to the wrapped client; `nonces` is returned to the
// caller together with the tracker of the submitted transaction.
let tx_tracker =
self.client.submit_messages_receiving_proof(generated_at_block, proof).await?;
Ok(NoncesSubmitArtifacts { nonces, tx_tracker })
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
impl NoncesRange for RangeInclusive<MessageNonce> {
fn begin(&self) -> MessageNonce {
*RangeInclusive::<MessageNonce>::start(self)
}
fn end(&self) -> MessageNonce {
*RangeInclusive::<MessageNonce>::end(self)
}
fn greater_than(self, nonce: MessageNonce) -> Option<Self> {
let next_nonce = nonce + 1;
let end = *self.end();
if next_nonce > end {
None
} else {
Some(std::cmp::max(self.begin(), next_nonce)..=end)
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the `NoncesRange` implementation for `RangeInclusive<MessageNonce>`.
    #[test]
    fn range_inclusive_works_as_nonces_range() {
        let range = 20..=30;

        // Bounds are reported as-is.
        assert_eq!(NoncesRange::begin(&range), 20);
        assert_eq!(NoncesRange::end(&range), 30);

        // (nonce to cut at, expected remainder of the range)
        let cases = vec![
            (10, Some(20..=30)),
            (19, Some(20..=30)),
            (20, Some(21..=30)),
            (25, Some(26..=30)),
            (29, Some(30..=30)),
            (30, None),
        ];
        for (nonce, expected) in cases {
            assert_eq!(range.clone().greater_than(nonce), expected);
        }
    }
}