Unverified Commit d7257026 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Add an upper number of maximum parallel runtime api requests (#2069)



* Add an upper number of maximum parallel runtime api requests

Instead of spawning all runtime api requests in the background and using
all wasm instances. This pr adds a maximum number of parallel requests.

* Update node/core/runtime-api/src/lib.rs

Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>

* Review feedback

* Increase instances

* Add warning

* Update node/core/runtime-api/src/lib.rs

Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>

Co-authored-by: Sergey Pepyakin's avatarSergei Shulepov <sergei@parity.io>
parent 59ee3ebf
Pipeline #116209 passed with stages
in 18 minutes and 3 seconds
......@@ -36,22 +36,38 @@ use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
use sp_api::ProvideRuntimeApi;
use sp_core::traits::SpawnNamed;
use futures::prelude::*;
use std::sync::Arc;
use futures::{prelude::*, stream::FuturesUnordered, channel::oneshot, select};
use std::{sync::Arc, collections::VecDeque, pin::Pin};
const LOG_TARGET: &str = "runtime_api";
/// The number of maximum runtime api requests can be executed in parallel. Further requests will be buffered.
const MAX_PARALLEL_REQUESTS: usize = 4;
/// The name of the blocking task that executes a runtime api request.
const API_REQUEST_TASK_NAME: &str = "polkadot-runtime-api-request";
/// The `RuntimeApiSubsystem`. See module docs for more details.
pub struct RuntimeApiSubsystem<Client> {
client: Arc<Client>,
metrics: Metrics,
spawn_handle: Box<dyn SpawnNamed>,
/// If there are [`MAX_PARALLEL_REQUESTS`] requests being executed, we buffer them in here until they can be executed.
waiting_requests: VecDeque<(Pin<Box<dyn Future<Output = ()> + Send>>, oneshot::Receiver<()>)>,
/// All the active runtime api requests that are currently being executed.
active_requests: FuturesUnordered<oneshot::Receiver<()>>,
}
impl<Client> RuntimeApiSubsystem<Client> {
/// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Arc<Client>, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self {
RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) }
RuntimeApiSubsystem {
client,
metrics,
spawn_handle: Box::new(spawn_handle),
waiting_requests: Default::default(),
active_requests: Default::default(),
}
}
}
......@@ -68,34 +84,82 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
}
}
impl<Client> RuntimeApiSubsystem<Client> where
Client: ProvideRuntimeApi<Block> + Send + 'static + Sync,
Client::Api: ParachainHost<Block>,
{
/// Spawn a runtime api request.
///
/// If there are already [`MAX_PARALLEL_REQUESTS`] requests being executed, the request will be buffered.
fn spawn_request(&mut self, relay_parent: Hash, request: Request) {
let client = self.client.clone();
let metrics = self.metrics.clone();
let (sender, receiver) = oneshot::channel();
let request = async move {
make_runtime_api_request(
client,
metrics,
relay_parent,
request,
);
let _ = sender.send(());
}.boxed();
if self.active_requests.len() >= MAX_PARALLEL_REQUESTS {
self.waiting_requests.push_back((request, receiver));
if self.waiting_requests.len() > MAX_PARALLEL_REQUESTS * 10 {
tracing::warn!(
target: LOG_TARGET,
"{} runtime api requests waiting to be executed.",
self.waiting_requests.len(),
)
}
} else {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, request);
self.active_requests.push(receiver);
}
}
/// Poll the active runtime api requests.
async fn poll_requests(&mut self) {
// If there are no active requests, this future should be pending forever.
if self.active_requests.len() == 0 {
return futures::pending!()
}
// If there are active requests, this will always resolve to `Some(_)` when a request is finished.
let _ = self.active_requests.next().await;
if let Some((req, recv)) = self.waiting_requests.pop_front() {
self.spawn_handle.spawn_blocking(API_REQUEST_TASK_NAME, req);
self.active_requests.push(recv);
}
}
}
#[tracing::instrument(skip(ctx, subsystem), fields(subsystem = LOG_TARGET))]
async fn run<Client>(
mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
subsystem: RuntimeApiSubsystem<Client>,
mut subsystem: RuntimeApiSubsystem<Client>,
) -> SubsystemResult<()> where
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block>,
{
loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => {
let client = subsystem.client.clone();
let metrics = subsystem.metrics.clone();
subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move {
make_runtime_api_request(
client,
metrics,
relay_parent,
request,
)
}.boxed())
},
}
select! {
req = ctx.recv().fuse() => match req? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => {
subsystem.spawn_request(relay_parent, request);
},
}
},
_ = subsystem.poll_requests().fuse() => {},
}
}
}
......@@ -213,7 +277,7 @@ mod tests {
};
use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor;
use std::collections::{HashMap, BTreeMap};
use std::{collections::{HashMap, BTreeMap}, sync::{Arc, Mutex}};
use futures::channel::oneshot;
#[derive(Default, Clone)]
......@@ -221,6 +285,7 @@ mod tests {
validators: Vec<ValidatorId>,
validator_groups: Vec<Vec<ValidatorIndex>>,
availability_cores: Vec<CoreState>,
availability_cores_wait: Arc<Mutex<()>>,
validation_data: HashMap<ParaId, ValidationData>,
session_index_for_child: SessionIndex,
session_info: HashMap<SessionIndex, SessionInfo>,
......@@ -261,6 +326,7 @@ mod tests {
}
fn availability_cores(&self) -> Vec<CoreState> {
let _ = self.availability_cores_wait.lock().unwrap();
self.availability_cores.clone()
}
......@@ -916,4 +982,44 @@ mod tests {
futures::executor::block_on(future::join(subsystem_task, test_task));
}
#[test]
fn multiple_requests_in_parallel_are_working() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let mutex = runtime_api.availability_cores_wait.clone();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
// Make all requests block until we release this mutex.
let lock = mutex.lock().unwrap();
let mut receivers = Vec::new();
for _ in 0..MAX_PARALLEL_REQUESTS * 10 {
let (tx, rx) = oneshot::channel();
ctx_handle.send(FromOverseer::Communication {
msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx))
}).await;
receivers.push(rx);
}
let join = future::join_all(receivers);
drop(lock);
join.await
.into_iter()
.for_each(|r| assert_eq!(r.unwrap().unwrap(), runtime_api.availability_cores));
ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await;
};
futures::executor::block_on(future::join(subsystem_task, test_task));
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment