Newer
Older
});
}
fn resume_channel(target: ParaId) {
if let Some(index) = s.iter().position(|item| item.recipient == target) {
let suspended = s[index].state == OutboundState::Suspended;
debug_assert!(
suspended,
"WARNING: Attempt to resume channel that was not suspended."
);
if s[index].first_index == s[index].last_index {
s[index].state = OutboundState::Ok;
debug_assert!(false, "WARNING: Attempt to resume channel that was not suspended.");
Adrian Catangiu
committed
#[cfg(feature = "bridging")]
/// Reports whether the inbound channel from `sender` is currently suspended.
///
/// Looks up the first entry of `InboundXcmpStatus` whose `sender` matches; a sender with no
/// entry is treated as not suspended.
fn is_inbound_channel_suspended(sender: ParaId) -> bool {
    let channels = <InboundXcmpStatus<T>>::get();
    match channels.iter().find(|details| details.sender == sender) {
        Some(details) => details.state == InboundState::Suspended,
        None => false,
    }
}
#[cfg(feature = "bridging")]
/// Returns tuple of `OutboundState` and number of queued pages.
///
/// Yields `None` when `OutboundXcmpStatus` holds no entry whose `recipient` is `target`. The
/// page count is `last_index - first_index`, saturating at zero.
fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
    let channels = <OutboundXcmpStatus<T>>::get();
    let details = channels.iter().find(|details| details.recipient == target)?;
    let queued_pages = details.last_index.saturating_sub(details.first_index);
    Some((details.state, queued_pages))
}
/// Inbound side of the queue: applies channel signals and records incoming XCMP messages so
/// that they can be executed when the queue is serviced.
impl<T: Config> XcmpMessageHandler for Pallet<T> {
/// Handles a batch of `(sender, sent_at, data)` XCMP messages.
///
/// Messages whose format is `XcmpMessageFormat::Signals` are decoded into `ChannelSignal`s and
/// applied to the sender's channel immediately. Any other message is recorded in the inbound
/// status and its payload is stored in `InboundXcmpMessages`. The returned weight is whatever
/// `Self::service_xcmp_queue(max_weight)` reports.
fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
iter: I,
max_weight: Weight,
) -> Weight {
let mut status = <InboundXcmpStatus<T>>::get();
let QueueConfigData { suspend_threshold, drop_threshold, .. } = <QueueConfig<T>>::get();
for (sender, sent_at, data) in iter {
// Figure out the message format.
// Decoding goes through `data_ref`, so `data` itself is left untouched and `data_ref`
// is what the remainder of this iteration reads the payload from.
let mut data_ref = data;
let format = match XcmpMessageFormat::decode_with_depth_limit(
MAX_XCM_DECODE_DEPTH,
&mut data_ref,
) {
// NOTE(review): the arm headers of this `match` are not visible in this copy of the file;
// the lines below look like the decode-failure arm - confirm against upstream.
// Debug builds panic on the assertion; otherwise the message is skipped via `continue`.
debug_assert!(false, "Unknown XCMP message format. Silently dropping message");
continue
},
};
if format == XcmpMessageFormat::Signals {
// Apply every signal in the message to the sender's channel; stop at the first item that
// fails to decode.
while !data_ref.is_empty() {
use ChannelSignal::*;
match ChannelSignal::decode(&mut data_ref) {
Ok(Suspend) => Self::suspend_channel(sender),
Ok(Resume) => Self::resume_channel(sender),
Err(_) => break,
}
}
} else {
// Record the fact we received it.
// `status` is looked up by binary search on `sender`, so it has to be kept sorted.
match status.binary_search_by_key(&sender, |item| item.sender) {
// NOTE(review): the header of the "found at index `i`" arm and several closing braces /
// `else` keywords appear to be missing below - confirm against upstream.
// Number of messages already recorded for this sender.
let count = status[i].message_metadata.len() as u32;
// Once the backlog reaches `suspend_threshold`, mark the channel suspended and send a
// `Suspend` signal to the sender.
if count >= suspend_threshold && status[i].state == InboundState::Ok {
status[i].state = InboundState::Suspended;
let r = Self::send_signal(sender, ChannelSignal::Suspend);
if r.is_err() {
log::warn!(
"Attempt to suspend channel failed. Messages may be dropped."
);
// Metadata is only recorded while the backlog is below `drop_threshold`.
if count < drop_threshold {
status[i].message_metadata.push((sent_at, format));
debug_assert!(
false,
"XCMP channel queue full. Silently dropping message"
);
// Update the delivery fee factor, if applicable.
if count > suspend_threshold {
// The size contribution is the remaining payload length in whole multiples of 1024 bytes
// (integer division), scaled by `MESSAGE_SIZE_FEE_BASE`.
let message_size_factor =
FixedU128::from((data_ref.len() / 1024) as u128)
.saturating_mul(delivery_fee_constants::MESSAGE_SIZE_FEE_BASE);
Self::increase_fee_factor(sender, message_size_factor);
}
// No entry for this sender yet: start a new channel record in the `Ok` state holding this
// message. It is appended at the end; the `sort` after the loop restores the ordering.
Err(_) => status.push(InboundChannelDetails {
sender,
state: InboundState::Ok,
message_metadata: vec![(sent_at, format)],
}),
}
// Queue the payload for later execution.
<InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref);
}
// Optimization note; it would make sense to execute messages immediately if
// `status.is_empty()` here.
}
// New senders were pushed at the end, so re-sort to keep the binary search above valid.
status.sort();
Self::service_xcmp_queue(max_weight)
}
}
/// Outbound side of the queue: hands queued pages over for sending.
impl<T: Config> XcmpMessageSource for Pallet<T> {
/// Collects outbound pages from the channels in `OutboundXcmpStatus`.
///
/// The number of returned pages is bounded by `maximum_channels` (and by the number of
/// channels). Each loop iteration pushes at most one page for its channel. The result is
/// sorted by ascending recipient para id, and the stored status list is pruned and rotated
/// before being written back.
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut statuses = <OutboundXcmpStatus<T>>::get();
let old_statuses_len = statuses.len();
let max_message_count = statuses.len().min(maximum_channels);
let mut result = Vec::with_capacity(max_message_count);
for status in statuses.iter_mut() {
// Work on local copies of the channel's fields; they are written back to `*status` at the
// end of the iteration.
let OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
mut signals_exist,
mut first_index,
mut last_index,
} = *status;
// NOTE(review): the bodies of the next two `if`s are not visible in this copy of the file
// (presumably a `break` and a `continue`) - confirm against upstream.
if result.len() == max_message_count {
// We check this condition in the beginning of the loop so that we don't include
// a message where the limit is 0.
if outbound_state == OutboundState::Suspended {
}
// `Ready(n, e)` supplies the two size limits used below as `max_size_now` / `max_size_ever`.
let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
ChannelStatus::Closed => {
// This means that there is no such channel anymore. Nothing to be done but
// swallow the messages and discard the status.
for i in first_index..last_index {
<OutboundXcmpMessages<T>>::remove(para_id, i);
*status = OutboundChannelDetails::new(para_id);
// A full channel is skipped in this round.
ChannelStatus::Full => continue,
ChannelStatus::Ready(n, e) => (n, e),
};
// NOTE(review): the `if` head that the following `else if` belongs to is missing in this
// copy; the next line reads the channel's pending signal page.
let page = <SignalMessages<T>>::get(para_id);
} else if last_index > first_index {
// Take the oldest queued page and advance the queue head.
let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
<OutboundXcmpMessages<T>>::remove(para_id, first_index);
first_index += 1;
// Once the queue is drained, reset both indices to zero.
if first_index == last_index {
first_index = 0;
last_index = 0;
}
if page.len() > max_size_ever {
// TODO: #274 This means that the channel's max message size has changed since
// the message was sent. We should parse it and split into smaller messages but
// since it's so unlikely then for now we just drop it.
log::warn!("WARNING: oversize message in queue. silently dropping.");
} else {
result.push((para_id, page));
}
let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
Some(channel_info) => channel_info.max_total_size,
None => {
log::warn!("calling `get_channel_info` with no RelevantMessagingState?!");
MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
},
};
// Lower the delivery fee factor once the bytes still queued for this channel are at or
// below `max_total_size / THRESHOLD_FACTOR`.
let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
// Sums the encoded length of every page still queued; the `unwrap` panics if `decode_len`
// does not yield a length for one of the indices.
let remaining_total_size: usize = (first_index..last_index)
.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
.sum();
if remaining_total_size <= threshold as usize {
Self::decrease_fee_factor(para_id);
}
// Persist the (possibly updated) local copies back into the status entry.
*status = OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
signals_exist,
first_index,
last_index,
};
}
// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
// criteria requirement.
result.sort_by_key(|m| m.0);
// Prune hrmp channels that became empty. Additionally, because it may so happen that we
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
// change the order. Otherwise, the next `on_finalize` we will again give attention
// only to those channels that happen to be in the beginning, until they are emptied.
// This leads to "starvation" of the channels near to the end.
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
// A channel is kept while it is suspended, has pending signals, or still has queued pages.
statuses.retain(|x| {
x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
});
// `old_statuses_len` must be >= `statuses.len()` since we never add anything to `statuses`.
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));
<OutboundXcmpStatus<T>>::put(statuses);
result
}
}
/// Xcm sender for sending to a sibling parachain.
impl<T: Config> SendXcm for Pallet<T> {
type Ticket = (ParaId, VersionedXcm<()>);
fn validate(
dest: &mut Option<MultiLocation>,
msg: &mut Option<Xcm<()>>,
) -> SendResult<(ParaId, VersionedXcm<()>)> {
let d = dest.take().ok_or(SendError::MissingArgument)?;
// An HRMP message for a sibling parachain.
MultiLocation { parents: 1, interior: X1(Parachain(id)) } => {
let xcm = msg.take().ok_or(SendError::MissingArgument)?;
let id = ParaId::from(*id);
let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
.map_err(|()| SendError::DestinationUnsupported)?;
Ok(((id, versioned_xcm), price))
},
_ => {
// Anything else is unhandled. This includes a message that is not meant for us.
// We need to make sure that dest/msg is not consumed here.
*dest = Some(d);
Err(SendError::NotApplicable)
},
}
}
fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
Ok(_) => {
Gavin Wood
committed
Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
Err(e) => Err(SendError::Transport(<&'static str>::from(e))),
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
impl<T: Config> FeeTracker for Pallet<T> {
type Id = ParaId;
fn get_fee_factor(id: Self::Id) -> FixedU128 {
<DeliveryFeeFactor<T>>::get(id)
}
fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
<DeliveryFeeFactor<T>>::mutate(id, |f| {
*f = f.saturating_mul(
delivery_fee_constants::EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor),
);
*f
})
}
fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
<DeliveryFeeFactor<T>>::mutate(id, |f| {
*f = InitialFactor::get().max(*f / delivery_fee_constants::EXPONENTIAL_FEE_BASE);
*f
})
}
}