Commit b2870ce0 authored by asynchronous rob, committed by Gav Wood

Allow many attestation instances to live at once in the network (#46)

* support multiple consensus sessions in the network at once

* make the tests compile and add a test for RecentSessionKeys

* track recently received session keys from validators

* add a test for the desired key-sending behavior
Parent: 4b0c4968
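The change replaces the single live consensus instance in the network protocol with a map of instances keyed by relay-chain parent hash, and the local session key is only announced to peers the first time it enters a small window of recently used keys. Below is a minimal, self-contained sketch of that bookkeeping; it uses simplified stand-in types rather than the crate's real `Hash`, `SessionKey`, and `CurrentConsensus`.

use std::collections::HashMap;

// Stand-ins for polkadot_primitives::Hash and SessionKey.
type Hash = u64;
type SessionKey = u64;

// Mirrors RECENT_SESSIONS in the diff below.
const RECENT_SESSIONS: usize = 3;

#[derive(Default)]
struct Sessions {
    recent: Vec<SessionKey>,         // bounded window of recent keys, oldest first
    live: HashMap<Hash, SessionKey>, // parent_hash -> local session key
}

impl Sessions {
    // Returns Some(key) when peers should be told about a newly used key.
    fn new_session(&mut self, parent: Hash, key: SessionKey) -> Option<SessionKey> {
        let announce = if self.recent.contains(&key) {
            None
        } else {
            if self.recent.len() == RECENT_SESSIONS {
                self.recent.remove(0); // evict the oldest key
            }
            self.recent.push(key);
            Some(key)
        };
        self.live.insert(parent, key);
        announce
    }

    fn remove_session(&mut self, parent: &Hash) {
        if let Some(key) = self.live.remove(parent) {
            // Forget the key only once no other live session uses it.
            if !self.live.values().any(|k| *k == key) {
                self.recent.retain(|k| *k != key);
            }
        }
    }
}

fn main() {
    let mut sessions = Sessions::default();
    assert_eq!(sessions.new_session(1, 100), Some(100)); // first use of the key: announce it
    assert_eq!(sessions.new_session(2, 100), None);      // same key again: stay quiet
    sessions.remove_session(&1);
    assert!(sessions.recent.contains(&100));  // still used by the session at parent 2
    sessions.remove_session(&2);
    assert!(!sessions.recent.contains(&100)); // no live session uses it any more
}

The actual implementation in the diff below stores a full `CurrentConsensus` per parent hash and keeps the bounded window in an `ArrayVec` (`RecentSessionKeys`).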
@@ -1981,6 +1981,7 @@ dependencies = [
name = "polkadot-network"
version = "0.1.0"
dependencies = [
"arrayvec 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -5,6 +5,7 @@ authors = ["Parity Technologies <admin@parity.io>"]
description = "Polkadot-specific networking protocol"
[dependencies]
arrayvec = "0.4"
parking_lot = "0.4"
polkadot-availability-store = { path = "../availability-store" }
polkadot-consensus = { path = "../consensus" }
@@ -21,20 +21,22 @@
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::consensus_gossip::ConsensusMessage;
use polkadot_consensus::{Network, SharedTable, Collators};
use polkadot_consensus::{Network, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{AccountId, Block, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, ParachainHost};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, BlockData};
use codec::Decode;
use futures::prelude::*;
use futures::sync::mpsc;
use std::collections::HashMap;
use std::sync::Arc;
use arrayvec::ArrayVec;
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
use super::{NetworkService, Knowledge, CurrentConsensus};
use super::NetworkService;
use router::Router;
// task that processes all gossiped consensus messages,
@@ -142,9 +144,8 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Network for ConsensusNetwork<
// TODO: propagate statements on a timer?
let inner_stream = self.network.consensus_gossip().write().messages_for(attestation_topic);
task_executor.spawn(self.network.with_spec(|spec, ctx| {
spec.new_consensus(ctx, CurrentConsensus {
spec.new_consensus(ctx, parent_hash, CurrentConsensus {
knowledge,
parent_hash,
local_session_key,
});
@@ -175,7 +176,6 @@ impl Future for AwaitingCollation {
}
}
impl<P: ProvideRuntimeApi + Send + Sync + 'static> Collators for ConsensusNetwork<P>
where P::Api: ParachainHost<Block>,
{
@@ -192,3 +192,250 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static> Collators for ConsensusNetwor
self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator));
}
}
#[derive(Default)]
struct KnowledgeEntry {
knows_block_data: Vec<SessionKey>,
knows_extrinsic: Vec<SessionKey>,
block_data: Option<BlockData>,
extrinsic: Option<Extrinsic>,
}
/// Tracks knowledge of peers.
pub(crate) struct Knowledge {
candidates: HashMap<Hash, KnowledgeEntry>,
}
impl Knowledge {
/// Create a new knowledge instance.
pub(crate) fn new() -> Self {
Knowledge {
candidates: HashMap::new(),
}
}
/// Note a statement seen from another validator.
pub(crate) fn note_statement(&mut self, from: SessionKey, statement: &Statement) {
match *statement {
GenericStatement::Candidate(ref c) => {
let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Available(ref hash) => {
let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Valid(ref hash) | GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
.or_insert_with(Default::default)
.knows_block_data
.push(from),
}
}
/// Note a candidate collated or seen locally.
pub(crate) fn note_candidate(&mut self, hash: Hash, block_data: Option<BlockData>, extrinsic: Option<Extrinsic>) {
let entry = self.candidates.entry(hash).or_insert_with(Default::default);
entry.block_data = entry.block_data.take().or(block_data);
entry.extrinsic = entry.extrinsic.take().or(extrinsic);
}
}
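// Not part of this diff: a compact, self-contained illustration of the mapping
// `note_statement` applies above. `Candidate` and `Available` statements mark
// the sender as knowing both the block data and the extrinsic, while
// `Valid`/`Invalid` only mark the block data as known. The types below are
// hypothetical stand-ins, not types from this crate.
enum StatementKind { Candidate, Available, Valid, Invalid }

// (knows_block_data, knows_extrinsic) implied by a statement kind.
fn implied_knowledge(kind: &StatementKind) -> (bool, bool) {
    match *kind {
        StatementKind::Candidate | StatementKind::Available => (true, true),
        StatementKind::Valid | StatementKind::Invalid => (true, false),
    }
}

fn main() {
    assert_eq!(implied_knowledge(&StatementKind::Valid), (true, false));
    assert_eq!(implied_knowledge(&StatementKind::Available), (true, true));
}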
/// A current consensus instance.
pub(crate) struct CurrentConsensus {
knowledge: Arc<Mutex<Knowledge>>,
local_session_key: SessionKey,
}
impl CurrentConsensus {
#[cfg(test)]
pub(crate) fn new(knowledge: Arc<Mutex<Knowledge>>, local_session_key: SessionKey) -> Self {
CurrentConsensus {
knowledge,
local_session_key
}
}
// execute a closure with locally stored block data for a candidate, or a slice of session identities
// we believe should have the data.
fn with_block_data<F, U>(&self, hash: &Hash, f: F) -> U
where F: FnOnce(Result<&BlockData, &[SessionKey]>) -> U
{
let knowledge = self.knowledge.lock();
let res = knowledge.candidates.get(hash)
.ok_or(&[] as &_)
.and_then(|entry| entry.block_data.as_ref().ok_or(&entry.knows_block_data[..]));
f(res)
}
}
// 3 is chosen because sessions change infrequently and usually
// only the last 2 (current session and "last" session) are relevant.
// the extra slot is a safety margin.
const RECENT_SESSIONS: usize = 3;
/// Result when inserting recent session key.
#[derive(PartialEq, Eq)]
pub(crate) enum InsertedRecentKey {
/// Key was already known.
AlreadyKnown,
/// Key was new; contains the evicted key, if any.
New(Option<SessionKey>),
}
/// Wrapper for managing recent session keys.
#[derive(Default)]
pub(crate) struct RecentSessionKeys {
inner: ArrayVec<[SessionKey; RECENT_SESSIONS]>,
}
impl RecentSessionKeys {
/// Insert a new session key. If the key is new and the set was already full,
/// the evicted key is returned.
pub(crate) fn insert(&mut self, key: SessionKey) -> InsertedRecentKey {
if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown }
let old = if self.inner.len() == RECENT_SESSIONS {
Some(self.inner.remove(0))
} else {
None
};
self.inner.push(key);
InsertedRecentKey::New(old)
}
/// As a slice.
pub(crate) fn as_slice(&self) -> &[SessionKey] {
&*self.inner
}
fn remove(&mut self, key: &SessionKey) {
self.inner.retain(|k| k != key)
}
}
/// Manages requests and session keys for live consensus instances.
pub(crate) struct LiveConsensusInstances {
// recent local session keys.
recent: RecentSessionKeys,
// live consensus instances, keyed by `parent_hash`.
live_instances: HashMap<Hash, CurrentConsensus>,
}
impl LiveConsensusInstances {
/// Create a new `LiveConsensusInstances`
pub(crate) fn new() -> Self {
LiveConsensusInstances {
recent: Default::default(),
live_instances: HashMap::new(),
}
}
/// Note a new consensus session. If the local session key has not been used recently,
/// it is returned so it can be broadcast to peers.
pub(crate) fn new_consensus(
&mut self,
parent_hash: Hash,
consensus: CurrentConsensus,
) -> Option<SessionKey> {
let inserted_key = self.recent.insert(consensus.local_session_key);
let maybe_new = if let InsertedRecentKey::New(_) = inserted_key {
Some(consensus.local_session_key)
} else {
None
};
self.live_instances.insert(parent_hash, consensus);
maybe_new
}
/// Remove consensus session.
pub(crate) fn remove(&mut self, parent_hash: &Hash) {
if let Some(consensus) = self.live_instances.remove(parent_hash) {
let key_still_used = self.live_instances.values()
.any(|c| c.local_session_key == consensus.local_session_key);
if !key_still_used {
self.recent.remove(&consensus.local_session_key)
}
}
}
/// Recent session keys as a slice.
pub(crate) fn recent_keys(&self) -> &[SessionKey] {
self.recent.as_slice()
}
/// Call a closure with block data from the consensus session at the given parent hash.
///
/// The closure is called with `Ok(data)` when the session is live and the data is known locally,
/// with `Err(Some(keys))` when the session is live but the data is unknown, where `keys` lists the
/// validator session keys believed to have the data, and with `Err(None)` when the session is unknown.
pub(crate) fn with_block_data<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
where F: FnOnce(Result<&BlockData, Option<&[SessionKey]>>) -> U
{
match self.live_instances.get(parent_hash) {
Some(c) => c.with_block_data(c_hash, |res| f(res.map_err(Some))),
None => f(Err(None))
}
}
}
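// Not part of this diff: a self-contained sketch of how a caller is expected
// to consume the `Result` that `with_block_data` above passes to its closure.
// The `Action` enum and the stand-in `SessionKey`/`BlockData` types are
// hypothetical; `dispatch_pending_requests` in this crate's lib.rs follows
// the same shape.
type SessionKey = u8;

#[derive(Debug, PartialEq)]
struct BlockData(Vec<u8>);

#[derive(Debug, PartialEq)]
enum Action {
    AnswerLocally(BlockData),  // Ok(data): session live and data held locally
    AskPeers(Vec<SessionKey>), // Err(Some(keys)): session live, data missing
    Prune,                     // Err(None): no such consensus session
}

fn dispatch(res: Result<&BlockData, Option<&[SessionKey]>>) -> Action {
    match res {
        Ok(data) => Action::AnswerLocally(BlockData(data.0.clone())),
        Err(Some(keys)) => Action::AskPeers(keys.to_vec()),
        Err(None) => Action::Prune,
    }
}

fn main() {
    let data = BlockData(vec![1, 2, 3]);
    assert_eq!(dispatch(Ok(&data)), Action::AnswerLocally(BlockData(vec![1, 2, 3])));

    let keys: &[SessionKey] = &[7, 9];
    assert_eq!(dispatch(Err(Some(keys))), Action::AskPeers(vec![7, 9]));
    assert_eq!(dispatch(Err(None)), Action::Prune);
}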
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn last_keys_works() {
let a = [1; 32].into();
let b = [2; 32].into();
let c = [3; 32].into();
let d = [4; 32].into();
let mut recent = RecentSessionKeys::default();
match recent.insert(a) {
InsertedRecentKey::New(None) => {},
_ => panic!("is new, not at capacity"),
}
match recent.insert(a) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
match recent.insert(b) {
InsertedRecentKey::New(None) => {},
_ => panic!("is new, not at capacity"),
}
match recent.insert(b) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
match recent.insert(c) {
InsertedRecentKey::New(None) => {},
_ => panic!("is new, not at capacity"),
}
match recent.insert(c) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
match recent.insert(d) {
InsertedRecentKey::New(Some(old)) => assert_eq!(old, a),
_ => panic!("is new, and at capacity"),
}
match recent.insert(d) {
InsertedRecentKey::AlreadyKnown => {},
_ => panic!("not new"),
}
}
}
@@ -29,6 +29,7 @@ extern crate polkadot_consensus;
extern crate polkadot_availability_store as av_store;
extern crate polkadot_primitives;
extern crate arrayvec;
extern crate futures;
extern crate parking_lot;
extern crate tokio;
@@ -46,19 +47,17 @@ pub mod consensus;
use codec::{Decode, Encode};
use futures::sync::oneshot;
use parking_lot::Mutex;
use polkadot_consensus::{Statement, GenericStatement};
use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation};
use polkadot_primitives::parachain::{Id as ParaId, BlockData, CandidateReceipt, Collation};
use substrate_network::{NodeIndex, RequestId, Context, Severity};
use substrate_network::{message, generic_message};
use substrate_network::specialization::NetworkSpecialization as Specialization;
use substrate_network::StatusMessage as GenericFullStatus;
use self::consensus::{LiveConsensusInstances, RecentSessionKeys, InsertedRecentKey};
use self::collator_pool::{CollatorPool, Role, Action};
use self::local_collations::LocalCollations;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
#[cfg(test)]
@@ -91,92 +90,46 @@ struct BlockDataRequest {
enum CollatorState {
Fresh,
RolePending(Role),
Primed,
Primed(Option<Role>),
}
impl CollatorState {
fn send_key<F: FnMut(Message)>(&mut self, key: SessionKey, mut f: F) {
f(Message::SessionKey(key));
if let CollatorState::RolePending(role) = ::std::mem::replace(self, CollatorState::Primed) {
if let CollatorState::RolePending(role) = *self {
f(Message::CollatorRole(role));
*self = CollatorState::Primed(Some(role));
}
}
fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
if let CollatorState::Primed = *self {
if let CollatorState::Primed(ref mut r) = *self {
f(Message::CollatorRole(role));
*r = Some(role);
} else {
*self = CollatorState::RolePending(role);
}
}
fn role(&self) -> Option<Role> {
match *self {
CollatorState::Fresh => None,
CollatorState::RolePending(role) => Some(role),
CollatorState::Primed(role) => role,
}
}
}
struct PeerInfo {
collating_for: Option<(AccountId, ParaId)>,
validator_key: Option<SessionKey>,
validator_keys: RecentSessionKeys,
claimed_validator: bool,
collator_state: CollatorState,
}
#[derive(Default)]
struct KnowledgeEntry {
knows_block_data: Vec<SessionKey>,
knows_extrinsic: Vec<SessionKey>,
block_data: Option<BlockData>,
extrinsic: Option<Extrinsic>,
}
/// Tracks knowledge of peers.
struct Knowledge {
candidates: HashMap<Hash, KnowledgeEntry>,
}
impl Knowledge {
pub fn new() -> Self {
Knowledge {
candidates: HashMap::new(),
}
}
fn note_statement(&mut self, from: SessionKey, statement: &Statement) {
match *statement {
GenericStatement::Candidate(ref c) => {
let mut entry = self.candidates.entry(c.hash()).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Available(ref hash) => {
let mut entry = self.candidates.entry(*hash).or_insert_with(Default::default);
entry.knows_block_data.push(from);
entry.knows_extrinsic.push(from);
}
GenericStatement::Valid(ref hash) | GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
.or_insert_with(Default::default)
.knows_block_data
.push(from),
}
}
fn note_candidate(&mut self, hash: Hash, block_data: Option<BlockData>, extrinsic: Option<Extrinsic>) {
let entry = self.candidates.entry(hash).or_insert_with(Default::default);
entry.block_data = entry.block_data.take().or(block_data);
entry.extrinsic = entry.extrinsic.take().or(extrinsic);
}
}
struct CurrentConsensus {
knowledge: Arc<Mutex<Knowledge>>,
parent_hash: Hash,
local_session_key: SessionKey,
}
impl CurrentConsensus {
// get locally stored block data for a candidate.
fn block_data(&self, relay_parent: &Hash, hash: &Hash) -> Option<BlockData> {
if relay_parent != &self.parent_hash { return None }
self.knowledge.lock().candidates.get(hash)
.and_then(|entry| entry.block_data.clone())
impl PeerInfo {
fn should_send_key(&self) -> bool {
self.claimed_validator || self.collating_for.is_some()
}
}
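// Not part of this diff: the `CollatorState` send/set paths as this hunk
// leaves them, consolidated (old and new lines are interleaved above) and
// paired with a small usage demo. `Role`, `Message`, and the key type are
// simplified stand-ins so the snippet is self-contained; the real types come
// from `collator_pool` and this crate's `Message` enum.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Role { Primary, Backup }

#[derive(Debug, PartialEq)]
enum Message { SessionKey(u64), CollatorRole(Role) }

enum CollatorState {
    Fresh,
    RolePending(Role),
    Primed(Option<Role>), // now remembers the last role that was sent
}

impl CollatorState {
    fn send_key<F: FnMut(Message)>(&mut self, key: u64, mut f: F) {
        f(Message::SessionKey(key));
        if let CollatorState::RolePending(role) = *self {
            // flush the queued role only after the key has gone out.
            f(Message::CollatorRole(role));
            *self = CollatorState::Primed(Some(role));
        }
    }

    fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
        if let CollatorState::Primed(ref mut r) = *self {
            f(Message::CollatorRole(role));
            *r = Some(role);
        } else {
            // no key sent yet: queue the role until `send_key` runs.
            *self = CollatorState::RolePending(role);
        }
    }
}

fn main() {
    let mut sent = Vec::new();
    let mut state = CollatorState::Fresh;
    state.set_role(Role::Primary, |m| sent.push(m)); // queued, nothing sent yet
    assert!(sent.is_empty());
    state.send_key(42, |m| sent.push(m)); // key first, then the queued role
    assert_eq!(sent, vec![Message::SessionKey(42), Message::CollatorRole(Role::Primary)]);
}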
@@ -209,7 +162,7 @@ pub struct PolkadotProtocol {
collators: CollatorPool,
validators: HashMap<SessionKey, NodeIndex>,
local_collations: LocalCollations<Collation>,
live_consensus: Option<CurrentConsensus>,
live_consensus: LiveConsensusInstances,
in_flight: HashMap<(RequestId, NodeIndex), BlockDataRequest>,
pending: Vec<BlockDataRequest>,
extrinsic_store: Option<::av_store::Store>,
@@ -225,7 +178,7 @@ impl PolkadotProtocol {
collating_for,
validators: HashMap::new(),
local_collations: LocalCollations::new(),
live_consensus: None,
live_consensus: LiveConsensusInstances::new(),
in_flight: HashMap::new(),
pending: Vec::new(),
extrinsic_store: None,
@@ -250,69 +203,75 @@ impl PolkadotProtocol {
}
/// Note a new consensus session.
fn new_consensus(&mut self, ctx: &mut Context<Block>, consensus: CurrentConsensus) {
let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key));
if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) {
fn new_consensus(
&mut self,
ctx: &mut Context<Block>,
parent_hash: Hash,
consensus: consensus::CurrentConsensus,
) {
if let Some(new_local) = self.live_consensus.new_consensus(parent_hash, consensus) {
for (id, peer_data) in self.peers.iter_mut()
.filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some())
.filter(|&(_, ref info)| info.should_send_key())
{
peer_data.collator_state.send_key(consensus.local_session_key, |msg| send_polkadot_message(
peer_data.collator_state.send_key(new_local, |msg| send_polkadot_message(
ctx,
*id,
msg
));
}
}
}
self.live_consensus = Some(consensus);
fn remove_consensus(&mut self, parent_hash: &Hash) {
self.live_consensus.remove(parent_hash);
}
fn dispatch_pending_requests(&mut self, ctx: &mut Context<Block>) {
let consensus = match self.live_consensus {
Some(ref mut c) => c,
None => {
self.pending.clear();
return;
}
};
let knowledge = consensus.knowledge.lock();
let mut new_pending = Vec::new();
let validator_keys = &mut self.validators;
let next_req_id = &mut self.next_req_id;
let in_flight = &mut self.in_flight;
for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) {
if pending.consensus_parent != consensus.parent_hash { continue }
let parent = pending.consensus_parent;
let c_hash = pending.candidate_hash;
if let Some(entry) = knowledge.candidates.get(&pending.candidate_hash) {
// answer locally
if let Some(ref data) = entry.block_data {
let still_pending = self.live_consensus.with_block_data(&parent, &c_hash, |x| match x {
Ok(data @ &_) => {
// answer locally.
let _ = pending.sender.send(data.clone());
continue;
None
}
let validator_keys = &mut self.validators;
let next_peer = entry.knows_block_data.iter()
.filter_map(|x| validator_keys.get(x).map(|id| (*x, *id)))
.find(|&(ref key, _)| pending.attempted_peers.insert(*key))
.map(|(_, id)| id);
// dispatch to peer
if let Some(who) = next_peer {
let req_id = self.next_req_id;
self.next_req_id += 1;
send_polkadot_message(
ctx,
who,
Message::RequestBlockData(req_id, pending.consensus_parent, pending.candidate_hash)
);
self.in_flight.insert((req_id, who), pending);
continue;
Err(Some(known_keys)) => {
let next_peer = known_keys.iter()
.filter_map(|x| validator_keys.get(x).map(|id| (*x, *id)))
.find(|&(ref key, _)| pending.attempted_peers.insert(*key))
.map(|(_, id)| id);
// dispatch to peer
if let Some(who) = next_peer {
let req_id = *next_req_id;
*next_req_id += 1;
send_polkadot_message(
ctx,
who,
Message::RequestBlockData(req_id, parent, c_hash)
);
in_flight.insert((req_id, who), pending);
None
} else {
Some(pending)
}
}
}
Err(None) => None, // no such known consensus session. prune out.
});
new_pending.push(pending);
if let Some(pending) = still_pending {
new_pending.push(pending);
}
}
self.pending = new_pending;
@@ -323,8 +282,12 @@ impl PolkadotProtocol {
match msg {
Message::SessionKey(key) => self.on_session_key(ctx, who, key),
Message::RequestBlockData(req_id, relay_parent, candidate_hash) => {
let block_data = self.live_consensus.as_ref()
.and_then(|c| c.block_data(&relay_parent, &candidate_hash))
let block_data = self.live_consensus
.with_block_data(
&relay_parent,
&candidate_hash,
|res| res.ok().map(|b| b.clone()),
)
.or_else(|| self.extrinsic_store.as_ref()
.and_then(|s| s.block_data(relay_parent, candidate_hash))
);
@@ -352,18 +315,26 @@ impl PolkadotProtocol {
return;
}
if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) {
self.validators.remove(&old_key);
for (relay_parent, collation) in self.local_collations.fresh_key