Commit 08819d66 authored by Bryan Chen's avatar Bryan Chen
Browse files

Merge remote-tracking branch 'origin/master' into update-scheduler

parents 3703dcea 17650cef
Pipeline #98911 failed with stages
in 8 minutes and 2 seconds
This diff is collapsed.
......@@ -42,10 +42,12 @@ members = [
"service",
"validation",
"node/messages",
"node/network/bridge",
"node/overseer",
"node/primitives",
"node/service",
"node/subsystem",
"node/test-helpers/subsystem",
"parachain/test-parachains",
"parachain/test-parachains/adder",
......
......@@ -46,8 +46,8 @@ async fn start_inner(chain_spec: String, log_level: String) -> Result<Client, Bo
info!("👤 Role: {}", config.display_role());
// Create the service. This is the most heavy initialization step.
let service = service::kusama_new_light(config)
let (task_manager, rpc_handlers) = service::kusama_new_light(config)
.map_err(|e| format!("{:?}", e))?;
Ok(browser_utils::start_client(service))
Ok(browser_utils::start_client(task_manager, rpc_handlers))
}
......@@ -19,8 +19,7 @@ use log::info;
use service::{IdentifyVariant, self};
#[cfg(feature = "service-rewr")]
use service_new::{IdentifyVariant, self as service};
use sc_executor::NativeExecutionDispatch;
use sc_cli::{SubstrateCli, Result};
use sc_cli::{SubstrateCli, Result, RuntimeVersion, Role};
use crate::cli::{Cli, Subcommand};
fn get_exec_name() -> Option<String> {
......@@ -75,6 +74,16 @@ impl SubstrateCli for Cli {
path => Box::new(service::PolkadotChainSpec::from_json_file(std::path::PathBuf::from(path))?),
})
}
fn native_runtime_version(spec: &Box<dyn service::ChainSpec>) -> &'static RuntimeVersion {
if spec.is_kusama() {
&service::kusama_runtime::VERSION
} else if spec.is_westend() {
&service::westend_runtime::VERSION
} else {
&service::polkadot_runtime::VERSION
}
}
}
/// Parses polkadot specific CLI arguments and run the service.
......@@ -116,56 +125,44 @@ pub fn run() -> Result<()> {
info!(" KUSAMA FOUNDATION ");
info!("----------------------------");
runtime.run_node(
|config| {
service::kusama_new_light(config)
},
|config| {
service::kusama_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(s, _, _)| s)
},
service::KusamaExecutor::native_version().runtime_version
)
runtime.run_node_until_exit(|config| match config.role {
Role::Light => service::kusama_new_light(config)
.map(|(components, _)| components),
_ => service::kusama_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(components, _, _)| components)
})
} else if chain_spec.is_westend() {
runtime.run_node(
|config| {
service::westend_new_light(config)
},
|config| {
service::westend_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(s, _, _)| s)
},
service::WestendExecutor::native_version().runtime_version
)
runtime.run_node_until_exit(|config| match config.role {
Role::Light => service::westend_new_light(config)
.map(|(components, _)| components),
_ => service::westend_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(components, _, _)| components)
})
} else {
runtime.run_node(
|config| {
service::polkadot_new_light(config)
},
|config| {
service::polkadot_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(s, _, _)| s)
},
service::PolkadotExecutor::native_version().runtime_version
)
runtime.run_node_until_exit(|config| match config.role {
Role::Light => service::polkadot_new_light(config)
.map(|(components, _)| components),
_ => service::polkadot_new_full(
config,
None,
None,
authority_discovery_enabled,
6000,
grandpa_pause,
).map(|(components, _, _)| components)
})
}
},
Some(Subcommand::Base(subcommand)) => {
......
......@@ -28,14 +28,14 @@ mod command;
#[cfg(not(feature = "service-rewr"))]
pub use service::{
AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
#[cfg(feature = "service-rewr")]
pub use service_new::{
self as service,
AbstractService, ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
ProvideRuntimeApi, CoreApi, ParachainHost, IdentifyVariant,
Block, self, RuntimeApiCollection, TFullClient
};
......
......@@ -63,7 +63,7 @@ use polkadot_primitives::{
}
};
use polkadot_cli::{
ProvideRuntimeApi, AbstractService, ParachainHost, IdentifyVariant,
ProvideRuntimeApi, ParachainHost, IdentifyVariant,
service::{self, Role}
};
pub use polkadot_cli::service::Configuration;
......@@ -81,6 +81,7 @@ use polkadot_service_new::{
self as polkadot_service,
Error as ServiceError, FullNodeHandles, PolkadotClient,
};
use sc_service::SpawnTaskHandle;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
......@@ -236,8 +237,8 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
#[cfg(not(feature = "service-rewr"))]
fn build_collator_service<SP, P, C, R, Extrinsic>(
spawner: SP,
fn build_collator_service<P, C, R, Extrinsic>(
spawner: SpawnTaskHandle,
handles: FullNodeHandles,
client: Arc<C>,
para_id: ParaId,
......@@ -265,7 +266,6 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
Extrinsic: service::Codec + Send + Sync + 'static,
SP: Spawn + Clone + Send + Sync + 'static,
{
let polkadot_network = handles.polkadot_network
.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
......@@ -278,7 +278,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
let parachain_context = match build_parachain_context.build(
client.clone(),
spawner,
spawner.clone(),
polkadot_network.clone(),
) {
Ok(ctx) => ctx,
......@@ -359,7 +359,7 @@ fn build_collator_service<SP, P, C, R, Extrinsic>(
let future = silenced.map(drop);
tokio::spawn(future);
spawner.spawn("collation-work", future);
}
}.boxed();
......@@ -386,7 +386,7 @@ where
}
if config.chain_spec.is_kusama() {
let (service, client, handlers) = service::kusama_new_full(
let (task_manager, client, handlers) = service::kusama_new_full(
config,
Some((key.public(), para_id)),
None,
......@@ -394,7 +394,7 @@ where
6000,
None,
)?;
let spawn_handle = service.spawn_task_handle();
let spawn_handle = task_manager.spawn_handle();
build_collator_service(
spawn_handle,
handlers,
......@@ -404,7 +404,7 @@ where
build_parachain_context
)?.await;
} else if config.chain_spec.is_westend() {
let (service, client, handlers) = service::westend_new_full(
let (task_manager, client, handlers) = service::westend_new_full(
config,
Some((key.public(), para_id)),
None,
......@@ -412,7 +412,7 @@ where
6000,
None,
)?;
let spawn_handle = service.spawn_task_handle();
let spawn_handle = task_manager.spawn_handle();
build_collator_service(
spawn_handle,
handlers,
......@@ -422,7 +422,7 @@ where
build_parachain_context
)?.await;
} else {
let (service, client, handles) = service::polkadot_new_full(
let (task_manager, client, handles) = service::polkadot_new_full(
config,
Some((key.public(), para_id)),
None,
......@@ -430,7 +430,7 @@ where
6000,
None,
)?;
let spawn_handle = service.spawn_task_handle();
let spawn_handle = task_manager.spawn_handle();
build_collator_service(
spawn_handle,
handles,
......
......@@ -60,7 +60,7 @@ fn import_single_good_block_works() {
let mut expected_aux = ImportedAux::default();
expected_aux.is_new_best = true;
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Ok(BlockImportResult::ImportedUnknown(ref num, ref aux, ref org))
if *num == number as u32 && *aux == expected_aux && *org == Some(peer_id) => {}
r @ _ => panic!("{:?}", r)
......@@ -70,7 +70,7 @@ fn import_single_good_block_works() {
#[test]
fn import_single_good_known_block_is_ignored() {
let (mut client, _hash, number, _, block) = prepare_good_block();
match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(&mut client, BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Ok(BlockImportResult::ImportedKnown(ref n)) if *n == number as u32 => {}
_ => panic!()
}
......@@ -80,7 +80,7 @@ fn import_single_good_known_block_is_ignored() {
fn import_single_good_block_without_header_fails() {
let (_, _, _, peer_id, mut block) = prepare_good_block();
block.header = None;
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier(true)) {
match import_single_block(&mut polkadot_test_runtime_client::new(), BlockOrigin::File, block, &mut PassThroughVerifier::new(true)) {
Err(BlockImportError::IncompleteHeader(ref org)) if *org == Some(peer_id) => {}
_ => panic!()
}
......@@ -91,7 +91,7 @@ fn async_import_queue_drops() {
let executor = sp_core::testing::SpawnBlockingExecutor::new();
// Perform this test multiple times since it exhibits non-deterministic behavior.
for _ in 0..100 {
let verifier = PassThroughVerifier(true);
let verifier = PassThroughVerifier::new(true);
let queue = BasicQueue::new(
verifier,
......
......@@ -600,7 +600,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
......@@ -677,7 +677,7 @@ pub trait TestNetFactory: Sized {
transaction_pool: Arc::new(EmptyTransactionPool),
protocol_id: ProtocolId::from(&b"test-protocol-name"[..]),
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
}).unwrap();
......@@ -804,7 +804,7 @@ impl TestNetFactory for TestNet {
fn make_verifier(&self, _client: PeersClient, _config: &ProtocolConfig, _peer_data: &())
-> Self::Verifier
{
PassThroughVerifier(false)
PassThroughVerifier::new(false)
}
fn peer(&mut self, i: usize) -> &mut Peer<()> {
......
[package]
name = "polkadot-network-bridge"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.8"
futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
parity-scale-codec = "1.3.0"
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
[dev-dependencies]
parking_lot = "0.10.0"
subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" }
assert_matches = "1.3.0"
This diff is collapsed.
......@@ -11,7 +11,8 @@ futures-timer = "3.0.2"
streamunordered = "0.5.1"
polkadot-primitives = { path = "../../primitives" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
messages = { package = "polkadot-node-messages", path = "../messages" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
async-trait = "0.1"
[dev-dependencies]
futures = { version = "0.3.5", features = ["thread-pool"] }
......
......@@ -28,16 +28,17 @@ use futures_timer::Delay;
use kv_log_macro as log;
use polkadot_primitives::parachain::{BlockData, PoVBlock};
use polkadot_overseer::{Overseer, Subsystem, SubsystemContext, SpawnedSubsystem};
use polkadot_overseer::Overseer;
use messages::{
AllMessages, CandidateBackingMessage, FromOverseer, CandidateValidationMessage
use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer};
use polkadot_subsystem::messages::{
AllMessages, CandidateBackingMessage, CandidateValidationMessage
};
struct Subsystem1;
impl Subsystem1 {
async fn run(mut ctx: SubsystemContext<CandidateBackingMessage>) {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateBackingMessage>) {
loop {
match ctx.try_recv().await {
Ok(Some(msg)) => {
......@@ -56,7 +57,7 @@ impl Subsystem1 {
Delay::new(Duration::from_secs(1)).await;
let (tx, _) = oneshot::channel();
ctx.send_msg(AllMessages::CandidateValidation(
ctx.send_message(AllMessages::CandidateValidation(
CandidateValidationMessage::Validate(
Default::default(),
Default::default(),
......@@ -70,8 +71,10 @@ impl Subsystem1 {
}
}
impl Subsystem<CandidateBackingMessage> for Subsystem1 {
fn start(&mut self, ctx: SubsystemContext<CandidateBackingMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for Subsystem1
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
......@@ -81,7 +84,7 @@ impl Subsystem<CandidateBackingMessage> for Subsystem1 {
struct Subsystem2;
impl Subsystem2 {
async fn run(mut ctx: SubsystemContext<CandidateValidationMessage>) {
async fn run(mut ctx: impl SubsystemContext<Message=CandidateValidationMessage>) {
ctx.spawn(Box::pin(async {
loop {
log::info!("Job tick");
......@@ -105,8 +108,10 @@ impl Subsystem2 {
}
}
impl Subsystem<CandidateValidationMessage> for Subsystem2 {
fn start(&mut self, ctx: SubsystemContext<CandidateValidationMessage>) -> SpawnedSubsystem {
impl<C> Subsystem<C> for Subsystem2
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
......@@ -124,8 +129,8 @@ fn main() {
let (overseer, _handler) = Overseer::new(
vec![],
Box::new(Subsystem2),
Box::new(Subsystem1),
Subsystem2,
Subsystem1,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
......
......@@ -65,8 +65,8 @@ use futures::channel::{mpsc, oneshot};
use futures::{
pending, poll, select,
future::{BoxFuture, RemoteHandle},
stream::FuturesUnordered,
task::{Spawn, SpawnError, SpawnExt},
stream::{self, FuturesUnordered},
task::{Spawn, SpawnExt},
Future, FutureExt, SinkExt, StreamExt,
};
use futures_timer::Delay;
......@@ -75,50 +75,14 @@ use streamunordered::{StreamYield, StreamUnordered};
use polkadot_primitives::{Block, BlockNumber, Hash};
use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
pub use messages::{
OverseerSignal, CandidateValidationMessage, CandidateBackingMessage, AllMessages,
FromOverseer,
use polkadot_subsystem::messages::{
CandidateValidationMessage, CandidateBackingMessage, AllMessages
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem,
};
/// An error type that describes faults that may happen
///
/// These are:
/// * Channels being closed
/// * Subsystems dying when they are not expected to
/// * Subsystems not dying when they are told to die
/// * etc.
#[derive(Debug)]
pub struct SubsystemError;
impl From<mpsc::SendError> for SubsystemError {
fn from(_: mpsc::SendError) -> Self {
Self
}
}
impl From<oneshot::Canceled> for SubsystemError {
fn from(_: oneshot::Canceled) -> Self {
Self
}
}
impl From<SpawnError> for SubsystemError {
fn from(_: SpawnError) -> Self {
Self
}
}
/// A `Result` type that wraps [`SubsystemError`].
///
/// [`SubsystemError`]: struct.SubsystemError.html
pub type SubsystemResult<T> = Result<T, SubsystemError>;
/// An asynchronous subsystem task that runs inside and being overseen by the [`Overseer`].
///
/// In essence it's just a newtype wrapping a `BoxFuture`.
///
/// [`Overseer`]: struct.Overseer.html
pub struct SpawnedSubsystem(pub BoxFuture<'static, ()>);
// A capacity of bounded channels inside the overseer.
const CHANNEL_CAPACITY: usize = 1024;
......@@ -278,7 +242,7 @@ impl Debug for ToOverseer {
/// A running instance of some [`Subsystem`].
///
/// [`Subsystem`]: trait.Subsystem.html
struct SubsystemInstance<M: Debug> {
struct SubsystemInstance<M> {
tx: mpsc::Sender<FromOverseer<M>>,
}
......@@ -289,17 +253,17 @@ struct SubsystemInstance<M: Debug> {
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
/// [`SubsystemJob`]: trait.SubsystemJob.html
pub struct SubsystemContext<M: Debug>{
#[derive(Debug)]
pub struct OverseerSubsystemContext<M>{
rx: mpsc::Receiver<FromOverseer<M>>,
tx: mpsc::Sender<ToOverseer>,
}
impl<M: Debug> SubsystemContext<M> {
/// Try to asyncronously receive a message.
///
/// This has to be used with caution, if you loop over this without
/// using `pending!()` macro you will end up with a busy loop!
pub async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
#[async_trait::async_trait]
impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
type Message = M;
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
match poll!(self.rx.next()) {
Poll::Ready(Some(msg)) => Ok(Some(msg)),
Poll::Ready(None) => Err(()),
......@@ -307,13 +271,11 @@ impl<M: Debug> SubsystemContext<M> {
}
}
/// Receive a message.
pub async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
self.rx.next().await.ok_or(SubsystemError)
}
/// Spawn a child task on the executor.
pub async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> {
let (tx, rx) = oneshot::channel();
self.tx.send(ToOverseer::SpawnJob {
s,
......@@ -323,33 +285,25 @@ impl<M: Debug> SubsystemContext<M> {
rx.await?
}
/// Send a direct message to some other `Subsystem`, routed based on message type.
pub async fn send_msg(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
self.tx.send(ToOverseer::SubsystemMessage(msg)).await?;
Ok(())
}
fn new(rx: mpsc::Receiver<FromOverseer<M>>, tx: mpsc::Sender<ToOverseer>) -> Self {
Self {
rx,
tx,
}
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send
{
let mut msgs = stream::iter(msgs.into_iter().map(ToOverseer::SubsystemMessage).map(Ok));
self.tx.send_all(&mut msgs).await?;
Ok(())
}
}
/// A trait that describes the [`Subsystem`]s that can run on the [`Overseer`].
///
/// It is generic over the message type circulating in the system.
/// The idea that we want some type contaning persistent state that
/// can spawn actually running subsystems when asked to.
///
/// [`Overseer`]: struct.Overseer.html
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<M: Debug> {
/// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(&mut self, ctx: SubsystemContext<M>) -> SpawnedSubsystem;
}
/// A subsystem compatible with the overseer - one which can be run in the context of the
/// overseer.
pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>> + Send>;
/// A subsystem that we oversee.
///
......@@ -359,8 +313,7 @@ pub trait Subsystem<M: Debug> {
///
/// [`Subsystem`]: trait.Subsystem.html
#[allow(dead_code)]
struct OverseenSubsystem<M: Debug> {
subsystem: Box<dyn Subsystem<M> + Send>,
struct OverseenSubsystem<M> {
instance: Option<SubsystemInstance<M>>,
}