Unverified Commit f25628ac authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Companion for #6726 (#1469)

* Companion for #6726

* Spaces

* 'Update substrate'

Co-authored-by: parity-processbot <>
parent c206b36b
Pipeline #101683 failed with stages
in 23 minutes and 35 seconds
This diff is collapsed.
......@@ -30,25 +30,12 @@ use av_store::{Store as AvailabilityStore, ErasureNetworking};
use sc_network_gossip::TopicNotification;
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_runtime::traits::Block as BlockT;
use sp_core::{crypto::Pair, traits::SpawnNamed};
use sp_core::{crypto::Pair, testing::TaskExecutor};
use sp_keyring::Sr25519Keyring;
use futures::executor::{LocalPool, LocalSpawner};
use futures::executor::LocalPool;
use futures::task::{LocalSpawnExt, SpawnExt};
#[derive(Clone)]
struct Executor(LocalSpawner);
impl SpawnNamed for Executor {
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_local(future).unwrap();
}
fn spawn_blocking(&self, name: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.spawn(name, future);
}
}
#[derive(Default)]
pub struct MockNetworkOps {
recorded: Mutex<Recorded>,
......@@ -256,7 +243,7 @@ fn test_setup(config: Config) -> (
mock_gossip.clone(),
api.clone(),
worker_rx,
Executor(pool.spawner()),
TaskExecutor::new(),
);
let service = Service {
......
......@@ -88,7 +88,7 @@ fn import_single_good_block_without_header_fails() {
#[test]
fn async_import_queue_drops() {
let executor = sp_core::testing::SpawnBlockingExecutor::new();
let executor = sp_core::testing::TaskExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier::new(true);
......
......@@ -573,7 +573,7 @@ pub trait TestNetFactory: Sized {
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
&sp_core::testing::SpawnBlockingExecutor::new(),
&sp_core::testing::TaskExecutor::new(),
None,
));
......@@ -650,7 +650,7 @@ pub trait TestNetFactory: Sized {
Box::new(block_import.clone()),
justification_import,
finality_proof_import,
&sp_core::testing::SpawnBlockingExecutor::new(),
&sp_core::testing::TaskExecutor::new(),
None,
));
......
......@@ -903,7 +903,7 @@ mod tests {
}
fn test_harness<T: Future<Output=()>>(keystore: KeyStorePtr, test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool.clone());
......
......@@ -706,7 +706,7 @@ mod test {
signed_availability: signed.clone(),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
......@@ -766,7 +766,7 @@ mod test {
signed_availability: signed.clone(),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
......@@ -818,7 +818,7 @@ mod test {
signed_availability: signed_bitfield.clone(),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
......@@ -915,7 +915,7 @@ mod test {
signed_availability: signed_bitfield.clone(),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
......@@ -1052,7 +1052,7 @@ mod test {
// validator 0 key pair
let (mut state, _signing_context, _validator_pair) = state_with_view(view![], hash_a.clone());
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) =
make_subsystem_context::<BitfieldDistributionMessage, _>(pool);
......
......@@ -635,7 +635,7 @@ mod tests {
}
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (network, network_handle) = new_test_network();
let (context, virtual_overseer) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
......
......@@ -619,7 +619,7 @@ mod tests {
our_view: View(vec![hash_a, hash_b]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
......@@ -699,7 +699,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let mut descriptor = CandidateDescriptor::default();
descriptor.pov_hash = pov_hash;
......@@ -777,7 +777,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -849,7 +849,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -937,7 +937,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1000,7 +1000,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1061,7 +1061,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1119,7 +1119,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1204,7 +1204,7 @@ mod tests {
our_view: View(vec![hash_a, hash_b]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1266,7 +1266,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1343,7 +1343,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......@@ -1426,7 +1426,7 @@ mod tests {
our_view: View(vec![hash_a]),
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......
......@@ -1212,7 +1212,7 @@ mod tests {
},
};
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
let peer = PeerId::random();
......@@ -1304,7 +1304,7 @@ mod tests {
(peer_c.clone(), peer_data_from_view(peer_c_view)),
].into_iter().collect();
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (mut ctx, mut handle) = polkadot_subsystem::test_helpers::make_subsystem_context(pool);
executor::block_on(async move {
......
......@@ -135,7 +135,7 @@ impl<C> Subsystem<C> for Subsystem2
fn main() {
femme::with_level(femme::LevelFilter::Trace);
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
let spawner = sp_core::testing::TaskExecutor::new();
futures::executor::block_on(async {
let timer_stream = stream::repeat(()).then(|_| async {
Delay::new(Duration::from_secs(1)).await;
......
......@@ -485,7 +485,7 @@ where
/// }
///
/// # fn main() { executor::block_on(async move {
/// let spawner = sp_core::testing::SpawnBlockingExecutor::new();
/// let spawner = sp_core::testing::TaskExecutor::new();
/// let all_subsystems = AllSubsystems {
/// candidate_validation: ValidationSubsystem,
/// candidate_backing: DummySubsystem,
......@@ -1058,7 +1058,7 @@ mod tests {
// Checks that a minimal configuration of two jobs can run and exchange messages.
#[test]
fn overseer_works() {
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let (s1_tx, mut s1_rx) = mpsc::channel(64);
......@@ -1123,7 +1123,7 @@ mod tests {
// Should immediately conclude the overseer itself with an error.
#[test]
fn overseer_panics_on_subsystem_exit() {
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let (s1_tx, _) = mpsc::channel(64);
......@@ -1218,7 +1218,7 @@ mod tests {
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
#[test]
fn overseer_start_stop_works() {
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let first_block_hash = [1; 32].into();
......@@ -1314,7 +1314,7 @@ mod tests {
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
#[test]
fn overseer_finalize_works() {
let spawner = sp_core::testing::SpawnBlockingExecutor::new();
let spawner = sp_core::testing::TaskExecutor::new();
executor::block_on(async move {
let first_block_hash = [1; 32].into();
......
......@@ -891,7 +891,7 @@ mod tests {
type OverseerHandle = test_helpers::TestSubsystemContextHandle<CandidateSelectionMessage>;
fn test_harness<T: Future<Output=()>>(run_args: HashMap<Hash, Vec<FromJob>>, test: impl FnOnce(OverseerHandle, mpsc::Receiver<(Option<Hash>, JobsError<Error>)>) -> T) {
let pool = sp_core::testing::SpawnBlockingExecutor::new();
let pool = sp_core::testing::TaskExecutor::new();
let (context, overseer_handle) = make_subsystem_context(pool.clone());
let (err_tx, err_rx) = mpsc::channel(16);
......
......@@ -23,6 +23,7 @@ sc-executor = { git = "https://github.com/paritytech/substrate", branch = "maste
sp-io = { git = "https://github.com/paritytech/substrate", branch = "master", optional = true }
parking_lot = { version = "0.10.0", optional = true }
log = { version = "0.4.8", optional = true }
futures = { version = "0.3.4", optional = true }
[target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies]
shared_memory = { version = "0.10.0", optional = true }
......@@ -43,4 +44,5 @@ std = [
"sc-executor",
"sp-io",
"polkadot-core-primitives/std",
"futures",
]
......@@ -23,8 +23,7 @@
use std::any::{TypeId, Any};
use crate::primitives::{ValidationParams, ValidationResult};
use codec::{Decode, Encode};
use sp_core::storage::ChildInfo;
use sp_core::traits::CallInWasm;
use sp_core::{storage::ChildInfo, traits::{CallInWasm, SpawnNamed}};
use sp_externalities::Extensions;
use sp_wasm_interface::HostFunctions as _;
......@@ -119,10 +118,11 @@ pub fn validate_candidate(
validation_code: &[u8],
params: ValidationParams,
options: ExecutionMode<'_>,
spawner: impl SpawnNamed + 'static,
) -> Result<ValidationResult, Error> {
match options {
ExecutionMode::Local => {
validate_candidate_internal(validation_code, &params.encode())
validate_candidate_internal(validation_code, &params.encode(), spawner)
},
#[cfg(not(any(target_os = "android", target_os = "unknown")))]
ExecutionMode::Remote(pool) => {
......@@ -154,9 +154,10 @@ type HostFunctions = sp_io::SubstrateHostFunctions;
pub fn validate_candidate_internal(
validation_code: &[u8],
encoded_call_data: &[u8],
spawner: impl SpawnNamed + 'static,
) -> Result<ValidationResult, Error> {
let mut extensions = Extensions::new();
extensions.register(sp_core::traits::TaskExecutorExt(sp_core::tasks::executor()));
extensions.register(sp_core::traits::TaskExecutorExt::new(spawner));
let mut ext = ValidationExternalities(extensions);
......
......@@ -19,11 +19,12 @@
use std::{process, env, sync::Arc, sync::atomic};
use codec::{Decode, Encode};
use crate::primitives::{ValidationParams, ValidationResult};
use super::{validate_candidate_internal, Error};
use super::{MAX_CODE_MEM, MAX_RUNTIME_MEM};
use super::{validate_candidate_internal, Error, MAX_CODE_MEM, MAX_RUNTIME_MEM};
use shared_memory::{SharedMem, SharedMemConf, EventState, WriteLockable, EventWait, EventSet};
use parking_lot::Mutex;
use log::{debug, trace};
use futures::executor::ThreadPool;
use sp_core::traits::SpawnNamed;
const WORKER_ARGS_TEST: &[&'static str] = &["--nocapture", "validation_worker"];
/// CLI Argument to start in validation worker mode.
......@@ -43,6 +44,25 @@ enum Event {
WorkerReady = 2,
}
#[derive(Clone)]
struct TaskExecutor(ThreadPool);
impl TaskExecutor {
fn new() -> Result<Self, String> {
ThreadPool::new().map_err(|e| e.to_string()).map(Self)
}
}
impl SpawnNamed for TaskExecutor {
fn spawn_blocking(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
fn spawn(&self, _: &'static str, future: futures::future::BoxFuture<'static, ()>) {
self.0.spawn_ok(future);
}
}
/// A pool of hosts.
#[derive(Clone)]
pub struct ValidationPool {
......@@ -92,6 +112,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
};
let exit = Arc::new(atomic::AtomicBool::new(false));
let task_executor = TaskExecutor::new()?;
// spawn parent monitor thread
let watch_exit = exit.clone();
std::thread::spawn(move || {
......@@ -139,7 +160,7 @@ pub fn run_worker(mem_id: &str) -> Result<(), String> {
let (call_data, _) = rest.split_at_mut(MAX_RUNTIME_MEM);
let (call_data, _) = call_data.split_at_mut(header.params_size as usize);
let result = validate_candidate_internal(code, call_data);
let result = validate_candidate_internal(code, call_data, task_executor.clone());
debug!("{} Candidate validated: {:?}", process::id(), result);
match result {
......
......@@ -14,6 +14,9 @@ adder = { package = "test-parachain-adder", path = "adder" }
halt = { package = "test-parachain-halt", path = "halt" }
code-upgrader = { package = "test-parachain-code-upgrader", path = "code-upgrader" }
[dev-dependencies]
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
[features]
default = [ "std" ]
std = [
......
......@@ -78,6 +78,7 @@ pub fn execute_good_on_parent() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
......@@ -117,6 +118,7 @@ fn execute_good_chain_on_parent() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
......@@ -157,5 +159,6 @@ fn execute_bad_on_parent() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap_err();
}
......@@ -50,6 +50,7 @@ pub fn execute_good_no_upgrade() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
......@@ -86,6 +87,7 @@ pub fn execute_good_with_upgrade() {
code_upgrade_allowed: Some(20),
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
......@@ -129,6 +131,7 @@ pub fn code_upgrade_not_allowed() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
}
......@@ -159,6 +162,7 @@ pub fn applies_code_upgrade_after_delay() {
code_upgrade_allowed: Some(2),
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
......@@ -194,6 +198,7 @@ pub fn applies_code_upgrade_after_delay() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
).unwrap();
let new_head = HeadData::decode(&mut &ret.head_data.0[..]).unwrap();
......
......@@ -37,6 +37,7 @@ fn terminates_on_timeout() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
);
match result {
Err(parachain::wasm_executor::Error::Timeout) => {},
......@@ -66,6 +67,7 @@ fn parallel_execution() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool2),
sp_core::testing::TaskExecutor::new(),
).ok());
let _ = parachain::wasm_executor::validate_candidate(
halt::wasm_binary_unwrap(),
......@@ -78,6 +80,7 @@ fn parallel_execution() {
code_upgrade_allowed: None,
},
parachain::wasm_executor::ExecutionMode::RemoteTest(&pool),
sp_core::testing::TaskExecutor::new(),
);
thread.join().unwrap();
// total time should be < 2 x EXECUTION_TIMEOUT_SEC
......
......@@ -295,7 +295,7 @@ pub fn new_light() -> (
let local_call_executor = LocalCallExecutor::new(
backend.clone(),
executor,
sp_core::tasks::executor(),
Box::new(sp_core::testing::TaskExecutor::new()),
Default::default()
);
let call_executor = LightExecutor::new(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment