Commit bedde457 authored by asynchronous rob's avatar asynchronous rob Committed by Sergey Pepyakin
Browse files

Collator node workflow (#280)

* arbitrary application logic in CLI

* collation work

* split up exit and work futures in application

* collation node workflow

* typo

* indentation fix

* doc grumbles

* rename Application to Worker

* refactor Worker::exit to exit_only
parent 6d8720ac
......@@ -20,7 +20,6 @@ ed25519 = { path = "../../substrate/ed25519" }
app_dirs = "1.2"
tokio = "0.1.7"
futures = "0.1.17"
ctrlc = { git = "https://github.com/paritytech/rust-ctrlc.git" }
fdlimit = "0.1"
parking_lot = "0.4"
serde_json = "1.0"
......
......@@ -24,10 +24,9 @@ extern crate atty;
extern crate ansi_term;
extern crate regex;
extern crate time;
extern crate fdlimit;
extern crate futures;
extern crate tokio;
extern crate ctrlc;
extern crate fdlimit;
extern crate ed25519;
extern crate triehash;
extern crate parking_lot;
......@@ -66,6 +65,11 @@ mod informant;
mod chain_spec;
pub use chain_spec::ChainSpec;
pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};
use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
......@@ -117,6 +121,26 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
.unwrap_or_else(default_base_path)
}
/// Additional worker making use of the node, to run asynchronously before shutdown.
///
/// This will be invoked with the service and spawn a future that resolves
/// when complete.
pub trait Worker {
/// A future that resolves when the work is done or the node should exit.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;
/// An exit scheduled for the future.
type Exit: Future<Item=(),Error=()> + Send + 'static;
/// Don't work, but schedule an exit.
fn exit_only(self) -> Self::Exit;
/// Do work and schedule exit.
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>;
}
/// Parse command line arguments and start the node.
///
/// IANA unassigned port ranges that we could use:
......@@ -125,9 +149,10 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
/// 9556-9591 Unassigned
/// 9803-9874 Unassigned
/// 9926-9949 Unassigned
pub fn run<I, T>(args: I) -> error::Result<()> where
pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
I: IntoIterator<Item = T>,
T: Into<std::ffi::OsString> + Clone,
W: Worker,
{
let yaml = load_yaml!("./cli.yml");
let matches = match clap::App::from_yaml(yaml).version(&(crate_version!().to_owned() + "\n")[..]).get_matches_from_safe(args) {
......@@ -154,11 +179,11 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
}
if let Some(matches) = matches.subcommand_matches("export-blocks") {
return export_blocks(matches);
return export_blocks(matches, worker.exit_only());
}
if let Some(matches) = matches.subcommand_matches("import-blocks") {
return import_blocks(matches);
return import_blocks(matches, worker.exit_only());
}
let spec = load_spec(&matches)?;
......@@ -255,8 +280,8 @@ pub fn run<I, T>(args: I) -> error::Result<()> where
};
match role == service::Role::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf)?,
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}
// TODO: hard exit if this stalls?
......@@ -272,16 +297,19 @@ fn build_spec(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn export_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let base_path = base_path(matches);
let spec = load_spec(&matches)?;
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
info!("DB path: {}", config.database_path);
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
info!("Exporting blocks");
let mut block: u32 = match matches.value_of("from") {
......@@ -310,7 +338,7 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
}
loop {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match client.block(&BlockId::number(block as u64))? {
......@@ -334,15 +362,19 @@ fn export_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
fn import_blocks<E>(matches: &clap::ArgMatches, exit: E) -> error::Result<()>
where E: Future<Item=(),Error=()> + Send + 'static
{
let spec = load_spec(&matches)?;
let base_path = base_path(matches);
let mut config = service::Configuration::default_with_spec(spec);
config.database_path = db_path(&base_path).to_string_lossy().into();
let client = service::new_client(config)?;
let (exit_send, exit) = std::sync::mpsc::channel();
ctrlc::CtrlC::set_handler(move || {
exit_send.clone().send(()).expect("Error sending exit notification");
let (exit_send, exit_recv) = std::sync::mpsc::channel();
::std::thread::spawn(move || {
let _ = exit.wait();
let _ = exit_send.send(());
});
let mut file: Box<Read> = match matches.value_of("INPUT") {
......@@ -354,7 +386,7 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
let count: u32 = Slicable::decode(&mut file).ok_or("Error reading file")?;
let mut block = 0;
for _ in 0 .. count {
if exit.try_recv().is_ok() {
if exit_recv.try_recv().is_ok() {
break;
}
match SignedBlock::decode(&mut file) {
......@@ -377,27 +409,19 @@ fn import_blocks(matches: &clap::ArgMatches) -> error::Result<()> {
Ok(())
}
fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matches: &clap::ArgMatches, sys_conf: SystemConfiguration) -> error::Result<()>
fn run_until_exit<C, W>(
runtime: &mut Runtime,
service: service::Service<C>,
matches: &clap::ArgMatches,
sys_conf: SystemConfiguration,
worker: W,
) -> error::Result<()>
where
C: service::Components,
W: Worker,
client::error::Error: From<<<<C as service::Components>::Backend as client::backend::Backend<Block>>::State as state_machine::Backend>::Error>,
{
let exit = {
let (exit_send, exit) = exit_future::signal();
let exit_send = ::std::cell::RefCell::new(Some(exit_send));
ctrlc::CtrlC::set_handler(move || {
let exit_send = exit_send
.try_borrow_mut()
.expect("only borrowed in non-reetrant signal handler; qed")
.take();
if let Some(signal) = exit_send {
signal.fire();
}
});
exit
};
let (exit_send, exit) = exit_future::signal();
let executor = runtime.executor();
informant::start(&service, exit.clone(), executor.clone());
......@@ -422,7 +446,8 @@ fn run_until_exit<C>(runtime: &mut Runtime, service: service::Service<C>, matche
)
};
let _ = exit.wait();
let _ = worker.work(&service).wait();
exit_send.fire();
Ok(())
}
......
......@@ -2,12 +2,16 @@
name = "polkadot-collator"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
description = "Abstract collation logic"
description = "Collator node implementation"
[dependencies]
futures = "0.1.17"
substrate-client = { path = "../../substrate/client" }
substrate-codec = { path = "../../substrate/codec", version = "0.1" }
substrate-primitives = { path = "../../substrate/primitives", version = "0.1" }
polkadot-api = { path = "../api" }
polkadot-runtime = { path = "../runtime", version = "0.1" }
polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-parachain = { path = "../parachain", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
......@@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Collation Logic.
//! Collation node logic.
//!
//! A collator node lives on a distinct parachain and submits a proposal for
//! a state transition, along with a proof for its validity
......@@ -28,7 +28,7 @@
//! destination B as egress(X)[A -> B]
//!
//! On every block, each parachain will be intended to route messages from some
//! subset of all the other parachains.
//! subset of all the other parachains. (NOTE: in practice this is not done until PoC-3)
//!
//! Since the egress information is unique to every block, when routing from a
//! parachain a collator must gather all egress posts from that parachain
......@@ -45,25 +45,41 @@
//! to be performed, as the collation logic itself.
extern crate futures;
extern crate substrate_client as client;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate ed25519;
extern crate polkadot_api;
extern crate polkadot_cli;
extern crate polkadot_runtime;
extern crate polkadot_primitives;
#[macro_use]
extern crate log;
use std::collections::{BTreeSet, BTreeMap};
use std::sync::Arc;
use futures::{stream, Stream, Future, IntoFuture};
use polkadot_primitives::parachain::{self, CandidateSignature, ConsolidatedIngress, Message, Id as ParaId};
use futures::{future, stream, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use polkadot_api::PolkadotApi;
use polkadot_primitives::BlockId;
use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId};
use polkadot_cli::{ClientError, ServiceComponents, ClientBackend, PolkadotBlock, StateMachineBackend, Service};
use polkadot_cli::Worker;
/// Parachain context needed for collation.
///
/// This can be implemented through an externally attached service or a stub.
pub trait ParachainContext {
/// Produce a candidate, given the latest ingress queue information.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
/// Produce a candidate, given the latest ingress queue information and the last parachain head.
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
last_head: HeadData,
ingress: I,
) -> (parachain::BlockData, polkadot_primitives::AccountId, CandidateSignature);
) -> (BlockData, HeadData);
}
/// Relay chain context needed to collate.
......@@ -120,29 +136,145 @@ pub fn collate_ingress<'a, R>(relay_context: R)
.map(ConsolidatedIngress)
}
/// Produce a candidate for the parachain.
pub fn collate<'a, R: 'a, P>(local_id: ParaId, relay_context: R, para_context: P)
-> impl Future<Item=parachain::Candidate, Error=R::Error> + 'a
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
local_id: ParaId,
last_head: HeadData,
relay_context: R,
para_context: P,
key: Arc<ed25519::Pair>,
)
-> impl Future<Item=parachain::Collation, Error=R::Error> + 'a
where
R: RelayChainContext,
R::Error: 'a,
R: RelayChainContext + 'a,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
collate_ingress(relay_context).map(move |ingress| {
let (block_data, _, signature) = para_context.produce_candidate(
let (block_data, head_data) = para_context.produce_candidate(
last_head,
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
);
parachain::Candidate {
let signature = key.sign(&block_data.0[..]).into();
let pubkey_bytes: [u8; 32] = key.public().into();
let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
collator_signature: signature,
block: block_data,
unprocessed_ingress: ingress,
collator: pubkey_bytes.into(),
signature,
head_data,
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: block_data.hash(),
};
parachain::Collation {
receipt,
block_data,
}
})
}
/// Polkadot-api context.
struct ApiContext;
impl RelayChainContext for ApiContext {
type Error = ();
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
BTreeSet::new()
}
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
Ok(Vec::new())
}
}
struct CollationNode<P, E> {
parachain_context: P,
exit: E,
para_id: ParaId,
key: Arc<ed25519::Pair>,
}
impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + 'static,
E: Future<Item=(),Error=()> + Send + 'static
{
type Work = Box<Future<Item=(),Error=()>>;
type Exit = E;
fn exit_only(self) -> Self::Exit {
self.exit
}
fn work<C: ServiceComponents>(self, service: &Service<C>) -> Self::Work
where ClientError: From<<<<C as ServiceComponents>::Backend as ClientBackend<PolkadotBlock>>::State as StateMachineBackend>::Error>,
{
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let api = service.api();
let work = client.import_notification_stream()
.and_then(move |notification| {
let id = BlockId::hash(notification.hash);
match api.parachain_head(&id, para_id) {
Ok(Some(last_head)) => {
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context.clone(),
key.clone(),
).map(Some);
future::Either::A(collation_work)
}
Ok(None) => {
info!("Parachain {:?} appears to be inactive. Cannot collate.", id);
future::Either::B(future::ok(None))
}
Err(e) => {
warn!("Could not collate for parachain {:?}: {:?}", id, e);
future::Either::B(future::ok(None)) // returning error would shut down the collation node
}
}
})
.for_each(|_collation: Option<Collation>| {
// TODO: import into network.
Ok(())
});
let work_and_exit = work.select(exit).then(|_| Ok(()));
Box::new(work_and_exit) as Box<_>
}
}
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
/// Provide a future which resolves when the node should exit.
/// This function blocks until done.
pub fn run_collator<P, E>(
parachain_context: P,
para_id: ParaId,
exit: E,
key: Arc<ed25519::Pair>,
args: Vec<::std::ffi::OsString>
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + 'static,
{
let node_logic = CollationNode { parachain_context, exit: exit.into_future(), para_id, key };
polkadot_cli::run(args, node_logic)
}
#[cfg(test)]
mod tests {
use super::*;
......@@ -170,7 +302,7 @@ mod tests {
}
}
#[test]
#[test]
fn collates_ingress() {
let route_from = |x: &[ParaId]| {
let mut set = BTreeSet::new();
......
......@@ -12,7 +12,6 @@ error-chain = "0.12"
log = "0.3"
exit-future = "0.1"
polkadot-api = { path = "../api" }
polkadot-collator = { path = "../collator" }
polkadot-parachain = { path = "../parachain" }
polkadot-primitives = { path = "../primitives" }
polkadot-runtime = { path = "../runtime" }
......
......@@ -23,18 +23,10 @@ use std::sync::Arc;
use polkadot_api::PolkadotApi;
use polkadot_primitives::{Hash, AccountId, BlockId};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic};
use futures::prelude::*;
/// A full collation.
pub struct Collation {
/// Block data.
pub block_data: BlockData,
/// The candidate receipt itself.
pub receipt: CandidateReceipt,
}
/// Encapsulates connections to collators and allows collation on any parachain.
///
/// This is expected to be a lightweight, shared type like an `Arc`.
......
......@@ -32,7 +32,6 @@
extern crate ed25519;
extern crate parking_lot;
extern crate polkadot_api;
extern crate polkadot_collator as collator;
extern crate polkadot_statement_table as table;
extern crate polkadot_parachain as parachain;
extern crate polkadot_transaction_pool as transaction_pool;
......@@ -79,7 +78,7 @@ use futures::future;
use collation::CollationFetch;
use dynamic_inclusion::DynamicInclusion;
pub use self::collation::{validate_collation, Collators, Collation};
pub use self::collation::{validate_collation, Collators};
pub use self::error::{ErrorKind, Error};
pub use self::shared_table::{SharedTable, StatementProducer, ProducedStatements, Statement, SignedStatement, GenericStatement};
pub use service::Service;
......
......@@ -21,9 +21,8 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use table::{self, Table, Context as TableContextTrait};
use collation::Collation;
use polkadot_primitives::{Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Collation, Extrinsic, CandidateReceipt};
use parking_lot::Mutex;
use futures::{future, prelude::*};
......@@ -470,6 +469,7 @@ mod tests {
let candidate = CandidateReceipt {
parachain_index: para_id,
collator: [1; 32].into(),
signature: Default::default(),
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
......@@ -519,6 +519,7 @@ mod tests {
let candidate = CandidateReceipt {
parachain_index: para_id,
collator: [1; 32].into(),
signature: Default::default(),
head_data: ::polkadot_primitives::parachain::HeadData(vec![1, 2, 3, 4]),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
......
......@@ -23,9 +23,9 @@ use ed25519;
use substrate_network::{self as net, generic_message as msg};
use substrate_network::consensus_gossip::ConsensusMessage;
use polkadot_api::{PolkadotApi, LocalPolkadotApi};
use polkadot_consensus::{Network, SharedTable, Collators, Collation};
use polkadot_consensus::{Network, SharedTable, Collators};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::Id as ParaId;
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use futures::{future, prelude::*};
use futures::sync::mpsc;
......
......@@ -22,6 +22,7 @@ use parking_lot::Mutex;
use polkadot_consensus::GenericStatement;
use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
use substrate_primitives::H512;
use codec::Slicable;
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage};
......@@ -144,6 +145,7 @@ fn fetches_from_those_with_knowledge() {
parachain_index: 5.into(),
collator: [255; 32].into(),
head_data: HeadData(vec![9, 9, 9]),
signature: H512::from([1; 64]).into(),
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
fees: 1_000_000,
......
......@@ -134,26 +134,6 @@ impl Slicable for DutyRoster {
#[cfg_attr(feature = "std", serde(deny_unknown_fields))]
pub struct Extrinsic;
/// Candidate parachain block.
///
/// https://github.com/w3f/polkadot-spec/blob/master/spec.md#candidate-para-chain-block
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
#[cfg_attr(feature = "std", serde(rename_all = "camelCase"))]
#[cfg_attr(feature = "std", serde(deny_unknown_fields))]
pub struct Candidate {
/// The ID of the parachain this is a proposal for.
pub parachain_index: Id,
/// Collator's signature
pub collator_signature: CandidateSignature,
/// Unprocessed ingress queue.
///
/// Ordered by parachain ID and block number.
pub unprocessed_ingress: ConsolidatedIngress,
/// Block data
pub block: BlockData,
}
/// Candidate receipt type.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
......@@ -164,6 +144,8 @@ pub struct CandidateReceipt {
pub parachain_index: Id,
/// The collator's relay-chain account ID
pub collator: super::AccountId,
/// Signature on block data by collator.
pub signature: CandidateSignature,
/// The head-data
pub head_data: HeadData,
/// Balance uploads to the relay chain.
......@@ -182,6 +164,7 @@ impl Slicable for CandidateReceipt {
self.parachain_index.using_encoded(|s| v.extend(s));
self.collator.using_encoded(|s| v.extend(s));
self.signature.using_encoded(|s| v.extend(s));
self.head_data.0.using_encoded(|s| v.extend(s));
self.balance_uploads.using_encoded(|s| v.extend(s));
self.egress_queue_roots.using_encoded(|s| v.extend(s));
......@@ -195,6 +178,7 @@ impl Slicable for CandidateReceipt {
Some(CandidateReceipt {
parachain_index: Slicable::decode(input)?,
collator: Slicable::decode(input)?,
signature: Slicable::decode(input)?,
head_data: Slicable::decode(input).map(HeadData)?,
balance_uploads: Slicable::decode(input)?,
egress_queue_roots: Slicable::decode(input)?,
......@@ -227,6 +211,18 @@ impl Ord for CandidateReceipt {
}
}
/// A full collation.
#[derive(PartialEq, Eq, Clone)]