Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::{parachains_loop_metrics::ParachainsLoopMetrics, ParachainsPipeline};
use async_trait::async_trait;
use bp_parachains::BestParaHeadHash;
use bp_polkadot_core::{
parachains::{ParaHash, ParaHeadsProof, ParaId},
BlockNumber as RelayBlockNumber,
};
use futures::{future::FutureExt, select};
use relay_substrate_client::{BlockNumberOf, Chain, HeaderIdOf};
use relay_utils::{metrics::MetricsParams, relay_loop::Client as RelayClient, FailedClient};
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
time::{Duration, Instant},
};
/// Parameters of the parachain heads synchronization.
#[derive(Debug, Clone)]
pub struct ParachainSyncParams {
    /// Identifiers of the parachains that are relayed here.
    pub parachains: Vec<ParaId>,

    /// Strategy that is used to update parachain heads.
    pub strategy: ParachainSyncStrategy,

    /// Stall timeout.
    ///
    /// If a transaction has been submitted and no state updates are seen for this period,
    /// the transaction is considered lost.
    pub stall_timeout: Duration,
}
/// Strategy that is used to update parachain heads.
#[derive(Debug, Clone, Copy)]
pub enum ParachainSyncStrategy {
    /// Update as soon as at least one of the parachain heads has been updated.
    Any,

    /// Hold the update back until every parachain head has been updated.
    All,
}
Svyatoslav Nikolsky
committed
/// Hash of the parachain head, as it is available at the source (relay) chain.
#[derive(Debug, Clone, Copy)]
pub enum ParaHashAtSource {
    /// The source chain has no head for the parachain.
    ///
    /// Normally this means that the parachain is not registered at the source chain.
    None,

    /// The source chain has a parachain head with the given hash.
    Some(ParaHash),

    /// The source client refuses to report the parachain head hash right now.
    ///
    /// This is a "mild" error, which may appear when e.g. on-demand parachains relay is used.
    /// It must be treated as "we don't want to update parachain head value at the target
    /// chain at this moment".
    Unavailable,
}
/// Source client used in parachain heads synchronization loop.
#[async_trait]
pub trait SourceClient<P: ParachainsPipeline>: RelayClient {
/// Returns `Ok(true)` if client is in synced state.
async fn ensure_synced(&self) -> Result<bool, Self::Error>;
/// Get parachain head hash at given block.
async fn parachain_head(
&self,
at_block: HeaderIdOf<P::SourceChain>,
para_id: ParaId,
Svyatoslav Nikolsky
committed
) -> Result<ParaHashAtSource, Self::Error>;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/// Get parachain heads proof.
async fn prove_parachain_heads(
&self,
at_block: HeaderIdOf<P::SourceChain>,
parachains: &[ParaId],
) -> Result<ParaHeadsProof, Self::Error>;
}
/// Client of the target chain, used by the parachain heads synchronization loop.
#[async_trait]
pub trait TargetClient<P: ParachainsPipeline>: RelayClient {
    /// Returns the id of the best block.
    async fn best_block(&self) -> Result<HeaderIdOf<P::TargetChain>, Self::Error>;

    /// Returns the id of the best finalized source block.
    ///
    /// `at_block` is the id of the target chain header.
    async fn best_finalized_source_block(
        &self,
        at_block: &HeaderIdOf<P::TargetChain>,
    ) -> Result<HeaderIdOf<P::SourceChain>, Self::Error>;

    /// Returns the hash of the parachain head at the given block.
    ///
    /// `at_block` is the id of the target chain header and `para_id` selects the parachain.
    /// The result is optional, so there may be no value to return.
    async fn parachain_head(
        &self,
        at_block: HeaderIdOf<P::TargetChain>,
        para_id: ParaId,
    ) -> Result<Option<BestParaHeadHash>, Self::Error>;

    /// Submits the proof of parachain heads.
    ///
    /// `at_source_block` is the id of the source chain header, `updated_parachains` lists
    /// the parachains and `proof` is the parachain heads proof itself.
    async fn submit_parachain_heads_proof(
        &self,
        at_source_block: HeaderIdOf<P::SourceChain>,
        updated_parachains: Vec<ParaId>,
        proof: ParaHeadsProof,
    ) -> Result<(), Self::Error>;
}
/// Returns the default prefix for Prometheus metrics exposed by the parachains sync loop.
///
/// The prefix is built from the names of the source and the target chains of the pipeline:
/// `<source name>_to_<target name>_Sync`.
pub fn metrics_prefix<P: ParachainsPipeline>() -> String {
    let source_name = P::SourceChain::NAME;
    let target_name = P::TargetChain::NAME;
    format!("{}_to_{}_Sync", source_name, target_name)
}
/// Runs the parachain heads synchronization.
///
/// Builds the relay loop from both clients, registers the loop metrics under the prefix
/// returned by `metrics_prefix`, exposes them and then hands `run_until_connection_lost`
/// to the loop's `run`. The exit signal is made shareable, so that every invocation of the
/// closure gets its own clone of it.
///
/// Errors of the metrics setup and of the loop itself are returned as `relay_utils::Error`.
pub async fn run<P: ParachainsPipeline>(
    source_client: impl SourceClient<P>,
    target_client: impl TargetClient<P>,
    sync_params: ParachainSyncParams,
    metrics_params: MetricsParams,
    exit_signal: impl Future<Output = ()> + 'static + Send,
) -> Result<(), relay_utils::Error>
where
    P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
    // the same prefix names both the loop metrics and the loop itself
    let prefix = metrics_prefix::<P>();
    let shared_exit_signal = exit_signal.shared();

    let prepared_loop = relay_utils::relay_loop(source_client, target_client)
        .with_metrics(metrics_params)
        .loop_metric(ParachainsLoopMetrics::new(Some(&prefix))?)?
        .expose()
        .await?;

    prepared_loop
        .run(prefix, move |source, target, loop_metrics| {
            run_until_connection_lost(
                source,
                target,
                sync_params.clone(),
                loop_metrics,
                shared_exit_signal.clone(),
            )
        })
        .await
}
/// Run parachain heads synchronization.
async fn run_until_connection_lost<P: ParachainsPipeline>(
source_client: impl SourceClient<P>,
target_client: impl TargetClient<P>,
sync_params: ParachainSyncParams,
_metrics: Option<ParachainsLoopMetrics>,
exit_signal: impl Future<Output = ()> + Send,
) -> Result<(), FailedClient>
where
P::SourceChain: Chain<BlockNumber = RelayBlockNumber>,
{
let exit_signal = exit_signal.fuse();
let min_block_interval = std::cmp::min(
P::SourceChain::AVERAGE_BLOCK_INTERVAL,
P::TargetChain::AVERAGE_BLOCK_INTERVAL,
);
let mut tx_tracker: Option<TransactionTracker<P>> = None;
futures::pin_mut!(exit_signal);
// Note that the internal loop breaks with `FailedClient` error even if error is non-connection.
// It is Ok for now, but it may need to be fixed in the future to use exponential backoff for
// regular errors.
loop {
// either wait for new block, or exit signal
select! {
_ = async_std::task::sleep(min_block_interval).fuse() => {},
_ = exit_signal => return Ok(()),
}
// if source client is not yet synced, we'll need to sleep. Otherwise we risk submitting too
// many redundant transactions
match source_client.ensure_synced().await {
Ok(true) => (),
Ok(false) => {
log::warn!(
target: "bridge",
"{} client is syncing. Won't do anything until it is synced",
P::SourceChain::NAME,
);
continue
},
Err(e) => {
Loading full blame...