From 7d44a3647538efeb4ebe952e83ae98e86555490c Mon Sep 17 00:00:00 2001
From: Svyatoslav Nikolsky <svyatonik@gmail.com>
Date: Tue, 1 Aug 2023 12:43:11 +0300
Subject: [PATCH] Implement additional require primitives for dynamic fees
 directly for pallet-xcm-bridge-hub (#2261)

Xcm bridge hub router v2 (backport to master branch) (#2312)

* copy new pallet (palle-xcm-bridge-hub-router) from dynamic-fees-v1 branch

* added remaining traces of pallet-xcm-bridge-hub-router

* added comment about sharing delivery fee factor between all bridges, opened by this chain

* spelling

* clippy

Implement additional require primitives for dynamic fees directly for pallet-xcm-bridge-hub (#2261)

* added backoff mechanism to inbound bridge queue

* impl backpressure in the XcmBlobHaulerAdapter

* leave TODOs

* BridgeMessageProcessor prototype

* another TODO

* Revert "also temporary (?) remove BridgesByLocalOrigin because the storage format will likely change to be able to resume bridges from the on_iniitalize/on_idle"

This reverts commit bdd7ae11a8942b58c5db6ac6d4e7922aa28cece4.

* prototype for QueuePausedQuery

* implement ExportXcm and MessageDispatch for pallet-xcm-bridge-hub

* spelling

* flush

* small comments to myself

* more backports from dynamic-fees-v1

* use new pallet as exporter and dispatcher in Millau

* use new pallet as exporter and dispatcher in Rialto

* use new pallet as exporter and dispatcher in RialtoParachain

* flush

* fix remaining compilation issues

* warnings + fmt

* fix tests

* LocalXcmChannelManager

* change lane ids

* it works!

* remove bp-xcm-bridge-hub-router and use LocalXcmChannelManager everywhere

* removed commented code

* cleaning up

* cleaning up

* cleaning up

* - separated BridgeId and LaneId
- BridgeId now uses versioned universal locations
- added missing stuff to exporter.rs

* OnMessagesDelivered is back

* start using bp-xcm-bridge-hub as OnMessagesDelivered

* cleaning up

* spelling

* fix stupid issues

* Backport latest relevant dynamic fees changes from v1 to v2 (#2372)

* backport latest relevant dynamic fees changes from v1 to v2

* fix comment

Added remaining unit tests for pallet-xcm-bridge-hub (#2499)

* added backoff mechanism to inbound bridge queue

* impl backpressure in the XcmBlobHaulerAdapter

* leave TODOs

* BridgeMessageProcessor prototype

* another TODO

* Revert "also temporary (?) remove BridgesByLocalOrigin because the storage format will likely change to be able to resume bridges from the on_iniitalize/on_idle"

This reverts commit bdd7ae11a8942b58c5db6ac6d4e7922aa28cece4.

* prototype for QueuePausedQuery

* implement ExportXcm and MessageDispatch for pallet-xcm-bridge-hub

* spelling

* flush

* small comments to myself

* more backports from dynamic-fees-v1

* use new pallet as exporter and dispatcher in Millau

* use new pallet as exporter and dispatcher in Rialto

* use new pallet as exporter and dispatcher in RialtoParachain

* flush

* fix remaining compilation issues

* warnings + fmt

* fix tests

* LocalXcmChannelManager

* change lane ids

* it works!

* remove bp-xcm-bridge-hub-router and use LocalXcmChannelManager everywhere

* removed commented code

* cleaning up

* cleaning up

* cleaning up

* - separated BridgeId and LaneId
- BridgeId now uses versioned universal locations
- added missing stuff to exporter.rs

* OnMessagesDelivered is back

* start using bp-xcm-bridge-hub as OnMessagesDelivered

* cleaning up

* spelling

* fix stupid issues

* added remaining unit tests for pallet-xcm-bridge-hub

fixed benchmarks (#2504)

Remove pallet_xcm_bridge_hub::SuspendedBridges (#2505)

* remove pallet_xcm_bridge_hub::SuspendedBridges

* apply review suggestions
---
 Cargo.lock                                    |   3 +
 .../extensions/refund_relayer_extension.rs    |  22 +-
 bridges/bin/runtime-common/src/lib.rs         |   3 -
 .../runtime-common/src/messages_call_ext.rs   |   7 +-
 .../src/messages_xcm_extension.rs             | 524 ------------------
 bridges/bin/runtime-common/src/mock.rs        |  17 +-
 bridges/modules/messages/src/lanes_manager.rs |  37 +-
 bridges/modules/messages/src/lib.rs           |   6 +-
 bridges/modules/messages/src/tests/mock.rs    |  34 +-
 .../messages/src/tests/pallet_tests.rs        |  86 ++-
 .../modules/xcm-bridge-hub-router/Cargo.toml  |   5 +-
 .../xcm-bridge-hub-router/src/benchmarking.rs |  43 +-
 .../modules/xcm-bridge-hub-router/src/lib.rs  | 291 ++++------
 .../modules/xcm-bridge-hub-router/src/mock.rs |  34 +-
 .../xcm-bridge-hub-router/src/weights.rs      | 133 +----
 bridges/modules/xcm-bridge-hub/Cargo.toml     |   1 +
 .../modules/xcm-bridge-hub/src/dispatcher.rs  | 119 +++-
 .../modules/xcm-bridge-hub/src/exporter.rs    | 371 +++++++++++--
 bridges/modules/xcm-bridge-hub/src/lib.rs     | 209 +++++--
 bridges/modules/xcm-bridge-hub/src/mock.rs    |  80 ++-
 .../primitives/messages/src/target_chain.rs   |   4 +-
 bridges/primitives/xcm-bridge-hub/Cargo.toml  |   2 +
 bridges/primitives/xcm-bridge-hub/src/lib.rs  | 153 +++--
 23 files changed, 1092 insertions(+), 1092 deletions(-)
 delete mode 100644 bridges/bin/runtime-common/src/messages_xcm_extension.rs

diff --git a/Cargo.lock b/Cargo.lock
index 938daca729c..ed3a7a678aa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2012,6 +2012,7 @@ dependencies = [
  "frame-support",
  "parity-scale-codec",
  "scale-info",
+ "serde",
  "sp-std 14.0.0",
  "staging-xcm",
 ]
@@ -11993,6 +11994,7 @@ dependencies = [
 name = "pallet-xcm-bridge-hub"
 version = "0.2.0"
 dependencies = [
+ "bp-header-chain",
  "bp-messages",
  "bp-runtime",
  "bp-xcm-bridge-hub",
@@ -12018,6 +12020,7 @@ dependencies = [
 name = "pallet-xcm-bridge-hub-router"
 version = "0.5.0"
 dependencies = [
+ "bp-xcm-bridge-hub",
  "bp-xcm-bridge-hub-router",
  "frame-benchmarking",
  "frame-support",
diff --git a/bridges/bin/runtime-common/src/extensions/refund_relayer_extension.rs b/bridges/bin/runtime-common/src/extensions/refund_relayer_extension.rs
index 668ca94dd7c..42a3d118854 100644
--- a/bridges/bin/runtime-common/src/extensions/refund_relayer_extension.rs
+++ b/bridges/bin/runtime-common/src/extensions/refund_relayer_extension.rs
@@ -19,11 +19,8 @@
 //! with calls that are: delivering new message and all necessary underlying headers
 //! (parachain or relay chain).
 
-use crate::{
-	messages_call_ext::{
-		CallHelper as MessagesCallHelper, CallInfo as MessagesCallInfo, MessagesCallSubType,
-	},
-	messages_xcm_extension::LaneIdFromChainId,
+use crate::messages_call_ext::{
+	CallHelper as MessagesCallHelper, CallInfo as MessagesCallInfo, MessagesCallSubType,
 };
 use bp_messages::{ChainWithMessages, LaneId, MessageNonce};
 use bp_relayers::{ExplicitOrAccountParams, RewardsAccountOwner, RewardsAccountParams};
@@ -95,15 +92,17 @@ pub trait RefundableMessagesLaneId {
 }
 
 /// Default implementation of `RefundableMessagesLaneId`.
-pub struct RefundableMessagesLane<Runtime, Instance>(PhantomData<(Runtime, Instance)>);
+pub struct RefundableMessagesLane<Runtime, Instance, Lane>(PhantomData<(Runtime, Instance, Lane)>);
 
-impl<Runtime, Instance> RefundableMessagesLaneId for RefundableMessagesLane<Runtime, Instance>
+impl<Runtime, Instance, Lane> RefundableMessagesLaneId
+	for RefundableMessagesLane<Runtime, Instance, Lane>
 where
 	Runtime: MessagesConfig<Instance>,
 	Instance: 'static,
+	Lane: Get<LaneId>,
 {
 	type Instance = Instance;
-	type Id = LaneIdFromChainId<Runtime, Instance>;
+	type Id = Lane;
 }
 
 /// Refund calculator.
@@ -982,13 +981,14 @@ pub(crate) mod tests {
 			TEST_BRIDGED_CHAIN_ID,
 			RewardsAccountOwner::BridgedChain,
 		);
+		pub TestLaneId: LaneId = test_lane_id();
 	}
 
 	bp_runtime::generate_static_str_provider!(TestExtension);
 
 	type TestMessagesExtensionProvider = RefundBridgedMessages<
 		TestRuntime,
-		RefundableMessagesLane<TestRuntime, ()>,
+		RefundableMessagesLane<TestRuntime, (), TestLaneId>,
 		ActualFeeRefund<TestRuntime>,
 		ConstU64<1>,
 		StrTestExtension,
@@ -997,7 +997,7 @@ pub(crate) mod tests {
 	type TestGrandpaExtensionProvider = RefundBridgedGrandpaMessages<
 		TestRuntime,
 		(),
-		RefundableMessagesLane<TestRuntime, ()>,
+		RefundableMessagesLane<TestRuntime, (), TestLaneId>,
 		ActualFeeRefund<TestRuntime>,
 		ConstU64<1>,
 		StrTestExtension,
@@ -1006,7 +1006,7 @@ pub(crate) mod tests {
 	type TestExtensionProvider = RefundBridgedParachainMessages<
 		TestRuntime,
 		RefundableParachain<(), BridgedUnderlyingParachain>,
-		RefundableMessagesLane<TestRuntime, ()>,
+		RefundableMessagesLane<TestRuntime, (), TestLaneId>,
 		ActualFeeRefund<TestRuntime>,
 		ConstU64<1>,
 		StrTestExtension,
diff --git a/bridges/bin/runtime-common/src/lib.rs b/bridges/bin/runtime-common/src/lib.rs
index b65bb6041d5..3e3394688f1 100644
--- a/bridges/bin/runtime-common/src/lib.rs
+++ b/bridges/bin/runtime-common/src/lib.rs
@@ -24,12 +24,9 @@ pub mod extensions;
 pub mod messages_api;
 pub mod messages_benchmarking;
 pub mod messages_call_ext;
-pub mod messages_xcm_extension;
 pub mod parachains_benchmarking;
 
 mod mock;
 
 #[cfg(feature = "integrity-test")]
 pub mod integrity;
-
-const LOG_TARGET_BRIDGE_DISPATCH: &str = "runtime::bridge-dispatch";
diff --git a/bridges/bin/runtime-common/src/messages_call_ext.rs b/bridges/bin/runtime-common/src/messages_call_ext.rs
index c4e5a5e2582..2101e2a5ffc 100644
--- a/bridges/bin/runtime-common/src/messages_call_ext.rs
+++ b/bridges/bin/runtime-common/src/messages_call_ext.rs
@@ -302,7 +302,8 @@ impl<
 				return sp_runtime::transaction_validity::InvalidTransaction::Call.into()
 			},
 			Some(CallInfo::ReceiveMessagesProof(proof_info))
-				if proof_info.is_obsolete(T::MessageDispatch::is_active()) =>
+				if proof_info
+					.is_obsolete(T::MessageDispatch::is_active(proof_info.base.lane_id)) =>
 			{
 				log::trace!(
 					target: pallet_bridge_messages::LOG_TARGET,
@@ -486,12 +487,12 @@ mod tests {
 
 	#[test]
 	fn extension_reject_call_when_dispatcher_is_inactive() {
-		sp_io::TestExternalities::new(Default::default()).execute_with(|| {
+		run_test(|| {
 			// when current best delivered is message#10 and we're trying to deliver message 11..=15
 			// => tx is accepted, but we have inactive dispatcher, so...
 			deliver_message_10();
 
-			DummyMessageDispatch::deactivate();
+			DummyMessageDispatch::deactivate(test_lane_id());
 			assert!(!validate_message_delivery(11, 15));
 		});
 	}
diff --git a/bridges/bin/runtime-common/src/messages_xcm_extension.rs b/bridges/bin/runtime-common/src/messages_xcm_extension.rs
deleted file mode 100644
index 5ce906bef61..00000000000
--- a/bridges/bin/runtime-common/src/messages_xcm_extension.rs
+++ /dev/null
@@ -1,524 +0,0 @@
-// Copyright (C) Parity Technologies (UK) Ltd.
-// This file is part of Parity Bridges Common.
-
-// Parity Bridges Common is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// Parity Bridges Common is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with Parity Bridges Common.  If not, see <http://www.gnu.org/licenses/>.
-
-//! Module provides utilities for easier XCM handling, e.g:
-//! `XcmExecutor` -> `MessageSender` -> `OutboundMessageQueue`
-//!                                             |
-//!                                          `Relayer`
-//!                                             |
-//! `XcmRouter` <- `MessageDispatch` <- `InboundMessageQueue`
-
-use bp_messages::{
-	source_chain::OnMessagesDelivered,
-	target_chain::{DispatchMessage, MessageDispatch},
-	LaneId, MessageNonce,
-};
-use bp_runtime::{messages::MessageDispatchResult, Chain};
-pub use bp_xcm_bridge_hub::XcmAsPlainPayload;
-use bp_xcm_bridge_hub_router::XcmChannelStatusProvider;
-use codec::{Decode, Encode};
-use frame_support::{traits::Get, weights::Weight, CloneNoBound, EqNoBound, PartialEqNoBound};
-use pallet_bridge_messages::{
-	Config as MessagesConfig, OutboundLanesCongestedSignals, WeightInfoExt as MessagesPalletWeights,
-};
-use scale_info::TypeInfo;
-use sp_runtime::SaturatedConversion;
-use sp_std::{fmt::Debug, marker::PhantomData};
-use xcm::prelude::*;
-
-use xcm_builder::{DispatchBlob, DispatchBlobError};
-
-/// Make LaneId from chain identifiers of two bridge endpoints.
-// TODO: https://github.com/paritytech/parity-bridges-common/issues/1666: this function
-// is a temporary solution, because `ChainId` and will be removed soon.
-pub struct LaneIdFromChainId<R, I>(PhantomData<(R, I)>);
-
-impl<R, I> Get<LaneId> for LaneIdFromChainId<R, I>
-where
-	R: pallet_bridge_messages::Config<I>,
-	I: 'static,
-{
-	fn get() -> LaneId {
-		LaneId::new(
-			pallet_bridge_messages::ThisChainOf::<R, I>::ID,
-			pallet_bridge_messages::BridgedChainOf::<R, I>::ID,
-		)
-	}
-}
-
-/// Message dispatch result type for single message.
-#[derive(CloneNoBound, EqNoBound, PartialEqNoBound, Encode, Decode, Debug, TypeInfo)]
-pub enum XcmBlobMessageDispatchResult {
-	/// We've been unable to decode message payload.
-	InvalidPayload,
-	/// Message has been dispatched.
-	Dispatched,
-	/// Message has **NOT** been dispatched because of given error.
-	NotDispatched(#[codec(skip)] Option<DispatchBlobError>),
-}
-
-/// [`XcmBlobMessageDispatch`] is responsible for dispatching received messages
-///
-/// It needs to be used at the target bridge hub.
-pub struct XcmBlobMessageDispatch<DispatchBlob, Weights, Channel> {
-	_marker: sp_std::marker::PhantomData<(DispatchBlob, Weights, Channel)>,
-}
-
-impl<
-		BlobDispatcher: DispatchBlob,
-		Weights: MessagesPalletWeights,
-		Channel: XcmChannelStatusProvider,
-	> MessageDispatch for XcmBlobMessageDispatch<BlobDispatcher, Weights, Channel>
-{
-	type DispatchPayload = XcmAsPlainPayload;
-	type DispatchLevelResult = XcmBlobMessageDispatchResult;
-
-	fn is_active() -> bool {
-		!Channel::is_congested()
-	}
-
-	fn dispatch_weight(message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
-		match message.data.payload {
-			Ok(ref payload) => {
-				let payload_size = payload.encoded_size().saturated_into();
-				Weights::message_dispatch_weight(payload_size)
-			},
-			Err(_) => Weight::zero(),
-		}
-	}
-
-	fn dispatch(
-		message: DispatchMessage<Self::DispatchPayload>,
-	) -> MessageDispatchResult<Self::DispatchLevelResult> {
-		let payload = match message.data.payload {
-			Ok(payload) => payload,
-			Err(e) => {
-				log::error!(
-					target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-					"[XcmBlobMessageDispatch] payload error: {:?} - message_nonce: {:?}",
-					e,
-					message.key.nonce
-				);
-				return MessageDispatchResult {
-					unspent_weight: Weight::zero(),
-					dispatch_level_result: XcmBlobMessageDispatchResult::InvalidPayload,
-				}
-			},
-		};
-		let dispatch_level_result = match BlobDispatcher::dispatch_blob(payload) {
-			Ok(_) => {
-				log::debug!(
-					target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-					"[XcmBlobMessageDispatch] DispatchBlob::dispatch_blob was ok - message_nonce: {:?}",
-					message.key.nonce
-				);
-				XcmBlobMessageDispatchResult::Dispatched
-			},
-			Err(e) => {
-				log::error!(
-					target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-					"[XcmBlobMessageDispatch] DispatchBlob::dispatch_blob failed, error: {:?} - message_nonce: {:?}",
-					e, message.key.nonce
-				);
-				XcmBlobMessageDispatchResult::NotDispatched(Some(e))
-			},
-		};
-		MessageDispatchResult { unspent_weight: Weight::zero(), dispatch_level_result }
-	}
-}
-
-/// A pair of sending chain location and message lane, used by this chain to send messages
-/// over the bridge.
-#[cfg_attr(feature = "std", derive(Debug, Eq, PartialEq))]
-pub struct SenderAndLane {
-	/// Sending chain relative location.
-	pub location: Location,
-	/// Message lane, used by the sending chain.
-	pub lane: LaneId,
-}
-
-impl SenderAndLane {
-	/// Create new object using provided location and lane.
-	pub fn new(location: Location, lane: LaneId) -> Self {
-		SenderAndLane { location, lane }
-	}
-}
-
-/// [`XcmBlobHauler`] is responsible for sending messages to the bridge "point-to-point link" from
-/// one side, where on the other it can be dispatched by [`XcmBlobMessageDispatch`].
-pub trait XcmBlobHauler {
-	/// Runtime that has messages pallet deployed.
-	type Runtime: MessagesConfig<Self::MessagesInstance>;
-	/// Instance of the messages pallet that is used to send messages.
-	type MessagesInstance: 'static;
-
-	/// Actual XCM message sender (`HRMP` or `UMP`) to the source chain
-	/// location (`Self::SenderAndLane::get().location`).
-	type ToSourceChainSender: SendXcm;
-	/// An XCM message that is sent to the sending chain when the bridge queue becomes congested.
-	type CongestedMessage: Get<Option<Xcm<()>>>;
-	/// An XCM message that is sent to the sending chain when the bridge queue becomes not
-	/// congested.
-	type UncongestedMessage: Get<Option<Xcm<()>>>;
-
-	/// Returns `true` if we want to handle congestion.
-	fn supports_congestion_detection() -> bool {
-		Self::CongestedMessage::get().is_some() || Self::UncongestedMessage::get().is_some()
-	}
-}
-
-/// XCM bridge adapter which connects [`XcmBlobHauler`] with [`pallet_bridge_messages`] and
-/// makes sure that XCM blob is sent to the outbound lane to be relayed.
-///
-/// It needs to be used at the source bridge hub.
-pub struct XcmBlobHaulerAdapter<XcmBlobHauler, Lanes>(
-	sp_std::marker::PhantomData<(XcmBlobHauler, Lanes)>,
-);
-
-impl<
-		H: XcmBlobHauler,
-		Lanes: Get<sp_std::vec::Vec<(SenderAndLane, (NetworkId, InteriorLocation))>>,
-	> OnMessagesDelivered for XcmBlobHaulerAdapter<H, Lanes>
-{
-	fn on_messages_delivered(lane: LaneId, enqueued_messages: MessageNonce) {
-		if let Some(sender_and_lane) =
-			Lanes::get().iter().find(|link| link.0.lane == lane).map(|link| &link.0)
-		{
-			// notify XCM queue manager about updated lane state
-			LocalXcmQueueManager::<H>::on_bridge_messages_delivered(
-				sender_and_lane,
-				enqueued_messages,
-			);
-		}
-	}
-}
-
-/// Manager of local XCM queues (and indirectly - underlying transport channels) that
-/// controls the queue state.
-///
-/// It needs to be used at the source bridge hub.
-pub struct LocalXcmQueueManager<H>(PhantomData<H>);
-
-/// Maximal number of messages in the outbound bridge queue. Once we reach this limit, we
-/// send a "congestion" XCM message to the sending chain.
-const OUTBOUND_LANE_CONGESTED_THRESHOLD: MessageNonce = 8_192;
-
-/// After we have sent "congestion" XCM message to the sending chain, we wait until number
-/// of messages in the outbound bridge queue drops to this count, before sending `uncongestion`
-/// XCM message.
-const OUTBOUND_LANE_UNCONGESTED_THRESHOLD: MessageNonce = 1_024;
-
-impl<H: XcmBlobHauler> LocalXcmQueueManager<H> {
-	/// Must be called whenever we push a message to the bridge lane.
-	pub fn on_bridge_message_enqueued(
-		sender_and_lane: &SenderAndLane,
-		enqueued_messages: MessageNonce,
-	) {
-		// skip if we dont want to handle congestion
-		if !H::supports_congestion_detection() {
-			return
-		}
-
-		// if we have already sent the congestion signal, we don't want to do anything
-		if Self::is_congested_signal_sent(sender_and_lane.lane) {
-			return
-		}
-
-		// if the bridge queue is not congested, we don't want to do anything
-		let is_congested = enqueued_messages > OUTBOUND_LANE_CONGESTED_THRESHOLD;
-		if !is_congested {
-			return
-		}
-
-		log::info!(
-			target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-			"Sending 'congested' XCM message to {:?} to avoid overloading lane {:?}: there are\
-			{} messages queued at the bridge queue",
-			sender_and_lane.location,
-			sender_and_lane.lane,
-			enqueued_messages,
-		);
-
-		if let Err(e) = Self::send_congested_signal(sender_and_lane) {
-			log::info!(
-				target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-				"Failed to send the 'congested' XCM message to {:?}: {:?}",
-				sender_and_lane.location,
-				e,
-			);
-		}
-	}
-
-	/// Must be called whenever we receive a message delivery confirmation.
-	pub fn on_bridge_messages_delivered(
-		sender_and_lane: &SenderAndLane,
-		enqueued_messages: MessageNonce,
-	) {
-		// skip if we don't want to handle congestion
-		if !H::supports_congestion_detection() {
-			return
-		}
-
-		// if we have not sent the congestion signal before, we don't want to do anything
-		if !Self::is_congested_signal_sent(sender_and_lane.lane) {
-			return
-		}
-
-		// if the bridge queue is still congested, we don't want to do anything
-		let is_congested = enqueued_messages > OUTBOUND_LANE_UNCONGESTED_THRESHOLD;
-		if is_congested {
-			return
-		}
-
-		log::info!(
-			target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-			"Sending 'uncongested' XCM message to {:?}. Lane {:?}: there are\
-			{} messages queued at the bridge queue",
-			sender_and_lane.location,
-			sender_and_lane.lane,
-			enqueued_messages,
-		);
-
-		if let Err(e) = Self::send_uncongested_signal(sender_and_lane) {
-			log::info!(
-				target: crate::LOG_TARGET_BRIDGE_DISPATCH,
-				"Failed to send the 'uncongested' XCM message to {:?}: {:?}",
-				sender_and_lane.location,
-				e,
-			);
-		}
-	}
-
-	/// Returns true if we have sent "congested" signal to the `sending_chain_location`.
-	fn is_congested_signal_sent(lane: LaneId) -> bool {
-		OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::get(lane)
-	}
-
-	/// Send congested signal to the `sending_chain_location`.
-	fn send_congested_signal(sender_and_lane: &SenderAndLane) -> Result<(), SendError> {
-		if let Some(msg) = H::CongestedMessage::get() {
-			send_xcm::<H::ToSourceChainSender>(sender_and_lane.location.clone(), msg)?;
-			OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::insert(
-				sender_and_lane.lane,
-				true,
-			);
-		}
-		Ok(())
-	}
-
-	/// Send `uncongested` signal to the `sending_chain_location`.
-	fn send_uncongested_signal(sender_and_lane: &SenderAndLane) -> Result<(), SendError> {
-		if let Some(msg) = H::UncongestedMessage::get() {
-			send_xcm::<H::ToSourceChainSender>(sender_and_lane.location.clone(), msg)?;
-			OutboundLanesCongestedSignals::<H::Runtime, H::MessagesInstance>::remove(
-				sender_and_lane.lane,
-			);
-		}
-		Ok(())
-	}
-}
-
-/// Adapter for the implementation of `GetVersion`, which attempts to find the minimal
-/// configured XCM version between the destination `dest` and the bridge hub location provided as
-/// `Get<Location>`.
-pub struct XcmVersionOfDestAndRemoteBridge<Version, RemoteBridge>(
-	sp_std::marker::PhantomData<(Version, RemoteBridge)>,
-);
-impl<Version: GetVersion, RemoteBridge: Get<Location>> GetVersion
-	for XcmVersionOfDestAndRemoteBridge<Version, RemoteBridge>
-{
-	fn get_version_for(dest: &Location) -> Option<XcmVersion> {
-		let dest_version = Version::get_version_for(dest);
-		let bridge_hub_version = Version::get_version_for(&RemoteBridge::get());
-
-		match (dest_version, bridge_hub_version) {
-			(Some(dv), Some(bhv)) => Some(sp_std::cmp::min(dv, bhv)),
-			(Some(dv), None) => Some(dv),
-			(None, Some(bhv)) => Some(bhv),
-			(None, None) => None,
-		}
-	}
-}
-
-#[cfg(test)]
-mod tests {
-	use super::*;
-	use crate::mock::*;
-
-	use bp_messages::{LaneState, OutboundLaneData};
-	use frame_support::parameter_types;
-	use pallet_bridge_messages::OutboundLanes;
-
-	parameter_types! {
-		pub TestSenderAndLane: SenderAndLane = SenderAndLane {
-			location: Location::new(1, [Parachain(1000)]),
-			lane: test_lane_id(),
-		};
-		pub TestLanes: sp_std::vec::Vec<(SenderAndLane, (NetworkId, InteriorLocation))> = sp_std::vec![
-			(TestSenderAndLane::get(), (NetworkId::ByGenesis([0; 32]), InteriorLocation::Here))
-		];
-		pub DummyXcmMessage: Xcm<()> = Xcm::new();
-	}
-
-	struct DummySendXcm;
-
-	impl DummySendXcm {
-		fn messages_sent() -> u32 {
-			frame_support::storage::unhashed::get(b"DummySendXcm").unwrap_or(0)
-		}
-	}
-
-	impl SendXcm for DummySendXcm {
-		type Ticket = ();
-
-		fn validate(
-			_destination: &mut Option<Location>,
-			_message: &mut Option<Xcm<()>>,
-		) -> SendResult<Self::Ticket> {
-			Ok(((), Default::default()))
-		}
-
-		fn deliver(_ticket: Self::Ticket) -> Result<XcmHash, SendError> {
-			let messages_sent: u32 = Self::messages_sent();
-			frame_support::storage::unhashed::put(b"DummySendXcm", &(messages_sent + 1));
-			Ok(XcmHash::default())
-		}
-	}
-
-	struct TestBlobHauler;
-
-	impl XcmBlobHauler for TestBlobHauler {
-		type Runtime = TestRuntime;
-		type MessagesInstance = ();
-
-		type ToSourceChainSender = DummySendXcm;
-		type CongestedMessage = DummyXcmMessage;
-		type UncongestedMessage = DummyXcmMessage;
-	}
-
-	type TestBlobHaulerAdapter = XcmBlobHaulerAdapter<TestBlobHauler, TestLanes>;
-
-	fn fill_up_lane_to_congestion() -> MessageNonce {
-		let latest_generated_nonce = OUTBOUND_LANE_CONGESTED_THRESHOLD;
-		OutboundLanes::<TestRuntime, ()>::insert(
-			test_lane_id(),
-			OutboundLaneData {
-				state: LaneState::Opened,
-				oldest_unpruned_nonce: 0,
-				latest_received_nonce: 0,
-				latest_generated_nonce,
-			},
-		);
-		latest_generated_nonce
-	}
-
-	#[test]
-	fn congested_signal_is_not_sent_twice() {
-		run_test(|| {
-			let enqueued = fill_up_lane_to_congestion();
-
-			// next sent message leads to congested signal
-			LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
-				&TestSenderAndLane::get(),
-				enqueued + 1,
-			);
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-
-			// next sent message => we don't sent another congested signal
-			LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
-				&TestSenderAndLane::get(),
-				enqueued,
-			);
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-		});
-	}
-
-	#[test]
-	fn congested_signal_is_not_sent_when_outbound_lane_is_not_congested() {
-		run_test(|| {
-			LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
-				&TestSenderAndLane::get(),
-				1,
-			);
-			assert_eq!(DummySendXcm::messages_sent(), 0);
-		});
-	}
-
-	#[test]
-	fn congested_signal_is_sent_when_outbound_lane_is_congested() {
-		run_test(|| {
-			let enqueued = fill_up_lane_to_congestion();
-
-			// next sent message leads to congested signal
-			LocalXcmQueueManager::<TestBlobHauler>::on_bridge_message_enqueued(
-				&TestSenderAndLane::get(),
-				enqueued + 1,
-			);
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-			assert!(LocalXcmQueueManager::<TestBlobHauler>::is_congested_signal_sent(
-				test_lane_id()
-			));
-		});
-	}
-
-	#[test]
-	fn uncongested_signal_is_not_sent_when_messages_are_delivered_at_other_lane() {
-		run_test(|| {
-			LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-
-			// when we receive a delivery report for other lane, we don't send an uncongested signal
-			TestBlobHaulerAdapter::on_messages_delivered(LaneId::new(1, 3), 0);
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-		});
-	}
-
-	#[test]
-	fn uncongested_signal_is_not_sent_when_we_havent_send_congested_signal_before() {
-		run_test(|| {
-			TestBlobHaulerAdapter::on_messages_delivered(test_lane_id(), 0);
-			assert_eq!(DummySendXcm::messages_sent(), 0);
-		});
-	}
-
-	#[test]
-	fn uncongested_signal_is_not_sent_if_outbound_lane_is_still_congested() {
-		run_test(|| {
-			LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-
-			TestBlobHaulerAdapter::on_messages_delivered(
-				test_lane_id(),
-				OUTBOUND_LANE_UNCONGESTED_THRESHOLD + 1,
-			);
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-		});
-	}
-
-	#[test]
-	fn uncongested_signal_is_sent_if_outbound_lane_is_uncongested() {
-		run_test(|| {
-			LocalXcmQueueManager::<TestBlobHauler>::send_congested_signal(&TestSenderAndLane::get()).unwrap();
-			assert_eq!(DummySendXcm::messages_sent(), 1);
-
-			TestBlobHaulerAdapter::on_messages_delivered(
-				test_lane_id(),
-				OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
-			);
-			assert_eq!(DummySendXcm::messages_sent(), 2);
-		});
-	}
-}
diff --git a/bridges/bin/runtime-common/src/mock.rs b/bridges/bin/runtime-common/src/mock.rs
index 83ee83f517f..917b529be83 100644
--- a/bridges/bin/runtime-common/src/mock.rs
+++ b/bridges/bin/runtime-common/src/mock.rs
@@ -18,8 +18,6 @@
 
 #![cfg(test)]
 
-use crate::messages_xcm_extension::XcmAsPlainPayload;
-
 use bp_header_chain::ChainWithGrandpa;
 use bp_messages::{
 	target_chain::{DispatchMessage, MessageDispatch},
@@ -28,12 +26,12 @@ use bp_messages::{
 use bp_parachains::SingleParaStoredHeaderDataBuilder;
 use bp_relayers::PayRewardFromAccount;
 use bp_runtime::{messages::MessageDispatchResult, Chain, ChainId, Parachain};
+use codec::Encode;
 use frame_support::{
 	derive_impl, parameter_types,
 	weights::{ConstantMultiplier, IdentityFee, RuntimeDbWeight, Weight},
 };
 use pallet_transaction_payment::Multiplier;
-use sp_core::Get;
 use sp_runtime::{
 	testing::H256,
 	traits::{BlakeTwo256, ConstU32, ConstU64, ConstU8},
@@ -87,7 +85,7 @@ pub type TestStakeAndSlash = pallet_bridge_relayers::StakeAndSlashNamed<
 
 /// Message lane used in tests.
 pub fn test_lane_id() -> LaneId {
-	crate::messages_xcm_extension::LaneIdFromChainId::<TestRuntime, ()>::get()
+	LaneId::new(1, 2)
 }
 
 /// Bridged chain id used in tests.
@@ -189,7 +187,7 @@ impl pallet_bridge_messages::Config for TestRuntime {
 	type RuntimeEvent = RuntimeEvent;
 	type WeightInfo = pallet_bridge_messages::weights::BridgeWeight<TestRuntime>;
 
-	type OutboundPayload = XcmAsPlainPayload;
+	type OutboundPayload = Vec<u8>;
 
 	type InboundPayload = Vec<u8>;
 	type DeliveryPayments = ();
@@ -220,8 +218,8 @@ impl pallet_bridge_relayers::Config for TestRuntime {
 pub struct DummyMessageDispatch;
 
 impl DummyMessageDispatch {
-	pub fn deactivate() {
-		frame_support::storage::unhashed::put(&b"inactive"[..], &false);
+	pub fn deactivate(lane: LaneId) {
+		frame_support::storage::unhashed::put(&(b"inactive", lane).encode()[..], &false);
 	}
 }
 
@@ -229,8 +227,9 @@ impl MessageDispatch for DummyMessageDispatch {
 	type DispatchPayload = Vec<u8>;
 	type DispatchLevelResult = ();
 
-	fn is_active() -> bool {
-		frame_support::storage::unhashed::take::<bool>(&b"inactive"[..]) != Some(false)
+	fn is_active(lane: LaneId) -> bool {
+		frame_support::storage::unhashed::take::<bool>(&(b"inactive", lane).encode()[..]) !=
+			Some(false)
 	}
 
 	fn dispatch_weight(_message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
diff --git a/bridges/modules/messages/src/lanes_manager.rs b/bridges/modules/messages/src/lanes_manager.rs
index 911bed9fcb6..b3455202259 100644
--- a/bridges/modules/messages/src/lanes_manager.rs
+++ b/bridges/modules/messages/src/lanes_manager.rs
@@ -21,8 +21,8 @@ use crate::{
 };
 
 use bp_messages::{
-	ChainWithMessages, InboundLaneData, LaneId, LaneState, MessageKey, MessageNonce,
-	OutboundLaneData,
+	target_chain::MessageDispatch, ChainWithMessages, InboundLaneData, LaneId, LaneState,
+	MessageKey, MessageNonce, OutboundLaneData,
 };
 use bp_runtime::AccountIdOf;
 use codec::{Decode, Encode, MaxEncodedLen};
@@ -45,6 +45,9 @@ pub enum LanesManagerError {
 	ClosedInboundLane,
 	/// Outbound lane with given id is closed.
 	ClosedOutboundLane,
+	/// Message dispatcher is inactive at given inbound lane. This is logical equivalent
+	/// of the [`Self::ClosedInboundLane`] variant.
+	LaneDispatcherInactive,
 }
 
 /// Message lanes manager.
@@ -145,10 +148,32 @@ impl<T: Config<I>, I: 'static> RuntimeInboundLaneStorage<T, I> {
 	) -> Result<RuntimeInboundLaneStorage<T, I>, LanesManagerError> {
 		let cached_data =
 			InboundLanes::<T, I>::get(lane_id).ok_or(LanesManagerError::UnknownInboundLane)?;
-		ensure!(
-			!check_active || cached_data.state.is_active(),
-			LanesManagerError::ClosedInboundLane
-		);
+
+		if check_active {
+			// check that the lane is not explicitly closed
+			ensure!(cached_data.state.is_active(), LanesManagerError::ClosedInboundLane);
+			// apart from the explicit closure, the lane may be unable to receive any messages.
+			// Right now we do an additional check here, but it may be done later (e.g. by
+			// explicitly closing the lane and reopening it from
+			// `pallet-xcm-bridge-hub::on-initialize`)
+			//
+			// The fact that we only check it here, means that the `MessageDispatch` may switch
+			// to inactive state during some message dispatch in the middle of message delivery
+			// transaction. But we treat result of `MessageDispatch::is_active()` as a hint, so
+			// we know that it won't drop messages - just it experiences problems with processing.
+			// This would allow us to check that in our signed extensions, and invalidate
+			// transaction early, thus avoiding losing honest relayers funds. This problem should
+			// gone with relayers coordination protocol.
+			//
+			// There's a limit on number of messages in the message delivery transaction, so even
+			// if we dispatch (enqueue) some additional messages, we'll know the maximal queue
+			// length;
+			ensure!(
+				T::MessageDispatch::is_active(lane_id),
+				LanesManagerError::LaneDispatcherInactive
+			);
+		}
+
 		Ok(RuntimeInboundLaneStorage {
 			lane_id,
 			cached_data: cached_data.into(),
diff --git a/bridges/modules/messages/src/lib.rs b/bridges/modules/messages/src/lib.rs
index 554db4faa8e..03fae73f255 100644
--- a/bridges/modules/messages/src/lib.rs
+++ b/bridges/modules/messages/src/lib.rs
@@ -213,9 +213,6 @@ pub mod pallet {
 				Error::<T, I>::TooManyMessagesInTheProof
 			);
 
-			// if message dispatcher is currently inactive, we won't accept any messages
-			ensure!(T::MessageDispatch::is_active(), Error::<T, I>::MessageDispatchInactive);
-
 			// why do we need to know the weight of this (`receive_messages_proof`) call? Because
 			// we may want to return some funds for not-dispatching (or partially dispatching) some
 			// messages to the call origin (relayer). And this is done by returning actual weight
@@ -463,8 +460,6 @@ pub mod pallet {
 	pub enum Error<T, I = ()> {
 		/// Pallet is not in Normal operating mode.
 		NotOperatingNormally,
-		/// The inbound message dispatcher is inactive.
-		MessageDispatchInactive,
 		/// Error that is reported by the lanes manager.
 		LanesManager(LanesManagerError),
 		/// Message has been treated as invalid by the pallet logic.
@@ -702,6 +697,7 @@ where
 		lane_id: LaneId,
 		message: &T::OutboundPayload,
 	) -> Result<SendMessageArgs<T, I>, Self::Error> {
+		// we can't accept any messages if the pallet is halted
 		ensure_normal_operating_mode::<T, I>()?;
 
 		// check lane
diff --git a/bridges/modules/messages/src/tests/mock.rs b/bridges/modules/messages/src/tests/mock.rs
index 9d24233929b..2caea9813e8 100644
--- a/bridges/modules/messages/src/tests/mock.rs
+++ b/bridges/modules/messages/src/tests/mock.rs
@@ -43,7 +43,7 @@ use bp_runtime::{
 };
 use codec::{Decode, Encode};
 use frame_support::{
-	derive_impl, parameter_types,
+	derive_impl,
 	weights::{constants::RocksDbWeight, Weight},
 };
 use scale_info::TypeInfo;
@@ -183,11 +183,6 @@ impl pallet_bridge_grandpa::Config for TestRuntime {
 	type WeightInfo = pallet_bridge_grandpa::weights::BridgeWeight<TestRuntime>;
 }
 
-parameter_types! {
-	pub const MaxMessagesToPruneAtOnce: u64 = 10;
-	pub const TestBridgedChainId: bp_runtime::ChainId = *b"test";
-}
-
 /// weights of messages pallet calls we use in tests.
 pub type TestWeightInfo = ();
 
@@ -346,8 +341,18 @@ impl DeliveryConfirmationPayments<AccountId> for TestDeliveryConfirmationPayment
 pub struct TestMessageDispatch;
 
 impl TestMessageDispatch {
-	pub fn deactivate() {
-		frame_support::storage::unhashed::put(b"TestMessageDispatch.IsCongested", &true)
+	pub fn deactivate(lane: LaneId) {
+		// "enqueue" enough (to deactivate dispatcher) messages at dispatcher
+		let latest_received_nonce = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX + 1;
+		for _ in 1..=latest_received_nonce {
+			Self::emulate_enqueued_message(lane);
+		}
+	}
+
+	pub fn emulate_enqueued_message(lane: LaneId) {
+		let key = (b"dispatched", lane).encode();
+		let dispatched = frame_support::storage::unhashed::get_or_default::<MessageNonce>(&key[..]);
+		frame_support::storage::unhashed::put(&key[..], &(dispatched + 1));
 	}
 }
 
@@ -355,10 +360,10 @@ impl MessageDispatch for TestMessageDispatch {
 	type DispatchPayload = TestPayload;
 	type DispatchLevelResult = TestDispatchLevelResult;
 
-	fn is_active() -> bool {
-		!frame_support::storage::unhashed::get_or_default::<bool>(
-			b"TestMessageDispatch.IsCongested",
-		)
+	fn is_active(lane: LaneId) -> bool {
+		frame_support::storage::unhashed::get_or_default::<MessageNonce>(
+			&(b"dispatched", lane).encode()[..],
+		) <= BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX
 	}
 
 	fn dispatch_weight(message: &mut DispatchMessage<TestPayload>) -> Weight {
@@ -372,7 +377,10 @@ impl MessageDispatch for TestMessageDispatch {
 		message: DispatchMessage<TestPayload>,
 	) -> MessageDispatchResult<TestDispatchLevelResult> {
 		match message.data.payload.as_ref() {
-			Ok(payload) => payload.dispatch_result.clone(),
+			Ok(payload) => {
+				Self::emulate_enqueued_message(message.key.lane_id);
+				payload.dispatch_result.clone()
+			},
 			Err(_) => dispatch_result(0),
 		}
 	}
diff --git a/bridges/modules/messages/src/tests/pallet_tests.rs b/bridges/modules/messages/src/tests/pallet_tests.rs
index 13d5af15a3b..a3497c4b66d 100644
--- a/bridges/modules/messages/src/tests/pallet_tests.rs
+++ b/bridges/modules/messages/src/tests/pallet_tests.rs
@@ -28,7 +28,7 @@ use crate::{
 
 use bp_messages::{
 	source_chain::{FromBridgedChainMessagesDeliveryProof, MessagesBridge},
-	target_chain::FromBridgedChainMessagesProof,
+	target_chain::{FromBridgedChainMessagesProof, MessageDispatch},
 	BridgeMessagesCall, ChainWithMessages, DeliveredMessages, InboundLaneData,
 	InboundMessageDetails, LaneId, LaneState, MessageKey, MessageNonce, MessagesOperatingMode,
 	OutboundLaneData, OutboundMessageDetails, UnrewardedRelayer, UnrewardedRelayersState,
@@ -168,7 +168,7 @@ fn pallet_rejects_transactions_if_halted() {
 #[test]
 fn receive_messages_fails_if_dispatcher_is_inactive() {
 	run_test(|| {
-		TestMessageDispatch::deactivate();
+		TestMessageDispatch::deactivate(test_lane_id());
 		let proof = prepare_messages_proof(vec![message(1, REGULAR_PAYLOAD)], None);
 		assert_noop!(
 			Pallet::<TestRuntime>::receive_messages_proof(
@@ -178,7 +178,7 @@ fn receive_messages_fails_if_dispatcher_is_inactive() {
 				1,
 				REGULAR_PAYLOAD.declared_weight,
 			),
-			Error::<TestRuntime, ()>::MessageDispatchInactive,
+			Error::<TestRuntime, ()>::LanesManager(LanesManagerError::LaneDispatcherInactive),
 		);
 	});
 }
@@ -347,6 +347,86 @@ fn receive_messages_proof_updates_confirmed_message_nonce() {
 	});
 }
 
+#[test]
+fn receive_messages_proof_fails_when_dispatcher_is_inactive() {
+	run_test(|| {
+		// "enqueue" enough (to deactivate dispatcher) messages at dispatcher
+		let latest_received_nonce = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX + 1;
+		for _ in 1..=latest_received_nonce {
+			TestMessageDispatch::emulate_enqueued_message(test_lane_id());
+		}
+		assert!(!TestMessageDispatch::is_active(test_lane_id()));
+		InboundLanes::<TestRuntime, ()>::insert(
+			test_lane_id(),
+			InboundLaneData {
+				state: LaneState::Opened,
+				last_confirmed_nonce: latest_received_nonce,
+				relayers: vec![].into(),
+			},
+		);
+
+		// try to delvier next message - it should fail because dispatcher is in "suspended" state
+		// at the beginning of the call
+		let messages_proof =
+			prepare_messages_proof(vec![message(latest_received_nonce + 1, REGULAR_PAYLOAD)], None);
+		assert_noop!(
+			Pallet::<TestRuntime>::receive_messages_proof(
+				RuntimeOrigin::signed(1),
+				TEST_RELAYER_A,
+				messages_proof,
+				1,
+				REGULAR_PAYLOAD.declared_weight,
+			),
+			Error::<TestRuntime, ()>::LanesManager(LanesManagerError::LaneDispatcherInactive)
+		);
+		assert!(!TestMessageDispatch::is_active(test_lane_id()));
+	});
+}
+
+#[test]
+fn receive_messages_succeeds_when_dispatcher_becomes_inactive_in_the_middle_of_transaction() {
+	run_test(|| {
+		// "enqueue" enough (to deactivate dispatcher) messages at dispatcher
+		let latest_received_nonce = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX / 2;
+		for _ in 1..=latest_received_nonce {
+			TestMessageDispatch::emulate_enqueued_message(test_lane_id());
+		}
+		assert!(TestMessageDispatch::is_active(test_lane_id()));
+		InboundLanes::<TestRuntime, ()>::insert(
+			test_lane_id(),
+			InboundLaneData {
+				state: LaneState::Opened,
+				last_confirmed_nonce: latest_received_nonce,
+				relayers: vec![].into(),
+			},
+		);
+
+		// try to delvier next `BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX` messages
+		// - it will lead to dispatcher deactivation, but the transaction shall not fail and all
+		// messages must be delivered
+		let messages_begin = latest_received_nonce + 1;
+		let messages_end =
+			messages_begin + BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX;
+		let messages_range = messages_begin..messages_end;
+		let messages_count = BridgedChain::MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX;
+		assert_ok!(Pallet::<TestRuntime>::receive_messages_proof(
+			RuntimeOrigin::signed(1),
+			TEST_RELAYER_A,
+			prepare_messages_proof(
+				messages_range.map(|nonce| message(nonce, REGULAR_PAYLOAD)).collect(),
+				None,
+			),
+			messages_count as _,
+			REGULAR_PAYLOAD.declared_weight * messages_count,
+		),);
+		assert_eq!(
+			inbound_unrewarded_relayers_state(test_lane_id()).last_delivered_nonce,
+			messages_end - 1,
+		);
+		assert!(!TestMessageDispatch::is_active(test_lane_id()));
+	});
+}
+
 #[test]
 fn receive_messages_proof_does_not_accept_message_if_dispatch_weight_is_not_enough() {
 	run_test(|| {
diff --git a/bridges/modules/xcm-bridge-hub-router/Cargo.toml b/bridges/modules/xcm-bridge-hub-router/Cargo.toml
index ec7c3b56283..1cda8893967 100644
--- a/bridges/modules/xcm-bridge-hub-router/Cargo.toml
+++ b/bridges/modules/xcm-bridge-hub-router/Cargo.toml
@@ -16,11 +16,10 @@ log = { workspace = true }
 scale-info = { features = ["bit-vec", "derive", "serde"], workspace = true }
 
 # Bridge dependencies
-
 bp-xcm-bridge-hub-router = { workspace = true }
+bp-xcm-bridge-hub = { workspace = true }
 
 # Substrate Dependencies
-
 frame-benchmarking = { optional = true, workspace = true }
 frame-support = { workspace = true }
 frame-system = { workspace = true }
@@ -29,7 +28,6 @@ sp-runtime = { workspace = true }
 sp-std = { workspace = true }
 
 # Polkadot Dependencies
-
 xcm = { workspace = true }
 xcm-builder = { workspace = true }
 
@@ -41,6 +39,7 @@ sp-std = { workspace = true, default-features = true }
 default = ["std"]
 std = [
 	"bp-xcm-bridge-hub-router/std",
+	"bp-xcm-bridge-hub/std",
 	"codec/std",
 	"frame-benchmarking/std",
 	"frame-support/std",
diff --git a/bridges/modules/xcm-bridge-hub-router/src/benchmarking.rs b/bridges/modules/xcm-bridge-hub-router/src/benchmarking.rs
index c4f9f534c1a..3c4a10f82e7 100644
--- a/bridges/modules/xcm-bridge-hub-router/src/benchmarking.rs
+++ b/bridges/modules/xcm-bridge-hub-router/src/benchmarking.rs
@@ -18,11 +18,9 @@
 
 #![cfg(feature = "runtime-benchmarks")]
 
-use crate::{Bridge, Call};
-
-use bp_xcm_bridge_hub_router::{BridgeState, MINIMAL_DELIVERY_FEE_FACTOR};
+use crate::{DeliveryFeeFactor, MINIMAL_DELIVERY_FEE_FACTOR};
 use frame_benchmarking::{benchmarks_instance_pallet, BenchmarkError};
-use frame_support::traits::{EnsureOrigin, Get, Hooks, UnfilteredDispatchable};
+use frame_support::traits::{Get, Hooks};
 use sp_runtime::traits::Zero;
 use xcm::prelude::*;
 
@@ -47,49 +45,16 @@ pub trait Config<I: 'static>: crate::Config<I> {
 
 benchmarks_instance_pallet! {
 	on_initialize_when_non_congested {
-		Bridge::<T, I>::put(BridgeState {
-			is_congested: false,
-			delivery_fee_factor: MINIMAL_DELIVERY_FEE_FACTOR + MINIMAL_DELIVERY_FEE_FACTOR,
-		});
+		DeliveryFeeFactor::<T, I>::put(MINIMAL_DELIVERY_FEE_FACTOR + MINIMAL_DELIVERY_FEE_FACTOR);
 	}: {
 		crate::Pallet::<T, I>::on_initialize(Zero::zero())
 	}
 
 	on_initialize_when_congested {
-		Bridge::<T, I>::put(BridgeState {
-			is_congested: false,
-			delivery_fee_factor: MINIMAL_DELIVERY_FEE_FACTOR + MINIMAL_DELIVERY_FEE_FACTOR,
-		});
-
+		DeliveryFeeFactor::<T, I>::put(MINIMAL_DELIVERY_FEE_FACTOR + MINIMAL_DELIVERY_FEE_FACTOR);
 		let _ = T::ensure_bridged_target_destination()?;
 		T::make_congested();
 	}: {
 		crate::Pallet::<T, I>::on_initialize(Zero::zero())
 	}
-
-	report_bridge_status {
-		Bridge::<T, I>::put(BridgeState::default());
-
-		let origin: T::RuntimeOrigin = T::BridgeHubOrigin::try_successful_origin().expect("expected valid BridgeHubOrigin");
-		let bridge_id = Default::default();
-		let is_congested = true;
-
-		let call = Call::<T, I>::report_bridge_status { bridge_id, is_congested };
-	}: { call.dispatch_bypass_filter(origin)? }
-	verify {
-		assert!(Bridge::<T, I>::get().is_congested);
-	}
-
-	send_message {
-		let dest = T::ensure_bridged_target_destination()?;
-		let xcm = sp_std::vec![].into();
-
-		// make local queue congested, because it means additional db write
-		T::make_congested();
-	}: {
-		send_xcm::<crate::Pallet<T, I>>(dest, xcm).expect("message is sent")
-	}
-	verify {
-		assert!(Bridge::<T, I>::get().delivery_fee_factor > MINIMAL_DELIVERY_FEE_FACTOR);
-	}
 }
diff --git a/bridges/modules/xcm-bridge-hub-router/src/lib.rs b/bridges/modules/xcm-bridge-hub-router/src/lib.rs
index 60739460346..89474bd53fc 100644
--- a/bridges/modules/xcm-bridge-hub-router/src/lib.rs
+++ b/bridges/modules/xcm-bridge-hub-router/src/lib.rs
@@ -30,12 +30,9 @@
 
 #![cfg_attr(not(feature = "std"), no_std)]
 
-use bp_xcm_bridge_hub_router::{
-	BridgeState, XcmChannelStatusProvider, MINIMAL_DELIVERY_FEE_FACTOR,
-};
+use bp_xcm_bridge_hub::LocalXcmChannelManager;
 use codec::Encode;
 use frame_support::traits::Get;
-use sp_core::H256;
 use sp_runtime::{FixedPointNumber, FixedU128, Saturating};
 use sp_std::vec::Vec;
 use xcm::prelude::*;
@@ -49,6 +46,9 @@ pub mod weights;
 
 mod mock;
 
+/// Minimal delivery fee factor.
+pub const MINIMAL_DELIVERY_FEE_FACTOR: FixedU128 = FixedU128::from_u32(1);
+
 /// The factor that is used to increase current message fee factor when bridge experiencing
 /// some lags.
 const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); // 1.05
@@ -82,6 +82,8 @@ pub mod pallet {
 
 		/// Universal location of this runtime.
 		type UniversalLocation: Get<InteriorLocation>;
+		/// Relative location of the supported sibling bridge hub.
+		type SiblingBridgeHubLocation: Get<Location>;
 		/// The bridged network that this config is for if specified.
 		/// Also used for filtering `Bridges` by `BridgedNetworkId`.
 		/// If not specified, allows all networks pass through.
@@ -97,9 +99,8 @@ pub mod pallet {
 		type BridgeHubOrigin: EnsureOrigin<Self::RuntimeOrigin>;
 		/// Actual message sender (`HRMP` or `DMP`) to the sibling bridge hub location.
 		type ToBridgeHubSender: SendXcm + InspectMessageQueues;
-		/// Underlying channel with the sibling bridge hub. It must match the channel, used
-		/// by the `Self::ToBridgeHubSender`.
-		type WithBridgeHubChannel: XcmChannelStatusProvider;
+		/// Local XCM channel manager.
+		type LocalXcmChannelManager: LocalXcmChannelManager;
 
 		/// Additional fee that is paid for every byte of the outbound message.
 		type ByteFee: Get<u128>;
@@ -113,115 +114,95 @@ pub mod pallet {
 	#[pallet::hooks]
 	impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I> {
 		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
-			// TODO: make sure that `WithBridgeHubChannel::is_congested` returns true if either
-			// of XCM channels (outbound/inbound) is suspended. Because if outbound is suspended
-			// that is definitely congestion. If inbound is suspended, then we are not able to
-			// receive the "report_bridge_status" signal (that maybe sent by the bridge hub).
-
-			// if the channel with sibling/child bridge hub is suspended, we don't change
-			// anything
-			if T::WithBridgeHubChannel::is_congested() {
+			// if XCM channel is still congested, we don't change anything
+			if T::LocalXcmChannelManager::is_congested(&T::SiblingBridgeHubLocation::get()) {
 				return T::WeightInfo::on_initialize_when_congested()
 			}
 
-			// if bridge has reported congestion, we don't change anything
-			let mut bridge = Self::bridge();
-			if bridge.is_congested {
+			// if we can't decrease the delivery fee factor anymore, we don't change anything
+			let mut delivery_fee_factor = Self::delivery_fee_factor();
+			if delivery_fee_factor == MINIMAL_DELIVERY_FEE_FACTOR {
 				return T::WeightInfo::on_initialize_when_congested()
 			}
 
-			// if fee factor is already minimal, we don't change anything
-			if bridge.delivery_fee_factor == MINIMAL_DELIVERY_FEE_FACTOR {
-				return T::WeightInfo::on_initialize_when_congested()
-			}
-
-			let previous_factor = bridge.delivery_fee_factor;
-			bridge.delivery_fee_factor =
-				MINIMAL_DELIVERY_FEE_FACTOR.max(bridge.delivery_fee_factor / EXPONENTIAL_FEE_BASE);
+			let previous_factor = delivery_fee_factor;
+			delivery_fee_factor =
+				MINIMAL_DELIVERY_FEE_FACTOR.max(delivery_fee_factor / EXPONENTIAL_FEE_BASE);
 			log::info!(
 				target: LOG_TARGET,
-				"Bridge queue is uncongested. Decreased fee factor from {} to {}",
+				"Bridge channel is uncongested. Decreased fee factor from {} to {}",
 				previous_factor,
-				bridge.delivery_fee_factor,
+				delivery_fee_factor,
 			);
+			DeliveryFeeFactor::<T, I>::put(delivery_fee_factor);
 
-			Bridge::<T, I>::put(bridge);
 			T::WeightInfo::on_initialize_when_non_congested()
 		}
 	}
 
-	#[pallet::call]
-	impl<T: Config<I>, I: 'static> Pallet<T, I> {
-		/// Notification about congested bridge queue.
-		#[pallet::call_index(0)]
-		#[pallet::weight(T::WeightInfo::report_bridge_status())]
-		pub fn report_bridge_status(
-			origin: OriginFor<T>,
-			// this argument is not currently used, but to ease future migration, we'll keep it
-			// here
-			bridge_id: H256,
-			is_congested: bool,
-		) -> DispatchResult {
-			let _ = T::BridgeHubOrigin::ensure_origin(origin)?;
-
-			log::info!(
-				target: LOG_TARGET,
-				"Received bridge status from {:?}: congested = {}",
-				bridge_id,
-				is_congested,
-			);
-
-			Bridge::<T, I>::mutate(|bridge| {
-				bridge.is_congested = is_congested;
-			});
-			Ok(())
-		}
+	/// Initialization value for the delivery fee factor.
+	#[pallet::type_value]
+	pub fn InitialFactor() -> FixedU128 {
+		MINIMAL_DELIVERY_FEE_FACTOR
 	}
 
-	/// Bridge that we are using.
+	/// The number to multiply the base delivery fee by.
+	///
+	/// This factor is shared by all bridges, served by this pallet. For example, if this
+	/// chain (`Config::UniversalLocation`) opens two bridges (
+	/// `X2(GlobalConsensus(Config::BridgedNetworkId::get()), Parachain(1000))` and
+	/// `X2(GlobalConsensus(Config::BridgedNetworkId::get()), Parachain(2000))`), then they
+	/// both will be sharing the same fee factor. This is because both bridges are sharing
+	/// the same local XCM channel with the child/sibling bridge hub, which we are using
+	/// to detect congestion:
 	///
-	/// **bridges-v1** assumptions: all outbound messages through this router are using single lane
-	/// and to single remote consensus. If there is some other remote consensus that uses the same
-	/// bridge hub, the separate pallet instance shall be used, In `v2` we'll have all required
-	/// primitives (lane-id aka bridge-id, derived from XCM locations) to support multiple  bridges
-	/// by the same pallet instance.
+	/// ```nocompile
+	///  ThisChain --- Local XCM chanel --> Sibling Bridge Hub ------
+	///                                            |                   |
+	///                                            |                   |
+	///                                            |                   |
+	///                                          Lane1               Lane2
+	///                                            |                   |
+	///                                            |                   |
+	///                                            |                   |
+	///                                           \ /                  |
+	///  Parachain1  <-- Local XCM channel --- Remote Bridge Hub <------
+	///                                            |
+	///                                            |
+	///  Parachain1  <-- Local XCM channel ---------
+	/// ```
+	///
+	/// If at least one of other channels is congested, the local XCM channel with sibling
+	/// bridge hub eventually becomes congested too. And we have no means to detect - which
+	/// bridge exactly causes the congestion. So the best solution here is not to make
+	/// any differences between all bridges, started by this chain.
 	#[pallet::storage]
-	#[pallet::getter(fn bridge)]
-	pub type Bridge<T: Config<I>, I: 'static = ()> = StorageValue<_, BridgeState, ValueQuery>;
+	#[pallet::getter(fn delivery_fee_factor)]
+	pub type DeliveryFeeFactor<T: Config<I>, I: 'static = ()> =
+		StorageValue<_, FixedU128, ValueQuery, InitialFactor>;
 
 	impl<T: Config<I>, I: 'static> Pallet<T, I> {
 		/// Called when new message is sent (queued to local outbound XCM queue) over the bridge.
 		pub(crate) fn on_message_sent_to_bridge(message_size: u32) {
-			log::trace!(
-				target: LOG_TARGET,
-				"on_message_sent_to_bridge - message_size: {message_size:?}",
-			);
-			let _ = Bridge::<T, I>::try_mutate(|bridge| {
-				let is_channel_with_bridge_hub_congested = T::WithBridgeHubChannel::is_congested();
-				let is_bridge_congested = bridge.is_congested;
-
-				// if outbound queue is not congested AND bridge has not reported congestion, do
-				// nothing
-				if !is_channel_with_bridge_hub_congested && !is_bridge_congested {
-					return Err(())
-				}
-
-				// ok - we need to increase the fee factor, let's do that
-				let message_size_factor = FixedU128::from_u32(message_size.saturating_div(1024))
-					.saturating_mul(MESSAGE_SIZE_FEE_BASE);
-				let total_factor = EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor);
-				let previous_factor = bridge.delivery_fee_factor;
-				bridge.delivery_fee_factor =
-					bridge.delivery_fee_factor.saturating_mul(total_factor);
+			// if outbound channel is not congested, do nothing
+			if !T::LocalXcmChannelManager::is_congested(&T::SiblingBridgeHubLocation::get()) {
+				return
+			}
 
+			// ok - we need to increase the fee factor, let's do that
+			let message_size_factor = FixedU128::from_u32(message_size.saturating_div(1024))
+				.saturating_mul(MESSAGE_SIZE_FEE_BASE);
+			let total_factor = EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor);
+			DeliveryFeeFactor::<T, I>::mutate(|f| {
+				let previous_factor = *f;
+				*f = f.saturating_mul(total_factor);
 				log::info!(
 					target: LOG_TARGET,
 					"Bridge channel is congested. Increased fee factor from {} to {}",
 					previous_factor,
-					bridge.delivery_fee_factor,
+					f,
 				);
-
-				Ok(())
+				*f
 			});
 		}
 	}
@@ -259,17 +240,25 @@ impl<T: Config<I>, I: 'static> ExporterFor for Pallet<T, I> {
 		}
 
 		// ensure that the message is sent to the expected bridged network and location.
-		let Some((bridge_hub_location, maybe_payment)) =
-			T::Bridges::exporter_for(network, remote_location, message)
-		else {
-			log::trace!(
-				target: LOG_TARGET,
-				"Router with bridged_network_id {:?} does not support bridging to network {:?} and remote_location {:?}!",
-				T::BridgedNetworkId::get(),
-				network,
-				remote_location,
-			);
-			return None
+		let (bridge_hub_location, maybe_payment) = match T::Bridges::exporter_for(
+			network,
+			remote_location,
+			message,
+		) {
+			Some((bridge_hub_location, maybe_payment))
+				if bridge_hub_location.eq(&T::SiblingBridgeHubLocation::get()) =>
+				(bridge_hub_location, maybe_payment),
+			_ => {
+				log::trace!(
+					target: LOG_TARGET,
+					"Router configured with bridged_network_id {:?} and sibling_bridge_hub_location: {:?} does not support bridging to network {:?} and remote_location {:?}!",
+					T::BridgedNetworkId::get(),
+					T::SiblingBridgeHubLocation::get(),
+					network,
+					remote_location,
+				);
+				return None
+			},
 		};
 
 		// take `base_fee` from `T::Brides`, but it has to be the same `T::FeeAsset`
@@ -279,8 +268,8 @@ impl<T: Config<I>, I: 'static> ExporterFor for Pallet<T, I> {
 				invalid_asset => {
 					log::error!(
 						target: LOG_TARGET,
-						"Router with bridged_network_id {:?} is configured for `T::FeeAsset` {:?} which is not \
-						compatible with {:?} for bridge_hub_location: {:?} for bridging to {:?}/{:?}!",
+						"Router with bridged_network_id {:?} is configured for `T::FeeAsset` {:?} \
+						which is not compatible with {:?} for bridge_hub_location: {:?} for bridging to {:?}/{:?}!",
 						T::BridgedNetworkId::get(),
 						T::FeeAsset::get(),
 						invalid_asset,
@@ -300,18 +289,18 @@ impl<T: Config<I>, I: 'static> ExporterFor for Pallet<T, I> {
 		let message_size = message.encoded_size();
 		let message_fee = (message_size as u128).saturating_mul(T::ByteFee::get());
 		let fee_sum = base_fee.saturating_add(message_fee);
-		let fee_factor = Self::bridge().delivery_fee_factor;
-		let fee = fee_factor.saturating_mul_int(fee_sum);
 
+		let fee_factor = Self::delivery_fee_factor();
+		let fee = fee_factor.saturating_mul_int(fee_sum);
 		let fee = if fee > 0 { Some((T::FeeAsset::get(), fee).into()) } else { None };
 
 		log::info!(
 			target: LOG_TARGET,
-			"Validate send message to {:?} ({} bytes) over bridge. Computed bridge fee {:?} using fee factor {}",
+			"Going to send message to {:?} ({} bytes) over bridge. Computed bridge fee {:?} using fee factor {}",
 			(network, remote_location),
 			message_size,
 			fee,
-			fee_factor
+			fee_factor,
 		);
 
 		Some((bridge_hub_location, fee))
@@ -412,65 +401,39 @@ mod tests {
 	use frame_support::traits::Hooks;
 	use sp_runtime::traits::One;
 
-	fn congested_bridge(delivery_fee_factor: FixedU128) -> BridgeState {
-		BridgeState { is_congested: true, delivery_fee_factor }
-	}
-
-	fn uncongested_bridge(delivery_fee_factor: FixedU128) -> BridgeState {
-		BridgeState { is_congested: false, delivery_fee_factor }
-	}
-
 	#[test]
 	fn initial_fee_factor_is_one() {
 		run_test(|| {
-			assert_eq!(
-				Bridge::<TestRuntime, ()>::get(),
-				uncongested_bridge(MINIMAL_DELIVERY_FEE_FACTOR),
-			);
+			assert_eq!(DeliveryFeeFactor::<TestRuntime, ()>::get(), MINIMAL_DELIVERY_FEE_FACTOR);
 		})
 	}
 
 	#[test]
 	fn fee_factor_is_not_decreased_from_on_initialize_when_xcm_channel_is_congested() {
 		run_test(|| {
-			Bridge::<TestRuntime, ()>::put(uncongested_bridge(FixedU128::from_rational(125, 100)));
-			TestWithBridgeHubChannel::make_congested();
-
-			// it should not decrease, because xcm channel is congested
-			let old_bridge = XcmBridgeHubRouter::bridge();
-			XcmBridgeHubRouter::on_initialize(One::one());
-			assert_eq!(XcmBridgeHubRouter::bridge(), old_bridge);
-		})
-	}
-
-	#[test]
-	fn fee_factor_is_not_decreased_from_on_initialize_when_bridge_has_reported_congestion() {
-		run_test(|| {
-			Bridge::<TestRuntime, ()>::put(congested_bridge(FixedU128::from_rational(125, 100)));
+			DeliveryFeeFactor::<TestRuntime, ()>::put(FixedU128::from_rational(125, 100));
+			TestLocalXcmChannelManager::make_congested(&SiblingBridgeHubLocation::get());
 
-			// it should not decrease, because bridge congested
-			let old_bridge = XcmBridgeHubRouter::bridge();
+			// it should not decrease, because queue is congested
+			let old_delivery_fee_factor = XcmBridgeHubRouter::delivery_fee_factor();
 			XcmBridgeHubRouter::on_initialize(One::one());
-			assert_eq!(XcmBridgeHubRouter::bridge(), old_bridge);
+			assert_eq!(XcmBridgeHubRouter::delivery_fee_factor(), old_delivery_fee_factor);
 		})
 	}
 
 	#[test]
 	fn fee_factor_is_decreased_from_on_initialize_when_xcm_channel_is_uncongested() {
 		run_test(|| {
-			Bridge::<TestRuntime, ()>::put(uncongested_bridge(FixedU128::from_rational(125, 100)));
+			DeliveryFeeFactor::<TestRuntime, ()>::put(FixedU128::from_rational(125, 100));
 
-			// it should eventually decreased to one
-			while XcmBridgeHubRouter::bridge().delivery_fee_factor > MINIMAL_DELIVERY_FEE_FACTOR {
+			// it shold eventually decreased to one
+			while XcmBridgeHubRouter::delivery_fee_factor() > MINIMAL_DELIVERY_FEE_FACTOR {
 				XcmBridgeHubRouter::on_initialize(One::one());
 			}
 
 			// verify that it doesn't decreases anymore
 			XcmBridgeHubRouter::on_initialize(One::one());
-			assert_eq!(
-				XcmBridgeHubRouter::bridge(),
-				uncongested_bridge(MINIMAL_DELIVERY_FEE_FACTOR)
-			);
+			assert_eq!(XcmBridgeHubRouter::delivery_fee_factor(), MINIMAL_DELIVERY_FEE_FACTOR);
 		})
 	}
 
@@ -577,7 +540,7 @@ mod tests {
 			// but when factor is larger than one, it increases the fee, so it becomes:
 			// `(BASE_FEE + BYTE_FEE * msg_size) * F + HRMP_FEE`
 			let factor = FixedU128::from_rational(125, 100);
-			Bridge::<TestRuntime, ()>::put(uncongested_bridge(factor));
+			DeliveryFeeFactor::<TestRuntime, ()>::put(factor);
 			let expected_fee =
 				(FixedU128::saturating_from_integer(BASE_FEE + BYTE_FEE * (msg_size as u128)) *
 					factor)
@@ -591,45 +554,29 @@ mod tests {
 	}
 
 	#[test]
-	fn sent_message_doesnt_increase_factor_if_xcm_channel_is_uncongested() {
+	fn sent_message_doesnt_increase_factor_if_queue_is_uncongested() {
 		run_test(|| {
-			let old_bridge = XcmBridgeHubRouter::bridge();
-			assert_ok!(send_xcm::<XcmBridgeHubRouter>(
-				Location::new(2, [GlobalConsensus(BridgedNetworkId::get()), Parachain(1000)]),
-				vec![ClearOrigin].into(),
-			)
-			.map(drop));
+			let old_delivery_fee_factor = XcmBridgeHubRouter::delivery_fee_factor();
+			assert_eq!(
+				send_xcm::<XcmBridgeHubRouter>(
+					Location::new(2, [GlobalConsensus(BridgedNetworkId::get()), Parachain(1000)]),
+					vec![ClearOrigin].into(),
+				)
+				.map(drop),
+				Ok(()),
+			);
 
 			assert!(TestToBridgeHubSender::is_message_sent());
-			assert_eq!(old_bridge, XcmBridgeHubRouter::bridge());
+			assert_eq!(old_delivery_fee_factor, XcmBridgeHubRouter::delivery_fee_factor());
 		});
 	}
 
 	#[test]
 	fn sent_message_increases_factor_if_xcm_channel_is_congested() {
 		run_test(|| {
-			TestWithBridgeHubChannel::make_congested();
-
-			let old_bridge = XcmBridgeHubRouter::bridge();
-			assert_ok!(send_xcm::<XcmBridgeHubRouter>(
-				Location::new(2, [GlobalConsensus(BridgedNetworkId::get()), Parachain(1000)]),
-				vec![ClearOrigin].into(),
-			)
-			.map(drop));
-
-			assert!(TestToBridgeHubSender::is_message_sent());
-			assert!(
-				old_bridge.delivery_fee_factor < XcmBridgeHubRouter::bridge().delivery_fee_factor
-			);
-		});
-	}
-
-	#[test]
-	fn sent_message_increases_factor_if_bridge_has_reported_congestion() {
-		run_test(|| {
-			Bridge::<TestRuntime, ()>::put(congested_bridge(MINIMAL_DELIVERY_FEE_FACTOR));
+			TestLocalXcmChannelManager::make_congested(&SiblingBridgeHubLocation::get());
 
-			let old_bridge = XcmBridgeHubRouter::bridge();
+			let old_delivery_fee_factor = XcmBridgeHubRouter::delivery_fee_factor();
 			assert_ok!(send_xcm::<XcmBridgeHubRouter>(
 				Location::new(2, [GlobalConsensus(BridgedNetworkId::get()), Parachain(1000)]),
 				vec![ClearOrigin].into(),
@@ -637,9 +584,7 @@ mod tests {
 			.map(drop));
 
 			assert!(TestToBridgeHubSender::is_message_sent());
-			assert!(
-				old_bridge.delivery_fee_factor < XcmBridgeHubRouter::bridge().delivery_fee_factor
-			);
+			assert!(old_delivery_fee_factor < XcmBridgeHubRouter::delivery_fee_factor());
 		});
 	}
 
diff --git a/bridges/modules/xcm-bridge-hub-router/src/mock.rs b/bridges/modules/xcm-bridge-hub-router/src/mock.rs
index 3e2c1bb369c..e829aae5e7d 100644
--- a/bridges/modules/xcm-bridge-hub-router/src/mock.rs
+++ b/bridges/modules/xcm-bridge-hub-router/src/mock.rs
@@ -18,7 +18,7 @@
 
 use crate as pallet_xcm_bridge_hub_router;
 
-use bp_xcm_bridge_hub_router::XcmChannelStatusProvider;
+use bp_xcm_bridge_hub::{BridgeId, LocalXcmChannelManager};
 use codec::Encode;
 use frame_support::{
 	construct_runtime, derive_impl, parameter_types,
@@ -75,6 +75,7 @@ impl pallet_xcm_bridge_hub_router::Config<()> for TestRuntime {
 	type WeightInfo = ();
 
 	type UniversalLocation = UniversalLocation;
+	type SiblingBridgeHubLocation = SiblingBridgeHubLocation;
 	type BridgedNetworkId = BridgedNetworkId;
 	type Bridges = NetworkExportTable<BridgeTable>;
 	type DestinationVersion =
@@ -82,7 +83,7 @@ impl pallet_xcm_bridge_hub_router::Config<()> for TestRuntime {
 
 	type BridgeHubOrigin = EnsureRoot<AccountId>;
 	type ToBridgeHubSender = TestToBridgeHubSender;
-	type WithBridgeHubChannel = TestWithBridgeHubChannel;
+	type LocalXcmChannelManager = TestLocalXcmChannelManager;
 
 	type ByteFee = ConstU128<BYTE_FEE>;
 	type FeeAsset = BridgeFeeAsset;
@@ -147,17 +148,32 @@ impl InspectMessageQueues for TestToBridgeHubSender {
 	}
 }
 
-pub struct TestWithBridgeHubChannel;
+pub struct TestLocalXcmChannelManager;
 
-impl TestWithBridgeHubChannel {
-	pub fn make_congested() {
-		frame_support::storage::unhashed::put(b"TestWithBridgeHubChannel.Congested", &true);
+impl TestLocalXcmChannelManager {
+	pub fn make_congested(with: &Location) {
+		frame_support::storage::unhashed::put(
+			&(b"TestLocalXcmChannelManager.Congested", with).encode()[..],
+			&true,
+		);
 	}
 }
 
-impl XcmChannelStatusProvider for TestWithBridgeHubChannel {
-	fn is_congested() -> bool {
-		frame_support::storage::unhashed::get_or_default(b"TestWithBridgeHubChannel.Congested")
+impl LocalXcmChannelManager for TestLocalXcmChannelManager {
+	type Error = ();
+
+	fn is_congested(with: &Location) -> bool {
+		frame_support::storage::unhashed::get_or_default(
+			&(b"TestLocalXcmChannelManager.Congested", with).encode()[..],
+		)
+	}
+
+	fn suspend_bridge(_with: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
+		Ok(())
+	}
+
+	fn resume_bridge(_with: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
+		Ok(())
 	}
 }
 
diff --git a/bridges/modules/xcm-bridge-hub-router/src/weights.rs b/bridges/modules/xcm-bridge-hub-router/src/weights.rs
index b0c8fc6252c..d9a0426feca 100644
--- a/bridges/modules/xcm-bridge-hub-router/src/weights.rs
+++ b/bridges/modules/xcm-bridge-hub-router/src/weights.rs
@@ -52,8 +52,6 @@ use sp_std::marker::PhantomData;
 pub trait WeightInfo {
 	fn on_initialize_when_non_congested() -> Weight;
 	fn on_initialize_when_congested() -> Weight;
-	fn report_bridge_status() -> Weight;
-	fn send_message() -> Weight;
 }
 
 /// Weights for `pallet_xcm_bridge_hub_router` that are generated using one of the Bridge testnets.
@@ -61,30 +59,20 @@ pub trait WeightInfo {
 /// Those weights are test only and must never be used in production.
 pub struct BridgeWeight<T>(PhantomData<T>);
 impl<T: frame_system::Config> WeightInfo for BridgeWeight<T> {
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
 	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
+	/// Storage: `XcmBridgeHubRouter::DeliveryFeeFactor` (r:1 w:1)
 	///
-	/// Storage: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765`
-	/// (r:1 w:0)
-	///
-	/// Proof: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765` (r:1
-	/// w:0)
+	/// Proof: `XcmBridgeHubRouter::DeliveryFeeFactor` (`max_values`: Some(1), `max_size`: Some(16),
+	/// added: 511, mode: `MaxEncodedLen`)
 	fn on_initialize_when_non_congested() -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `53`
-		//  Estimated: `3518`
-		// Minimum execution time: 11_934 nanoseconds.
-		Weight::from_parts(12_201_000, 3518)
+		//  Measured:  `52`
+		//  Estimated: `3517`
+		// Minimum execution time: 11_141 nanoseconds.
+		Weight::from_parts(11_339_000, 3517)
 			.saturating_add(T::DbWeight::get().reads(2_u64))
 			.saturating_add(T::DbWeight::get().writes(1_u64))
 	}
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
-	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
-	///
 	/// Storage: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765`
 	/// (r:1 w:0)
 	///
@@ -92,117 +80,44 @@ impl<T: frame_system::Config> WeightInfo for BridgeWeight<T> {
 	/// w:0)
 	fn on_initialize_when_congested() -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `94`
-		//  Estimated: `3559`
-		// Minimum execution time: 9_010 nanoseconds.
-		Weight::from_parts(9_594_000, 3559)
-			.saturating_add(T::DbWeight::get().reads(2_u64))
-			.saturating_add(T::DbWeight::get().writes(1_u64))
-	}
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
-	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
-	fn report_bridge_status() -> Weight {
-		// Proof Size summary in bytes:
-		//  Measured:  `53`
-		//  Estimated: `1502`
-		// Minimum execution time: 10_427 nanoseconds.
-		Weight::from_parts(10_682_000, 1502)
-			.saturating_add(T::DbWeight::get().reads(1_u64))
-			.saturating_add(T::DbWeight::get().writes(1_u64))
-	}
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
-	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
-	///
-	/// Storage: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765`
-	/// (r:1 w:0)
-	///
-	/// Proof: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765` (r:1
-	/// w:0)
-	fn send_message() -> Weight {
-		// Proof Size summary in bytes:
-		//  Measured:  `52`
-		//  Estimated: `3517`
-		// Minimum execution time: 19_709 nanoseconds.
-		Weight::from_parts(20_110_000, 3517)
-			.saturating_add(T::DbWeight::get().reads(2_u64))
-			.saturating_add(T::DbWeight::get().writes(1_u64))
+		//  Measured:  `82`
+		//  Estimated: `3547`
+		// Minimum execution time: 4_239 nanoseconds.
+		Weight::from_parts(4_383_000, 3547).saturating_add(T::DbWeight::get().reads(1_u64))
 	}
 }
 
 // For backwards compatibility and tests
 impl WeightInfo for () {
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
-	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
-	///
 	/// Storage: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765`
 	/// (r:1 w:0)
 	///
 	/// Proof: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765` (r:1
 	/// w:0)
-	fn on_initialize_when_non_congested() -> Weight {
-		// Proof Size summary in bytes:
-		//  Measured:  `53`
-		//  Estimated: `3518`
-		// Minimum execution time: 11_934 nanoseconds.
-		Weight::from_parts(12_201_000, 3518)
-			.saturating_add(RocksDbWeight::get().reads(2_u64))
-			.saturating_add(RocksDbWeight::get().writes(1_u64))
-	}
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
 	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
+	/// Storage: `XcmBridgeHubRouter::DeliveryFeeFactor` (r:1 w:1)
 	///
-	/// Storage: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765`
-	/// (r:1 w:0)
-	///
-	/// Proof: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765` (r:1
-	/// w:0)
-	fn on_initialize_when_congested() -> Weight {
+	/// Proof: `XcmBridgeHubRouter::DeliveryFeeFactor` (`max_values`: Some(1), `max_size`: Some(16),
+	/// added: 511, mode: `MaxEncodedLen`)
+	fn on_initialize_when_non_congested() -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `94`
-		//  Estimated: `3559`
-		// Minimum execution time: 9_010 nanoseconds.
-		Weight::from_parts(9_594_000, 3559)
+		//  Measured:  `52`
+		//  Estimated: `3517`
+		// Minimum execution time: 11_141 nanoseconds.
+		Weight::from_parts(11_339_000, 3517)
 			.saturating_add(RocksDbWeight::get().reads(2_u64))
 			.saturating_add(RocksDbWeight::get().writes(1_u64))
 	}
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
-	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
-	fn report_bridge_status() -> Weight {
-		// Proof Size summary in bytes:
-		//  Measured:  `53`
-		//  Estimated: `1502`
-		// Minimum execution time: 10_427 nanoseconds.
-		Weight::from_parts(10_682_000, 1502)
-			.saturating_add(RocksDbWeight::get().reads(1_u64))
-			.saturating_add(RocksDbWeight::get().writes(1_u64))
-	}
-	/// Storage: `XcmBridgeHubRouter::Bridge` (r:1 w:1)
-	///
-	/// Proof: `XcmBridgeHubRouter::Bridge` (`max_values`: Some(1), `max_size`: Some(17), added:
-	/// 512, mode: `MaxEncodedLen`)
-	///
 	/// Storage: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765`
 	/// (r:1 w:0)
 	///
 	/// Proof: UNKNOWN KEY `0x456d756c617465645369626c696e6758636d704368616e6e656c2e436f6e6765` (r:1
 	/// w:0)
-	fn send_message() -> Weight {
+	fn on_initialize_when_congested() -> Weight {
 		// Proof Size summary in bytes:
-		//  Measured:  `52`
-		//  Estimated: `3517`
-		// Minimum execution time: 19_709 nanoseconds.
-		Weight::from_parts(20_110_000, 3517)
-			.saturating_add(RocksDbWeight::get().reads(2_u64))
-			.saturating_add(RocksDbWeight::get().writes(1_u64))
+		//  Measured:  `82`
+		//  Estimated: `3547`
+		// Minimum execution time: 4_239 nanoseconds.
+		Weight::from_parts(4_383_000, 3547).saturating_add(RocksDbWeight::get().reads(1_u64))
 	}
 }
diff --git a/bridges/modules/xcm-bridge-hub/Cargo.toml b/bridges/modules/xcm-bridge-hub/Cargo.toml
index e422b58f4fb..a1c623cb0b3 100644
--- a/bridges/modules/xcm-bridge-hub/Cargo.toml
+++ b/bridges/modules/xcm-bridge-hub/Cargo.toml
@@ -37,6 +37,7 @@ xcm-executor = { workspace = true }
 pallet-balances = { workspace = true }
 sp-io = { workspace = true }
 bp-runtime = { features = ["test-helpers"], workspace = true }
+bp-header-chain = { workspace = true }
 pallet-xcm-bridge-hub-router = { workspace = true }
 polkadot-parachain-primitives = { workspace = true }
 
diff --git a/bridges/modules/xcm-bridge-hub/src/dispatcher.rs b/bridges/modules/xcm-bridge-hub/src/dispatcher.rs
index 432896c25ac..f88a825e41b 100644
--- a/bridges/modules/xcm-bridge-hub/src/dispatcher.rs
+++ b/bridges/modules/xcm-bridge-hub/src/dispatcher.rs
@@ -18,16 +18,23 @@
 //! bridge messages dispatcher. Internally, it just forwards inbound blob to the
 //! XCM-level blob dispatcher, which pushes message to some other queue (e.g.
 //! to HRMP queue with the sibling target chain).
+//!
+//! This code is executed at the target bridge hub.
 
-use crate::{Config, Pallet, XcmAsPlainPayload, LOG_TARGET};
+use crate::{Config, Pallet, LOG_TARGET};
 
-use bp_messages::target_chain::{DispatchMessage, MessageDispatch};
+use bp_messages::{
+	target_chain::{DispatchMessage, MessageDispatch},
+	LaneId,
+};
 use bp_runtime::messages::MessageDispatchResult;
+use bp_xcm_bridge_hub::{BridgeId, LocalXcmChannelManager, XcmAsPlainPayload};
 use codec::{Decode, Encode};
 use frame_support::{weights::Weight, CloneNoBound, EqNoBound, PartialEqNoBound};
 use pallet_bridge_messages::{Config as BridgeMessagesConfig, WeightInfoExt};
 use scale_info::TypeInfo;
 use sp_runtime::SaturatedConversion;
+use xcm::prelude::*;
 use xcm_builder::{DispatchBlob, DispatchBlobError};
 
 /// Message dispatch result type for single message.
@@ -52,8 +59,12 @@ where
 	type DispatchPayload = XcmAsPlainPayload;
 	type DispatchLevelResult = XcmBlobMessageDispatchResult;
 
-	fn is_active() -> bool {
-		true
+	fn is_active(lane: LaneId) -> bool {
+		let bridge_id = BridgeId::from_lane_id(lane);
+		Pallet::<T, I>::bridge(bridge_id)
+			.and_then(|bridge| bridge.bridge_origin_relative_location.try_as().cloned().ok())
+			.map(|recipient: Location| !T::LocalXcmChannelManager::is_congested(&recipient))
+			.unwrap_or(false)
 	}
 
 	fn dispatch_weight(message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
@@ -105,3 +116,103 @@ where
 		MessageDispatchResult { unspent_weight: Weight::zero(), dispatch_level_result }
 	}
 }
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::{mock::*, Bridges};
+
+	use bp_messages::{target_chain::DispatchMessageData, MessageKey};
+	use bp_xcm_bridge_hub::{Bridge, BridgeState};
+
+	fn bridge_id() -> BridgeId {
+		BridgeId::from_lane_id(LaneId::new(1, 2))
+	}
+
+	fn run_test_with_opened_bridge(test: impl FnOnce()) {
+		run_test(|| {
+			Bridges::<TestRuntime, ()>::insert(
+				bridge_id(),
+				Bridge {
+					bridge_origin_relative_location: Box::new(Location::new(0, Here).into()),
+					state: BridgeState::Opened,
+					bridge_owner_account: [0u8; 32].into(),
+					reserve: 0,
+				},
+			);
+
+			test();
+		});
+	}
+
+	fn invalid_message() -> DispatchMessage<Vec<u8>> {
+		DispatchMessage {
+			key: MessageKey { lane_id: LaneId::new(1, 2), nonce: 1 },
+			data: DispatchMessageData { payload: Err(codec::Error::from("test")) },
+		}
+	}
+
+	fn valid_message() -> DispatchMessage<Vec<u8>> {
+		DispatchMessage {
+			key: MessageKey { lane_id: LaneId::new(1, 2), nonce: 1 },
+			data: DispatchMessageData { payload: Ok(vec![42]) },
+		}
+	}
+
+	#[test]
+	fn dispatcher_is_inactive_when_channel_with_target_chain_is_congested() {
+		run_test_with_opened_bridge(|| {
+			TestLocalXcmChannelManager::make_congested();
+			assert!(!XcmOverBridge::is_active(bridge_id().lane_id()));
+		});
+	}
+
+	#[test]
+	fn dispatcher_is_active_when_channel_with_target_chain_is_not_congested() {
+		run_test_with_opened_bridge(|| {
+			assert!(XcmOverBridge::is_active(bridge_id().lane_id()));
+		});
+	}
+
+	#[test]
+	fn dispatch_weight_is_zero_if_we_have_failed_to_decode_message() {
+		run_test(|| {
+			assert_eq!(XcmOverBridge::dispatch_weight(&mut invalid_message()), Weight::zero());
+		});
+	}
+
+	#[test]
+	fn dispatch_weight_is_non_zero_if_we_have_decoded_message() {
+		run_test(|| {
+			assert_ne!(XcmOverBridge::dispatch_weight(&mut valid_message()), Weight::zero());
+		});
+	}
+
+	#[test]
+	fn message_is_not_dispatched_when_we_have_failed_to_decode_message() {
+		run_test(|| {
+			assert_eq!(
+				XcmOverBridge::dispatch(invalid_message()),
+				MessageDispatchResult {
+					unspent_weight: Weight::zero(),
+					dispatch_level_result: XcmBlobMessageDispatchResult::InvalidPayload,
+				},
+			);
+			assert!(!TestBlobDispatcher::is_dispatched());
+		});
+	}
+
+	#[test]
+	fn message_is_dispatched_when_we_have_decoded_message() {
+		run_test(|| {
+			assert_eq!(
+				XcmOverBridge::dispatch(valid_message()),
+				MessageDispatchResult {
+					unspent_weight: Weight::zero(),
+					dispatch_level_result: XcmBlobMessageDispatchResult::Dispatched,
+				},
+			);
+			assert!(TestBlobDispatcher::is_dispatched());
+		});
+	}
+}
diff --git a/bridges/modules/xcm-bridge-hub/src/exporter.rs b/bridges/modules/xcm-bridge-hub/src/exporter.rs
index 5262b289aea..7848bafe2ed 100644
--- a/bridges/modules/xcm-bridge-hub/src/exporter.rs
+++ b/bridges/modules/xcm-bridge-hub/src/exporter.rs
@@ -22,19 +22,32 @@
 
 use crate::{Config, Pallet, LOG_TARGET};
 
-use bp_xcm_bridge_hub::XcmAsPlainPayload;
+use crate::{BridgeOf, Bridges};
 
-use bp_messages::{source_chain::MessagesBridge, LaneId};
+use bp_messages::{
+	source_chain::{MessagesBridge, OnMessagesDelivered},
+	LaneId, MessageNonce,
+};
+use bp_xcm_bridge_hub::{BridgeId, BridgeState, LocalXcmChannelManager, XcmAsPlainPayload};
 use frame_support::traits::Get;
 use pallet_bridge_messages::{
 	Config as BridgeMessagesConfig, Error, Pallet as BridgeMessagesPallet,
 };
+use sp_std::boxed::Box;
 use xcm::prelude::*;
 use xcm_builder::{HaulBlob, HaulBlobError, HaulBlobExporter};
 use xcm_executor::traits::ExportXcm;
 
+/// Maximal number of messages in the outbound bridge queue. Once we reach this limit, we
+/// suspend a bridge.
+const OUTBOUND_LANE_CONGESTED_THRESHOLD: MessageNonce = 8_192;
+
+/// After we have suspended the bridge, we wait until number of messages in the outbound bridge
+/// queue drops to this count, before sending resuming the bridge.
+const OUTBOUND_LANE_UNCONGESTED_THRESHOLD: MessageNonce = 1_024;
+
 /// An easy way to access `HaulBlobExporter`.
-type PalletAsHaulBlobExporter<T, I> = HaulBlobExporter<
+pub type PalletAsHaulBlobExporter<T, I> = HaulBlobExporter<
 	DummyHaulBlob,
 	<T as Config<I>>::BridgedNetwork,
 	<T as Config<I>>::DestinationVersion,
@@ -48,7 +61,8 @@ where
 	T: BridgeMessagesConfig<T::BridgeMessagesPalletInstance, OutboundPayload = XcmAsPlainPayload>,
 {
 	type Ticket = (
-		LaneId,
+		BridgeId,
+		BridgeOf<T, I>,
 		<MessagesPallet<T, I> as MessagesBridge<T::OutboundPayload>>::SendMessageArgs,
 		XcmHash,
 	);
@@ -64,7 +78,7 @@ where
 		// let's save them before
 		let bridge_origin_universal_location =
 			universal_source.clone().take().ok_or(SendError::MissingArgument)?;
-		let bridge_destination_interior_location =
+		let bridge_destination_universal_location =
 			destination.clone().take().ok_or(SendError::MissingArgument)?;
 
 		// check if we are able to route the message. We use existing `HaulBlobExporter` for that.
@@ -78,28 +92,22 @@ where
 			message,
 		)?;
 
-		// ok - now we know that the message may be routed by the pallet, let's prepare the
-		// destination universal location
-		let mut bridge_destination_universal_location: InteriorLocation =
-			GlobalConsensus(network).into();
-		bridge_destination_universal_location
-			.append_with(bridge_destination_interior_location)
-			.map_err(|_| SendError::Unroutable)?;
-
-		// .. and the origin relative location
+		// prepare the origin relative location
 		let bridge_origin_relative_location =
 			bridge_origin_universal_location.relative_to(&T::UniversalLocation::get());
 
 		// then we are able to compute the lane id used to send messages
-		let bridge_locations = Self::bridge_locations(
+		let locations = Self::bridge_locations(
 			Box::new(bridge_origin_relative_location),
 			Box::new(bridge_destination_universal_location.into()),
 		)
 		.map_err(|_| SendError::Unroutable)?;
+		let bridge = Self::bridge(locations.bridge_id).ok_or(SendError::Unroutable)?;
 
 		let bridge_message =
-			MessagesPallet::<T, I>::validate_message(bridge_locations.lane_id, &blob).map_err(
-				|e| {
+			MessagesPallet::<T, I>::validate_message(locations.bridge_id.lane_id(), &blob)
+				.map_err(|e| {
+					// TODO:(bridges-v2) - add test/std feature gate?
 					match e {
 						Error::LanesManager(ref ei) =>
 							log::error!(target: LOG_TARGET, "LanesManager: {ei:?}"),
@@ -112,33 +120,180 @@ where
 
 					log::error!(
 						target: LOG_TARGET,
-						"XCM message {:?} cannot be exported because of bridge error: {:?} on bridge {:?}",
+						"XCM message {:?} cannot be exported because of bridge error: {:?} on bridge {:?} and laneId: {:?}",
 						id,
 						e,
-						bridge_locations,
+						locations,
+						locations.bridge_id.lane_id(),
 					);
 					SendError::Transport("BridgeValidateError")
-				},
-			)?;
+				})?;
 
-		Ok(((bridge_locations.lane_id, bridge_message, id), price))
+		Ok(((locations.bridge_id, bridge, bridge_message, id), price))
 	}
 
-	fn deliver((lane_id, bridge_message, id): Self::Ticket) -> Result<XcmHash, SendError> {
+	fn deliver(
+		(bridge_id, bridge, bridge_message, id): Self::Ticket,
+	) -> Result<XcmHash, SendError> {
 		let artifacts = MessagesPallet::<T, I>::send_message(bridge_message);
 
 		log::info!(
 			target: LOG_TARGET,
 			"XCM message {:?} has been enqueued at bridge {:?} with nonce {}",
 			id,
-			lane_id,
+			bridge_id,
 			artifacts.nonce,
 		);
 
+		// maybe we need switch to congested state
+		Self::on_bridge_message_enqueued(bridge_id, bridge, artifacts.enqueued_messages);
+
 		Ok(id)
 	}
 }
 
+impl<T: Config<I>, I: 'static> OnMessagesDelivered for Pallet<T, I> {
+	fn on_messages_delivered(lane_id: LaneId, enqueued_messages: MessageNonce) {
+		Self::on_bridge_messages_delivered(lane_id, enqueued_messages);
+	}
+}
+
+impl<T: Config<I>, I: 'static> Pallet<T, I> {
+	/// Called when new message is pushed onto outbound bridge queue.
+	fn on_bridge_message_enqueued(
+		bridge_id: BridgeId,
+		bridge: BridgeOf<T, I>,
+		enqueued_messages: MessageNonce,
+	) {
+		// if the bridge queue is not congested, we don't want to do anything
+		let is_congested = enqueued_messages > OUTBOUND_LANE_CONGESTED_THRESHOLD;
+		if !is_congested {
+			return
+		}
+
+		// TODO: https://github.com/paritytech/parity-bridges-common/issues/2006 we either need fishermens
+		// to watch this rule violation (suspended, but keep sending new messages), or we need a
+		// hard limit for that like other XCM queues have
+
+		// check if the lane is already suspended. If it is, do nothing. We still accept new
+		// messages to the suspended bridge, hoping that it'll be actually suspended soon
+		if bridge.state == BridgeState::Suspended {
+			return
+		}
+
+		// else - suspend the bridge
+		let bridge_origin_relative_location = match bridge.bridge_origin_relative_location.try_as()
+		{
+			Ok(bridge_origin_relative_location) => bridge_origin_relative_location,
+			Err(_) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Failed to convert the bridge {:?} origin location {:?}",
+					bridge_id,
+					bridge.bridge_origin_relative_location,
+				);
+
+				return
+			},
+		};
+		let suspend_result =
+			T::LocalXcmChannelManager::suspend_bridge(bridge_origin_relative_location, bridge_id);
+		match suspend_result {
+			Ok(_) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Suspended the bridge {:?}, originated by the {:?}",
+					bridge_id,
+					bridge.bridge_origin_relative_location,
+				);
+			},
+			Err(e) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Failed to suspended the bridge {:?}, originated by the {:?}: {:?}",
+					bridge_id,
+					bridge.bridge_origin_relative_location,
+					e,
+				);
+
+				return
+			},
+		}
+
+		// and remember that we have suspended the bridge
+		Bridges::<T, I>::mutate_extant(bridge_id, |bridge| {
+			bridge.state = BridgeState::Suspended;
+		});
+	}
+
+	/// Must be called whenever we receive a message delivery confirmation.
+	fn on_bridge_messages_delivered(lane_id: LaneId, enqueued_messages: MessageNonce) {
+		// if the bridge queue is still congested, we don't want to do anything
+		let is_congested = enqueued_messages > OUTBOUND_LANE_UNCONGESTED_THRESHOLD;
+		if is_congested {
+			return
+		}
+
+		// if we have not suspended the bridge before (or it is closed), we don't want to do
+		// anything
+		let bridge_id = BridgeId::from_lane_id(lane_id);
+		let bridge = match Self::bridge(bridge_id) {
+			Some(bridge) if bridge.state == BridgeState::Suspended => bridge,
+			_ => {
+				// if there is no bridge or it has been closed, then we don't need to send resume
+				// signal to the local origin - it has closed bridge itself, so it should have
+				// alrady pruned everything else
+				return
+			},
+		};
+
+		// else - resume the bridge
+		let bridge_origin_relative_location = (*bridge.bridge_origin_relative_location).try_into();
+		let bridge_origin_relative_location = match bridge_origin_relative_location {
+			Ok(bridge_origin_relative_location) => bridge_origin_relative_location,
+			Err(e) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Failed to convert the bridge  {:?} location: {:?}",
+					lane_id,
+					e,
+				);
+
+				return
+			},
+		};
+
+		let resume_result =
+			T::LocalXcmChannelManager::resume_bridge(&bridge_origin_relative_location, bridge_id);
+		match resume_result {
+			Ok(_) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Resumed the bridge {:?}, originated by the {:?}",
+					lane_id,
+					bridge_origin_relative_location,
+				);
+			},
+			Err(e) => {
+				log::debug!(
+					target: LOG_TARGET,
+					"Failed to resume the bridge {:?}, originated by the {:?}: {:?}",
+					lane_id,
+					bridge_origin_relative_location,
+					e,
+				);
+
+				return
+			},
+		}
+
+		// and forget that we have previously suspended the bridge
+		Bridges::<T, I>::mutate_extant(bridge_id, |bridge| {
+			bridge.state = BridgeState::Opened;
+		});
+	}
+}
+
 /// Dummy implementation of the `HaulBlob` trait that is never called.
 ///
 /// We are using `HaulBlobExporter`, which requires `HaulBlob` implementation. It assumes that
@@ -146,7 +301,7 @@ where
 /// else. But bridge messages pallet may have a dedicated channel (lane) for every pair of bridged
 /// chains. So we are using our own `ExportXcm` implementation, but to utilize `HaulBlobExporter` we
 /// still need this `DummyHaulBlob`.
-struct DummyHaulBlob;
+pub struct DummyHaulBlob;
 
 impl HaulBlob for DummyHaulBlob {
 	fn haul_blob(_blob: XcmAsPlainPayload) -> Result<(), HaulBlobError> {
@@ -159,9 +314,8 @@ mod tests {
 	use super::*;
 	use crate::{mock::*, Bridges, LanesManagerOf};
 
-	use bp_messages::{LaneState, OutboundLaneData};
 	use bp_runtime::RangeInclusiveExt;
-	use bp_xcm_bridge_hub::{Bridge, BridgeState};
+	use bp_xcm_bridge_hub::{Bridge, BridgeLocations, BridgeState};
 	use frame_support::assert_ok;
 	use xcm_executor::traits::export_xcm;
 
@@ -173,33 +327,56 @@ mod tests {
 		BridgedRelativeDestination::get()
 	}
 
-	#[test]
-	fn proper_lane_is_used_by_export_xcm() {
-		run_test(|| {
-			// open expected outbound lane
-			let origin = OpenBridgeOrigin::sibling_parachain_origin();
-			let with = bridged_asset_hub_location();
-			let locations =
-				XcmOverBridge::bridge_locations_from_origin(origin, Box::new(with.into())).unwrap();
-
-			let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
-			lanes_manager.create_outbound_lane(locations.lane_id).unwrap();
+	fn universal_destination() -> InteriorLocation {
+		[GlobalConsensus(BridgedRelayNetwork::get()), Parachain(BRIDGED_ASSET_HUB_ID)].into()
+	}
+
+	fn open_lane() -> BridgeLocations {
+		// open expected outbound lane
+		let origin = OpenBridgeOrigin::sibling_parachain_origin();
+		let with = bridged_asset_hub_location();
+		let locations =
+			XcmOverBridge::bridge_locations_from_origin(origin, Box::new(with.into())).unwrap();
+
+		let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
+		if lanes_manager.create_outbound_lane(locations.bridge_id.lane_id()).is_ok() {
 			assert!(lanes_manager
-				.active_outbound_lane(locations.lane_id)
+				.active_outbound_lane(locations.bridge_id.lane_id())
 				.unwrap()
 				.queued_messages()
 				.is_empty());
 
-			// now let's try to enqueue message using our `ExportXcm` implementation
-			export_xcm::<XcmOverBridge>(
-				BridgedRelayNetwork::get(),
-				0,
-				locations.bridge_origin_universal_location,
-				locations.bridge_destination_universal_location.split_first().0,
-				vec![Instruction::ClearOrigin].into(),
-			)
-			.unwrap();
-		})
+			// insert bridge
+			Bridges::<TestRuntime, ()>::insert(
+				locations.bridge_id,
+				Bridge {
+					bridge_origin_relative_location: Box::new(
+						Location::new(1, Parachain(SIBLING_ASSET_HUB_ID)).into(),
+					),
+					state: BridgeState::Opened,
+					bridge_owner_account: [0u8; 32].into(),
+					reserve: 0,
+				},
+			);
+		}
+
+		*locations
+	}
+
+	fn open_lane_and_send_regular_message() -> BridgeId {
+		let locations = open_lane();
+
+		// now let's try to enqueue message using our `ExportXcm` implementation
+		export_xcm::<XcmOverBridge>(
+			BridgedRelayNetwork::get(),
+			0,
+			locations.bridge_origin_universal_location,
+			locations.bridge_destination_universal_location,
+			vec![Instruction::ClearOrigin].into(),
+		)
+		.unwrap();
+
+		locations.bridge_id
 	}
 
 	#[test]
@@ -217,6 +394,96 @@ mod tests {
 		});
 	}
 
+	#[test]
+	fn exporter_does_not_suspend_the_bridge_if_outbound_bridge_queue_is_not_congested() {
+		run_test(|| {
+			let bridge_id = open_lane_and_send_regular_message();
+			assert!(!TestLocalXcmChannelManager::is_bridge_suspened());
+			assert_eq!(XcmOverBridge::bridge(bridge_id).unwrap().state, BridgeState::Opened);
+		});
+	}
+
+	#[test]
+	fn exporter_does_not_suspend_the_bridge_if_it_is_already_suspended() {
+		run_test(|| {
+			let bridge_id = open_lane_and_send_regular_message();
+			Bridges::<TestRuntime, ()>::mutate_extant(bridge_id, |bridge| {
+				bridge.state = BridgeState::Suspended;
+			});
+			for _ in 1..OUTBOUND_LANE_CONGESTED_THRESHOLD {
+				open_lane_and_send_regular_message();
+			}
+
+			open_lane_and_send_regular_message();
+			assert!(!TestLocalXcmChannelManager::is_bridge_suspened());
+		});
+	}
+
+	#[test]
+	fn exporter_suspends_the_bridge_if_outbound_bridge_queue_is_congested() {
+		run_test(|| {
+			let bridge_id = open_lane_and_send_regular_message();
+			for _ in 1..OUTBOUND_LANE_CONGESTED_THRESHOLD {
+				open_lane_and_send_regular_message();
+			}
+
+			assert!(!TestLocalXcmChannelManager::is_bridge_suspened());
+			assert_eq!(XcmOverBridge::bridge(bridge_id).unwrap().state, BridgeState::Opened);
+
+			open_lane_and_send_regular_message();
+			assert!(TestLocalXcmChannelManager::is_bridge_suspened());
+			assert_eq!(XcmOverBridge::bridge(bridge_id).unwrap().state, BridgeState::Suspended);
+		});
+	}
+
+	#[test]
+	fn bridge_is_not_resumed_if_outbound_bridge_queue_is_still_congested() {
+		run_test(|| {
+			let bridge_id = open_lane_and_send_regular_message();
+			Bridges::<TestRuntime, ()>::mutate_extant(bridge_id, |bridge| {
+				bridge.state = BridgeState::Suspended;
+			});
+			XcmOverBridge::on_bridge_messages_delivered(
+				bridge_id.lane_id(),
+				OUTBOUND_LANE_UNCONGESTED_THRESHOLD + 1,
+			);
+
+			assert!(!TestLocalXcmChannelManager::is_bridge_resumed());
+			assert_eq!(XcmOverBridge::bridge(bridge_id).unwrap().state, BridgeState::Suspended);
+		});
+	}
+
+	#[test]
+	fn bridge_is_not_resumed_if_it_was_not_suspended_before() {
+		run_test(|| {
+			let bridge_id = open_lane_and_send_regular_message();
+			XcmOverBridge::on_bridge_messages_delivered(
+				bridge_id.lane_id(),
+				OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
+			);
+
+			assert!(!TestLocalXcmChannelManager::is_bridge_resumed());
+			assert_eq!(XcmOverBridge::bridge(bridge_id).unwrap().state, BridgeState::Opened);
+		});
+	}
+
+	#[test]
+	fn bridge_is_resumed_when_enough_messages_are_delivered() {
+		run_test(|| {
+			let bridge_id = open_lane_and_send_regular_message();
+			Bridges::<TestRuntime, ()>::mutate_extant(bridge_id, |bridge| {
+				bridge.state = BridgeState::Suspended;
+			});
+			XcmOverBridge::on_bridge_messages_delivered(
+				bridge_id.lane_id(),
+				OUTBOUND_LANE_UNCONGESTED_THRESHOLD,
+			);
+
+			assert!(TestLocalXcmChannelManager::is_bridge_resumed());
+			assert_eq!(XcmOverBridge::bridge(bridge_id).unwrap().state, BridgeState::Opened);
+		});
+	}
+
 	#[test]
 	fn export_fails_if_argument_is_missing() {
 		run_test(|| {
@@ -286,13 +553,9 @@ mod tests {
 		run_test(|| {
 			// valid routable destination
 			let dest = Location::new(2, BridgedUniversalDestination::get());
-			let expected_lane_id = test_lane_id();
 
 			// open bridge
-			pallet_bridge_messages::OutboundLanes::<TestRuntime>::insert(
-				expected_lane_id,
-				OutboundLaneData { state: LaneState::Opened, ..Default::default() },
-			);
+			let expected_lane_id = open_lane().bridge_id.lane_id();
 
 			// check before - no messages
 			assert_eq!(
diff --git a/bridges/modules/xcm-bridge-hub/src/lib.rs b/bridges/modules/xcm-bridge-hub/src/lib.rs
index 957ac73933f..e9ccdfbf8e8 100644
--- a/bridges/modules/xcm-bridge-hub/src/lib.rs
+++ b/bridges/modules/xcm-bridge-hub/src/lib.rs
@@ -51,20 +51,26 @@
 #![warn(missing_docs)]
 #![cfg_attr(not(feature = "std"), no_std)]
 
-use bp_messages::{LaneId, LaneState, MessageNonce};
+use bp_messages::{LaneState, MessageNonce};
 use bp_runtime::{AccountIdOf, BalanceOf, RangeInclusiveExt};
 use bp_xcm_bridge_hub::{
-	bridge_locations, Bridge, BridgeLocations, BridgeLocationsError, BridgeState, XcmAsPlainPayload,
+	bridge_locations, Bridge, BridgeId, BridgeLocations, BridgeLocationsError, BridgeState,
+	LocalXcmChannelManager,
+};
+use frame_support::{
+	traits::{Currency, ReservableCurrency},
+	DefaultNoBound,
 };
-use frame_support::traits::{Currency, ReservableCurrency};
 use frame_system::Config as SystemConfig;
 use pallet_bridge_messages::{Config as BridgeMessagesConfig, LanesManagerError};
 use sp_runtime::traits::Zero;
+use sp_std::{boxed::Box, vec::Vec};
 use xcm::prelude::*;
 use xcm_builder::DispatchBlob;
 use xcm_executor::traits::ConvertLocation;
 
 pub use dispatcher::XcmBlobMessageDispatchResult;
+pub use exporter::PalletAsHaulBlobExporter;
 pub use pallet::*;
 
 mod dispatcher;
@@ -101,7 +107,7 @@ pub mod pallet {
 		/// `BridgedNetworkId` consensus.
 		type BridgeMessagesPalletInstance: 'static;
 
-		/// Price of single message export to the bridged consensus (`Self::BridgedNetworkId`).
+		/// Price of single message export to the bridged consensus (`Self::BridgedNetwork`).
 		type MessageExportPrice: Get<Assets>;
 		/// Checks the XCM version for the destination.
 		type DestinationVersion: GetVersion;
@@ -125,6 +131,8 @@ pub mod pallet {
 		/// Currency used to pay for bridge registration.
 		type NativeCurrency: ReservableCurrency<Self::AccountId>;
 
+		/// Local XCM channel manager.
+		type LocalXcmChannelManager: LocalXcmChannelManager;
 		/// XCM-level dispatcher for inbound bridge messages.
 		type BlobDispatcher: DispatchBlob;
 	}
@@ -145,10 +153,10 @@ pub mod pallet {
 	impl<T: Config<I>, I: 'static> Hooks<BlockNumberFor<T>> for Pallet<T, I> {
 		fn integrity_test() {
 			assert!(
-			Self::bridged_network_id().is_ok(),
-			"Configured `T::BridgedNetwork`: {:?} does not contain `GlobalConsensus` junction with `NetworkId`",
-			T::BridgedNetwork::get()
-		)
+				Self::bridged_network_id().is_ok(),
+				"Configured `T::BridgedNetwork`: {:?} does not contain `GlobalConsensus` junction with `NetworkId`",
+				T::BridgedNetwork::get()
+			)
 		}
 	}
 
@@ -189,7 +197,7 @@ pub mod pallet {
 				.map_err(|_| Error::<T, I>::FailedToReserveBridgeReserve)?;
 
 			// save bridge metadata
-			Bridges::<T, I>::try_mutate(locations.lane_id, |bridge| match bridge {
+			Bridges::<T, I>::try_mutate(locations.bridge_id, |bridge| match bridge {
 				Some(_) => Err(Error::<T, I>::BridgeAlreadyExists),
 				None => {
 					*bridge = Some(BridgeOf::<T, I> {
@@ -207,24 +215,24 @@ pub mod pallet {
 			// create new lanes. Under normal circumstances, following calls shall never fail
 			let lanes_manager = LanesManagerOf::<T, I>::new();
 			lanes_manager
-				.create_inbound_lane(locations.lane_id)
+				.create_inbound_lane(locations.bridge_id.lane_id())
 				.map_err(Error::<T, I>::LanesManager)?;
 			lanes_manager
-				.create_outbound_lane(locations.lane_id)
+				.create_outbound_lane(locations.bridge_id.lane_id())
 				.map_err(Error::<T, I>::LanesManager)?;
 
 			// write something to log
 			log::trace!(
 				target: LOG_TARGET,
 				"Bridge {:?} between {:?} and {:?} has been opened",
-				locations.lane_id,
+				locations.bridge_id,
 				locations.bridge_origin_universal_location,
 				locations.bridge_destination_universal_location,
 			);
 
 			// deposit `BridgeOpened` event
 			Self::deposit_event(Event::<T, I>::BridgeOpened {
-				lane_id: locations.lane_id,
+				bridge_id: locations.bridge_id,
 				local_endpoint: Box::new(locations.bridge_origin_universal_location),
 				remote_endpoint: Box::new(locations.bridge_destination_universal_location),
 			});
@@ -265,7 +273,7 @@ pub mod pallet {
 
 			// update bridge metadata - this also guarantees that the bridge is in the proper state
 			let bridge =
-				Bridges::<T, I>::try_mutate_exists(locations.lane_id, |bridge| match bridge {
+				Bridges::<T, I>::try_mutate_exists(locations.bridge_id, |bridge| match bridge {
 					Some(bridge) => {
 						bridge.state = BridgeState::Closed;
 						Ok(bridge.clone())
@@ -276,10 +284,10 @@ pub mod pallet {
 			// close inbound and outbound lanes
 			let lanes_manager = LanesManagerOf::<T, I>::new();
 			let mut inbound_lane = lanes_manager
-				.any_state_inbound_lane(locations.lane_id)
+				.any_state_inbound_lane(locations.bridge_id.lane_id())
 				.map_err(Error::<T, I>::LanesManager)?;
 			let mut outbound_lane = lanes_manager
-				.any_state_outbound_lane(locations.lane_id)
+				.any_state_outbound_lane(locations.bridge_id.lane_id())
 				.map_err(Error::<T, I>::LanesManager)?;
 
 			// now prune queued messages
@@ -304,7 +312,7 @@ pub mod pallet {
 				log::trace!(
 					target: LOG_TARGET,
 					"Bridge {:?} between {:?} and {:?} is closing. {} messages remaining",
-					locations.lane_id,
+					locations.bridge_id,
 					locations.bridge_origin_universal_location,
 					locations.bridge_destination_universal_location,
 					enqueued_messages,
@@ -312,7 +320,7 @@ pub mod pallet {
 
 				// deposit the `ClosingBridge` event
 				Self::deposit_event(Event::<T, I>::ClosingBridge {
-					lane_id: locations.lane_id,
+					bridge_id: locations.bridge_id,
 					pruned_messages,
 					enqueued_messages,
 				});
@@ -323,7 +331,7 @@ pub mod pallet {
 			// else we have pruned all messages, so lanes and the bridge itself may gone
 			inbound_lane.purge();
 			outbound_lane.purge();
-			Bridges::<T, I>::remove(locations.lane_id);
+			Bridges::<T, I>::remove(locations.bridge_id);
 
 			// unreserve remaining amount
 			let failed_to_unreserve =
@@ -335,7 +343,7 @@ pub mod pallet {
 					target: LOG_TARGET,
 					"Failed to unreserve {:?} during ridge {:?} closure",
 					failed_to_unreserve,
-					locations.lane_id,
+					locations.bridge_id,
 				);
 			}
 
@@ -343,14 +351,14 @@ pub mod pallet {
 			log::trace!(
 				target: LOG_TARGET,
 				"Bridge {:?} between {:?} and {:?} has been closed",
-				locations.lane_id,
+				locations.bridge_id,
 				locations.bridge_origin_universal_location,
 				locations.bridge_destination_universal_location,
 			);
 
 			// deposit the `BridgePruned` event
 			Self::deposit_event(Event::<T, I>::BridgePruned {
-				lane_id: locations.lane_id,
+				bridge_id: locations.bridge_id,
 				pruned_messages,
 			});
 
@@ -406,8 +414,56 @@ pub mod pallet {
 
 	/// All registered bridges.
 	#[pallet::storage]
+	#[pallet::getter(fn bridge)]
 	pub type Bridges<T: Config<I>, I: 'static = ()> =
-		StorageMap<_, Identity, LaneId, BridgeOf<T, I>>;
+		StorageMap<_, Identity, BridgeId, BridgeOf<T, I>>;
+
+	#[pallet::genesis_config]
+	#[derive(DefaultNoBound)]
+	pub struct GenesisConfig<T: Config<I>, I: 'static = ()> {
+		/// Opened bridges.
+		///
+		/// Keep in mind that we are **NOT** reserving any amount for the bridges, opened at
+		/// genesis. We are **NOT** opening lanes, used by this bridge. It all must be done using
+		/// other pallets genesis configuration or some other means.
+		pub opened_bridges: Vec<(Location, InteriorLocation)>,
+		/// Dummy marker.
+		pub phantom: sp_std::marker::PhantomData<(T, I)>,
+	}
+
+	#[pallet::genesis_build]
+	impl<T: Config<I>, I: 'static> BuildGenesisConfig for GenesisConfig<T, I>
+	where
+		T: frame_system::Config<AccountId = AccountIdOf<ThisChainOf<T, I>>>,
+	{
+		fn build(&self) {
+			for (bridge_origin_relative_location, bridge_destination_universal_location) in
+				&self.opened_bridges
+			{
+				let locations = Pallet::<T, I>::bridge_locations(
+					Box::new(bridge_origin_relative_location.clone()),
+					Box::new(bridge_destination_universal_location.clone().into()),
+				)
+				.expect("Invalid genesis configuration");
+				let bridge_owner_account = T::BridgeOriginAccountIdConverter::convert_location(
+					&locations.bridge_origin_relative_location,
+				)
+				.expect("Invalid genesis configuration");
+
+				Bridges::<T, I>::insert(
+					locations.bridge_id,
+					Bridge {
+						bridge_origin_relative_location: Box::new(
+							locations.bridge_origin_relative_location.into(),
+						),
+						state: BridgeState::Opened,
+						bridge_owner_account,
+						reserve: Zero::zero(),
+					},
+				);
+			}
+		}
+	}
 
 	#[pallet::event]
 	#[pallet::generate_deposit(pub(super) fn deposit_event)]
@@ -418,13 +474,13 @@ pub mod pallet {
 			local_endpoint: Box<InteriorLocation>,
 			/// Universal location of remote bridge endpoint.
 			remote_endpoint: Box<InteriorLocation>,
-			/// Bridge and its lane identifier.
-			lane_id: LaneId,
+			/// Bridge identifier.
+			bridge_id: BridgeId,
 		},
 		/// Bridge is going to be closed, but not yet fully pruned from the runtime storage.
 		ClosingBridge {
-			/// Bridge and its lane identifier.
-			lane_id: LaneId,
+			/// Bridge identifier.
+			bridge_id: BridgeId,
 			/// Number of pruned messages during the close call.
 			pruned_messages: MessageNonce,
 			/// Number of enqueued messages that need to be pruned in follow up calls.
@@ -433,8 +489,8 @@ pub mod pallet {
 		/// Bridge has been closed and pruned from the runtime storage. It now may be reopened
 		/// again by any participant.
 		BridgePruned {
-			/// Bridge and its lane identifier.
-			lane_id: LaneId,
+			/// Bridge identifier.
+			bridge_id: BridgeId,
 			/// Number of pruned messages during the close call.
 			pruned_messages: MessageNonce,
 		},
@@ -448,6 +504,8 @@ pub mod pallet {
 		InvalidBridgeOriginAccount,
 		/// The bridge is already registered in this pallet.
 		BridgeAlreadyExists,
+		/// The local origin already owns a maximal number of bridges.
+		TooManyBridgesForLocalOrigin,
 		/// Trying to close already closed bridge.
 		BridgeAlreadyClosed,
 		/// Lanes manager error.
@@ -466,6 +524,7 @@ mod tests {
 	use super::*;
 	use mock::*;
 
+	use bp_messages::LaneId;
 	use frame_support::{assert_noop, assert_ok, traits::fungible::Mutate, BoundedVec};
 	use frame_system::{EventRecord, Phase};
 
@@ -496,11 +555,11 @@ mod tests {
 			bridge_owner_account,
 			reserve,
 		};
-		Bridges::<TestRuntime, ()>::insert(locations.lane_id, bridge.clone());
+		Bridges::<TestRuntime, ()>::insert(locations.bridge_id, bridge.clone());
 
 		let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
-		lanes_manager.create_inbound_lane(locations.lane_id).unwrap();
-		lanes_manager.create_outbound_lane(locations.lane_id).unwrap();
+		lanes_manager.create_inbound_lane(locations.bridge_id.lane_id()).unwrap();
+		lanes_manager.create_outbound_lane(locations.bridge_id.lane_id()).unwrap();
 
 		(bridge, *locations)
 	}
@@ -635,7 +694,7 @@ mod tests {
 			);
 
 			Bridges::<TestRuntime, ()>::insert(
-				locations.lane_id,
+				locations.bridge_id,
 				Bridge {
 					bridge_origin_relative_location: Box::new(
 						locations.bridge_origin_relative_location.into(),
@@ -669,7 +728,7 @@ mod tests {
 
 			let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
 
-			lanes_manager.create_inbound_lane(locations.lane_id).unwrap();
+			lanes_manager.create_inbound_lane(locations.bridge_id.lane_id()).unwrap();
 			assert_noop!(
 				XcmOverBridge::open_bridge(
 					origin.clone(),
@@ -678,8 +737,11 @@ mod tests {
 				Error::<TestRuntime, ()>::LanesManager(LanesManagerError::InboundLaneAlreadyExists),
 			);
 
-			lanes_manager.active_inbound_lane(locations.lane_id).unwrap().purge();
-			lanes_manager.create_outbound_lane(locations.lane_id).unwrap();
+			lanes_manager
+				.active_inbound_lane(locations.bridge_id.lane_id())
+				.unwrap()
+				.purge();
+			lanes_manager.create_outbound_lane(locations.bridge_id.lane_id()).unwrap();
 			assert_noop!(
 				XcmOverBridge::open_bridge(origin, Box::new(bridged_asset_hub_location().into()),),
 				Error::<TestRuntime, ()>::LanesManager(
@@ -716,13 +778,13 @@ mod tests {
 				.unwrap();
 
 				// ensure that there's no bridge and lanes in the storage
-				assert_eq!(Bridges::<TestRuntime, ()>::get(locations.lane_id), None);
+				assert_eq!(Bridges::<TestRuntime, ()>::get(locations.bridge_id), None);
 				assert_eq!(
-					lanes_manager.active_inbound_lane(locations.lane_id).map(drop),
+					lanes_manager.active_inbound_lane(locations.bridge_id.lane_id()).map(drop),
 					Err(LanesManagerError::UnknownInboundLane)
 				);
 				assert_eq!(
-					lanes_manager.active_outbound_lane(locations.lane_id).map(drop),
+					lanes_manager.active_outbound_lane(locations.bridge_id.lane_id()).map(drop),
 					Err(LanesManagerError::UnknownOutboundLane)
 				);
 
@@ -745,7 +807,7 @@ mod tests {
 
 				// ensure that everything has been set up in the runtime storage
 				assert_eq!(
-					Bridges::<TestRuntime, ()>::get(locations.lane_id),
+					Bridges::<TestRuntime, ()>::get(locations.bridge_id),
 					Some(Bridge {
 						bridge_origin_relative_location: Box::new(
 							locations.bridge_origin_relative_location.into()
@@ -756,11 +818,15 @@ mod tests {
 					}),
 				);
 				assert_eq!(
-					lanes_manager.active_inbound_lane(locations.lane_id).map(|l| l.state()),
+					lanes_manager
+						.active_inbound_lane(locations.bridge_id.lane_id())
+						.map(|l| l.state()),
 					Ok(LaneState::Opened)
 				);
 				assert_eq!(
-					lanes_manager.active_outbound_lane(locations.lane_id).map(|l| l.state()),
+					lanes_manager
+						.active_outbound_lane(locations.bridge_id.lane_id())
+						.map(|l| l.state()),
 					Ok(LaneState::Opened)
 				);
 				assert_eq!(Balances::free_balance(&bridge_owner_account), existential_deposit);
@@ -772,7 +838,7 @@ mod tests {
 					Some(&EventRecord {
 						phase: Phase::Initialization,
 						event: RuntimeEvent::XcmOverBridge(Event::BridgeOpened {
-							lane_id: locations.lane_id,
+							bridge_id: locations.bridge_id,
 							local_endpoint: Box::new(locations.bridge_origin_universal_location),
 							remote_endpoint: Box::new(
 								locations.bridge_destination_universal_location
@@ -833,7 +899,10 @@ mod tests {
 			let (_, locations) = mock_open_bridge_from(origin.clone());
 
 			let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
-			lanes_manager.any_state_inbound_lane(locations.lane_id).unwrap().purge();
+			lanes_manager
+				.any_state_inbound_lane(locations.bridge_id.lane_id())
+				.unwrap()
+				.purge();
 			assert_noop!(
 				XcmOverBridge::close_bridge(
 					origin.clone(),
@@ -842,10 +911,16 @@ mod tests {
 				),
 				Error::<TestRuntime, ()>::LanesManager(LanesManagerError::UnknownInboundLane),
 			);
-			lanes_manager.any_state_outbound_lane(locations.lane_id).unwrap().purge();
+			lanes_manager
+				.any_state_outbound_lane(locations.bridge_id.lane_id())
+				.unwrap()
+				.purge();
 
 			let (_, locations) = mock_open_bridge_from(origin.clone());
-			lanes_manager.any_state_outbound_lane(locations.lane_id).unwrap().purge();
+			lanes_manager
+				.any_state_outbound_lane(locations.bridge_id.lane_id())
+				.unwrap()
+				.purge();
 			assert_noop!(
 				XcmOverBridge::close_bridge(
 					origin,
@@ -870,7 +945,7 @@ mod tests {
 
 			// enqueue some messages
 			for _ in 0..32 {
-				enqueue_message(locations.lane_id);
+				enqueue_message(locations.bridge_id.lane_id());
 			}
 
 			// now call the `close_bridge`, which will only partially prune messages
@@ -884,20 +959,26 @@ mod tests {
 			// are pruned, but funds are not unreserved
 			let lanes_manager = LanesManagerOf::<TestRuntime, ()>::new();
 			assert_eq!(
-				Bridges::<TestRuntime, ()>::get(locations.lane_id).map(|b| b.state),
+				Bridges::<TestRuntime, ()>::get(locations.bridge_id).map(|b| b.state),
 				Some(BridgeState::Closed)
 			);
 			assert_eq!(
-				lanes_manager.any_state_inbound_lane(locations.lane_id).unwrap().state(),
+				lanes_manager
+					.any_state_inbound_lane(locations.bridge_id.lane_id())
+					.unwrap()
+					.state(),
 				LaneState::Closed
 			);
 			assert_eq!(
-				lanes_manager.any_state_outbound_lane(locations.lane_id).unwrap().state(),
+				lanes_manager
+					.any_state_outbound_lane(locations.bridge_id.lane_id())
+					.unwrap()
+					.state(),
 				LaneState::Closed
 			);
 			assert_eq!(
 				lanes_manager
-					.any_state_outbound_lane(locations.lane_id)
+					.any_state_outbound_lane(locations.bridge_id.lane_id())
 					.unwrap()
 					.queued_messages()
 					.checked_len(),
@@ -910,7 +991,7 @@ mod tests {
 				Some(&EventRecord {
 					phase: Phase::Initialization,
 					event: RuntimeEvent::XcmOverBridge(Event::ClosingBridge {
-						lane_id: locations.lane_id,
+						bridge_id: locations.bridge_id,
 						pruned_messages: 16,
 						enqueued_messages: 16,
 					}),
@@ -927,20 +1008,26 @@ mod tests {
 
 			// nothing is changed (apart from the pruned messages)
 			assert_eq!(
-				Bridges::<TestRuntime, ()>::get(locations.lane_id).map(|b| b.state),
+				Bridges::<TestRuntime, ()>::get(locations.bridge_id).map(|b| b.state),
 				Some(BridgeState::Closed)
 			);
 			assert_eq!(
-				lanes_manager.any_state_inbound_lane(locations.lane_id).unwrap().state(),
+				lanes_manager
+					.any_state_inbound_lane(locations.bridge_id.lane_id())
+					.unwrap()
+					.state(),
 				LaneState::Closed
 			);
 			assert_eq!(
-				lanes_manager.any_state_outbound_lane(locations.lane_id).unwrap().state(),
+				lanes_manager
+					.any_state_outbound_lane(locations.bridge_id.lane_id())
+					.unwrap()
+					.state(),
 				LaneState::Closed
 			);
 			assert_eq!(
 				lanes_manager
-					.any_state_outbound_lane(locations.lane_id)
+					.any_state_outbound_lane(locations.bridge_id.lane_id())
 					.unwrap()
 					.queued_messages()
 					.checked_len(),
@@ -953,7 +1040,7 @@ mod tests {
 				Some(&EventRecord {
 					phase: Phase::Initialization,
 					event: RuntimeEvent::XcmOverBridge(Event::ClosingBridge {
-						lane_id: locations.lane_id,
+						bridge_id: locations.bridge_id,
 						pruned_messages: 8,
 						enqueued_messages: 8,
 					}),
@@ -970,13 +1057,13 @@ mod tests {
 			),);
 
 			// there's no traces of bridge in the runtime storage and funds are unreserved
-			assert_eq!(Bridges::<TestRuntime, ()>::get(locations.lane_id).map(|b| b.state), None);
+			assert_eq!(Bridges::<TestRuntime, ()>::get(locations.bridge_id).map(|b| b.state), None);
 			assert_eq!(
-				lanes_manager.any_state_inbound_lane(locations.lane_id).map(drop),
+				lanes_manager.any_state_inbound_lane(locations.bridge_id.lane_id()).map(drop),
 				Err(LanesManagerError::UnknownInboundLane)
 			);
 			assert_eq!(
-				lanes_manager.any_state_outbound_lane(locations.lane_id).map(drop),
+				lanes_manager.any_state_outbound_lane(locations.bridge_id.lane_id()).map(drop),
 				Err(LanesManagerError::UnknownOutboundLane)
 			);
 			assert_eq!(
@@ -989,7 +1076,7 @@ mod tests {
 				Some(&EventRecord {
 					phase: Phase::Initialization,
 					event: RuntimeEvent::XcmOverBridge(Event::BridgePruned {
-						lane_id: locations.lane_id,
+						bridge_id: locations.bridge_id,
 						pruned_messages: 8,
 					}),
 					topics: vec![],
diff --git a/bridges/modules/xcm-bridge-hub/src/mock.rs b/bridges/modules/xcm-bridge-hub/src/mock.rs
index cfaf3998e25..15c59b0af16 100644
--- a/bridges/modules/xcm-bridge-hub/src/mock.rs
+++ b/bridges/modules/xcm-bridge-hub/src/mock.rs
@@ -22,11 +22,12 @@ use bp_messages::{
 	target_chain::{DispatchMessage, MessageDispatch},
 	ChainWithMessages, LaneId, MessageNonce,
 };
-use bp_runtime::{messages::MessageDispatchResult, Chain, ChainId};
+use bp_runtime::{messages::MessageDispatchResult, Chain, ChainId, HashOf};
+use bp_xcm_bridge_hub::{BridgeId, LocalXcmChannelManager};
 use codec::Encode;
 use frame_support::{
 	assert_ok, derive_impl, parameter_types,
-	traits::{Everything, Get, NeverEnsureOrigin},
+	traits::{Everything, NeverEnsureOrigin},
 	weights::RuntimeDbWeight,
 };
 use sp_core::H256;
@@ -88,16 +89,18 @@ impl pallet_bridge_messages::Config for TestRuntime {
 	type RuntimeEvent = RuntimeEvent;
 	type WeightInfo = TestMessagesWeights;
 
+	type ThisChain = ThisUnderlyingChain;
+	type BridgedChain = BridgedUnderlyingChain;
+	type BridgedHeaderChain = BridgedHeaderChain;
+
 	type OutboundPayload = Vec<u8>;
 	type InboundPayload = Vec<u8>;
+
 	type DeliveryPayments = ();
 	type DeliveryConfirmationPayments = ();
 	type OnMessagesDelivered = ();
-	type MessageDispatch = TestMessageDispatch;
 
-	type ThisChain = ThisUnderlyingChain;
-	type BridgedChain = BridgedUnderlyingChain;
-	type BridgedHeaderChain = ();
+	type MessageDispatch = TestMessageDispatch;
 }
 
 pub struct TestMessagesWeights;
@@ -124,8 +127,8 @@ impl pallet_bridge_messages::WeightInfo for TestMessagesWeights {
 	fn receive_delivery_proof_for_two_messages_by_two_relayers() -> Weight {
 		Weight::zero()
 	}
-	fn receive_single_n_bytes_message_proof_with_dispatch(_: u32) -> Weight {
-		Weight::zero()
+	fn receive_single_n_bytes_message_proof_with_dispatch(_n: u32) -> Weight {
+		Weight::from_parts(1, 0)
 	}
 }
 
@@ -190,6 +193,8 @@ impl pallet_xcm_bridge_hub::Config for TestRuntime {
 	type BridgeReserve = BridgeDeposit;
 	type NativeCurrency = Balances;
 
+	type LocalXcmChannelManager = TestLocalXcmChannelManager;
+
 	type BlobDispatcher = TestBlobDispatcher;
 }
 
@@ -197,17 +202,17 @@ impl pallet_xcm_bridge_hub_router::Config<()> for TestRuntime {
 	type WeightInfo = ();
 
 	type UniversalLocation = UniversalLocation;
+	type SiblingBridgeHubLocation = BridgeHubLocation;
 	type BridgedNetworkId = BridgedRelayNetwork;
 	type Bridges = NetworkExportTable<BridgeTable>;
 	type DestinationVersion = AlwaysLatest;
 
 	type BridgeHubOrigin = NeverEnsureOrigin<AccountId>;
 	type ToBridgeHubSender = TestExportXcmWithXcmOverBridge;
+	type LocalXcmChannelManager = TestLocalXcmChannelManager;
 
 	type ByteFee = ConstU128<0>;
 	type FeeAsset = BridgeFeeAsset;
-
-	type WithBridgeHubChannel = ();
 }
 
 pub struct XcmConfig;
@@ -374,10 +379,51 @@ impl EnsureOrigin<RuntimeOrigin> for OpenBridgeOrigin {
 	}
 }
 
+pub struct TestLocalXcmChannelManager;
+
+impl TestLocalXcmChannelManager {
+	pub fn make_congested() {
+		frame_support::storage::unhashed::put(b"TestLocalXcmChannelManager.Congested", &true);
+	}
+
+	pub fn is_bridge_suspened() -> bool {
+		frame_support::storage::unhashed::get_or_default(b"TestLocalXcmChannelManager.Suspended")
+	}
+
+	pub fn is_bridge_resumed() -> bool {
+		frame_support::storage::unhashed::get_or_default(b"TestLocalXcmChannelManager.Resumed")
+	}
+}
+
+impl LocalXcmChannelManager for TestLocalXcmChannelManager {
+	type Error = ();
+
+	fn is_congested(_with: &Location) -> bool {
+		frame_support::storage::unhashed::get_or_default(b"TestLocalXcmChannelManager.Congested")
+	}
+
+	fn suspend_bridge(_local_origin: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
+		frame_support::storage::unhashed::put(b"TestLocalXcmChannelManager.Suspended", &true);
+		Ok(())
+	}
+
+	fn resume_bridge(_local_origin: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
+		frame_support::storage::unhashed::put(b"TestLocalXcmChannelManager.Resumed", &true);
+		Ok(())
+	}
+}
+
 pub struct TestBlobDispatcher;
 
+impl TestBlobDispatcher {
+	pub fn is_dispatched() -> bool {
+		frame_support::storage::unhashed::get_or_default(b"TestBlobDispatcher.Dispatched")
+	}
+}
+
 impl DispatchBlob for TestBlobDispatcher {
 	fn dispatch_blob(_blob: Vec<u8>) -> Result<(), DispatchBlobError> {
+		frame_support::storage::unhashed::put(b"TestBlobDispatcher.Dispatched", &true);
 		Ok(())
 	}
 }
@@ -445,6 +491,15 @@ impl ChainWithMessages for BridgedUnderlyingChain {
 	const MAX_UNCONFIRMED_MESSAGES_IN_CONFIRMATION_TX: MessageNonce = 128;
 }
 
+pub struct BridgedHeaderChain;
+impl bp_header_chain::HeaderChain<BridgedUnderlyingChain> for BridgedHeaderChain {
+	fn finalized_header_state_root(
+		_hash: HashOf<BridgedUnderlyingChain>,
+	) -> Option<HashOf<BridgedUnderlyingChain>> {
+		unreachable!()
+	}
+}
+
 /// Test message dispatcher.
 pub struct TestMessageDispatch;
 
@@ -458,8 +513,9 @@ impl MessageDispatch for TestMessageDispatch {
 	type DispatchPayload = Vec<u8>;
 	type DispatchLevelResult = ();
 
-	fn is_active() -> bool {
-		frame_support::storage::unhashed::take::<bool>(&(b"inactive").encode()[..]) != Some(false)
+	fn is_active(lane: LaneId) -> bool {
+		frame_support::storage::unhashed::take::<bool>(&(b"inactive", lane).encode()[..]) !=
+			Some(false)
 	}
 
 	fn dispatch_weight(_message: &mut DispatchMessage<Self::DispatchPayload>) -> Weight {
diff --git a/bridges/primitives/messages/src/target_chain.rs b/bridges/primitives/messages/src/target_chain.rs
index b351cef42af..67868ff7c7c 100644
--- a/bridges/primitives/messages/src/target_chain.rs
+++ b/bridges/primitives/messages/src/target_chain.rs
@@ -103,7 +103,7 @@ pub trait MessageDispatch {
 	///
 	/// We check it in the messages delivery transaction prologue. So if it becomes `false`
 	/// after some portion of messages is already dispatched, it doesn't fail the whole transaction.
-	fn is_active() -> bool;
+	fn is_active(lane: LaneId) -> bool;
 
 	/// Estimate dispatch weight.
 	///
@@ -179,7 +179,7 @@ impl<DispatchPayload: Decode> MessageDispatch for ForbidInboundMessages<Dispatch
 	type DispatchPayload = DispatchPayload;
 	type DispatchLevelResult = ();
 
-	fn is_active() -> bool {
+	fn is_active(_: LaneId) -> bool {
 		false
 	}
 
diff --git a/bridges/primitives/xcm-bridge-hub/Cargo.toml b/bridges/primitives/xcm-bridge-hub/Cargo.toml
index fa8f9ef0b76..9949108af21 100644
--- a/bridges/primitives/xcm-bridge-hub/Cargo.toml
+++ b/bridges/primitives/xcm-bridge-hub/Cargo.toml
@@ -13,6 +13,7 @@ workspace = true
 [dependencies]
 codec = { features = ["derive"], default-features = false, workspace = true }
 scale-info = { features = ["derive"], workspace = true }
+serde = { features = ["alloc", "derive"], workspace = true }
 
 # Bridge Dependencies
 bp-messages = { path = "../messages", default-features = false }
@@ -33,6 +34,7 @@ std = [
 	"codec/std",
 	"frame-support/std",
 	"scale-info/std",
+	"serde/std",
 	"sp-std/std",
 	"xcm/std",
 ]
diff --git a/bridges/primitives/xcm-bridge-hub/src/lib.rs b/bridges/primitives/xcm-bridge-hub/src/lib.rs
index 44dbb969824..3aabe220153 100644
--- a/bridges/primitives/xcm-bridge-hub/src/lib.rs
+++ b/bridges/primitives/xcm-bridge-hub/src/lib.rs
@@ -27,62 +27,112 @@ use frame_support::{
 	RuntimeDebugNoBound,
 };
 use scale_info::TypeInfo;
+use serde::{Deserialize, Serialize};
 use sp_std::boxed::Box;
-use xcm::{latest::prelude::*, VersionedLocation};
+use xcm::{latest::prelude::*, VersionedInteriorLocation, VersionedLocation};
 
 /// Encoded XCM blob. We expect the bridge messages pallet to use this blob type for both inbound
 /// and outbound payloads.
 pub type XcmAsPlainPayload = sp_std::vec::Vec<u8>;
 
-/// A manager of XCM communication channels between the bridge hub and parent/sibling chains
-/// that have opened bridges at this bridge hub.
-///
-/// We use this interface to suspend and resume channels programmatically to implement backpressure
-/// mechanism for bridge queues.
-#[allow(clippy::result_unit_err)] // XCM uses `Result<(), ()>` everywhere
-pub trait LocalXcmChannelManager {
-	// TODO: https://github.com/paritytech/parity-bridges-common/issues/2255
-	// check following assumptions. They are important at least for following cases:
-	// 1) we now close the associated outbound lane when misbehavior is reported. If we'll keep
-	//    handling inbound XCM messages after the `suspend_inbound_channel`, they will be dropped
-	// 2) the sender will be able to enqueue message to othe lanes if we won't stop handling inbound
-	//    XCM immediately. He even may open additional bridges
-
-	/// Stop handling new incoming XCM messages from given bridge `owner` (parent/sibling chain).
-	///
-	/// We assume that the channel will be suspended immediately, but we don't mind if inbound
-	/// messages will keep piling up here for some time. Once this is communicated to the
-	/// `owner` chain (in any form), we expect it to stop sending messages to us and queue
-	/// messages at that `owner` chain instead.
-	///
-	/// We expect that:
+/// Bridge identifier.
+#[derive(
+	Clone,
+	Copy,
+	Decode,
+	Default,
+	Encode,
+	Eq,
+	Ord,
+	PartialOrd,
+	PartialEq,
+	RuntimeDebug,
+	TypeInfo,
+	MaxEncodedLen,
+	Serialize,
+	Deserialize,
+)]
+pub struct BridgeId(LaneId);
+
+impl BridgeId {
+	/// Create bridge identifier from two universal locations.
 	///
-	/// - no more incoming XCM messages from the `owner` will be processed until further
-	///  `resume_inbound_channel` call;
+	/// The fact that we are using versioned locations here means that XCM version upgrades must
+	/// be coordinated at all involved chains (at source and target chains + at bridge hubs).
+	/// Otherwise messages may simply be dropped anywhere on its path to the target chain.
+	pub fn new(
+		universal_location1: &VersionedInteriorLocation,
+		universal_location2: &VersionedInteriorLocation,
+	) -> Self {
+		// a tricky helper struct that adds required `Ord` support for
+		// `VersionedInteriorMultiLocation`
+		#[derive(Eq, PartialEq, Ord, PartialOrd)]
+		struct EncodedVersionedInteriorMultiLocation(sp_std::vec::Vec<u8>);
+
+		impl Encode for EncodedVersionedInteriorMultiLocation {
+			fn encode(&self) -> sp_std::vec::Vec<u8> {
+				self.0.clone()
+			}
+		}
+
+		Self(LaneId::new(
+			EncodedVersionedInteriorMultiLocation(universal_location1.encode()),
+			EncodedVersionedInteriorMultiLocation(universal_location2.encode()),
+		))
+	}
+
+	/// Creates bridge id using lane id.
 	///
-	/// - soon after the call, the channel will switch to the state when incoming messages are
-	///   piling up at the sending chain, not at the bridge hub.
+	/// **ATTENTION**: this function may be removed in the future.
+	pub fn from_lane_id(lane_id: LaneId) -> Self {
+		// in the future we may want to keep using the same lane identifiers if we'll be upgrading
+		// the XCM version (and `VersionedInteriorMultiLocation` will change)
+		Self(lane_id)
+	}
+
+	/// Return lane id, used by this bridge.
+	pub fn lane_id(&self) -> LaneId {
+		self.0
+	}
+}
+
+/// Local XCM channel manager.
+pub trait LocalXcmChannelManager {
+	/// Error that may be returned when suspending/resuming the bridge.
+	type Error: sp_std::fmt::Debug;
+
+	/// Returns true if the channel with given location is currently congested.
 	///
-	/// This method shall not fail if the channel is already suspended.
-	fn suspend_inbound_channel(owner: Location) -> Result<(), ()>;
+	/// The `with` is guaranteed to be in the same consensus. However, it may point to something
+	/// below the chain level - like the constract or pallet instance, for example.
+	fn is_congested(with: &Location) -> bool;
 
-	/// Start handling incoming messages from from given bridge `owner` (parent/sibling chain)
-	/// again.
+	/// Suspend the bridge, opened by given origin.
 	///
-	/// The channel is assumed to be suspended by the previous `suspend_inbound_channel` call,
-	/// however we don't check it anywhere.
+	/// The `local_origin` is guaranteed to be in the same consensus. However, it may point to
+	/// something below the chain level - like the constract or pallet instance, for example.
+	fn suspend_bridge(local_origin: &Location, bridge: BridgeId) -> Result<(), Self::Error>;
+
+	/// Resume the previously suspended bridge, opened by given origin.
 	///
-	/// This method shall not fail if the channel is already resumed.
-	fn resume_inbound_channel(owner: Location) -> Result<(), ()>;
+	/// The `local_origin` is guaranteed to be in the same consensus. However, it may point to
+	/// something below the chain level - like the constract or pallet instance, for example.
+	fn resume_bridge(local_origin: &Location, bridge: BridgeId) -> Result<(), Self::Error>;
 }
 
 impl LocalXcmChannelManager for () {
-	fn suspend_inbound_channel(_owner: Location) -> Result<(), ()> {
+	type Error = ();
+
+	fn is_congested(_with: &Location) -> bool {
+		false
+	}
+
+	fn suspend_bridge(_local_origin: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
 		Ok(())
 	}
 
-	fn resume_inbound_channel(_owner: Location) -> Result<(), ()> {
-		Err(())
+	fn resume_bridge(_local_origin: &Location, _bridge: BridgeId) -> Result<(), Self::Error> {
+		Ok(())
 	}
 }
 
@@ -91,6 +141,11 @@ impl LocalXcmChannelManager for () {
 pub enum BridgeState {
 	/// Bridge is opened. Associated lanes are also opened.
 	Opened,
+	/// Bridge is suspended. Associated lanes are opened.
+	///
+	/// We keep accepting messages to the bridge. The only difference with the `Opened` state
+	/// is that we have sent the "Suspended" message/signal to the local bridge origin.
+	Suspended,
 	/// Bridge is closed. Associated lanes are also closed.
 	/// After all outbound messages will be pruned, the bridge will vanish without any traces.
 	Closed,
@@ -122,7 +177,7 @@ pub struct BridgeLocations {
 	/// Universal (unique) location of other side of the bridge.
 	pub bridge_destination_universal_location: InteriorLocation,
 	/// An identifier of the dedicated bridge message lane.
-	pub lane_id: LaneId,
+	pub bridge_id: BridgeId,
 }
 
 /// Errors that may happen when we check bridge locations.
@@ -217,16 +272,16 @@ pub fn bridge_locations(
 	// `GlobalConsensus` and we know that the `bridge_origin_universal_location`
 	// is also within the `GlobalConsensus`. So we know that the lane id will be
 	// the same on both ends of the bridge
-	let lane_id = LaneId::new(
-		bridge_origin_universal_location.clone(),
-		bridge_destination_universal_location.clone(),
+	let bridge_id = BridgeId::new(
+		&bridge_origin_universal_location.clone().into(),
+		&bridge_destination_universal_location.clone().into(),
 	);
 
 	Ok(Box::new(BridgeLocations {
 		bridge_origin_relative_location: *bridge_origin_relative_location,
 		bridge_origin_universal_location,
 		bridge_destination_universal_location,
-		lane_id,
+		bridge_id,
 	}))
 }
 
@@ -259,14 +314,14 @@ mod tests {
 		assert_eq!(
 			locations,
 			Ok(Box::new(BridgeLocations {
-				bridge_origin_relative_location: test.bridge_origin_relative_location.clone(),
+				bridge_origin_relative_location: test.bridge_origin_relative_location,
 				bridge_origin_universal_location: test.bridge_origin_universal_location.clone(),
 				bridge_destination_universal_location: test
 					.bridge_destination_universal_location
 					.clone(),
-				lane_id: LaneId::new(
-					test.bridge_origin_universal_location,
-					test.bridge_destination_universal_location,
+				bridge_id: BridgeId::new(
+					&test.bridge_origin_universal_location.into(),
+					&test.bridge_destination_universal_location.into(),
 				),
 			})),
 		);
@@ -421,7 +476,7 @@ mod tests {
 			bridge_destination_universal_location: [GlobalConsensus(REMOTE_NETWORK)].into(),
 		});
 
-		assert_eq!(locations1.lane_id, locations2.lane_id);
+		assert_eq!(locations1.bridge_id, locations2.bridge_id);
 	}
 
 	#[test]
@@ -441,7 +496,7 @@ mod tests {
 			bridge_destination_universal_location: [GlobalConsensus(REMOTE_NETWORK)].into(),
 		});
 
-		assert_eq!(locations1.lane_id, locations2.lane_id);
+		assert_eq!(locations1.bridge_id, locations2.bridge_id);
 	}
 
 	// negative tests
-- 
GitLab