Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};
use async_trait::async_trait;
use bp_parachains::BestParaHeadHash;
use bp_polkadot_core::{
parachains::{ParaHash, ParaHeadsProof, ParaId},
BlockNumber as RelayBlockNumber,
};
use futures::{future::FutureExt, select};
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient};
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
time::{Duration, Instant},
};
/// Parachain heads synchronization params.
///
/// The params are taken by value by [`run`], which passes a fresh clone of them to the inner
/// loop function every time its loop closure is invoked.
#[derive(Clone, Debug)]
pub struct ParachainSyncParams {
/// Identifiers of the parachains that we're relaying here.
pub parachains: Vec<ParaId>,
/// Parachain heads update strategy (see [`ParachainSyncStrategy`]).
pub strategy: ParachainSyncStrategy,
/// Stall timeout. If we have submitted a transaction and we see no state updates for this
/// period, we consider our transaction lost.
pub stall_timeout: Duration,
}
/// Parachain heads update strategy.
///
/// Selected through the `strategy` field of [`ParachainSyncParams`].
#[derive(Clone, Copy, Debug)]
pub enum ParachainSyncStrategy {
/// Update whenever any parachain head is updated.
Any,
/// Wait until all parachain heads are updated.
All,
}
Svyatoslav Nikolsky
committed
/// Parachain head hash, available at the source (relay) chain.
///
/// Unlike a plain `Option<ParaHash>`, this type distinguishes between "there is no head"
/// and "the head is not reported right now". Use [`ParaHashAtSource::hash`] to get the hash
/// when it is available.
#[derive(Clone, Copy, Debug)]
pub enum ParaHashAtSource {
/// There's no parachain head at the source chain.
///
/// Normally it means that the parachain is not registered there.
None,
/// Parachain head with given hash is available at the source chain.
Some(ParaHash),
/// The source client refuses to report parachain head hash at this moment.
///
/// It is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
/// This variant must be treated as "we don't want to update parachain head value at the
/// target chain at this moment".
Unavailable,
}
Svyatoslav Nikolsky
committed
impl ParaHashAtSource {
/// Return parachain head hash, if available.
pub fn hash(&self) -> Option<&ParaHash> {
match *self {
ParaHashAtSource::Some(ref para_hash) => Some(para_hash),
_ => None,
}
}
}
/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
/// Returns `Ok(true)` if client is in synced state.
async fn ensure_synced(&self) -> Result<bool, Self::Error>;
/// Get parachain head hash at given block.
///
/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_source`
/// on provided `metrics` object to update corresponding metric value.
async fn parachain_head(
&self,
at_block: HeaderIdOf<P::SourceChain>,
metrics: Option<&ParachainsLoopMetrics>,
Svyatoslav Nikolsky
committed
) -> Result<ParaHashAtSource, Self::Error>;
/// Get parachain heads proof.
Svyatoslav Nikolsky
committed
///
/// The number and order of entries in the resulting parachain head hashes vector must match the
/// number and order of parachains in the `parachains` vector. The incorrect implementation will
/// result in panic.
async fn prove_parachain_heads(
&self,
at_block: HeaderIdOf<P::SourceChain>,
parachains: &[ParaId],
Svyatoslav Nikolsky
committed
) -> Result<(ParaHeadsProof, Vec<ParaHash>), Self::Error>;
}
/// Target client used in parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
/// Get best block id.
async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;
/// Get best finalized source block id.
async fn best_finalized_source_block(
&self,
at_block: &HeaderIdOf<P::TargetChain>,
) -> Result<HeaderIdOf<P::SourceChain>, Self::Error>;
/// Get parachain head hash at given block.
///
/// The implementation may call `ParachainsLoopMetrics::update_best_parachain_block_at_target`
/// on provided `metrics` object to update corresponding metric value.
async fn parachain_head(
&self,
at_block: HeaderIdOf<P::TargetChain>,
metrics: Option<&ParachainsLoopMetrics>,
para_id: ParaId,
) -> Result<Option<BestParaHeadHash>, Self::Error>;
/// Submit parachain heads proof.
async fn submit_parachain_heads_proof(
&self,
at_source_block: HeaderIdOf<P::SourceChain>,
Svyatoslav Nikolsky
committed
updated_parachains: Vec<(ParaId, ParaHash)>,
proof: ParaHeadsProof,
) -> Result<(), Self::Error>;
}
/// Return prefix that will be used by default to expose Prometheus metrics of the parachains
/// sync loop.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
format!("{}_to_{}_Parachains", P::SourceChain::NAME, P::TargetChain::NAME)
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
}
/// Run parachain heads synchronization.
///
/// Builds a relay loop over the given clients, registers `ParachainsLoopMetrics` under the
/// [`metrics_prefix`] of the pipeline and then runs the loop, named with the same prefix. The
/// loop closure starts `run_until_connection_lost` with a clone of `sync_params` and a clone
/// of the shared `exit_signal`.
///
/// An error is returned if the loop metric cannot be created or registered, if exposing
/// metrics fails, or if the loop itself finishes with an error.
pub async fn run<P: ParachainsPipeline>(
    source_client: impl SourceClient<P>,
    target_client: impl TargetClient<P>,
    sync_params: ParachainSyncParams,
    metrics_params: MetricsParams,
    exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
    P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
    // the signal is shared, because the closure below needs to hand out a new copy of it on
    // every invocation
    let shared_exit_signal = exit_signal.shared();
    // the same name is used as the metrics prefix and as the loop name
    let loop_name = metrics_prefix::<P>();

    let prepared_loop = relay_utils::relay_loop(source_client, target_client)
        .with_metrics(metrics_params)
        .loop_metric(ParachainsLoopMetrics::new(Some(&loop_name))?)?
        .expose()
        .await?;

    prepared_loop
        .run(loop_name, move |source, target, loop_metrics| {
            run_until_connection_lost(
                source,
                target,
                sync_params.clone(),
                loop_metrics,
                shared_exit_signal.clone(),
            )
        })
        .await
}
/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: ParachainSyncParams,
metrics: Option<ParachainsLoopMetrics>,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
let exit_signal = exit_signal.fuse();
let min_block_interval = std::cmp::min(
P::SourceChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
let mut tx_tracker: Option<TransactionTracker<P>> = None;
futures::pin_mut!(exit_signal);
// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
Loading full blame...