SubstrateMessagesSource {
/// Create new Substrate headers source.
pub fn new(
client: Client,
lane_id: LaneId,
transaction_params: TransactionParams>,
target_to_source_headers_relay: Option>,
) -> Self {
SubstrateMessagesSource {
client,
lane_id,
transaction_params,
target_to_source_headers_relay,
}
}
/// Read outbound lane state from the on-chain storage at given block.
async fn outbound_lane_data(
&self,
id: SourceHeaderIdOf>,
) -> Result, SubstrateError> {
self.client
.storage_value(
outbound_lane_data_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
),
Some(id.1),
)
.await
}
/// Ensure that the messages pallet at source chain is active.
async fn ensure_pallet_active(&self) -> Result<(), SubstrateError> {
ensure_messages_pallet_active::(&self.client).await
}
}
impl Clone for SubstrateMessagesSource {
fn clone(&self) -> Self {
Self {
client: self.client.clone(),
lane_id: self.lane_id,
transaction_params: self.transaction_params.clone(),
target_to_source_headers_relay: self.target_to_source_headers_relay.clone(),
}
}
}
#[async_trait]
impl RelayClient for SubstrateMessagesSource {
type Error = SubstrateError;
async fn reconnect(&mut self) -> Result<(), SubstrateError> {
self.client.reconnect().await
}
}
#[async_trait]
impl SourceClient> for SubstrateMessagesSource
where
AccountIdOf:
From< as Pair>::Public>,
P::SourceTransactionSignScheme: TransactionSignScheme,
{
async fn state(&self) -> Result>, SubstrateError> {
// we can't continue to deliver confirmations if source node is out of sync, because
// it may have already received confirmations that we're going to deliver
self.client.ensure_synced().await?;
// we can't relay confirmations if messages pallet at source chain is halted
self.ensure_pallet_active().await?;
read_client_state::<
_,
as MessageLane>::TargetHeaderHash,
as MessageLane>::TargetHeaderNumber,
>(&self.client, P::TargetChain::BEST_FINALIZED_HEADER_ID_METHOD)
.await
}
async fn latest_generated_nonce(
&self,
id: SourceHeaderIdOf>,
) -> Result<(SourceHeaderIdOf>, MessageNonce), SubstrateError> {
// lane data missing from the storage is fine until first message is sent
let latest_generated_nonce = self
.outbound_lane_data(id)
.await?
.map(|data| data.latest_generated_nonce)
.unwrap_or(0);
Ok((id, latest_generated_nonce))
}
async fn latest_confirmed_received_nonce(
&self,
id: SourceHeaderIdOf>,
) -> Result<(SourceHeaderIdOf>, MessageNonce), SubstrateError> {
// lane data missing from the storage is fine until first message is sent
let latest_received_nonce = self
.outbound_lane_data(id)
.await?
.map(|data| data.latest_received_nonce)
.unwrap_or(0);
Ok((id, latest_received_nonce))
}
async fn generated_message_details(
&self,
id: SourceHeaderIdOf>,
nonces: RangeInclusive,
) -> Result<
MessageDetailsMap< as MessageLane>::SourceChainBalance>,
SubstrateError,
> {
let encoded_response = self
.client
.state_call(
P::TargetChain::TO_CHAIN_MESSAGE_DETAILS_METHOD.into(),
Bytes((self.lane_id, nonces.start(), nonces.end()).encode()),
Some(id.1),
)
.await?;
make_message_details_map::(
Decode::decode(&mut &encoded_response.0[..])
.map_err(SubstrateError::ResponseParseFailed)?,
nonces,
)
}
async fn prove_messages(
&self,
id: SourceHeaderIdOf>,
nonces: RangeInclusive,
proof_parameters: MessageProofParameters,
) -> Result<
(
SourceHeaderIdOf>,
RangeInclusive,
as MessageLane>::MessagesProof,
),
SubstrateError,
> {
let mut storage_keys =
Vec::with_capacity(nonces.end().saturating_sub(*nonces.start()) as usize + 1);
let mut message_nonce = *nonces.start();
while message_nonce <= *nonces.end() {
let message_key = bp_messages::storage_keys::message_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
message_nonce,
);
storage_keys.push(message_key);
message_nonce += 1;
}
if proof_parameters.outbound_state_proof_required {
storage_keys.push(bp_messages::storage_keys::outbound_lane_data_key(
P::TargetChain::WITH_CHAIN_MESSAGES_PALLET_NAME,
&self.lane_id,
));
}
let proof = self.client.prove_storage(storage_keys, id.1).await?.iter_nodes().collect();
let proof = FromBridgedChainMessagesProof {
bridged_header_hash: id.1,
storage_proof: proof,
lane: self.lane_id,
nonces_start: *nonces.start(),
nonces_end: *nonces.end(),
};
Ok((id, nonces, (proof_parameters.dispatch_weight, proof)))
}
async fn submit_messages_receiving_proof(
&self,
_generated_at_block: TargetHeaderIdOf>,
proof: as MessageLane>::MessagesReceivingProof,
) -> Result<(), SubstrateError> {
let genesis_hash = *self.client.genesis_hash();
let transaction_params = self.transaction_params.clone();
let (spec_version, transaction_version) = self.client.simple_runtime_version().await?;
self.client
.submit_signed_extrinsic(
self.transaction_params.signer.public().into(),
move |best_block_id, transaction_nonce| {
make_messages_delivery_proof_transaction::(
spec_version,
transaction_version,
&genesis_hash,
&transaction_params,
best_block_id,
transaction_nonce,
proof,
true,
)
},
)
.await?;
Ok(())
}
async fn require_target_header_on_source(&self, id: TargetHeaderIdOf>) {
if let Some(ref target_to_source_headers_relay) = self.target_to_source_headers_relay {
target_to_source_headers_relay.require_finalized_header(id).await;
}
}
async fn estimate_confirmation_transaction(
&self,
) -> as MessageLane>::SourceChainBalance {
let runtime_version = match self.client.runtime_version().await {
Ok(v) => v,
Err(_) => return BalanceOf::::max_value(),
};
self.client
.estimate_extrinsic_fee(make_messages_delivery_proof_transaction::(
runtime_version.spec_version,
runtime_version.transaction_version,
self.client.genesis_hash(),
&self.transaction_params,
HeaderId(Default::default(), Default::default()),
Zero::zero(),
prepare_dummy_messages_delivery_proof::(),
false,
))
.await
.map(|fee| fee.inclusion_fee())
.unwrap_or_else(|_| BalanceOf::::max_value())
}
}
/// Ensure that the messages pallet at source chain is active.
pub(crate) async fn ensure_messages_pallet_active(
client: &Client,
) -> Result<(), SubstrateError>
where
AtChain: ChainWithMessages,
WithChain: ChainWithMessages,
{
let operating_mode = client
.storage_value(operating_mode_key(WithChain::WITH_CHAIN_MESSAGES_PALLET_NAME), None)
.await?;
let is_halted = operating_mode == Some(OperatingMode::Halted);
if is_halted {
Err(SubstrateError::BridgePalletIsHalted)
} else {
Ok(())
}
}
/// Make messages delivery proof transaction from given proof.
fn make_messages_delivery_proof_transaction(
spec_version: u32,
transaction_version: u32,
source_genesis_hash: &HashOf,
source_transaction_params: &TransactionParams>,
source_best_block_id: HeaderIdOf,
transaction_nonce: IndexOf,
proof: SubstrateMessagesDeliveryProof,
trace_call: bool,
) -> Bytes
where
P::SourceTransactionSignScheme: TransactionSignScheme,
{
let call =
P::ReceiveMessagesDeliveryProofCallBuilder::build_receive_messages_delivery_proof_call(
proof, trace_call,
);
Bytes(
P::SourceTransactionSignScheme::sign_transaction(SignParam {
spec_version,
transaction_version,
genesis_hash: *source_genesis_hash,
signer: source_transaction_params.signer.clone(),
era: TransactionEra::new(source_best_block_id, source_transaction_params.mortality),
unsigned: UnsignedTransaction::new(call, transaction_nonce),
})
.encode(),
)
}
/// Prepare 'dummy' messages delivery proof that will compose the delivery confirmation transaction.
///
/// We don't care about proof actually being the valid proof, because its validity doesn't
/// affect the call weight - we only care about its size.
fn prepare_dummy_messages_delivery_proof(
) -> SubstrateMessagesDeliveryProof {
let single_message_confirmation_size = bp_messages::InboundLaneData::<()>::encoded_size_hint(
SC::MAXIMAL_ENCODED_ACCOUNT_ID_SIZE,
1,
1,
)
.unwrap_or(u32::MAX);
let proof_size = TC::STORAGE_PROOF_OVERHEAD.saturating_add(single_message_confirmation_size);
(
UnrewardedRelayersState {
unrewarded_relayer_entries: 1,
messages_in_oldest_entry: 1,
total_messages: 1,
},
FromBridgedChainMessagesDeliveryProof {
bridged_header_hash: Default::default(),
storage_proof: vec![vec![0; proof_size as usize]],
lane: Default::default(),
},
)
}
/// Read best blocks from given client.
///
/// This function assumes that the chain that is followed by the `self_client` has
/// bridge GRANDPA pallet deployed and it provides `best_finalized_header_id_method_name`
/// runtime API to read the best finalized Bridged chain header.
pub async fn read_client_state(
self_client: &Client,
best_finalized_header_id_method_name: &str,
) -> Result<
ClientState, HeaderId>,
SubstrateError,
>
where
SelfChain: Chain,
SelfChain::Header: DeserializeOwned,
SelfChain::Index: DeserializeOwned,
BridgedHeaderHash: Decode,
BridgedHeaderNumber: Decode,
{
// let's read our state first: we need best finalized header hash on **this** chain
let self_best_finalized_header_hash = self_client.best_finalized_header_hash().await?;
let self_best_finalized_header =
self_client.header_by_hash(self_best_finalized_header_hash).await?;
let self_best_finalized_id =
HeaderId(*self_best_finalized_header.number(), self_best_finalized_header_hash);
// now let's read our best header on **this** chain
let self_best_header = self_client.best_header().await?;
let self_best_hash = self_best_header.hash();
let self_best_id = HeaderId(*self_best_header.number(), self_best_hash);
// now let's read id of best finalized peer header at our best finalized block
let encoded_best_finalized_peer_on_self = self_client
.state_call(
best_finalized_header_id_method_name.into(),
Bytes(Vec::new()),
Some(self_best_hash),
)
.await?;
let decoded_best_finalized_peer_on_self: (BridgedHeaderNumber, BridgedHeaderHash) =
Decode::decode(&mut &encoded_best_finalized_peer_on_self.0[..])
.map_err(SubstrateError::ResponseParseFailed)?;
let peer_on_self_best_finalized_id =
HeaderId(decoded_best_finalized_peer_on_self.0, decoded_best_finalized_peer_on_self.1);
Ok(ClientState {
best_self: self_best_id,
best_finalized_self: self_best_finalized_id,
best_finalized_peer_at_best_self: peer_on_self_best_finalized_id,
})
}
fn make_message_details_map(
weights: Vec>,
nonces: RangeInclusive,
) -> Result, SubstrateError> {
let make_missing_nonce_error = |expected_nonce| {
Err(SubstrateError::Custom(format!(
"Missing nonce {} in message_details call result. Expected all nonces from {:?}",
expected_nonce, nonces,
)))
};
let mut weights_map = MessageDetailsMap::new();
// this is actually prevented by external logic
if nonces.is_empty() {
return Ok(weights_map)
}
// check if last nonce is missing - loop below is not checking this
let last_nonce_is_missing =
weights.last().map(|details| details.nonce != *nonces.end()).unwrap_or(true);
if last_nonce_is_missing {
return make_missing_nonce_error(*nonces.end())
}
let mut expected_nonce = *nonces.start();
let mut is_at_head = true;
for details in weights {
match (details.nonce == expected_nonce, is_at_head) {
(true, _) => (),
(false, true) => {
// this may happen if some messages were already pruned from the source node
//
// this is not critical error and will be auto-resolved by messages lane (and target
// node)
log::info!(
target: "bridge",
"Some messages are missing from the {} node: {:?}. Target node may be out of sync?",
C::NAME,
expected_nonce..details.nonce,
);
},
(false, false) => {
// some nonces are missing from the middle/tail of the range
//
// this is critical error, because we can't miss any nonces
return make_missing_nonce_error(expected_nonce)
},
}
weights_map.insert(
details.nonce,
MessageDetails {
dispatch_weight: details.dispatch_weight,
size: details.size as _,
reward: details.delivery_and_dispatch_fee,
dispatch_fee_payment: details.dispatch_fee_payment,
},
);
expected_nonce = details.nonce + 1;
is_at_head = false;
}
Ok(weights_map)
}
#[cfg(test)]
mod tests {
use super::*;
use bp_runtime::messages::DispatchFeePayment;
use relay_rococo_client::Rococo;
use relay_wococo_client::Wococo;
fn message_details_from_rpc(
nonces: RangeInclusive,
) -> Vec> {
nonces
.into_iter()
.map(|nonce| bp_messages::MessageDetails {
nonce,
dispatch_weight: 0,
size: 0,
delivery_and_dispatch_fee: 0,
dispatch_fee_payment: DispatchFeePayment::AtSourceChain,
})
.collect()
}
#[test]
fn make_message_details_map_succeeds_if_no_messages_are_missing() {
assert_eq!(
make_message_details_map::(message_details_from_rpc(1..=3), 1..=3,).unwrap(),
vec![
(
1,
MessageDetails {
dispatch_weight: 0,
size: 0,
reward: 0,
dispatch_fee_payment: DispatchFeePayment::AtSourceChain,
}
),
(
2,
MessageDetails {
dispatch_weight: 0,
size: 0,
reward: 0,
dispatch_fee_payment: DispatchFeePayment::AtSourceChain,
}
),
(
3,
MessageDetails {
dispatch_weight: 0,
size: 0,
reward: 0,
dispatch_fee_payment: DispatchFeePayment::AtSourceChain,
}
),
]
.into_iter()
.collect(),
);
}
#[test]
fn make_message_details_map_succeeds_if_head_messages_are_missing() {
assert_eq!(
make_message_details_map::(message_details_from_rpc(2..=3), 1..=3,).unwrap(),
vec![
(
2,
MessageDetails {
dispatch_weight: 0,
size: 0,
reward: 0,
dispatch_fee_payment: DispatchFeePayment::AtSourceChain,
}
),
(
3,
MessageDetails {
dispatch_weight: 0,
size: 0,
reward: 0,
dispatch_fee_payment: DispatchFeePayment::AtSourceChain,
}
),
]
.into_iter()
.collect(),
);
}
#[test]
fn make_message_details_map_fails_if_mid_messages_are_missing() {
let mut message_details_from_rpc = message_details_from_rpc(1..=3);
message_details_from_rpc.remove(1);
assert!(matches!(
make_message_details_map::(message_details_from_rpc, 1..=3,),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn make_message_details_map_fails_if_tail_messages_are_missing() {
assert!(matches!(
make_message_details_map::(message_details_from_rpc(1..=2), 1..=3,),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn make_message_details_map_fails_if_all_messages_are_missing() {
assert!(matches!(
make_message_details_map::(vec![], 1..=3),
Err(SubstrateError::Custom(_))
));
}
#[test]
fn prepare_dummy_messages_delivery_proof_works() {
let expected_minimal_size =
Wococo::MAXIMAL_ENCODED_ACCOUNT_ID_SIZE + Rococo::STORAGE_PROOF_OVERHEAD;
let dummy_proof = prepare_dummy_messages_delivery_proof::();
assert!(
dummy_proof.1.encode().len() as u32 > expected_minimal_size,
"Expected proof size at least {}. Got: {}",
expected_minimal_size,
dummy_proof.1.encode().len(),
);
}
}