Unverified Commit b84f3c03 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

NetworkBridge: validator (authorities) discovery api (#1699)



* stupid, but it compiles

* redo

* cleanup

* add ValidatorDiscovery to msgs

* sketch network bridge code

* ConnectToAuthorities instead of validators

* more stuff

* cleanup

* more stuff

* complete ConnectToAuthoritiesState

* Update node/network/bridge/src/lib.rs

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Collator protocol subsystem (#1659)

* WIP

* The initial implementation of the collator side.

* Improve comments

* Multiple collation requests

* Add more tests and comments to validator side

* Add comments, remove dead code

* Apply suggestions from code review

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix build after suggested changes

* Also connect to the next validator group

* Remove a Future impl and move TimeoutExt to util

* Minor nits

* Fix build

* Change FetchCollations back to FetchCollation

* Try this

* Final fixes

* Fix build

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* handle multiple in-flight connection requests

* handle cancelled requests

* Update node/core/runtime-api/src/lib.rs

Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>

* redo it again

* more stuff

* redo it again

* update comments

* workaround Future is not Send

* fix trailing spaces

* clarify comments

* bridge: fix compilation in tests

* update more comments

* small fixes

* port collator protocol to new validator discovery api

* collator tests compile

* collator tests pass

* do not revoke a request when the stream receiver is closed

* make revoking opt-in

* fix is_fulfilled

* handle request revokation in collator

* tests

* wait for validator connections asyncronously

* fix compilation

* relabel my todos

* apply Fedor's patch

* resolve reconnection TODO

* resolve revoking TODO

* resolve channel capacity TODO

* resolve peer cloning TODO

* resolve peer disconnected TODO

* resolve PeerSet TODO

* wip tests

* more tests

* resolve Arc TODO

* rename pending to non_revoked

* one more test

* extract utility function into util crate

* fix compilation in tests

* Apply suggestions from code review

Co-authored-by: Fedor Sakharov's avatarFedor Sakharov <fedor.sakharov@gmail.com>

* revert pin_project removal

* fix while let loop

* Revert "revert pin_project removal"

This reverts commit ae7f529d

.

* fix compilation

* Update node/subsystem/src/messages.rs

* docs on pub items

* guide updates

* remove a TODO

* small guide update

* fix a typo

* link to the issue

* validator discovery: on_request docs

Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
Co-authored-by: Fedor Sakharov's avatarFedor Sakharov <fedor.sakharov@gmail.com>
Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>
parent b56f2b30
Pipeline #109688 passed with stages
in 23 minutes and 15 seconds
...@@ -4799,6 +4799,7 @@ name = "polkadot-network-bridge" ...@@ -4799,6 +4799,7 @@ name = "polkadot-network-bridge"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"assert_matches", "assert_matches",
"async-trait",
"futures 0.3.5", "futures 0.3.5",
"futures-timer 3.0.2", "futures-timer 3.0.2",
"log 0.4.11", "log 0.4.11",
...@@ -4808,6 +4809,7 @@ dependencies = [ ...@@ -4808,6 +4809,7 @@ dependencies = [
"polkadot-node-subsystem", "polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-test-helpers",
"polkadot-primitives", "polkadot-primitives",
"sc-authority-discovery",
"sc-network", "sc-network",
"sp-core", "sp-core",
"sp-keyring", "sp-keyring",
...@@ -5168,6 +5170,7 @@ dependencies = [ ...@@ -5168,6 +5170,7 @@ dependencies = [
"sp-api", "sp-api",
"sp-application-crypto", "sp-application-crypto",
"sp-arithmetic", "sp-arithmetic",
"sp-authority-discovery",
"sp-core", "sp-core",
"sp-inherents", "sp-inherents",
"sp-runtime", "sp-runtime",
...@@ -5334,6 +5337,7 @@ dependencies = [ ...@@ -5334,6 +5337,7 @@ dependencies = [
"hex-literal 0.2.1", "hex-literal 0.2.1",
"libsecp256k1", "libsecp256k1",
"log 0.3.9", "log 0.3.9",
"pallet-authority-discovery",
"pallet-authorship", "pallet-authorship",
"pallet-babe", "pallet-babe",
"pallet-balances", "pallet-balances",
......
...@@ -120,6 +120,7 @@ fn make_runtime_api_request<Client>( ...@@ -120,6 +120,7 @@ fn make_runtime_api_request<Client>(
Request::CandidatePendingAvailability(para, sender) => Request::CandidatePendingAvailability(para, sender) =>
query!(candidate_pending_availability(para), sender), query!(candidate_pending_availability(para), sender),
Request::CandidateEvents(sender) => query!(candidate_events(), sender), Request::CandidateEvents(sender) => query!(candidate_events(), sender),
Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender),
} }
} }
...@@ -169,7 +170,7 @@ mod tests { ...@@ -169,7 +170,7 @@ mod tests {
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData, ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData,
Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode, Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode,
CommittedCandidateReceipt, CandidateEvent, CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId,
}; };
use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_test_helpers as test_helpers;
use sp_core::testing::TaskExecutor; use sp_core::testing::TaskExecutor;
...@@ -258,6 +259,10 @@ mod tests { ...@@ -258,6 +259,10 @@ mod tests {
fn candidate_events(&self) -> Vec<CandidateEvent> { fn candidate_events(&self) -> Vec<CandidateEvent> {
self.candidate_events.clone() self.candidate_events.clone()
} }
fn validator_discovery(ids: Vec<ValidatorId>) -> Vec<Option<AuthorityDiscoveryId>> {
vec![None; ids.len()]
}
} }
} }
......
...@@ -5,12 +5,14 @@ authors = ["Parity Technologies <admin@parity.io>"] ...@@ -5,12 +5,14 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
async-trait = "0.1"
futures = "0.3.5" futures = "0.3.5"
log = "0.4.8" log = "0.4.8"
futures-timer = "3.0.2" futures-timer = "3.0.2"
streamunordered = "0.5.1" streamunordered = "0.5.1"
polkadot-primitives = { path = "../../../primitives" } polkadot-primitives = { path = "../../../primitives" }
parity-scale-codec = "1.3.4" parity-scale-codec = "1.3.4"
sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
......
...@@ -20,7 +20,7 @@ use parity_scale_codec::{Encode, Decode}; ...@@ -20,7 +20,7 @@ use parity_scale_codec::{Encode, Decode};
use futures::prelude::*; use futures::prelude::*;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::stream::BoxStream; use futures::stream::BoxStream;
use futures::channel::oneshot; use futures::channel::{mpsc, oneshot};
use sc_network::Event as NetworkEvent; use sc_network::Event as NetworkEvent;
use sp_runtime::ConsensusEngineId; use sp_runtime::ConsensusEngineId;
...@@ -34,16 +34,19 @@ use polkadot_subsystem::messages::{ ...@@ -34,16 +34,19 @@ use polkadot_subsystem::messages::{
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage, CollatorProtocolMessage,
}; };
use polkadot_primitives::v1::{Block, Hash, ValidatorId}; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_node_network_protocol::{ use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1 ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1
}; };
use std::collections::hash_map::{HashMap, Entry as HEntry}; use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator; use std::iter::ExactSizeIterator;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
mod validator_discovery;
/// The maximum amount of heads a peer is allowed to have in their view at any time. /// The maximum amount of heads a peer is allowed to have in their view at any time.
/// ///
/// We use the same limit to compute the view sent to peers locally. /// We use the same limit to compute the view sent to peers locally.
...@@ -188,29 +191,41 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> { ...@@ -188,29 +191,41 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
} }
/// The network bridge subsystem. /// The network bridge subsystem.
pub struct NetworkBridge<N>(N); pub struct NetworkBridge<N, AD> {
network_service: N,
authority_discovery_service: AD,
}
impl<N> NetworkBridge<N> { impl<N, AD> NetworkBridge<N, AD> {
/// Create a new network bridge subsystem with underlying network service. /// Create a new network bridge subsystem with underlying network service and authority discovery service.
/// ///
/// This assumes that the network service has had the notifications protocol for the network /// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self { pub fn new(network_service: N, authority_discovery_service: AD) -> Self {
NetworkBridge(net_service) NetworkBridge {
network_service,
authority_discovery_service,
}
} }
} }
impl<Net, Context> Subsystem<Context> for NetworkBridge<Net> impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
where where
Net: Network, Net: Network + validator_discovery::Network,
AD: validator_discovery::AuthorityDiscovery,
Context: SubsystemContext<Message=NetworkBridgeMessage>, Context: SubsystemContext<Message=NetworkBridgeMessage>,
{ {
fn start(self, ctx: Context) -> SpawnedSubsystem { fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision // Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`. // within `run_network`.
let Self { network_service, authority_discovery_service } = self;
SpawnedSubsystem { SpawnedSubsystem {
name: "network-bridge-subsystem", name: "network-bridge-subsystem",
future: run_network(self.0, ctx).map(|_| ()).boxed(), future: run_network(
network_service,
authority_discovery_service,
ctx,
).map(|_| ()).boxed(),
} }
} }
} }
...@@ -224,7 +239,11 @@ struct PeerData { ...@@ -224,7 +239,11 @@ struct PeerData {
enum Action { enum Action {
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol), SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol), SendCollationMessage(Vec<PeerId>, protocol_v1::CollationProtocol),
ConnectToValidators(PeerSet, Vec<ValidatorId>, oneshot::Sender<Vec<(ValidatorId, PeerId)>>), ConnectToValidators {
validator_ids: Vec<AuthorityDiscoveryId>,
connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>,
revoke: oneshot::Receiver<()>,
},
ReportPeer(PeerId, ReputationChange), ReportPeer(PeerId, ReputationChange),
ActiveLeaves(ActiveLeavesUpdate), ActiveLeaves(ActiveLeavesUpdate),
...@@ -254,8 +273,11 @@ fn action_from_overseer_message( ...@@ -254,8 +273,11 @@ fn action_from_overseer_message(
=> Action::SendValidationMessage(peers, msg), => Action::SendValidationMessage(peers, msg),
NetworkBridgeMessage::SendCollationMessage(peers, msg) NetworkBridgeMessage::SendCollationMessage(peers, msg)
=> Action::SendCollationMessage(peers, msg), => Action::SendCollationMessage(peers, msg),
NetworkBridgeMessage::ConnectToValidators(peer_set, validators, res) NetworkBridgeMessage::ConnectToValidators {
=> Action::ConnectToValidators(peer_set, validators, res), validator_ids,
connected,
revoke,
} => Action::ConnectToValidators { validator_ids, connected, revoke },
}, },
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_))) Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_)))
=> Action::Nop, => Action::Nop,
...@@ -538,11 +560,16 @@ async fn dispatch_collation_events_to_all<I>( ...@@ -538,11 +560,16 @@ async fn dispatch_collation_events_to_all<I>(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await ctx.send_messages(events.into_iter().flat_map(messages_for)).await
} }
async fn run_network<N: Network>( async fn run_network<N, AD>(
mut net: N, mut network_service: N,
mut authority_discovery_service: AD,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>, mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()> { ) -> SubsystemResult<()>
let mut event_stream = net.event_stream().fuse(); where
N: Network + validator_discovery::Network,
AD: validator_discovery::AuthorityDiscovery,
{
let mut event_stream = network_service.event_stream().fuse();
// Most recent heads are at the back. // Most recent heads are at the back.
let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS); let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS);
...@@ -551,7 +578,10 @@ async fn run_network<N: Network>( ...@@ -551,7 +578,10 @@ async fn run_network<N: Network>(
let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut validator_discovery = validator_discovery::Service::<N, AD>::new();
loop { loop {
let action = { let action = {
let subsystem_next = ctx.recv().fuse(); let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse(); let mut net_event_next = event_stream.next().fuse();
...@@ -568,31 +598,43 @@ async fn run_network<N: Network>( ...@@ -568,31 +598,43 @@ async fn run_network<N: Network>(
Action::Abort => return Ok(()), Action::Abort => return Ok(()),
Action::SendValidationMessage(peers, msg) => send_message( Action::SendValidationMessage(peers, msg) => send_message(
&mut net, &mut network_service,
peers, peers,
PeerSet::Validation, PeerSet::Validation,
WireMessage::ProtocolMessage(msg), WireMessage::ProtocolMessage(msg),
).await?, ).await?,
Action::SendCollationMessage(peers, msg) => send_message( Action::SendCollationMessage(peers, msg) => send_message(
&mut net, &mut network_service,
peers, peers,
PeerSet::Collation, PeerSet::Collation,
WireMessage::ProtocolMessage(msg), WireMessage::ProtocolMessage(msg),
).await?, ).await?,
Action::ConnectToValidators(_peer_set, _validators, _res) => { Action::ConnectToValidators {
// TODO: https://github.com/paritytech/polkadot/issues/1461 validator_ids,
} connected,
revoke,
} => {
let (ns, ads) = validator_discovery.on_request(
validator_ids,
connected,
revoke,
network_service,
authority_discovery_service,
).await;
network_service = ns;
authority_discovery_service = ads;
},
Action::ReportPeer(peer, rep) => net.report_peer(peer, rep).await?, Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
live_heads.extend(activated); live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(h)); live_heads.retain(|h| !deactivated.contains(h));
update_view( update_view(
&mut net, &mut network_service,
&mut ctx, &mut ctx,
&live_heads, &live_heads,
&mut local_view, &mut local_view,
...@@ -607,9 +649,11 @@ async fn run_network<N: Network>( ...@@ -607,9 +649,11 @@ async fn run_network<N: Network>(
PeerSet::Collation => &mut collation_peers, PeerSet::Collation => &mut collation_peers,
}; };
validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;
match peer_map.entry(peer.clone()) { match peer_map.entry(peer.clone()) {
HEntry::Occupied(_) => continue, hash_map::Entry::Occupied(_) => continue,
HEntry::Vacant(vacant) => { hash_map::Entry::Vacant(vacant) => {
vacant.insert(PeerData { vacant.insert(PeerData {
view: View(Vec::new()), view: View(Vec::new()),
}); });
...@@ -650,6 +694,8 @@ async fn run_network<N: Network>( ...@@ -650,6 +694,8 @@ async fn run_network<N: Network>(
PeerSet::Collation => &mut collation_peers, PeerSet::Collation => &mut collation_peers,
}; };
validator_discovery.on_peer_disconnected(&peer, &mut authority_discovery_service).await;
if peer_map.remove(&peer).is_some() { if peer_map.remove(&peer).is_some() {
let res = match peer_set { let res = match peer_set {
PeerSet::Validation => dispatch_validation_event_to_all( PeerSet::Validation => dispatch_validation_event_to_all(
...@@ -677,7 +723,7 @@ async fn run_network<N: Network>( ...@@ -677,7 +723,7 @@ async fn run_network<N: Network>(
peer.clone(), peer.clone(),
&mut validation_peers, &mut validation_peers,
v_messages, v_messages,
&mut net, &mut network_service,
).await?; ).await?;
if let Err(e) = dispatch_validation_events_to_all( if let Err(e) = dispatch_validation_events_to_all(
...@@ -697,7 +743,7 @@ async fn run_network<N: Network>( ...@@ -697,7 +743,7 @@ async fn run_network<N: Network>(
peer.clone(), peer.clone(),
&mut collation_peers, &mut collation_peers,
c_messages, c_messages,
&mut net, &mut network_service,
).await?; ).await?;
if let Err(e) = dispatch_collation_events_to_all( if let Err(e) = dispatch_collation_events_to_all(
...@@ -716,6 +762,7 @@ async fn run_network<N: Network>( ...@@ -716,6 +762,7 @@ async fn run_network<N: Network>(
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
...@@ -723,6 +770,8 @@ mod tests { ...@@ -723,6 +770,8 @@ mod tests {
use futures::executor; use futures::executor;
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashSet;
use async_trait::async_trait;
use parking_lot::Mutex; use parking_lot::Mutex;
use assert_matches::assert_matches; use assert_matches::assert_matches;
...@@ -730,6 +779,7 @@ mod tests { ...@@ -730,6 +779,7 @@ mod tests {
use polkadot_node_subsystem_test_helpers::{ use polkadot_node_subsystem_test_helpers::{
SingleItemSink, SingleItemStream, TestSubsystemContextHandle, SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
}; };
use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring; use sp_keyring::Sr25519Keyring;
// The subsystem's view of the network - only supports a single call to `event_stream`. // The subsystem's view of the network - only supports a single call to `event_stream`.
...@@ -738,6 +788,8 @@ mod tests { ...@@ -738,6 +788,8 @@ mod tests {
action_tx: mpsc::UnboundedSender<NetworkAction>, action_tx: mpsc::UnboundedSender<NetworkAction>,
} }
struct TestAuthorityDiscovery;
// The test's view of the network. This receives updates from the subsystem in the form // The test's view of the network. This receives updates from the subsystem in the form
// of `NetworkAction`s. // of `NetworkAction`s.
struct TestNetworkHandle { struct TestNetworkHandle {
...@@ -748,6 +800,7 @@ mod tests { ...@@ -748,6 +800,7 @@ mod tests {
fn new_test_network() -> ( fn new_test_network() -> (
TestNetwork, TestNetwork,
TestNetworkHandle, TestNetworkHandle,
TestAuthorityDiscovery,
) { ) {
let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink();
let (action_tx, action_rx) = mpsc::unbounded(); let (action_tx, action_rx) = mpsc::unbounded();
...@@ -761,6 +814,7 @@ mod tests { ...@@ -761,6 +814,7 @@ mod tests {
action_rx, action_rx,
net_tx, net_tx,
}, },
TestAuthorityDiscovery,
) )
} }
...@@ -786,6 +840,23 @@ mod tests { ...@@ -786,6 +840,23 @@ mod tests {
} }
} }
impl validator_discovery::Network for TestNetwork {
fn set_priority_group(&self, _group_id: String, _multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
Ok(())
}
}
#[async_trait]
impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery {
async fn get_addresses_by_authority_id(&mut self, _authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>> {
None
}
async fn get_authority_id_by_peer_id(&mut self, _peer_id: PeerId) -> Option<AuthorityDiscoveryId> {
None
}
}
impl TestNetworkHandle { impl TestNetworkHandle {
// Get the next network action. // Get the next network action.
async fn next_network_action(&mut self) -> NetworkAction { async fn next_network_action(&mut self) -> NetworkAction {
...@@ -842,11 +913,12 @@ mod tests { ...@@ -842,11 +913,12 @@ mod tests {
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) { fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::TaskExecutor::new(); let pool = sp_core::testing::TaskExecutor::new();
let (network, network_handle) = new_test_network(); let (network, network_handle, discovery) = new_test_network();
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let network_bridge = run_network( let network_bridge = run_network(
network, network,
discovery,
context, context,
) )
.map_err(|_| panic!("subsystem execution failed")) .map_err(|_| panic!("subsystem execution failed"))
......
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! A validator discovery service for the Network Bridge.
use core::marker::PhantomData;
use std::collections::{HashSet, HashMap, hash_map};
use std::sync::Arc;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use sc_network::Multiaddr;
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
const PRIORITY_GROUP: &'static str = "parachain_validators";
/// An abstraction over networking for the purposes of validator discovery service.
pub trait Network: Send + 'static {
/// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group.
fn set_priority_group(&self, group_id: String, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
// TODO (ordian): we might want to add `add_to_priority_group` and `remove_from_priority_group`
// https://github.com/paritytech/polkadot/issues/1763
}
/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + 'static {
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;