Unverified Commit f5c84254 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Rewrite client handling (#1531)



* Rewrite client handling

We are supporting muliple polkadot-like chains and all have different
client types. This pr reworks the client handling by having all of them
in one enum combined. Besides that, there is added a special trait
`ExecuteWithClient` to use the internal client.

* Apply suggestions from code review

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>

* Up the versions

* Fix Cargo.lock

* Fix merge conflict

* ......................

* ....v2

* yep

* I'm dumb...

* Browser lol

Co-authored-by: asynchronous rob's avatarRobert Habermeier <rphmeier@gmail.com>
parent e7600080
Pipeline #102849 passed with stages
in 20 minutes and 36 seconds
......@@ -5997,7 +5997,7 @@ dependencies = [
[[package]]
name = "rococo-runtime"
version = "0.8.14"
version = "0.8.22"
dependencies = [
"bitvec",
"frame-benchmarking",
......@@ -6073,7 +6073,7 @@ dependencies = [
[[package]]
name = "rococo-v1-runtime"
version = "0.8.19"
version = "0.8.22"
dependencies = [
"frame-executive",
"frame-support",
......
......@@ -46,8 +46,7 @@ async fn start_inner(chain_spec: String, log_level: String) -> Result<Client, Bo
info!("👤 Role: {}", config.display_role());
// Create the service. This is the most heavy initialization step.
let builder = service::NodeBuilder::new(config);
let (task_manager, rpc_handlers) = builder.build_light().map_err(|e| format!("{:?}", e))?;
let (task_manager, rpc_handlers) = service::build_light(config).map_err(|e| format!("{:?}", e))?;
Ok(browser_utils::start_client(task_manager, rpc_handlers))
}
......@@ -131,17 +131,17 @@ pub fn run() -> Result<()> {
runner.run_node_until_exit(|config| {
let role = config.role.clone();
let builder = service::NodeBuilder::new(config);
match role {
Role::Light => builder.build_light().map(|(task_manager, _)| task_manager),
_ => builder.build_full(
Role::Light => service::build_light(config).map(|(task_manager, _)| task_manager),
_ => service::build_full(
config,
None,
None,
authority_discovery_disabled,
6000,
grandpa_pause,
),
).map(|r| r.0),
}
})
},
......
......@@ -51,8 +51,6 @@ use std::time::Duration;
use std::pin::Pin;
use futures::{future, Future, Stream, FutureExt, StreamExt};
use sc_client_api::{StateBackend, BlockchainEvents};
use sp_blockchain::HeaderBackend;
use sp_core::Pair;
use polkadot_primitives::v0::{
BlockId, Hash, Block, DownwardMessage,
......@@ -60,27 +58,24 @@ use polkadot_primitives::v0::{
PoVBlock, ValidatorId, CollatorPair, LocalValidationData, GlobalValidationData,
Collation, CollationInfo, collator_signature_payload,
};
use polkadot_cli::{
ProvideRuntimeApi, ParachainHost, IdentifyVariant,
service::{self, Role}
};
use polkadot_cli::service::{self, Role};
pub use polkadot_cli::service::Configuration;
pub use polkadot_cli::Cli;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::v0::CollatorId;
pub use sc_network::PeerId;
pub use service::RuntimeApiCollection;
pub use service::{RuntimeApiCollection, Client};
pub use sc_cli::SubstrateCli;
use sp_api::{ConstructRuntimeApi, ApiExt, HashFor};
#[cfg(not(feature = "service-rewr"))]
use polkadot_service::{FullNodeHandles, PolkadotClient};
use polkadot_service::{FullNodeHandles, AbstractClient};
#[cfg(feature = "service-rewr")]
use polkadot_service_new::{
self as polkadot_service,
Error as ServiceError, FullNodeHandles, PolkadotClient,
Error as ServiceError, FullNodeHandles, AbstractClient,
};
use sc_service::SpawnTaskHandle;
use sp_core::traits::SpawnNamed;
use sp_runtime::traits::BlakeTwo256;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
......@@ -121,16 +116,13 @@ pub trait BuildParachainContext {
type ParachainContext: self::ParachainContext;
/// Build the `ParachainContext`.
fn build<Client, SP>(
fn build<SP>(
self,
client: Arc<Client>,
client: polkadot_service::Client,
spawner: SP,
network: impl Network + Clone + 'static,
) -> Result<Self::ParachainContext, ()>
where
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + BlockchainEvents<Block> + Send + Sync + 'static,
Client::Api: RuntimeApiCollection,
<Client::Api as ApiExt<Block>>::StateBackend: StateBackend<HashFor<Block>>,
SP: SpawnNamed + Clone + Send + Sync + 'static;
}
......@@ -202,171 +194,179 @@ pub async fn collate<P>(
}
#[cfg(feature = "service-rewr")]
fn build_collator_service<SP, P, C, R>(
_spawner: SP,
_handles: FullNodeHandles,
_client: Arc<C>,
_para_id: ParaId,
_key: Arc<CollatorPair>,
_build_parachain_context: P,
) -> Result<future::Ready<()>, polkadot_service::Error>
fn build_collator_service<P>(
spawner: SpawnTaskHandle,
handles: FullNodeHandles,
client: polkadot_service::Client,
para_id: ParaId,
key: Arc<CollatorPair>,
build_parachain_context: P,
) -> Result<Pin<Box<dyn Future<Output = ()> + Send + 'static>>, polkadot_service::Error>
where
C: PolkadotClient<
service::Block,
service::TFullBackend<service::Block>,
R
> + 'static,
R: ConstructRuntimeApi<service::Block, C> + Sync + Send,
<R as ConstructRuntimeApi<service::Block, C>>::RuntimeApi:
sp_api::ApiExt<
service::Block,
StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
>
+ RuntimeApiCollection<
StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
>
+ Sync + Send,
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
SP: SpawnNamed + Clone + Send + Sync + 'static,
{
Err("Collator is not functional with the new service yet".into())
}
#[cfg(not(feature = "service-rewr"))]
fn build_collator_service<P, C, R>(
spawner: SpawnTaskHandle,
handles: FullNodeHandles,
client: Arc<C>,
struct BuildCollationWork<P> {
handles: polkadot_service::FullNodeHandles,
para_id: ParaId,
key: Arc<CollatorPair>,
build_parachain_context: P,
) -> Result<impl Future<Output = ()> + Send + 'static, polkadot_service::Error>
spawner: SpawnTaskHandle,
client: polkadot_service::Client,
}
impl<P> polkadot_service::ExecuteWithClient for BuildCollationWork<P>
where
C: PolkadotClient<
service::Block,
service::TFullBackend<service::Block>,
R
> + 'static,
R: ConstructRuntimeApi<service::Block, C> + Sync + Send,
<R as ConstructRuntimeApi<service::Block, C>>::RuntimeApi:
sp_api::ApiExt<
service::Block,
StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
>
+ RuntimeApiCollection<
StateBackend = <service::TFullBackend<service::Block> as service::Backend<service::Block>>::State,
>
+ Sync + Send,
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
let polkadot_network = handles.polkadot_network
.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
// We don't require this here, but we need to make sure that the validation service is started.
// This service makes sure the collator is joining the correct gossip topics and receives the appropiate
// messages.
handles.validation_service_handle
.ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
let parachain_context = match build_parachain_context.build(
client.clone(),
spawner.clone(),
polkadot_network.clone(),
) {
Ok(ctx) => ctx,
Err(()) => {
return Err("Could not build the parachain context!".into())
}
};
let work = async move {
let mut notification_stream = client.import_notification_stream();
while let Some(notification) = notification_stream.next().await {
macro_rules! try_fr {
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::Left(future::err(Error::Polkadot(
format!("{:?}", e)
))),
type Output = Result<Pin<Box<dyn Future<Output = ()> + Send + 'static>>, polkadot_service::Error>;
fn execute_with_client<Client, Api, Backend>(self, client: Arc<Client>) -> Self::Output
where<Api as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
Backend: sc_client_api::Backend<Block>,
Backend::State: sp_api::StateBackend<BlakeTwo256>,
Api: RuntimeApiCollection<StateBackend = Backend::State>,
Client: AbstractClient<Block, Backend, Api = Api> + 'static
{
let polkadot_network = self.handles
.polkadot_network
.ok_or_else(|| "Collator cannot run when Polkadot-specific networking has not been started")?;
// We don't require this here, but we need to make sure that the validation service is started.
// This service makes sure the collator is joining the correct gossip topics and receives the appropiate
// messages.
self.handles.validation_service_handle
.ok_or_else(|| "Collator cannot run when validation networking has not been started")?;
let parachain_context = match self.build_parachain_context.build(
self.client,
self.spawner.clone(),
polkadot_network.clone(),
) {
Ok(ctx) => ctx,
Err(()) => {
return Err("Could not build the parachain context!".into())
}
};
let key = self.key;
let para_id = self.para_id;
let spawner = self.spawner;
let res = async move {
let mut notification_stream = client.import_notification_stream();
while let Some(notification) = notification_stream.next().await {
macro_rules! try_fr {
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::Left(future::err(Error::Polkadot(
format!("{:?}", e)
))),
}
}
}
}
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);
let network = polkadot_network.clone();
let client = client.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let work = future::lazy(move |_| {
let api = client.runtime_api();
let global_validation = try_fr!(api.global_validation_data(&id));
let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
Some(local_validation) => local_validation,
None => return future::Either::Left(future::ok(())),
};
let downward_messages = try_fr!(api.downward_messages(&id, para_id));
let validators = try_fr!(api.validators(&id));
let targets = compute_targets(
para_id,
validators.as_slice(),
try_fr!(api.duty_roster(&id)),
);
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);
let network = polkadot_network.clone();
let client = client.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let work = future::lazy(move |_| {
let api = client.runtime_api();
let global_validation = try_fr!(api.global_validation_data(&id));
let local_validation = match try_fr!(api.local_validation_data(&id, para_id)) {
Some(local_validation) => local_validation,
None => return future::Either::Left(future::ok(())),
};
let downward_messages = try_fr!(api.downward_messages(&id, para_id));
let validators = try_fr!(api.validators(&id));
let targets = compute_targets(
para_id,
validators.as_slice(),
try_fr!(api.duty_roster(&id)),
);
let collation_work = collate(
relay_parent,
para_id,
global_validation,
local_validation,
downward_messages,
parachain_context,
key,
).map(move |collation| {
match collation {
Some(collation) => network.distribute_collation(targets, collation),
None => log::trace!("Skipping collation as `collate` returned `None`"),
}
let collation_work = collate(
relay_parent,
para_id,
global_validation,
local_validation,
downward_messages,
parachain_context,
key,
).map(move |collation| {
match collation {
Some(collation) => network.distribute_collation(targets, collation),
None => log::trace!("Skipping collation as `collate` returned `None`"),
}
Ok(())
});
Ok(())
future::Either::Right(collation_work)
});
future::Either::Right(collation_work)
});
let deadlined = future::select(
work.then(|f| f).boxed(),
futures_timer::Delay::new(COLLATION_TIMEOUT)
);
let deadlined = future::select(
work.then(|f| f).boxed(),
futures_timer::Delay::new(COLLATION_TIMEOUT)
);
let silenced = deadlined
.map(|either| {
match either {
future::Either::Right(_) => log::warn!("Collation failure: timeout"),
future::Either::Left((Err(e), _)) => {
log::error!("Collation failed: {:?}", e)
let silenced = deadlined
.map(|either| {
match either {
future::Either::Right(_) => log::warn!("Collation failure: timeout"),
future::Either::Left((Err(e), _)) => {
log::error!("Collation failed: {:?}", e)
}
future::Either::Left((Ok(()), _)) => {},
}
future::Either::Left((Ok(()), _)) => {},
}
});
});
let future = silenced.map(drop);
let future = silenced.map(drop);
spawner.spawn("collation-work", future);
}
}.boxed();
spawner.spawn("collation-work", future);
}
};
Ok(work)
Ok(res.boxed())
}
}
#[cfg(not(feature = "service-rewr"))]
fn build_collator_service<P>(
spawner: SpawnTaskHandle,
handles: FullNodeHandles,
client: polkadot_service::Client,
para_id: ParaId,
key: Arc<CollatorPair>,
build_parachain_context: P,
) -> Result<Pin<Box<dyn Future<Output = ()> + Send + 'static>>, polkadot_service::Error>
where
P: BuildParachainContext,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send,
{
client.execute_with(BuildCollationWork {
handles,
para_id,
key,
build_parachain_context,
spawner,
client: client.clone(),
})
}
/// Async function that will run the collator node with the given `RelayChainContext` and `ParachainContext`
......@@ -391,64 +391,25 @@ where
.into());
}
if config.chain_spec.is_kusama() {
let (task_manager, client, handlers) = service::kusama_new_full(
config,
Some((key.public(), para_id)),
None,
false,
6000,
None,
)?;
let spawn_handle = task_manager.spawn_handle();
let future = build_collator_service(
spawn_handle,
handlers,
client,
para_id,
key,
build_parachain_context
)?;
Ok((future.boxed(), task_manager))
} else if config.chain_spec.is_westend() {
let (task_manager, client, handlers) = service::westend_new_full(
config,
Some((key.public(), para_id)),
None,
false,
6000,
None,
)?;
let spawn_handle = task_manager.spawn_handle();
let future = build_collator_service(
spawn_handle,
handlers,
client,
para_id,
key,
build_parachain_context
)?;
Ok((future.boxed(), task_manager))
} else {
let (task_manager, client, handles) = service::polkadot_new_full(
config,
Some((key.public(), para_id)),
None,
false,
6000,
None,
)?;
let spawn_handle = task_manager.spawn_handle();
let future = build_collator_service(
spawn_handle,
handles,
client,
para_id,
key,
build_parachain_context
)?;
Ok((future.boxed(), task_manager))
}
let (task_manager, client, handlers) = polkadot_service::build_full(
config,
Some((key.public(), para_id)),
None,
false,
6000,
None,
)?;
let future = build_collator_service(
task_manager.spawn_handle(),
handlers,
client,
para_id,
key,
build_parachain_context
)?;
Ok((future, task_manager))
}
#[cfg(not(feature = "service-rewr"))]
......@@ -492,9 +453,9 @@ mod tests {
impl BuildParachainContext for BuildDummyParachainContext {
type ParachainContext = DummyParachainContext;
fn build<C, SP>(
fn build<SP>(
self,
_: Arc<C>,
_: polkadot_service::Client,
_: SP,
_: impl Network + Clone + 'static,
) -> Result<Self::ParachainContext, ()> {
......
......@@ -26,9 +26,7 @@ use polkadot_primitives::v0::{
Block, Hash, CollatorId, Id as ParaId,
};
use polkadot_runtime_common::{parachains, registrar, BlockHashCount};
use polkadot_service::{
new_full, FullNodeHandles, PolkadotClient,
};
use polkadot_service::{new_full, FullNodeHandles, AbstractClient};
use polkadot_test_runtime::{RestrictFunctionality, Runtime, SignedExtra, SignedPayload, VERSION};
use sc_chain_spec::ChainSpec;
use sc_client_api::{execution_extensions::ExecutionStrategies, BlockchainEvents};
......@@ -69,7 +67,7 @@ pub fn polkadot_test_new_full(
) -> Result<
(
TaskManager,
Arc<impl PolkadotClient<Block, TFullBackend<Block>, polkadot_test_runtime::RuntimeApi>>,
Arc<impl AbstractClient<Block, TFullBackend<Block>>>,
FullNodeHandles,
Arc<NetworkService<Block, Hash>>,
Arc<RpcHandlers>,
......@@ -196,7 +194,7 @@ pub fn run_test_node(
boot_nodes: Vec<MultiaddrWithPeerId>,
) -> PolkadotTestNode<
TaskManager,
impl PolkadotClient<Block, TFullBackend<Block>, polkadot_test_runtime::RuntimeApi>,
impl AbstractClient<Block, TFullBackend<Block>>,
> {
let config = node_config(storage_update_func, task_executor, key, boot_nodes);
let multiaddr = config.network.listen_addresses[0].clone();
......
......@@ -89,8 +89,11 @@ impl ParachainContext for AdderContext {
let encoded_head = HeadData(next_head.encode());
let encoded_body = BlockData(next_body.encode());
println!("Created collation for #{}, post-state={}",
next_head.number, next_body.state.overflowing_add(next_body.add).0);
println!(
"Created collation for #{}, post-state={}",
next_head.number,
next_body.state.overflowing_add(next_body.add).0,
);
db.insert(next_head.clone(), next_body);
ready(Some((encoded_body, encoded_head)))
......@@ -100,9 +103,9 @@ impl ParachainContext for AdderContext {
impl BuildParachainContext for AdderContext {
type ParachainContext = Self;
fn build<Client, SP>(
fn build<SP>(
self,
_: Arc<Client>,
_: collator::Client,
_: SP,
network: impl Network + Clone + 'static,
) -> Result<Self::ParachainContext, ()> {
......
[package]
name = "rococo-v1-runtime"
version = "0.8.19"
version = "0.8.22"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
build = "build.rs"
......
[package]
name = "rococo-runtime"
version = "0.8.14"
version = "0.8.22"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
build = "build.rs"
......
......@@ -14,40 +14,137 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot Client meta trait
//! Polkadot Client abstractions.
use sp_api::{ProvideRuntimeApi, ConstructRuntimeApi, CallApiAt};
use std::sync::Arc;
use sp_api::{ProvideRuntimeApi, CallApiAt};
use sp_blockchain::HeaderBackend;