Unverified Commit d00c05ba authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Process runtime api requests in the background (#2035)

This pr changes how the runtime api subsystem processes runtime api
requests. Instead of answering all of them in the subsystem task and
thus, making all requests sequential, we now answer them in a background
task. This enables us to serve multiple requests at once.
parent d8ae52a8
Pipeline #115427 passed with stages
in 21 minutes and 21 seconds
......@@ -5450,12 +5450,10 @@ dependencies = [
"futures 0.3.8",
"hex-literal",
"kusama-runtime",
"lazy_static",
"pallet-babe",
"pallet-im-online",
"pallet-staking",
"pallet-transaction-payment-rpc-runtime-api",
"parking_lot 0.11.1",
"polkadot-availability-bitfield-distribution",
"polkadot-availability-distribution",
"polkadot-collator-protocol",
......@@ -5497,7 +5495,6 @@ dependencies = [
"sc-telemetry",
"sc-transaction-pool",
"serde",
"slog",
"sp-api",
"sp-authority-discovery",
"sp-block-builder",
......
......@@ -9,6 +9,7 @@ futures = "0.3.8"
tracing = "0.1.22"
tracing-futures = "0.2.4"
sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
......
......@@ -30,15 +30,14 @@ use polkadot_subsystem::{
},
errors::RuntimeApiError,
};
use polkadot_node_subsystem_util::{
metrics::{self, prometheus},
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost};
use std::sync::Arc;
use sp_api::{ProvideRuntimeApi};
use sp_api::ProvideRuntimeApi;
use sp_core::traits::SpawnNamed;
use futures::prelude::*;
use std::sync::Arc;
const LOG_TARGET: &str = "runtime_api";
......@@ -46,12 +45,13 @@ const LOG_TARGET: &str = "runtime_api";
pub struct RuntimeApiSubsystem<Client> {
client: Arc<Client>,
metrics: Metrics,
spawn_handle: Box<dyn SpawnNamed>,
}
impl<Client> RuntimeApiSubsystem<Client> {
/// Create a new Runtime API subsystem wrapping the given client and metrics.
pub fn new(client: Arc<Client>, metrics: Metrics) -> Self {
RuntimeApiSubsystem { client, metrics }
pub fn new(client: Arc<Client>, metrics: Metrics, spawn_handle: impl SpawnNamed + 'static) -> Self {
RuntimeApiSubsystem { client, metrics, spawn_handle: Box::new(spawn_handle) }
}
}
......@@ -73,7 +73,7 @@ async fn run<Client>(
mut ctx: impl SubsystemContext<Message = RuntimeApiMessage>,
subsystem: RuntimeApiSubsystem<Client>,
) -> SubsystemResult<()> where
Client: ProvideRuntimeApi<Block>,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block>,
{
loop {
......@@ -82,12 +82,19 @@ async fn run<Client>(
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {},
FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request(
&*subsystem.client,
&subsystem.metrics,
relay_parent,
request,
),
RuntimeApiMessage::Request(relay_parent, request) => {
let client = subsystem.client.clone();
let metrics = subsystem.metrics.clone();
subsystem.spawn_handle.spawn_blocking("polkadot-runtime-api-request", async move {
make_runtime_api_request(
client,
metrics,
relay_parent,
request,
)
}.boxed())
},
}
}
}
......@@ -95,8 +102,8 @@ async fn run<Client>(
#[tracing::instrument(level = "trace", skip(client, metrics), fields(subsystem = LOG_TARGET))]
fn make_runtime_api_request<Client>(
client: &Client,
metrics: &Metrics,
client: Arc<Client>,
metrics: Metrics,
relay_parent: Hash,
request: Request,
) where
......@@ -347,8 +354,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -370,8 +378,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -393,8 +402,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -414,14 +424,16 @@ mod tests {
#[test]
fn requests_persisted_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_data.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -454,14 +466,16 @@ mod tests {
#[test]
fn requests_full_validation_data() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
Arc::get_mut(&mut runtime_api).unwrap().validation_data.insert(para_a, Default::default());
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_data.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -499,13 +513,14 @@ mod tests {
let para_a = 5.into();
let para_b = 6.into();
let commitments = polkadot_primitives::v1::CandidateCommitments::default();
let spawner = sp_core::testing::TaskExecutor::new();
runtime_api.validation_outputs_results.insert(para_a, false);
runtime_api.validation_outputs_results.insert(para_b, true);
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -552,8 +567,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -577,10 +593,11 @@ mod tests {
let session_index = 1;
runtime_api.session_info.insert(session_index, Default::default());
let runtime_api = Arc::new(runtime_api);
let spawner = sp_core::testing::TaskExecutor::new();
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -600,14 +617,17 @@ mod tests {
#[test]
fn requests_validation_code() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
Arc::get_mut(&mut runtime_api).unwrap().validation_code.insert(para_a, Default::default());
let mut runtime_api = MockRuntimeApi::default();
runtime_api.validation_code.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -640,16 +660,16 @@ mod tests {
#[test]
fn requests_candidate_pending_availability() {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let mut runtime_api = MockRuntimeApi::default();
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let mut runtime_api = MockRuntimeApi::default();
runtime_api.candidate_pending_availability.insert(para_a, Default::default());
let runtime_api = Arc::new(runtime_api);
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -685,8 +705,9 @@ mod tests {
let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new());
let runtime_api = Arc::new(MockRuntimeApi::default());
let relay_parent = [1; 32].into();
let spawner = sp_core::testing::TaskExecutor::new();
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -710,6 +731,7 @@ mod tests {
let relay_parent = [1; 32].into();
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
......@@ -726,7 +748,7 @@ mod tests {
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -766,6 +788,7 @@ mod tests {
let para_a = 99.into();
let para_b = 66.into();
let para_c = 33.into();
let spawner = sp_core::testing::TaskExecutor::new();
let para_b_inbound_channels = [
(para_a, vec![]),
......@@ -792,7 +815,7 @@ mod tests {
runtime_api
});
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api.clone(), Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
let (tx, rx) = oneshot::channel();
......@@ -830,6 +853,7 @@ mod tests {
let para_a = 5.into();
let para_b = 6.into();
let spawner = sp_core::testing::TaskExecutor::new();
let runtime_api = Arc::new({
let mut runtime_api = MockRuntimeApi::default();
......@@ -848,7 +872,7 @@ mod tests {
});
let relay_parent = [1; 32].into();
let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None));
let subsystem = RuntimeApiSubsystem::new(runtime_api, Metrics(None), spawner);
let subsystem_task = run(ctx, subsystem).map(|x| x.unwrap());
let test_task = async move {
{
......
......@@ -54,12 +54,9 @@ prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https:
# External Crates
futures = "0.3.8"
hex-literal = "0.3.1"
lazy_static = "1.4.0"
tracing = "0.1.22"
tracing-futures = "0.2.4"
parking_lot = "0.11.1"
serde = { version = "1.0.117", features = ["derive"] }
slog = "2.5.2"
# Polkadot
polkadot-node-core-proposer = { path = "../core/proposer" }
......
......@@ -406,6 +406,7 @@ where
runtime_api: RuntimeApiSubsystem::new(
runtime_client,
Metrics::register(registry)?,
spawner.clone(),
),
statement_distribution: StatementDistributionSubsystem::new(
Metrics::register(registry)?,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment