Unverified Commit 00e167fe authored by Wei Tang's avatar Wei Tang Committed by GitHub
Browse files

Move propose_with into a dedicated blocking threadpool (#614)

parent a99977f6
Pipeline #70245 failed with stages
in 17 minutes and 9 seconds
......@@ -3818,6 +3818,7 @@ dependencies = [
"substrate-transaction-pool 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"substrate-trie 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
......@@ -6031,6 +6032,8 @@ version = "0.2.0-alpha.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
......
......@@ -14,6 +14,7 @@ tokio = "0.1.22"
derive_more = "0.14.1"
log = "0.4.8"
exit-future = "0.1.4"
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
parachain = { package = "polkadot-parachain", path = "../parachain" }
......
......@@ -34,7 +34,8 @@ use std::{
pin::Pin,
sync::Arc,
time::{self, Duration, Instant},
task::{Poll, Context}
task::{Poll, Context},
mem,
};
use babe_primitives::BabeApi;
......@@ -498,7 +499,7 @@ impl<C, N, P, SC, TxApi> ProposerFactory<C, N, P, SC, TxApi> where
impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N, P, SC, TxApi> where
C: Collators + Send + 'static,
N: Network,
TxApi: PoolChainApi<Block=Block>,
TxApi: PoolChainApi<Block=Block> + 'static,
P: ProvideRuntimeApi + HeaderBackend<Block> + BlockBody<Block> + Send + Sync + 'static,
P::Api: ParachainHost<Block> +
BlockBuilderApi<Block> +
......@@ -557,8 +558,8 @@ pub struct Proposer<C: Send + Sync, TxApi: PoolChainApi> where
}
impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
TxApi: PoolChainApi<Block=Block> + 'static,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
{
type Error = Error;
......@@ -616,18 +617,20 @@ impl<C, TxApi> consensus::Proposer<Block> for Proposer<C, TxApi> where
};
Either::Left(CreateProposal {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
table: self.tracker.table.clone(),
believed_minimum_timestamp: believed_timestamp,
timing,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
state: CreateProposalState::Pending(CreateProposalData {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
table: self.tracker.table.clone(),
believed_minimum_timestamp: believed_timestamp,
timing,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
})
})
}
}
......@@ -686,6 +689,21 @@ impl ProposalTiming {
/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
state: CreateProposalState<C, TxApi>,
}
/// Current status of the proposal future.
enum CreateProposalState<C: Send + Sync, TxApi: PoolChainApi> {
/// Pending inclusion, with given proposal data.
Pending(CreateProposalData<C, TxApi>),
/// Represents the state when we switch from pending to fired.
Switching,
/// Block proposing has fired.
Fired(tokio_executor::blocking::Blocking<Result<Block, Error>>),
}
/// Inner data of the create proposal.
struct CreateProposalData<C: Send + Sync, TxApi: PoolChainApi> {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
......@@ -699,12 +717,12 @@ pub struct CreateProposal<C: Send + Sync, TxApi: PoolChainApi> {
deadline: Instant,
}
impl<C, TxApi> CreateProposal<C, TxApi> where
impl<C, TxApi> CreateProposalData<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
{
fn propose_with(&mut self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
fn propose_with(mut self, candidates: Vec<AttestedCandidate>) -> Result<Block, Error> {
use block_builder::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
......@@ -792,22 +810,51 @@ impl<C, TxApi> CreateProposal<C, TxApi> where
}
impl<C, TxApi> futures03::Future for CreateProposal<C, TxApi> where
TxApi: PoolChainApi<Block=Block>,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync,
TxApi: PoolChainApi<Block=Block> + 'static,
C: ProvideRuntimeApi + HeaderBackend<Block> + Send + Sync + 'static,
C::Api: ParachainHost<Block> + BlockBuilderApi<Block> + ApiExt<Block, Error = client_error::Error>,
{
type Output = Result<Block, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut state = CreateProposalState::Switching;
mem::swap(&mut state, &mut self.state);
// 1. try to propose if we have enough includable candidates and other
// delays have concluded.
let included = self.table.includable_count();
futures03::ready!(self.timing.poll(cx, included));
let data = match state {
CreateProposalState::Pending(mut data) => {
let included = data.table.includable_count();
match data.timing.poll(cx, included) {
Poll::Pending => {
self.state = CreateProposalState::Pending(data);
return Poll::Pending
},
Poll::Ready(()) => (),
}
data
},
CreateProposalState::Switching => return Poll::Pending,
CreateProposalState::Fired(mut future) => {
let ret = Pin::new(&mut future).poll(cx);
self.state = CreateProposalState::Fired(future);
return ret
},
};
// 2. propose
let proposed_candidates = self.table.proposed_set();
let future = tokio_executor::blocking::run(move || {
let proposed_candidates = data.table.proposed_set();
data.propose_with(proposed_candidates)
});
self.state = CreateProposalState::Fired(future);
Poll::Ready(self.propose_with(proposed_candidates))
match &mut self.state {
CreateProposalState::Fired(future) => Pin::new(future).poll(cx),
CreateProposalState::Switching | CreateProposalState::Pending(_) =>
Poll::Pending,
}
}
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment