Unverified Commit ac39b076 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

make it easier to dbg stalls (#3351)

* make it easier to dbg

* revert channel sizes

* BAnon
parent e714cc67
Pipeline #145389 passed with stages
in 37 minutes and 52 seconds
...@@ -57,9 +57,12 @@ impl ParachainsInherentDataProvider { ...@@ -57,9 +57,12 @@ impl ParachainsInherentDataProvider {
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)?.map_err(Error::Subsystem)?; receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)?.map_err(Error::Subsystem)?;
let (sender, receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner( overseer.send_msg(
ProvisionerMessage::RequestInherentData(parent, sender), AllMessages::Provisioner(
)).await; ProvisionerMessage::RequestInherentData(parent, sender),
),
std::any::type_name::<Self>(),
).await;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData) receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
}; };
......
...@@ -401,7 +401,10 @@ impl From<FinalityNotification<Block>> for BlockInfo { ...@@ -401,7 +401,10 @@ impl From<FinalityNotification<Block>> for BlockInfo {
enum Event { enum Event {
BlockImported(BlockInfo), BlockImported(BlockInfo),
BlockFinalized(BlockInfo), BlockFinalized(BlockInfo),
MsgToSubsystem(AllMessages), MsgToSubsystem {
msg: AllMessages,
origin: &'static str,
},
ExternalRequest(ExternalRequest), ExternalRequest(ExternalRequest),
Stop, Stop,
} }
...@@ -452,8 +455,16 @@ impl OverseerHandler { ...@@ -452,8 +455,16 @@ impl OverseerHandler {
} }
/// Send some message to one of the `Subsystem`s. /// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) { pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await self.send_and_log_error(Event::MsgToSubsystem {
msg: msg.into(),
origin,
}).await
}
/// Same as `send_msg`, but with no origin. Used for tests.
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
self.send_msg(msg, "").await
} }
/// Inform the `Overseer` that some block was finalized. /// Inform the `Overseer` that some block was finalized.
...@@ -801,7 +812,8 @@ pub struct OverseerSubsystemSender { ...@@ -801,7 +812,8 @@ pub struct OverseerSubsystemSender {
#[async_trait::async_trait] #[async_trait::async_trait]
impl SubsystemSender for OverseerSubsystemSender { impl SubsystemSender for OverseerSubsystemSender {
async fn send_message(&mut self, msg: AllMessages) { async fn send_message(&mut self, msg: AllMessages) {
self.channels.send_and_log_error(self.signals_received.load(), msg).await; let needed_signals = self.signals_received.load();
self.channels.send_and_log_error(needed_signals, msg).await;
} }
async fn send_messages<T>(&mut self, msgs: T) async fn send_messages<T>(&mut self, msgs: T)
...@@ -891,12 +903,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { ...@@ -891,12 +903,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
loop { loop {
// If we have a message pending an overseer signal, we only poll for signals // If we have a message pending an overseer signal, we only poll for signals
// in the meantime. // in the meantime.
let signals_received = self.signals_received.load();
if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
if needs_signals_received <= self.signals_received.load() { if needs_signals_received <= signals_received {
return Ok(FromOverseer::Communication { msg }); return Ok(FromOverseer::Communication { msg });
} else { } else {
self.pending_incoming = Some((needs_signals_received, msg)); self.pending_incoming = Some((needs_signals_received, msg));
tracing::debug!(
target: LOG_TARGET,
subsystem = std::any::type_name::<M>(),
diff = needs_signals_received - signals_received,
"waiting for a signal",
);
// wait for next signal. // wait for next signal.
let signal = self.signals.next().await let signal = self.signals.next().await
.ok_or(SubsystemError::Context( .ok_or(SubsystemError::Context(
...@@ -911,7 +929,6 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { ...@@ -911,7 +929,6 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
let mut await_message = self.messages.next(); let mut await_message = self.messages.next();
let mut await_signal = self.signals.next(); let mut await_signal = self.signals.next();
let signals_received = self.signals_received.load();
let pending_incoming = &mut self.pending_incoming; let pending_incoming = &mut self.pending_incoming;
// Otherwise, wait for the next signal or incoming message. // Otherwise, wait for the next signal or incoming message.
...@@ -989,7 +1006,7 @@ impl<M> OverseenSubsystem<M> { ...@@ -989,7 +1006,7 @@ impl<M> OverseenSubsystem<M> {
/// Send a message to the wrapped subsystem. /// Send a message to the wrapped subsystem.
/// ///
/// If the inner `instance` is `None`, nothing is happening. /// If the inner `instance` is `None`, nothing is happening.
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { async fn send_message(&mut self, msg: M, origin: &'static str) -> SubsystemResult<()> {
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);
if let Some(ref mut instance) = self.instance { if let Some(ref mut instance) = self.instance {
...@@ -999,7 +1016,12 @@ impl<M> OverseenSubsystem<M> { ...@@ -999,7 +1016,12 @@ impl<M> OverseenSubsystem<M> {
}).timeout(MESSAGE_TIMEOUT).await }).timeout(MESSAGE_TIMEOUT).await
{ {
None => { None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); tracing::error!(
target: LOG_TARGET,
%origin,
"Subsystem {} appears unresponsive.",
instance.name,
);
Err(SubsystemError::SubsystemStalled(instance.name)) Err(SubsystemError::SubsystemStalled(instance.name))
} }
Some(res) => res.map_err(Into::into), Some(res) => res.map_err(Into::into),
...@@ -1016,9 +1038,15 @@ impl<M> OverseenSubsystem<M> { ...@@ -1016,9 +1038,15 @@ impl<M> OverseenSubsystem<M> {
const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);
if let Some(ref mut instance) = self.instance { if let Some(ref mut instance) = self.instance {
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await { match instance.tx_signal.send(signal.clone()).timeout(SIGNAL_TIMEOUT).await {
None => { None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); tracing::error!(
target: LOG_TARGET,
?signal,
received = instance.signals_received,
"Subsystem {} appears unresponsive.",
instance.name,
);
Err(SubsystemError::SubsystemStalled(instance.name)) Err(SubsystemError::SubsystemStalled(instance.name))
} }
Some(res) => { Some(res) => {
...@@ -1903,8 +1931,8 @@ where ...@@ -1903,8 +1931,8 @@ where
select! { select! {
msg = self.events_rx.select_next_some() => { msg = self.events_rx.select_next_some() => {
match msg { match msg {
Event::MsgToSubsystem(msg) => { Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into()).await?; self.route_message(msg.into(), origin).await?;
} }
Event::Stop => { Event::Stop => {
self.stop().await; self.stop().await;
...@@ -2028,59 +2056,63 @@ where ...@@ -2028,59 +2056,63 @@ where
Ok(()) Ok(())
} }
async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { async fn route_message(
&mut self,
msg: AllMessages,
origin: &'static str,
) -> SubsystemResult<()> {
self.metrics.on_message_relayed(); self.metrics.on_message_relayed();
match msg { match msg {
AllMessages::CandidateValidation(msg) => { AllMessages::CandidateValidation(msg) => {
self.subsystems.candidate_validation.send_message(msg).await?; self.subsystems.candidate_validation.send_message(msg, origin).await?;
}, },
AllMessages::CandidateBacking(msg) => { AllMessages::CandidateBacking(msg) => {
self.subsystems.candidate_backing.send_message(msg).await?; self.subsystems.candidate_backing.send_message(msg, origin).await?;
}, },
AllMessages::StatementDistribution(msg) => { AllMessages::StatementDistribution(msg) => {
self.subsystems.statement_distribution.send_message(msg).await?; self.subsystems.statement_distribution.send_message(msg, origin).await?;
}, },
AllMessages::AvailabilityDistribution(msg) => { AllMessages::AvailabilityDistribution(msg) => {
self.subsystems.availability_distribution.send_message(msg).await?; self.subsystems.availability_distribution.send_message(msg, origin).await?;
}, },
AllMessages::AvailabilityRecovery(msg) => { AllMessages::AvailabilityRecovery(msg) => {
self.subsystems.availability_recovery.send_message(msg).await?; self.subsystems.availability_recovery.send_message(msg, origin).await?;
}, },
AllMessages::BitfieldDistribution(msg) => { AllMessages::BitfieldDistribution(msg) => {
self.subsystems.bitfield_distribution.send_message(msg).await?; self.subsystems.bitfield_distribution.send_message(msg, origin).await?;
}, },
AllMessages::BitfieldSigning(msg) => { AllMessages::BitfieldSigning(msg) => {
self.subsystems.bitfield_signing.send_message(msg).await?; self.subsystems.bitfield_signing.send_message(msg, origin).await?;
}, },
AllMessages::Provisioner(msg) => { AllMessages::Provisioner(msg) => {
self.subsystems.provisioner.send_message(msg).await?; self.subsystems.provisioner.send_message(msg, origin).await?;
}, },
AllMessages::RuntimeApi(msg) => { AllMessages::RuntimeApi(msg) => {
self.subsystems.runtime_api.send_message(msg).await?; self.subsystems.runtime_api.send_message(msg, origin).await?;
}, },
AllMessages::AvailabilityStore(msg) => { AllMessages::AvailabilityStore(msg) => {
self.subsystems.availability_store.send_message(msg).await?; self.subsystems.availability_store.send_message(msg, origin).await?;
}, },
AllMessages::NetworkBridge(msg) => { AllMessages::NetworkBridge(msg) => {
self.subsystems.network_bridge.send_message(msg).await?; self.subsystems.network_bridge.send_message(msg, origin).await?;
}, },
AllMessages::ChainApi(msg) => { AllMessages::ChainApi(msg) => {
self.subsystems.chain_api.send_message(msg).await?; self.subsystems.chain_api.send_message(msg, origin).await?;
}, },
AllMessages::CollationGeneration(msg) => { AllMessages::CollationGeneration(msg) => {
self.subsystems.collation_generation.send_message(msg).await?; self.subsystems.collation_generation.send_message(msg, origin).await?;
}, },
AllMessages::CollatorProtocol(msg) => { AllMessages::CollatorProtocol(msg) => {
self.subsystems.collator_protocol.send_message(msg).await?; self.subsystems.collator_protocol.send_message(msg, origin).await?;
}, },
AllMessages::ApprovalDistribution(msg) => { AllMessages::ApprovalDistribution(msg) => {
self.subsystems.approval_distribution.send_message(msg).await?; self.subsystems.approval_distribution.send_message(msg, origin).await?;
}, },
AllMessages::ApprovalVoting(msg) => { AllMessages::ApprovalVoting(msg) => {
self.subsystems.approval_voting.send_message(msg).await?; self.subsystems.approval_voting.send_message(msg, origin).await?;
}, },
AllMessages::GossipSupport(msg) => { AllMessages::GossipSupport(msg) => {
self.subsystems.gossip_support.send_message(msg).await?; self.subsystems.gossip_support.send_message(msg, origin).await?;
}, },
AllMessages::DisputeCoordinator(_) => {} AllMessages::DisputeCoordinator(_) => {}
AllMessages::DisputeParticipation(_) => {} AllMessages::DisputeParticipation(_) => {}
......
...@@ -228,7 +228,7 @@ fn overseer_metrics_work() { ...@@ -228,7 +228,7 @@ fn overseer_metrics_work() {
handler.block_imported(second_block).await; handler.block_imported(second_block).await;
handler.block_imported(third_block).await; handler.block_imported(third_block).await;
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.stop().await; handler.stop().await;
select! { select! {
...@@ -984,22 +984,22 @@ fn overseer_all_subsystems_receive_signals_and_messages() { ...@@ -984,22 +984,22 @@ fn overseer_all_subsystems_receive_signals_and_messages() {
// send a msg to each subsystem // send a msg to each subsystem
// except for BitfieldSigning and GossipSupport as the messages are not instantiable // except for BitfieldSigning and GossipSupport as the messages are not instantiable
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await; handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await; handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; // handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
// handler.send_msg(AllMessages::GossipSupport(test_bitfield_signing_msg())).await; // handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await; handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await;
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await; handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await; handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await; handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await; handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
// Wait until all subsystems have received. Otherwise the messages might race against // Wait until all subsystems have received. Otherwise the messages might race against
// the conclude signal. // the conclude signal.
......
...@@ -130,11 +130,14 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingVotingRule ...@@ -130,11 +130,14 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingVotingRule
Box::pin(async move { Box::pin(async move {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let approval_checking_subsystem_vote = { let approval_checking_subsystem_vote = {
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor( overseer.send_msg(
best_hash, ApprovalVotingMessage::ApprovedAncestor(
base_number, best_hash,
tx, base_number,
)).await; tx,
),
std::any::type_name::<Self>(),
).await;
rx.await.ok().and_then(|v| v) rx.await.ok().and_then(|v| v)
}; };
......
...@@ -216,7 +216,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B> ...@@ -216,7 +216,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
self.overseer self.overseer
.clone() .clone()
.send_msg(ChainSelectionMessage::Leaves(tx)).await; .send_msg(
ChainSelectionMessage::Leaves(tx),
std::any::type_name::<Self>(),
).await;
rx.await rx.await
.map_err(Error::OverseerDisconnected) .map_err(Error::OverseerDisconnected)
...@@ -264,7 +267,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B> ...@@ -264,7 +267,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
let subchain_head = { let subchain_head = {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
overseer.send_msg(ChainSelectionMessage::BestLeafContaining(target_hash, tx)).await; overseer.send_msg(
ChainSelectionMessage::BestLeafContaining(target_hash, tx),
std::any::type_name::<Self>(),
).await;
let best = rx.await let best = rx.await
.map_err(Error::OverseerDisconnected) .map_err(Error::OverseerDisconnected)
...@@ -318,11 +324,14 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B> ...@@ -318,11 +324,14 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
let (subchain_head, subchain_number) = { let (subchain_head, subchain_number) = {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor( overseer.send_msg(
subchain_head, ApprovalVotingMessage::ApprovedAncestor(
target_number, subchain_head,
tx, target_number,
)).await; tx,
),
std::any::type_name::<Self>(),
).await;
match rx.await match rx.await
.map_err(Error::OverseerDisconnected) .map_err(Error::OverseerDisconnected)
......
...@@ -385,7 +385,7 @@ mod tests { ...@@ -385,7 +385,7 @@ mod tests {
spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());
block_on(handler.send_msg(CollatorProtocolMessage::CollateOn(Default::default()))); block_on(handler.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_))); assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_)));
} }
} }
...@@ -348,11 +348,11 @@ impl PolkadotTestNode { ...@@ -348,11 +348,11 @@ impl PolkadotTestNode {
}; };
self.overseer_handler self.overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config)) .send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.await; .await;
self.overseer_handler self.overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id)) .send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await; .await;
} }
} }
......
...@@ -88,11 +88,11 @@ fn main() -> Result<()> { ...@@ -88,11 +88,11 @@ fn main() -> Result<()> {
para_id, para_id,
}; };
overseer_handler overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config)) .send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.await; .await;
overseer_handler overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id)) .send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await; .await;
Ok(full_node.task_manager) Ok(full_node.task_manager)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment