Commit d3f3cb71 authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by asynchronous rob
Browse files

Polkadot service (#82)

* Block import notifications

* Build fix

* Consensus messages supported in the networking

* Started consensus service

* BFT service

* Transaction propagation

* Polkadot service

* CLI integration

* Build fix

* Added signatures validation

* Removed executor argument

* Refactored steam loops; Queue size increased

* Limit queue size

* Fixed doc comment

* Fixed wasm build

* Fixed wasm build

* Check id properly
parent 776b61ed
......@@ -23,4 +23,4 @@ substrate-rpc-servers = { path = "../../substrate/rpc-servers" }
polkadot-primitives = { path = "../primitives" }
polkadot-executor = { path = "../executor" }
polkadot-runtime = { path = "../runtime" }
polkadot-keystore = { path = "../keystore" }
polkadot-service = { path = "../service" }
......@@ -22,15 +22,11 @@ error_chain! {
foreign_links {
Io(::std::io::Error) #[doc="IO error"];
Cli(::clap::Error) #[doc="CLI error"];
Service(::service::Error) #[doc="Polkadot service error"];
}
links {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
}
errors {
/// Key store errors
Keystore(e: ::keystore::Error) {
description("Keystore error"),
display("Keystore error: {:?}", e),
}
}
}
......@@ -30,10 +30,8 @@ extern crate substrate_rpc_servers as rpc;
extern crate polkadot_primitives;
extern crate polkadot_executor;
extern crate polkadot_runtime;
extern crate polkadot_keystore as keystore;
extern crate polkadot_service as service;
#[macro_use]
extern crate hex_literal;
#[macro_use]
extern crate clap;
#[macro_use]
......@@ -45,11 +43,6 @@ pub mod error;
use std::path::{Path, PathBuf};
use codec::Slicable;
use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig};
use client::genesis;
use keystore::Store as Keystore;
/// Parse command line arguments and start the node.
///
/// IANA unassigned port ranges that we could use:
......@@ -69,52 +62,33 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
let log_pattern = matches.value_of("log").unwrap_or("");
init_logger(log_pattern);
// Create client
let executor = polkadot_executor::Executor::new();
let mut storage = Default::default();
let god_key = hex!["3d866ec8a9190c8343c2fc593d21d8a6d0c5c4763aaab2349de3a6111d64d124"];
let genesis_config = GenesisConfig {
validators: vec![god_key.clone()],
authorities: vec![god_key.clone()],
balances: vec![(god_key.clone(), 1u64 << 63)].into_iter().collect(),
block_time: 5, // 5 second block time.
session_length: 720, // that's 1 hour per session.
sessions_per_era: 24, // 24 hours per era.
bonding_duration: 90, // 90 days per bond.
approval_ratio: 667, // 66.7% approvals required for legislation.
};
let prepare_genesis = || {
storage = genesis_config.genesis_map();
let block = genesis::construct_genesis_block(&storage);
storage.extend(additional_storage_with_genesis(&block));
(primitives::block::Header::decode(&mut block.header.encode().as_ref()).expect("to_vec() always gives a valid serialisation; qed"), storage.into_iter().collect())
};
let mut config = service::Configuration::default();
let keystore_path = matches.value_of("keystore")
config.keystore_path = matches.value_of("keystore")
.map(|x| Path::new(x).to_owned())
.unwrap_or_else(default_keystore_path);
let _keystore = Keystore::open(keystore_path).map_err(::error::ErrorKind::Keystore)?;
let client = client::new_in_mem(executor, prepare_genesis)?;
let address = "127.0.0.1:9933".parse().unwrap();
let handler = rpc::rpc_handler(client);
let server = rpc::start_http(&address, handler)?;
.unwrap_or_else(default_keystore_path)
.to_string_lossy()
.into();
let mut role = service::Role::FULL;
if let Some(_) = matches.subcommand_matches("collator") {
info!("Starting collator.");
server.wait();
return Ok(());
role = service::Role::COLLATOR;
}
if let Some(_) = matches.subcommand_matches("validator") {
else if let Some(_) = matches.subcommand_matches("validator") {
info!("Starting validator.");
server.wait();
return Ok(());
role = service::Role::VALIDATOR;
}
config.roles = role;
let service = service::Service::new(config)?;
let address = "127.0.0.1:9933".parse().unwrap();
let handler = rpc::rpc_handler(service.client());
let server = rpc::start_http(&address, handler)?;
server.wait();
println!("No command given.\n");
let _ = clap::App::from_yaml(yaml).print_long_help();
......
......@@ -18,3 +18,8 @@ polkadot-transaction-pool = { path = "../transaction-pool" }
substrate-bft = { path = "../../substrate/bft" }
substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-network = { path = "../../substrate/network" }
tokio-core = "0.1.12"
substrate-keyring = { path = "../../substrate/keyring" }
substrate-client = { path = "../../substrate/client" }
......@@ -48,6 +48,10 @@ error_chain! {
::MAX_TRANSACTIONS_SIZE, ::MAX_TRANSACTIONS_SIZE.saturating_sub(*size)
),
}
Executor(e: ::futures::future::ExecuteErrorKind) {
description("Unable to dispatch agreement future"),
display("Unable to dispatch agreement future: {:?}", e),
}
}
}
......
......@@ -41,10 +41,14 @@ extern crate polkadot_transaction_pool as transaction_pool;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate substrate_network;
extern crate tokio_core;
extern crate substrate_keyring;
extern crate substrate_client as client;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
......@@ -67,8 +71,10 @@ use futures::future;
use parking_lot::Mutex;
pub use self::error::{ErrorKind, Error};
pub use service::Service;
mod error;
mod service;
// block size limit.
const MAX_TRANSACTIONS_SIZE: usize = 4 * 1024 * 1024;
......@@ -83,7 +89,7 @@ pub trait TableRouter {
type FetchExtrinsic: IntoFuture<Item=Extrinsic,Error=Self::Error>;
/// Note local candidate data.
fn local_candidate_data(&self, block_data: BlockData, extrinsic: Extrinsic);
fn local_candidate_data(&self, hash: Hash, block_data: BlockData, extrinsic: Extrinsic);
/// Fetch block data for a specific candidate.
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate;
......
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Consensus service.
/// Consensus service. A long runnung service that manages BFT agreement and parachain
/// candidate agreement over the network.
use std::thread;
use std::sync::Arc;
use futures::{future, Future, Stream, Sink, Async, Canceled};
use parking_lot::Mutex;
use substrate_network as net;
use tokio_core::reactor;
use client::BlockchainEvents;
use substrate_keyring::Keyring;
use primitives::{Hash, AuthorityId};
use primitives::block::{Id as BlockId, HeaderHash, Header};
use polkadot_primitives::parachain::{BlockData, Extrinsic, CandidateReceipt};
use polkadot_api::PolkadotApi;
use bft::{self, BftService};
use transaction_pool::TransactionPool;
use ed25519;
use super::{TableRouter, SharedTable, ProposerFactory};
use error::Error;
struct BftSink<E> {
network: Arc<net::ConsensusService>,
_e: ::std::marker::PhantomData<E>,
}
fn process_message(msg: net::BftMessage, authorities: &[AuthorityId], parent_hash: HeaderHash) -> Result<bft::Communication, bft::Error> {
Ok(match msg {
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
let proposal = bft::generic::LocalizedProposal {
round_number: proposal.round_number as usize,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: ed25519::LocalizedSignature {
signature: proposal.digest_signature,
signer: ed25519::Public(proposal.sender),
},
full_signature: ed25519::LocalizedSignature {
signature: proposal.full_signature,
signer: ed25519::Public(proposal.sender),
}
};
bft::check_proposal(authorities, &parent_hash, &proposal)?;
proposal
}),
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
let vote = bft::generic::LocalizedVote {
sender: vote.sender,
signature: ed25519::LocalizedSignature {
signature: vote.signature,
signer: ed25519::Public(vote.sender),
},
vote: match vote.vote {
net::ConsensusVote::Prepare(r, h) => bft::generic::Vote::Prepare(r as usize, h),
net::ConsensusVote::Commit(r, h) => bft::generic::Vote::Commit(r as usize, h),
net::ConsensusVote::AdvanceRound(r) => bft::generic::Vote::AdvanceRound(r as usize),
}
};
bft::check_vote(authorities, &parent_hash, &vote)?;
vote
}),
}),
net::BftMessage::Auxiliary(a) => {
let justification = bft::UncheckedJustification::from(a);
// TODO: get proper error
let justification: Result<_, bft::Error> = bft::check_prepare_justification(authorities, parent_hash, justification)
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
},
})
}
impl<E> Sink for BftSink<E> {
type SinkItem = bft::Communication;
// TODO: replace this with the ! type when that's stabilized
type SinkError = E;
fn start_send(&mut self, message: bft::Communication) -> ::futures::StartSend<bft::Communication, E> {
let network_message = match message {
bft::generic::Communication::Consensus(c) => net::BftMessage::Consensus(match c {
bft::generic::LocalizedMessage::Propose(proposal) => net::SignedConsensusMessage::Propose(net::SignedConsensusProposal {
round_number: proposal.round_number as u32,
proposal: proposal.proposal,
digest: proposal.digest,
sender: proposal.sender,
digest_signature: proposal.digest_signature.signature,
full_signature: proposal.full_signature.signature,
}),
bft::generic::LocalizedMessage::Vote(vote) => net::SignedConsensusMessage::Vote(net::SignedConsensusVote {
sender: vote.sender,
signature: vote.signature.signature,
vote: match vote.vote {
bft::generic::Vote::Prepare(r, h) => net::ConsensusVote::Prepare(r as u32, h),
bft::generic::Vote::Commit(r, h) => net::ConsensusVote::Commit(r as u32, h),
bft::generic::Vote::AdvanceRound(r) => net::ConsensusVote::AdvanceRound(r as u32),
}
}),
}),
bft::generic::Communication::Auxiliary(justification) => net::BftMessage::Auxiliary(justification.uncheck().into()),
};
self.network.send_bft_message(network_message);
Ok(::futures::AsyncSink::Ready)
}
fn poll_complete(&mut self) -> ::futures::Poll<(), E> {
Ok(Async::Ready(()))
}
}
/// Consensus service. Starts working when created.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
}
struct Network(Arc<net::ConsensusService>);
impl Service {
/// Create and start a new instance.
pub fn new<C>(client: Arc<C>, network: Arc<net::ConsensusService>, transaction_pool: Arc<Mutex<TransactionPool>>, best_header: &Header) -> Service
where C: BlockchainEvents + bft::BlockImport + bft::Authorities + PolkadotApi + Send + Sync + 'static
{
let best_header = best_header.clone();
let thread = thread::spawn(move || {
let mut core = reactor::Core::new().expect("tokio::Core could not be created");
let key = Arc::new(Keyring::One.into());
let factory = ProposerFactory {
client: client.clone(),
transaction_pool: transaction_pool.clone(),
network: Network(network.clone()),
};
let bft_service = BftService::new(client.clone(), key, factory);
let build_bft = |header: &Header| -> Result<_, Error> {
let hash = header.hash();
let authorities = client.authorities(&BlockId::Hash(hash))?;
let input = network.bft_messages()
.filter_map(move |message| {
process_message(message, &authorities, hash.clone())
.map_err(|e| debug!("Message validation failed: {:?}", e))
.ok()
})
.map_err(|_| bft::InputStreamConcluded.into());
let output = BftSink { network: network.clone(), _e: Default::default() };
Ok(bft_service.build_upon(&header, input, output)?)
};
// Kickstart BFT agreement on start.
if let Err(e) = build_bft(&best_header)
.map_err(|e| debug!("Error creating initial BFT agreement: {:?}", e))
.and_then(|bft| core.run(bft))
{
debug!("Error starting initial BFT agreement: {:?}", e);
}
let bft = client.import_notification_stream().and_then(|notification| {
build_bft(&notification.header).map_err(|e| debug!("BFT agreement error: {:?}", e))
}).for_each(|f| f);
if let Err(e) = core.run(bft) {
debug!("BFT event loop error {:?}", e);
}
});
Service {
thread: Some(thread)
}
}
}
impl Drop for Service {
fn drop(&mut self) {
if let Some(thread) = self.thread.take() {
thread.join().expect("The service thread has panicked");
}
}
}
impl super::Network for Network {
type TableRouter = Router;
fn table_router(&self, _table: Arc<SharedTable>) -> Self::TableRouter {
Router {
network: self.0.clone()
}
}
}
type FetchCandidateAdapter = future::Map<net::FetchFuture, fn(Vec<u8>) -> BlockData>;
struct Router {
network: Arc<net::ConsensusService>,
}
impl Router {
fn fetch_candidate_adapter(data: Vec<u8>) -> BlockData {
BlockData(data)
}
}
impl TableRouter for Router {
type Error = Canceled;
type FetchCandidate = FetchCandidateAdapter;
type FetchExtrinsic = future::FutureResult<Extrinsic, Self::Error>;
fn local_candidate_data(&self, hash: Hash, block_data: BlockData, _extrinsic: Extrinsic) {
let data = block_data.0;
self.network.set_local_candidate(Some((hash, data)))
}
fn fetch_block_data(&self, candidate: &CandidateReceipt) -> Self::FetchCandidate {
let hash = candidate.hash();
self.network.fetch_candidate(&hash).map(Self::fetch_candidate_adapter)
}
fn fetch_extrinsic_data(&self, _candidate: &CandidateReceipt) -> Self::FetchExtrinsic {
future::ok(Extrinsic)
}
}
[package]
name = "polkadot-service"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
[dependencies]
futures = "0.1.17"
parking_lot = "0.4"
tokio-timer = "0.1.2"
ed25519 = { path = "../../substrate/ed25519" }
error-chain = "0.11"
log = "0.4"
tokio-core = "0.1.12"
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
polkadot-consensus = { path = "../consensus" }
polkadot-executor = { path = "../executor" }
polkadot-api = { path = "../api" }
polkadot-transaction-pool = { path = "../transaction-pool" }
polkadot-keystore = { path = "../keystore" }
substrate-primitives = { path = "../../substrate/primitives" }
substrate-network = { path = "../../substrate/network" }
substrate-client = { path = "../../substrate/client" }
substrate-keyring = { path = "../../substrate/keyring" }
substrate-codec = { path = "../../substrate/codec" }
substrate-executor = { path = "../../substrate/executor" }
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?
//! Service configuration.
use transaction_pool;
pub use network::Role;
pub use network::NetworkConfiguration;
/// Service configuration.
pub struct Configuration {
/// Node roles.
pub roles: Role,
/// Transaction pool configuration.
pub transaction_pool: transaction_pool::Options,
/// Network configuration.
pub network: NetworkConfiguration,
/// Path to key files.
pub keystore_path: String,
// TODO: add more network, client, tx pool configuration options
}
impl Default for Configuration {
fn default() -> Configuration {
Configuration {
roles: Role::FULL,
transaction_pool: Default::default(),
network: Default::default(),
keystore_path: Default::default(),
}
}
}
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Errors that can occur during the service operation.
use client;
use network;
error_chain! {
links {
Client(client::error::Error, client::error::ErrorKind) #[doc="Client error"];
Network(network::error::Error, network::error::ErrorKind) #[doc="Network error"];
}
errors {
/// Key store errors
Keystore(e: ::keystore::Error) {
description("Keystore error"),
display("Keystore error: {:?}", e),
}
}
}
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot service. Starts a thread that spins the network, the client and the transaction pool.
//! Manages communication between them.
extern crate futures;
extern crate ed25519;
extern crate parking_lot;
extern crate tokio_timer;
extern crate polkadot_primitives;
extern crate polkadot_runtime;
extern crate polkadot_executor;
extern crate polkadot_api;
extern crate polkadot_consensus as consensus;
extern crate polkadot_transaction_pool as transaction_pool;
extern crate polkadot_keystore as keystore;
extern crate substrate_primitives as primitives;
extern crate substrate_network as network;
extern crate substrate_codec as codec;
extern crate substrate_executor;
extern crate tokio_core;
extern crate substrate_keyring;
extern crate substrate_client as client;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
mod error;
mod config;
use std::sync::Arc;
use std::thread;
use futures::prelude::*;
use parking_lot::Mutex;
use tokio_core::reactor::Core;
use codec::Slicable;
use primitives::block::{Id as BlockId, TransactionHash};
use transaction_pool::TransactionPool;
use substrate_keyring::Keyring;
use substrate_executor::NativeExecutor;
use polkadot_executor::Executor as LocalDispatch;
use polkadot_primitives::AccountId;
use keystore::Store as Keystore;
use polkadot_api::PolkadotApi;
use polkadot_runtime::genesismap::{additional_storage_with_genesis, GenesisConfig};
use client::{genesis, BlockchainEvents};
use client::in_mem::Backend as InMemory;
use network::ManageNetwork;
pub use self::error::{ErrorKind, Error};
pub use config::{Configuration, Role};
type Client = client::Client<InMemory, NativeExecutor<LocalDispatch>>;
/// Polkadot service.
pub struct Service {
thread: Option<thread::JoinHandle<()>>,
client: Arc<Client>,
network: Arc<network::Service>,
_consensus: Option<consensus::Service>,
}
struct TransactionPoolAdapter {
pool: Arc<Mutex<TransactionPool>>,
client: Arc<Client>,
}
impl network::TransactionPool for TransactionPoolAdapter {
fn transactions(&self) -> Vec<(TransactionHash, Vec<u8>)> {
let best_block = match self.client.info() {
Ok(info) => info.chain.best_hash,