Commit d49be625 authored by asynchronous rob's avatar asynchronous rob
Browse files

time-out message sends in subsystem context

parent f188e3c9
......@@ -166,7 +166,7 @@ async fn runtime_api_request<T>(
request: RuntimeApiRequest,
receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
) -> SubsystemResult<Result<T, RuntimeApiError>> {
ctx.send_message(
let _ = ctx.send_message(
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
request,
......
......@@ -517,7 +517,7 @@ async fn send_batch_to_network(
batch: Vec<(Vec<PeerId>, protocol_v1::ValidationProtocol)>,
) {
if !batch.is_empty() {
ctx.send_message(NetworkBridgeMessage::SendValidationMessages(batch).into()).await
let _ = ctx.send_message(NetworkBridgeMessage::SendValidationMessages(batch).into()).await;
}
}
......
......@@ -236,10 +236,10 @@ where
{
tracing::trace!(target: LOG_TARGET, rep = ?rep, peer_id = %peer, "reputation change");
ctx.send_message(AllMessages::NetworkBridge(
let _ = ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.await;
}
/// Distribute a given valid and signature checked bitfield message.
......
......@@ -554,7 +554,7 @@ async fn dispatch_validation_events_to_all<I>(
a.chain(b).chain(p).chain(s).filter_map(|x| x)
};
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
let _ = ctx.send_messages(events.into_iter().flat_map(messages_for)).await;
}
#[tracing::instrument(level = "trace", skip(events, ctx), fields(subsystem = LOG_TARGET))]
......@@ -572,7 +572,7 @@ async fn dispatch_collation_events_to_all<I>(
))
};
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
let _ = ctx.send_messages(events.into_iter().flat_map(messages_for)).await;
}
#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
......
......@@ -487,7 +487,9 @@ async fn report_peer(
peer: PeerId,
rep: Rep,
) {
ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await
let _ = ctx.send_message(
AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))
).await;
}
/// Handle a notification from a peer that they are awaiting some PoVs.
......
......@@ -681,9 +681,9 @@ async fn report_peer(
peer: PeerId,
rep: Rep,
) {
ctx.send_message(AllMessages::NetworkBridge(
let _ = ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep)
)).await
)).await;
}
// Handle an incoming wire message. Returns a reference to a newly-stored statement
......
......@@ -343,33 +343,56 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
}).await.map_err(Into::into)
}
async fn send_message(&mut self, msg: AllMessages) {
async fn send_message(&mut self, msg: AllMessages) -> bool {
self.send_and_log_error(ToOverseer::SubsystemMessage(msg)).await
}
async fn send_messages<T>(&mut self, msgs: T)
async fn send_messages<T>(&mut self, msgs: T) -> bool
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
if self.tx.send_all(&mut msgs).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send messages to Overseer",
);
const SEND_TIMEOUT: Duration = Duration::from_millis(1000);
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
match self.tx.send_all(&mut msgs).timeout(SEND_TIMEOUT).await {
None => {
tracing::error!(target: LOG_TARGET, "Outgoing messages timeout reached");
false
}
Some(Err(_)) => {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send a messages to Overseer",
);
// We return `true` here because errors indicate no change of re-try.
true
}
Some(Ok(_)) => true,
}
}
}
impl<M> OverseerSubsystemContext<M> {
async fn send_and_log_error(&mut self, msg: ToOverseer) {
if self.tx.send(msg).await.is_err() {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send a message to Overseer",
);
async fn send_and_log_error(&mut self, msg: ToOverseer) -> bool {
const SEND_TIMEOUT: Duration = Duration::from_millis(500);
match self.tx.send(msg).timeout(SEND_TIMEOUT).await {
None => {
tracing::error!(target: LOG_TARGET, "Outgoing message timeout reached");
false
}
Some(Err(_)) => {
tracing::debug!(
target: LOG_TARGET,
msg_type = std::any::type_name::<M>(),
"Failed to send a message to Overseer",
);
// We return `true` here because errors indicate no chance of re-try.
true
}
Some(Ok(_)) => true,
}
}
}
......
......@@ -191,14 +191,16 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
Ok(())
}
async fn send_message(&mut self, msg: AllMessages) {
async fn send_message(&mut self, msg: AllMessages) -> bool {
self.tx
.send(msg)
.await
.expect("test overseer no longer live");
true
}
async fn send_messages<T>(&mut self, msgs: T)
async fn send_messages<T>(&mut self, msgs: T) -> bool
where
T: IntoIterator<Item = AllMessages> + Send,
T::IntoIter: Send,
......@@ -208,6 +210,8 @@ impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
.send_all(&mut iter)
.await
.expect("test overseer no longer live");
true
}
}
......
......@@ -828,7 +828,7 @@ where
ctx: &mut Context,
) -> SubsystemResult<()> {
match outgoing.expect("the Jobs stream never ends; qed") {
FromJobCommand::SendMessage(msg) => ctx.send_message(msg).await,
FromJobCommand::SendMessage(msg) => { let _ = ctx.send_message(msg).await; }
FromJobCommand::Spawn(name, task) => ctx.spawn(name, task).await?,
FromJobCommand::SpawnBlocking(name, task) => ctx.spawn_blocking(name, task).await?,
}
......
......@@ -211,10 +211,14 @@ pub trait SubsystemContext: Send + 'static {
) -> SubsystemResult<()>;
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: AllMessages);
/// Can time-out or fail internally, and the subsystem must decide when and where to
/// retry.
///
/// Returns true on success, false on failure.
async fn send_message(&mut self, msg: AllMessages) -> bool;
/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<T>(&mut self, msgs: T)
async fn send_messages<T>(&mut self, msgs: T) -> bool
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment