Skip to content
Snippets Groups Projects
Verified Commit 13156dd7 authored by Michal Kucharczyk's avatar Michal Kucharczyk
Browse files

mempool: better name: xts2 -> transactions

parent 8e94f045
No related merge requests found
Pipeline #485312 waiting for manual action with stages
in 19 minutes and 8 seconds
......@@ -62,7 +62,7 @@ where
//todo: add listener? for updating view with invalid transaction?
/// is transaction watched
watched: bool,
/// transaction actual body
/// extrinsic actual body
tx: ExtrinsicFor<ChainApi>,
/// transaction source
pub(crate) source: TransactionSource,
......@@ -102,9 +102,9 @@ where
ChainApi: graph::ChainApi<Block = Block> + 'static,
{
api: Arc<ChainApi>,
//could be removed after removing watched field (and adding listener into tx)
//todo: could be removed after removing watched field (and adding listener into tx)
listener: Arc<MultiViewListener<ChainApi>>,
xts2: RwLock<HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<Block, ChainApi>>>>,
transactions: RwLock<HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<Block, ChainApi>>>>,
metrics: PrometheusMetrics,
}
......@@ -121,26 +121,26 @@ where
listener: Arc<MultiViewListener<ChainApi>>,
metrics: PrometheusMetrics,
) -> Self {
Self { api, listener, xts2: Default::default(), metrics }
Self { api, listener, transactions: Default::default(), metrics }
}
pub(super) fn get_by_hash(
&self,
hash: ExtrinsicHash<ChainApi>,
) -> Option<ExtrinsicFor<ChainApi>> {
self.xts2.read().get(&hash).map(|t| t.tx.clone())
self.transactions.read().get(&hash).map(|t| t.tx.clone())
}
pub(super) fn len(&self) -> (usize, usize) {
let xts2 = self.xts2.read();
let watched_count = xts2.values().filter(|x| x.is_watched()).count();
(xts2.len() - watched_count, watched_count)
let transactions = self.transactions.read();
let watched_count = transactions.values().filter(|t| t.is_watched()).count();
(transactions.len() - watched_count, watched_count)
}
pub(super) fn push_unwatched(&self, source: TransactionSource, xt: ExtrinsicFor<ChainApi>) {
let hash = self.api.hash_and_length(&xt).0;
let unwatched = Arc::from(TxInMemPool::new_unwatched(source, xt));
self.xts2.write().entry(hash).or_insert(unwatched);
self.transactions.write().entry(hash).or_insert(unwatched);
}
pub(super) fn extend_unwatched(
......@@ -148,24 +148,24 @@ where
source: TransactionSource,
xts: Vec<ExtrinsicFor<ChainApi>>,
) {
let mut xts2 = self.xts2.write();
let mut transactions = self.transactions.write();
xts.into_iter().for_each(|xt| {
let hash = self.api.hash_and_length(&xt).0;
let unwatched = Arc::from(TxInMemPool::new_unwatched(source, xt));
xts2.entry(hash).or_insert(unwatched);
transactions.entry(hash).or_insert(unwatched);
});
}
pub(super) fn push_watched(&self, source: TransactionSource, xt: ExtrinsicFor<ChainApi>) {
let hash = self.api.hash_and_length(&xt).0;
let watched = Arc::from(TxInMemPool::new_watched(source, xt));
self.xts2.write().entry(hash).or_insert(watched);
self.transactions.write().entry(hash).or_insert(watched);
}
pub(super) fn clone_unwatched(
&self,
) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<Block, ChainApi>>> {
self.xts2
self.transactions
.read()
.iter()
.filter_map(|(hash, tx)| (!tx.is_watched()).then(|| (*hash, tx.clone())))
......@@ -174,7 +174,7 @@ where
pub(super) fn clone_watched(
&self,
) -> HashMap<ExtrinsicHash<ChainApi>, Arc<TxInMemPool<Block, ChainApi>>> {
self.xts2
self.transactions
.read()
.iter()
.filter_map(|(hash, tx)| (tx.is_watched()).then(|| (*hash, tx.clone())))
......@@ -182,7 +182,7 @@ where
}
pub(super) fn remove_watched(&self, xt: &RawExtrinsicFor<ChainApi>) {
self.xts2.write().retain(|_, t| *t.tx != *xt);
self.transactions.write().retain(|_, t| *t.tx != *xt);
}
/// Revalidates a batch of transactions.
......@@ -193,11 +193,12 @@ where
let start = Instant::now();
let (count, input) = {
let xts2 = self.xts2.read();
let transactions = self.transactions.read();
(
xts2.len(),
xts2.clone()
transactions.len(),
transactions
.clone()
.into_iter()
.filter(|xt| {
let finalized_block_number = finalized_block.number.into().as_u64();
......@@ -257,9 +258,9 @@ where
) {
log::info!(target: LOG_TARGET, "purge_finalized_transactions count:{:?}", finalized_xts.len());
log_xt_debug!(target: LOG_TARGET, finalized_xts, "[{:?}] purged finalized transactions");
let mut xts2 = self.xts2.write();
let mut transactions = self.transactions.write();
finalized_xts.iter().for_each(|t| {
xts2.remove(t);
transactions.remove(t);
});
}
......@@ -271,9 +272,9 @@ where
metrics.mempool_revalidation_invalid_txs.inc_by(invalid_hashes.len() as _)
});
let mut xts2 = self.xts2.write();
let mut transactions = self.transactions.write();
invalid_hashes.iter().for_each(|i| {
xts2.remove(i);
transactions.remove(i);
});
self.listener.invalidate_transactions(invalid_hashes);
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment