Newer
Older
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
// This file is part of Parity Bridges Common.
// Parity Bridges Common is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Parity Bridges Common is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>.
use crate::metrics::{Metrics, MetricsParams, StandaloneMetrics};
use crate::{FailedClient, MaybeConnectionError};
use async_trait::async_trait;
use std::{fmt::Debug, future::Future, net::SocketAddr, time::Duration};
use substrate_prometheus_endpoint::{init_prometheus, Registry};
/// Default pause between reconnect attempts.
pub const RECONNECT_DELAY: Duration = Duration::from_secs(10);
/// Basic blockchain client from relay perspective.
#[async_trait]
pub trait Client: Clone + Send + Sync {
/// Type of error this clients returns.
type Error: Debug + MaybeConnectionError;
/// Try to reconnect to source node.
async fn reconnect(&mut self) -> Result<(), Self::Error>;
}
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/// Returns generic loop that may be customized and started.
pub fn relay_loop<SC, TC>(source_client: SC, target_client: TC) -> Loop<SC, TC, ()> {
Loop {
source_client,
target_client,
loop_metric: None,
}
}
/// Generic relay loop.
pub struct Loop<SC, TC, LM> {
source_client: SC,
target_client: TC,
loop_metric: Option<LM>,
}
/// Relay loop metrics builder.
pub struct LoopMetrics<SC, TC, LM> {
relay_loop: Loop<SC, TC, ()>,
registry: Registry,
loop_metric: Option<LM>,
}
impl<SC, TC, LM> Loop<SC, TC, LM> {
/// Start building loop metrics using given prefix.
///
/// Panics if `prefix` is empty.
pub fn with_metrics(self, prefix: String) -> LoopMetrics<SC, TC, ()> {
assert!(!prefix.is_empty(), "Metrics prefix can not be empty");
LoopMetrics {
relay_loop: Loop {
source_client: self.source_client,
target_client: self.target_client,
loop_metric: None,
},
registry: Registry::new_custom(Some(prefix), None)
.expect("only fails if prefix is empty; prefix is not empty; qed"),
loop_metric: None,
}
}
/// Run relay loop.
///
/// This function represents an outer loop, which in turn calls provided `run_loop` function to do
/// actual job. When `run_loop` returns, this outer loop reconnects to failed client (source,
/// target or both) and calls `run_loop` again.
pub async fn run<R, F>(mut self, run_loop: R) -> Result<(), String>
where
R: Fn(SC, TC, Option<LM>) -> F,
F: Future<Output = Result<(), FailedClient>>,
SC: Client,
TC: Client,
LM: Clone,
{
loop {
let result = run_loop(
self.source_client.clone(),
self.target_client.clone(),
self.loop_metric.clone(),
)
.await;
match result {
Ok(()) => break,
Err(failed_client) => loop {
async_std::task::sleep(RECONNECT_DELAY).await;
if failed_client == FailedClient::Both || failed_client == FailedClient::Source {
match self.source_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to source client. Going to retry in {}s: {:?}",
RECONNECT_DELAY.as_secs(),
error,
);
continue;
}
if failed_client == FailedClient::Both || failed_client == FailedClient::Target {
match self.target_client.reconnect().await {
Ok(()) => (),
Err(error) => {
log::warn!(
target: "bridge",
"Failed to reconnect to target client. Going to retry in {}s: {:?}",
RECONNECT_DELAY.as_secs(),
error,
);
continue;
}
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
break;
},
}
log::debug!(target: "bridge", "Restarting relay loop");
}
Ok(())
}
}
impl<SC, TC, LM> LoopMetrics<SC, TC, LM> {
/// Add relay loop metrics.
///
/// Loop metrics will be passed to the loop callback.
pub fn loop_metric<NewLM: Metrics>(self, loop_metric: NewLM) -> Result<LoopMetrics<SC, TC, NewLM>, String> {
loop_metric.register(&self.registry)?;
Ok(LoopMetrics {
relay_loop: self.relay_loop,
registry: self.registry,
loop_metric: Some(loop_metric),
})
}
/// Add standalone metrics.
pub fn standalone_metric<M: StandaloneMetrics>(self, standalone_metrics: M) -> Result<Self, String> {
standalone_metrics.register(&self.registry)?;
standalone_metrics.spawn();
Ok(self)
}
/// Expose metrics using given params.
///
/// If `params` is `None`, metrics are not exposed.
pub async fn expose(self, params: Option<MetricsParams>) -> Result<Loop<SC, TC, LM>, String> {
if let Some(params) = params {
let socket_addr = SocketAddr::new(
params.host.parse().map_err(|err| {
format!(
"Invalid host {} is used to expose Prometheus metrics: {}",
params.host, err,
)
})?,
params.port,
);
let registry = self.registry;
async_std::task::spawn(async move {
let result = init_prometheus(socket_addr, registry).await;
log::trace!(
target: "bridge-metrics",
"Prometheus endpoint has exited with result: {:?}",
result,
);
});
Ok(Loop {
source_client: self.relay_loop.source_client,
target_client: self.relay_loop.target_client,
loop_metric: self.loop_metric,
})