{
/// The client instance.
client: Arc,
/// The backing network handle.
network: N,
/// Parachain collators.
collators: C,
/// handle to remote task executor
handle: TaskExecutor,
/// Store for extrinsic data.
availability_store: AvailabilityStore,
/// Live agreements. Maps relay chain parent hashes to attestation
/// instances.
live_instances: Mutex>>,
}
impl ParachainValidation where
C: Collators + Send + Unpin + 'static,
N: Network,
P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static,
P::Api: ParachainHost + BlockBuilderApi + ApiExt,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
N::BuildTableRouter: Unpin + Send + 'static,
{
/// Get an attestation table for given parent hash.
///
/// This starts a parachain agreement process on top of the parent hash if
/// one has not already started.
///
/// Additionally, this will trigger broadcast of data to the new block's duty
/// roster.
fn get_or_instantiate(
&self,
parent_hash: Hash,
keystore: &KeyStorePtr,
max_block_data_size: Option,
)
-> Result, Error>
{
let mut live_instances = self.live_instances.lock();
if let Some(tracker) = live_instances.get(&parent_hash) {
return Ok(tracker.clone());
}
let id = BlockId::hash(parent_hash);
let validators = self.client.runtime_api().validators(&id)?;
let sign_with = signing_key(&validators[..], keystore);
let duty_roster = self.client.runtime_api().duty_roster(&id)?;
let (group_info, local_duty) = make_group_info(
duty_roster,
&validators,
sign_with.as_ref().map(|k| k.public()),
)?;
info!(
"Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
parent_hash,
local_duty,
);
let active_parachains = self.client.runtime_api().active_parachains(&id)?;
debug!(target: "validation", "Active parachains: {:?}", active_parachains);
// If we are a validator, we need to store our index in this round in availability store.
// This will tell which erasure chunk we should store.
if let Some(ref local_duty) = local_duty {
if let Err(e) = self.availability_store.add_validator_index_and_n_validators(
&parent_hash,
local_duty.index,
validators.len() as u32,
) {
warn!(
target: "validation",
"Failed to add validator index and n_validators to the availability-store: {:?}", e
)
}
}
let table = Arc::new(SharedTable::new(
validators.clone(),
group_info,
sign_with,
parent_hash,
self.availability_store.clone(),
max_block_data_size,
));
let (_drop_signal, exit) = exit_future::signal();
let router = self.network.communication_for(
table.clone(),
&validators,
exit.clone(),
);
if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) {
self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit);
}
let tracker = Arc::new(AttestationTracker {
table,
started: Instant::now(),
_drop_signal,
});
live_instances.insert(parent_hash, tracker.clone());
Ok(tracker)
}
/// Retain validation sessions matching predicate.
fn retain bool>(&self, mut pred: F) {
self.live_instances.lock().retain(|k, _| pred(k))
}
// launch parachain work asynchronously.
fn launch_work(
&self,
relay_parent: Hash,
validation_para: ParaId,
build_router: N::BuildTableRouter,
max_block_data_size: Option,
authorities_num: usize,
local_id: ValidatorIndex,
exit: exit_future::Exit,
) {
let (collators, client) = (self.collators.clone(), self.client.clone());
let availability_store = self.availability_store.clone();
let with_router = move |router: N::TableRouter| {
// fetch a local collation from connected collators.
let collation_work = CollationFetch::new(
validation_para,
relay_parent,
collators,
client.clone(),
max_block_data_size,
);
collation_work.map(move |result| match result {
Ok((collation, outgoing_targeted, fees_charged)) => {
match produce_receipt_and_chunks(
authorities_num,
&collation.pov,
&outgoing_targeted,
fees_charged,
&collation.info,
) {
Ok((receipt, chunks)) => {
// Apparently the `async move` block is the only way to convince
// the compiler that we are not moving values out of borrowed context.
let av_clone = availability_store.clone();
let chunks_clone = chunks.clone();
let receipt_clone = receipt.clone();
let res = async move {
if let Err(e) = av_clone.clone().add_erasure_chunks(
relay_parent.clone(),
receipt_clone,
chunks_clone,
).await {
warn!(target: "validation", "Failed to add erasure chunks: {}", e);
}
}
.unit_error()
.boxed()
.then(move |_| {
router.local_collation(collation, receipt, outgoing_targeted, (local_id, &chunks));
ready(())
});
Some(res)
}
Err(e) => {
warn!(target: "validation", "Failed to produce a receipt: {:?}", e);
None
}
}
}
Err(e) => {
warn!(target: "validation", "Failed to collate candidate: {:?}", e);
None
}
})
};
let router = build_router
.map_ok(with_router)
.map_err(|e| {
warn!(target: "validation" , "Failed to build table router: {:?}", e);
});
let cancellable_work = select(exit, router).map(drop);
// spawn onto thread pool.
if self.handle.spawn(cancellable_work).is_err() {
error!("Failed to spawn cancellable work task");
}
}
}
/// Parachain validation for a single block.
struct AttestationTracker {
_drop_signal: exit_future::Signal,
table: Arc,
started: Instant,
}
/// Polkadot proposer factory.
pub struct ProposerFactory {
parachain_validation: Arc>,
transaction_pool: Arc,
keystore: KeyStorePtr,
_service_handle: ServiceHandle,
babe_slot_duration: u64,
_select_chain: SC,
max_block_data_size: Option,
}
impl ProposerFactory where
C: Collators + Send + Sync + Unpin + 'static,
C::Collation: Send + Unpin + 'static,
P: BlockchainEvents + BlockBody,
P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static,
P::Api: ParachainHost +
BlockBuilderApi +
BabeApi +
ApiExt,
N: Network + Send + Sync + 'static,
N::TableRouter: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
TxPool: TransactionPool,
SC: SelectChain + 'static,
{
/// Create a new proposer factory.
pub fn new(
client: Arc,
_select_chain: SC,
network: N,
collators: C,
transaction_pool: Arc,
thread_pool: TaskExecutor,
keystore: KeyStorePtr,
availability_store: AvailabilityStore,
babe_slot_duration: u64,
max_block_data_size: Option,
) -> Self {
let parachain_validation = Arc::new(ParachainValidation {
client: client.clone(),
network,
collators,
handle: thread_pool.clone(),
availability_store: availability_store.clone(),
live_instances: Mutex::new(HashMap::new()),
});
let service_handle = crate::attestation_service::start(
client,
_select_chain.clone(),
parachain_validation.clone(),
thread_pool,
keystore.clone(),
max_block_data_size,
);
ProposerFactory {
parachain_validation,
transaction_pool,
keystore,
_service_handle: service_handle,
babe_slot_duration,
_select_chain,
max_block_data_size,
}
}
}
impl consensus::Environment for ProposerFactory where
C: Collators + Send + Unpin + 'static,
N: Network,
TxPool: TransactionPool + 'static,
P: ProvideRuntimeApi + HeaderBackend + BlockBody + Send + Sync + 'static,
P::Api: ParachainHost +
BlockBuilderApi +
BabeApi +
ApiExt,
C::Collation: Send + Unpin + 'static,
N::TableRouter: Send + 'static,
N::BuildTableRouter: Send + Unpin + 'static,
SC: SelectChain,
{
type Proposer = Proposer;
type Error = Error;
fn init(
&mut self,
parent_header: &Header,
) -> Result {
let parent_hash = parent_header.hash();
let parent_id = BlockId::hash(parent_hash);
let tracker = self.parachain_validation.get_or_instantiate(
parent_hash,
&self.keystore,
self.max_block_data_size,
)?;
Ok(Proposer {
client: self.parachain_validation.client.clone(),
tracker,
parent_hash,
parent_id,
parent_number: parent_header.number,
transaction_pool: self.transaction_pool.clone(),
slot_duration: self.babe_slot_duration,
})
}
}
/// The local duty of a validator.
#[derive(Debug)]
pub struct LocalDuty {
validation: Chain,
index: ValidatorIndex,
}
/// The Polkadot proposer logic.
pub struct Proposer where
C: ProvideRuntimeApi + HeaderBackend,
{
client: Arc,
parent_hash: Hash,
parent_id: BlockId,
parent_number: BlockNumber,
tracker: Arc,
transaction_pool: Arc,
slot_duration: u64,
}
impl consensus::Proposer for Proposer where
TxPool: TransactionPool + 'static,
C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static,
C::Api: ParachainHost + BlockBuilderApi + ApiExt,
{
type Error = Error;
type Create = Either, future::Ready>>;
fn propose(&mut self,
inherent_data: InherentData,
inherent_digests: DigestFor,
max_duration: Duration,
) -> Self::Create {
const ATTEMPT_PROPOSE_EVERY: Duration = Duration::from_millis(100);
const SLOT_DURATION_DENOMINATOR: u64 = 3; // wait up to 1/3 of the slot for candidates.
let initial_included = self.tracker.table.includable_count();
let now = Instant::now();
let dynamic_inclusion = DynamicInclusion::new(
self.tracker.table.num_parachains(),
self.tracker.started,
Duration::from_millis(self.slot_duration / SLOT_DURATION_DENOMINATOR),
);
let enough_candidates = dynamic_inclusion.acceptable_in(
now,
initial_included,
).unwrap_or_else(|| Duration::from_millis(1));
let believed_timestamp = match inherent_data.timestamp_inherent_data() {
Ok(timestamp) => timestamp,
Err(e) => return Either::Right(future::err(Error::InherentError(e))),
};
// set up delay until next allowed timestamp.
let current_timestamp = current_timestamp();
let delay_future = if current_timestamp >= believed_timestamp {
None
} else {
Some(Delay::new(Duration::from_millis (current_timestamp - believed_timestamp)))
};
let timing = ProposalTiming {
minimum: delay_future,
attempt_propose: Box::new(interval(ATTEMPT_PROPOSE_EVERY)),
enough_candidates: Delay::new(enough_candidates),
dynamic_inclusion,
last_included: initial_included,
};
let deadline_diff = max_duration - max_duration / 3;
let deadline = match Instant::now().checked_add(deadline_diff) {
None => return Either::Right(
future::err(Error::DeadlineComputeFailure(deadline_diff)),
),
Some(d) => d,
};
Either::Left(CreateProposal {
state: CreateProposalState::Pending(CreateProposalData {
parent_hash: self.parent_hash.clone(),
parent_number: self.parent_number.clone(),
parent_id: self.parent_id.clone(),
client: self.client.clone(),
transaction_pool: self.transaction_pool.clone(),
table: self.tracker.table.clone(),
believed_minimum_timestamp: believed_timestamp,
timing,
inherent_data: Some(inherent_data),
inherent_digests,
// leave some time for the proposal finalisation
deadline,
})
})
}
}
fn current_timestamp() -> u64 {
time::SystemTime::now().duration_since(time::UNIX_EPOCH)
.expect("now always later than unix epoch; qed")
.as_millis() as u64
}
struct ProposalTiming {
minimum: Option,
attempt_propose: Box + Send + Unpin>,
dynamic_inclusion: DynamicInclusion,
enough_candidates: Delay,
last_included: usize,
}
impl ProposalTiming {
// whether it's time to attempt a proposal.
// shouldn't be called outside of the context of a task.
fn poll(&mut self, cx: &mut Context, included: usize) -> Poll<()> {
// first drain from the interval so when the minimum delay is up
// we don't have any notifications built up.
//
// this interval is just meant to produce periodic task wakeups
// that lead to the `dynamic_inclusion` getting updated as necessary.
while let Poll::Ready(x) = self.attempt_propose.poll_next_unpin(cx) {
x.expect("timer still alive; intervals never end; qed");
}
// wait until the minimum time has passed.
if let Some(mut minimum) = self.minimum.take() {
if let Poll::Pending = minimum.poll_unpin(cx) {
self.minimum = Some(minimum);
return Poll::Pending;
}
}
if included == self.last_included {
return self.enough_candidates.poll_unpin(cx);
}
// the amount of includable candidates has changed. schedule a wakeup
// if it's not sufficient anymore.
match self.dynamic_inclusion.acceptable_in(Instant::now(), included) {
Some(instant) => {
self.last_included = included;
self.enough_candidates.reset(Instant::now() + instant);
self.enough_candidates.poll_unpin(cx)
}
None => Poll::Ready(()),
}
}
}
/// Future which resolves upon the creation of a proposal.
pub struct CreateProposal {
state: CreateProposalState,
}
/// Current status of the proposal future.
enum CreateProposalState {
/// Pending inclusion, with given proposal data.
Pending(CreateProposalData),
/// Represents the state when we switch from pending to fired.
Switching,
/// Block proposing has fired.
Fired(tokio::task::JoinHandle>),
}
/// Inner data of the create proposal.
struct CreateProposalData {
parent_hash: Hash,
parent_number: BlockNumber,
parent_id: BlockId,
client: Arc,
transaction_pool: Arc,
table: Arc,
timing: ProposalTiming,
believed_minimum_timestamp: u64,
inherent_data: Option,
inherent_digests: DigestFor,
deadline: Instant,
}
impl CreateProposalData where
TxPool: TransactionPool,
C: ProvideRuntimeApi + HeaderBackend + Send + Sync,
C::Api: ParachainHost + BlockBuilderApi + ApiExt,
{
fn propose_with(mut self, candidates: Vec) -> Result {
use block_builder::BlockBuilder;
use runtime_primitives::traits::{Hash as HashT, BlakeTwo256};
const MAX_TRANSACTIONS: usize = 40;
let mut inherent_data = self.inherent_data
.take()
.expect("CreateProposal is not polled after finishing; qed");
inherent_data.put_data(polkadot_runtime::NEW_HEADS_IDENTIFIER, &candidates)
.map_err(Error::InherentError)?;
let runtime_api = self.client.runtime_api();
let mut block_builder = BlockBuilder::new(
&*self.client,
self.client.expect_block_hash_from_id(&self.parent_id)?,
self.client.expect_block_number_from_id(&self.parent_id)?,
false,
self.inherent_digests.clone(),
)?;
{
let inherents = runtime_api.inherent_extrinsics(&self.parent_id, inherent_data)?;
for inherent in inherents {
block_builder.push(inherent)?;
}
let mut unqueue_invalid = Vec::new();
let mut pending_size = 0;
let ready_iter = self.transaction_pool.ready();
for ready in ready_iter.take(MAX_TRANSACTIONS) {
let encoded_size = ready.data().encode().len();
if pending_size + encoded_size >= MAX_TRANSACTIONS_SIZE {
break;
}
if Instant::now() > self.deadline {
debug!("Consensus deadline reached when pushing block transactions, proceeding with proposing.");
break;
}
match block_builder.push(ready.data().clone()) {
Ok(()) => {
debug!("[{:?}] Pushed to the block.", ready.hash());
pending_size += encoded_size;
}
Err(sp_blockchain::Error::ApplyExtrinsicFailed(sp_blockchain::ApplyExtrinsicFailed::Validity(e)))
if e.exhausted_resources() =>
{
debug!("Block is full, proceed with proposing.");
break;
}
Err(e) => {
trace!(target: "transaction-pool", "Invalid transaction: {}", e);
unqueue_invalid.push(ready.hash().clone());
}
}
}
self.transaction_pool.remove_invalid(&unqueue_invalid);
}
let new_block = block_builder.bake()?;
info!("Prepared block for proposing at {} [hash: {:?}; parent_hash: {}; extrinsics: [{}]]",
new_block.header.number,
Hash::from(new_block.header.hash()),
new_block.header.parent_hash,
new_block.extrinsics.iter()
.map(|xt| format!("{}", BlakeTwo256::hash_of(xt)))
.collect::>()
.join(", ")
);
// TODO: full re-evaluation (https://github.com/paritytech/polkadot/issues/216)
let active_parachains = runtime_api.active_parachains(&self.parent_id)?;
assert!(evaluation::evaluate_initial(
&new_block,
self.believed_minimum_timestamp,
&self.parent_hash,
self.parent_number,
&active_parachains[..],
).is_ok());
Ok(new_block)
}
}
impl Future for CreateProposal where
TxPool: TransactionPool + 'static,
C: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static,
C::Api: ParachainHost + BlockBuilderApi + ApiExt,
{
type Output = Result;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll {
let mut state = CreateProposalState::Switching;
mem::swap(&mut state, &mut self.state);
// 1. try to propose if we have enough includable candidates and other
// delays have concluded.
let data = match state {
CreateProposalState::Pending(mut data) => {
let included = data.table.includable_count();
match data.timing.poll(cx, included) {
Poll::Pending => {
self.state = CreateProposalState::Pending(data);
return Poll::Pending
},
Poll::Ready(()) => (),
}
data
},
CreateProposalState::Switching =>
unreachable!(
"State Switching are only created on call, \
and immediately swapped out; \
the data being read is from state; \
thus Switching will never be reachable here; qed"
),
CreateProposalState::Fired(mut future) => {
let ret = Pin::new(&mut future)
.poll(cx)
.map(|res| res.map_err(Error::Join).and_then(|res| res));
self.state = CreateProposalState::Fired(future);
return ret
},
};
// 2. propose
let mut future = tokio::task::spawn_blocking(move || {
let proposed_candidates = data.table.proposed_set();
data.propose_with(proposed_candidates)
});
let polled = Pin::new(&mut future)
.poll(cx)
.map(|res| res.map_err(Error::Join).and_then(|res| res));
self.state = CreateProposalState::Fired(future);
polled
}
}
#[cfg(test)]
mod tests {
use super::*;
use sp_keyring::Sr25519Keyring;
#[test]
fn sign_and_check_statement() {
let statement: Statement = GenericStatement::Valid([1; 32].into());
let parent_hash = [2; 32].into();
let sig = sign_table_statement(&statement, &Sr25519Keyring::Alice.pair().into(), &parent_hash);
assert!(check_statement(&statement, &sig, Sr25519Keyring::Alice.public().into(), &parent_hash));
assert!(!check_statement(&statement, &sig, Sr25519Keyring::Alice.public().into(), &[0xff; 32].into()));
assert!(!check_statement(&statement, &sig, Sr25519Keyring::Bob.public().into(), &parent_hash));
}
}