Unverified Commit ecc3772d authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Generic request/response infrastructure for Polkadot (#2352)

* Move NetworkBridgeEvent to subsystem::messages.

It is not protocol related at all, it is in fact only part of the
subsystem communication as it gets wrapped into messages of each
subsystem.

* Request/response infrastructure is taking shape.

WIP: Does not compile.

* Multiplexer variant not supported by Rusts type system.

* request_response::request type checks.

* Cleanup.

* Minor fixes for request_response.

* Implement request sending + move multiplexer.

Request multiplexer is moved to bridge, as there the implementation is
more straight forward as we can specialize on `AllMessages` for the
multiplexing target.

Sending of requests is mostly complete, apart from a few `From`
instances. Receiving is also almost done, initializtion needs to be
fixed and the multiplexer needs to be invoked.

* Remove obsolete multiplexer.

* Initialize bridge with multiplexer.

* Finish generic request sending/receiving.

Subsystems are now able to receive and send requests and responses via
the overseer.

* Doc update.

* Fixes.

* Link issue for not yet implemented code.

* Fixes suggested by @ordian

 - thanks!

- start encoding at 0
- don't crash on zero protocols
- don't panic on not yet implemented request handling

* Update node/network/protocol/src/request_response/v1.rs

Use index 0 instead of 1.

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Update node/network/protocol/src/request_response.rs

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>

* Fix existing tests.

* Better avoidance of division by zoro errors.

* Doc fixes.

* send_request -> start_request.

* Fix missing renamings.

* Update substrate.

* Pass TryConnect instead of true.

* Actually import `IfDisconnected`.

* Fix wrong import.

* Update node/network/bridge/src/lib.rs

typo

Co-authored-by: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>

* Update node/network/bridge/src/multiplexer.rs

Remove redundant import.

Co-authored-by: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>

* Stop doing tracing from within `From` instance.

Thanks for the catch @tomaka

!

* Get rid of redundant import.

* Formatting cleanup.

* Fix tests.

* Add link to issue.

* Clarify comments some more.

* Fix tests.

* Formatting fix.

* tabs

* Fix link

Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>

* Use map_err.

Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>

* Improvements inspired by suggestions by @drahnr.

- Channel size is now determined by function.
- Explicitely scope NetworkService::start_request.

Co-authored-by: Andronik Ordian's avatarAndronik Ordian <write@reusable.software>
Co-authored-by: Pierre Krieger's avatarPierre Krieger <pierre.krieger1708@gmail.com>
Co-authored-by: default avatarBernhard Schuster <bernhard@ahoi.io>
parent 90e6d6e8
Pipeline #122989 passed with stages
in 27 minutes and 18 seconds
This diff is collapsed.
......@@ -35,13 +35,13 @@ use polkadot_node_primitives::{
use polkadot_node_subsystem::{
messages::{
AllMessages, ApprovalDistributionMessage, ApprovalVotingMessage, NetworkBridgeMessage,
AssignmentCheckResult, ApprovalCheckResult,
AssignmentCheckResult, ApprovalCheckResult, NetworkBridgeEvent,
},
ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_network_protocol::{
PeerId, View, NetworkBridgeEvent, v1 as protocol_v1, ReputationChange as Rep,
PeerId, View, v1 as protocol_v1, ReputationChange as Rep,
};
const LOG_TARGET: &str = "approval_distribution";
......
......@@ -32,7 +32,7 @@ use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use polkadot_erasure_coding::branch_hash;
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, View, OurView,
v1 as protocol_v1, PeerId, ReputationChange as Rep, View, OurView,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{
......@@ -42,7 +42,7 @@ use polkadot_primitives::v1::{
};
use polkadot_subsystem::messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, ChainApiMessage,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest,
NetworkBridgeMessage, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent
};
use polkadot_subsystem::{
jaeger, errors::{ChainApiError, RuntimeApiError}, PerLeafSpan,
......@@ -843,6 +843,15 @@ impl AvailabilityDistributionSubsystem {
);
}
}
FromOverseer::Communication {
msg: AvailabilityDistributionMessage::AvailabilityFetchingRequest(_),
} => {
// TODO: Implement issue 2306:
tracing::warn!(
target: LOG_TARGET,
"To be implemented, see: https://github.com/paritytech/polkadot/issues/2306 !",
);
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: _,
deactivated: _,
......
......@@ -39,10 +39,11 @@ use polkadot_subsystem::{
errors::RecoveryError,
messages::{
AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
NetworkBridgeEvent,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, NetworkBridgeEvent, PeerId, ReputationChange as Rep, RequestId,
v1 as protocol_v1, PeerId, ReputationChange as Rep, RequestId,
};
use polkadot_node_subsystem_util::{
Timeout, TimeoutExt,
......
......@@ -30,7 +30,7 @@ use polkadot_primitives::v1::{
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest}, JaegerSpan};
use polkadot_subsystem::{messages::{RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeEvent}, JaegerSpan};
type VirtualOverseer = test_helpers::TestSubsystemContextHandle<AvailabilityRecoveryMessage>;
......
......@@ -27,12 +27,12 @@ use futures::{channel::oneshot, FutureExt};
use polkadot_subsystem::messages::*;
use polkadot_subsystem::{
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem, SubsystemContext,
SubsystemResult,
PerLeafSpan, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, Subsystem,
SubsystemContext, SubsystemResult,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{Hash, SignedAvailabilityBitfield, SigningContext, ValidatorId};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, NetworkBridgeEvent, View, ReputationChange, OurView};
use polkadot_node_network_protocol::{v1 as protocol_v1, PeerId, View, ReputationChange, OurView};
use std::collections::{HashMap, HashSet};
const COST_SIGNATURE_INVALID: ReputationChange =
......
......@@ -15,6 +15,7 @@ sc-authority-discovery = { git = "https://github.com/paritytech/substrate", bran
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-network-protocol = { path = "../protocol" }
strum = "0.20.0"
[dev-dependencies]
assert_matches = "1.4.0"
......
......@@ -22,13 +22,14 @@ use polkadot_node_network_protocol::{
peer_set::PeerSet, v1 as protocol_v1, PeerId, ReputationChange,
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, BlockNumber};
use polkadot_subsystem::messages::NetworkBridgeMessage;
use polkadot_subsystem::messages::{AllMessages, NetworkBridgeMessage};
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal};
use sc_network::Event as NetworkEvent;
use polkadot_node_network_protocol::ObservedRole;
use polkadot_node_network_protocol::{request_response::Requests, ObservedRole};
use super::{WireMessage, LOG_TARGET, MALFORMED_MESSAGE_COST};
use super::multiplexer::RequestMultiplexError;
use super::{WireMessage, MALFORMED_MESSAGE_COST};
/// Internal type combining all actions a `NetworkBridge` might perform.
///
......@@ -43,6 +44,9 @@ pub(crate) enum Action {
/// Ask network to send a collation message.
SendCollationMessages(Vec<(Vec<PeerId>, protocol_v1::CollationProtocol)>),
/// Ask network to send requests.
SendRequests(Vec<Requests>),
/// Ask network to connect to validators.
ConnectToValidators {
validator_ids: Vec<AuthorityDiscoveryId>,
......@@ -76,13 +80,32 @@ pub(crate) enum Action {
Vec<WireMessage<protocol_v1::CollationProtocol>>,
),
Abort,
/// Send a message to another subsystem or the overseer.
///
/// Used for handling incoming requests.
SendMessage(AllMessages),
/// Abort with reason.
Abort(AbortReason),
Nop,
}
#[derive(Debug)]
pub(crate) enum AbortReason {
/// Received error from overseer:
SubsystemError(polkadot_subsystem::SubsystemError),
/// The stream of incoming events concluded.
EventStreamConcluded,
/// The stream of incoming requests concluded.
RequestStreamConcluded,
/// We received OverseerSignal::Conclude
OverseerConcluded,
}
impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>> for Action {
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn from(res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>) -> Self {
fn from(
res: polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>>,
) -> Self {
match res {
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) => {
Action::ActiveLeaves(active_leaves)
......@@ -90,7 +113,9 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number))) => {
Action::BlockFinalized(number)
}
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => {
Action::Abort(AbortReason::OverseerConcluded)
}
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
......@@ -99,6 +124,7 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
Action::SendCollationMessages(vec![(peers, msg)])
}
NetworkBridgeMessage::SendRequests(reqs) => Action::SendRequests(reqs),
NetworkBridgeMessage::SendValidationMessages(msgs) => {
Action::SendValidationMessages(msgs)
}
......@@ -113,25 +139,15 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
connected,
},
},
Err(e) => {
tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
Action::Abort
}
Err(e) => Action::Abort(AbortReason::SubsystemError(e)),
}
}
}
impl From<Option<NetworkEvent>> for Action {
#[tracing::instrument(level = "trace", fields(subsystem = LOG_TARGET))]
fn from(event: Option<NetworkEvent>) -> Action {
match event {
None => {
tracing::info!(
target: LOG_TARGET,
"Shutting down Network Bridge: underlying event stream concluded"
);
Action::Abort
}
None => Action::Abort(AbortReason::EventStreamConcluded),
Some(NetworkEvent::Dht(_))
| Some(NetworkEvent::SyncConnected { .. })
| Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
......@@ -153,7 +169,9 @@ impl From<Option<NetworkEvent>> for Action {
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let v_messages: Result<Vec<_>, _> = messages
.iter()
.filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name())
.filter(|(protocol, _)| {
protocol == &PeerSet::Validation.into_protocol_name()
})
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();
......@@ -164,7 +182,9 @@ impl From<Option<NetworkEvent>> for Action {
let c_messages: Result<Vec<_>, _> = messages
.iter()
.filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name())
.filter(|(protocol, _)| {
protocol == &PeerSet::Collation.into_protocol_name()
})
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();
......@@ -182,3 +202,13 @@ impl From<Option<NetworkEvent>> for Action {
}
}
}
impl From<Option<Result<AllMessages, RequestMultiplexError>>> for Action {
fn from(event: Option<Result<AllMessages, RequestMultiplexError>>) -> Self {
match event {
None => Action::Abort(AbortReason::RequestStreamConcluded),
Some(Err(err)) => Action::ReportPeer(err.peer, MALFORMED_MESSAGE_COST),
Some(Ok(msg)) => Action::SendMessage(msg),
}
}
}
......@@ -30,11 +30,11 @@ use polkadot_subsystem::{
use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages, AvailabilityDistributionMessage,
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage, ApprovalDistributionMessage,
CollatorProtocolMessage, ApprovalDistributionMessage, NetworkBridgeEvent,
};
use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_node_network_protocol::{
ReputationChange, PeerId, peer_set::PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
ReputationChange, PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView,
};
/// Peer set infos for network initialization.
......@@ -53,7 +53,7 @@ mod validator_discovery;
/// All requested `NetworkBridgeMessage` user actions and `NetworkEvent` network messages are
/// translated to `Action` before being processed by `run_network`.
mod action;
use action::Action;
use action::{Action, AbortReason};
/// Actual interfacing to the network based on the `Network` trait.
///
......@@ -61,6 +61,10 @@ use action::Action;
mod network;
use network::{Network, send_message};
/// Request multiplexer for combining the multiple request sources into a single `Stream` of `AllMessages`.
mod multiplexer;
pub use multiplexer::RequestMultiplexer;
/// The maximum amount of heads a peer is allowed to have in their view at any time.
///
......@@ -95,6 +99,7 @@ pub struct NetworkBridge<N, AD> {
/// `Network` trait implementing type.
network_service: N,
authority_discovery_service: AD,
request_multiplexer: RequestMultiplexer,
}
impl<N, AD> NetworkBridge<N, AD> {
......@@ -102,10 +107,11 @@ impl<N, AD> NetworkBridge<N, AD> {
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`peers_sets_info`](peers_sets_info).
pub fn new(network_service: N, authority_discovery_service: AD) -> Self {
pub fn new(network_service: N, authority_discovery_service: AD, request_multiplexer: RequestMultiplexer) -> Self {
NetworkBridge {
network_service,
authority_discovery_service,
request_multiplexer,
}
}
}
......@@ -119,12 +125,7 @@ impl<Net, AD, Context> Subsystem<Context> for NetworkBridge<Net, AD>
fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
let Self { network_service, authority_discovery_service } = self;
let future = run_network(
network_service,
authority_discovery_service,
ctx,
)
let future = run_network(self, ctx)
.map_err(|e| {
SubsystemError::with_origin("network-bridge", e)
})
......@@ -142,17 +143,16 @@ struct PeerData {
}
/// Main driver, processing network events and messages from other subsystems.
#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(bridge, ctx), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
mut network_service: N,
mut authority_discovery_service: AD,
mut bridge: NetworkBridge<N, AD>,
mut ctx: impl SubsystemContext<Message=NetworkBridgeMessage>,
) -> SubsystemResult<()>
where
N: Network + validator_discovery::Network,
AD: validator_discovery::AuthorityDiscovery,
{
let mut event_stream = network_service.event_stream().fuse();
let mut event_stream = bridge.network_service.event_stream().fuse();
// Most recent heads are at the back.
let mut live_heads: Vec<(Hash, Arc<JaegerSpan>)> = Vec::with_capacity(MAX_VIEW_HEADS);
......@@ -169,22 +169,55 @@ where
let action = {
let subsystem_next = ctx.recv().fuse();
let mut net_event_next = event_stream.next().fuse();
let mut req_res_event_next = bridge.request_multiplexer.next().fuse();
futures::pin_mut!(subsystem_next);
futures::select! {
subsystem_msg = subsystem_next => Action::from(subsystem_msg),
net_event = net_event_next => Action::from(net_event),
req_res_event = req_res_event_next => Action::from(req_res_event),
}
};
match action {
Action::Nop => {}
Action::Abort => return Ok(()),
Action::Abort(reason) => match reason {
AbortReason::SubsystemError(err) => {
tracing::warn!(
target: LOG_TARGET,
err = ?err,
"Shutting down Network Bridge due to error"
);
return Err(SubsystemError::Context(format!(
"Received SubsystemError from overseer: {:?}",
err
)));
}
AbortReason::EventStreamConcluded => {
tracing::info!(
target: LOG_TARGET,
"Shutting down Network Bridge: underlying request stream concluded"
);
return Err(SubsystemError::Context(
"Incoming network event stream concluded.".to_string(),
));
}
AbortReason::RequestStreamConcluded => {
tracing::info!(
target: LOG_TARGET,
"Shutting down Network Bridge: underlying request stream concluded"
);
return Err(SubsystemError::Context(
"Incoming network request stream concluded".to_string(),
));
}
AbortReason::OverseerConcluded => return Ok(()),
}
Action::SendValidationMessages(msgs) => {
for (peers, msg) in msgs {
send_message(
&mut network_service,
&mut bridge.network_service,
peers,
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
......@@ -195,7 +228,7 @@ where
Action::SendCollationMessages(msgs) => {
for (peers, msg) in msgs {
send_message(
&mut network_service,
&mut bridge.network_service,
peers,
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
......@@ -203,6 +236,12 @@ where
}
}
Action::SendRequests(reqs) => {
for req in reqs {
bridge.network_service.start_request(req);
}
},
Action::ConnectToValidators {
validator_ids,
connected,
......@@ -210,21 +249,28 @@ where
let (ns, ads) = validator_discovery.on_request(
validator_ids,
connected,
network_service,
authority_discovery_service,
bridge.network_service,
bridge.authority_discovery_service,
).await;
network_service = ns;
authority_discovery_service = ads;
bridge.network_service = ns;
bridge.authority_discovery_service = ads;
},
Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?,
Action::ReportPeer(peer, rep) => {
tracing::debug!(
target: LOG_TARGET,
peer = ?peer,
"Peer sent us an invalid request",
);
bridge.network_service.report_peer(peer, rep).await?
}
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(&h.0));
update_our_view(
&mut network_service,
&mut bridge.network_service,
&mut ctx,
&live_heads,
&mut local_view,
......@@ -250,7 +296,7 @@ where
PeerSet::Collation => &mut collation_peers,
};
validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await;
validator_discovery.on_peer_connected(&peer, &mut bridge.authority_discovery_service).await;
match peer_map.entry(peer.clone()) {
hash_map::Entry::Occupied(_) => continue,
......@@ -311,7 +357,7 @@ where
peer.clone(),
&mut validation_peers,
v_messages,
&mut network_service,
&mut bridge.network_service,
).await?;
dispatch_validation_events_to_all(events, &mut ctx).await;
......@@ -322,12 +368,13 @@ where
peer.clone(),
&mut collation_peers,
c_messages,
&mut network_service,
&mut bridge.network_service,
).await?;
dispatch_collation_events_to_all(events, &mut ctx).await;
}
},
Action::SendMessage(msg) => ctx.send_message(msg).await,
}
}
}
......@@ -562,9 +609,10 @@ mod tests {
use polkadot_node_subsystem_util::metered;
use polkadot_node_network_protocol::view;
use sc_network::Multiaddr;
use sc_network::config::RequestResponseConfig;
use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_node_network_protocol::ObservedRole;
use polkadot_node_network_protocol::{ObservedRole, request_response::request::Requests};
use crate::network::{Network, NetworkAction};
......@@ -572,6 +620,7 @@ mod tests {
struct TestNetwork {
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
action_tx: metered::UnboundedMeteredSender<NetworkAction>,
_req_configs: Vec<RequestResponseConfig>,
}
struct TestAuthorityDiscovery;
......@@ -583,7 +632,7 @@ mod tests {
net_tx: SingleItemSink<NetworkEvent>,
}
fn new_test_network() -> (
fn new_test_network(req_configs: Vec<RequestResponseConfig>) -> (
TestNetwork,
TestNetworkHandle,
TestAuthorityDiscovery,
......@@ -595,6 +644,7 @@ mod tests {
TestNetwork {
net_events: Arc::new(Mutex::new(Some(net_rx))),
action_tx,
_req_configs: req_configs,
},
TestNetworkHandle {
action_rx,
......@@ -617,6 +667,9 @@ mod tests {
{
Box::pin((&mut self.action_tx).sink_map_err(Into::into))
}
fn start_request(&self, _: Requests) {
}
}
#[async_trait]
......@@ -698,12 +751,18 @@ mod tests {
fn test_harness<T: Future<Output=()>>(test: impl FnOnce(TestHarness) -> T) {
let pool = sp_core::testing::TaskExecutor::new();
let (network, network_handle, discovery) = new_test_network();
let (request_multiplexer, req_configs) = RequestMultiplexer::new();
let (network, network_handle, discovery) = new_test_network(req_configs);
let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool);
let bridge = NetworkBridge {
network_service: network,
authority_discovery_service: discovery,
request_multiplexer,
};
let network_bridge = run_network(
network,
discovery,
bridge,
context,
)
.map_err(|_| panic!("subsystem execution failed"))
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::pin::Pin;
use futures::channel::mpsc;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use strum::IntoEnumIterator;
use parity_scale_codec::{Decode, Error as DecodingError};
use sc_network::config as network;
use sc_network::PeerId;
use polkadot_node_network_protocol::request_response::{
request::IncomingRequest, v1, Protocol, RequestResponseConfig,
};
use polkadot_subsystem::messages::AllMessages;
/// Multiplex incoming network requests.
///
/// This multiplexer consumes all request streams and makes them a `Stream` of a single message
/// type, useful for the network bridge to send them via the `Overseer` to other subsystems.
pub struct RequestMultiplexer {
receivers: Vec<(Protocol, mpsc::Receiver<network::IncomingRequest>)>,
next_poll: usize,
}
/// Multiplexing can fail in case of invalid messages.
pub struct RequestMultiplexError {
/// The peer that sent the invalid message.
pub peer: PeerId,
/// The error that occurred.
pub error: DecodingError,
}
impl RequestMultiplexer {
/// Create a new `RequestMultiplexer`.
///
/// This function uses `Protocol::get_config` for each available protocol and creates a
/// `RequestMultiplexer` from it. The returned `RequestResponseConfig`s must be passed to the
/// network implementation.
pub fn new() -> (Self, Vec<RequestResponseConfig>) {
let (receivers, cfgs): (Vec<_>, Vec<_>) = Protocol::iter()