Unverified Commit cd66280d authored by Tomasz Drwięga's avatar Tomasz Drwięga Committed by GitHub
Browse files

Use Substrate Block Proposer (#1156)



* Use Substrate block builder.

* Clean up metrics.

* Lock.

* Lock.

* Switch to newest basic authorship interface

* Update Substrate reference and polkadot spec_version

* Let's improve

Co-authored-by: Bastian Köcher's avatarBastian Köcher <git@kchr.de>
parent 8072fb25
Pipeline #94695 passed with stages
in 20 minutes and 47 seconds
This diff is collapsed.
......@@ -88,7 +88,7 @@ pub const VERSION: RuntimeVersion = RuntimeVersion {
spec_name: create_runtime_str!("polkadot"),
impl_name: create_runtime_str!("parity-polkadot"),
authoring_version: 0,
spec_version: 1,
spec_version: 2,
impl_version: 0,
apis: RUNTIME_API_VERSIONS,
transaction_version: 0,
......
......@@ -293,8 +293,6 @@ macro_rules! new_full {
let (builder, mut import_setup, inherent_data_providers, mut rpc_setup) =
new_full_start!($config, $runtime, $dispatch);
let backend = builder.backend().clone();
let service = builder
.with_finality_proof_provider(|client, backend| {
let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
......@@ -403,7 +401,6 @@ macro_rules! new_full {
service.transaction_pool(),
validation_service_handle,
slot_duration,
backend,
service.prometheus_registry().as_ref(),
);
......
......@@ -34,8 +34,8 @@ bitvec = { version = "0.17.4", default-features = false, features = ["alloc"] }
runtime_babe = { package = "pallet-babe", git = "https://github.com/paritytech/substrate", branch = "master" }
babe-primitives = { package = "sp-consensus-babe", git = "https://github.com/paritytech/substrate", branch = "master" }
keystore = { package = "sc-keystore", git = "https://github.com/paritytech/substrate", branch = "master" }
sc-proposer-metrics = { git = "https://github.com/paritytech/substrate", branch = "master" }
prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/paritytech/substrate", branch = "master" }
sc-basic-authorship = { git = "https://github.com/paritytech/substrate", branch = "master" }
[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
......@@ -26,24 +26,21 @@ use std::{
};
use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use codec::Encode;
use block_builder::{BlockBuilderApi, BlockBuilderProvider};
use consensus::{Proposal, RecordProof};
use polkadot_primitives::{Hash, Block, BlockId, Header};
use polkadot_primitives::{Block, Header};
use polkadot_primitives::parachain::{
ParachainHost, AttestedCandidate, NEW_HEADS_IDENTIFIER,
ParachainHost, NEW_HEADS_IDENTIFIER,
};
use runtime_primitives::traits::{DigestFor, HashFor};
use futures_timer::Delay;
use txpool_api::{TransactionPool, InPoolTransaction};
use txpool_api::TransactionPool;
use futures::prelude::*;
use inherents::InherentData;
use sp_timestamp::TimestampInherentData;
use log::{info, debug, warn, trace};
use sp_api::{ApiExt, ProvideRuntimeApi};
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_proposer_metrics::MetricsLink as PrometheusMetrics;
use crate::{
Error,
......@@ -51,17 +48,11 @@ use crate::{
validation_service::ServiceHandle,
};
// block size limit.
pub(crate) const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
// Polkadot proposer factory.
pub struct ProposerFactory<Client, TxPool, Backend> {
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle,
babe_slot_duration: u64,
backend: Arc<Backend>,
metrics: PrometheusMetrics,
factory: sc_basic_authorship::ProposerFactory<TxPool, Backend, Client>,
}
impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
......@@ -71,16 +62,17 @@ impl<Client, TxPool, Backend> ProposerFactory<Client, TxPool, Backend> {
transaction_pool: Arc<TxPool>,
service_handle: ServiceHandle,
babe_slot_duration: u64,
backend: Arc<Backend>,
prometheus: Option<&PrometheusRegistry>,
) -> Self {
ProposerFactory {
let factory = sc_basic_authorship::ProposerFactory::new(
client,
transaction_pool,
service_handle: service_handle,
prometheus,
);
ProposerFactory {
service_handle,
babe_slot_duration,
backend,
metrics: PrometheusMetrics::new(prometheus),
factory,
}
}
}
......@@ -89,7 +81,7 @@ impl<Client, TxPool, Backend> consensus::Environment<Block>
for ProposerFactory<Client, TxPool, Backend>
where
TxPool: TransactionPool<Block=Block> + 'static,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client: BlockBuilderProvider<Backend, Block, Client> + ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block>
+ ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<
......@@ -110,45 +102,35 @@ where
parent_header: &Header,
) -> Self::CreateProposer {
let parent_hash = parent_header.hash();
let parent_id = BlockId::hash(parent_hash);
let client = self.client.clone();
let transaction_pool = self.transaction_pool.clone();
let backend = self.backend.clone();
let slot_duration = self.babe_slot_duration.clone();
let metrics = self.metrics.clone();
let proposer = self.factory.init(parent_header).into_inner();
let maybe_proposer = self.service_handle
.clone()
.get_validation_instance(parent_hash)
.and_then(move |tracker| future::ready(Ok(Proposer {
client,
tracker,
parent_id,
transaction_pool,
slot_duration,
backend,
metrics,
})));
.and_then(move |tracker| future::ready(proposer
.map_err(Into::into)
.map(|proposer| Proposer {
tracker,
slot_duration,
proposer,
})
));
Box::pin(maybe_proposer)
}
}
/// The Polkadot proposer logic.
pub struct Proposer<Client, TxPool, Backend> {
client: Arc<Client>,
parent_id: BlockId,
pub struct Proposer<Client, TxPool: TransactionPool<Block=Block>, Backend> {
tracker: crate::validation_service::ValidationInstanceHandle,
transaction_pool: Arc<TxPool>,
slot_duration: u64,
backend: Arc<Backend>,
metrics: PrometheusMetrics,
proposer: sc_basic_authorship::Proposer<Backend, Block, Client, TxPool>,
}
impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block> + 'static,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client: BlockBuilderProvider<Backend, Block, Client> + ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync + 'static,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
......@@ -163,7 +145,8 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
>
>;
fn propose(&mut self,
fn propose(
self,
inherent_data: InherentData,
inherent_digests: DigestFor<Block>,
max_duration: Duration,
......@@ -180,15 +163,7 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
);
let parent_id = self.parent_id.clone();
let client = self.client.clone();
let transaction_pool = self.transaction_pool.clone();
let table = self.tracker.table().clone();
let backend = self.backend.clone();
let metrics = self.metrics.clone();
async move {
let block_timer = metrics.report(|metrics| metrics.block_constructed.start_timer());
let enough_candidates = dynamic_inclusion.acceptable_in(
now,
initial_included,
......@@ -200,22 +175,6 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
};
let deadline_diff = max_duration - max_duration / 3;
let deadline = match Instant::now().checked_add(deadline_diff) {
None => return Err(Error::DeadlineComputeFailure(deadline_diff)),
Some(d) => d,
};
let data = CreateProposalData {
parent_id,
client,
transaction_pool,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
record_proof,
backend,
};
// set up delay until next allowed timestamp.
let current_timestamp = current_timestamp();
......@@ -226,18 +185,18 @@ impl<Client, TxPool, Backend> consensus::Proposer<Block> for Proposer<Client, Tx
Delay::new(enough_candidates).await;
let result = tokio::task::spawn_blocking(
move || {
let proposed_candidates = table.proposed_set();
data.propose_with(proposed_candidates)
}
).await?;
let proposed_candidates = self.tracker.table().proposed_set();
drop(block_timer);
let transactions = result.as_ref().map(|proposal| proposal.block.extrinsics.len()).unwrap_or_default();
metrics.report(|metrics| metrics.number_of_transactions.set(transactions as u64));
let mut inherent_data = inherent_data;
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &proposed_candidates)
.map_err(Error::InherentError)?;
result
self.proposer.propose(
inherent_data,
inherent_digests.clone(),
deadline_diff,
record_proof
).await.map_err(Into::into)
}.boxed()
}
}
......@@ -247,114 +206,3 @@ fn current_timestamp() -> u64 {
.expect("now always later than unix epoch; qed")
.as_millis() as u64
}
/// Inner data of the create proposal.
struct CreateProposalData<Client, TxPool, Backend> {
parent_id: BlockId,
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
inherent_data: Option<InherentData>,
inherent_digests: DigestFor<Block>,
deadline: Instant,
record_proof: RecordProof,
backend: Arc<Backend>,
}
impl<Client, TxPool, Backend> CreateProposalData<Client, TxPool, Backend> where
TxPool: TransactionPool<Block=Block>,
Client: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
Client::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend: sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>> + 'static,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
fn propose_with(
mut self,
candidates: Vec<AttestedCandidate>,
) -> Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error> {
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
const MAX_TRANSACTIONS: usize = 1200;
let mut inherent_data = self.inherent_data
.take()
.expect("CreateProposal is not polled after finishing; qed");
inherent_data.put_data(NEW_HEADS_IDENTIFIER, &candidates)
.map_err(Error::InherentError)?;
let runtime_api = self.client.runtime_api();
let mut block_builder = block_builder::BlockBuilder::new(
&*self.client,
self.client.expect_block_hash_from_id(&self.parent_id)?,
self.client.expect_block_number_from_id(&self.parent_id)?,
self.record_proof,
self.inherent_digests.clone(),
&*self.backend,
)?;
{
let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
for inherent in inherents {
match block_builder.push(inherent) {
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
if e.exhausted_resources() => {
warn!("⚠️ Dropping non-mandatory inherent from overweight block.");
}
Err(e) => {
warn!("❗️ Inherent extrinsic returned unexpected error: {}. Dropping.", e);
}
Ok(_) => {}
}
}
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
let ready_iter = self.transaction_pool.ready();
for ready in ready_iter.take(MAX_TRANSACTIONS) {
let encoded_size = ready.data().encode().len();
if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE {
break;
}
if Instant::now() > self.deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
break;
}
match block_builder.push(ready.data().clone()) {
Ok(()) => {
debug!("[{:?}] Pushed to the block.", ready.hash());
pending_size += encoded_size;
}
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
if e.exhausted_resources() =>
{
debug!("Block is full, proceed with proposing.");
break;
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(ready.hash().clone());
}
}
}
self.transaction_pool.remove_invalid(&unqueue_invalid);
}
let (new_block, storage_changes, proof) = block_builder.build()?.into_inner();
info!("🎁 Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
new_block.header.number,
Hash::from(new_block.header.hash()),
new_block.header.parent_hash,
new_block.extrinsics.iter()
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
.collect::<Vec<_>>()
.join(", ")
);
Ok(Proposal { block: new_block, storage_changes, proof })
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment