Unverified Commit 5fae68e6 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

network-bridge: remove action_sink abstraction (#3308)

* network-bridge: remove action_sink abstraction

* another wtf

* filter out event stream

* Revert "filter out event stream"

This reverts commit 63bd8f5d.

* retain cleanup though
parent 122734f1
Pipeline #143240 passed with stages
in 35 minutes and 52 seconds
......@@ -49,7 +49,6 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
pub use polkadot_node_network_protocol::peer_set::{peer_sets_info, IsAuthority};
use std::collections::{HashMap, hash_map, HashSet};
use std::iter::ExactSizeIterator;
use std::sync::Arc;
mod validator_discovery;
......@@ -413,7 +412,7 @@ where
&shared,
finalized_number,
&metrics,
).await?;
);
}
}
}
......@@ -443,7 +442,7 @@ where
action = "ReportPeer"
);
}
network_service.report_peer(peer, rep).await?
network_service.report_peer(peer, rep);
}
NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
tracing::trace!(
......@@ -452,7 +451,7 @@ where
?peer,
peer_set = ?peer_set,
);
network_service.disconnect_peer(peer, peer_set).await?;
network_service.disconnect_peer(peer, peer_set);
}
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
tracing::trace!(
......@@ -467,7 +466,7 @@ where
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
);
}
NetworkBridgeMessage::SendValidationMessages(msgs) => {
tracing::trace!(
......@@ -483,7 +482,7 @@ where
PeerSet::Validation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
);
}
}
NetworkBridgeMessage::SendCollationMessage(peers, msg) => {
......@@ -499,7 +498,7 @@ where
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
);
}
NetworkBridgeMessage::SendCollationMessages(msgs) => {
tracing::trace!(
......@@ -515,7 +514,7 @@ where
PeerSet::Collation,
WireMessage::ProtocolMessage(msg),
&metrics,
).await?
);
}
}
NetworkBridgeMessage::SendRequests(reqs, if_disconnected) => {
......@@ -595,15 +594,16 @@ where
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut sender: impl SubsystemSender,
mut network_service: impl Network,
mut network_stream: BoxStream<'static, NetworkEvent>,
network_stream: BoxStream<'static, NetworkEvent>,
mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer,
metrics: Metrics,
shared: Shared,
) -> Result<(), UnexpectedAbort> {
let mut network_stream = network_stream.fuse();
loop {
futures::select! {
network_event = network_stream.next().fuse() => match network_event {
network_event = network_stream.next() => match network_event {
None => return Err(UnexpectedAbort::EventStreamConcluded),
Some(NetworkEvent::Dht(_))
| Some(NetworkEvent::SyncConnected { .. })
......@@ -668,7 +668,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
local_view,
),
&metrics,
).await?;
);
}
PeerSet::Collation => {
dispatch_collation_events_to_all(
......@@ -690,7 +690,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
local_view,
),
&metrics,
).await?;
);
}
}
}
......@@ -754,7 +754,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
action = "ReportPeer"
);
network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?;
network_service.report_peer(remote, MALFORMED_MESSAGE_COST);
continue;
}
Ok(v) => v,
......@@ -778,7 +778,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
action = "ReportPeer"
);
network_service.report_peer(remote, MALFORMED_MESSAGE_COST).await?;
network_service.report_peer(remote, MALFORMED_MESSAGE_COST);
continue;
}
Ok(c_messages) => {
......@@ -803,7 +803,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
);
for report in reports {
network_service.report_peer(remote.clone(), report).await?;
network_service.report_peer(remote.clone(), report);
}
dispatch_validation_events_to_all(events, &mut sender).await;
......@@ -819,7 +819,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
);
for report in reports {
network_service.report_peer(remote.clone(), report).await?;
network_service.report_peer(remote.clone(), report);
}
......@@ -833,10 +833,7 @@ async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
req_res_event = request_multiplexer.next().fuse() => match req_res_event {
None => return Err(UnexpectedAbort::RequestStreamConcluded),
Some(Err(err)) => {
sender.send_message(NetworkBridgeMessage::ReportPeer(
err.peer,
MALFORMED_MESSAGE_COST,
).into()).await;
network_service.report_peer(err.peer, MALFORMED_MESSAGE_COST);
}
Some(Ok(msg)) => {
sender.send_message(msg).await;
......@@ -874,7 +871,7 @@ where
authority_discovery_service,
metrics,
sync_oracle,
} = bridge;
} = bridge;
let statement_receiver = request_multiplexer
.get_statement_fetching()
......@@ -953,14 +950,14 @@ fn construct_view(live_heads: impl DoubleEndedIterator<Item = Hash>, finalized_n
)
}
async fn update_our_view(
fn update_our_view(
net: &mut impl Network,
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
live_heads: &[ActivatedLeaf],
shared: &Shared,
finalized_number: BlockNumber,
metrics: &Metrics,
) -> SubsystemResult<()> {
) {
let new_view = construct_view(live_heads.iter().map(|v| v.hash), finalized_number);
let (validation_peers, collation_peers) = {
......@@ -973,11 +970,11 @@ async fn update_our_view(
// there is no need to send anything.
match shared.local_view {
Some(ref v) if v.check_heads_eq(&new_view) => {
return Ok(())
return;
}
None if live_heads.is_empty() => {
shared.local_view = Some(new_view);
return Ok(())
return;
}
_ => {
shared.local_view = Some(new_view.clone());
......@@ -996,14 +993,14 @@ async fn update_our_view(
validation_peers,
WireMessage::ViewUpdate(new_view.clone()),
metrics,
).await?;
);
send_collation_message(
net,
collation_peers,
WireMessage::ViewUpdate(new_view),
metrics,
).await?;
);
let our_view = OurView::new(
live_heads.iter().take(MAX_VIEW_HEADS).cloned().map(|a| (a.hash, a.span)),
......@@ -1019,8 +1016,6 @@ async fn update_our_view(
NetworkBridgeEvent::OurViewChange(our_view),
ctx.sender(),
);
Ok(())
}
// Handle messages on a specific peer-set. The peer is expected to be connected on that
......@@ -1075,30 +1070,22 @@ fn handle_peer_messages<M>(
(outgoing_messages, reports)
}
async fn send_validation_message<I>(
fn send_validation_message(
net: &mut impl Network,
peers: I,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::ValidationProtocol>,
metrics: &Metrics,
) -> SubsystemResult<()>
where
I: IntoIterator<Item=PeerId>,
I::IntoIter: ExactSizeIterator,
{
send_message(net, peers, PeerSet::Validation, message, metrics).await
) {
send_message(net, peers, PeerSet::Validation, message, metrics);
}
async fn send_collation_message<I>(
fn send_collation_message(
net: &mut impl Network,
peers: I,
peers: Vec<PeerId>,
message: WireMessage<protocol_v1::CollationProtocol>,
metrics: &Metrics,
) -> SubsystemResult<()>
where
I: IntoIterator<Item=PeerId>,
I::IntoIter: ExactSizeIterator,
{
send_message(net, peers, PeerSet::Collation, message, metrics).await
) {
send_message(net, peers, PeerSet::Collation, message, metrics)
}
......
......@@ -16,11 +16,9 @@
use std::borrow::Cow;
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::BoxStream;
......@@ -36,7 +34,6 @@ use polkadot_node_network_protocol::{
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_subsystem::{SubsystemError, SubsystemResult};
use crate::validator_discovery::AuthorityDiscovery;
......@@ -47,62 +44,32 @@ use super::LOG_TARGET;
/// This function is only used internally by the network-bridge, which is responsible to only send
/// messages that are compatible with the passed peer set, as that is currently not enforced by
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
pub(crate) async fn send_message<M, I>(
pub(crate) fn send_message<M>(
net: &mut impl Network,
peers: I,
mut peers: Vec<PeerId>,
peer_set: PeerSet,
message: M,
metrics: &super::Metrics,
) -> SubsystemResult<()>
)
where
M: Encode + Clone,
I: IntoIterator<Item = PeerId>,
I::IntoIter: ExactSizeIterator,
{
let mut message_producer = stream::iter({
let peers = peers.into_iter();
let n_peers = peers.len();
let mut message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, encoded.len(), n_peers);
Some(encoded)
};
peers.enumerate().map(move |(i, peer)| {
// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
let message = if i == n_peers - 1 {
message
.take()
.expect("Only taken in last iteration of loop, never afterwards; qed")
} else {
message
.as_ref()
.expect("Only taken in last iteration of loop, we are not there yet; qed")
.clone()
};
Ok(NetworkAction::WriteNotification(peer, peer_set, message))
})
let message = {
let encoded = message.encode();
metrics.on_notification_sent(peer_set, encoded.len(), peers.len());
encoded
};
// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
let last_peer = peers.pop();
peers.into_iter().for_each(|peer| {
net.write_notification(peer, peer_set, message.clone());
});
net.action_sink().send_all(&mut message_producer).await
}
/// An action to be carried out by the network.
///
/// This type is used for implementing `Sink` in order to communicate asynchronously with the
/// underlying network implementation in the `Network` trait.
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, Rep),
/// Disconnect a peer from the given peer-set.
DisconnectPeer(PeerId, PeerSet),
/// Write a notification to a given peer on the given peer-set.
WriteNotification(PeerId, PeerSet, Vec<u8>),
if let Some(peer) = last_peer {
net.write_notification(peer, peer_set, message);
}
}
/// An abstraction over networking for the purposes of this subsystem.
......@@ -117,14 +84,18 @@ pub trait Network: Clone + Send + 'static {
/// Ask the network to keep a substream open with these nodes and not disconnect from them
/// until removed from the protocol's peer set.
/// Note that `out_peers` setting has no effect on this.
async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
/// Cancels the effects of `add_to_peers_set`.
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String>;
async fn add_to_peers_set(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;
/// Get access to an underlying sink for all network actions.
fn action_sink<'a>(
&'a mut self,
) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>;
/// Cancels the effects of `add_to_peers_set`.
async fn remove_from_peers_set(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String>;
/// Send a request to a remote peer.
async fn start_request<AD: AuthorityDiscovery>(
......@@ -135,47 +106,18 @@ pub trait Network: Clone + Send + 'static {
);
/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
fn report_peer(
&mut self,
who: PeerId,
cost_benefit: Rep,
) -> BoxFuture<SubsystemResult<()>> {
async move {
self.action_sink()
.send(NetworkAction::ReputationChange(who, cost_benefit))
.await
}
.boxed()
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep);
/// Disconnect a given peer from the peer set specified without harming reputation.
fn disconnect_peer(
&mut self,
who: PeerId,
peer_set: PeerSet,
) -> BoxFuture<SubsystemResult<()>> {
async move {
self.action_sink()
.send(NetworkAction::DisconnectPeer(who, peer_set))
.await
}
.boxed()
}
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet);
/// Write a notification to a peer on the given peer-set's protocol.
fn write_notification(
&mut self,
&self,
who: PeerId,
peer_set: PeerSet,
message: Vec<u8>,
) -> BoxFuture<SubsystemResult<()>> {
async move {
self.action_sink()
.send(NetworkAction::WriteNotification(who, peer_set, message))
.await
}
.boxed()
}
);
}
#[async_trait]
......@@ -184,56 +126,42 @@ impl Network for Arc<NetworkService<Block, Hash>> {
NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
async fn add_to_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
async fn add_to_peers_set(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
sc_network::NetworkService::add_peers_to_reserved_set(&**self, protocol, multiaddresses)
}
async fn remove_from_peers_set(&mut self, protocol: Cow<'static, str>, multiaddresses: HashSet<Multiaddr>) -> Result<(), String> {
sc_network::NetworkService::remove_peers_from_reserved_set(&**self, protocol.clone(), multiaddresses.clone())?;
async fn remove_from_peers_set(
&mut self,
protocol: Cow<'static, str>,
multiaddresses: HashSet<Multiaddr>,
) -> Result<(), String> {
sc_network::NetworkService::remove_peers_from_reserved_set(
&**self,
protocol.clone(),
multiaddresses.clone(),
)?;
sc_network::NetworkService::remove_from_peers_set(&**self, protocol, multiaddresses)
}
fn action_sink<'a>(
&'a mut self,
) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> {
use futures::task::{Context, Poll};
// wrapper around a NetworkService to make it act like a sink.
struct ActionSink<'b>(&'b NetworkService<Block, Hash>);
impl<'b> Sink<NetworkAction> for ActionSink<'b> {
type Error = SubsystemError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
match action {
NetworkAction::ReputationChange(peer, cost_benefit) => {
self.0.report_peer(peer, cost_benefit.into_base_rep())
}
NetworkAction::DisconnectPeer(peer, peer_set) => self
.0
.disconnect_peer(peer, peer_set.into_protocol_name()),
NetworkAction::WriteNotification(peer, peer_set, message) => self
.0
.write_notification(peer, peer_set.into_protocol_name(), message),
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
sc_network::NetworkService::report_peer(&**self, who, cost_benefit.into_base_rep());
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
}
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) {
sc_network::NetworkService::disconnect_peer(&**self, who, peer_set.into_protocol_name());
}
Box::pin(ActionSink(&**self))
fn write_notification(&self, who: PeerId, peer_set: PeerSet, message: Vec<u8>) {
sc_network::NetworkService::write_notification(
&**self,
who,
peer_set.into_protocol_name(),
message,
);
}
async fn start_request<AD: AuthorityDiscovery>(
......
......@@ -21,7 +21,6 @@ use futures::channel::oneshot;
use std::borrow::Cow;
use std::collections::HashSet;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use parking_lot::Mutex;
......@@ -46,14 +45,25 @@ use sp_keyring::Sr25519Keyring;
use polkadot_primitives::v1::AuthorityDiscoveryId;
use polkadot_node_network_protocol::{ObservedRole, request_response::request::Requests};
use crate::network::{Network, NetworkAction};
use crate::network::Network;
use crate::validator_discovery::AuthorityDiscovery;
use crate::Rep;
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, Rep),
/// Disconnect a peer from the given peer-set.
DisconnectPeer(PeerId, PeerSet),
/// Write a notification to a given peer on the given peer-set.
WriteNotification(PeerId, PeerSet, Vec<u8>),
}
// The subsystem's view of the network - only supports a single call to `event_stream`.
#[derive(Clone)]
struct TestNetwork {
net_events: Arc<Mutex<Option<SingleItemStream<NetworkEvent>>>>,
action_tx: metered::UnboundedMeteredSender<NetworkAction>,
action_tx: Arc<Mutex<metered::UnboundedMeteredSender<NetworkAction>>>,
_req_configs: Vec<RequestResponseConfig>,
}
......@@ -78,7 +88,7 @@ fn new_test_network(req_configs: Vec<RequestResponseConfig>) -> (
(
TestNetwork {
net_events: Arc::new(Mutex::new(Some(net_rx))),
action_tx,
action_tx: Arc::new(Mutex::new(action_tx)),
_req_configs: req_configs,
},
TestNetworkHandle {
......@@ -106,13 +116,30 @@ impl Network for TestNetwork {
Ok(())
}
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
{
Box::pin((&mut self.action_tx).sink_map_err(Into::into))
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
}
async fn start_request<AD: AuthorityDiscovery>(&self, _: &mut AD, _: Requests, _: IfDisconnected) {
fn report_peer(&self, who: PeerId, cost_benefit: Rep) {
self.action_tx.lock().unbounded_send(
NetworkAction::ReputationChange(who, cost_benefit)
).unwrap();
}
fn disconnect_peer(&self, who: PeerId, peer_set: PeerSet) {
self.action_tx.lock().unbounded_send(
NetworkAction::DisconnectPeer(who, peer_set)
).unwrap();
}
fn write_notification(
&self,
who: PeerId,
peer_set: PeerSet,
message: Vec<u8>,
) {
self.action_tx.lock().unbounded_send(
NetworkAction::WriteNotification(who, peer_set, message)
).unwrap();
}
}
......
......@@ -143,10 +143,10 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
#[cfg(test)]
mod tests {
use super::*;
use crate::network::{Network, NetworkAction};
use crate::network::Network;
use std::{borrow::Cow, pin::Pin, collections::HashMap};
use futures::{sink::Sink, stream::BoxStream};
use std::{borrow::Cow, collections::HashMap};
use futures::stream::BoxStream;
use sc_network::{Event as NetworkEvent, IfDisconnected};
use sp_keyring::Sr25519Keyring;
use polkadot_node_network_protocol::request_response::request::Requests;
......@@ -203,13 +203,24 @@ mod tests {
Ok(())
}
fn action_sink<'a>(&'a mut self)