Commit 5b9b95a8 authored by asynchronous rob, committed by GitHub
Browse files

Collator-side of collator protocol (#351)

* skeleton of collators object

* awaiting and handling collations. rename `collators` to CollatorPool

* add some tests

* add tests

* implement Collators trait for ConsensusNetwork

* plug collators into main polkadot-network

* ignore collator role message

* add a couple more tests

* garbage collection for collations

* extract session-key tracking from consensus

* add local_collations.rs

* finish polish of local_collations

* integrate local_collations into network layer

* introduce API for adding local collations

* mostly finish collator implementation pending service fix

* Specialized network()

* push collations to the network

* grumbles

* substrate-service has custom configuration

* initialize network in collator mode as necessary
parent 80f62b9d
......@@ -29,10 +29,6 @@ args:
value_name: KEY
help: Specify node secret key (64-character hex string)
takes_value: true
- collator:
long: collator
help: Enable collator mode
takes_value: false
- validator:
long: validator
help: Enable validator mode
......
......@@ -71,7 +71,7 @@ pub use client::error::Error as ClientError;
pub use client::backend::Backend as ClientBackend;
pub use state_machine::Backend as StateMachineBackend;
pub use polkadot_primitives::Block as PolkadotBlock;
pub use service::{Components as ServiceComponents, Service};
pub use service::{Components as ServiceComponents, Service, CustomConfiguration};
use std::io::{self, Write, Read, stdin, stdout};
use std::fs::File;
......@@ -134,11 +134,16 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf {
pub trait Worker {
/// A future that resolves when the work is done or the node should exit.
/// This will be run on a tokio runtime.
type Work: Future<Item=(),Error=()>;
type Work: Future<Item=(),Error=()> + Send + 'static;
/// An exit scheduled for the future.
type Exit: Future<Item=(),Error=()> + Send + 'static;
/// Return configuration for the polkadot node.
// TODO: make this the full configuration, so embedded nodes don't need
// string CLI args
fn configuration(&self) -> CustomConfiguration { Default::default() }
/// Don't work, but schedule an exit.
fn exit_only(self) -> Self::Exit;
......@@ -217,13 +222,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
};
let role =
if matches.is_present("collator") {
info!("Starting collator");
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
service::Roles::LIGHT
} else if matches.is_present("light") {
if matches.is_present("light") {
info!("Starting (light)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Roles::LIGHT
......@@ -262,9 +261,10 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
config.network.net_config_path = config.network.config_path.clone();
let port = match matches.value_of("port") {
Some(port) => port.parse().expect("Invalid p2p port value specified."),
Some(port) => port.parse().map_err(|_| "Invalid p2p port value specified.")?,
None => 30333,
};
config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port));
config.network.public_address = None;
config.network.client_version = format!("parity-polkadot/{}", crate_version!());
......@@ -275,6 +275,8 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
};
}
config.custom = worker.configuration();
config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect();
if matches.is_present("dev") {
config.keys.push("Alice".into());
......@@ -494,7 +496,7 @@ fn run_until_exit<C, W>(
)
};
let _ = worker.work(&service).wait();
let _ = runtime.block_on(worker.work(&service));
exit_send.fire();
Ok(())
}
......
......@@ -15,3 +15,4 @@ polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-cli = { path = "../cli" }
log = "0.4"
ed25519 = { path = "../../substrate/ed25519" }
tokio = "0.1.7"
......@@ -49,6 +49,7 @@ extern crate substrate_client as client;
extern crate substrate_codec as codec;
extern crate substrate_primitives as primitives;
extern crate ed25519;
extern crate tokio;
extern crate polkadot_api;
extern crate polkadot_cli;
......@@ -58,16 +59,20 @@ extern crate polkadot_primitives;
#[macro_use]
extern crate log;
use std::collections::{BTreeSet, BTreeMap};
use std::collections::{BTreeSet, BTreeMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use futures::{future, stream, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use polkadot_api::PolkadotApi;
use polkadot_primitives::BlockId;
use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId};
use polkadot_cli::{ServiceComponents, Service};
use polkadot_primitives::{AccountId, BlockId, SessionKey};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
use polkadot_cli::{ServiceComponents, Service, CustomConfiguration};
use polkadot_cli::Worker;
use tokio::timer::Deadline;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
/// Parachain context needed for collation.
///
......@@ -99,6 +104,11 @@ pub trait RelayChainContext {
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
}
fn key_to_account_id(key: &ed25519::Pair) -> AccountId {
let pubkey_bytes: [u8; 32] = key.public().into();
pubkey_bytes.into()
}
/// Collate the necessary ingress queue using the given context.
pub fn collate_ingress<'a, R>(relay_context: R)
-> impl Future<Item=ConsolidatedIngress, Error=R::Error> + 'a
......@@ -159,11 +169,10 @@ pub fn collate<'a, R, P>(
let block_data_hash = block_data.hash();
let signature = key.sign(&block_data_hash.0[..]).into();
let pubkey_bytes: [u8; 32] = key.public().into();
let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
collator: pubkey_bytes.into(),
collator: key_to_account_id(&*key),
signature,
head_data,
balance_uploads: Vec::new(),
......@@ -183,7 +192,7 @@ pub fn collate<'a, R, P>(
struct ApiContext;
impl RelayChainContext for ApiContext {
type Error = ();
type Error = ::polkadot_api::Error;
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
......@@ -203,12 +212,21 @@ struct CollationNode<P, E> {
}
impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + 'static,
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Send + 'static
{
type Work = Box<Future<Item=(),Error=()>>;
type Work = Box<Future<Item=(),Error=()> + Send>;
type Exit = E;
fn configuration(&self) -> CustomConfiguration {
let mut config = CustomConfiguration::default();
config.collating_for = Some((
key_to_account_id(&*self.key),
self.para_id.clone(),
));
config
}
fn exit_only(self) -> Self::Exit {
self.exit
}
......@@ -217,35 +235,66 @@ impl<P, E> Worker for CollationNode<P, E> where
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let api = service.api();
let network = service.network();
let work = client.import_notification_stream()
.and_then(move |notification| {
let id = BlockId::hash(notification.hash);
match api.parachain_head(&id, para_id) {
Ok(Some(last_head)) => {
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context.clone(),
key.clone(),
).map(Some);
future::Either::A(collation_work)
}
Ok(None) => {
info!("Parachain {:?} appears to be inactive. Cannot collate.", id);
future::Either::B(future::ok(None))
.for_each(move |notification| {
macro_rules! try_fr {
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::A(future::err(e)),
}
}
}
let relay_parent = notification.hash;
let id = BlockId::hash(relay_parent);
let network = network.clone();
let api = api.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let work = future::lazy(move || {
let last_head = match try_fr!(api.parachain_head(&id, para_id)) {
Some(last_head) => last_head,
None => return future::Either::A(future::ok(())),
};
let targets = compute_targets(
para_id,
try_fr!(api.session_keys(&id)).as_slice(),
try_fr!(api.duty_roster(&id)),
);
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
parachain_context,
key,
).map(move |collation| {
network.with_spec(|spec, ctx| spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
));
});
future::Either::B(collation_work)
});
let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT);
let silenced = deadlined.then(|res| match res {
Ok(()) => Ok(()),
Err(e) => {
warn!("Could not collate for parachain {:?}: {:?}", id, e);
future::Either::B(future::ok(None)) // returning error would shut down the collation node
warn!("Collation failure: {}", e);
Ok(())
}
}
})
.for_each(|_collation: Option<Collation>| {
// TODO: import into network.
});
tokio::spawn(silenced);
Ok(())
});
......@@ -254,6 +303,16 @@ impl<P, E> Worker for CollationNode<P, E> where
}
}
fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet<SessionKey> {
use polkadot_primitives::parachain::Chain;
roster.validator_duty.iter().enumerate()
.filter(|&(_, c)| c == &Chain::Parachain(para_id))
.filter_map(|(i, _)| session_keys.get(i))
.cloned()
.collect()
}
/// Run a collator node with the given `RelayChainContext` and `ParachainContext` and
/// arguments to the underlying polkadot node.
///
......@@ -266,7 +325,7 @@ pub fn run_collator<P, E>(
key: Arc<ed25519::Pair>,
args: Vec<::std::ffi::OsString>
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + 'static,
P: ParachainContext + Send + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + 'static,
{
......
......@@ -285,7 +285,6 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> Network for ConsensusNetwork<P
knowledge,
parent_hash,
local_session_key,
session_keys: Default::default(),
});
MessageProcessTask {
......
......@@ -39,6 +39,7 @@ extern crate rhododendron;
extern crate log;
mod collator_pool;
mod local_collations;
mod router;
pub mod consensus;
......@@ -54,6 +55,7 @@ use substrate_network::{message, generic_message};
use substrate_network::specialization::Specialization;
use substrate_network::StatusMessage as GenericFullStatus;
use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
......@@ -110,9 +112,9 @@ struct BlockDataRequest {
}
struct PeerInfo {
status: Status,
validator: bool,
session_keys: HashMap<Hash, SessionKey>,
collating_for: Option<(AccountId, ParaId)>,
validator_key: Option<SessionKey>,
claimed_validator: bool,
}
#[derive(Default)]
......@@ -164,7 +166,6 @@ impl Knowledge {
struct CurrentConsensus {
knowledge: Arc<Mutex<Knowledge>>,
parent_hash: Hash,
session_keys: HashMap<SessionKey, PeerId>,
local_session_key: SessionKey,
}
......@@ -174,12 +175,6 @@ impl CurrentConsensus {
self.knowledge.lock().candidates.get(hash)
.and_then(|entry| entry.block_data.clone())
}
fn peer_disconnected(&mut self, peer: &PeerInfo) {
if let Some(key) = peer.session_keys.get(&self.parent_hash) {
self.session_keys.remove(key);
}
}
}
/// Polkadot-specific messages.
......@@ -187,9 +182,9 @@ impl CurrentConsensus {
pub enum Message {
/// signed statement and localized parent hash.
Statement(Hash, SignedStatement),
/// Tell the peer your session key for the current block.
// TODO: do this with a random challenge protocol
SessionKey(Hash, SessionKey),
/// As a validator, tell the peer your current session key.
// TODO: do this with a cryptographic proof of some kind
SessionKey(SessionKey),
/// Requesting parachain block data by candidate hash.
RequestBlockData(RequestId, Hash),
/// Provide block data by candidate hash or nothing if unknown.
......@@ -208,9 +203,8 @@ impl Encode for Message {
dest.push(h);
dest.push(s);
}
Message::SessionKey(ref h, ref k) => {
Message::SessionKey(ref k) => {
dest.push_byte(1);
dest.push(h);
dest.push(k);
}
Message::RequestBlockData(ref id, ref d) => {
......@@ -240,7 +234,7 @@ impl Decode for Message {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)),
1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)),
1 => Some(Message::SessionKey(Decode::decode(input)?)),
2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)),
3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)),
4 => Some(Message::CollatorRole(Decode::decode(input)?)),
......@@ -259,27 +253,27 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message)
/// Polkadot protocol attachment for substrate.
pub struct PolkadotProtocol {
peers: HashMap<PeerId, PeerInfo>,
collating_for: Option<(AccountId, ParaId)>,
consensus_gossip: ConsensusGossip<Block>,
collators: CollatorPool,
validators: HashMap<SessionKey, PeerId>,
local_collations: LocalCollations<Collation>,
live_consensus: Option<CurrentConsensus>,
in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
next_req_id: u64,
}
impl Default for PolkadotProtocol {
fn default() -> Self {
Self::new()
}
}
impl PolkadotProtocol {
/// Instantiate a polkadot protocol handler.
pub fn new() -> Self {
pub fn new(collating_for: Option<(AccountId, ParaId)>) -> Self {
PolkadotProtocol {
peers: HashMap::new(),
consensus_gossip: ConsensusGossip::new(),
collators: CollatorPool::new(),
collating_for,
validators: HashMap::new(),
local_collations: LocalCollations::new(),
live_consensus: None,
in_flight: HashMap::new(),
pending: Vec::new(),
......@@ -311,31 +305,23 @@ impl PolkadotProtocol {
}
/// Note new consensus session.
fn new_consensus(&mut self, ctx: &mut Context<Block>, mut consensus: CurrentConsensus) {
let parent_hash = consensus.parent_hash;
let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash);
// TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks.
for (id, info) in self.peers.iter_mut()
.filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some())
{
send_polkadot_message(
ctx,
*id,
Message::SessionKey(parent_hash, consensus.local_session_key)
);
fn new_consensus(&mut self, ctx: &mut Context<Block>, consensus: CurrentConsensus) {
let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key));
if let Some(key) = info.session_keys.get(&parent_hash) {
consensus.session_keys.insert(*key, *id);
}
if let Some(ref old_parent) = old_parent {
info.session_keys.remove(old_parent);
if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) {
for (id, _) in self.peers.iter()
.filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some())
{
send_polkadot_message(
ctx,
*id,
Message::SessionKey(consensus.local_session_key)
);
}
}
self.live_consensus = Some(consensus);
self.consensus_gossip.collect_garbage(old_parent.as_ref());
self.consensus_gossip.collect_garbage(old_data.as_ref().map(|&(ref hash, _)| hash));
}
fn dispatch_pending_requests(&mut self, ctx: &mut Context<Block>) {
......@@ -359,8 +345,9 @@ impl PolkadotProtocol {
continue;
}
let validator_keys = &mut self.validators;
let next_peer = entry.knows_block_data.iter()
.filter_map(|x| consensus.session_keys.get(x).map(|id| (*x, *id)))
.filter_map(|x| validator_keys.get(x).map(|id| (*x, *id)))
.find(|&(ref key, _)| pending.attempted_peers.insert(*key))
.map(|(_, id)| id);
......@@ -392,29 +379,7 @@ impl PolkadotProtocol {
match msg {
Message::Statement(parent_hash, _statement) =>
self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash),
Message::SessionKey(parent_hash, key) => {
{
let info = match self.peers.get_mut(&peer_id) {
Some(peer) => peer,
None => return,
};
if !info.validator {
ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason");
return;
}
match self.live_consensus {
Some(ref mut consensus) if consensus.parent_hash == parent_hash => {
consensus.session_keys.insert(key, peer_id);
}
_ => {}
}
info.session_keys.insert(parent_hash, key);
}
self.dispatch_pending_requests(ctx);
}
Message::SessionKey(key) => self.on_session_key(ctx, peer_id, key),
Message::RequestBlockData(req_id, hash) => {
let block_data = self.live_consensus.as_ref()
.and_then(|c| c.block_data(&hash));
......@@ -423,8 +388,41 @@ impl PolkadotProtocol {
}
Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data),
Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation),
Message::CollatorRole(_) => {},
Message::CollatorRole(role) => self.on_new_role(ctx, peer_id, role),
}
}
fn on_session_key(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, key: SessionKey) {
{
let info = match self.peers.get_mut(&peer_id) {
Some(peer) => peer,
None => {
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id);
return
}
};
if !info.claimed_validator {
ctx.disable_peer(peer_id, "Session key broadcasted without setting authority role");
return;
}
if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) {
self.validators.remove(&old_key);
for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) {
send_polkadot_message(
ctx,
peer_id,
Message::Collation(relay_parent, collation),
)
}
}
self.validators.insert(key, peer_id);
}
self.dispatch_pending_requests(ctx);
}
fn on_block_data(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, req_id: RequestId, data: Option<BlockData>) {
......@@ -440,14 +438,39 @@ impl PolkadotProtocol {
self.pending.push(req);
self.dispatch_pending_requests(ctx);
}
None => ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason"),
None => ctx.disable_peer(peer_id, "Unexpected block data response"),
}
}
// when a validator sends us (a collator) a new role.
fn on_new_role(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, role: Role) {
let info = match self.peers.get(&peer_id) {
Some(peer) => peer,
None => {
trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id);
return
}
};
match info.validator_key {
None => ctx.disable_peer(
peer_id,
"Sent collator role without registering first as validator",
),
Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) {
send_polkadot_message(
ctx,
peer_id,
Message::Collation(relay_parent, collation),
)
},
}
}
}
impl Specialization<Block> for PolkadotProtocol {
fn status(&self) -> Vec<u8> {
Status { collating_for: None }.encode()
Status { collating_for: self.collating_for.clone() }.encode()
}
fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) {
......@@ -460,7 +483,7 @@ impl Specialization<Block> for PolkadotProtocol {
if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
if self.collator_peer_id(acc_id.clone()).is_some() {
ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason");
ctx.disconnect_peer(peer_id);
return
}
......@@ -476,9 +499,9 @@ impl Specialization<Block> for PolkadotProtocol {
let send_key = validator || local_status.collating_for.is_some();
self.peers.insert(peer_id, PeerInfo {
status: local_status,
session_keys: Default::default(),
validator,
collating_for: local_status.collating_for,
validator_key: None,
claimed_validator: validator,
});
self.consensus_gossip.new_peer(ctx, peer_id, status.roles);
......@@ -486,7 +509,7 @@ impl Specialization<Block> for PolkadotProtocol {
send_polkadot_message(
ctx,
peer_id,
Message::SessionKey(consensus.parent_hash, consensus.local_session_key)
Message::SessionKey(consensus.local_session_key)
);
}
......@@ -495,7 +518,7 @@ impl Specialization<Block> for PolkadotProtocol {
fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) {