5.78 KB
Newer Older
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <>.

use std::pin::Pin;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::BoxStream;

use parity_scale_codec::Encode;

use sc_network::Event as NetworkEvent;

use super::LOG_TARGET;
use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId, ReputationChange};
use polkadot_primitives::v1::{Block, Hash};
use polkadot_subsystem::{SubsystemError, SubsystemResult};

/// Send a message to the network.
/// This function is only used internally by the network-bridge, which is responsible to only send
/// messages that are compatible with the passed peer set, as that is currently not enforced by
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
pub(crate) async fn send_message<M, I>(
	net: &mut impl Network,
	peers: I,
	peer_set: PeerSet,
	message: M,
) -> SubsystemResult<()>
	M: Encode + Clone,
	I: IntoIterator<Item = PeerId>,
	I::IntoIter: ExactSizeIterator,
	let mut message_producer = stream::iter({
		let peers = peers.into_iter();
		let n_peers = peers.len();
		let mut message = Some(message.encode());

		peers.enumerate().map(move |(i, peer)| {
			// optimization: avoid cloning the message for the last peer in the
			// list. The message payload can be quite large. If the underlying
			// network used `Bytes` this would not be necessary.
			let message = if i == n_peers - 1 {
					.expect("Only taken in last iteration of loop, never afterwards; qed")
			} else {
					.expect("Only taken in last iteration of loop, we are not there yet; qed")

			Ok(NetworkAction::WriteNotification(peer, peer_set, message))

	net.action_sink().send_all(&mut message_producer).await

/// An action to be carried out by the network.
/// This type is used for implementing `Sink` in order to cummunicate asynchronously with the
/// underlying network implementation in the `Network` trait.
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
	/// Note a change in reputation for a peer.
	ReputationChange(PeerId, ReputationChange),
	/// Write a notification to a given peer on the given peer-set.
	WriteNotification(PeerId, PeerSet, Vec<u8>),

/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Send + 'static {
	/// Get a stream of all events occurring on the network. This may include events unrelated
	/// to the Polkadot protocol - the user of this function should filter only for events related
	fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;

	/// Get access to an underlying sink for all network actions.
	fn action_sink<'a>(
		&'a mut self,
	) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>;

	/// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
	fn report_peer(
		&mut self,
		who: PeerId,
		cost_benefit: ReputationChange,
	) -> BoxFuture<SubsystemResult<()>> {
		async move {
				.send(NetworkAction::ReputationChange(who, cost_benefit))

	/// Write a notification to a peer on the given peer-set's protocol.
	fn write_notification(
		&mut self,
		who: PeerId,
		peer_set: PeerSet,
		message: Vec<u8>,
	) -> BoxFuture<SubsystemResult<()>> {
		async move {
				.send(NetworkAction::WriteNotification(who, peer_set, message))

impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
	fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
		sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()

	#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
	fn action_sink<'a>(
		&'a mut self,
	) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> {
		use futures::task::{Context, Poll};

		// wrapper around a NetworkService to make it act like a sink.
		struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>);

		impl<'b> Sink<NetworkAction> for ActionSink<'b> {
			type Error = SubsystemError;

			fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {

			fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
				match action {
					NetworkAction::ReputationChange(peer, cost_benefit) => {
							target: LOG_TARGET,
							"Changing reputation: {:?} for {}",
						self.0.report_peer(peer, cost_benefit)
					NetworkAction::WriteNotification(peer, peer_set, message) => self
						.write_notification(peer, peer_set.into_protocol_name(), message),


			fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {

			fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {