ReadSyncedHeaders {
pub async fn next>(
self,
target_client: &mut TC,
) -> Result, Self> {
match target_client.synced_headers_finality_info(self.target_block_num).await {
Ok(synced_headers) =>
Ok(ReadContext { target_block_num: self.target_block_num, synced_headers }),
Err(e) => {
log::error!(
target: "bridge",
"Could not get {} headers synced to {} at block {}: {e:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
self.target_block_num
);
// Reconnect target client in case of a connection error.
handle_client_error(target_client, e).await;
Err(self)
},
}
}
}
/// Second step in the block checking state machine.
///
/// Reading the equivocation reporting context from the target chain.
#[cfg_attr(test, derive(Debug))]
pub struct ReadContext {
target_block_num: P::TargetNumber,
synced_headers: Vec>,
}
impl ReadContext {
pub async fn next>(
self,
target_client: &mut TC,
) -> Result>, Self> {
match EquivocationReportingContext::try_read_from_target::(
target_client,
self.target_block_num.saturating_sub(1.into()),
)
.await
{
Ok(Some(context)) => Ok(Some(FindEquivocations {
target_block_num: self.target_block_num,
synced_headers: self.synced_headers,
context,
})),
Ok(None) => Ok(None),
Err(e) => {
log::error!(
target: "bridge",
"Could not read {} `EquivocationReportingContext` from {} at block {}: {e:?}",
P::SOURCE_NAME,
P::TARGET_NAME,
self.target_block_num.saturating_sub(1.into()),
);
// Reconnect target client in case of a connection error.
handle_client_error(target_client, e).await;
Err(self)
},
}
}
}
/// Third step in the block checking state machine.
///
/// Searching for equivocations in the source headers synced with the target chain.
#[cfg_attr(test, derive(Debug))]
pub struct FindEquivocations {
target_block_num: P::TargetNumber,
synced_headers: Vec>,
context: EquivocationReportingContext,
}
impl FindEquivocations {
pub fn next(
mut self,
finality_proofs_buf: &mut FinalityProofsBuf
,
) -> Vec> {
let mut result = vec![];
for synced_header in self.synced_headers {
match P::EquivocationsFinder::find_equivocations(
&self.context.synced_verification_context,
&synced_header.finality_proof,
finality_proofs_buf.buf().as_slice(),
) {
Ok(equivocations) =>
if !equivocations.is_empty() {
result.push(ReportEquivocations {
source_block_hash: self.context.synced_header_hash,
equivocations,
})
},
Err(e) => {
log::error!(
target: "bridge",
"Could not search for equivocations in the finality proof \
for source header {:?} synced at target block {}: {e:?}",
synced_header.finality_proof.target_header_hash(),
self.target_block_num
);
},
};
finality_proofs_buf.prune(synced_header.finality_proof.target_header_number(), None);
self.context.update(synced_header);
}
result
}
}
/// Fourth step in the block checking state machine.
///
/// Reporting the detected equivocations (if any).
#[cfg_attr(test, derive(Debug))]
pub struct ReportEquivocations {
source_block_hash: P::Hash,
equivocations: Vec,
}
impl ReportEquivocations {
pub async fn next>(
mut self,
source_client: &mut SC,
reporter: &mut EquivocationsReporter<'_, P, SC>,
) -> Result<(), Self> {
let mut unprocessed_equivocations = vec![];
for equivocation in self.equivocations {
match reporter
.submit_report(source_client, self.source_block_hash, equivocation.clone())
.await
{
Ok(_) => {},
Err(e) => {
log::error!(
target: "bridge",
"Could not submit equivocation report to {} for {equivocation:?}: {e:?}",
P::SOURCE_NAME,
);
// Mark the equivocation as unprocessed
unprocessed_equivocations.push(equivocation);
// Reconnect source client in case of a connection error.
handle_client_error(source_client, e).await;
},
}
}
self.equivocations = unprocessed_equivocations;
if !self.equivocations.is_empty() {
return Err(self)
}
Ok(())
}
}
/// Block checking state machine.
#[cfg_attr(test, derive(Debug))]
pub enum BlockChecker {
ReadSyncedHeaders(ReadSyncedHeaders),
ReadContext(ReadContext
),
ReportEquivocations(Vec>),
}
impl BlockChecker {
pub fn new(target_block_num: P::TargetNumber) -> Self {
Self::ReadSyncedHeaders(ReadSyncedHeaders { target_block_num })
}
pub fn run<'a, SC: SourceClient
, TC: TargetClient
>(
self,
source_client: &'a mut SC,
target_client: &'a mut TC,
finality_proofs_buf: &'a mut FinalityProofsBuf
,
reporter: &'a mut EquivocationsReporter
,
) -> BoxFuture<'a, Result<(), Self>> {
async move {
match self {
Self::ReadSyncedHeaders(state) => {
let read_context =
state.next(target_client).await.map_err(Self::ReadSyncedHeaders)?;
Self::ReadContext(read_context)
.run(source_client, target_client, finality_proofs_buf, reporter)
.await
},
Self::ReadContext(state) => {
let maybe_find_equivocations =
state.next(target_client).await.map_err(Self::ReadContext)?;
let find_equivocations = match maybe_find_equivocations {
Some(find_equivocations) => find_equivocations,
None => return Ok(()),
};
Self::ReportEquivocations(find_equivocations.next(finality_proofs_buf))
.run(source_client, target_client, finality_proofs_buf, reporter)
.await
},
Self::ReportEquivocations(state) => {
let mut failures = vec![];
for report_equivocations in state {
if let Err(failure) =
report_equivocations.next(source_client, reporter).await
{
failures.push(failure);
}
}
if !failures.is_empty() {
return Err(Self::ReportEquivocations(failures))
}
Ok(())
},
}
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::mock::*;
use std::collections::HashMap;
impl PartialEq for ReadContext {
fn eq(&self, other: &Self) -> bool {
self.target_block_num == other.target_block_num &&
self.synced_headers == other.synced_headers
}
}
impl PartialEq for FindEquivocations {
fn eq(&self, other: &Self) -> bool {
self.target_block_num == other.target_block_num &&
self.synced_headers == other.synced_headers &&
self.context == other.context
}
}
impl PartialEq for ReportEquivocations {
fn eq(&self, other: &Self) -> bool {
self.source_block_hash == other.source_block_hash &&
self.equivocations == other.equivocations
}
}
impl PartialEq for BlockChecker {
fn eq(&self, _other: &Self) -> bool {
matches!(self, _other)
}
}
#[async_std::test]
async fn block_checker_works() {
let mut source_client = TestSourceClient { ..Default::default() };
let mut target_client = TestTargetClient {
best_synced_header_hash: HashMap::from([(9, Ok(Some(5)))]),
finality_verification_context: HashMap::from([(
9,
Ok(TestFinalityVerificationContext { check_equivocations: true }),
)]),
synced_headers_finality_info: HashMap::from([(
10,
Ok(vec![
new_header_finality_info(6, None),
new_header_finality_info(7, Some(false)),
new_header_finality_info(8, None),
new_header_finality_info(9, Some(true)),
new_header_finality_info(10, None),
new_header_finality_info(11, None),
new_header_finality_info(12, None),
]),
)]),
..Default::default()
};
let mut reporter =
EquivocationsReporter::::new();
let block_checker = BlockChecker::new(10);
assert!(block_checker
.run(
&mut source_client,
&mut target_client,
&mut FinalityProofsBuf::new(vec![
TestFinalityProof(6, vec!["6-1"]),
TestFinalityProof(7, vec![]),
TestFinalityProof(8, vec!["8-1"]),
TestFinalityProof(9, vec!["9-1"]),
TestFinalityProof(10, vec![]),
TestFinalityProof(11, vec!["11-1", "11-2"]),
TestFinalityProof(12, vec!["12-1"])
]),
&mut reporter
)
.await
.is_ok());
assert_eq!(
*source_client.reported_equivocations.lock().unwrap(),
HashMap::from([(5, vec!["6-1"]), (9, vec!["11-1", "11-2", "12-1"])])
);
}
#[async_std::test]
async fn block_checker_works_with_empty_context() {
let mut target_client = TestTargetClient {
best_synced_header_hash: HashMap::from([(9, Ok(None))]),
finality_verification_context: HashMap::from([(
9,
Ok(TestFinalityVerificationContext { check_equivocations: true }),
)]),
synced_headers_finality_info: HashMap::from([(
10,
Ok(vec![new_header_finality_info(6, None)]),
)]),
..Default::default()
};
let mut source_client = TestSourceClient { ..Default::default() };
let mut reporter =
EquivocationsReporter::::new();
let block_checker = BlockChecker::new(10);
assert!(block_checker
.run(
&mut source_client,
&mut target_client,
&mut FinalityProofsBuf::new(vec![TestFinalityProof(6, vec!["6-1"])]),
&mut reporter
)
.await
.is_ok());
assert_eq!(*source_client.reported_equivocations.lock().unwrap(), HashMap::default());
}
#[async_std::test]
async fn read_synced_headers_handles_errors() {
let mut target_client = TestTargetClient {
synced_headers_finality_info: HashMap::from([
(10, Err(TestClientError::NonConnection)),
(11, Err(TestClientError::Connection)),
]),
..Default::default()
};
let mut source_client = TestSourceClient { ..Default::default() };
let mut reporter =
EquivocationsReporter::::new();
// NonConnection error
let block_checker = BlockChecker::new(10);
assert_eq!(
block_checker
.run(
&mut source_client,
&mut target_client,
&mut FinalityProofsBuf::new(vec![]),
&mut reporter
)
.await,
Err(BlockChecker::ReadSyncedHeaders(ReadSyncedHeaders { target_block_num: 10 }))
);
assert_eq!(target_client.num_reconnects, 0);
// Connection error
let block_checker = BlockChecker::new(11);
assert_eq!(
block_checker
.run(
&mut source_client,
&mut target_client,
&mut FinalityProofsBuf::new(vec![]),
&mut reporter
)
.await,
Err(BlockChecker::ReadSyncedHeaders(ReadSyncedHeaders { target_block_num: 11 }))
);
assert_eq!(target_client.num_reconnects, 1);
}
#[async_std::test]
async fn read_context_handles_errors() {
let mut target_client = TestTargetClient {
synced_headers_finality_info: HashMap::from([(10, Ok(vec![])), (11, Ok(vec![]))]),
best_synced_header_hash: HashMap::from([
(9, Err(TestClientError::NonConnection)),
(10, Err(TestClientError::Connection)),
]),
..Default::default()
};
let mut source_client = TestSourceClient { ..Default::default() };
let mut reporter =
EquivocationsReporter::::new();
// NonConnection error
let block_checker = BlockChecker::new(10);
assert_eq!(
block_checker
.run(
&mut source_client,
&mut target_client,
&mut FinalityProofsBuf::new(vec![]),
&mut reporter
)
.await,
Err(BlockChecker::ReadContext(ReadContext {
target_block_num: 10,
synced_headers: vec![]
}))
);
assert_eq!(target_client.num_reconnects, 0);
// Connection error
let block_checker = BlockChecker::new(11);
assert_eq!(
block_checker
.run(
&mut source_client,
&mut target_client,
&mut FinalityProofsBuf::new(vec![]),
&mut reporter
)
.await,
Err(BlockChecker::ReadContext(ReadContext {
target_block_num: 11,
synced_headers: vec![]
}))
);
assert_eq!(target_client.num_reconnects, 1);
}
}