Newer
Older
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The proposer proposes new blocks to include
#![deny(unused_crate_dependencies, unused_results)]
use futures::prelude::*;
use futures::select;
use polkadot_node_subsystem::{messages::{AllMessages, ProvisionerInherentData, ProvisionerMessage}, SubsystemError};
use polkadot_overseer::OverseerHandler;
Block, Hash, Header,
};
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider};
use sp_core::traits::SpawnNamed;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_blockchain::HeaderBackend;
use sp_consensus::{Proposal, RecordProof};
use sp_inherents::InherentData;
use sp_runtime::traits::{DigestFor, HashFor};
use sp_transaction_pool::TransactionPool;
use prometheus_endpoint::Registry as PrometheusRegistry;
use std::{fmt, pin::Pin, sync::Arc, time};
/// How long proposal can take before we give up and err out
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500);
/// Custom Proposer factory for Polkadot
pub struct ProposerFactory<TxPool, Backend, Client> {
inner: sc_basic_authorship::ProposerFactory<TxPool, Backend, Client>,
overseer: OverseerHandler,
}
impl<TxPool, Backend, Client> ProposerFactory<TxPool, Backend, Client> {
pub fn new(
spawn_handle: impl SpawnNamed + 'static,
client: Arc<Client>,
transaction_pool: Arc<TxPool>,
overseer: OverseerHandler,
prometheus: Option<&PrometheusRegistry>,
) -> Self {
ProposerFactory {
inner: sc_basic_authorship::ProposerFactory::new(
),
overseer,
}
}
}
impl<TxPool, Backend, Client> sp_consensus::Environment<Block>
for ProposerFactory<TxPool, Backend, Client>
where
TxPool: 'static + TransactionPool<Block = Block>,
Client: 'static
+ BlockBuilderProvider<Backend, Block, Client>
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ Send
+ Sync,
Client::Api:
BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
Backend:
'static + sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
type CreateProposer = Pin<Box<
dyn Future<Output = Result<Self::Proposer, Self::Error>> + Send + 'static,
>>;
type Proposer = Proposer<TxPool, Backend, Client>;
type Error = Error;
fn init(&mut self, parent_header: &Header) -> Self::CreateProposer {
// create the inner proposer
let proposer = self.inner.init(parent_header).into_inner();
// data to be moved into the future
let overseer = self.overseer.clone();
let parent_header_hash = parent_header.hash();
async move {
Ok(Proposer {
inner: proposer?,
overseer,
parent_header_hash,
})
}.boxed()
}
}
/// Custom Proposer for Polkadot.
///
/// This proposer gets the ProvisionerInherentData and injects it into the wrapped
/// proposer's inherent data, then delegates the actual proposal generation.
pub struct Proposer<TxPool: TransactionPool<Block = Block>, Backend, Client> {
inner: sc_basic_authorship::Proposer<Backend, Block, Client, TxPool>,
overseer: OverseerHandler,
parent_header_hash: Hash,
}
// This impl has the same generic bounds as the Proposer impl.
impl<TxPool, Backend, Client> Proposer<TxPool, Backend, Client>
where
TxPool: 'static + TransactionPool<Block = Block>,
Client: 'static
+ BlockBuilderProvider<Backend, Block, Client>
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ Send
+ Sync,
Client::Api:
BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend:
'static + sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
/// Get provisioner inherent data
///
/// This function has a constant timeout: `PROPOSE_TIMEOUT`.
async fn get_provisioner_data(&self) -> Result<ProvisionerInherentData, Error> {
// clone this (lightweight) data because we're going to move it into the future
let mut overseer = self.overseer.clone();
let parent_header_hash = self.parent_header_hash.clone();
let pid = async {
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.wait_for_activation(parent_header_hash, sender).await;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
};
let mut timeout = futures_timer::Delay::new(PROPOSE_TIMEOUT).fuse();
pid = pid.fuse() => pid,
}
}
}
impl<TxPool, Backend, Client> sp_consensus::Proposer<Block> for Proposer<TxPool, Backend, Client>
where
TxPool: 'static + TransactionPool<Block = Block>,
Client: 'static
+ BlockBuilderProvider<Backend, Block, Client>
+ ProvideRuntimeApi<Block>
+ HeaderBackend<Block>
+ Send
+ Sync,
Client::Api:
BlockBuilderApi<Block> + ApiExt<Block, Error = sp_blockchain::Error>,
Backend:
'static + sc_client_api::Backend<Block, State = sp_api::StateBackendFor<Client, Block>>,
// Rust bug: https://github.com/rust-lang/rust/issues/24159
sp_api::StateBackendFor<Client, Block>: sp_api::StateBackend<HashFor<Block>> + Send,
{
type Transaction = sc_client_api::TransactionFor<Backend, Block>;
type Proposal = Pin<Box<
dyn Future<Output = Result<Proposal<Block, sp_api::TransactionFor<Client, Block>>, Error>> + Send,
>>;
type Error = Error;
fn propose(
self,
mut inherent_data: InherentData,
inherent_digests: DigestFor<Block>,
max_duration: time::Duration,
record_proof: RecordProof,
) -> Self::Proposal {
async move {
// TODO: how can we tell, here, if we expect a heavy block?
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
//
// The runtime exposes `frame_system::Module::block_weight`
// (https://substrate.dev/rustdocs/v2.0.0/frame_system/struct.Module.html?search=#method.block_weight)
// and the `MaximumBlockWeight`
// (https://substrate.dev/rustdocs/v2.0.0/frame_system/trait.Trait.html#associatedtype.MaximumBlockWeight),
// so fundamentally this is a question of propagating the necessary runtime information
// through the Runtime API. At that point, it's a simple inequality:
//
// ```rust
// let expect_heavy_block = block_weight > maximum_block_weight - MARGIN;
// ```
//
// Unfortunately, it's not quite that simple, because the whole point of this proposer
// is to inject the provisioner data before the substrate proposer runs. Before it runs,
// the `block_weight` function isn't going to give us any useful information, beacuse
// nothing has yet been proposed to be included in the block.
//
// The naive option is very simple: run the proposer, then weigh the block. Either add a
// dry-run mode to the internal proposer, or run the internal proposer and then revert
// all state changes that it's made. The downside of this approach is that it runs
// everything twice, cutting runtime performance literally in half. That would be
// suboptimal.
//
// A somewhat more sophisticated approach takes advantage of the fact that Substrate's
// proposer is greedy: if it is possible to include all proposed transactions, then it
// will do so. This means that we can just compute the weight of all the transactions in
// the pool, and use essentially the same inequality:
//
// ```rust
// let expect_heavy_block = sum_of_tx_weights > maximum_block_weight - MARGIN;
// ```
//
// This is complicated by the fact that transactions are code, not data: in principle,
// it would be possible for an attacker to craft a transaction which is heavy and looks
// valid to the transaction pool, but which aborts cheaply when it is executed,
// preventing its costs from being deducted from the attacker. Spamming the relay chain
// with sufficient of these transactions would prevent all parachain progress.
let expect_heavy_block = false;
let provisioner_data = if !expect_heavy_block {
match self.get_provisioner_data().await {
Ok(pd) => pd,
Err(err) => {
tracing::warn!(err = ?err, "could not get provisioner inherent data; injecting default data");
Default::default()
}
} else {
Default::default()
};
inherent_data.put_data(
polkadot_primitives::v1::INCLUSION_INHERENT_IDENTIFIER,
&provisioner_data,
)?;
self.inner
.propose(inherent_data, inherent_digests, max_duration, record_proof)
.await
.map_err(Into::into)
}
.boxed()
}
}
// It would have been more ergonomic to use thiserror to derive the
// From implementations, Display, and std::error::Error, but unfortunately
// one of the wrapped errors (sp_inherents::Error) also
// don't impl std::error::Error, which breaks the thiserror derive.
#[derive(Debug)]
pub enum Error {
Consensus(sp_consensus::Error),
Blockchain(sp_blockchain::Error),
Inherent(sp_inherents::Error),
Timeout,
ClosedChannelAwaitingActivation,
ClosedChannelAwaitingInherentData,
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
Subsystem(SubsystemError)
}
impl From<sp_consensus::Error> for Error {
fn from(e: sp_consensus::Error) -> Error {
Error::Consensus(e)
}
}
impl From<sp_blockchain::Error> for Error {
fn from(e: sp_blockchain::Error) -> Error {
Error::Blockchain(e)
}
}
impl From<sp_inherents::Error> for Error {
fn from(e: sp_inherents::Error) -> Error {
Error::Inherent(e)
}
}
impl From<SubsystemError> for Error {
fn from(e: SubsystemError) -> Error {
Error::Subsystem(e)
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Consensus(err) => write!(f, "consensus error: {}", err),
Self::Blockchain(err) => write!(f, "blockchain error: {}", err),
Self::Inherent(err) => write!(f, "inherent error: {:?}", err),
Self::Timeout => write!(f, "timeout: provisioner did not return inherent data after {:?}", PROPOSE_TIMEOUT),
Self::ClosedChannelAwaitingActivation => write!(f, "closed channel from overseer when awaiting activation"),
Self::ClosedChannelAwaitingInherentData => write!(f, "closed channel from provisioner when awaiting inherent data"),
Self::Subsystem(err) => write!(f, "subsystem error: {:?}", err),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Consensus(err) => Some(err),
Self::Blockchain(err) => Some(err),
Self::Subsystem(err) => Some(err),