Newer
Older
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Statement routing and consensus table router implementation.
//!
//! During the consensus process, validators exchange statements on validity and availability
//! of parachain candidates.
//! The `Router` in this file hooks into the underlying network to fulfill
//! the `TableRouter` trait from `polkadot-consensus`, which is expected to call into a shared statement table
//! and dispatch evaluation work as necessary when new statements come in.
use sr_primitives::traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT};
use polkadot_consensus::{SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork};
use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt, ParachainHost};
use futures::prelude::*;
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use consensus::Knowledge;
use super::NetworkService;
fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");
BlakeTwo256::hash(&v[..])
}
/// Table routing implementation.
table: Arc<SharedTable>,
network: Arc<NetworkService>,
api: Arc<P>,
task_executor: TaskExecutor,
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
}
pub(crate) fn new(
table: Arc<SharedTable>,
network: Arc<NetworkService>,
api: Arc<P>,
task_executor: TaskExecutor,
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
) -> Self {
Router {
table,
network,
api,
task_executor,
parent_hash,
attestation_topic: attestation_topic(parent_hash),
knowledge,
deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())),
}
}
/// Get the attestation topic for gossip.
pub(crate) fn gossip_topic(&self) -> Hash {
self.attestation_topic
}
}
fn clone(&self) -> Self {
Router {
table: self.table.clone(),
network: self.network.clone(),
api: self.api.clone(),
task_executor: self.task_executor.clone(),
parent_hash: self.parent_hash.clone(),
attestation_topic: self.attestation_topic.clone(),
deferred_statements: self.deferred_statements.clone(),
knowledge: self.knowledge.clone(),
}
}
}
impl<P: ProvideRuntimeApi + Send + Sync + 'static> Router<P>
where P::Api: ParachainHost<Block>
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement<Exit>(&self, statement: SignedStatement, exit: Exit)
where Exit: Future<Item=(),Error=()> + Clone + Send + 'static
{
asynchronous rob
committed
trace!(target: "p_net", "importing consensus statement {:?}", statement.statement);
// defer any statements for which we haven't imported the candidate yet
asynchronous rob
committed
let c_hash = {
let candidate_data = match statement.statement {
asynchronous rob
committed
GenericStatement::Candidate(ref c) => Some(c.hash()),
GenericStatement::Valid(ref hash)
| GenericStatement::Invalid(ref hash)
asynchronous rob
committed
=> self.table.with_candidate(hash, |c| c.map(|_| *hash)),
};
match candidate_data {
Some(x) => x,
None => {
self.deferred_statements.lock().push(statement);
return;
}
}
};
// import all statements pending on this candidate
let (mut statements, _traces) = if let GenericStatement::Candidate(_) = statement.statement {
self.deferred_statements.lock().get_deferred(&c_hash)
} else {
(Vec::new(), Vec::new())
};
// prepend the candidate statement.
asynchronous rob
committed
debug!(target: "consensus", "Importing statements about candidate {:?}", c_hash);
statements.insert(0, statement);
let producers: Vec<_> = self.table.import_remote_statements(
self,
statements.iter().cloned(),
);
// dispatch future work as necessary.
for (producer, statement) in producers.into_iter().zip(statements) {
self.knowledge.lock().note_statement(statement.sender, &statement.statement);
asynchronous rob
committed
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
asynchronous rob
committed
trace!(target: "consensus", "driving statement work to completion");
self.task_executor.spawn(work.select(exit.clone()).then(|_| Ok(())));
asynchronous rob
committed
}
}
}
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl Future<Item=(),Error=()>
where
D: Future<Item=BlockData,Error=io::Error> + Send + 'static,
{
let table = self.table.clone();
let network = self.network.clone();
let knowledge = self.knowledge.clone();
let attestation_topic = self.attestation_topic.clone();
.map(move |produced| {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
produced.extrinsic,
// consider something more targeted than gossip in the future.
let signed = table.sign_and_import(produced.validity);
network.gossip_consensus_message(attestation_topic, signed.encode(), false);
.map_err(|e| debug!(target: "p_net", "Failed to produce statements: {:?}", e))
}
}
impl<P: ProvideRuntimeApi + Send> TableRouter for Router<P>
where P::Api: ParachainHost<Block>
{
type FetchCandidate = BlockDataReceiver;
fn local_candidate(&self, receipt: CandidateReceipt, block_data: BlockData, extrinsic: Extrinsic) {
// give to network to make available.
let hash = receipt.hash();
let candidate = self.table.sign_and_import(GenericStatement::Candidate(receipt));
self.knowledge.lock().note_candidate(hash, Some(block_data), Some(extrinsic));
self.network.gossip_consensus_message(self.attestation_topic, candidate.encode(), false);
}
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> BlockDataReceiver {
let parent_hash = self.parent_hash.clone();
let candidate = candidate.clone();
let (tx, rx) = ::futures::sync::oneshot::channel();
self.network.with_spec(move |spec, ctx| {
let inner_rx = spec.fetch_block_data(ctx, &candidate, parent_hash);
let _ = tx.send(inner_rx);
});
BlockDataReceiver { outer: rx, inner: None }
}
}
impl<P> Drop for Router<P> {
fn drop(&mut self) {
let parent_hash = self.parent_hash.clone();
self.network.with_spec(move |spec, _| spec.remove_consensus(&parent_hash));
}
}
/// Receiver for block data.
pub struct BlockDataReceiver {
outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<BlockData>>,
inner: Option<::futures::sync::oneshot::Receiver<BlockData>>
}
impl Future for BlockDataReceiver {
type Item = BlockData;
fn poll(&mut self) -> Poll<BlockData, io::Error> {
if let Some(ref mut inner) = self.inner {
return inner
.poll()
.map_err(|_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
))
}
if let Ok(futures::Async::Ready(mut inner)) = self.outer.poll() {
let poll_result = inner.poll();
self.inner = Some(inner);
return poll_result
.map_err(|_| io::Error::new(
io::ErrorKind::Other,
"Sending end of channel hung up",
))
}
Ok(futures::Async::NotReady)
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
}
}
// A unique trace for valid statements issued by a validator.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
enum StatementTrace {
Valid(SessionKey, Hash),
Invalid(SessionKey, Hash),
}
// helper for deferring statements whose associated candidate is unknown.
struct DeferredStatements {
deferred: HashMap<Hash, Vec<SignedStatement>>,
known_traces: HashSet<StatementTrace>,
}
impl DeferredStatements {
fn new() -> Self {
DeferredStatements {
deferred: HashMap::new(),
known_traces: HashSet::new(),
}
}
fn push(&mut self, statement: SignedStatement) {
let (hash, trace) = match statement.statement {
GenericStatement::Candidate(_) => return,
GenericStatement::Valid(hash) => (hash, StatementTrace::Valid(statement.sender, hash)),
GenericStatement::Invalid(hash) => (hash, StatementTrace::Invalid(statement.sender, hash)),
};
if self.known_traces.insert(trace) {
self.deferred.entry(hash).or_insert_with(Vec::new).push(statement);
}
}
fn get_deferred(&mut self, hash: &Hash) -> (Vec<SignedStatement>, Vec<StatementTrace>) {
match self.deferred.remove(hash) {
None => (Vec::new(), Vec::new()),
Some(deferred) => {
let mut traces = Vec::new();
for statement in deferred.iter() {
let trace = match statement.statement {
GenericStatement::Candidate(_) => continue,
GenericStatement::Valid(hash) => StatementTrace::Valid(statement.sender, hash),
GenericStatement::Invalid(hash) => StatementTrace::Invalid(statement.sender, hash),
};
self.known_traces.remove(&trace);
traces.push(trace);
}
(deferred, traces)
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use substrate_primitives::H512;
#[test]
fn deferred_statements_works() {
let mut deferred = DeferredStatements::new();
let hash = [1; 32].into();
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
let sender = [255; 32].into();
let statement = SignedStatement {
statement: GenericStatement::Valid(hash),
sender,
signature: sig,
};
// pre-push.
{
let (signed, traces) = deferred.get_deferred(&hash);
assert!(signed.is_empty());
assert!(traces.is_empty());
}
deferred.push(statement.clone());
deferred.push(statement.clone());
// draining: second push should have been ignored.
{
let (signed, traces) = deferred.get_deferred(&hash);
assert_eq!(signed.len(), 1);
assert_eq!(traces.len(), 1);
assert_eq!(signed[0].clone(), statement);
assert_eq!(traces[0].clone(), StatementTrace::Valid(sender, hash));
}
// after draining
{
let (signed, traces) = deferred.get_deferred(&hash);
assert!(signed.is_empty());
assert!(traces.is_empty());
}
}
}