Skip to content
Snippets Groups Projects
Verified Commit 406c37b5 authored by Michal Kucharczyk's avatar Michal Kucharczyk
Browse files

dropped_watcher: rename C -> ChainApi

parent c9f2d393
No related merge requests found
......@@ -24,7 +24,7 @@
use crate::{
common::log_xt::log_xt_trace,
fork_aware_txpool::stream_map_util::next_event,
graph::{BlockHash, ChainApi, ExtrinsicHash},
graph::{self, BlockHash, ExtrinsicHash},
LOG_TARGET,
};
use futures::stream::StreamExt;
......@@ -59,24 +59,24 @@ type Controller<T> = mpsc::TracingUnboundedSender<T>;
type CommandReceiver<T> = mpsc::TracingUnboundedReceiver<T>;
/// Commands to control the instance of dropped transactions stream [`StreamOfDropped`].
enum Command<C>
enum Command<ChainApi>
where
C: ChainApi,
ChainApi: graph::ChainApi,
{
/// Adds a new stream of dropped-related events originating in a view with a specific block
/// hash
AddView(BlockHash<C>, ViewStream<C>),
AddView(BlockHash<ChainApi>, ViewStream<ChainApi>),
/// Removes an existing view's stream associated with a specific block hash.
RemoveView(BlockHash<C>),
RemoveView(BlockHash<ChainApi>),
/// Removes internal states for given extrinsic hashes.
///
/// Intended to ba called on finalization.
RemoveFinalizedTxs(Vec<ExtrinsicHash<C>>),
RemoveFinalizedTxs(Vec<ExtrinsicHash<ChainApi>>),
}
impl<C> Debug for Command<C>
impl<ChainApi> Debug for Command<ChainApi>
where
C: ChainApi,
ChainApi: graph::ChainApi,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
......@@ -92,29 +92,29 @@ where
///
/// This struct maintains a mapping of active views and their corresponding streams, as well as the
/// state of each transaction with respect to these views.
struct MultiViewDropWatcherContext<C>
struct MultiViewDropWatcherContext<ChainApi>
where
C: ChainApi,
ChainApi: graph::ChainApi,
{
/// A map that associates the views identified by corresponding block hashes with their streams
/// of dropped-related events. This map is used to keep track of active views and their event
/// streams.
stream_map: StreamMap<BlockHash<C>, ViewStream<C>>,
stream_map: StreamMap<BlockHash<ChainApi>, ViewStream<ChainApi>>,
/// A receiver for commands to control the state of the stream, allowing the addition and
/// removal of views. This is used to dynamically update which views are being tracked.
command_receiver: CommandReceiver<Command<C>>,
command_receiver: CommandReceiver<Command<ChainApi>>,
/// For each transaction hash we keep the set of hashes representing the views that see this
/// transaction as ready or future.
///
/// Once transaction is dropped, dropping view is removed from the set.
transaction_states: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,
transaction_states: HashMap<ExtrinsicHash<ChainApi>, HashSet<BlockHash<ChainApi>>>,
}
impl<C> MultiViewDropWatcherContext<C>
where
C: ChainApi + 'static,
<<C as ChainApi>::Block as BlockT>::Hash: Unpin,
C: graph::ChainApi + 'static,
<<C as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
{
/// Processes a `ViewStreamEvent` from a specific view and updates the internal state
/// accordingly.
......@@ -220,30 +220,30 @@ where
///
/// This struct provides methods to add and remove streams associated with views to and from the
/// stream.
pub struct MultiViewDroppedWatcherController<C: ChainApi> {
pub struct MultiViewDroppedWatcherController<ChainApi: graph::ChainApi> {
/// A controller allowing to update the state of the associated [`StreamOfDropped`].
controller: Controller<Command<C>>,
controller: Controller<Command<ChainApi>>,
}
impl<C: ChainApi> Clone for MultiViewDroppedWatcherController<C> {
impl<ChainApi: graph::ChainApi> Clone for MultiViewDroppedWatcherController<ChainApi> {
fn clone(&self) -> Self {
Self { controller: self.controller.clone() }
}
}
impl<C> MultiViewDroppedWatcherController<C>
impl<ChainApi> MultiViewDroppedWatcherController<ChainApi>
where
C: ChainApi + 'static,
<<C as ChainApi>::Block as BlockT>::Hash: Unpin,
ChainApi: graph::ChainApi + 'static,
<<ChainApi as graph::ChainApi>::Block as BlockT>::Hash: Unpin,
{
/// Creates new [`StreamOfDropped`] and its controller.
pub fn new() -> (MultiViewDroppedWatcherController<C>, StreamOfDropped<C>) {
let (stream_map, ctrl) = MultiViewDropWatcherContext::<C>::event_stream();
pub fn new() -> (MultiViewDroppedWatcherController<ChainApi>, StreamOfDropped<ChainApi>) {
let (stream_map, ctrl) = MultiViewDropWatcherContext::<ChainApi>::event_stream();
(Self { controller: ctrl }, stream_map.boxed())
}
/// Notifies the [`StreamOfDropped`] that new view was created.
pub fn add_view(&self, key: BlockHash<C>, view: ViewStream<C>) {
pub fn add_view(&self, key: BlockHash<ChainApi>, view: ViewStream<ChainApi>) {
let _ = self.controller.unbounded_send(Command::AddView(key, view)).map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: add_view {key:?} send message failed: {e}");
});
......@@ -251,14 +251,17 @@ where
/// Notifies the [`StreamOfDropped`] that the view was destroyed and shall be removed the
/// stream map.
pub fn remove_view(&self, key: BlockHash<C>) {
pub fn remove_view(&self, key: BlockHash<ChainApi>) {
let _ = self.controller.unbounded_send(Command::RemoveView(key)).map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: remove_view {key:?} send message failed: {e}");
});
}
/// Removes status info for finalized transactions.
pub fn remove_finalized_txs(&self, xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone) {
pub fn remove_finalized_txs(
&self,
xts: impl IntoIterator<Item = ExtrinsicHash<ChainApi>> + Clone,
) {
let _ = self
.controller
.unbounded_send(Command::RemoveFinalizedTxs(xts.into_iter().collect()))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment