Unverified Commit 502205d1 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

strip out all ICMP network code and begin gossip refactor for attestations (#256)

* strip out all ICMP code and begin gossip refactor

* validate incoming statements

* message_allowed logic

* compiles

* do reporting and neighbor packet validation

* tests compile

* propagate gossip messages

* test message_allowed

* some more tests

* address grumbles
parent d29d0bfd
......@@ -2314,7 +2314,6 @@ dependencies = [
"polkadot-availability-store 0.1.0",
"polkadot-primitives 0.1.0",
"polkadot-validation 0.1.0",
"slice-group-by 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"sr-primitives 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"substrate-client 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
"substrate-keyring 2.0.0 (git+https://github.com/paritytech/substrate?branch=polkadot-master)",
......@@ -3017,11 +3016,6 @@ name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "slice-group-by"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "slog"
version = "2.4.1"
......@@ -5233,7 +5227,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum sha3 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "34a5e54083ce2b934bf059fdf38e7330a154177e029ab6c4e18638f2f624053a"
"checksum shell32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "9ee04b46101f57121c9da2b151988283b6beb79b34f5bb29a58ee48cb695122c"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum slice-group-by 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "049599674ed27c9b78b93265482068999c0fc71116e186ea4a408e9fc47723b0"
"checksum slog 2.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e1a2eec401952cd7b12a84ea120e2d57281329940c3f93c2bf04f462539508e"
"checksum slog-async 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e544d16c6b230d84c866662fe55e31aacfca6ae71e6fc49ae9a311cb379bfc2f"
"checksum slog-json 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ddc0d2aff1f8f325ef660d9a0eb6e6dcd20b30b3f581a5897f58bf42d061c37a"
......
......@@ -136,7 +136,7 @@ pub trait RelayChainContext {
type FutureEgress: IntoFuture<Item=ConsolidatedIngress, Error=Self::Error>;
/// Get un-routed egress queues from a parachain to the local parachain.
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress;
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
}
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
......@@ -202,20 +202,17 @@ impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
type Error = String;
type FutureEgress = Box<Future<Item=ConsolidatedIngress, Error=String> + Send>;
fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress {
let session = self.network.instantiate_session(SessionParams {
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = self.network.instantiate_session(SessionParams {
local_session_key: None,
parent_hash: self.parent_hash,
authorities: self.authorities.clone(),
}).map_err(|e| format!("unable to instantiate validation session: {:?}", e));
let fetch_incoming = session
.and_then(move |session| session.fetch_incoming(id).map_err(|e|
format!("unable to fetch incoming data: {:?}", e)
))
.map(ConsolidatedIngress);
Box::new(fetch_incoming)
Box::new(future::ok(ConsolidatedIngress(Vec::new())))
}
}
......@@ -266,7 +263,7 @@ impl<P, E> Worker for CollationNode<P, E> where
};
let message_validator = polkadot_network::gossip::register_validator(
&*network,
network.clone(),
move |block_hash: &Hash| {
use client::BlockStatus;
use polkadot_network::gossip::Known;
......
......@@ -18,7 +18,6 @@ sr-primitives = { git = "https://github.com/paritytech/substrate", branch = "pol
futures = "0.1"
tokio = "0.1.7"
log = "0.4"
slice-group-by = "0.2.2"
exit-future = "0.1.4"
[dev-dependencies]
......
This diff is collapsed.
......@@ -31,10 +31,8 @@ extern crate polkadot_primitives;
extern crate arrayvec;
extern crate parking_lot;
extern crate tokio;
extern crate slice_group_by;
extern crate exit_future;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
......
......@@ -25,10 +25,10 @@
use sr_primitives::traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT};
use polkadot_validation::{
SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Outgoing, Validated
SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
};
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost, Id as ParaId, Message,
use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost,
ValidatorIndex, Collation, PoVBlock,
};
use gossip::RegisteredMessageValidator;
......@@ -43,8 +43,6 @@ use std::sync::Arc;
use validation::{self, SessionDataFetcher, NetworkService, Executor};
type IngressPairRef<'a> = (ParaId, &'a [Message]);
/// Compute the gossip topic for attestations on the given parent hash.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
......@@ -86,16 +84,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
// this will block internally until the gossip messages stream is obtained.
self.network().gossip_messages_for(self.attestation_topic)
.filter_map(|msg| {
use crate::gossip::GossipMessage;
debug!(target: "validation", "Processing statement for live validation session");
crate::gossip::GossipMessage::decode(&mut &msg.message[..])
match GossipMessage::decode(&mut &msg.message[..]) {
Some(GossipMessage::Statement(s)) => Some(s.signed_statement),
_ => None,
}
})
.map(|msg| msg.statement)
}
/// Get access to the session data fetcher.
#[cfg(test)]
pub(crate) fn fetcher(&self) -> &SessionDataFetcher<P, E, N, T> {
&self.fetcher
}
fn parent_hash(&self) -> Hash {
......@@ -174,38 +170,6 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
}
}
/// Broadcast outgoing messages to peers.
pub(crate) fn broadcast_egress(&self, outgoing: Outgoing) {
use slice_group_by::LinearGroupBy;
let mut group_messages = Vec::new();
for egress in outgoing {
let source = egress.from;
let messages = egress.messages.outgoing_messages;
let groups = LinearGroupBy::new(&messages, |a, b| a.target == b.target);
for group in groups {
let target = match group.get(0) {
Some(msg) => msg.target,
None => continue, // skip empty.
};
group_messages.clear(); // reuse allocation from previous iterations.
group_messages.extend(group.iter().map(|msg| msg.data.clone()).map(Message));
debug!(target: "valdidation", "Circulating messages from {:?} to {:?} at {}",
source, target, self.parent_hash());
// this is the ingress from source to target, with given messages.
let target_incoming =
validation::incoming_message_topic(self.parent_hash(), target);
let ingress_for: IngressPairRef = (source, &group_messages[..]);
self.network().gossip_message(target_incoming, ingress_for.encode());
}
}
}
fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
-> impl Future<Item=(),Error=()> + Send + 'static
where
......@@ -263,7 +227,6 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
fn drop(&mut self) {
let parent_hash = self.parent_hash().clone();
self.message_validator.remove_session(&parent_hash);
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
}
}
......
......@@ -27,11 +27,10 @@ use polkadot_primitives::parachain::{
ConsolidatedIngressRoots,
};
use substrate_primitives::crypto::UncheckedInto;
use sr_primitives::traits::Block as BlockT;
use codec::Encode;
use substrate_network::{
PeerId, PeerInfo, ClientHandle, Context, config::Roles,
message::{BlockRequest, generic::{ConsensusMessage, FinalityProofRequest}},
PeerId, Context, config::Roles,
message::generic::ConsensusMessage,
specialization::NetworkSpecialization, generic_message::Message as GenericMessage
};
......@@ -79,7 +78,6 @@ impl TestContext {
}
}
fn make_pov(block_data: Vec<u8>) -> PoVBlock {
PoVBlock {
block_data: BlockData(block_data),
......
......@@ -16,7 +16,9 @@
//! Tests and helpers for validation networking.
use validation::NetworkService;
#![allow(unused)]
use validation::{NetworkService, GossipService};
use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::TopicNotification;
use substrate_primitives::{NativeOrEncoded, ExecutionContext};
......@@ -151,7 +153,11 @@ impl NetworkService for TestNetwork {
let _ = self.gossip.send_message.unbounded_send((topic, notification));
}
fn drop_gossip(&self, _topic: Hash) {}
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut GossipService, &mut NetContext<Block>)
{
unimplemented!()
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
......@@ -342,6 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
let message_val = crate::gossip::RegisteredMessageValidator::new_test(
|_hash: &_| Some(crate::gossip::Known::Leaf),
Box::new(|_, _| {}),
);
TestValidationNetwork::new(
......@@ -408,95 +415,3 @@ fn make_table(data: &ApiData, local_key: &AuthorityKeyring, parent_hash: Hash) -
store,
))
}
#[test]
fn ingress_fetch_works() {
let mut runtime = Runtime::new().unwrap();
let built = build_network(3, runtime.executor());
let id_a: ParaId = 1.into();
let id_b: ParaId = 2.into();
let id_c: ParaId = 3.into();
let key_a = AuthorityKeyring::Alice;
let key_b = AuthorityKeyring::Bob;
let key_c = AuthorityKeyring::Charlie;
let messages_from_a = vec![
OutgoingMessage { target: id_b, data: vec![1, 2, 3] },
OutgoingMessage { target: id_b, data: vec![3, 4, 5] },
OutgoingMessage { target: id_c, data: vec![9, 9, 9] },
];
let messages_from_b = vec![
OutgoingMessage { target: id_a, data: vec![1, 1, 1, 1, 1,] },
OutgoingMessage { target: id_c, data: b"hello world".to_vec() },
];
let messages_from_c = vec![
OutgoingMessage { target: id_a, data: b"dog42".to_vec() },
OutgoingMessage { target: id_b, data: b"dogglesworth".to_vec() },
];
let ingress = {
let mut builder = IngressBuilder::default();
builder.add_messages(id_a, &messages_from_a);
builder.add_messages(id_b, &messages_from_b);
builder.add_messages(id_c, &messages_from_c);
builder.build()
};
let parent_hash = [1; 32].into();
let (router_a, router_b, router_c) = {
let validators: Vec<ValidatorId> = vec![
key_a.into(),
key_b.into(),
key_c.into(),
];
// NOTE: this is possible only because we are currently asserting that parachain validators
// share their crypto with the (Aura) authority set. Once that assumption breaks, so will this
// code.
let authorities = validators.clone();
let mut api_handle = built.api_handle.lock();
*api_handle = ApiData {
active_parachains: vec![id_a, id_b, id_c],
duties: vec![Chain::Parachain(id_a), Chain::Parachain(id_b), Chain::Parachain(id_c)],
validators,
ingress,
};
(
built.networks[0].communication_for(
make_table(&*api_handle, &key_a, parent_hash),
vec![MessagesFrom::from_messages(id_a, messages_from_a)],
&authorities,
),
built.networks[1].communication_for(
make_table(&*api_handle, &key_b, parent_hash),
vec![MessagesFrom::from_messages(id_b, messages_from_b)],
&authorities,
),
built.networks[2].communication_for(
make_table(&*api_handle, &key_c, parent_hash),
vec![MessagesFrom::from_messages(id_c, messages_from_c)],
&authorities,
),
)
};
// make sure everyone can get ingress for their own parachain.
let fetch_a = router_a.then(move |r| r.unwrap().fetcher()
.fetch_incoming(id_a).map_err(|_| format!("Could not fetch ingress_a")));
let fetch_b = router_b.then(move |r| r.unwrap().fetcher()
.fetch_incoming(id_b).map_err(|_| format!("Could not fetch ingress_b")));
let fetch_c = router_c.then(move |r| r.unwrap().fetcher()
.fetch_incoming(id_c).map_err(|_| format!("Could not fetch ingress_c")));
let work = fetch_a.join3(fetch_b, fetch_c);
runtime.spawn(built.gossip.then(|_| Ok(()))); // in background.
runtime.block_on(work).unwrap();
}
......@@ -19,13 +19,14 @@
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head.
use sr_primitives::traits::{BlakeTwo256, ProvideRuntimeApi, Hash as HashT};
use substrate_network::Context as NetContext;
use substrate_network::consensus_gossip::{TopicNotification, MessageRecipient as GossipMessageRecipient};
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, Message, CandidateReceipt, CollatorId, ValidatorId, PoVBlock, ValidatorIndex};
use codec::{Encode, Decode};
use polkadot_primitives::parachain::{Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, ValidatorId, PoVBlock, ValidatorIndex};
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
......@@ -71,6 +72,17 @@ impl Executor for TaskExecutor {
}
}
/// A gossip network subservice.
pub trait GossipService {
fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage);
}
impl GossipService for consensus_gossip::ConsensusGossip<Block> {
fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage) {
consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
}
}
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
/// Get a stream of gossip messages for a given hash.
......@@ -79,8 +91,9 @@ pub trait NetworkService: Send + Sync + 'static {
/// Gossip a message on given topic.
fn gossip_message(&self, topic: Hash, message: Vec<u8>);
/// Drop a gossip topic.
fn drop_gossip(&self, topic: Hash);
/// Execute a closure with the gossip service.
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut GossipService, &mut NetContext<Block>);
/// Execute a closure with the polkadot protocol.
fn with_spec<F: Send + 'static>(&self, with: F)
......@@ -91,7 +104,7 @@ impl NetworkService for super::NetworkService {
fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
let (tx, rx) = std::sync::mpsc::channel();
self.with_gossip(move |gossip, _| {
super::NetworkService::with_gossip(self, move |gossip, _| {
let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
let _ = tx.send(inner_rx);
});
......@@ -111,7 +124,11 @@ impl NetworkService for super::NetworkService {
);
}
fn drop_gossip(&self, _topic: Hash) { }
fn with_gossip<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut GossipService, &mut NetContext<Block>)
{
super::NetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
}
fn with_spec<F: Send + 'static>(&self, with: F)
where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
......@@ -199,16 +216,20 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
.collect();
let (tx, rx) = oneshot::channel();
self.network.with_spec(move |spec, ctx| {
// before requesting messages, note live consensus session.
message_validator.note_session(
parent_hash,
MessageValidationData {
authorities: params.authorities.clone(),
index_mapping,
},
);
{
let message_validator = self.message_validator.clone();
let authorities = params.authorities.clone();
self.network.with_gossip(move |gossip, ctx| {
message_validator.note_session(
parent_hash,
MessageValidationData { authorities, index_mapping },
|peer_id, message| gossip.send_message(ctx, peer_id, message),
);
});
}
self.network.with_spec(move |spec, ctx| {
let session = spec.new_validation_session(ctx, params);
let _ = tx.send(SessionDataFetcher {
network,
......@@ -217,7 +238,6 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
parent_hash,
knowledge: session.knowledge().clone(),
exit,
fetch_incoming: session.fetched_incoming().clone(),
message_validator,
});
});
......@@ -241,7 +261,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
fn communication_for(
&self,
table: Arc<SharedTable>,
outgoing: polkadot_validation::Outgoing,
authorities: &[ValidatorId],
) -> Self::BuildTableRouter {
let parent_hash = table.consensus_parent_hash().clone();
......@@ -264,8 +283,6 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
message_validator,
);
table_router.broadcast_egress(outgoing);
let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
......@@ -406,22 +423,12 @@ impl Future for IncomingReceiver {
}
}
/// Incoming message gossip topic for a parachain at a given block hash.
pub(crate) fn incoming_message_topic(parent_hash: Hash, parachain: ParaId) -> Hash {
let mut v = parent_hash.as_ref().to_vec();
parachain.using_encoded(|s| v.extend(s));
v.extend(b"incoming");
BlakeTwo256::hash(&v[..])
}
/// A current validation session instance.
#[derive(Clone)]
pub(crate) struct ValidationSession {
parent_hash: Hash,
knowledge: Arc<Mutex<Knowledge>>,
local_session_key: Option<ValidatorId>,
fetch_incoming: Arc<Mutex<FetchIncoming>>,
}
impl ValidationSession {
......@@ -432,7 +439,6 @@ impl ValidationSession {
parent_hash: params.parent_hash,
knowledge: Arc::new(Mutex::new(Knowledge::new())),
local_session_key: params.local_session_key,
fetch_incoming: Arc::new(Mutex::new(FetchIncoming::new())),
}
}
......@@ -442,11 +448,6 @@ impl ValidationSession {
&self.knowledge
}
/// Get a handle to the shared list of parachains' incoming data fetch.
pub(crate) fn fetched_incoming(&self) -> &Arc<Mutex<FetchIncoming>> {
&self.fetch_incoming
}
// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
// we believe should have the data.
fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U
......@@ -646,58 +647,10 @@ impl Future for PoVReceiver {
}
}
/// Wrapper around bookkeeping for tracking which parachains we're fetching incoming messages
/// for.
pub(crate) struct FetchIncoming {
exit_signal: ::exit_future::Signal,
parachains_fetching: HashMap<ParaId, IncomingReceiver>,
}
impl FetchIncoming {
fn new() -> Self {
FetchIncoming {
exit_signal: ::exit_future::signal_only(),
parachains_fetching: HashMap::new(),
}
}
// registers intent to fetch incoming. returns an optional piece of work
// that, if some, is needed to be run to completion in order for the future to
// resolve.
//
// impl Future has a bug here where it wrongly assigns a `'static` bound to `M`.
fn fetch_with_work<M, W>(&mut self, para_id: ParaId, make_work: M)
-> (IncomingReceiver, Option<Box<Future<Item=(),Error=()> + Send>>) where
M: FnOnce() -> W,
W: Future<Item=Option<Incoming>> + Send + 'static,
{
let (tx, rx) = match self.parachains_fetching.entry(para_id) {
Entry::Occupied(entry) => return (entry.get().clone(), None),
Entry::Vacant(entry) => {
// has not been requested yet.
let (tx, rx) = oneshot::channel();
let rx = IncomingReceiver { inner: rx.shared() };
entry.insert(rx.clone());
(tx, rx)
}
};
let exit = self.exit_signal.make_exit();
let work = make_work()
.map(move |incoming| if let Some(i) = incoming { let _ = tx.send(i); })
.select2(exit)
.then(|_| Ok(()));
(rx, Some(Box::new(work)))
}
}
/// Can fetch data for a given validation session
pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
network: Arc<N>,
api: Arc<P>,
fetch_incoming: Arc<Mutex<FetchIncoming>>,
exit: E,
task_executor: T,
knowledge: Arc<Mutex<Knowledge>>,
......@@ -744,7 +697,6 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
api: self.api.clone(),
task_executor: self.task_executor.clone(),
parent_hash: self.parent_hash.clone(),
fetch_incoming: self.fetch_incoming.clone(),
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
......@@ -783,130 +735,11 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
});
PoVReceiver { outer: rx, inner: None }
}
/// Fetch incoming messages for a parachain.
pub fn fetch_incoming(&self, parachain: ParaId) -> IncomingReceiver {
let (rx, work) = self.fetch_incoming.lock().fetch_with_work(parachain.clone(), move || {
let parent_hash: Hash = self.parent_hash();
let topic = incoming_message_topic(parent_hash, parachain);
let gossip_messages = self.network().gossip_messages_for(topic)
.map_err(|()| panic!("unbounded receivers do not throw errors; qed"))
.filter_map(|msg| IngressPair::decode(&mut msg.message.as_slice()));
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
.map_err(|e| format!("Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
parachain, parent_hash, e)
);
canon_roots.into_future()
.and_then(move |ingress_roots| match ingress_roots {
None => Err(format!("No parachain {:?} registered at {}", parachain, parent_hash)),
Some(roots) => Ok(roots.0.into_iter().collect())
})
.and_then(move |ingress_roots| ComputeIngress {
inner: gossip_messages,
ingress_roots,
incoming: Vec::new(),
})
.select2(self.exit.clone())
.map(|res| match res {
future::Either::A((incoming, _)) => incoming,
future::Either::B(_) => None,
})
});
if let Some(work) = work {
self.task_executor.spawn(work);
}
rx