Unverified Commit e9402a6c authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Collators get incoming parachain messages (#149)

* refactor out a consensus data fetcher from table router

* move statement checking logic into router

* refuse to start authority if collator

* support building the table router asynchronously

* instantiate_consensus does not overwrite old

* update key in new consensus if there was none before

* collator collects ingress from network

* test produced egress roots

* fix adder-collator compilation

* address first grumbles

* integrate new gossip with collator network launch

* address review
parent c12969e1
Pipeline #33512 passed with stages
in 17 minutes and 30 seconds
......@@ -24,7 +24,7 @@ version = "0.1.0"
dependencies = [
"adder 0.1.0",
"ctrlc 3.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-collator 0.1.0",
......@@ -670,11 +670,11 @@ dependencies = [
[[package]]
name = "exit-future"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
"parking_lot 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
......@@ -2160,7 +2160,7 @@ dependencies = [
name = "polkadot-cli"
version = "0.3.0"
dependencies = [
"exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-service 0.3.0",
......@@ -2176,9 +2176,12 @@ dependencies = [
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"polkadot-cli 0.3.0",
"polkadot-network 0.1.0",
"polkadot-primitives 0.1.0",
"polkadot-runtime 0.1.0",
"polkadot-validation 0.1.0",
"substrate-client 0.1.0 (git+https://github.com/paritytech/substrate)",
"substrate-keyring 0.1.0 (git+https://github.com/paritytech/substrate)",
"substrate-primitives 0.1.0 (git+https://github.com/paritytech/substrate)",
"tokio 0.1.15 (registry+https://github.com/rust-lang/crates.io-index)",
]
......@@ -2208,6 +2211,7 @@ name = "polkadot-network"
version = "0.1.0"
dependencies = [
"arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -2348,7 +2352,7 @@ name = "polkadot-validation"
version = "0.1.0"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"parity-codec 3.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -3540,7 +3544,7 @@ dependencies = [
"clap 2.32.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -3929,7 +3933,7 @@ version = "0.3.0"
source = "git+https://github.com/paritytech/substrate#45824913c980bb1ba3963f9bba67775a507d8624"
dependencies = [
"error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
"exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
......@@ -4789,7 +4793,7 @@ dependencies = [
"checksum env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "afb070faf94c85d17d50ca44f6ad076bce18ae92f0037d350947240a36e9d42e"
"checksum environmental 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "db746025e3ea695bfa0ae744dbacd5fcfc8db51b9760cf8bd0ab69708bb93c49"
"checksum error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07e791d3be96241c77c43846b665ef1384606da2cd2a48730abe606a12906e02"
"checksum exit-future 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "87559b08e99a81a92bbb867d237543e43495857749f688e0773390a20d56c61c"
"checksum exit-future 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "d8013f441e38e31c670e7f34ec8f1d5d3a2bd9d303c1ff83976ca886005e8f48"
"checksum failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "795bd83d3abeb9220f257e597aa0080a508b27533824adf336529648f6abf7e2"
"checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1"
"checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed"
......
......@@ -45,6 +45,7 @@ pub use service::{
pub use cli::{VersionInfo, IntoExit};
pub use cli::error;
pub use tokio::runtime::TaskExecutor;
fn load_spec(id: &str) -> Result<Option<service::ChainSpec>, String> {
Ok(match ChainSpec::from(id) {
......@@ -68,7 +69,7 @@ pub trait Worker: IntoExit {
fn configuration(&self) -> service::CustomConfiguration { Default::default() }
/// Do work and schedule exit.
fn work<S: PolkadotService>(self, service: &S) -> Self::Work;
fn work<S: PolkadotService>(self, service: &S, executor: TaskExecutor) -> Self::Work;
}
/// Parse command line arguments into service configuration.
......@@ -129,7 +130,7 @@ fn run_until_exit<T, C, W>(
let executor = runtime.executor();
cli::informant::start(&service, exit.clone(), executor.clone());
let _ = runtime.block_on(worker.work(&*service));
let _ = runtime.block_on(worker.work(&*service, executor.clone()));
exit_send.fire();
// we eagerly drop the service so that the internal exit future is fired,
......
......@@ -12,5 +12,10 @@ substrate-primitives = { git = "https://github.com/paritytech/substrate" }
polkadot-runtime = { path = "../runtime", version = "0.1" }
polkadot-primitives = { path = "../primitives", version = "0.1" }
polkadot-cli = { path = "../cli" }
polkadot-network = { path = "../network" }
polkadot-validation = { path = "../validation" }
log = "0.4"
tokio = "0.1.7"
[dev-dependencies]
substrate-keyring = { git = "https://github.com/paritytech/substrate" }
......@@ -53,25 +53,33 @@ extern crate tokio;
extern crate polkadot_cli;
extern crate polkadot_runtime;
extern crate polkadot_primitives;
extern crate polkadot_network;
extern crate polkadot_validation;
#[macro_use]
extern crate log;
use std::collections::{BTreeSet, BTreeMap, HashSet};
#[cfg(test)]
extern crate substrate_keyring as keyring;
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use futures::{future, stream, Stream, Future, IntoFuture};
use futures::{future, Stream, Future, IntoFuture};
use client::BlockchainEvents;
use primitives::{ed25519, Pair};
use polkadot_primitives::{BlockId, SessionKey};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId};
use polkadot_primitives::{BlockId, SessionKey, Hash, Block};
use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId, Extrinsic};
use polkadot_cli::{PolkadotService, CustomConfiguration, CoreApi, ParachainHost};
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi};
use polkadot_cli::{Worker, IntoExit, ProvideRuntimeApi, TaskExecutor};
use polkadot_network::validation::{ValidationNetwork, SessionParams};
use polkadot_network::NetworkService;
use tokio::timer::Timeout;
pub use polkadot_cli::VersionInfo;
pub use polkadot_network::validation::Incoming;
const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
......@@ -107,63 +115,23 @@ pub trait ParachainContext: Clone {
&self,
last_head: HeadData,
ingress: I,
) -> Result<(BlockData, HeadData), InvalidHead>;
) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead>;
}
/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
type Error;
type Error: ::std::fmt::Debug;
/// Future that resolves to the un-routed egress queues of a parachain.
/// The first item is the oldest.
type FutureEgress: IntoFuture<Item=Vec<Vec<Message>>, Error=Self::Error>;
/// Provide a set of all parachains meant to be routed to at a block.
fn routing_parachains(&self) -> BTreeSet<ParaId>;
type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
/// Get un-routed egress queues from a parachain to the local parachain.
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
}
/// Collate the necessary ingress queue using the given context.
pub fn collate_ingress<'a, R>(relay_context: R)
-> impl Future<Item=ConsolidatedIngress, Error=R::Error> + 'a
where
R: RelayChainContext,
R::Error: 'a,
R::FutureEgress: 'a,
{
let mut egress_fetch = Vec::new();
for routing_parachain in relay_context.routing_parachains() {
let fetch = relay_context
.unrouted_egress(routing_parachain)
.into_future()
.map(move |egresses| (routing_parachain, egresses));
egress_fetch.push(fetch);
}
// create a map ordered first by the depth of the egress queue
// and then by the parachain ID.
//
// then transform that into the consolidated egress queue.
stream::futures_unordered(egress_fetch)
.fold(BTreeMap::new(), |mut map, (routing_id, egresses)| {
for (depth, egress) in egresses.into_iter().rev().enumerate() {
let depth = -(depth as i64);
map.insert((depth, routing_id), egress);
}
Ok(map)
})
.map(|ordered| ordered.into_iter().map(|((_, id), egress)| (id, egress)))
.map(|i| i.collect::<Vec<_>>())
.map(ConsolidatedIngress)
}
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub fn collate<'a, R, P>(
local_id: ParaId,
......@@ -174,19 +142,22 @@ pub fn collate<'a, R, P>(
)
-> impl Future<Item=parachain::Collation, Error=Error<R::Error>> + 'a
where
R: RelayChainContext + 'a,
R: RelayChainContext,
R::Error: 'a,
R::FutureEgress: 'a,
P: ParachainContext + 'a,
{
collate_ingress(relay_context).map_err(Error::Polkadot).and_then(move |ingress| {
let (block_data, head_data) = para_context.produce_candidate(
let ingress = relay_context.unrouted_egress(local_id).into_future().map_err(Error::Polkadot);
ingress.and_then(move |ConsolidatedIngress(ingress)| {
let (block_data, head_data, mut extrinsic) = para_context.produce_candidate(
last_head,
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
ingress.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
).map_err(Error::Collator)?;
let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref()).into();
let egress_queue_roots =
::polkadot_validation::egress_roots(&mut extrinsic.outgoing_messages);
let receipt = parachain::CandidateReceipt {
parachain_index: local_id,
......@@ -194,11 +165,12 @@ pub fn collate<'a, R, P>(
signature,
head_data,
balance_uploads: Vec::new(),
egress_queue_roots: Vec::new(),
egress_queue_roots,
fees: 0,
block_data_hash,
};
// not necessary to send extrinsic because it is recomputed from execution.
Ok(parachain::Collation {
receipt,
block_data,
......@@ -207,18 +179,34 @@ pub fn collate<'a, R, P>(
}
/// Polkadot-api context.
struct ApiContext;
impl RelayChainContext for ApiContext {
type Error = client::error::Error;
type FutureEgress = Result<Vec<Vec<Message>>, Self::Error>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
BTreeSet::new()
}
struct ApiContext<P, E> {
network: ValidationNetwork<P, E, NetworkService, TaskExecutor>,
parent_hash: Hash,
authorities: Vec<SessionKey>,
}
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
Ok(Vec::new())
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>,
E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
{
type Error = String;
type FutureEgress = Box<Future<Item=ConsolidatedIngress, Error=String> + Send>;
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress {
let session = self.network.instantiate_session(SessionParams {
local_session_key: None,
parent_hash: self.parent_hash,
authorities: self.authorities.clone(),
}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));
let fetch_incoming = session
.and_then(move |session| session.fetch_incoming(id).map_err(|e|
format!("unable to fetch incoming data: {:?}", e)
))
.map(ConsolidatedIngress);
Box::new(fetch_incoming)
}
}
......@@ -241,7 +229,7 @@ impl<P, E> IntoExit for CollationNode<P, E> where
impl<P, E> Worker for CollationNode<P, E> where
P: ParachainContext + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static
E: Future<Item=(),Error=()> + Clone + Send + Sync + 'static
{
type Work = Box<Future<Item=(),Error=()> + Send>;
......@@ -254,13 +242,42 @@ impl<P, E> Worker for CollationNode<P, E> where
config
}
fn work<S>(self, service: &S) -> Self::Work
fn work<S>(self, service: &S, task_executor: TaskExecutor) -> Self::Work
where S: PolkadotService,
{
let CollationNode { parachain_context, exit, para_id, key } = self;
let client = service.client();
let network = service.network();
let known_oracle = client.clone();
let message_validator = polkadot_network::gossip::register_validator(
&*network,
move |block_hash: &Hash| {
use client::{BlockStatus, ChainHead};
use polkadot_network::gossip::Known;
match known_oracle.block_status(&BlockId::hash(*block_hash)) {
Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None,
Ok(BlockStatus::KnownBad) => Some(Known::Bad),
Ok(BlockStatus::InChain) => match known_oracle.leaves() {
Err(_) => None,
Ok(leaves) => if leaves.contains(block_hash) {
Some(Known::Leaf)
} else {
Some(Known::Old)
},
}
}
},
);
let validation_network = ValidationNetwork::new(
network.clone(),
exit.clone(),
message_validator,
client.clone(),
task_executor,
);
let inner_exit = exit.clone();
let work = client.import_notification_stream()
......@@ -269,7 +286,9 @@ impl<P, E> Worker for CollationNode<P, E> where
($e:expr) => {
match $e {
Ok(x) => x,
Err(e) => return future::Either::A(future::err(Error::Polkadot(e))),
Err(e) => return future::Either::A(future::err(Error::Polkadot(
format!("{:?}", e)
))),
}
}
}
......@@ -281,6 +300,7 @@ impl<P, E> Worker for CollationNode<P, E> where
let client = client.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let validation_network = validation_network.clone();
let work = future::lazy(move || {
let api = client.runtime_api();
......@@ -289,16 +309,24 @@ impl<P, E> Worker for CollationNode<P, E> where
None => return future::Either::A(future::ok(())),
};
let authorities = try_fr!(api.authorities(&id));
let targets = compute_targets(
para_id,
try_fr!(api.authorities(&id)).as_slice(),
authorities.as_slice(),
try_fr!(api.duty_roster(&id)),
);
let context = ApiContext {
network: validation_network,
parent_hash: relay_parent,
authorities,
};
let collation_work = collate(
para_id,
HeadData(last_head),
ApiContext,
context,
parachain_context,
key,
).map(move |collation| {
......@@ -355,7 +383,7 @@ pub fn run_collator<P, E, I, ArgT>(
) -> polkadot_cli::error::Result<()> where
P: ParachainContext + Send + 'static,
E: IntoFuture<Item=(),Error=()>,
E::Future: Send + Clone + 'static,
E::Future: Send + Clone + Sync + 'static,
I: IntoIterator<Item=ArgT>,
ArgT: Into<std::ffi::OsString> + Clone,
{
......@@ -365,74 +393,91 @@ pub fn run_collator<P, E, I, ArgT>(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use polkadot_primitives::parachain::OutgoingMessage;
use keyring::AuthorityKeyring;
use super::*;
use std::collections::{HashMap, BTreeSet};
use futures::Future;
use polkadot_primitives::parachain::{Message, Id as ParaId};
pub struct DummyRelayChainCtx {
egresses: HashMap<ParaId, Vec<Vec<Message>>>,
currently_routing: BTreeSet<ParaId>,
#[derive(Default, Clone)]
struct DummyRelayChainContext {
ingress: HashMap<ParaId, ConsolidatedIngress>
}
impl RelayChainContext for DummyRelayChainCtx {
impl RelayChainContext for DummyRelayChainContext {
type Error = ();
type FutureEgress = Result<Vec<Vec<Message>>, ()>;
type FutureEgress = Box<Future<Item=ConsolidatedIngress,Error=()>>;
fn routing_parachains(&self) -> BTreeSet<ParaId> {
self.currently_routing.clone()
fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
match self.ingress.get(&para_id) {
Some(ingress) => Box::new(future::ok(ingress.clone())),
None => Box::new(future::empty()),
}
}
}
fn unrouted_egress(&self, id: ParaId) -> Result<Vec<Vec<Message>>, ()> {
Ok(self.egresses.get(&id).cloned().unwrap_or_default())
#[derive(Clone)]
struct DummyParachainContext;
impl ParachainContext for DummyParachainContext {
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
&self,
_last_head: HeadData,
ingress: I,
) -> Result<(BlockData, HeadData, Extrinsic), InvalidHead> {
// send messages right back.
Ok((
BlockData(vec![1, 2, 3, 4, 5,]),
HeadData(vec![9, 9, 9]),
Extrinsic {
outgoing_messages: ingress.into_iter().map(|(id, msg)| OutgoingMessage {
target: id,
data: msg.0,
}).collect(),
}
))
}
}
#[test]
fn collates_ingress() {
let route_from = |x: &[ParaId]| {
let mut set = BTreeSet::new();
set.extend(x.iter().cloned());
set
};
let message = |x: Vec<u8>| vec![Message(x)];
let dummy_ctx = DummyRelayChainCtx {
currently_routing: route_from(&[2.into(), 3.into()]),
egresses: vec![
// egresses for `2`: last routed successfully 5 blocks ago.
(2.into(), vec![
message(vec![1, 2, 3]),
message(vec![4, 5, 6]),
message(vec![7, 8]),
message(vec![10]),
message(vec![12]),
]),
// egresses for `3`: last routed successfully 3 blocks ago.
(3.into(), vec![
message(vec![9]),
message(vec![11]),
message(vec![13]),
]),
].into_iter().collect(),
};
assert_eq!(
collate_ingress(dummy_ctx).wait().unwrap(),
ConsolidatedIngress(vec![
(2.into(), message(vec![1, 2, 3])),
(2.into(), message(vec![4, 5, 6])),
(2.into(), message(vec![7, 8])),
(3.into(), message(vec![9])),
(2.into(), message(vec![10])),
(3.into(), message(vec![11])),
(2.into(), message(vec![12])),
(3.into(), message(vec![13])),
]
))
fn collates_correct_queue_roots() {
let mut context = DummyRelayChainContext::default();
let id = ParaId::from(100);
let a = ParaId::from(123);
let b = ParaId::from(456);
let messages_from_a = vec![
Message(vec![1, 1, 1]),
Message(b"helloworld".to_vec()),
];
let messages_from_b = vec![
Message(b"dogglesworth".to_vec()),
Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
];
let root_a = ::polkadot_validation::message_queue_root(
messages_from_a.iter().map(|msg| &msg.0)
);
let root_b = ::polkadot_validation::message_queue_root(
messages_from_b.iter().map(|msg| &msg.0)
);
context.ingress.insert(id, ConsolidatedIngress(vec![
(b, messages_from_b),
(a, messages_from_a),
]));
let collation = collate(
id,
HeadData(vec![5]),
context.clone(),
DummyParachainContext,
AuthorityKeyring::Alice.pair().into(),
).wait().unwrap();
// ascending order by root.
assert_eq!(collation.receipt.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
}
}
......@@ -19,6 +19,7 @@ futures = "0.1"
tokio = "0.1.7"
log = "0.4"
slice-group-by = "0.2.2"
exit-future = "0.1.4"
[dev-dependencies]
substrate-client = { git = "https://github.com/paritytech/substrate" }
......
......@@ -32,6 +32,7 @@ extern crate arrayvec;
extern crate parking_lot;
extern crate tokio;
extern crate slice_group_by;
extern crate exit_future;
#[macro_use]
extern crate futures;
......@@ -66,7 +67,6 @@ use self::local_collations::LocalCollations;
use std::collections::{HashMap, HashSet};
#[cfg(test)]
mod tests;
......@@ -213,10 +213,13 @@ impl PolkadotProtocol {
fn new_validation_session(
&mut self,
ctx: &mut Context<Block>,
parent_hash: Hash,
session: validation::ValidationSession,
) {
if let Some(new_local) = self.live_validation_sessions.new_validation_session(parent_hash, session) {
params: validation::SessionParams,
) -> validation::ValidationSession {
let (session, new_local) = self.live_validation_sessions
.new_validation_session(params);
if let Some(new_local) = new_local {
for (id, peer_data) in self.peers.iter_mut()
.filter(|&(_, ref info)| info.should_send_key())
{
......@@ -227,10 +230,13 @@ impl PolkadotProtocol {
));
}
}