Skip to content
Snippets Groups Projects
Unverified Commit 98a364fe authored by Alexandru Vasile's avatar Alexandru Vasile Committed by GitHub
Browse files

rpc-v2: Limit transactionBroadcast calls to 16 (#3772)


This PR limits the number of active calls to the transactionBroadcast
APIs to 16.

cc @paritytech/subxt-team 

Closes: https://github.com/paritytech/polkadot-sdk/issues/3081

---------

Signed-off-by: default avatarAlexandru Vasile <alexandru.vasile@parity.io>
Co-authored-by: default avatarJames Wilson <james@jsdw.me>
parent 88a2f360
No related merge requests found
Pipeline #467147 failed with stages
in 51 minutes and 33 seconds
......@@ -47,14 +47,14 @@ pub trait TransactionBroadcastApi {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_broadcast")]
fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
#[method(name = "transaction_unstable_broadcast", raw_method)]
async fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>>;
/// Broadcast an extrinsic to the chain.
///
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "transaction_unstable_stop")]
fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
#[method(name = "transaction_unstable_stop", raw_method)]
async fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast>;
}
......@@ -67,6 +67,7 @@ fn maintained_pool(
pub fn setup_api(
options: Options,
max_tx_per_connection: usize,
) -> (
Arc<TestApi>,
Arc<MiddlewarePool>,
......@@ -85,9 +86,13 @@ pub fn setup_api(
let (task_executor, executor_recv) = TaskExecutorBroadcast::new();
let tx_api =
RpcTransactionBroadcast::new(client_mock.clone(), pool.clone(), Arc::new(task_executor))
.into_rpc();
let tx_api = RpcTransactionBroadcast::new(
client_mock.clone(),
pool.clone(),
Arc::new(task_executor),
max_tx_per_connection,
)
.into_rpc();
(api, pool, client_mock, tx_api, executor_recv, pool_state)
}
......
......@@ -26,6 +26,8 @@ use std::sync::Arc;
use substrate_test_runtime_client::AccountKeyring::*;
use substrate_test_runtime_transaction_pool::uxt;
const MAX_TX_PER_CONNECTION: usize = 4;
// Test helpers.
use crate::transaction::tests::{
middleware_pool::{MiddlewarePoolEvent, TxStatusTypeTest},
......@@ -35,7 +37,7 @@ use crate::transaction::tests::{
#[tokio::test]
async fn tx_broadcast_enters_pool() {
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default());
setup_api(Default::default(), MAX_TX_PER_CONNECTION);
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
......@@ -94,7 +96,8 @@ async fn tx_broadcast_enters_pool() {
#[tokio::test]
async fn tx_broadcast_invalid_tx() {
let (_, pool, _, tx_api, exec_middleware, _) = setup_api(Default::default());
let (_, pool, _, tx_api, exec_middleware, _) =
setup_api(Default::default(), MAX_TX_PER_CONNECTION);
// Invalid parameters.
let err = tx_api
......@@ -131,7 +134,7 @@ async fn tx_broadcast_invalid_tx() {
#[tokio::test]
async fn tx_stop_with_invalid_operation_id() {
let (_, _, _, tx_api, _, _) = setup_api(Default::default());
let (_, _, _, tx_api, _, _) = setup_api(Default::default(), MAX_TX_PER_CONNECTION);
// Make an invalid stop call.
let err = tx_api
......@@ -146,7 +149,7 @@ async fn tx_stop_with_invalid_operation_id() {
#[tokio::test]
async fn tx_broadcast_resubmits_future_nonce_tx() {
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default());
setup_api(Default::default(), MAX_TX_PER_CONNECTION);
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
......@@ -237,7 +240,7 @@ async fn tx_broadcast_resubmits_future_nonce_tx() {
#[tokio::test]
async fn tx_broadcast_stop_after_broadcast_finishes() {
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default());
setup_api(Default::default(), MAX_TX_PER_CONNECTION);
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
......@@ -320,7 +323,7 @@ async fn tx_broadcast_resubmits_invalid_tx() {
};
let (api, pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(options);
setup_api(options, MAX_TX_PER_CONNECTION);
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
......@@ -439,7 +442,8 @@ async fn tx_broadcast_resubmits_dropped_tx() {
ban_time: std::time::Duration::ZERO,
};
let (api, pool, client_mock, tx_api, _, mut pool_middleware) = setup_api(options);
let (api, pool, client_mock, tx_api, _, mut pool_middleware) =
setup_api(options, MAX_TX_PER_CONNECTION);
let current_uxt = uxt(Alice, ALICE_NONCE);
let current_xt = hex_string(&current_uxt.encode());
......@@ -518,3 +522,53 @@ async fn tx_broadcast_resubmits_dropped_tx() {
// The dropped transaction was resubmitted.
assert_eq!(events.get(&future_xt).unwrap(), &vec![TxStatusTypeTest::Ready]);
}
#[tokio::test]
async fn tx_broadcast_limit_reached() {
// One operation per connection.
let (api, _pool, client_mock, tx_api, mut exec_middleware, mut pool_middleware) =
setup_api(Default::default(), 1);
// Start at block 1.
let block_1_header = api.push_block(1, vec![], true);
let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());
let operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
// Announce block 1 to `transaction_unstable_broadcast`.
client_mock.trigger_import_stream(block_1_header).await;
// Ensure the tx propagated from `transaction_unstable_broadcast` to the transaction pool.
let event = get_next_event!(&mut pool_middleware);
assert_eq!(
event,
MiddlewarePoolEvent::TransactionStatus {
transaction: xt.clone(),
status: TxStatusTypeTest::Ready
}
);
assert_eq!(1, exec_middleware.num_tasks());
let operation_id_limit_reached: Option<String> =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
assert!(operation_id_limit_reached.is_none(), "No operation ID => tx was rejected");
// We still have in flight one operation.
assert_eq!(1, exec_middleware.num_tasks());
// Force the future to exit by calling stop.
let _: () = tx_api
.call("transaction_unstable_stop", rpc_params![&operation_id])
.await
.unwrap();
// Ensure the broadcast future finishes.
let _ = get_next_event!(&mut exec_middleware.recv);
assert_eq!(0, exec_middleware.num_tasks());
// Can resubmit again now.
let _operation_id: String =
tx_api.call("transaction_unstable_broadcast", rpc_params![&xt]).await.unwrap();
}
......@@ -26,6 +26,7 @@ use crate::{
},
SubscriptionTaskExecutor,
};
use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
......
......@@ -18,11 +18,17 @@
//! API implementation for broadcasting transactions.
use crate::{transaction::api::TransactionBroadcastApiServer, SubscriptionTaskExecutor};
use crate::{
common::connections::RpcConnections, transaction::api::TransactionBroadcastApiServer,
SubscriptionTaskExecutor,
};
use codec::Decode;
use futures::{FutureExt, Stream, StreamExt};
use futures_util::stream::AbortHandle;
use jsonrpsee::core::{async_trait, RpcResult};
use jsonrpsee::{
core::{async_trait, RpcResult},
ConnectionDetails,
};
use parking_lot::RwLock;
use rand::{distributions::Alphanumeric, Rng};
use sc_client_api::BlockchainEvents;
......@@ -46,6 +52,8 @@ pub struct TransactionBroadcast<Pool: TransactionPool, Client> {
executor: SubscriptionTaskExecutor,
/// The broadcast operation IDs.
broadcast_ids: Arc<RwLock<HashMap<String, BroadcastState<Pool>>>>,
/// Keep track of how many concurrent operations are active for each connection.
rpc_connections: RpcConnections,
}
/// The state of a broadcast operation.
......@@ -58,8 +66,19 @@ struct BroadcastState<Pool: TransactionPool> {
impl<Pool: TransactionPool, Client> TransactionBroadcast<Pool, Client> {
/// Creates a new [`TransactionBroadcast`].
pub fn new(client: Arc<Client>, pool: Arc<Pool>, executor: SubscriptionTaskExecutor) -> Self {
TransactionBroadcast { client, pool, executor, broadcast_ids: Default::default() }
pub fn new(
client: Arc<Client>,
pool: Arc<Pool>,
executor: SubscriptionTaskExecutor,
max_transactions_per_connection: usize,
) -> Self {
TransactionBroadcast {
client,
pool,
executor,
broadcast_ids: Default::default(),
rpc_connections: RpcConnections::new(max_transactions_per_connection),
}
}
/// Generate an unique operation ID for the `transaction_broadcast` RPC method.
......@@ -102,12 +121,26 @@ where
<Pool::Block as BlockT>::Hash: Unpin,
Client: HeaderBackend<Pool::Block> + BlockchainEvents<Pool::Block> + Send + Sync + 'static,
{
fn broadcast(&self, bytes: Bytes) -> RpcResult<Option<String>> {
async fn broadcast(
&self,
connection_details: ConnectionDetails,
bytes: Bytes,
) -> RpcResult<Option<String>> {
let pool = self.pool.clone();
// The unique ID of this operation.
let id = self.generate_unique_id();
// Ensure that the connection has not reached the maximum number of active operations.
let Some(reserved_connection) = self.rpc_connections.reserve_space(connection_details.id())
else {
return Ok(None)
};
let Some(reserved_identifier) = reserved_connection.register(id.clone()) else {
// This can only happen if the generated operation ID is not unique.
return Ok(None)
};
// The JSON-RPC server might check whether the transaction is valid before broadcasting it.
// If it does so and if the transaction is invalid, the server should silently do nothing
// and the JSON-RPC client is not informed of the problem. Invalid transactions should still
......@@ -118,7 +151,11 @@ where
// Save the tx hash to remove it later.
let tx_hash = pool.hash_of(&decoded_extrinsic);
let mut best_block_import_stream =
// The compiler can no longer deduce the type of the stream and complains
// about `one type is more general than the other`.
let mut best_block_import_stream: std::pin::Pin<
Box<dyn Stream<Item = <Pool::Block as BlockT>::Hash> + Send>,
> =
Box::pin(self.client.import_notification_stream().filter_map(
|notification| async move { notification.is_new_best.then_some(notification.hash) },
));
......@@ -180,6 +217,9 @@ where
// The future expected by the executor must be `Future<Output = ()>` instead of
// `Future<Output = Result<(), Aborted>>`.
let fut = fut.map(move |result| {
// Connection space is cleaned when this object is dropped.
drop(reserved_identifier);
// Remove the entry from the broadcast IDs map.
let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
......@@ -203,7 +243,16 @@ where
Ok(Some(id))
}
fn stop_broadcast(&self, operation_id: String) -> Result<(), ErrorBroadcast> {
async fn stop_broadcast(
&self,
connection_details: ConnectionDetails,
operation_id: String,
) -> Result<(), ErrorBroadcast> {
// The operation ID must correlate to the same connection ID.
if !self.rpc_connections.contains_identifier(connection_details.id(), &operation_id) {
return Err(ErrorBroadcast::InvalidOperationID)
}
let mut broadcast_ids = self.broadcast_ids.write();
let Some(broadcast_state) = broadcast_ids.remove(&operation_id) else {
......
......@@ -644,10 +644,13 @@ where
(chain, state, child_state)
};
const MAX_TRANSACTION_PER_CONNECTION: usize = 16;
let transaction_broadcast_rpc_v2 = sc_rpc_spec_v2::transaction::TransactionBroadcast::new(
client.clone(),
transaction_pool.clone(),
task_executor.clone(),
MAX_TRANSACTION_PER_CONNECTION,
)
.into_rpc();
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment