From d631178e413bdc822cfd2095c7b8f8398f9b1ab1 Mon Sep 17 00:00:00 2001 From: Svyatoslav Nikolsky <svyatonik@gmail.com> Date: Wed, 8 Apr 2020 03:48:15 +0300 Subject: [PATCH] Make relay generic over source/target chains (#58) * renamed to-be-generic files * make everything required generic over source/target chains * some more fixes * cargo fmt --all * trait functions -> trait constants * cargo --fmt --all --- bridges/relays/ethereum/Cargo.toml | 1 + bridges/relays/ethereum/src/cli.yml | 81 +-- .../relays/ethereum/src/ethereum_client.rs | 8 +- .../relays/ethereum/src/ethereum_sync_loop.rs | 577 +++++------------- bridges/relays/ethereum/src/ethereum_types.rs | 103 +--- .../src/{ethereum_headers.rs => headers.rs} | 553 +++++++++-------- bridges/relays/ethereum/src/main.rs | 42 +- .../relays/ethereum/src/substrate_client.rs | 22 +- .../src/{ethereum_sync.rs => sync.rs} | 219 ++++--- bridges/relays/ethereum/src/sync_loop.rs | 464 ++++++++++++++ bridges/relays/ethereum/src/sync_types.rs | 130 ++++ 11 files changed, 1294 insertions(+), 906 deletions(-) rename bridges/relays/ethereum/src/{ethereum_headers.rs => headers.rs} (60%) rename bridges/relays/ethereum/src/{ethereum_sync.rs => sync.rs} (56%) create mode 100644 bridges/relays/ethereum/src/sync_loop.rs create mode 100644 bridges/relays/ethereum/src/sync_types.rs diff --git a/bridges/relays/ethereum/Cargo.toml b/bridges/relays/ethereum/Cargo.toml index 06b29e7e8ef..93a8d1b0a3e 100644 --- a/bridges/relays/ethereum/Cargo.toml +++ b/bridges/relays/ethereum/Cargo.toml @@ -15,6 +15,7 @@ futures = "0.3.1" jsonrpsee = { git = "https://github.com/paritytech/jsonrpsee.git", default-features = false, features = ["http"] } linked-hash-map = "0.5.2" log = "0.4.8" +num-traits = "0.2" parking_lot = "0.10.0" rustc-hex = "2.0.1" serde = { version = "1.0.106", features = ["derive"] } diff --git a/bridges/relays/ethereum/src/cli.yml b/bridges/relays/ethereum/src/cli.yml index 750d4f39274..fad56da722c 100644 --- a/bridges/relays/ethereum/src/cli.yml +++ b/bridges/relays/ethereum/src/cli.yml @@ -2,43 +2,44 @@ name: ethsub-bridge version: "0.1.0" author: Parity Technologies <admin@parity.io> about: Parity Ethereum (PoA) <-> Substrate bridge -args: - - eth-host: - long: eth-host - value_name: ETH_HOST - help: Connect to Ethereum node at given host. - takes_value: true - - eth-port: - long: eth-port - value_name: ETH_PORT - help: Connect to Ethereum node at given port. - takes_value: true - - sub-host: - long: sub-host - value_name: SUB_HOST - help: Connect to Substrate node at given host. - takes_value: true - - sub-port: - long: sub-port - value_name: SUB_PORT - help: Connect to Substrate node at given port. - takes_value: true - - sub-tx-mode: - long: sub-tx-mode - value_name: MODE - help: Submit headers using signed (default) or unsigned transactions. Third mode - backup - submits signed transactions only when we believe that sync has stalled. - takes_value: true - possible_values: - - signed - - unsigned - - backup - - sub-signer: - long: sub-signer - value_name: SUB_SIGNER - help: The SURI of secret key to use when transactions are submitted to the Substrate node. - takes_value: true - - sub-signer-password: - long: sub-signer-password - value_name: SUB_SIGNER_PASSWORD - help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node. - takes_value: true \ No newline at end of file +subcommands: + - eth-to-sub: + about: Synchronize headers from Ethereum node to Substrate node. 
+ args: + - eth-host: + long: eth-host + value_name: ETH_HOST + help: Connect to Ethereum node at given host. + takes_value: true + - eth-port: + long: eth-port + value_name: ETH_PORT + help: Connect to Ethereum node at given port. + takes_value: true + - sub-host: + long: sub-host + value_name: SUB_HOST + help: Connect to Substrate node at given host. + takes_value: true + - sub-port: + long: sub-port + value_name: SUB_PORT + help: Connect to Substrate node at given port. + takes_value: true + - sub-tx-mode: + long: sub-tx-mode + value_name: MODE + help: Submit headers using signed (default) or unsigned transactions. Third mode - backup - submits signed transactions only when we believe that sync has stalled. + takes_value: true + possible_values: + - signed + - unsigned + - backup + - sub-signer: + long: sub-signer + value_name: SUB_SIGNER + help: The SURI of secret key to use when transactions are submitted to the Substrate node. + - sub-signer-password: + long: sub-signer-password + value_name: SUB_SIGNER_PASSWORD + help: The password for the SURI of secret key to use when transactions are submitted to the Substrate node. diff --git a/bridges/relays/ethereum/src/ethereum_client.rs b/bridges/relays/ethereum/src/ethereum_client.rs index 30a6bfb28a8..cddbc8bc148 100644 --- a/bridges/relays/ethereum/src/ethereum_client.rs +++ b/bridges/relays/ethereum/src/ethereum_client.rs @@ -14,8 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. -use crate::ethereum_sync_loop::MaybeConnectionError; -use crate::ethereum_types::{Header, HeaderId, Receipt, H256, U64}; +use crate::ethereum_types::{EthereumHeaderId, Header, Receipt, H256, U64}; +use crate::sync_types::MaybeConnectionError; use jsonrpsee::common::Params; use jsonrpsee::raw::{RawClient, RawClientError}; use jsonrpsee::transport::http::{HttpTransportClient, RequestError}; @@ -117,9 +117,9 @@ pub async fn header_by_hash(client: Client, hash: H256) -> (Client, Result<Heade /// Retrieve transactions receipts for given block. pub async fn transactions_receipts( mut client: Client, - id: HeaderId, + id: EthereumHeaderId, transacactions: Vec<H256>, -) -> (Client, Result<(HeaderId, Vec<Receipt>), Error>) { +) -> (Client, Result<(EthereumHeaderId, Vec<Receipt>), Error>) { let mut transactions_receipts = Vec::with_capacity(transacactions.len()); for transacaction in transacactions { let (next_client, transaction_receipt) = transaction_receipt(client, transacaction).await; diff --git a/bridges/relays/ethereum/src/ethereum_sync_loop.rs b/bridges/relays/ethereum/src/ethereum_sync_loop.rs index b08d503a5fa..705d78fe9cd 100644 --- a/bridges/relays/ethereum/src/ethereum_sync_loop.rs +++ b/bridges/relays/ethereum/src/ethereum_sync_loop.rs @@ -15,40 +15,18 @@ // along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. 
use crate::ethereum_client; -use crate::ethereum_types::HeaderStatus as EthereumHeaderStatus; +use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, QueuedEthereumHeader, Receipt}; use crate::substrate_client; -use futures::{future::FutureExt, stream::StreamExt}; - -// TODO: when SharedClient will be available, switch to Substrate headers subscription -// (because we do not need old Substrate headers) +use crate::sync::{HeadersSyncParams, TargetTransactionMode}; +use crate::sync_loop::{SourceClient, TargetClient}; +use futures::future::FutureExt; +use std::{future::Future, pin::Pin}; +pub use web3::types::H256; /// Interval (in ms) at which we check new Ethereum headers when we are synced/almost synced. const ETHEREUM_TICK_INTERVAL_MS: u64 = 10_000; /// Interval (in ms) at which we check new Substrate blocks. const SUBSTRATE_TICK_INTERVAL_MS: u64 = 5_000; -/// When we submit Ethereum headers to Substrate runtime, but see no updates of best -/// Ethereum block known to Substrate runtime during STALL_SYNC_TIMEOUT_MS milliseconds, -/// we consider that our headers are rejected because there has been reorg in Substrate. -/// This reorg could invalidate our knowledge about sync process (i.e. we have asked if -/// HeaderA is known to Substrate, but then reorg happened and the answer is different -/// now) => we need to reset sync. -/// The other option is to receive **EVERY** best Substrate header and check if it is -/// direct child of previous best header. But: (1) subscription doesn't guarantee that -/// the subscriber will receive every best header (2) reorg won't always lead to sync -/// stall and restart is a heavy operation (we forget all in-memory headers). -const STALL_SYNC_TIMEOUT_MS: u64 = 30_000; -/// Delay (in milliseconds) after we have seen update of best Ethereum header in Substrate, -/// for us to treat sync stalled. ONLY when relay operates in backup mode. -const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000; -/// Delay (in milliseconds) after connection-related error happened before we'll try -/// reconnection again. -const CONNECTION_ERROR_DELAY_MS: u64 = 10_000; - -/// Error type that can signal connection errors. -pub trait MaybeConnectionError { - /// Returns true if error (maybe) represents connection error. - fn is_connection_error(&self) -> bool; -} /// Ethereum synchronization parameters. pub struct EthereumSyncParams { @@ -60,33 +38,10 @@ pub struct EthereumSyncParams { pub sub_host: String, /// Substrate RPC port. pub sub_port: u16, - /// Substrate transactions submission mode. - pub sub_tx_mode: SubstrateTransactionMode, /// Substrate transactions signer. pub sub_signer: sp_core::sr25519::Pair, - /// Maximal number of ethereum headers to pre-download. - pub max_future_headers_to_download: usize, - /// Maximal number of active (we believe) submit header transactions. - pub max_headers_in_submitted_status: usize, - /// Maximal number of headers in single submit request. - pub max_headers_in_single_submit: usize, - /// Maximal total headers size in single submit request. - pub max_headers_size_in_single_submit: usize, - /// We only may store and accept (from Ethereum node) headers that have - /// number >= than best_substrate_header.number - prune_depth. - pub prune_depth: u64, -} - -/// Substrate transaction mode. -#[derive(Debug, PartialEq)] -pub enum SubstrateTransactionMode { - /// Submit eth headers using signed substrate transactions. - Signed, - /// Submit eth headers using unsigned substrate transactions. 
- Unsigned, - /// Submit eth headers using signed substrate transactions, but only when we - /// believe that sync has stalled. - Backup, + /// Synchronization parameters. + pub sync_params: HeadersSyncParams, } impl std::fmt::Debug for EthereumSyncParams { @@ -96,15 +51,7 @@ impl std::fmt::Debug for EthereumSyncParams { .field("eth_port", &self.eth_port) .field("sub_host", &self.sub_port) .field("sub_port", &self.sub_port) - .field("sub_tx_mode", &self.sub_tx_mode) - .field("max_future_headers_to_download", &self.max_future_headers_to_download) - .field("max_headers_in_submitted_status", &self.max_headers_in_submitted_status) - .field("max_headers_in_single_submit", &self.max_headers_in_single_submit) - .field( - "max_headers_size_in_single_submit", - &self.max_headers_size_in_single_submit, - ) - .field("prune_depth", &self.prune_depth) + .field("sync_params", &self.sync_params) .finish() } } @@ -116,394 +63,158 @@ impl Default for EthereumSyncParams { eth_port: 8545, sub_host: "localhost".into(), sub_port: 9933, - sub_tx_mode: SubstrateTransactionMode::Signed, sub_signer: sp_keyring::AccountKeyring::Alice.pair(), - max_future_headers_to_download: 128, - max_headers_in_submitted_status: 128, - max_headers_in_single_submit: 32, - max_headers_size_in_single_submit: 131_072, - prune_depth: 4096, + sync_params: Default::default(), } } } -/// Run Ethereum headers synchronization. -pub fn run(params: EthereumSyncParams) { - let mut local_pool = futures::executor::LocalPool::new(); - let mut progress_context = (std::time::Instant::now(), None, None); - let sign_sub_transactions = match params.sub_tx_mode { - SubstrateTransactionMode::Signed | SubstrateTransactionMode::Backup => true, - SubstrateTransactionMode::Unsigned => false, - }; - - local_pool.run_until(async move { - let eth_uri = format!("http://{}:{}", params.eth_host, params.eth_port); - let sub_uri = format!("http://{}:{}", params.sub_host, params.sub_port); - let sub_signer = params.sub_signer.clone(); - - let mut eth_sync = crate::ethereum_sync::HeadersSync::new(params); - let mut stall_countdown = None; - let mut last_update_time = std::time::Instant::now(); - - let mut eth_maybe_client = None; - let mut eth_best_block_number_required = false; - let eth_best_block_number_future = ethereum_client::best_block_number(ethereum_client::client(ð_uri)).fuse(); - let eth_new_header_future = futures::future::Fuse::terminated(); - let eth_orphan_header_future = futures::future::Fuse::terminated(); - let eth_receipts_future = futures::future::Fuse::terminated(); - let eth_go_offline_future = futures::future::Fuse::terminated(); - let eth_tick_stream = interval(ETHEREUM_TICK_INTERVAL_MS).fuse(); - - let mut sub_maybe_client = None; - let mut sub_best_block_required = false; - let sub_best_block_future = - substrate_client::best_ethereum_block(substrate_client::client(&sub_uri, sub_signer)).fuse(); - let sub_receipts_check_future = futures::future::Fuse::terminated(); - let sub_existence_status_future = futures::future::Fuse::terminated(); - let sub_submit_header_future = futures::future::Fuse::terminated(); - let sub_go_offline_future = futures::future::Fuse::terminated(); - let sub_tick_stream = interval(SUBSTRATE_TICK_INTERVAL_MS).fuse(); - - futures::pin_mut!( - eth_best_block_number_future, - eth_new_header_future, - eth_orphan_header_future, - eth_receipts_future, - eth_go_offline_future, - eth_tick_stream, - sub_best_block_future, - sub_receipts_check_future, - sub_existence_status_future, - sub_submit_header_future, - 
sub_go_offline_future, - sub_tick_stream - ); - - loop { - futures::select! { - (eth_client, eth_best_block_number) = eth_best_block_number_future => { - eth_best_block_number_required = false; - - process_future_result( - &mut eth_maybe_client, - eth_client, - eth_best_block_number, - |eth_best_block_number| eth_sync.ethereum_best_header_number_response(eth_best_block_number), - &mut eth_go_offline_future, - |eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client), - "Error retrieving best header number from Ethereum number", - ); - }, - (eth_client, eth_new_header) = eth_new_header_future => { - process_future_result( - &mut eth_maybe_client, - eth_client, - eth_new_header, - |eth_new_header| eth_sync.headers_mut().header_response(eth_new_header), - &mut eth_go_offline_future, - |eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client), - "Error retrieving header from Ethereum node", - ); - }, - (eth_client, eth_orphan_header) = eth_orphan_header_future => { - process_future_result( - &mut eth_maybe_client, - eth_client, - eth_orphan_header, - |eth_orphan_header| eth_sync.headers_mut().header_response(eth_orphan_header), - &mut eth_go_offline_future, - |eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client), - "Error retrieving orphan header from Ethereum node", - ); - }, - (eth_client, eth_receipts) = eth_receipts_future => { - process_future_result( - &mut eth_maybe_client, - eth_client, - eth_receipts, - |(header, receipts)| eth_sync.headers_mut().receipts_response(&header, receipts), - &mut eth_go_offline_future, - |eth_client| delay(CONNECTION_ERROR_DELAY_MS, eth_client), - "Error retrieving transactions receipts from Ethereum node", - ); - }, - eth_client = eth_go_offline_future => { - eth_maybe_client = Some(eth_client); - }, - _ = eth_tick_stream.next() => { - if eth_sync.is_almost_synced() { - eth_best_block_number_required = true; - } - }, - (sub_client, sub_best_block) = sub_best_block_future => { - sub_best_block_required = false; - - process_future_result( - &mut sub_maybe_client, - sub_client, - sub_best_block, - |sub_best_block| { - let head_updated = eth_sync.substrate_best_header_response(sub_best_block); - if head_updated { - last_update_time = std::time::Instant::now(); - } - match head_updated { - // IF head is updated AND there are still our transactions: - // => restart stall countdown timer - true if eth_sync.headers().headers_in_status(EthereumHeaderStatus::Submitted) != 0 => - stall_countdown = Some(std::time::Instant::now()), - // IF head is updated AND there are no our transactions: - // => stop stall countdown timer - true => stall_countdown = None, - // IF head is not updated AND stall countdown is not yet completed - // => do nothing - false if stall_countdown - .map(|stall_countdown| std::time::Instant::now() - stall_countdown < - std::time::Duration::from_millis(STALL_SYNC_TIMEOUT_MS)) - .unwrap_or(true) - => (), - // IF head is not updated AND stall countdown has completed - // => restart sync - false => { - log::info!( - target: "bridge", - "Possible Substrate fork detected. 
Restarting Ethereum headers synchronization.", - ); - stall_countdown = None; - eth_sync.restart(); - }, - } - }, - &mut sub_go_offline_future, - |sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client), - "Error retrieving best known header from Substrate node", - ); - }, - (sub_client, sub_existence_status) = sub_existence_status_future => { - process_future_result( - &mut sub_maybe_client, - sub_client, - sub_existence_status, - |(sub_header, sub_existence_status)| eth_sync - .headers_mut() - .maybe_orphan_response(&sub_header, sub_existence_status), - &mut sub_go_offline_future, - |sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client), - "Error retrieving existence status from Substrate node", - ); - }, - (sub_client, sub_submit_header_result) = sub_submit_header_future => { - process_future_result( - &mut sub_maybe_client, - sub_client, - sub_submit_header_result, - |(_, submitted_headers)| eth_sync.headers_mut().headers_submitted(submitted_headers), - &mut sub_go_offline_future, - |sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client), - "Error submitting headers to Substrate node", - ); - }, - (sub_client, sub_receipts_check_result) = sub_receipts_check_future => { - // we can minimize number of receipts_check calls by checking header - // logs bloom here, but it may give us false positives (when authorities - // source is contract, we never need any logs) - process_future_result( - &mut sub_maybe_client, - sub_client, - sub_receipts_check_result, - |(header, receipts_check_result)| eth_sync - .headers_mut() - .maybe_receipts_response(&header, receipts_check_result), - &mut sub_go_offline_future, - |sub_client| delay(CONNECTION_ERROR_DELAY_MS, sub_client), - "Error retrieving receipts requirement from Substrate node", - ); - }, - sub_client = sub_go_offline_future => { - sub_maybe_client = Some(sub_client); - }, - _ = sub_tick_stream.next() => { - sub_best_block_required = true; - }, - } - - // print progress - progress_context = print_progress(progress_context, ð_sync); - - // if client is available: wait, or call Substrate RPC methods - if let Some(sub_client) = sub_maybe_client.take() { - // the priority is to: - // 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely; - // 2) check transactions receipts - it stops us from downloading/submitting new blocks; - // 3) check existence - it stops us from submitting new blocks; - // 4) submit header - - if sub_best_block_required { - log::debug!(target: "bridge", "Asking Substrate about best block"); - sub_best_block_future.set(substrate_client::best_ethereum_block(sub_client).fuse()); - } else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::MaybeReceipts) { - log::debug!( - target: "bridge", - "Checking if header submission requires receipts: {:?}", - header.id(), - ); - - let header = header.clone(); - sub_receipts_check_future - .set(substrate_client::ethereum_receipts_required(sub_client, header).fuse()); - } else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::MaybeOrphan) { - // for MaybeOrphan we actually ask for parent' header existence - let parent_id = header.parent_id(); - - log::debug!( - target: "bridge", - "Asking Substrate node for existence of: {:?}", - parent_id, - ); - - sub_existence_status_future - .set(substrate_client::ethereum_header_known(sub_client, parent_id).fuse()); - } else if let Some(headers) = eth_sync.select_headers_to_submit( - last_update_time.elapsed() > 
std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS), - ) { - let ids = match headers.len() { - 1 => format!("{:?}", headers[0].id()), - 2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()), - len => format!("[{:?} ... {:?}]", headers[0].id(), headers[len - 1].id()), - }; - log::debug!( - target: "bridge", - "Submitting {} header(s) to Substrate node: {:?}", - headers.len(), - ids, - ); - - let headers = headers.into_iter().cloned().collect(); - sub_submit_header_future.set( - substrate_client::submit_ethereum_headers(sub_client, headers, sign_sub_transactions).fuse(), - ); - - // remember that we have submitted some headers - if stall_countdown.is_none() { - stall_countdown = Some(std::time::Instant::now()); - } - } else { - sub_maybe_client = Some(sub_client); - } - } - - // if client is available: wait, or call Ethereum RPC methods - if let Some(eth_client) = eth_maybe_client.take() { - // the priority is to: - // 1) get best block - it stops us from downloading new blocks + we call it rarely; - // 2) check transactions receipts - it stops us from downloading/submitting new blocks; - // 3) check existence - it stops us from submitting new blocks; - // 4) submit header +/// Ethereum client as headers source. +struct EthereumHeadersSource { + /// Ethereum node client. + client: ethereum_client::Client, +} - if eth_best_block_number_required { - log::debug!(target: "bridge", "Asking Ethereum node about best block number"); - eth_best_block_number_future.set(ethereum_client::best_block_number(eth_client).fuse()); - } else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::Receipts) { - let id = header.id(); - log::debug!( - target: "bridge", - "Retrieving receipts for header: {:?}", - id, - ); - eth_receipts_future.set( - ethereum_client::transactions_receipts(eth_client, id, header.header().transactions.clone()) - .fuse(), - ); - } else if let Some(header) = eth_sync.headers().header(EthereumHeaderStatus::Orphan) { - // for Orphan we actually ask for parent' header - let parent_id = header.parent_id(); +impl SourceClient<EthereumHeadersSyncPipeline> for EthereumHeadersSource { + type Error = ethereum_client::Error; + type BestBlockNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<u64, Self::Error>)>>>; + type HeaderByHashFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>; + type HeaderByNumberFuture = Pin<Box<dyn Future<Output = (Self, Result<Header, Self::Error>)>>>; + type HeaderExtraFuture = + Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, Vec<Receipt>), Self::Error>)>>>; + + fn best_block_number(self) -> Self::BestBlockNumberFuture { + ethereum_client::best_block_number(self.client) + .map(|(client, result)| (EthereumHeadersSource { client }, result)) + .boxed() + } - log::debug!( - target: "bridge", - "Going to download orphan header from Ethereum node: {:?}", - parent_id, - ); + fn header_by_hash(self, hash: H256) -> Self::HeaderByHashFuture { + ethereum_client::header_by_hash(self.client, hash) + .map(|(client, result)| (EthereumHeadersSource { client }, result)) + .boxed() + } - eth_orphan_header_future.set(ethereum_client::header_by_hash(eth_client, parent_id.1).fuse()); - } else if let Some(id) = eth_sync.select_new_header_to_download() { - log::debug!( - target: "bridge", - "Going to download new header from Ethereum node: {:?}", - id, - ); + fn header_by_number(self, number: u64) -> Self::HeaderByNumberFuture { + ethereum_client::header_by_number(self.client, number) + .map(|(client, result)| 
(EthereumHeadersSource { client }, result)) + .boxed() + } - eth_new_header_future.set(ethereum_client::header_by_number(eth_client, id).fuse()); - } else { - eth_maybe_client = Some(eth_client); - } - } - } - }); + fn header_extra(self, id: EthereumHeaderId, header: &Header) -> Self::HeaderExtraFuture { + ethereum_client::transactions_receipts(self.client, id, header.transactions.clone()) + .map(|(client, result)| (EthereumHeadersSource { client }, result)) + .boxed() + } } -fn print_progress( - progress_context: (std::time::Instant, Option<u64>, Option<u64>), - eth_sync: &crate::ethereum_sync::HeadersSync, -) -> (std::time::Instant, Option<u64>, Option<u64>) { - let (prev_time, prev_best_header, prev_target_header) = progress_context; - let now_time = std::time::Instant::now(); - let (now_best_header, now_target_header) = eth_sync.status(); +/// Substrate client as Ethereum headers target. +struct SubstrateHeadersTarget { + /// Substrate node client. + client: substrate_client::Client, + /// Substrate transactions signer. + signer: sp_core::sr25519::Pair, + /// Whether we want to submit signed (true), or unsigned (false) transactions. + sign_transactions: bool, +} - let need_update = now_time - prev_time > std::time::Duration::from_secs(10) - || match (prev_best_header, now_best_header) { - (Some(prev_best_header), Some(now_best_header)) => now_best_header.0.saturating_sub(prev_best_header) > 10, - _ => false, - }; - if !need_update { - return (prev_time, prev_best_header, prev_target_header); +impl TargetClient<EthereumHeadersSyncPipeline> for SubstrateHeadersTarget { + type Error = substrate_client::Error; + type BestHeaderIdFuture = Pin<Box<dyn Future<Output = (Self, Result<EthereumHeaderId, Self::Error>)>>>; + type IsKnownHeaderFuture = Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, bool), Self::Error>)>>>; + type RequiresExtraFuture = Pin<Box<dyn Future<Output = (Self, Result<(EthereumHeaderId, bool), Self::Error>)>>>; + type SubmitHeadersFuture = Pin<Box<dyn Future<Output = (Self, Result<Vec<EthereumHeaderId>, Self::Error>)>>>; + + fn best_header_id(self) -> Self::BestHeaderIdFuture { + let (signer, sign_transactions) = (self.signer, self.sign_transactions); + substrate_client::best_ethereum_block(self.client) + .map(move |(client, result)| { + ( + SubstrateHeadersTarget { + client, + signer, + sign_transactions, + }, + result, + ) + }) + .boxed() } - log::info!( - target: "bridge", - "Synced {:?} of {:?} headers", - now_best_header.map(|id| id.0), - now_target_header, - ); - (now_time, now_best_header.clone().map(|id| id.0), *now_target_header) -} + fn is_known_header(self, id: EthereumHeaderId) -> Self::IsKnownHeaderFuture { + let (signer, sign_transactions) = (self.signer, self.sign_transactions); + substrate_client::ethereum_header_known(self.client, id) + .map(move |(client, result)| { + ( + SubstrateHeadersTarget { + client, + signer, + sign_transactions, + }, + result, + ) + }) + .boxed() + } -async fn delay<T>(timeout_ms: u64, retval: T) -> T { - async_std::task::sleep(std::time::Duration::from_millis(timeout_ms)).await; - retval -} + fn requires_extra(self, header: &QueuedEthereumHeader) -> Self::RequiresExtraFuture { + // we can minimize number of receipts_check calls by checking header + // logs bloom here, but it may give us false positives (when authorities + // source is contract, we never need any logs) + let (signer, sign_transactions) = (self.signer, self.sign_transactions); + substrate_client::ethereum_receipts_required(self.client, header.clone()) + 
.map(move |(client, result)| { + ( + SubstrateHeadersTarget { + client, + signer, + sign_transactions, + }, + result, + ) + }) + .boxed() + } -fn interval(timeout_ms: u64) -> impl futures::Stream<Item = ()> { - futures::stream::unfold((), move |_| async move { - delay(timeout_ms, ()).await; - Some(((), ())) - }) + fn submit_headers(self, headers: Vec<QueuedEthereumHeader>) -> Self::SubmitHeadersFuture { + let (signer, sign_transactions) = (self.signer, self.sign_transactions); + substrate_client::submit_ethereum_headers(self.client, signer.clone(), headers, sign_transactions) + .map(move |(client, result)| { + ( + SubstrateHeadersTarget { + client, + signer, + sign_transactions, + }, + result.map(|(_, submitted_headers)| submitted_headers), + ) + }) + .boxed() + } } -fn process_future_result<TClient, TResult, TError, TGoOfflineFuture>( - maybe_client: &mut Option<TClient>, - client: TClient, - result: Result<TResult, TError>, - on_success: impl FnOnce(TResult), - go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>, - go_offline: impl FnOnce(TClient) -> TGoOfflineFuture, - error_pattern: &'static str, -) where - TError: std::fmt::Debug + MaybeConnectionError, - TGoOfflineFuture: FutureExt, -{ - match result { - Ok(result) => { - *maybe_client = Some(client); - on_success(result); - } - Err(error) => { - if error.is_connection_error() { - go_offline_future.set(go_offline(client).fuse()); - } else { - *maybe_client = Some(client); - } +/// Run Ethereum headers synchronization. +pub fn run(params: EthereumSyncParams) { + let eth_uri = format!("http://{}:{}", params.eth_host, params.eth_port); + let eth_client = ethereum_client::client(ð_uri); + + let sub_uri = format!("http://{}:{}", params.sub_host, params.sub_port); + let sub_client = substrate_client::client(&sub_uri); + let sub_signer = params.sub_signer; + let sign_sub_transactions = match params.sync_params.target_tx_mode { + TargetTransactionMode::Signed | TargetTransactionMode::Backup => true, + TargetTransactionMode::Unsigned => false, + }; - log::error!(target: "bridge", "{}: {:?}", error_pattern, error); - } - } + crate::sync_loop::run( + EthereumHeadersSource { client: eth_client }, + ETHEREUM_TICK_INTERVAL_MS, + SubstrateHeadersTarget { + client: sub_client, + signer: sub_signer, + sign_transactions: sign_sub_transactions, + }, + SUBSTRATE_TICK_INTERVAL_MS, + params.sync_params, + ); } diff --git a/bridges/relays/ethereum/src/ethereum_types.rs b/bridges/relays/ethereum/src/ethereum_types.rs index beb6c100fb4..93c47da4f7f 100644 --- a/bridges/relays/ethereum/src/ethereum_types.rs +++ b/bridges/relays/ethereum/src/ethereum_types.rs @@ -14,6 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. +use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts}; +use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use codec::Encode; + pub use web3::types::{Bytes, H256, U128, U64}; /// When header is just received from the Ethereum node, we check that it has @@ -30,84 +34,43 @@ pub type Header = web3::types::Block<H256>; /// Ethereum transaction receipt type. pub type Receipt = web3::types::TransactionReceipt; -/// Ethereum header Id. -#[derive(Debug, Clone, Copy, PartialEq)] -pub struct HeaderId(pub u64, pub H256); +/// Ethereum header ID. 
+pub type EthereumHeaderId = HeaderId<H256, u64>; -impl From<&Header> for HeaderId { - fn from(header: &Header) -> HeaderId { - HeaderId( - header.number.expect(HEADER_ID_PROOF).as_u64(), - header.hash.expect(HEADER_ID_PROOF), - ) - } -} +/// Queued ethereum header ID. +pub type QueuedEthereumHeader = QueuedHeader<EthereumHeadersSyncPipeline>; -/// Ethereum header synchronization status. -#[derive(Debug, Clone, Copy, PartialEq)] -pub enum HeaderStatus { - /// Header is unknown. - Unknown, - /// Header is in MaybeOrphan queue. - MaybeOrphan, - /// Header is in Orphan queue. - Orphan, - /// Header is in MaybeReceipts queue. - MaybeReceipts, - /// Header is in Receipts queue. - Receipts, - /// Header is in Ready queue. - Ready, - /// Header has been recently submitted to the Substrate runtime. - Submitted, - /// Header is known to the Substrate runtime. - Synced, -} - -#[derive(Clone, Debug, Default)] +/// Ethereum synchronization pipeline. +#[derive(Clone, Copy, Debug)] #[cfg_attr(test, derive(PartialEq))] -pub struct QueuedHeader { - header: Header, - receipts: Option<Vec<Receipt>>, -} - -impl QueuedHeader { - /// Creates new queued header. - pub fn new(header: Header) -> Self { - QueuedHeader { header, receipts: None } - } - - /// Returns ID of header. - pub fn id(&self) -> HeaderId { - (&self.header).into() +pub struct EthereumHeadersSyncPipeline; + +impl HeadersSyncPipeline for EthereumHeadersSyncPipeline { + const SOURCE_NAME: &'static str = "Ethereum"; + const TARGET_NAME: &'static str = "Substrate"; + + type Hash = H256; + type Number = u64; + type Header = Header; + type Extra = Vec<Receipt>; + + fn estimate_size(source: &QueuedHeader<Self>) -> usize { + into_substrate_ethereum_header(source.header()).encode().len() + + into_substrate_ethereum_receipts(source.extra()) + .map(|extra| extra.encode().len()) + .unwrap_or(0) } +} - /// Returns ID of parent header. - pub fn parent_id(&self) -> HeaderId { +impl SourceHeader<H256, u64> for Header { + fn id(&self) -> EthereumHeaderId { HeaderId( - self.header.number.expect(HEADER_ID_PROOF).as_u64() - 1, - self.header.parent_hash, + self.number.expect(HEADER_ID_PROOF).as_u64(), + self.hash.expect(HEADER_ID_PROOF), ) } - /// Returns reference to header. - pub fn header(&self) -> &Header { - &self.header - } - - /// Returns reference to transactions receipts. - pub fn receipts(&self) -> &Option<Vec<Receipt>> { - &self.receipts - } - - /// Extract header and receipts from self. - pub fn extract(self) -> (Header, Option<Vec<Receipt>>) { - (self.header, self.receipts) - } - - /// Set associated transaction receipts. - pub fn set_receipts(mut self, receipts: Vec<Receipt>) -> Self { - self.receipts = Some(receipts); - self + fn parent_id(&self) -> EthereumHeaderId { + HeaderId(self.number.expect(HEADER_ID_PROOF).as_u64() - 1, self.parent_hash) } } diff --git a/bridges/relays/ethereum/src/ethereum_headers.rs b/bridges/relays/ethereum/src/headers.rs similarity index 60% rename from bridges/relays/ethereum/src/ethereum_headers.rs rename to bridges/relays/ethereum/src/headers.rs index 881bc15fee8..841743ce1d5 100644 --- a/bridges/relays/ethereum/src/ethereum_headers.rs +++ b/bridges/relays/ethereum/src/headers.rs @@ -14,46 +14,63 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. 
-use crate::ethereum_types::{Header, HeaderId, HeaderStatus, QueuedHeader, Receipt, H256}; +use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeader, SourceHeader}; +use num_traits::{One, Zero}; use std::collections::{ btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, HashMap, HashSet, }; -type HeadersQueue = BTreeMap<u64, HashMap<H256, QueuedHeader>>; -type KnownHeaders = BTreeMap<u64, HashMap<H256, HeaderStatus>>; +type HeadersQueue<P> = + BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, QueuedHeader<P>>>; +type KnownHeaders<P> = + BTreeMap<<P as HeadersSyncPipeline>::Number, HashMap<<P as HeadersSyncPipeline>::Hash, HeaderStatus>>; /// Ethereum headers queue. -#[derive(Debug, Default)] -pub struct QueuedHeaders { - /// Headers that are received from Ethereum node, but we (native ethereum sync code) have +#[derive(Debug)] +pub struct QueuedHeaders<P: HeadersSyncPipeline> { + /// Headers that are received from source node, but we (native sync code) have /// never seen their parents. So we need to check if we can/should submit this header. - maybe_orphan: HeadersQueue, - /// Headers that are received from Ethreum node, and we (native ethereum sync code) have + maybe_orphan: HeadersQueue<P>, + /// Headers that are received from source node, and we (native sync code) have /// checked that Substrate runtime doesn't know their parents. So we need to submit parents /// first. - orphan: HeadersQueue, - /// Headers that are ready to be submitted to Substrate runtime, but we need to check - /// whether submission requires transactions receipts to be provided. - maybe_receipts: HeadersQueue, - /// Headers that are ready to be submitted to Substrate runtime, but we need to retrieve - /// transactions receipts first. - receipts: HeadersQueue, - /// Headers that are ready to be submitted to Substrate runtime. - ready: HeadersQueue, - /// Headers that are (we believe) are currently submitted to Substrate runtime by our, + orphan: HeadersQueue<P>, + /// Headers that are ready to be submitted to target node, but we need to check + /// whether submission requires extra data to be provided. + maybe_extra: HeadersQueue<P>, + /// Headers that are ready to be submitted to target node, but we need to retrieve + /// extra data first. + extra: HeadersQueue<P>, + /// Headers that are ready to be submitted to target node. + ready: HeadersQueue<P>, + /// Headers that are (we believe) currently submitted to target node by our, /// not-yet mined transactions. - submitted: HeadersQueue, + submitted: HeadersQueue<P>, /// Pointers to all headers that we ever seen and we believe we can touch in the future. - known_headers: KnownHeaders, + known_headers: KnownHeaders<P>, /// Pruned blocks border. We do not store or accept any blocks with number less than /// this number. - prune_border: u64, + prune_border: P::Number, } -impl QueuedHeaders { +impl<P: HeadersSyncPipeline> QueuedHeaders<P> { + /// Returns new QueuedHeaders. + pub fn new() -> Self { + QueuedHeaders { + maybe_orphan: HeadersQueue::new(), + orphan: HeadersQueue::new(), + maybe_extra: HeadersQueue::new(), + extra: HeadersQueue::new(), + ready: HeadersQueue::new(), + submitted: HeadersQueue::new(), + known_headers: KnownHeaders::<P>::new(), + prune_border: Zero::zero(), + } + } + /// Returns prune border. 
#[cfg(test)] - pub fn prune_border(&self) -> u64 { + pub fn prune_border(&self) -> P::Number { self.prune_border } @@ -66,11 +83,11 @@ impl QueuedHeaders { .values() .fold(0, |total, headers| total + headers.len()), HeaderStatus::Orphan => self.orphan.values().fold(0, |total, headers| total + headers.len()), - HeaderStatus::MaybeReceipts => self - .maybe_receipts + HeaderStatus::MaybeExtra => self + .maybe_extra .values() .fold(0, |total, headers| total + headers.len()), - HeaderStatus::Receipts => self.receipts.values().fold(0, |total, headers| total + headers.len()), + HeaderStatus::Extra => self.extra.values().fold(0, |total, headers| total + headers.len()), HeaderStatus::Ready => self.ready.values().fold(0, |total, headers| total + headers.len()), HeaderStatus::Submitted => self.submitted.values().fold(0, |total, headers| total + headers.len()), } @@ -83,24 +100,24 @@ impl QueuedHeaders { .fold(0, |total, headers| total + headers.len()) + self.orphan.values().fold(0, |total, headers| total + headers.len()) + self - .maybe_receipts + .maybe_extra .values() .fold(0, |total, headers| total + headers.len()) - + self.receipts.values().fold(0, |total, headers| total + headers.len()) + + self.extra.values().fold(0, |total, headers| total + headers.len()) + self.ready.values().fold(0, |total, headers| total + headers.len()) } /// Returns number of best block in the queue. - pub fn best_queued_number(&self) -> u64 { + pub fn best_queued_number(&self) -> P::Number { std::cmp::max( - self.maybe_orphan.keys().next_back().cloned().unwrap_or(0), + self.maybe_orphan.keys().next_back().cloned().unwrap_or_else(Zero::zero), std::cmp::max( - self.orphan.keys().next_back().cloned().unwrap_or(0), + self.orphan.keys().next_back().cloned().unwrap_or_else(Zero::zero), std::cmp::max( - self.maybe_receipts.keys().next_back().cloned().unwrap_or(0), + self.maybe_extra.keys().next_back().cloned().unwrap_or_else(Zero::zero), std::cmp::max( - self.receipts.keys().next_back().cloned().unwrap_or(0), - self.ready.keys().next_back().cloned().unwrap_or(0), + self.extra.keys().next_back().cloned().unwrap_or_else(Zero::zero), + self.ready.keys().next_back().cloned().unwrap_or_else(Zero::zero), ), ), ), @@ -108,7 +125,7 @@ impl QueuedHeaders { } /// Returns synchronization status of the header. - pub fn status(&self, id: &HeaderId) -> HeaderStatus { + pub fn status(&self, id: &HeaderId<P::Hash, P::Number>) -> HeaderStatus { self.known_headers .get(&id.0) .and_then(|x| x.get(&id.1)) @@ -117,46 +134,61 @@ impl QueuedHeaders { } /// Get oldest header from given queue. - pub fn header(&self, status: HeaderStatus) -> Option<&QueuedHeader> { + pub fn header(&self, status: HeaderStatus) -> Option<&QueuedHeader<P>> { match status { HeaderStatus::Unknown | HeaderStatus::Synced => return None, HeaderStatus::MaybeOrphan => oldest_header(&self.maybe_orphan), HeaderStatus::Orphan => oldest_header(&self.orphan), - HeaderStatus::MaybeReceipts => oldest_header(&self.maybe_receipts), - HeaderStatus::Receipts => oldest_header(&self.receipts), + HeaderStatus::MaybeExtra => oldest_header(&self.maybe_extra), + HeaderStatus::Extra => oldest_header(&self.extra), HeaderStatus::Ready => oldest_header(&self.ready), HeaderStatus::Submitted => oldest_header(&self.submitted), } } /// Get oldest headers from given queue until functor will return false. 
- pub fn headers(&self, status: HeaderStatus, f: impl FnMut(&QueuedHeader) -> bool) -> Option<Vec<&QueuedHeader>> { + pub fn headers( + &self, + status: HeaderStatus, + f: impl FnMut(&QueuedHeader<P>) -> bool, + ) -> Option<Vec<&QueuedHeader<P>>> { match status { HeaderStatus::Unknown | HeaderStatus::Synced => return None, HeaderStatus::MaybeOrphan => oldest_headers(&self.maybe_orphan, f), HeaderStatus::Orphan => oldest_headers(&self.orphan, f), - HeaderStatus::MaybeReceipts => oldest_headers(&self.maybe_receipts, f), - HeaderStatus::Receipts => oldest_headers(&self.receipts, f), + HeaderStatus::MaybeExtra => oldest_headers(&self.maybe_extra, f), + HeaderStatus::Extra => oldest_headers(&self.extra, f), HeaderStatus::Ready => oldest_headers(&self.ready, f), HeaderStatus::Submitted => oldest_headers(&self.submitted, f), } } - /// Appends new header to the queue. - pub fn header_response(&mut self, header: Header) { - let id = (&header).into(); + /// Appends new header, received from the source node, to the queue. + pub fn header_response(&mut self, header: P::Header) { + let id = header.id(); let status = self.status(&id); if status != HeaderStatus::Unknown { - log::debug!(target: "bridge", "Ignoring new Ethereum header: {:?}. Status is {:?}.", id, status); + log::debug!( + target: "bridge", + "Ignoring new {} header: {:?}. Status is {:?}.", + P::SOURCE_NAME, + id, + status, + ); return; } if id.0 < self.prune_border { - log::debug!(target: "bridge", "Ignoring ancient new Ethereum header: {:?}.", id); + log::debug!( + target: "bridge", + "Ignoring ancient new {} header: {:?}.", + P::SOURCE_NAME, + id, + ); return; } - let parent_id = HeaderId(id.0 - 1, header.parent_hash); + let parent_id = header.parent_id(); let parent_status = self.status(&parent_id); let header = QueuedHeader::new(header); @@ -169,22 +201,28 @@ impl QueuedHeaders { insert_header(&mut self.orphan, id, header); HeaderStatus::Orphan } - HeaderStatus::MaybeReceipts - | HeaderStatus::Receipts + HeaderStatus::MaybeExtra + | HeaderStatus::Extra | HeaderStatus::Ready | HeaderStatus::Submitted | HeaderStatus::Synced => { - insert_header(&mut self.maybe_receipts, id, header); - HeaderStatus::MaybeReceipts + insert_header(&mut self.maybe_extra, id, header); + HeaderStatus::MaybeExtra } }; self.known_headers.entry(id.0).or_default().insert(id.1, status); - log::debug!(target: "bridge", "Queueing new Ethereum header: {:?}. Queue: {:?}.", id, status); + log::debug!( + target: "bridge", + "Queueing new {} header: {:?}. Queue: {:?}.", + P::SOURCE_NAME, + id, + status, + ); } - /// Receive Substrate best header. - pub fn substrate_best_header_response(&mut self, id: &HeaderId) { + /// Receive best header from the target node. 
+ pub fn target_best_header_response(&mut self, id: &HeaderId<P::Hash, P::Number>) { // all ancestors of this header are now synced => let's remove them from // queues let mut current = *id; @@ -193,15 +231,21 @@ impl QueuedHeaders { HeaderStatus::Unknown => break, HeaderStatus::MaybeOrphan => remove_header(&mut self.maybe_orphan, ¤t), HeaderStatus::Orphan => remove_header(&mut self.orphan, ¤t), - HeaderStatus::MaybeReceipts => remove_header(&mut self.maybe_receipts, ¤t), - HeaderStatus::Receipts => remove_header(&mut self.receipts, ¤t), + HeaderStatus::MaybeExtra => remove_header(&mut self.maybe_extra, ¤t), + HeaderStatus::Extra => remove_header(&mut self.extra, ¤t), HeaderStatus::Ready => remove_header(&mut self.ready, ¤t), HeaderStatus::Submitted => remove_header(&mut self.submitted, ¤t), HeaderStatus::Synced => break, } .expect("header has a given status; given queue has the header; qed"); - log::debug!(target: "bridge", "Ethereum header {:?} is now {:?}", current, HeaderStatus::Synced); + log::debug!( + target: "bridge", + "{} header {:?} is now {:?}", + P::SOURCE_NAME, + current, + HeaderStatus::Synced, + ); *self .known_headers .entry(current.0) @@ -212,7 +256,13 @@ impl QueuedHeaders { } // remember that the header is synced - log::debug!(target: "bridge", "Ethereum header {:?} is now {:?}", id, HeaderStatus::Synced); + log::debug!( + target: "bridge", + "{} header {:?} is now {:?}", + P::SOURCE_NAME, + id, + HeaderStatus::Synced, + ); *self .known_headers .entry(id.0) @@ -221,20 +271,20 @@ impl QueuedHeaders { .or_insert(HeaderStatus::Synced) = HeaderStatus::Synced; // now let's move all descendants from maybe_orphan && orphan queues to - // maybe_receipts queue - move_header_descendants( + // maybe_extra queue + move_header_descendants::<P>( &mut [&mut self.maybe_orphan, &mut self.orphan], - &mut self.maybe_receipts, + &mut self.maybe_extra, &mut self.known_headers, - HeaderStatus::MaybeReceipts, + HeaderStatus::MaybeExtra, id, ); } - /// Receive Substrate response for MaybeOrphan request. - pub fn maybe_orphan_response(&mut self, id: &HeaderId, response: bool) { + /// Receive target node response for MaybeOrphan request. + pub fn maybe_orphan_response(&mut self, id: &HeaderId<P::Hash, P::Number>, response: bool) { if !response { - move_header_descendants( + move_header_descendants::<P>( &mut [&mut self.maybe_orphan], &mut self.orphan, &mut self.known_headers, @@ -244,25 +294,25 @@ impl QueuedHeaders { return; } - move_header_descendants( + move_header_descendants::<P>( &mut [&mut self.maybe_orphan, &mut self.orphan], - &mut self.maybe_receipts, + &mut self.maybe_extra, &mut self.known_headers, - HeaderStatus::MaybeReceipts, + HeaderStatus::MaybeExtra, &id, ); } - /// Receive Substrate response for MaybeReceipts request. - pub fn maybe_receipts_response(&mut self, id: &HeaderId, response: bool) { + /// Receive target node response for MaybeExtra request. + pub fn maybe_extra_response(&mut self, id: &HeaderId<P::Hash, P::Number>, response: bool) { let (destination_status, destination_queue) = if response { - (HeaderStatus::Receipts, &mut self.receipts) + (HeaderStatus::Extra, &mut self.extra) } else { (HeaderStatus::Ready, &mut self.ready) }; move_header( - &mut self.maybe_receipts, + &mut self.maybe_extra, destination_queue, &mut self.known_headers, destination_status, @@ -271,20 +321,21 @@ impl QueuedHeaders { ); } - /// Receive transactions receipts from Ethereum node. 
- pub fn receipts_response(&mut self, id: &HeaderId, receipts: Vec<Receipt>) { + /// Receive extra from source node. + pub fn extra_response(&mut self, id: &HeaderId<P::Hash, P::Number>, extra: P::Extra) { + // move header itself from extra to ready queue move_header( - &mut self.receipts, + &mut self.extra, &mut self.ready, &mut self.known_headers, HeaderStatus::Ready, id, - |header| header.set_receipts(receipts), + |header| header.set_extra(extra), ); } - /// When header is submitted to Substrate node. - pub fn headers_submitted(&mut self, ids: Vec<HeaderId>) { + /// When header is submitted to target node. + pub fn headers_submitted(&mut self, ids: Vec<HeaderId<P::Hash, P::Number>>) { for id in ids { move_header( &mut self.ready, @@ -298,18 +349,18 @@ impl QueuedHeaders { } /// Prune and never accep headers before this block. - pub fn prune(&mut self, prune_border: u64) { + pub fn prune(&mut self, prune_border: P::Number) { if prune_border <= self.prune_border { return; } prune_queue(&mut self.maybe_orphan, prune_border); prune_queue(&mut self.orphan, prune_border); - prune_queue(&mut self.maybe_receipts, prune_border); - prune_queue(&mut self.receipts, prune_border); + prune_queue(&mut self.maybe_extra, prune_border); + prune_queue(&mut self.extra, prune_border); prune_queue(&mut self.ready, prune_border); prune_queue(&mut self.submitted, prune_border); - prune_known_headers(&mut self.known_headers, prune_border); + prune_known_headers::<P>(&mut self.known_headers, prune_border); self.prune_border = prune_border; } @@ -317,22 +368,29 @@ impl QueuedHeaders { pub fn clear(&mut self) { self.maybe_orphan.clear(); self.orphan.clear(); - self.maybe_receipts.clear(); - self.receipts.clear(); + self.maybe_extra.clear(); + self.extra.clear(); self.ready.clear(); self.submitted.clear(); self.known_headers.clear(); - self.prune_border = 0; + self.prune_border = Zero::zero(); } } /// Insert header to the queue. -fn insert_header(queue: &mut HeadersQueue, id: HeaderId, header: QueuedHeader) { +fn insert_header<P: HeadersSyncPipeline>( + queue: &mut HeadersQueue<P>, + id: HeaderId<P::Hash, P::Number>, + header: QueuedHeader<P>, +) { queue.entry(id.0).or_default().insert(id.1, header); } /// Remove header from the queue. -fn remove_header(queue: &mut HeadersQueue, id: &HeaderId) -> Option<QueuedHeader> { +fn remove_header<P: HeadersSyncPipeline>( + queue: &mut HeadersQueue<P>, + id: &HeaderId<P::Hash, P::Number>, +) -> Option<QueuedHeader<P>> { let mut headers_at = match queue.entry(id.0) { BTreeMapEntry::Occupied(headers_at) => headers_at, BTreeMapEntry::Vacant(_) => return None, @@ -346,39 +404,45 @@ fn remove_header(queue: &mut HeadersQueue, id: &HeaderId) -> Option<QueuedHeader } /// Move header from source to destination queue. -fn move_header( - source_queue: &mut HeadersQueue, - destination_queue: &mut HeadersQueue, - known_headers: &mut KnownHeaders, +/// +/// Returns ID of parent header, if header has been moved, or None otherwise. 
+fn move_header<P: HeadersSyncPipeline>( + source_queue: &mut HeadersQueue<P>, + destination_queue: &mut HeadersQueue<P>, + known_headers: &mut KnownHeaders<P>, destination_status: HeaderStatus, - id: &HeaderId, - prepare: impl FnOnce(QueuedHeader) -> QueuedHeader, -) { + id: &HeaderId<P::Hash, P::Number>, + prepare: impl FnOnce(QueuedHeader<P>) -> QueuedHeader<P>, +) -> Option<HeaderId<P::Hash, P::Number>> { let header = match remove_header(source_queue, id) { Some(header) => prepare(header), - None => return, + None => return None, }; + let parent_id = header.header().parent_id(); known_headers.entry(id.0).or_default().insert(id.1, destination_status); destination_queue.entry(id.0).or_default().insert(id.1, header); log::debug!( target: "bridge", - "Ethereum header {:?} is now {:?}", + "{} header {:?} is now {:?}", + P::SOURCE_NAME, id, destination_status, ); + + Some(parent_id) } /// Move all descendant headers from the source to destination queue. -fn move_header_descendants( - source_queues: &mut [&mut HeadersQueue], - destination_queue: &mut HeadersQueue, - known_headers: &mut KnownHeaders, +fn move_header_descendants<P: HeadersSyncPipeline>( + source_queues: &mut [&mut HeadersQueue<P>], + destination_queue: &mut HeadersQueue<P>, + known_headers: &mut KnownHeaders<P>, destination_status: HeaderStatus, - id: &HeaderId, + id: &HeaderId<P::Hash, P::Number>, ) { - let mut current_number = id.0 + 1; + let mut current_number = id.0 + One::one(); let mut current_parents = HashSet::new(); current_parents.insert(id.1); @@ -398,7 +462,7 @@ fn move_header_descendants( HashMapEntry::Vacant(_) => unreachable!("iterating existing keys; qed"), }; - if current_parents.contains(&entry.get().header().parent_hash) { + if current_parents.contains(&entry.get().header().parent_id().1) { let header_to_move = entry.remove(); let header_to_move_id = header_to_move.id(); known_headers @@ -409,7 +473,8 @@ fn move_header_descendants( log::debug!( target: "bridge", - "Ethereum header {:?} is now {:?}", + "{} header {:?} is now {:?}", + P::SOURCE_NAME, header_to_move_id, destination_status, ); @@ -428,18 +493,21 @@ fn move_header_descendants( .extend(headers_to_move.into_iter().map(|(id, h)| (id.1, h))) } - current_number = current_number + 1; + current_number = current_number + One::one(); std::mem::swap(&mut current_parents, &mut next_parents); } } /// Return oldest header from the queue. -fn oldest_header(queue: &HeadersQueue) -> Option<&QueuedHeader> { +fn oldest_header<P: HeadersSyncPipeline>(queue: &HeadersQueue<P>) -> Option<&QueuedHeader<P>> { queue.values().flat_map(|h| h.values()).next() } /// Return oldest headers from the queue until functor will return false. -fn oldest_headers(queue: &HeadersQueue, mut f: impl FnMut(&QueuedHeader) -> bool) -> Option<Vec<&QueuedHeader>> { +fn oldest_headers<P: HeadersSyncPipeline>( + queue: &HeadersQueue<P>, + mut f: impl FnMut(&QueuedHeader<P>) -> bool, +) -> Option<Vec<&QueuedHeader<P>>> { let result = queue .values() .flat_map(|h| h.values()) @@ -453,12 +521,12 @@ fn oldest_headers(queue: &HeadersQueue, mut f: impl FnMut(&QueuedHeader) -> bool } /// Forget all headers with number less than given. -fn prune_queue(queue: &mut HeadersQueue, prune_border: u64) { +fn prune_queue<P: HeadersSyncPipeline>(queue: &mut HeadersQueue<P>, prune_border: P::Number) { *queue = queue.split_off(&prune_border); } /// Forget all known headers with number less than given. 
-fn prune_known_headers(known_headers: &mut KnownHeaders, prune_border: u64) { +fn prune_known_headers<P: HeadersSyncPipeline>(known_headers: &mut KnownHeaders<P>, prune_border: P::Number) { let new_known_headers = known_headers.split_off(&prune_border); for (pruned_number, pruned_headers) in &*known_headers { for pruned_hash in pruned_headers.keys() { @@ -471,8 +539,10 @@ fn prune_known_headers(known_headers: &mut KnownHeaders, prune_border: u64) { #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::ethereum_types::{EthereumHeaderId, EthereumHeadersSyncPipeline, Header, H256}; + use crate::sync_types::{HeaderId, QueuedHeader}; - pub(crate) fn header(number: u64) -> QueuedHeader { + pub(crate) fn header(number: u64) -> QueuedHeader<EthereumHeadersSyncPipeline> { QueuedHeader::new(Header { number: Some(number.into()), hash: Some(hash(number)), @@ -485,85 +555,88 @@ pub(crate) mod tests { H256::from_low_u64_le(number) } - pub(crate) fn id(number: u64) -> HeaderId { + pub(crate) fn id(number: u64) -> EthereumHeaderId { HeaderId(number, hash(number)) } #[test] fn total_headers_works() { // total headers just sums up number of headers in every queue - let mut queue = QueuedHeaders::default(); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(1), Default::default()); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(2), Default::default()); - queue - .maybe_orphan - .entry(2) - .or_default() - .insert(hash(3), Default::default()); - queue.orphan.entry(3).or_default().insert(hash(4), Default::default()); - queue - .maybe_receipts - .entry(4) - .or_default() - .insert(hash(5), Default::default()); - queue.ready.entry(5).or_default().insert(hash(6), Default::default()); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); + queue.maybe_orphan.entry(1).or_default().insert( + hash(1), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.maybe_orphan.entry(1).or_default().insert( + hash(2), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.maybe_orphan.entry(2).or_default().insert( + hash(3), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.orphan.entry(3).or_default().insert( + hash(4), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.maybe_extra.entry(4).or_default().insert( + hash(5), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.ready.entry(5).or_default().insert( + hash(6), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); assert_eq!(queue.total_headers(), 6); } #[test] fn best_queued_number_works() { // initially there are headers in MaybeOrphan queue only - let mut queue = QueuedHeaders::default(); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(1), Default::default()); - queue - .maybe_orphan - .entry(1) - .or_default() - .insert(hash(2), Default::default()); - queue - .maybe_orphan - .entry(3) - .or_default() - .insert(hash(3), Default::default()); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); + queue.maybe_orphan.entry(1).or_default().insert( + hash(1), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.maybe_orphan.entry(1).or_default().insert( + hash(2), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); + queue.maybe_orphan.entry(3).or_default().insert( + hash(3), + 
QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); assert_eq!(queue.best_queued_number(), 3); // and then there's better header in Orphan - queue.orphan.entry(10).or_default().insert(hash(10), Default::default()); + queue.orphan.entry(10).or_default().insert( + hash(10), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); assert_eq!(queue.best_queued_number(), 10); - // and then there's better header in MaybeReceipts - queue - .maybe_receipts - .entry(20) - .or_default() - .insert(hash(20), Default::default()); + // and then there's better header in MaybeExtra + queue.maybe_extra.entry(20).or_default().insert( + hash(20), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); assert_eq!(queue.best_queued_number(), 20); // and then there's better header in Ready - queue.ready.entry(30).or_default().insert(hash(30), Default::default()); + queue.ready.entry(30).or_default().insert( + hash(30), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); assert_eq!(queue.best_queued_number(), 30); // and then there's better header in MaybeOrphan again - queue - .maybe_orphan - .entry(40) - .or_default() - .insert(hash(40), Default::default()); + queue.maybe_orphan.entry(40).or_default().insert( + hash(40), + QueuedHeader::<EthereumHeadersSyncPipeline>::new(Default::default()), + ); assert_eq!(queue.best_queued_number(), 40); } #[test] fn status_works() { // all headers are unknown initially - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); assert_eq!(queue.status(&id(10)), HeaderStatus::Unknown); // and status is read from the KnownHeaders queue @@ -577,7 +650,7 @@ pub(crate) mod tests { #[test] fn header_works() { // initially we have oldest header #10 - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue.maybe_orphan.entry(10).or_default().insert(hash(1), header(100)); assert_eq!( queue.header(HeaderStatus::MaybeOrphan).unwrap().header().hash.unwrap(), @@ -599,48 +672,48 @@ pub(crate) mod tests { #[test] fn header_response_works() { - // when parent is Synced, we insert to MaybeReceipts - let mut queue = QueuedHeaders::default(); + // when parent is Synced, we insert to MaybeExtra + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() .insert(hash(100), HeaderStatus::Synced); queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeReceipts); + assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - // when parent is Ready, we insert to MaybeReceipts - let mut queue = QueuedHeaders::default(); + // when parent is Ready, we insert to MaybeExtra + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() .insert(hash(100), HeaderStatus::Ready); queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeReceipts); + assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - // when parent is Receipts, we insert to MaybeReceipts - let mut queue = QueuedHeaders::default(); + // when parent is Receipts, we insert to MaybeExtra + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() - .insert(hash(100), HeaderStatus::Receipts); + .insert(hash(100), HeaderStatus::Extra); 
queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeReceipts); + assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); - // when parent is MaybeReceipts, we insert to MaybeReceipts - let mut queue = QueuedHeaders::default(); + // when parent is MaybeExtra, we insert to MaybeExtra + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() - .insert(hash(100), HeaderStatus::MaybeReceipts); + .insert(hash(100), HeaderStatus::MaybeExtra); queue.header_response(header(101).header().clone()); - assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeReceipts); + assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeExtra); // when parent is Orphan, we insert to Orphan - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) @@ -650,7 +723,7 @@ pub(crate) mod tests { assert_eq!(queue.status(&id(101)), HeaderStatus::Orphan); // when parent is MaybeOrphan, we insert to MaybeOrphan - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) @@ -660,7 +733,7 @@ pub(crate) mod tests { assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeOrphan); // when parent is unknown, we insert to MaybeOrphan - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue.header_response(header(101).header().clone()); assert_eq!(queue.status(&id(101)), HeaderStatus::MaybeOrphan); } @@ -671,10 +744,10 @@ pub(crate) mod tests { // its best block to #100. At this time we have: // #100 in MaybeOrphan // #99 in Orphan - // #98 in MaybeReceipts + // #98 in MaybeExtra // #97 in Receipts // #96 in Ready - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) @@ -695,27 +768,27 @@ pub(crate) mod tests { .known_headers .entry(98) .or_default() - .insert(hash(98), HeaderStatus::MaybeReceipts); - queue.maybe_receipts.entry(98).or_default().insert(hash(98), header(98)); + .insert(hash(98), HeaderStatus::MaybeExtra); + queue.maybe_extra.entry(98).or_default().insert(hash(98), header(98)); queue .known_headers .entry(97) .or_default() - .insert(hash(97), HeaderStatus::Receipts); - queue.receipts.entry(97).or_default().insert(hash(97), header(97)); + .insert(hash(97), HeaderStatus::Extra); + queue.extra.entry(97).or_default().insert(hash(97), header(97)); queue .known_headers .entry(96) .or_default() .insert(hash(96), HeaderStatus::Ready); queue.ready.entry(96).or_default().insert(hash(96), header(96)); - queue.substrate_best_header_response(&id(100)); + queue.target_best_header_response(&id(100)); // then the #100 and all ancestors of #100 (#96..#99) are treated as synced assert!(queue.maybe_orphan.is_empty()); assert!(queue.orphan.is_empty()); - assert!(queue.maybe_receipts.is_empty()); - assert!(queue.receipts.is_empty()); + assert!(queue.maybe_extra.is_empty()); + assert!(queue.extra.is_empty()); assert!(queue.ready.is_empty()); assert_eq!(queue.known_headers.len(), 5); assert!(queue @@ -731,7 +804,7 @@ pub(crate) mod tests { // #101 in Orphan // #102 in MaybeOrphan // #103 in Orphan - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(101) @@ -754,15 +827,15 @@ pub(crate) mod tests { 
.or_default() .insert(hash(103), HeaderStatus::Orphan); queue.orphan.entry(103).or_default().insert(hash(103), header(103)); - queue.substrate_best_header_response(&id(100)); + queue.target_best_header_response(&id(100)); - // all descendants are moved to MaybeReceipts + // all descendants are moved to MaybeExtra assert!(queue.maybe_orphan.is_empty()); assert!(queue.orphan.is_empty()); - assert_eq!(queue.maybe_receipts.len(), 3); - assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::MaybeReceipts); - assert_eq!(queue.known_headers[&102][&hash(102)], HeaderStatus::MaybeReceipts); - assert_eq!(queue.known_headers[&103][&hash(103)], HeaderStatus::MaybeReceipts); + assert_eq!(queue.maybe_extra.len(), 3); + assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::MaybeExtra); + assert_eq!(queue.known_headers[&102][&hash(102)], HeaderStatus::MaybeExtra); + assert_eq!(queue.known_headers[&103][&hash(103)], HeaderStatus::MaybeExtra); } #[test] @@ -773,7 +846,7 @@ pub(crate) mod tests { // #102 in MaybeOrphan // and we have asked for MaybeOrphan status of #100.parent (i.e. #99) // and the response is: YES, #99 is known to the Substrate runtime - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) @@ -802,13 +875,13 @@ pub(crate) mod tests { .insert(hash(102), header(102)); queue.maybe_orphan_response(&id(99), true); - // then all headers (#100..#103) are moved to the MaybeReceipts queue + // then all headers (#100..#103) are moved to the MaybeExtra queue assert!(queue.orphan.is_empty()); assert!(queue.maybe_orphan.is_empty()); - assert_eq!(queue.maybe_receipts.len(), 3); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::MaybeReceipts); - assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::MaybeReceipts); - assert_eq!(queue.known_headers[&102][&hash(102)], HeaderStatus::MaybeReceipts); + assert_eq!(queue.maybe_extra.len(), 3); + assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::MaybeExtra); + assert_eq!(queue.known_headers[&101][&hash(101)], HeaderStatus::MaybeExtra); + assert_eq!(queue.known_headers[&102][&hash(102)], HeaderStatus::MaybeExtra); } #[test] @@ -818,7 +891,7 @@ pub(crate) mod tests { // #101 in MaybeOrphan // and we have asked for MaybeOrphan status of #100.parent (i.e. 
#99) // and the response is: NO, #99 is NOT known to the Substrate runtime - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) @@ -849,61 +922,53 @@ pub(crate) mod tests { } #[test] - fn positive_maybe_receipts_response_works() { - let mut queue = QueuedHeaders::default(); + fn positive_maybe_extra_response_works() { + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() - .insert(hash(100), HeaderStatus::MaybeReceipts); - queue - .maybe_receipts - .entry(100) - .or_default() - .insert(hash(100), header(100)); - queue.maybe_receipts_response(&id(100), true); - assert!(queue.maybe_receipts.is_empty()); - assert_eq!(queue.receipts.len(), 1); - assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Receipts); + .insert(hash(100), HeaderStatus::MaybeExtra); + queue.maybe_extra.entry(100).or_default().insert(hash(100), header(100)); + queue.maybe_extra_response(&id(100), true); + assert!(queue.maybe_extra.is_empty()); + assert_eq!(queue.extra.len(), 1); + assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Extra); } #[test] - fn negative_maybe_receipts_response_works() { - let mut queue = QueuedHeaders::default(); + fn negative_maybe_extra_response_works() { + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() - .insert(hash(100), HeaderStatus::MaybeReceipts); - queue - .maybe_receipts - .entry(100) - .or_default() - .insert(hash(100), header(100)); - queue.maybe_receipts_response(&id(100), false); - assert!(queue.maybe_receipts.is_empty()); + .insert(hash(100), HeaderStatus::MaybeExtra); + queue.maybe_extra.entry(100).or_default().insert(hash(100), header(100)); + queue.maybe_extra_response(&id(100), false); + assert!(queue.maybe_extra.is_empty()); assert_eq!(queue.ready.len(), 1); assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready); } #[test] fn receipts_response_works() { - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) .or_default() - .insert(hash(100), HeaderStatus::Receipts); - queue.receipts.entry(100).or_default().insert(hash(100), header(100)); - queue.receipts_response(&id(100), Vec::new()); - assert!(queue.receipts.is_empty()); + .insert(hash(100), HeaderStatus::Extra); + queue.extra.entry(100).or_default().insert(hash(100), header(100)); + queue.extra_response(&id(100), Vec::new()); + assert!(queue.extra.is_empty()); assert_eq!(queue.ready.len(), 1); assert_eq!(queue.known_headers[&100][&hash(100)], HeaderStatus::Ready); } #[test] fn header_submitted_works() { - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(100) @@ -917,7 +982,7 @@ pub(crate) mod tests { #[test] fn prune_works() { - let mut queue = QueuedHeaders::default(); + let mut queue = QueuedHeaders::<EthereumHeadersSyncPipeline>::new(); queue .known_headers .entry(104) @@ -938,18 +1003,14 @@ pub(crate) mod tests { .known_headers .entry(102) .or_default() - .insert(hash(102), HeaderStatus::MaybeReceipts); - queue - .maybe_receipts - .entry(102) - .or_default() - .insert(hash(102), header(102)); + .insert(hash(102), HeaderStatus::MaybeExtra); + queue.maybe_extra.entry(102).or_default().insert(hash(102), header(102)); queue .known_headers .entry(101) .or_default() - 
.insert(hash(101), HeaderStatus::Receipts);
-		queue.receipts.entry(101).or_default().insert(hash(101), header(101));
+			.insert(hash(101), HeaderStatus::Extra);
+		queue.extra.entry(101).or_default().insert(hash(101), header(101));
 		queue
 			.known_headers
 			.entry(100)
@@ -960,8 +1021,8 @@
 		queue.prune(102);

 		assert_eq!(queue.ready.len(), 0);
-		assert_eq!(queue.receipts.len(), 0);
-		assert_eq!(queue.maybe_receipts.len(), 1);
+		assert_eq!(queue.extra.len(), 0);
+		assert_eq!(queue.maybe_extra.len(), 1);
 		assert_eq!(queue.orphan.len(), 1);
 		assert_eq!(queue.maybe_orphan.len(), 1);
 		assert_eq!(queue.known_headers.len(), 3);
@@ -969,8 +1030,8 @@
 		queue.prune(110);

 		assert_eq!(queue.ready.len(), 0);
-		assert_eq!(queue.receipts.len(), 0);
-		assert_eq!(queue.maybe_receipts.len(), 0);
+		assert_eq!(queue.extra.len(), 0);
+		assert_eq!(queue.maybe_extra.len(), 0);
 		assert_eq!(queue.orphan.len(), 0);
 		assert_eq!(queue.maybe_orphan.len(), 0);
 		assert_eq!(queue.known_headers.len(), 0);
diff --git a/bridges/relays/ethereum/src/main.rs b/bridges/relays/ethereum/src/main.rs
index fb276b480cf..8d897d51e42 100644
--- a/bridges/relays/ethereum/src/main.rs
+++ b/bridges/relays/ethereum/src/main.rs
@@ -17,12 +17,14 @@
 #![recursion_limit = "1024"]

 mod ethereum_client;
-mod ethereum_headers;
-mod ethereum_sync;
 mod ethereum_sync_loop;
 mod ethereum_types;
+mod headers;
 mod substrate_client;
 mod substrate_types;
+mod sync;
+mod sync_loop;
+mod sync_types;

 use sp_core::crypto::Pair;
 use std::io::Write;
@@ -30,13 +32,24 @@ use std::io::Write;
 fn main() {
 	initialize();

-	ethereum_sync_loop::run(match ethereum_sync_params() {
-		Ok(ethereum_sync_params) => ethereum_sync_params,
-		Err(err) => {
-			log::error!(target: "bridge", "Error parsing parameters: {}", err);
+	let yaml = clap::load_yaml!("cli.yml");
+	let matches = clap::App::from_yaml(yaml).get_matches();
+	match matches.subcommand() {
+		("eth-to-sub", Some(eth_to_sub_matches)) => {
+			ethereum_sync_loop::run(match ethereum_sync_params(&eth_to_sub_matches) {
+				Ok(ethereum_sync_params) => ethereum_sync_params,
+				Err(err) => {
+					log::error!(target: "bridge", "Error parsing parameters: {}", err);
+					return;
+				}
+			});
+		}
+		("", _) => {
+			log::error!(target: "bridge", "No subcommand specified");
 			return;
 		}
-	});
+		_ => unreachable!("all possible subcommands are checked above; qed"),
+	}
 }

 fn initialize() {
@@ -76,10 +89,7 @@ fn initialize() {
 	builder.init();
 }

-fn ethereum_sync_params() -> Result<ethereum_sync_loop::EthereumSyncParams, String> {
-	let yaml = clap::load_yaml!("cli.yml");
-	let matches = clap::App::from_yaml(yaml).get_matches();
-
+fn ethereum_sync_params(matches: &clap::ArgMatches) -> Result<ethereum_sync_loop::EthereumSyncParams, String> {
	let mut eth_sync_params = ethereum_sync_loop::EthereumSyncParams::default();
 	if let Some(eth_host) = matches.value_of("eth-host") {
 		eth_sync_params.eth_host = eth_host.into();
@@ -100,16 +110,16 @@ fn ethereum_sync_params() -> Result<ethereum_sync_loop::EthereumSyncParams, Stri
 	}

 	match matches.value_of("sub-tx-mode") {
-		Some("signed") => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Signed,
+		Some("signed") => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Signed,
 		Some("unsigned") => {
-			eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Unsigned;
+			eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Unsigned;

 			// tx pool won't accept too much unsigned transactions
-
eth_sync_params.max_headers_in_submitted_status = 10; + eth_sync_params.sync_params.max_headers_in_submitted_status = 10; } - Some("backup") => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Backup, + Some("backup") => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Backup, Some(mode) => return Err(format!("Invalid sub-tx-mode: {}", mode)), - None => eth_sync_params.sub_tx_mode = ethereum_sync_loop::SubstrateTransactionMode::Signed, + None => eth_sync_params.sync_params.target_tx_mode = sync::TargetTransactionMode::Signed, } Ok(eth_sync_params) diff --git a/bridges/relays/ethereum/src/substrate_client.rs b/bridges/relays/ethereum/src/substrate_client.rs index 51e2b1886e5..40a9047c5ee 100644 --- a/bridges/relays/ethereum/src/substrate_client.rs +++ b/bridges/relays/ethereum/src/substrate_client.rs @@ -14,9 +14,9 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. -use crate::ethereum_sync_loop::MaybeConnectionError; -use crate::ethereum_types::{Bytes, HeaderId as EthereumHeaderId, QueuedHeader as QueuedEthereumHeader, H256}; +use crate::ethereum_types::{Bytes, EthereumHeaderId, QueuedEthereumHeader, H256}; use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts, TransactionHash}; +use crate::sync_types::{HeaderId, MaybeConnectionError, SourceHeader}; use codec::{Decode, Encode}; use jsonrpsee::common::Params; use jsonrpsee::raw::{RawClient, RawClientError}; @@ -29,8 +29,6 @@ use sp_runtime::traits::IdentifyAccount; pub struct Client { /// Substrate RPC client. rpc_client: RawClient<HttpTransportClient>, - /// Transactions signer. - signer: sp_core::sr25519::Pair, /// Genesis block hash. genesis_hash: Option<H256>, } @@ -58,11 +56,10 @@ impl MaybeConnectionError for Error { } /// Returns client that is able to call RPCs on Substrate node. -pub fn client(uri: &str, signer: sp_core::sr25519::Pair) -> Client { +pub fn client(uri: &str) -> Client { let transport = HttpTransportClient::new(uri); Client { rpc_client: RawClient::new(transport), - signer, genesis_hash: None, } } @@ -78,7 +75,7 @@ pub async fn best_ethereum_block(client: Client) -> (Client, Result<EthereumHead ]), ) .await; - (client, result.map(|(num, hash)| EthereumHeaderId(num, hash))) + (client, result.map(|(num, hash)| HeaderId(num, hash))) } /// Returns true if transactions receipts are required for Ethereum header submission. @@ -86,7 +83,7 @@ pub async fn ethereum_receipts_required( client: Client, header: QueuedEthereumHeader, ) -> (Client, Result<(EthereumHeaderId, bool), Error>) { - let id = header.id(); + let id = header.header().id(); let header = into_substrate_ethereum_header(header.header()); let encoded_header = header.encode(); let (client, receipts_required) = call_rpc( @@ -131,11 +128,12 @@ pub async fn ethereum_header_known( /// Submits Ethereum header to Substrate runtime. 
pub async fn submit_ethereum_headers( client: Client, + signer: sp_core::sr25519::Pair, headers: Vec<QueuedEthereumHeader>, sign_transactions: bool, ) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) { match sign_transactions { - true => submit_signed_ethereum_headers(client, headers).await, + true => submit_signed_ethereum_headers(client, signer, headers).await, false => submit_unsigned_ethereum_headers(client, headers).await, } } @@ -143,6 +141,7 @@ pub async fn submit_ethereum_headers( /// Submits signed Ethereum header to Substrate runtime. pub async fn submit_signed_ethereum_headers( client: Client, + signer: sp_core::sr25519::Pair, headers: Vec<QueuedEthereumHeader>, ) -> (Client, Result<(Vec<TransactionHash>, Vec<EthereumHeaderId>), Error>) { let ids = headers.iter().map(|header| header.id()).collect(); @@ -158,15 +157,14 @@ pub async fn submit_signed_ethereum_headers( (client, genesis_hash) } }; - let account_id = client.signer.public().as_array_ref().clone().into(); + let account_id = signer.public().as_array_ref().clone().into(); let (client, nonce) = next_account_index(client, account_id).await; let nonce = match nonce { Ok(nonce) => nonce, Err(err) => return (client, Err(err)), }; - let transaction = create_signed_submit_transaction(headers, &client.signer, nonce, genesis_hash); - + let transaction = create_signed_submit_transaction(headers, &signer, nonce, genesis_hash); let encoded_transaction = transaction.encode(); let (client, transaction_hash) = call_rpc( client, diff --git a/bridges/relays/ethereum/src/ethereum_sync.rs b/bridges/relays/ethereum/src/sync.rs similarity index 56% rename from bridges/relays/ethereum/src/ethereum_sync.rs rename to bridges/relays/ethereum/src/sync.rs index b174d3819e4..768a7a7932b 100644 --- a/bridges/relays/ethereum/src/ethereum_sync.rs +++ b/bridges/relays/ethereum/src/sync.rs @@ -14,69 +14,97 @@ // You should have received a copy of the GNU General Public License // along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. -use crate::ethereum_headers::QueuedHeaders; -use crate::ethereum_sync_loop::{EthereumSyncParams, SubstrateTransactionMode}; -use crate::ethereum_types::{HeaderId, HeaderStatus, QueuedHeader}; -use crate::substrate_types::{into_substrate_ethereum_header, into_substrate_ethereum_receipts}; -use codec::Encode; +use crate::headers::QueuedHeaders; +use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, QueuedHeader}; +use num_traits::{One, Saturating}; -/// Ethereum headers synchronization context. +/// Common sync params. #[derive(Debug)] -pub struct HeadersSync { +pub struct HeadersSyncParams { + /// Maximal number of ethereum headers to pre-download. + pub max_future_headers_to_download: usize, + /// Maximal number of active (we believe) submit header transactions. + pub max_headers_in_submitted_status: usize, + /// Maximal number of headers in single submit request. + pub max_headers_in_single_submit: usize, + /// Maximal total headers size in single submit request. + pub max_headers_size_in_single_submit: usize, + /// We only may store and accept (from Ethereum node) headers that have + /// number >= than best_substrate_header.number - prune_depth. + pub prune_depth: u32, + /// Target transactions mode. + pub target_tx_mode: TargetTransactionMode, +} + +/// Target transaction mode. +#[derive(Debug, PartialEq)] +pub enum TargetTransactionMode { + /// Submit new headers using signed transactions. + Signed, + /// Submit new headers using unsigned transactions. 
+ Unsigned, + /// Submit new headers using signed transactions, but only when we + /// believe that sync has stalled. + Backup, +} + +/// Headers synchronization context. +#[derive(Debug)] +pub struct HeadersSync<P: HeadersSyncPipeline> { /// Synchronization parameters. - params: EthereumSyncParams, - /// Best header number known to Ethereum node. - target_header_number: Option<u64>, - /// Best header known to Substrate node. - best_header: Option<HeaderId>, + params: HeadersSyncParams, + /// Best header number known to source node. + source_best_number: Option<P::Number>, + /// Best header known to target node. + target_best_header: Option<HeaderId<P::Hash, P::Number>>, /// Headers queue. - headers: QueuedHeaders, + headers: QueuedHeaders<P>, } -impl HeadersSync { - /// Creates new Ethereum headers synchronizer. - pub fn new(params: EthereumSyncParams) -> Self { +impl<P: HeadersSyncPipeline> HeadersSync<P> { + /// Creates new headers synchronizer. + pub fn new(params: HeadersSyncParams) -> Self { HeadersSync { + headers: QueuedHeaders::new(), params, - target_header_number: None, - best_header: None, - headers: Default::default(), + source_best_number: None, + target_best_header: None, } } /// Returns true if we have synced almost all known headers. pub fn is_almost_synced(&self) -> bool { - match self.target_header_number { - Some(target_header_number) => self - .best_header - .map(|best| target_header_number.saturating_sub(best.0) < 4) + match self.source_best_number { + Some(source_best_number) => self + .target_best_header + .map(|best| source_best_number.saturating_sub(best.0) < 4.into()) .unwrap_or(false), None => true, } } /// Returns synchronization status. - pub fn status(&self) -> (&Option<HeaderId>, &Option<u64>) { - (&self.best_header, &self.target_header_number) + pub fn status(&self) -> (&Option<HeaderId<P::Hash, P::Number>>, &Option<P::Number>) { + (&self.target_best_header, &self.source_best_number) } /// Returns reference to the headers queue. - pub fn headers(&self) -> &QueuedHeaders { + pub fn headers(&self) -> &QueuedHeaders<P> { &self.headers } /// Returns mutable reference to the headers queue. - pub fn headers_mut(&mut self) -> &mut QueuedHeaders { + pub fn headers_mut(&mut self) -> &mut QueuedHeaders<P> { &mut self.headers } - /// Select header that needs to be downloaded from the Ethereum node. - pub fn select_new_header_to_download(&self) -> Option<u64> { - // if we haven't received best header from Ethereum node yet, there's nothing we can download - let target_header_number = self.target_header_number.clone()?; + /// Select header that needs to be downloaded from the source node. 
+ pub fn select_new_header_to_download(&self) -> Option<P::Number> { + // if we haven't received best header from source node yet, there's nothing we can download + let source_best_number = self.source_best_number.clone()?; - // if we haven't received known best header from Substrate node yet, there's nothing we can download - let best_header = self.best_header.as_ref()?; + // if we haven't received known best header from target node yet, there's nothing we can download + let target_best_header = self.target_best_header.as_ref()?; // if there's too many headers in the queue, stop downloading let in_memory_headers = self.headers.total_headers(); @@ -85,19 +113,19 @@ impl HeadersSync { } // we assume that there were no reorgs if we have already downloaded best header - let best_downloaded_number = std::cmp::max(self.headers.best_queued_number(), best_header.0); - if best_downloaded_number == target_header_number { + let best_downloaded_number = std::cmp::max(self.headers.best_queued_number(), target_best_header.0); + if best_downloaded_number == source_best_number { return None; } // download new header - Some(best_downloaded_number + 1) + Some(best_downloaded_number + One::one()) } - /// Select headers that need to be submitted to the Substrate node. - pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader>> { + /// Select headers that need to be submitted to the target node. + pub fn select_headers_to_submit(&self, stalled: bool) -> Option<Vec<&QueuedHeader<P>>> { // if we operate in backup mode, we only submit headers when sync has stalled - if self.params.sub_tx_mode == SubstrateTransactionMode::Backup && !stalled { + if self.params.target_tx_mode == TargetTransactionMode::Backup && !stalled { return None; } @@ -117,10 +145,7 @@ impl HeadersSync { return false; } - let encoded_size = into_substrate_ethereum_header(header.header()).encode().len() - + into_substrate_ethereum_receipts(header.receipts()) - .map(|receipts| receipts.encode().len()) - .unwrap_or(0); + let encoded_size = P::estimate_size(header); if total_headers != 0 && total_size + encoded_size > self.params.max_headers_size_in_single_submit { return false; } @@ -132,48 +157,72 @@ impl HeadersSync { }) } - /// Receive new target header number from the Ethereum node. - pub fn ethereum_best_header_number_response(&mut self, best_header_number: u64) { - log::debug!(target: "bridge", "Received best header number from Ethereum: {}", best_header_number); - self.target_header_number = Some(best_header_number); + /// Receive new target header number from the source node. + pub fn source_best_header_number_response(&mut self, best_header_number: P::Number) { + log::debug!( + target: "bridge", + "Received best header number from {} node: {}", + P::SOURCE_NAME, + best_header_number, + ); + self.source_best_number = Some(best_header_number); } - /// Receive new best header from the Substrate node. + /// Receive new best header from the target node. /// Returns true if it is different from the previous block known to us. 
- pub fn substrate_best_header_response(&mut self, best_header: HeaderId) -> bool { - log::debug!(target: "bridge", "Received best known header from Substrate: {:?}", best_header); + pub fn target_best_header_response(&mut self, best_header: HeaderId<P::Hash, P::Number>) -> bool { + log::debug!( + target: "bridge", + "Received best known header from {}: {:?}", + P::TARGET_NAME, + best_header, + ); // early return if it is still the same - if self.best_header == Some(best_header) { + if self.target_best_header == Some(best_header) { return false; } // remember that this header is now known to the Substrate runtime - self.headers.substrate_best_header_response(&best_header); + self.headers.target_best_header_response(&best_header); // prune ancient headers self.headers - .prune(best_header.0.saturating_sub(self.params.prune_depth)); + .prune(best_header.0.saturating_sub(self.params.prune_depth.into())); // finally remember the best header itself - self.best_header = Some(best_header); + self.target_best_header = Some(best_header); true } /// Restart synchronization. pub fn restart(&mut self) { - self.target_header_number = None; - self.best_header = None; + self.source_best_number = None; + self.target_best_header = None; self.headers.clear(); } } +impl Default for HeadersSyncParams { + fn default() -> Self { + HeadersSyncParams { + max_future_headers_to_download: 128, + max_headers_in_submitted_status: 128, + max_headers_in_single_submit: 32, + max_headers_size_in_single_submit: 131_072, + prune_depth: 4096, + target_tx_mode: TargetTransactionMode::Signed, + } + } +} + #[cfg(test)] mod tests { use super::*; - use crate::ethereum_headers::tests::{header, id}; - use crate::ethereum_types::{HeaderStatus, H256}; + use crate::ethereum_types::{EthereumHeadersSyncPipeline, H256}; + use crate::headers::tests::{header, id}; + use crate::sync_types::HeaderStatus; fn side_hash(number: u64) -> H256 { H256::from_low_u64_le(1000 + number) @@ -181,26 +230,26 @@ mod tests { #[test] fn select_new_header_to_download_works() { - let mut eth_sync = HeadersSync::new(Default::default()); + let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(Default::default()); // both best && target headers are unknown assert_eq!(eth_sync.select_new_header_to_download(), None); // best header is known, target header is unknown - eth_sync.best_header = Some(HeaderId(0, Default::default())); + eth_sync.target_best_header = Some(HeaderId(0, Default::default())); assert_eq!(eth_sync.select_new_header_to_download(), None); // target header is known, best header is unknown - eth_sync.best_header = None; - eth_sync.target_header_number = Some(100); + eth_sync.target_best_header = None; + eth_sync.source_best_number = Some(100); assert_eq!(eth_sync.select_new_header_to_download(), None); // when our best block has the same number as the target - eth_sync.best_header = Some(HeaderId(100, Default::default())); + eth_sync.target_best_header = Some(HeaderId(100, Default::default())); assert_eq!(eth_sync.select_new_header_to_download(), None); // when we actually need a new header - eth_sync.target_header_number = Some(101); + eth_sync.source_best_number = Some(101); assert_eq!(eth_sync.select_new_header_to_download(), Some(101)); // when there are too many headers scheduled for submitting @@ -216,18 +265,18 @@ mod tests { eth_sync.params.max_headers_in_submitted_status = 1; // ethereum reports best header #102 - eth_sync.ethereum_best_header_number_response(102); + eth_sync.source_best_header_number_response(102); // 
substrate reports that it is at block #100 - eth_sync.substrate_best_header_response(id(100)); + eth_sync.target_best_header_response(id(100)); // block #101 is downloaded first assert_eq!(eth_sync.select_new_header_to_download(), Some(101)); eth_sync.headers.header_response(header(101).header().clone()); // now header #101 is ready to be submitted - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(101))); - eth_sync.headers.maybe_receipts_response(&id(101), false); + assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(101))); + eth_sync.headers.maybe_extra_response(&id(101), false); assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(101))); assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)])); @@ -240,20 +289,20 @@ mod tests { // we have nothing to submit because previous header hasn't been confirmed yet // (and we allow max 1 submit transaction in the wild) - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(102))); - eth_sync.headers.maybe_receipts_response(&id(102), false); + assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(102))); + eth_sync.headers.maybe_extra_response(&id(102), false); assert_eq!(eth_sync.headers.header(HeaderStatus::Ready), Some(&header(102))); assert_eq!(eth_sync.select_headers_to_submit(false), None); // substrate reports that it has imported block #101 - eth_sync.substrate_best_header_response(id(101)); + eth_sync.target_best_header_response(id(101)); // and we are ready to submit #102 assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(102)])); eth_sync.headers.headers_submitted(vec![id(102)]); // substrate reports that it has imported block #102 - eth_sync.substrate_best_header_response(id(102)); + eth_sync.target_best_header_response(id(102)); // and we have nothing to download assert_eq!(eth_sync.select_new_header_to_download(), None); @@ -264,10 +313,10 @@ mod tests { let mut eth_sync = HeadersSync::new(Default::default()); // ethereum reports best header #102 - eth_sync.ethereum_best_header_number_response(102); + eth_sync.source_best_header_number_response(102); // substrate reports that it is at block #100, but it isn't part of best chain - eth_sync.substrate_best_header_response(HeaderId(100, side_hash(100))); + eth_sync.target_best_header_response(HeaderId(100, side_hash(100))); // block #101 is downloaded first assert_eq!(eth_sync.select_new_header_to_download(), Some(101)); @@ -296,40 +345,40 @@ mod tests { eth_sync.headers.maybe_orphan_response(&id(99), true); // and we are ready to submit #100 - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(100))); - eth_sync.headers.maybe_receipts_response(&id(100), false); + assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(100))); + eth_sync.headers.maybe_extra_response(&id(100), false); assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(100)])); eth_sync.headers.headers_submitted(vec![id(100)]); // and we are ready to submit #101 - assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeReceipts), Some(&header(101))); - eth_sync.headers.maybe_receipts_response(&id(101), false); + assert_eq!(eth_sync.headers.header(HeaderStatus::MaybeExtra), Some(&header(101))); + eth_sync.headers.maybe_extra_response(&id(101), false); assert_eq!(eth_sync.select_headers_to_submit(false), Some(vec![&header(101)])); eth_sync.headers.headers_submitted(vec![id(101)]); } #[test] - fn 
pruning_happens_on_substrate_best_header_response() { - let mut eth_sync = HeadersSync::new(Default::default()); + fn pruning_happens_on_target_best_header_response() { + let mut eth_sync = HeadersSync::<EthereumHeadersSyncPipeline>::new(Default::default()); eth_sync.params.prune_depth = 50; - eth_sync.substrate_best_header_response(id(100)); + eth_sync.target_best_header_response(id(100)); assert_eq!(eth_sync.headers.prune_border(), 50); } #[test] fn only_submitting_headers_in_backup_mode_when_stalled() { let mut eth_sync = HeadersSync::new(Default::default()); - eth_sync.params.sub_tx_mode = SubstrateTransactionMode::Backup; + eth_sync.params.target_tx_mode = TargetTransactionMode::Backup; // ethereum reports best header #102 - eth_sync.ethereum_best_header_number_response(102); + eth_sync.source_best_header_number_response(102); // substrate reports that it is at block #100 - eth_sync.substrate_best_header_response(id(100)); + eth_sync.target_best_header_response(id(100)); // block #101 is downloaded first eth_sync.headers.header_response(header(101).header().clone()); - eth_sync.headers.maybe_receipts_response(&id(101), false); + eth_sync.headers.maybe_extra_response(&id(101), false); // ensure that headers are not submitted when sync is not stalled assert_eq!(eth_sync.select_headers_to_submit(false), None); diff --git a/bridges/relays/ethereum/src/sync_loop.rs b/bridges/relays/ethereum/src/sync_loop.rs new file mode 100644 index 00000000000..803cfaf0c23 --- /dev/null +++ b/bridges/relays/ethereum/src/sync_loop.rs @@ -0,0 +1,464 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. + +use crate::sync::HeadersSyncParams; +use crate::sync_types::{HeaderId, HeaderStatus, HeadersSyncPipeline, MaybeConnectionError, QueuedHeader}; +use futures::{future::FutureExt, stream::StreamExt}; +use num_traits::Saturating; +use std::future::Future; + +/// When we submit headers to target node, but see no updates of best +/// source block known to target node during STALL_SYNC_TIMEOUT_MS milliseconds, +/// we consider that our headers are rejected because there has been reorg in target chain. +/// This reorg could invalidate our knowledge about sync process (i.e. we have asked if +/// HeaderA is known to target, but then reorg happened and the answer is different +/// now) => we need to reset sync. +/// The other option is to receive **EVERY** best target header and check if it is +/// direct child of previous best header. But: (1) subscription doesn't guarantee that +/// the subscriber will receive every best header (2) reorg won't always lead to sync +/// stall and restart is a heavy operation (we forget all in-memory headers). 
+const STALL_SYNC_TIMEOUT_MS: u64 = 30_000;
+/// Delay (in milliseconds) after the last update of the best source header seen at the target node
+/// before we treat the sync as stalled. ONLY used when the relay operates in backup mode.
+const BACKUP_STALL_SYNC_TIMEOUT_MS: u64 = 5 * 60_000;
+/// Delay (in milliseconds) after a connection-related error before we try to
+/// reconnect again.
+const CONNECTION_ERROR_DELAY_MS: u64 = 10_000;
+
+/// Source client trait.
+pub trait SourceClient<P: HeadersSyncPipeline>: Sized {
+	/// Type of error this client returns.
+	type Error: std::fmt::Debug + MaybeConnectionError;
+	/// Future that returns best block number.
+	type BestBlockNumberFuture: Future<Output = (Self, Result<P::Number, Self::Error>)>;
+	/// Future that returns header by hash.
+	type HeaderByHashFuture: Future<Output = (Self, Result<P::Header, Self::Error>)>;
+	/// Future that returns header by number.
+	type HeaderByNumberFuture: Future<Output = (Self, Result<P::Header, Self::Error>)>;
+	/// Future that returns extra data associated with header.
+	type HeaderExtraFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, P::Extra), Self::Error>)>;
+
+	/// Get best block number.
+	fn best_block_number(self) -> Self::BestBlockNumberFuture;
+	/// Get header by hash.
+	fn header_by_hash(self, hash: P::Hash) -> Self::HeaderByHashFuture;
+	/// Get canonical header by number.
+	fn header_by_number(self, number: P::Number) -> Self::HeaderByNumberFuture;
+	/// Get extra data by header hash.
+	fn header_extra(self, id: HeaderId<P::Hash, P::Number>, header: &P::Header) -> Self::HeaderExtraFuture;
+}
+
+/// Target client trait.
+pub trait TargetClient<P: HeadersSyncPipeline>: Sized {
+	/// Type of error this client returns.
+	type Error: std::fmt::Debug + MaybeConnectionError;
+	/// Future that returns best header id.
+	type BestHeaderIdFuture: Future<Output = (Self, Result<HeaderId<P::Hash, P::Number>, Self::Error>)>;
+	/// Future that returns known header check result.
+	type IsKnownHeaderFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>)>;
+	/// Future that returns extra check result.
+	type RequiresExtraFuture: Future<Output = (Self, Result<(HeaderId<P::Hash, P::Number>, bool), Self::Error>)>;
+	/// Future that returns header submission result.
+	type SubmitHeadersFuture: Future<Output = (Self, Result<Vec<HeaderId<P::Hash, P::Number>>, Self::Error>)>;
+
+	/// Returns ID of best header known to the target node.
+	fn best_header_id(self) -> Self::BestHeaderIdFuture;
+	/// Returns true if header is known to the target node.
+	fn is_known_header(self, id: HeaderId<P::Hash, P::Number>) -> Self::IsKnownHeaderFuture;
+	/// Returns true if header requires extra data to be submitted.
+	fn requires_extra(self, header: &QueuedHeader<P>) -> Self::RequiresExtraFuture;
+	/// Submit headers.
+	fn submit_headers(self, headers: Vec<QueuedHeader<P>>) -> Self::SubmitHeadersFuture;
+}
+
+/// Run headers synchronization.
+pub fn run<P: HeadersSyncPipeline>( + source_client: impl SourceClient<P>, + source_tick_ms: u64, + target_client: impl TargetClient<P>, + target_tick_ms: u64, + sync_params: HeadersSyncParams, +) { + let mut local_pool = futures::executor::LocalPool::new(); + let mut progress_context = (std::time::Instant::now(), None, None); + + local_pool.run_until(async move { + let mut sync = crate::sync::HeadersSync::<P>::new(sync_params); + let mut stall_countdown = None; + let mut last_update_time = std::time::Instant::now(); + + let mut source_maybe_client = None; + let mut source_best_block_number_required = false; + let source_best_block_number_future = source_client.best_block_number().fuse(); + let source_new_header_future = futures::future::Fuse::terminated(); + let source_orphan_header_future = futures::future::Fuse::terminated(); + let source_extra_future = futures::future::Fuse::terminated(); + let source_go_offline_future = futures::future::Fuse::terminated(); + let source_tick_stream = interval(source_tick_ms).fuse(); + + let mut target_maybe_client = None; + let mut target_best_block_required = false; + let target_best_block_future = target_client.best_header_id().fuse(); + let target_extra_check_future = futures::future::Fuse::terminated(); + let target_existence_status_future = futures::future::Fuse::terminated(); + let target_submit_header_future = futures::future::Fuse::terminated(); + let target_go_offline_future = futures::future::Fuse::terminated(); + let target_tick_stream = interval(target_tick_ms).fuse(); + + futures::pin_mut!( + source_best_block_number_future, + source_new_header_future, + source_orphan_header_future, + source_extra_future, + source_go_offline_future, + source_tick_stream, + target_best_block_future, + target_extra_check_future, + target_existence_status_future, + target_submit_header_future, + target_go_offline_future, + target_tick_stream + ); + + loop { + futures::select! 
{ + (source_client, source_best_block_number) = source_best_block_number_future => { + source_best_block_number_required = false; + + process_future_result( + &mut source_maybe_client, + source_client, + source_best_block_number, + |source_best_block_number| sync.source_best_header_number_response(source_best_block_number), + &mut source_go_offline_future, + |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + || format!("Error retrieving best header number from {}", P::SOURCE_NAME), + ); + }, + (source_client, source_new_header) = source_new_header_future => { + process_future_result( + &mut source_maybe_client, + source_client, + source_new_header, + |source_new_header| sync.headers_mut().header_response(source_new_header), + &mut source_go_offline_future, + |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + || format!("Error retrieving header from {} node", P::SOURCE_NAME), + ); + }, + (source_client, source_orphan_header) = source_orphan_header_future => { + process_future_result( + &mut source_maybe_client, + source_client, + source_orphan_header, + |source_orphan_header| sync.headers_mut().header_response(source_orphan_header), + &mut source_go_offline_future, + |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + || format!("Error retrieving orphan header from {} node", P::SOURCE_NAME), + ); + }, + (source_client, source_extra) = source_extra_future => { + process_future_result( + &mut source_maybe_client, + source_client, + source_extra, + |(header, extra)| sync.headers_mut().extra_response(&header, extra), + &mut source_go_offline_future, + |source_client| delay(CONNECTION_ERROR_DELAY_MS, source_client), + || format!("Error retrieving extra data from {} node", P::SOURCE_NAME), + ); + }, + source_client = source_go_offline_future => { + source_maybe_client = Some(source_client); + }, + _ = source_tick_stream.next() => { + if sync.is_almost_synced() { + source_best_block_number_required = true; + } + }, + (target_client, target_best_block) = target_best_block_future => { + target_best_block_required = false; + + process_future_result( + &mut target_maybe_client, + target_client, + target_best_block, + |target_best_block| { + let head_updated = sync.target_best_header_response(target_best_block); + if head_updated { + last_update_time = std::time::Instant::now(); + } + match head_updated { + // IF head is updated AND there are still our transactions: + // => restart stall countdown timer + true if sync.headers().headers_in_status(HeaderStatus::Submitted) != 0 => + stall_countdown = Some(std::time::Instant::now()), + // IF head is updated AND there are no our transactions: + // => stop stall countdown timer + true => stall_countdown = None, + // IF head is not updated AND stall countdown is not yet completed + // => do nothing + false if stall_countdown + .map(|stall_countdown| std::time::Instant::now() - stall_countdown < + std::time::Duration::from_millis(STALL_SYNC_TIMEOUT_MS)) + .unwrap_or(true) + => (), + // IF head is not updated AND stall countdown has completed + // => restart sync + false => { + log::info!( + target: "bridge", + "Possible {} fork detected. 
Restarting {} headers synchronization.", + P::TARGET_NAME, + P::SOURCE_NAME, + ); + stall_countdown = None; + sync.restart(); + }, + } + }, + &mut target_go_offline_future, + |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + || format!("Error retrieving best known header from {} node", P::TARGET_NAME), + ); + }, + (target_client, target_existence_status) = target_existence_status_future => { + process_future_result( + &mut target_maybe_client, + target_client, + target_existence_status, + |(target_header, target_existence_status)| sync + .headers_mut() + .maybe_orphan_response(&target_header, target_existence_status), + &mut target_go_offline_future, + |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + || format!("Error retrieving existence status from {} node", P::TARGET_NAME), + ); + }, + (target_client, target_submit_header_result) = target_submit_header_future => { + process_future_result( + &mut target_maybe_client, + target_client, + target_submit_header_result, + |submitted_headers| sync.headers_mut().headers_submitted(submitted_headers), + &mut target_go_offline_future, + |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + || format!("Error submitting headers to {} node", P::TARGET_NAME), + ); + }, + (target_client, target_extra_check_result) = target_extra_check_future => { + process_future_result( + &mut target_maybe_client, + target_client, + target_extra_check_result, + |(header, extra_check_result)| sync + .headers_mut() + .maybe_extra_response(&header, extra_check_result), + &mut target_go_offline_future, + |target_client| delay(CONNECTION_ERROR_DELAY_MS, target_client), + || format!("Error retrieving receipts requirement from {} node", P::TARGET_NAME), + ); + }, + target_client = target_go_offline_future => { + target_maybe_client = Some(target_client); + }, + _ = target_tick_stream.next() => { + target_best_block_required = true; + }, + } + + // print progress + progress_context = print_sync_progress(progress_context, &sync); + + // if target client is available: wait, or call required target methods + if let Some(target_client) = target_maybe_client.take() { + // the priority is to: + // 1) get best block - it stops us from downloading/submitting new blocks + we call it rarely; + // 2) check if we need extra data from source - it stops us from downloading/submitting new blocks; + // 3) check existence - it stops us from submitting new blocks; + // 4) submit header + + if target_best_block_required { + log::debug!(target: "bridge", "Asking {} about best block", P::TARGET_NAME); + target_best_block_future.set(target_client.best_header_id().fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeExtra) { + log::debug!( + target: "bridge", + "Checking if header submission requires extra: {:?}", + header.id(), + ); + + target_extra_check_future.set(target_client.requires_extra(header).fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::MaybeOrphan) { + // for MaybeOrphan we actually ask for parent' header existence + let parent_id = header.parent_id(); + + log::debug!( + target: "bridge", + "Asking {} node for existence of: {:?}", + P::TARGET_NAME, + parent_id, + ); + + target_existence_status_future.set(target_client.is_known_header(parent_id).fuse()); + } else if let Some(headers) = sync.select_headers_to_submit( + last_update_time.elapsed() > std::time::Duration::from_millis(BACKUP_STALL_SYNC_TIMEOUT_MS), + ) { + let ids = match headers.len() { + 1 => format!("{:?}", 
headers[0].id()), + 2 => format!("[{:?}, {:?}]", headers[0].id(), headers[1].id()), + len => format!("[{:?} ... {:?}]", headers[0].id(), headers[len - 1].id()), + }; + log::debug!( + target: "bridge", + "Submitting {} header(s) to {} node: {:?}", + headers.len(), + P::TARGET_NAME, + ids, + ); + + let headers = headers.into_iter().cloned().collect(); + target_submit_header_future.set(target_client.submit_headers(headers).fuse()); + + // remember that we have submitted some headers + if stall_countdown.is_none() { + stall_countdown = Some(std::time::Instant::now()); + } + } else { + target_maybe_client = Some(target_client); + } + } + + // if source client is available: wait, or call required source methods + if let Some(source_client) = source_maybe_client.take() { + // the priority is to: + // 1) get best block - it stops us from downloading new blocks + we call it rarely; + // 2) download extra data - it stops us from submitting new blocks; + // 3) download missing headers - it stops us from downloading/submitting new blocks; + // 4) downloading new headers + + if source_best_block_number_required { + log::debug!(target: "bridge", "Asking {} node about best block number", P::SOURCE_NAME); + source_best_block_number_future.set(source_client.best_block_number().fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::Extra) { + let id = header.id(); + log::debug!( + target: "bridge", + "Retrieving extra data for header: {:?}", + id, + ); + source_extra_future.set(source_client.header_extra(id, header.header()).fuse()); + } else if let Some(header) = sync.headers().header(HeaderStatus::Orphan) { + // for Orphan we actually ask for parent' header + let parent_id = header.parent_id(); + + log::debug!( + target: "bridge", + "Going to download orphan header from {} node: {:?}", + P::SOURCE_NAME, + parent_id, + ); + + source_orphan_header_future.set(source_client.header_by_hash(parent_id.1).fuse()); + } else if let Some(id) = sync.select_new_header_to_download() { + log::debug!( + target: "bridge", + "Going to download new header from {} node: {:?}", + P::SOURCE_NAME, + id, + ); + + source_new_header_future.set(source_client.header_by_number(id).fuse()); + } else { + source_maybe_client = Some(source_client); + } + } + } + }); +} + +/// Future that resolves into given value after given timeout. +async fn delay<T>(timeout_ms: u64, retval: T) -> T { + async_std::task::sleep(std::time::Duration::from_millis(timeout_ms)).await; + retval +} + +/// Stream that emits item every `timeout_ms` milliseconds. +fn interval(timeout_ms: u64) -> impl futures::Stream<Item = ()> { + futures::stream::unfold((), move |_| async move { + delay(timeout_ms, ()).await; + Some(((), ())) + }) +} + +/// Process result of the future that may have been caused by connection failure. 
+fn process_future_result<TClient, TResult, TError, TGoOfflineFuture>( + maybe_client: &mut Option<TClient>, + client: TClient, + result: Result<TResult, TError>, + on_success: impl FnOnce(TResult), + go_offline_future: &mut std::pin::Pin<&mut futures::future::Fuse<TGoOfflineFuture>>, + go_offline: impl FnOnce(TClient) -> TGoOfflineFuture, + error_pattern: impl FnOnce() -> String, +) where + TError: std::fmt::Debug + MaybeConnectionError, + TGoOfflineFuture: FutureExt, +{ + match result { + Ok(result) => { + *maybe_client = Some(client); + on_success(result); + } + Err(error) => { + if error.is_connection_error() { + go_offline_future.set(go_offline(client).fuse()); + } else { + *maybe_client = Some(client); + } + + log::error!(target: "bridge", "{}: {:?}", error_pattern(), error); + } + } +} + +/// Print synchronization progress. +fn print_sync_progress<P: HeadersSyncPipeline>( + progress_context: (std::time::Instant, Option<P::Number>, Option<P::Number>), + eth_sync: &crate::sync::HeadersSync<P>, +) -> (std::time::Instant, Option<P::Number>, Option<P::Number>) { + let (prev_time, prev_best_header, prev_target_header) = progress_context; + let now_time = std::time::Instant::now(); + let (now_best_header, now_target_header) = eth_sync.status(); + + let need_update = now_time - prev_time > std::time::Duration::from_secs(10) + || match (prev_best_header, now_best_header) { + (Some(prev_best_header), Some(now_best_header)) => { + now_best_header.0.saturating_sub(prev_best_header) > 10.into() + } + _ => false, + }; + if !need_update { + return (prev_time, prev_best_header, prev_target_header); + } + + log::info!( + target: "bridge", + "Synced {:?} of {:?} headers", + now_best_header.map(|id| id.0), + now_target_header, + ); + (now_time, now_best_header.clone().map(|id| id.0), *now_target_header) +} diff --git a/bridges/relays/ethereum/src/sync_types.rs b/bridges/relays/ethereum/src/sync_types.rs new file mode 100644 index 00000000000..9b9712c739e --- /dev/null +++ b/bridges/relays/ethereum/src/sync_types.rs @@ -0,0 +1,130 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Parity Bridges Common. + +// Parity Bridges Common is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Parity Bridges Common is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Parity Bridges Common. If not, see <http://www.gnu.org/licenses/>. + +/// Ethereum header Id. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct HeaderId<Hash, Number>(pub Number, pub Hash); + +/// Ethereum header synchronization status. +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum HeaderStatus { + /// Header is unknown. + Unknown, + /// Header is in MaybeOrphan queue. + MaybeOrphan, + /// Header is in Orphan queue. + Orphan, + /// Header is in MaybeExtra queue. + MaybeExtra, + /// Header is in Extra queue. + Extra, + /// Header is in Ready queue. + Ready, + /// Header has been recently submitted to the target node. + Submitted, + /// Header is known to the target node. + Synced, +} + +/// Error type that can signal connection errors. 
+pub trait MaybeConnectionError {
+	/// Returns true if the error may have been caused by a connection failure.
+	fn is_connection_error(&self) -> bool;
+}
+
+/// Headers synchronization pipeline.
+pub trait HeadersSyncPipeline: Clone + Copy {
+	/// Name of the headers source.
+	const SOURCE_NAME: &'static str;
+	/// Name of the headers target.
+	const TARGET_NAME: &'static str;
+
+	/// Headers we're syncing are identified by this hash.
+	type Hash: Eq + Clone + Copy + std::fmt::Debug + std::fmt::Display + std::hash::Hash;
+	/// Headers we're syncing are identified by this number.
+	type Number: From<u32>
+		+ Ord
+		+ Clone
+		+ Copy
+		+ std::fmt::Debug
+		+ std::fmt::Display
+		+ std::ops::Add<Output = Self::Number>
+		+ std::ops::Sub<Output = Self::Number>
+		+ num_traits::Saturating
+		+ num_traits::Zero
+		+ num_traits::One;
+	/// Type of header that we're syncing.
+	type Header: Clone + std::fmt::Debug + SourceHeader<Self::Hash, Self::Number>;
+	/// Type of extra data for the header that we're receiving from the source node.
+	type Extra: Clone + std::fmt::Debug;
+
+	/// Returns estimated size of the header, as it will be encoded for submission to the target node.
+	fn estimate_size(source: &QueuedHeader<Self>) -> usize;
+}
+
+/// Header that we're receiving from source node.
+pub trait SourceHeader<Hash, Number> {
+	/// Returns ID of header.
+	fn id(&self) -> HeaderId<Hash, Number>;
+	/// Returns ID of parent header.
+	fn parent_id(&self) -> HeaderId<Hash, Number>;
+}
+
+/// Header, as it is stored in the synchronization queue.
+#[derive(Clone, Debug, Default)]
+#[cfg_attr(test, derive(PartialEq))]
+pub struct QueuedHeader<P: HeadersSyncPipeline> {
+	header: P::Header,
+	extra: Option<P::Extra>,
+}
+
+impl<P: HeadersSyncPipeline> QueuedHeader<P> {
+	/// Creates new queued header.
+	pub fn new(header: P::Header) -> Self {
+		QueuedHeader { header, extra: None }
+	}
+
+	/// Returns ID of header.
+	pub fn id(&self) -> HeaderId<P::Hash, P::Number> {
+		self.header.id()
+	}
+
+	/// Returns ID of parent header.
+	pub fn parent_id(&self) -> HeaderId<P::Hash, P::Number> {
+		self.header.parent_id()
+	}
+
+	/// Returns reference to header.
+	pub fn header(&self) -> &P::Header {
+		&self.header
+	}
+
+	/// Returns reference to associated extra data.
+	pub fn extra(&self) -> &Option<P::Extra> {
+		&self.extra
+	}
+
+	/// Extract header and extra from self.
+	pub fn extract(self) -> (P::Header, Option<P::Extra>) {
+		(self.header, self.extra)
+	}
+
+	/// Set associated extra data.
+	pub fn set_extra(mut self, extra: P::Extra) -> Self {
+		self.extra = Some(extra);
+		self
+	}
+}
-- 
GitLab
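For reference, the sketch below shows how a chain could be plugged into the generic `HeadersSyncPipeline` and `SourceHeader` traits introduced by this patch. It is illustrative only and not part of the change: `DemoChain`, `DemoHeader`, and the fixed size estimate are hypothetical names, and the sketch assumes it lives inside the relay crate and that `sp_core::H256` is usable as the hash type (it satisfies the `Eq + Copy + Debug + Display + Hash` bounds).

// Hypothetical example, not part of the patch.
use crate::sync_types::{HeaderId, HeadersSyncPipeline, QueuedHeader, SourceHeader};
use sp_core::H256;

/// Hypothetical source chain marker.
#[derive(Clone, Copy, Debug)]
pub struct DemoChain;

/// Hypothetical source chain header.
#[derive(Clone, Debug)]
pub struct DemoHeader {
	pub number: u64,
	pub hash: H256,
	pub parent_hash: H256,
}

impl SourceHeader<H256, u64> for DemoHeader {
	fn id(&self) -> HeaderId<H256, u64> {
		HeaderId(self.number, self.hash)
	}

	fn parent_id(&self) -> HeaderId<H256, u64> {
		// genesis handling is omitted in this sketch
		HeaderId(self.number.saturating_sub(1), self.parent_hash)
	}
}

impl HeadersSyncPipeline for DemoChain {
	const SOURCE_NAME: &'static str = "Demo";
	const TARGET_NAME: &'static str = "Substrate";

	type Hash = H256;
	type Number = u64;
	type Header = DemoHeader;
	// this hypothetical chain needs no extra data (e.g. receipts) to submit a header
	type Extra = ();

	fn estimate_size(_source: &QueuedHeader<Self>) -> usize {
		// a rough constant estimate is enough for the batching limits in HeadersSyncParams
		128
	}
}

With such a pipeline in place, `sync_loop::run` only additionally needs `SourceClient<DemoChain>` and `TargetClient<DemoChain>` implementations wrapping the corresponding RPC clients.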