//! Transactions handling to plug on top of the network service.
//! Usage:
//! - Use [`TransactionsHandlerPrototype::new`] to create a prototype.
//! - Pass the `NonDefaultSetConfig` returned from [`TransactionsHandlerPrototype::new`] to the
//!   network configuration as an extra peers set.
//! - Use [`TransactionsHandlerPrototype::build`] then [`TransactionsHandler::run`] to obtain a
//! `Future` that processes transactions.

use crate::config::*;

use codec::{Decode, Encode};
use futures::{prelude::*, stream::FuturesUnordered};
use log::{debug, trace, warn};

use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use sc_network::{
	config::{NonReservedPeerMode, ProtocolId, SetConfig},
	multiaddr::{Multiaddr, Protocol},
		traits::{NotificationEvent, NotificationService, ValidationResult},
	utils::{interval, LruHashSet},
	NetworkBackend, NetworkEventStream, NetworkPeers,
use sc_network_common::{role::ObservedRole, ExHashT};
use sc_network_sync::{SyncEvent, SyncEventStream};
use sc_network_types::PeerId;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::Block as BlockT;

use std::{
	collections::{hash_map::Entry, HashMap},

pub mod config;

/// A set of transactions.
pub type Transactions<E> = Vec<E>;

/// Logging target for the file.
const LOG_TARGET: &str = "sync";

/// Reputation changes reported to the network for peers, depending on the transactions they
/// send us.
mod rep {
	use sc_network::ReputationChange as Rep;
	/// Reputation change when a peer sends us any transaction.
	///
	/// This forces the node to verify it, hence the negative value here. Once the transaction is
	/// verified, the reputation change should be refunded with `ANY_TRANSACTION_REFUND`.
	pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction");
	/// Reputation change when a peer sends us any transaction that is not invalid.
	///
	/// Mirrors the magnitude of `ANY_TRANSACTION` with the opposite sign.
	pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)");
	/// Reputation change when a peer sends us a transaction that we didn't know about.
	pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction");
	/// Reputation change when a peer sends us a bad transaction.
	///
	/// Also reported when a received notification cannot be decoded as a list of transactions.
	pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");

/// Prometheus metrics of the transactions handler.
struct Metrics {
	/// Number of transactions propagated to at least one peer.
	propagated_transactions: Counter<U64>,

impl Metrics {
	/// Registers the handler's metrics on the registry `r`.
	///
	/// Returns the populated [`Metrics`] on success and a [`PrometheusError`] otherwise.
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Self {
			propagated_transactions: register(
					// Help text attached to the counter.
					"Number of transactions propagated to at least one peer",

/// A transaction import in flight, together with the hash of the transaction being imported.
struct PendingTransaction<H> {
	/// Future produced by the transaction pool's `import`; resolves to the import outcome.
	validation: TransactionImportFuture,
	/// Hash of the transaction being imported.
	tx_hash: H,

// `PendingTransaction` is declared `Unpin` for every `H`; its `Future::poll` implementation
// relies on this to reach the fields through `Pin<&mut Self>`.
impl<H> Unpin for PendingTransaction<H> {}

impl<H: ExHashT> Future for PendingTransaction<H> {
	/// Hash of the transaction paired with the outcome of its import.
	type Output = (H, TransactionImport);

	/// Polls the inner import future. Once it is ready, yields a clone of the transaction hash
	/// together with the import result.
	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
		if let Poll::Ready(import_result) = self.validation.poll_unpin(cx) {
			return Poll::Ready((self.tx_hash.clone(), import_result))


/// Prototype for a [`TransactionsHandler`].
///
/// Created with [`TransactionsHandlerPrototype::new`] and turned into the actual handler with
/// [`TransactionsHandlerPrototype::build`].
pub struct TransactionsHandlerPrototype {
	/// Name of the transaction protocol.
	protocol_name: ProtocolName,

	/// Handle that is used to communicate with `sc_network::Notifications`.
	notification_service: Box<dyn NotificationService>,

impl TransactionsHandlerPrototype {
	/// Create a new instance.
	///
	/// The protocol name is derived from the hex-encoded `genesis_hash` and, when given, the
	/// `fork_id`: `/<genesis>/<fork_id>/transactions/1`, or `/<genesis>/transactions/1` without
	/// a fork id.
	///
	/// Returns the prototype together with the notification protocol configuration obtained
	/// from `Net::notification_config`.
	pub fn new<
		Hash: AsRef<[u8]>,
		Block: BlockT,
		Net: NetworkBackend<Block, <Block as BlockT>::Hash>,
		protocol_id: ProtocolId,
		genesis_hash: Hash,
		fork_id: Option<&str>,
		metrics: NotificationMetrics,
		peer_store_handle: Arc<dyn PeerStoreProvider>,
	) -> (Self, Net::NotificationProtocolConfig) {
		let genesis_hash = genesis_hash.as_ref();
		let protocol_name: ProtocolName = if let Some(fork_id) = fork_id {
			format!("/{}/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash), fork_id)
		} else {
			format!("/{}/transactions/1", array_bytes::bytes2hex("", genesis_hash))
		let (config, notification_service) = Net::notification_config(
			// Name based on `protocol_id` rather than the genesis hash.
			// NOTE(review): presumably the legacy/fallback protocol name — confirm against
			// `NetworkBackend::notification_config`.
			vec![format!("/{}/transactions/1", protocol_id.as_ref()).into()],
			// No inbound or outbound slots and non-reserved peers denied; the handler adds peers
			// to the reserved set itself when it receives sync events.
			SetConfig {
				in_peers: 0,
				out_peers: 0,
				reserved_nodes: Vec::new(),
				non_reserved_mode: NonReservedPeerMode::Deny,

		(Self { protocol_name, notification_service }, config)

	/// Turns the prototype into the actual handler. Returns a controller that allows controlling
	/// the behaviour of the handler while it's running.
	///
	/// Important: the transactions handler is initially disabled and doesn't gossip transactions.
	/// Gossiping is enabled when major syncing is done.
	///
	/// The handler and the controller are connected through an unbounded channel: the controller
	/// owns the sending half, the handler the receiving half.
	pub fn build<
		B: BlockT + 'static,
		H: ExHashT,
		N: NetworkPeers + NetworkEventStream,
		S: SyncEventStream + sp_consensus::SyncOracle,
		network: N,
		sync: S,
		transaction_pool: Arc<dyn TransactionPool<H, B>>,
		metrics_registry: Option<&Registry>,
	) -> error::Result<(TransactionsHandler<B, H, N, S>, TransactionsHandlerController<H>)> {
		// Subscribe to the events of the syncing service.
		let sync_event_stream = sync.event_stream("transactions-handler-sync");
		// Channel carrying `ToHandler` messages from the controller to the handler.
		// NOTE(review): `100_000` is presumably the queue size above which a warning is emitted —
		// confirm against `tracing_unbounded`.
		let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000);

		let handler = TransactionsHandler {
			protocol_name: self.protocol_name,
			notification_service: self.notification_service,
			// Stream that ticks every `PROPAGATE_TIMEOUT`.
			propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT))
				as Pin<Box<dyn Stream<Item = ()> + Send>>)
			pending_transactions: FuturesUnordered::new(),
			pending_transactions_peers: HashMap::new(),
			sync_event_stream: sync_event_stream.fuse(),
			peers: HashMap::new(),
			// Metrics depend on whether a registry was supplied.
			metrics: if let Some(r) = metrics_registry {
			} else {

		let controller = TransactionsHandlerController { to_handler };

		Ok((handler, controller))

/// Controls the behaviour of a [`TransactionsHandler`] it is connected to.
pub struct TransactionsHandlerController<H: ExHashT> {
	/// Sending half of the channel used to pass `ToHandler` messages to the handler.
	to_handler: TracingUnboundedSender<ToHandler<H>>,

impl<H: ExHashT> TransactionsHandlerController<H> {
	/// You may call this when new transactions are imported by the transaction pool.
	///
	/// All transactions will be fetched from the `TransactionPool` that was passed at
	/// initialization as part of the configuration and propagated to peers.
	pub fn propagate_transactions(&self) {
		// The result of the send is discarded.
		let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions);

	/// You must call this when a new transaction is imported by the transaction pool.
	///
	/// This transaction will be fetched from the `TransactionPool` that was passed at
	/// initialization as part of the configuration and propagated to peers.
	pub fn propagate_transaction(&self, hash: H) {
		// The result of the send is discarded.
		let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash));

/// Messages sent by a [`TransactionsHandlerController`] to its [`TransactionsHandler`].
enum ToHandler<H: ExHashT> {

/// Handler for transactions. Call [`TransactionsHandler::run`] to start the processing.
pub struct TransactionsHandler<
	B: BlockT + 'static,
	H: ExHashT,
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
> {
	/// Name of the transaction protocol.
	protocol_name: ProtocolName,
	/// Interval at which we call `propagate_transactions`.
	propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>,
	/// Pending transactions verification tasks.
	pending_transactions: FuturesUnordered<PendingTransaction<H>>,
	/// As multiple peers can send us the same transaction, we group
	/// these peers using the transaction hash while the transaction is
	/// imported. This prevents that we import the same transaction
	/// multiple times concurrently.
	pending_transactions_peers: HashMap<H, Vec<PeerId>>,
	/// Network service to use to send messages and manage peers.
	network: N,
	/// Syncing service.
	sync: S,
	/// Receiver for syncing-related events.
	sync_event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = SyncEvent> + Send>>>,
	/// All connected peers, keyed by their peer ID.
	peers: HashMap<PeerId, Peer<H>>,
	/// Transaction pool used to hash, import and look up transactions.
	transaction_pool: Arc<dyn TransactionPool<H, B>>,
	/// Receiver for the messages sent by the [`TransactionsHandlerController`].
	from_controller: TracingUnboundedReceiver<ToHandler<H>>,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
	/// Handle that is used to communicate with `sc_network::Notifications`.
	notification_service: Box<dyn NotificationService>,

/// Peer information.
struct Peer<H: ExHashT> {
	/// Holds a set of transactions known to this peer.
	known_transactions: LruHashSet<H>,
	/// Observed role of the peer; consulted during propagation to single out light nodes.
	role: ObservedRole,

impl<B, H, N, S> TransactionsHandler<B, H, N, S>
	B: BlockT + 'static,
	H: ExHashT,
	N: NetworkPeers + NetworkEventStream,
	S: SyncEventStream + sp_consensus::SyncOracle,
	/// Turns the [`TransactionsHandler`] into a future that should run forever and not be
	/// interrupted.
	///
	/// Each loop iteration waits on whichever of these is ready first: the propagation timer, a
	/// finished transaction import, a sync event, a controller message or a notification event.
	pub async fn run(mut self) {
		loop {
			futures::select! {
				// Tick of the periodic propagation timer.
				_ = self.propagate_timeout.next() => {
				// A transaction import finished: report the outcome for every peer that sent
				// us this transaction.
				(tx_hash, result) = self.pending_transactions.select_next_some() => {
					if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) {
						peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result));
					} else {
						warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!");
				// Event from the syncing service.
				sync_event = self.sync_event_stream.next() => {
					if let Some(sync_event) = sync_event {
					} else {
						// Syncing has seemingly closed. Closing as well.
				// Request sent through the controller.
				message = self.from_controller.select_next_some() => {
					match message {
						ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash),
						ToHandler::PropagateTransactions => self.propagate_transactions(),
				// Event from the notification service.
				event = self.notification_service.next_event().fuse() => {
					if let Some(event) = event {
					} else {
						// `Notifications` has seemingly closed. Closing as well.

	/// Handles a single event received from the notification service: validates inbound
	/// substreams, tracks peers whose stream opened or closed, and processes received
	/// notifications.
	fn handle_notification_event(&mut self, event: NotificationEvent) {
		match event {
			NotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx, .. } => {
				// only accept peers whose role can be determined
				let result = self
					.peer_role(peer, handshake)
					.map_or(ValidationResult::Reject, |_| ValidationResult::Accept);
				// The result of sending the verdict is discarded.
				let _ = result_tx.send(result);
			NotificationEvent::NotificationStreamOpened { peer, handshake, .. } => {
				// The peer can only be tracked when its role is known.
				let Some(role) = self.network.peer_role(peer, handshake) else {
					log::debug!(target: "sub-libp2p", "role for {peer} couldn't be determined");

				// Start tracking the peer with an empty set of known transactions, bounded by
				// `MAX_KNOWN_TRANSACTIONS`.
				let _was_in = self.peers.insert(
					Peer {
						known_transactions: LruHashSet::new(
							NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS).expect("Constant is nonzero"),
			NotificationEvent::NotificationStreamClosed { peer } => {
				// Stop tracking the peer.
				let _peer = self.peers.remove(&peer);
			NotificationEvent::NotificationReceived { peer, notification } => {
				// A notification is expected to be a SCALE-encoded list of extrinsics; a payload
				// that fails to decode is reported as a bad transaction.
				if let Ok(m) =
					<Transactions<B::Extrinsic> as Decode>::decode(&mut notification.as_ref())
					self.on_transactions(peer, m);
				} else {
					warn!(target: "sub-libp2p", "Failed to decode transactions list from peer {peer}");
					self.network.report_peer(peer, rep::BAD_TRANSACTION);

	/// Keeps the reserved peer set of the transactions protocol in step with the syncing
	/// service: peers reported as initial or connected are added to the reserved set, peers
	/// reported as disconnected are removed from it. Failures are only logged.
	fn handle_sync_event(&mut self, event: SyncEvent) {
		match event {
			SyncEvent::InitialPeers(peer_ids) => {
				// Turn every peer ID into a multiaddress made of a single `/p2p/<peer_id>`
				// component.
				let addrs = peer_ids
					.map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into())))
				let result =
					self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs);
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err);
			SyncEvent::PeerConnected(peer_id) => {
				// Same address shape as above, for a single peer.
				let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
				let result = self.network.add_peers_to_reserved_set(
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err);
			SyncEvent::PeerDisconnected(peer_id) => {
				let result = self.network.remove_peers_from_reserved_set(
				if let Err(err) = result {
					log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);

	/// Called when a peer sends us new transactions.
	///
	/// Transactions are only processed when the sender `who` is a tracked peer. Every processed
	/// transaction costs the sender `rep::ANY_TRANSACTION`, and an import is started only for
	/// hashes that have no import pending yet.
	fn on_transactions(&mut self, who: PeerId, transactions: Transactions<B::Extrinsic>) {
		// Accept transactions only when the node is not major syncing.
		if self.sync.is_major_syncing() {
			trace!(target: LOG_TARGET, "{} Ignoring transactions while major syncing", who);

		trace!(target: LOG_TARGET, "Received {} transactions from {}", transactions.len(), who);
		if let Some(ref mut peer) = self.peers.get_mut(&who) {
			for t in transactions {
				// Bound the number of imports in flight.
				if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
						target: LOG_TARGET,
						"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",

				let hash = self.transaction_pool.hash_of(&t);

				// Charge the peer up front for the verification work.
				self.network.report_peer(who, rep::ANY_TRANSACTION);

				match self.pending_transactions_peers.entry(hash.clone()) {
					// No import pending for this hash: start one.
					Entry::Vacant(entry) => {
						self.pending_transactions.push(PendingTransaction {
							validation: self.transaction_pool.import(t),
							tx_hash: hash,
					// An import for this hash is already pending.
					Entry::Occupied(mut entry) => {

	/// Adjusts the reputation of `who` according to the outcome of importing a transaction it
	/// sent us: `KnownGood` is reported as `rep::ANY_TRANSACTION_REFUND`, `NewGood` as
	/// `rep::GOOD_TRANSACTION`, `Bad` as `rep::BAD_TRANSACTION`, and `None` reports nothing.
	fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) {
		match import {
			TransactionImport::KnownGood =>
				self.network.report_peer(who, rep::ANY_TRANSACTION_REFUND),
			TransactionImport::NewGood => self.network.report_peer(who, rep::GOOD_TRANSACTION),
			TransactionImport::Bad => self.network.report_peer(who, rep::BAD_TRANSACTION),
			TransactionImport::None => {},

	/// Propagate one transaction.
	///
	/// The transaction is looked up in the transaction pool by `hash`; when the pool does not
	/// return it, only a debug message is logged.
	pub fn propagate_transaction(&mut self, hash: &H) {
		// Propagate transactions only when the node is not major syncing.
		if self.sync.is_major_syncing() {

		debug!(target: LOG_TARGET, "Propagating transaction [{:?}]", hash);
		if let Some(transaction) = self.transaction_pool.transaction(hash) {
			let propagated_to =
				self.do_propagate_transactions(&[(hash.clone(), transaction)]);
		} else {
			debug!(target: "sync", "Propagating transaction failure [{:?}]", hash);

	/// Sends `transactions` to the tracked peers, one notification per transaction, and records
	/// the number of sent transactions in the metrics when metrics are enabled.
	///
	/// NOTE(review): the returned map presumably associates each transaction hash with the peers
	/// it was sent to — confirm, the code filling it is not visible here.
	fn do_propagate_transactions(
		&mut self,
		transactions: &[(H, Arc<B::Extrinsic>)],
	) -> HashMap<H, Vec<String>> {
		let mut propagated_to = HashMap::<_, Vec<_>>::new();
		let mut propagated_transactions = 0;

		for (who, peer) in self.peers.iter_mut() {
			// never send transactions to the light node
			if matches!(peer.role, ObservedRole::Light) {

			// Keep the transactions for which inserting the hash into the peer's known set
			// returns `true`.
			// NOTE(review): presumably `insert` returns `true` only for hashes not already in
			// the set — confirm against `LruHashSet::insert`.
			let (hashes, to_send): (Vec<_>, Transactions<_>) = transactions
				.filter(|(hash, _)| peer.known_transactions.insert(hash.clone()))

			// NOTE(review): this adds up once per (peer, transaction) pair, so a transaction
			// sent to several peers is counted several times — confirm this matches the
			// metric's help text.
			propagated_transactions += hashes.len();

			if !to_send.is_empty() {
				for hash in hashes {
				trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
				// Historically, the format of a notification of the transactions protocol
				// consisted in a (SCALE-encoded) `Vec<Transaction>`.
				// After RFC 56, the format was modified in a backwards-compatible way to be
				// a (SCALE-encoded) tuple `(Compact(1), Transaction)`, which is the same encoding
				// as a `Vec` of length one. This is no coincidence, as the change was
				// intentionally done in a backwards-compatible way.
				// In other words, the `Vec` that is sent below **must** always have only a single
				// element in it.
				// See <https://github.com/polkadot-fellows/RFCs/blob/main/text/0056-one-transaction-per-notification.md>
				for to_send in to_send {
					// The result of the send is discarded.
					let _ = self
						.send_sync_notification(who, vec![to_send].encode());

		if let Some(ref metrics) = self.metrics {
			metrics.propagated_transactions.inc_by(propagated_transactions as _)


	/// Call when we must propagate ready transactions to peers.
	fn propagate_transactions(&mut self) {
		// Accept transactions only when node is not major syncing
		if self.sync.is_major_syncing() {

		let transactions = self.transaction_pool.transactions();

		if transactions.is_empty() {

		debug!(target: LOG_TARGET, "Propagating transactions");

		let propagated_to = self.do_propagate_transactions(&transactions);