Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};
use async_trait::async_trait;
use bp_parachains::BestParaHeadHash;
use bp_polkadot_core::{
parachains::{ParaHash, ParaHeadsProof, ParaId},
BlockNumber as RelayBlockNumber,
};
use futures::{future::FutureExt, select};
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient};
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
time::{Duration, Instant},
};
/// Parachain heads synchronization params.
#[derive(Clone, Debug)]
pub struct ParachainSyncParams {
/// Parachains that we're relaying here.
pub parachains: Vec<ParaId>,
/// Parachain heads update strategy.
pub strategy: ParachainSyncStrategy,
/// Stall timeout. If we have submitted transaction and we see no state updates for this
/// period, we consider our transaction lost.
pub stall_timeout: Duration,
}
/// Parachain heads update strategy.
#[derive(Clone, Copy, Debug)]
pub enum ParachainSyncStrategy {
/// Update whenever any parachain head is updated.
Any,
/// Wait till all parachain heads are updated.
All,
}
/// Parachain header availability at a certain chain.
Svyatoslav Nikolsky
committed
#[derive(Clone, Copy, Debug)]
pub enum AvailableHeader<T> {
/// The client refuses to report parachain head at this moment.
Svyatoslav Nikolsky
committed
///
/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
/// This variant must be treated as "we don't want to update parachain head value at the
/// target chain at this moment".
Unavailable,
/// There's no parachain header at the relay chain.
///
/// Normally it means that the parachain is not registered there.
Missing,
/// Parachain head with given hash is available at the source chain.
Available(T),
Svyatoslav Nikolsky
committed
}
impl<T> AvailableHeader<T> {
/// Transform contained value.
pub fn map<F, U>(self, f: F) -> AvailableHeader<U>
where
F: FnOnce(T) -> U,
{
match self {
AvailableHeader::Unavailable => AvailableHeader::Unavailable,
AvailableHeader::Missing => AvailableHeader::Missing,
AvailableHeader::Available(val) => AvailableHeader::Available(f(val)),
Svyatoslav Nikolsky
committed
}
}
}
/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
/// Returns `Ok(true)` if client is in synced state.
async fn ensure_synced(&self) -> Result<bool, Self::Error>;
/// Get parachain head hash at given block.
///
/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source`
/// on provided `metrics` object to update corresponding metric value.
async fn parachain_head(
&self,
at_block: HeaderIdOf<P::SourceChain>,
metrics: Option<&ParachainsLoopMetrics>,
) -> Result<AvailableHeader<ParaHash>, Self::Error>;
/// Get parachain heads proof.
Svyatoslav Nikolsky
committed
///
/// The number and order of entries in the resulting parachain head hashes vector must match the
/// number and order of parachains in the `parachains` vector. The incorrect implementation will
/// result in panic.
async fn prove_parachain_heads(
&self,
at_block: HeaderIdOf<P::SourceChain>,
parachains: &[ParaId],
Svyatoslav Nikolsky
committed
) -> Result<(ParaHeadsProof, Vec<ParaHash>), Self::Error>;
}
/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
/// Get best block id.
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
/// Get best finalized source block id.
async fn best_finalized_source_block(
&self,
at_block: &HeaderIdOf<P::TargetChain>,
) -> Result<HeaderIdOf<P::SourceChain>, Self::Error>;
/// Get parachain head hash at given block.
///
/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target`
/// on provided `metrics` object to update corresponding metric value.
async fn parachain_head(
&self,
at_block: HeaderIdOf<P::TargetChain>,
metrics: Option<&ParachainsLoopMetrics>,
para_id: ParaId,
) -> Result<Option<BestParaHeadHash>, Self::Error>;
/// Submit parachain heads proof.
async fn submit_parachain_heads_proof(
&self,
at_source_block: HeaderIdOf<P::SourceChain>,
Svyatoslav Nikolsky
committed
updated_parachains: Vec<(ParaId, ParaHash)>,
proof: ParaHeadsProof,
) -> Result<(), Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
/// sync loop.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
format!("{}_to_{}_Parachains", P::SourceChain::NAME, P::TargetChain::NAME)
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
}
/// Run parachain heads synchronization.
pub async fn run<P: ParachainsPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: ParachainSyncParams,
metrics_params: MetricsParams,
exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
let exit_signal = exit_signal.shared();
relay_utils::relay_loop(source_client, target_client)
.with_metrics(metrics_params)
.loop_metric(ParachainsLoopMetrics::new(Some(&metrics_prefix::<P>()))?)?
.expose()
.await?
.run(metrics_prefix::<P>(), move |source_client, target_client, metrics| {
run_until_connection_lost(
source_client,
target_client,
sync_params.clone(),
metrics,
exit_signal.clone(),
)
})
.await
}
/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: ParachainSyncParams,
metrics: Option<ParachainsLoopMetrics>,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
let exit_signal = exit_signal.fuse();
let min_block_interval = std::cmp::min(
P::SourceChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
let mut tx_tracker: Option<TransactionTracker<P>> = None;
Loading full blame...