Commit 3c273882 authored by Arkadiy Paronyan's avatar Arkadiy Paronyan Committed by asynchronous rob
Browse files

Use substrate codec for network messages (#333)

* Use substrate codec for network messages

* Version bump

* Removed redundant format
parent 28c41813
......@@ -222,19 +222,19 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
// TODO [rob]: collation node implementation
// This isn't a thing. Different parachains will have their own collator executables and
// maybe link to libpolkadot to get a light-client.
service::Role::LIGHT
service::Roles::LIGHT
} else if matches.is_present("light") {
info!("Starting (light)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Role::LIGHT
service::Roles::LIGHT
} else if matches.is_present("validator") || matches.is_present("dev") {
info!("Starting validator");
config.execution_strategy = service::ExecutionStrategy::Both;
service::Role::AUTHORITY
service::Roles::AUTHORITY
} else {
info!("Starting (heavy)");
config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible;
service::Role::FULL
service::Roles::FULL
};
if let Some(s) = matches.value_of("execution") {
......@@ -303,7 +303,7 @@ pub fn run<I, T, W>(args: I, worker: W) -> error::Result<()> where
None
};
match role == service::Role::LIGHT {
match role == service::Roles::LIGHT {
true => run_until_exit(&mut runtime, service::new_light(config, executor)?, &matches, sys_conf, worker)?,
false => run_until_exit(&mut runtime, service::new_full(config, executor)?, &matches, sys_conf, worker)?,
}
......
......@@ -5,9 +5,6 @@ authors = ["Parity Technologies <admin@parity.io>"]
description = "Polkadot-specific networking protocol"
[dependencies]
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
parking_lot = "0.4"
polkadot-api = { path = "../api" }
polkadot-consensus = { path = "../consensus" }
......
......@@ -18,6 +18,7 @@
use polkadot_primitives::{AccountId, Hash};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use codec;
use futures::sync::oneshot;
......@@ -27,12 +28,28 @@ use std::time::{Duration, Instant};
const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);
/// The role of the collator. Whether they're the primary or backup for this parachain.
#[derive(PartialEq, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Role {
/// Primary collators should send collations whenever it's time.
Primary,
Primary = 0,
/// Backup collators should not.
Backup,
Backup = 1,
}
impl codec::Encode for Role {
fn encode_to<T: codec::Output>(&self, dest: &mut T) {
dest.push_byte(*self as u8);
}
}
impl codec::Decode for Role {
fn decode<I: codec::Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
x if x == Role::Primary as u8 => Some(Role::Primary),
x if x == Role::Backup as u8 => Some(Role::Backup),
_ => None,
}
}
}
/// A maintenance action for the collator set.
......
......@@ -26,6 +26,7 @@ use polkadot_api::{PolkadotApi, LocalPolkadotApi};
use polkadot_consensus::{Network, SharedTable, Collators};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation};
use codec::Decode;
use futures::prelude::*;
use futures::sync::mpsc;
......@@ -175,7 +176,7 @@ impl<P: LocalPolkadotApi + Send + Sync + 'static> MessageProcessTask<P> {
}
}
ConsensusMessage::ChainSpecific(msg, _) => {
if let Ok(Message::Statement(parent_hash, statement)) = ::serde_json::from_slice(&msg) {
if let Some(Message::Statement(parent_hash, statement)) = Decode::decode(&mut msg.as_slice()) {
if ::polkadot_consensus::check_statement(&statement.statement, &statement.signature, statement.sender, &parent_hash) {
self.table_router.import_statement(statement);
}
......
......@@ -20,11 +20,6 @@
//! parachain block and extrinsic data fetching, communication between collators and validators,
//! and more.
extern crate serde;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate substrate_bft as bft;
extern crate substrate_codec as codec;
extern crate substrate_network;
......@@ -47,7 +42,7 @@ mod collator_pool;
mod router;
pub mod consensus;
use codec::{Decode, Encode};
use codec::{Decode, Encode, Input, Output};
use futures::sync::oneshot;
use parking_lot::Mutex;
use polkadot_consensus::{Statement, SignedStatement, GenericStatement};
......@@ -188,7 +183,6 @@ impl CurrentConsensus {
}
/// Polkadot-specific messages.
#[derive(Serialize, Deserialize)]
pub enum Message {
/// signed statement and localized parent hash.
Statement(Hash, SignedStatement),
......@@ -205,8 +199,58 @@ pub enum Message {
Collation(Hash, Collation),
}
impl Encode for Message {
fn encode_to<T: Output>(&self, dest: &mut T) {
match *self {
Message::Statement(ref h, ref s) => {
dest.push_byte(0);
dest.push(h);
dest.push(s);
}
Message::SessionKey(ref h, ref k) => {
dest.push_byte(1);
dest.push(h);
dest.push(k);
}
Message::RequestBlockData(ref id, ref d) => {
dest.push_byte(2);
dest.push(id);
dest.push(d);
}
Message::BlockData(ref id, ref d) => {
dest.push_byte(3);
dest.push(id);
dest.push(d);
}
Message::CollatorRole(ref r) => {
dest.push_byte(4);
dest.push(r);
}
Message::Collation(ref h, ref c) => {
dest.push_byte(5);
dest.push(h);
dest.push(c);
}
}
}
}
impl Decode for Message {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
match input.read_byte()? {
0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)),
1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)),
2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)),
3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)),
4 => Some(Message::CollatorRole(Decode::decode(input)?)),
5 => Some(Message::Collation(Decode::decode(input)?, Decode::decode(input)?)),
_ => None,
}
}
}
fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) {
let encoded = ::serde_json::to_vec(&message).expect("serialization of messages infallible; qed");
let encoded = message.encode();
ctx.send_message(to, generic_message::Message::ChainSpecific(encoded))
}
......@@ -244,9 +288,7 @@ impl PolkadotProtocol {
/// Send a statement to a validator.
fn send_statement(&mut self, ctx: &mut Context<Block>, _val: SessionKey, parent_hash: Hash, statement: SignedStatement) {
// TODO: something more targeted than gossip.
let raw = ::serde_json::to_vec(&Message::Statement(parent_hash, statement))
.expect("message serialization infallible; qed");
let raw = Message::Statement(parent_hash, statement).encode();
self.consensus_gossip.multicast_chain_specific(ctx, raw, parent_hash);
}
......@@ -427,7 +469,7 @@ impl Specialization<Block> for PolkadotProtocol {
);
}
let validator = status.roles.iter().any(|r| *r == message::Role::Authority);
let validator = status.roles.contains(substrate_network::Roles::AUTHORITY);
let send_key = validator || local_status.collating_for.is_some();
self.peers.insert(peer_id, PeerInfo {
......@@ -436,7 +478,7 @@ impl Specialization<Block> for PolkadotProtocol {
validator,
});
self.consensus_gossip.new_peer(ctx, peer_id, &status.roles);
self.consensus_gossip.new_peer(ctx, peer_id, status.roles);
if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) {
send_polkadot_message(
ctx,
......@@ -497,11 +539,11 @@ impl Specialization<Block> for PolkadotProtocol {
self.consensus_gossip.on_bft_message(ctx, peer_id, msg)
}
generic_message::Message::ChainSpecific(raw) => {
match serde_json::from_slice(&raw) {
Ok(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
Err(e) => {
trace!(target: "p_net", "Bad message from {}: {}", peer_id, e);
ctx.disable_peer(peer_id, "Unknown Polkadot-protocol reason");
match Message::decode(&mut raw.as_slice()) {
Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg),
None => {
trace!(target: "p_net", "Bad message from {}", peer_id);
ctx.disable_peer(peer_id, "Invalid polkadot protocol message format");
}
}
}
......
......@@ -24,7 +24,7 @@ use polkadot_primitives::{Block, Hash, SessionKey};
use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData};
use substrate_primitives::H512;
use codec::Encode;
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, message::Message as SubstrateMessage, message::Role, specialization::Specialization, generic_message::Message as GenericMessage};
use substrate_network::{PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage};
use std::sync::Arc;
use futures::Future;
......@@ -62,7 +62,7 @@ impl TestContext {
fn has_message(&self, to: PeerId, message: Message) -> bool {
use substrate_network::generic_message::Message as GenericMessage;
let encoded = ::serde_json::to_vec(&message).unwrap();
let encoded = message.encode();
self.messages.iter().any(|&(ref peer, ref msg)| match msg {
GenericMessage::ChainSpecific(ref data) => peer == &to && data == &encoded,
_ => false,
......@@ -70,7 +70,7 @@ impl TestContext {
}
}
fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
roles,
......@@ -78,9 +78,6 @@ fn make_status(status: &Status, roles: Vec<Role>) -> FullStatus {
best_hash: Default::default(),
genesis_hash: Default::default(),
chain_status: status.encode(),
parachain_id: None,
validator_id: None,
validator_signature: None,
}
}
......@@ -97,7 +94,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus
}
fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) {
let encoded = ::serde_json::to_vec(&message).unwrap();
let encoded = message.encode();
protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded));
}
......@@ -115,7 +112,7 @@ fn sends_session_key() {
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, Roles::AUTHORITY));
assert!(ctx.messages.is_empty());
}
......@@ -129,7 +126,7 @@ fn sends_session_key() {
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![]));
protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, Roles::NONE));
assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key)));
}
}
......@@ -171,7 +168,7 @@ fn fetches_from_those_with_knowledge() {
// connect peer A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_a, make_status(&status, Roles::AUTHORITY));
assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key)));
}
......@@ -187,7 +184,7 @@ fn fetches_from_those_with_knowledge() {
// peer B connects and sends session key. request already assigned to A
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority]));
protocol.on_connect(&mut ctx, peer_b, make_status(&status, Roles::AUTHORITY));
on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key));
assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash)));
......@@ -220,7 +217,7 @@ fn remove_bad_collator() {
{
let mut ctx = TestContext::default();
protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![]));
protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE));
}
{
......
......@@ -224,6 +224,22 @@ pub struct Collation {
pub receipt: CandidateReceipt,
}
impl Decode for Collation {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(Collation {
block_data: Decode::decode(input)?,
receipt: Decode::decode(input)?,
})
}
}
impl Encode for Collation {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.block_data);
dest.push(&self.receipt);
}
}
/// Parachain ingress queue message.
#[derive(PartialEq, Eq, Clone)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
......@@ -253,6 +269,18 @@ impl BlockData {
}
}
impl Decode for BlockData {
fn decode<I: Input>(input: &mut I) -> Option<Self> {
Some(BlockData(Decode::decode(input)?))
}
}
impl Encode for BlockData {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.0);
}
}
/// Parachain header raw bytes wrapper type.
#[derive(PartialEq, Eq)]
#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug))]
......
......@@ -52,7 +52,7 @@ use client::Client;
use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork};
use tokio::runtime::TaskExecutor;
pub use service::{Configuration, Role, PruningMode, ExtrinsicPoolOptions,
pub use service::{Configuration, Roles, PruningMode, ExtrinsicPoolOptions,
ErrorKind, Error, ComponentBlock, LightComponents, FullComponents};
pub use client::ExecutionStrategy;
......@@ -166,7 +166,7 @@ pub fn new_light(config: Configuration<GenesisConfig>, executor: TaskExecutor)
pub fn new_full(config: Configuration<GenesisConfig>, executor: TaskExecutor)
-> Result<Service<FullComponents<Factory>>, Error>
{
let is_validator = (config.roles & Role::AUTHORITY) == Role::AUTHORITY;
let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY;
let service = service::Service::<FullComponents<Factory>>::new(config, executor.clone())?;
// Spin consensus service if configured
let consensus = if is_validator {
......
......@@ -7,5 +7,3 @@ authors = ["Parity Technologies <admin@parity.io>"]
substrate-codec = { path = "../../substrate/codec" }
substrate-primitives = { path = "../../substrate/primitives" }
polkadot-primitives = { path = "../primitives" }
serde = "1.0"
serde_derive = "1.0"
......@@ -70,7 +70,7 @@ pub trait Context {
}
/// Statements circulated among peers.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum Statement<C, D> {
/// Broadcast by a authority to indicate that this is his candidate for
/// inclusion.
......@@ -141,7 +141,7 @@ impl<C: Decode, D: Decode> Decode for Statement<C, D> {
}
/// A signed statement.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub struct SignedStatement<C, D, V, S> {
/// The statement.
pub statement: Statement<C, D>,
......@@ -151,6 +151,23 @@ pub struct SignedStatement<C, D, V, S> {
pub sender: V,
}
impl<C: Encode, D: Encode, V: Encode, S: Encode> Encode for SignedStatement<C, D, V, S> {
fn encode_to<T: Output>(&self, dest: &mut T) {
dest.push(&self.statement);
dest.push(&self.signature);
dest.push(&self.sender);
}
}
impl<C: Decode, D: Decode, V: Decode, S: Decode> Decode for SignedStatement<C, D, V, S> {
fn decode<I: Input>(value: &mut I) -> Option<Self> {
Some(SignedStatement {
statement: Decode::decode(value)?,
signature: Decode::decode(value)?,
sender: Decode::decode(value)?,
})
}
}
/// Misbehavior: voting more than one way on candidate validity.
///
/// Since there are three possible ways to vote, a double vote is possible in
......
......@@ -18,10 +18,6 @@ extern crate substrate_codec as codec;
extern crate substrate_primitives;
extern crate polkadot_primitives as primitives;
extern crate serde;
#[macro_use]
extern crate serde_derive;
pub mod generic;
pub use generic::Table;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment