Commit 222c6c23 authored by asynchronous rob's avatar asynchronous rob Committed by Bastian Köcher
Browse files

Update to new gossip system. (#172)

* Integrates new gossip system into Polkadot (#166)

* new gossip validation in network

* integrate new gossip into service

* Fix build

* Fix claims module

* fix warning

* update to latest master again

* update runtime
parent 6e9ab774
Pipeline #32276 passed with stages
in 12 minutes and 13 seconds
This diff is collapsed.
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Gossip messages and the message validator
use substrate_network::consensus_gossip::{
self as network_gossip, ValidationResult as GossipValidationResult,
};
use polkadot_validation::SignedStatement;
use polkadot_primitives::{Hash, SessionKey};
use codec::Decode;
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::RwLock;
use super::NetworkService;
/// The engine ID of the polkadot attestation system.
pub const POLKADOT_ENGINE_ID: substrate_network::ConsensusEngineId = [b'd', b'o', b't', b'1'];
/// A gossip message.
#[derive(Encode, Decode, Clone)]
pub(crate) struct GossipMessage {
/// The relay chain parent hash.
pub(crate) relay_parent: Hash,
/// The signed statement being gossipped.
pub(crate) statement: SignedStatement,
}
/// whether a block is known.
pub enum Known {
/// The block is a known leaf.
Leaf,
/// The block is known to be old.
Old,
/// The block is known to be bad.
Bad,
}
/// An oracle for known blocks.
pub trait KnownOracle: Send + Sync {
/// whether a block is known. If it's not, returns `None`.
fn is_known(&self, block_hash: &Hash) -> Option<Known>;
}
impl<F> KnownOracle for F where F: Fn(&Hash) -> Option<Known> + Send + Sync {
fn is_known(&self, block_hash: &Hash) -> Option<Known> {
(self)(block_hash)
}
}
/// Register a gossip validator on the network service.
///
/// This returns a `RegisteredMessageValidator`
// NOTE: since RegisteredMessageValidator is meant to be a type-safe proof
// that we've actually done the registration, this should be the only way
// to construct it outside of tests.
pub fn register_validator<O: KnownOracle + 'static>(
service: &NetworkService,
oracle: O,
) -> RegisteredMessageValidator {
let validator = Arc::new(MessageValidator {
live_session: RwLock::new(HashMap::new()),
oracle,
});
let gossip_side = validator.clone();
service.with_gossip(|gossip, _| gossip.register_validator(POLKADOT_ENGINE_ID, gossip_side));
RegisteredMessageValidator { inner: validator as _ }
}
/// A registered message validator.
///
/// Create this using `register_validator`.
#[derive(Clone)]
pub struct RegisteredMessageValidator {
inner: Arc<MessageValidator<KnownOracle>>,
}
impl RegisteredMessageValidator {
#[cfg(test)]
pub(crate) fn new_test<O: KnownOracle + 'static>(oracle: O) -> Self {
let validator = Arc::new(MessageValidator {
live_session: RwLock::new(HashMap::new()),
oracle,
});
RegisteredMessageValidator { inner: validator as _ }
}
/// Note a live attestation session. This must be removed later with
/// `remove_session`.
pub(crate) fn note_session(&self, relay_parent: Hash, validation: MessageValidationData) {
self.inner.live_session.write().insert(relay_parent, validation);
}
/// Remove a live attestation session when it is no longer live.
pub(crate) fn remove_session(&self, relay_parent: &Hash) {
self.inner.live_session.write().remove(relay_parent);
}
}
// data needed for validating gossip.
pub(crate) struct MessageValidationData {
/// The authorities at a block.
pub(crate) authorities: Vec<SessionKey>,
}
impl MessageValidationData {
fn check_statement(&self, relay_parent: &Hash, statement: &SignedStatement) -> bool {
self.authorities.contains(&statement.sender) &&
::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
relay_parent,
)
}
}
/// An unregistered message validator. Register this with `register_validator`.
pub struct MessageValidator<O: ?Sized> {
live_session: RwLock<HashMap<Hash, MessageValidationData>>,
oracle: O,
}
impl<O: KnownOracle + ?Sized> network_gossip::Validator<Hash> for MessageValidator<O> {
fn validate(&self, mut data: &[u8]) -> GossipValidationResult<Hash> {
match GossipMessage::decode(&mut data) {
Some(GossipMessage { relay_parent, statement }) => {
let live = self.live_session.read();
let topic = || ::router::attestation_topic(relay_parent.clone());
if let Some(validation) = live.get(&relay_parent) {
if validation.check_statement(&relay_parent, &statement) {
GossipValidationResult::Valid(topic())
} else {
GossipValidationResult::Invalid
}
} else {
match self.oracle.is_known(&relay_parent) {
None | Some(Known::Leaf) => GossipValidationResult::Future(topic()),
Some(Known::Old) => GossipValidationResult::Expired,
Some(Known::Bad) => GossipValidationResult::Invalid,
}
}
}
None => {
debug!(target: "validation", "Error decoding gossip message");
GossipValidationResult::Invalid
}
}
}
}
......@@ -50,6 +50,7 @@ mod collator_pool;
mod local_collations;
mod router;
pub mod validation;
pub mod gossip;
use codec::{Decode, Encode};
use futures::sync::oneshot;
......
......@@ -41,12 +41,14 @@ use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::{io, mem};
use std::sync::Arc;
use gossip::{RegisteredMessageValidator};
use validation::{NetworkService, Knowledge, Executor};
type IngressPair = (ParaId, Vec<Message>);
type IngressPairRef<'a> = (ParaId, &'a [Message]);
fn attestation_topic(parent_hash: Hash) -> Hash {
/// Compute the gossip topic for attestations on the given parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
v.extend(b"attestations");
......@@ -124,6 +126,7 @@ pub struct Router<P, E, N: NetworkService, T> {
knowledge: Arc<Mutex<Knowledge>>,
fetch_incoming: Arc<Mutex<HashMap<ParaId, IncomingReceiver>>>,
deferred_statements: Arc<Mutex<DeferredStatements>>,
message_validator: RegisteredMessageValidator,
}
impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
......@@ -135,6 +138,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
exit: E,
message_validator: RegisteredMessageValidator,
) -> Self {
Router {
table,
......@@ -147,6 +151,7 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
fetch_incoming: Arc::new(Mutex::new(HashMap::new())),
deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())),
exit,
message_validator,
}
}
......@@ -169,6 +174,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
fetch_incoming: self.fetch_incoming.clone(),
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
}
}
}
......@@ -392,6 +398,7 @@ impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
));
}
}
self.message_validator.remove_session(&parent_hash);
}
}
......
......@@ -79,6 +79,7 @@ impl TestContext {
fn make_status(status: &Status, roles: Roles) -> FullStatus {
FullStatus {
version: 1,
min_supported_version: 1,
roles,
best_number: 0,
best_hash: Default::default(),
......
......@@ -17,7 +17,7 @@
//! Tests and helpers for validation networking.
use validation::NetworkService;
use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext};
use substrate_network::Context as NetContext;
use substrate_primitives::{Ed25519AuthorityId, NativeOrEncoded};
use substrate_keyring::Keyring;
use {PolkadotProtocol};
......@@ -51,21 +51,21 @@ impl Future for NeverExit {
}
struct GossipRouter {
incoming_messages: mpsc::UnboundedReceiver<(Hash, ConsensusMessage)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
messages: Vec<(Hash, ConsensusMessage)>,
incoming_messages: mpsc::UnboundedReceiver<(Hash, Vec<u8>)>,
incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
outgoing: Vec<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
messages: Vec<(Hash, Vec<u8>)>,
}
impl GossipRouter {
fn add_message(&mut self, topic: Hash, message: ConsensusMessage) {
fn add_message(&mut self, topic: Hash, message: Vec<u8>) {
self.outgoing.retain(|&(ref o_topic, ref sender)| {
o_topic != &topic || sender.unbounded_send(message.clone()).is_ok()
});
self.messages.push((topic, message));
}
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<ConsensusMessage>) {
fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<Vec<u8>>) {
for message in self.messages.iter()
.filter(|&&(ref t, _)| t == &topic)
.map(|&(_, ref msg)| msg.clone())
......@@ -105,8 +105,8 @@ impl Future for GossipRouter {
#[derive(Clone)]
struct GossipHandle {
send_message: mpsc::UnboundedSender<(Hash, ConsensusMessage)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<ConsensusMessage>)>,
send_message: mpsc::UnboundedSender<(Hash, Vec<u8>)>,
send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<Vec<u8>>)>,
}
fn make_gossip() -> (GossipRouter, GossipHandle) {
......@@ -130,13 +130,13 @@ struct TestNetwork {
}
impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx));
rx
}
fn gossip_message(&self, topic: Hash, message: ConsensusMessage) {
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
let _ = self.gossip.send_message.unbounded_send((topic, message));
}
......@@ -322,9 +322,14 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
gossip: gossip_handle.clone(),
});
let message_val = crate::gossip::RegisteredMessageValidator::new_test(
|_hash: &_| Some(crate::gossip::Known::Leaf),
);
TestValidationNetwork::new(
net,
NeverExit,
message_val,
runtime_api.clone(),
executor.clone(),
)
......@@ -427,15 +432,22 @@ fn ingress_fetch_works() {
let parent_hash = [1; 32].into();
let (router_a, router_b, router_c) = {
let validators: Vec<Hash> = vec![
key_a.to_raw_public().into(),
key_b.to_raw_public().into(),
key_c.to_raw_public().into(),
];
let authorities: Vec<_> = validators.iter().cloned()
.map(|h| h.to_fixed_bytes())
.map(Ed25519AuthorityId)
.collect();
let mut api_handle = built.api_handle.lock();
*api_handle = ApiData {
active_parachains: vec![id_a, id_b, id_c],
duties: vec![Chain::Parachain(id_a), Chain::Parachain(id_b), Chain::Parachain(id_c)],
validators: vec![
key_a.to_raw_public().into(),
key_b.to_raw_public().into(),
key_c.to_raw_public().into(),
],
validators,
ingress,
};
......@@ -443,14 +455,17 @@ fn ingress_fetch_works() {
built.networks[0].communication_for(
make_table(&*api_handle, &key_a, parent_hash),
vec![MessagesFrom::from_messages(id_a, messages_from_a)],
&authorities,
),
built.networks[1].communication_for(
make_table(&*api_handle, &key_b, parent_hash),
vec![MessagesFrom::from_messages(id_b, messages_from_b)],
&authorities,
),
built.networks[2].communication_for(
make_table(&*api_handle, &key_c, parent_hash),
vec![MessagesFrom::from_messages(id_c, messages_from_c)],
&authorities,
),
)
};
......
......@@ -20,7 +20,7 @@
//! each time a validation session begins on a new chain head.
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{consensus_gossip::ConsensusMessage, Context as NetContext};
use substrate_network::Context as NetContext;
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData};
......@@ -38,6 +38,8 @@ use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use router::Router;
use gossip::{POLKADOT_ENGINE_ID, GossipMessage, RegisteredMessageValidator, MessageValidationData};
use super::PolkadotProtocol;
/// An executor suitable for dispatching async consensus tasks.
......@@ -67,7 +69,7 @@ impl Executor for TaskExecutor {
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage>;
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>>;
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>);
......@@ -81,11 +83,11 @@ pub trait NetworkService: Send + Sync + 'static {
}
impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<ConsensusMessage> {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<Vec<u8>> {
let (tx, rx) = std::sync::mpsc::channel();
self.with_gossip(move |gossip, _| {
let inner_rx = gossip.messages_for(topic);
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
......@@ -96,14 +98,10 @@ impl NetworkService for super::NetworkService {
}
fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
self.gossip_consensus_message(topic, message, false);
self.gossip_consensus_message(topic, POLKADOT_ENGINE_ID, message);
}
fn drop_gossip(&self, topic: Hash) {
self.with_gossip(move |gossip, _| {
gossip.collect_garbage_for_topic(topic);
})
}
fn drop_gossip(&self, _topic: Hash) { }
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
......@@ -115,8 +113,7 @@ impl NetworkService for super::NetworkService {
// task that processes all gossipped consensus messages,
// checking signatures
struct MessageProcessTask<P, E, N: NetworkService, T> {
inner_stream: mpsc::UnboundedReceiver<ConsensusMessage>,
parent_hash: Hash,
inner_stream: mpsc::UnboundedReceiver<Vec<u8>>,
table_router: Router<P, E, N, T>,
}
......@@ -127,19 +124,12 @@ impl<P, E, N, T> MessageProcessTask<P, E, N, T> where
N: NetworkService,
T: Clone + Executor + Send + 'static,
{
fn process_message(&self, msg: ConsensusMessage) -> Option<Async<()>> {
use polkadot_validation::SignedStatement;
debug!(target: "validation", "Processing validation statement for live session");
if let Some(statement) = SignedStatement::decode(&mut msg.as_slice()) {
if ::polkadot_validation::check_statement(
&statement.statement,
&statement.signature,
statement.sender,
&self.parent_hash
) {
self.table_router.import_statement(statement);
}
fn process_message(&self, msg: Vec<u8>) -> Option<Async<()>> {
debug!(target: "validation", "Processing consensus statement for live consensus");
// statements are already checked by gossip validator.
if let Some(message) = GossipMessage::decode(&mut &msg[..]) {
self.table_router.import_statement(message.statement);
}
None
......@@ -175,13 +165,20 @@ pub struct ValidationNetwork<P, E, N, T> {
network: Arc<N>,
api: Arc<P>,
executor: T,
message_validator: RegisteredMessageValidator,
exit: E,
}
impl<P, E, N, T> ValidationNetwork<P, E, N, T> {
/// Create a new validation session networking object.
pub fn new(network: Arc<N>, exit: E, api: Arc<P>, executor: T) -> Self {
ValidationNetwork { network, exit, api, executor }
/// Create a new consensus networking object.
pub fn new(
network: Arc<N>,
exit: E,
message_validator: RegisteredMessageValidator,
api: Arc<P>,
executor: T,
) -> Self {
ValidationNetwork { network, exit, message_validator, api, executor }
}
}
......@@ -192,6 +189,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
exit: self.exit.clone(),
api: self.api.clone(),
executor: self.executor.clone(),
message_validator: self.message_validator.clone(),
}
}
}
......@@ -210,6 +208,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
&self,
table: Arc<SharedTable>,
outgoing: polkadot_validation::Outgoing,
authorities: &[SessionKey],
) -> Self::TableRouter {
let parent_hash = table.consensus_parent_hash().clone();
......@@ -224,6 +223,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
parent_hash,
knowledge.clone(),
self.exit.clone(),
self.message_validator.clone(),
);
table_router.broadcast_egress(outgoing);
......@@ -234,6 +234,12 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let executor = self.executor.clone();
let exit = self.exit.clone();
// before requesting messages, note live consensus session.
self.message_validator.note_session(
parent_hash,
MessageValidationData { authorities: authorities.to_vec() },
);
// spin up a task in the background that processes all incoming statements
// TODO: propagate statements on a timer?
let inner_stream = self.network.gossip_messages_for(attestation_topic);
......@@ -245,7 +251,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
});
let process_task = MessageProcessTask {
inner_stream,
parent_hash,
table_router: table_router_clone,
};
......@@ -276,12 +281,14 @@ impl Future for AwaitingCollation {
.poll()
.map_err(|_| NetworkDown)
}
if let Ok(futures::Async::Ready(mut inner)) = self.outer.poll() {
let poll_result = inner.poll();
self.inner = Some(inner);
return poll_result.map_err(|_| NetworkDown)
match self.outer.poll() {
Ok(futures::Async::Ready(inner)) => {
self.inner = Some(inner);
self.poll()
},
Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
Err(_) => Err(NetworkDown)
}
Ok(futures::Async::NotReady)
}
}
......
......@@ -19,16 +19,20 @@
use rstd::prelude::*;
use sr_io::{keccak_256, secp256k1_ecdsa_recover};
use srml_support::{StorageValue, StorageMap};
use srml_support::traits::{Currency, ArithmeticType};
use system::ensure_signed;
use codec::Encode;
#[cfg(feature = "std")]
use sr_primitives::traits::Zero;
use balances;
use system;
type BalanceOf<T> = <<T as Trait>::Currency as ArithmeticType>::Type;
/// Configuration trait.
pub trait Trait: balances::Trait {
pub trait Trait: system::Trait {
/// The overarching event type.
type Event: From<Event<Self>> + Into<<Self as system::Trait>::Event>;
type Currency: ArithmeticType + Currency<Self::AccountId, Balance=BalanceOf<Self>>;
}
type EthereumAddress = [u8; 20];
......@@ -58,7 +62,7 @@ impl EcdsaSignature {
/// An event in this module.
decl_event!(
pub enum Event<T> where
B = <T as balances::Trait>::Balance,
B = BalanceOf<T>,
A = <T as system::Trait>::AccountId
{
/// Someone claimed some DOTs.
......@@ -73,13 +77,13 @@ decl_storage! {
trait Store for Module<T: Trait> as Claims {
Claims get(claims) build(|config: &GenesisConfig<T>| {
config.claims.iter().map(|(a, b)| (a.clone(), b.clone())).collect::<Vec<_>>()
}): map EthereumAddress => Option<T::Balance>;
}): map EthereumAddress => Option<BalanceOf<T>>;
Total get(total) build(|config: &GenesisConfig<T>| {
config.claims.iter().fold(Zero::zero(), |acc: T::Balance, &(_, n)| acc + n)
}): T::Balance;
config.claims.iter().fold(Zero::zero(), |acc: BalanceOf<T>, &(_, n)| acc + n)
}): BalanceOf<T>;
}
add_extra_genesis {
config(claims): Vec<(EthereumAddress, T::Balance)>;
config(claims): Vec<(EthereumAddress, BalanceOf<T>)>;
}
}
......@@ -117,21 +121,21 @@ decl_module! {
fn claim(origin, ethereum_signature: EcdsaSignature) {
// This is a public call, so we ensure that the origin is some signed account.
let sender = ensure_signed(origin)?;
let signer = sender.using_encoded(|data|
eth_recover(&ethereum_signature, data)
).ok_or("Invalid Ethereum signature")?;
let balance_due = <Claims<T>>::take(&signer)
.ok_or("Ethereum address has no claim")?;
<Total<T>>::mutate(|t| if *t < balance_due {