Newer
Older
}
}
} else {
// Record the fact we received it.
match status.binary_search_by_key(&sender, |item| item.sender) {
let count = status[i].message_metadata.len();
if count as u32 >= suspend_threshold && status[i].state == InboundState::Ok
{
status[i].state = InboundState::Suspended;
let r = Self::send_signal(sender, ChannelSignal::Suspend);
if r.is_err() {
log::warn!(
"Attempt to suspend channel failed. Messages may be dropped."
);
}
}
if (count as u32) < drop_threshold {
status[i].message_metadata.push((sent_at, format));
debug_assert!(
false,
"XCMP channel queue full. Silently dropping message"
);
Err(_) => status.push(InboundChannelDetails {
sender,
state: InboundState::Ok,
message_metadata: vec![(sent_at, format)],
}),
}
// Queue the payload for later execution.
<InboundXcmpMessages<T>>::insert(sender, sent_at, data_ref);
}
// Optimization note; it would make sense to execute messages immediately if
// `status.is_empty()` here.
}
status.sort();
Self::service_xcmp_queue(max_weight)
}
}
impl<T: Config> XcmpMessageSource for Pallet<T> {
fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
let mut statuses = <OutboundXcmpStatus<T>>::get();
let old_statuses_len = statuses.len();
let max_message_count = statuses.len().min(maximum_channels);
let mut result = Vec::with_capacity(max_message_count);
for status in statuses.iter_mut() {
let OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
mut signals_exist,
mut first_index,
mut last_index,
} = *status;
if result.len() == max_message_count {
// We check this condition in the beginning of the loop so that we don't include
// a message where the limit is 0.
if outbound_state == OutboundState::Suspended {
}
let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
ChannelStatus::Closed => {
// This means that there is no such channel anymore. Nothing to be done but
// swallow the messages and discard the status.
for i in first_index..last_index {
<OutboundXcmpMessages<T>>::remove(para_id, i);
*status = OutboundChannelDetails::new(para_id);
ChannelStatus::Full => continue,
ChannelStatus::Ready(n, e) => (n, e),
};
let page = <SignalMessages<T>>::get(para_id);
} else if last_index > first_index {
let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
<OutboundXcmpMessages<T>>::remove(para_id, first_index);
first_index += 1;
if first_index == last_index {
first_index = 0;
last_index = 0;
}
if page.len() > max_size_ever {
// TODO: #274 This means that the channel's max message size has changed since
// the message was sent. We should parse it and split into smaller mesasges but
// since it's so unlikely then for now we just drop it.
log::warn!("WARNING: oversize message in queue. silently dropping.");
} else {
result.push((para_id, page));
}
*status = OutboundChannelDetails {
recipient: para_id,
state: outbound_state,
signals_exist,
first_index,
last_index,
};
}
// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
// criteria requirement.
result.sort_by_key(|m| m.0);
// Prune hrmp channels that became empty. Additionally, because it may so happen that we
// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
// change the order. Otherwise, the next `on_finalize` we will again give attention
// only to those channels that happen to be in the beginning, until they are emptied.
// This leads to "starvation" of the channels near to the end.
//
// To mitigate this we shift all processed elements towards the end of the vector using
// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
statuses.retain(|x| {
x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
});
// old_status_len must be >= status.len() since we never add anything to status.
let pruned = old_statuses_len - statuses.len();
// removing an item from status implies a message being sent, so the result messages must
// be no less than the pruned channels.
statuses.rotate_left(result.len().saturating_sub(pruned));
<OutboundXcmpStatus<T>>::put(statuses);
result
}
}
/// Xcm sender for sending to a sibling parachain.
impl<T: Config> SendXcm for Pallet<T> {
type Ticket = (ParaId, VersionedXcm<()>);
fn validate(
dest: &mut Option<MultiLocation>,
msg: &mut Option<Xcm<()>>,
) -> SendResult<(ParaId, VersionedXcm<()>)> {
let d = dest.take().ok_or(SendError::MissingArgument)?;
// An HRMP message for a sibling parachain.
MultiLocation { parents: 1, interior: X1(Parachain(id)) } => {
let xcm = msg.take().ok_or(SendError::MissingArgument)?;
let id = ParaId::from(*id);
Branislav Kontur
committed
let price = T::PriceForSiblingDelivery::price_for_parachain_delivery(id, &xcm);
let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
.map_err(|()| SendError::DestinationUnsupported)?;
Ok(((id, versioned_xcm), price))
},
_ => {
// Anything else is unhandled. This includes a message that is not meant for us.
// We need to make sure that dest/msg is not consumed here.
*dest = Some(d);
Err(SendError::NotApplicable)
},
}
}
fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
Ok(_) => {
Gavin Wood
committed
Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
Err(e) => Err(SendError::Transport(<&'static str>::from(e))),