Unverified Commit 08f4be76 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

proposer: wait for a hash to be in the active-leaves set (#1616)

* overseer: add ExternalRequest to Event

* proposer: wait for the hash to be activated

* update comments

* overseer: handle unbounded growth of listeners map

* overseer: fix compilation

* overseer: clean up dead listeners

* overseer: cosmetic changes

* overseer: cosmetic changes t.2

* overseer: add debug_assertions

* overseer: fix formatting
parent 013c4a80
Pipeline #104388 passed with stages
in 25 minutes and 48 seconds
...@@ -119,6 +119,10 @@ where ...@@ -119,6 +119,10 @@ where
let mut provisioner_inherent_data = async move { let mut provisioner_inherent_data = async move {
let (sender, receiver) = futures::channel::oneshot::channel(); let (sender, receiver) = futures::channel::oneshot::channel();
overseer.wait_for_activation(parent_header_hash, sender).await?;
receiver.await.map_err(Error::ClosedChannelFromProvisioner)?;
let (sender, receiver) = futures::channel::oneshot::channel();
// strictly speaking, we don't _have_ to .await this send_msg before opening the // strictly speaking, we don't _have_ to .await this send_msg before opening the
// receiver; it's possible that the response there would be ready slightly before // receiver; it's possible that the response there would be ready slightly before
// this call completes. IMO it's not worth the hassle or overhead of spawning a // this call completes. IMO it's not worth the hassle or overhead of spawning a
......
...@@ -59,7 +59,7 @@ use std::pin::Pin; ...@@ -59,7 +59,7 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::Poll; use std::task::Poll;
use std::time::Duration; use std::time::Duration;
use std::collections::HashSet; use std::collections::{hash_map, HashMap};
use futures::channel::{mpsc, oneshot}; use futures::channel::{mpsc, oneshot};
use futures::{ use futures::{
...@@ -165,9 +165,18 @@ enum Event { ...@@ -165,9 +165,18 @@ enum Event {
BlockImported(BlockInfo), BlockImported(BlockInfo),
BlockFinalized(BlockInfo), BlockFinalized(BlockInfo),
MsgToSubsystem(AllMessages), MsgToSubsystem(AllMessages),
ExternalRequest(ExternalRequest),
Stop, Stop,
} }
/// Some request from outer world.
enum ExternalRequest {
WaitForActivation {
hash: Hash,
response_channel: oneshot::Sender<()>,
},
}
/// A handler used to communicate with the [`Overseer`]. /// A handler used to communicate with the [`Overseer`].
/// ///
/// [`Overseer`]: struct.Overseer.html /// [`Overseer`]: struct.Overseer.html
...@@ -179,30 +188,36 @@ pub struct OverseerHandler { ...@@ -179,30 +188,36 @@ pub struct OverseerHandler {
impl OverseerHandler { impl OverseerHandler {
/// Inform the `Overseer` that that some block was imported. /// Inform the `Overseer` that that some block was imported.
pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { pub async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockImported(block)).await?; self.events_tx.send(Event::BlockImported(block)).await.map_err(Into::into)
Ok(())
} }
/// Send some message to one of the `Subsystem`s. /// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> { pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.events_tx.send(Event::MsgToSubsystem(msg)).await?; self.events_tx.send(Event::MsgToSubsystem(msg)).await.map_err(Into::into)
Ok(())
} }
/// Inform the `Overseer` that that some block was finalized. /// Inform the `Overseer` that that some block was finalized.
pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { pub async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
self.events_tx.send(Event::BlockFinalized(block)).await?; self.events_tx.send(Event::BlockFinalized(block)).await.map_err(Into::into)
}
Ok(()) /// Wait for a block with the given hash to be in the active-leaves set.
/// This method is used for external code like `Proposer` that doesn't subscribe to Overseer's signals.
///
/// The response channel responds if the hash was activated and is closed if the hash was deactivated.
/// Note that due the fact the overseer doesn't store the whole active-leaves set, only deltas,
/// the response channel may never return if the hash was deactivated before this call.
/// In this case, it's the caller's responsibility to ensure a timeout is set.
pub async fn wait_for_activation(&mut self, hash: Hash, response_channel: oneshot::Sender<()>) -> SubsystemResult<()> {
self.events_tx.send(Event::ExternalRequest(ExternalRequest::WaitForActivation {
hash,
response_channel
})).await.map_err(Into::into)
} }
/// Tell `Overseer` to shutdown. /// Tell `Overseer` to shutdown.
pub async fn stop(&mut self) -> SubsystemResult<()> { pub async fn stop(&mut self) -> SubsystemResult<()> {
self.events_tx.send(Event::Stop).await?; self.events_tx.send(Event::Stop).await.map_err(Into::into)
Ok(())
} }
} }
...@@ -297,9 +312,7 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { ...@@ -297,9 +312,7 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
self.tx.send(ToOverseer::SpawnJob { self.tx.send(ToOverseer::SpawnJob {
name, name,
s, s,
}).await?; }).await.map_err(Into::into)
Ok(())
} }
async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>) async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
...@@ -308,24 +321,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> { ...@@ -308,24 +321,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
self.tx.send(ToOverseer::SpawnBlockingJob { self.tx.send(ToOverseer::SpawnBlockingJob {
name, name,
s, s,
}).await?; }).await.map_err(Into::into)
Ok(())
} }
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await?; self.tx.send(ToOverseer::SubsystemMessage(msg)).await.map_err(Into::into)
Ok(())
} }
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()> async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{ {
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok)); let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
self.tx.send_all(&mut msgs).await?; self.tx.send_all(&mut msgs).await.map_err(Into::into)
Ok(())
} }
} }
...@@ -399,13 +406,16 @@ pub struct Overseer<S: SpawnNamed> { ...@@ -399,13 +406,16 @@ pub struct Overseer<S: SpawnNamed> {
/// Events that are sent to the overseer from the outside world /// Events that are sent to the overseer from the outside world
events_rx: mpsc::Receiver<Event>, events_rx: mpsc::Receiver<Event>,
/// External listeners waiting for a hash to be in the active-leave set.
activation_external_listeners: HashMap<Hash, Vec<oneshot::Sender<()>>>,
/// A set of leaves that `Overseer` starts working with. /// A set of leaves that `Overseer` starts working with.
/// ///
/// Drained at the beginning of `run` and never used again. /// Drained at the beginning of `run` and never used again.
leaves: Vec<(Hash, BlockNumber)>, leaves: Vec<(Hash, BlockNumber)>,
/// The set of the "active leaves". /// The set of the "active leaves".
active_leaves: HashSet<(Hash, BlockNumber)>, active_leaves: HashMap<Hash, BlockNumber>,
/// Various Prometheus metrics. /// Various Prometheus metrics.
metrics: Metrics, metrics: Metrics,
...@@ -749,9 +759,10 @@ where ...@@ -749,9 +759,10 @@ where
.map(|BlockInfo { hash, parent_hash: _, number }| (hash, number)) .map(|BlockInfo { hash, parent_hash: _, number }| (hash, number))
.collect(); .collect();
let active_leaves = HashSet::new(); let active_leaves = HashMap::new();
let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry); let metrics = <Metrics as metrics::Metrics>::register(prometheus_registry);
let activation_external_listeners = HashMap::new();
let this = Self { let this = Self {
candidate_validation_subsystem, candidate_validation_subsystem,
...@@ -773,6 +784,7 @@ where ...@@ -773,6 +784,7 @@ where
running_subsystems, running_subsystems,
running_subsystems_rx, running_subsystems_rx,
events_rx, events_rx,
activation_external_listeners,
leaves, leaves,
active_leaves, active_leaves,
metrics, metrics,
...@@ -863,10 +875,10 @@ where ...@@ -863,10 +875,10 @@ where
let leaves = std::mem::take(&mut self.leaves); let leaves = std::mem::take(&mut self.leaves);
let mut update = ActiveLeavesUpdate::default(); let mut update = ActiveLeavesUpdate::default();
for leaf in leaves.into_iter() { for (hash, number) in leaves.into_iter() {
update.activated.push(leaf.0); update.activated.push(hash);
self.active_leaves.insert(leaf); self.active_leaves.insert(hash, number);
self.metrics.on_head_activated(); self.on_head_activated(&hash);
} }
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
...@@ -887,6 +899,9 @@ where ...@@ -887,6 +899,9 @@ where
Event::BlockFinalized(block) => { Event::BlockFinalized(block) => {
self.block_finalized(block).await?; self.block_finalized(block).await?;
} }
Event::ExternalRequest(request) => {
self.handle_external_request(request);
}
} }
} }
...@@ -919,17 +934,27 @@ where ...@@ -919,17 +934,27 @@ where
async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> { async fn block_imported(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default(); let mut update = ActiveLeavesUpdate::default();
if let Some(parent) = block.number.checked_sub(1).and_then(|number| self.active_leaves.take(&(block.parent_hash, number))) { if let Some(number) = self.active_leaves.remove(&block.parent_hash) {
update.deactivated.push(parent.0); if let Some(expected_parent_number) = block.number.checked_sub(1) {
self.metrics.on_head_deactivated(); debug_assert_eq!(expected_parent_number, number);
}
update.deactivated.push(block.parent_hash);
self.on_head_deactivated(&block.parent_hash);
} }
if !self.active_leaves.contains(&(block.hash, block.number)) { match self.active_leaves.entry(block.hash) {
hash_map::Entry::Vacant(entry) => {
update.activated.push(block.hash); update.activated.push(block.hash);
self.active_leaves.insert((block.hash, block.number)); entry.insert(block.number);
self.metrics.on_head_activated(); self.on_head_activated(&block.hash);
},
hash_map::Entry::Occupied(entry) => {
debug_assert_eq!(*entry.get(), block.number);
}
} }
self.clean_up_external_listeners();
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
Ok(()) Ok(())
...@@ -937,18 +962,20 @@ where ...@@ -937,18 +962,20 @@ where
async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> { async fn block_finalized(&mut self, block: BlockInfo) -> SubsystemResult<()> {
let mut update = ActiveLeavesUpdate::default(); let mut update = ActiveLeavesUpdate::default();
let metrics = &self.metrics;
self.active_leaves.retain(|(h, n)| { self.active_leaves.retain(|h, n| {
if *n <= block.number { if *n <= block.number {
update.deactivated.push(*h); update.deactivated.push(*h);
metrics.on_head_deactivated();
false false
} else { } else {
true true
} }
}); });
for deactivated in &update.deactivated {
self.on_head_deactivated(deactivated)
}
self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?; self.broadcast_signal(OverseerSignal::ActiveLeaves(update)).await?;
self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?; self.broadcast_signal(OverseerSignal::BlockFinalized(block.hash)).await?;
...@@ -1100,6 +1127,45 @@ where ...@@ -1100,6 +1127,45 @@ where
} }
} }
fn on_head_activated(&mut self, hash: &Hash) {
self.metrics.on_head_activated();
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
for listener in listeners {
// it's fine if the listener is no longer interested
let _ = listener.send(());
}
}
}
fn on_head_deactivated(&mut self, hash: &Hash) {
self.metrics.on_head_deactivated();
if let Some(listeners) = self.activation_external_listeners.remove(hash) {
// clean up and signal to listeners the block is deactivated
drop(listeners);
}
}
fn clean_up_external_listeners(&mut self) {
self.activation_external_listeners.retain(|_, v| {
// remove dead listeners
v.retain(|c| !c.is_canceled());
!v.is_empty()
})
}
fn handle_external_request(&mut self, request: ExternalRequest) {
match request {
ExternalRequest::WaitForActivation { hash, response_channel } => {
if self.active_leaves.get(&hash).is_some() {
// it's fine if the listener is no longer interested
let _ = response_channel.send(());
} else {
self.activation_external_listeners.entry(hash).or_default().push(response_channel);
}
}
}
}
fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) { fn spawn_job(&mut self, name: &'static str, j: BoxFuture<'static, ()>) {
self.s.spawn(name, j); self.s.spawn(name, j);
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment