Newer
Older
Robert Klotzner
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright 2020-2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::pin::Pin;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::prelude::*;
use futures::stream::BoxStream;
use parity_scale_codec::Encode;
use sc_network::Event as NetworkEvent;
use super::LOG_TARGET;
use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId, ReputationChange};
use polkadot_primitives::v1::{Block, Hash};
use polkadot_subsystem::{SubsystemError, SubsystemResult};
/// Send a message to the network.
///
/// This function is only used internally by the network-bridge, which is responsible to only send
/// messages that are compatible with the passed peer set, as that is currently not enforced by
/// this function. These are messages of type `WireMessage` parameterized on the matching type.
pub(crate) async fn send_message<M, I>(
net: &mut impl Network,
peers: I,
peer_set: PeerSet,
message: M,
) -> SubsystemResult<()>
where
M: Encode + Clone,
I: IntoIterator<Item = PeerId>,
I::IntoIter: ExactSizeIterator,
{
let mut message_producer = stream::iter({
let peers = peers.into_iter();
let n_peers = peers.len();
let mut message = Some(message.encode());
peers.enumerate().map(move |(i, peer)| {
// optimization: avoid cloning the message for the last peer in the
// list. The message payload can be quite large. If the underlying
// network used `Bytes` this would not be necessary.
let message = if i == n_peers - 1 {
message
.take()
.expect("Only taken in last iteration of loop, never afterwards; qed")
} else {
message
.as_ref()
.expect("Only taken in last iteration of loop, we are not there yet; qed")
.clone()
};
Ok(NetworkAction::WriteNotification(peer, peer_set, message))
})
});
net.action_sink().send_all(&mut message_producer).await
}
/// An action to be carried out by the network.
///
/// This type is the item accepted by the `Sink` returned from `Network::action_sink`, which
/// is how this subsystem communicates asynchronously with the underlying network
/// implementation behind the `Network` trait.
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
/// Note a change in reputation for a peer.
///
/// Carries the affected peer and the reputation change to apply to it.
ReputationChange(PeerId, ReputationChange),
/// Write a notification to a given peer on the given peer-set.
///
/// Carries the target peer, the peer-set to send on, and the already-encoded message bytes.
WriteNotification(PeerId, PeerSet, Vec<u8>),
}
/// An abstraction over networking for the purposes of this subsystem.
pub trait Network: Send + 'static {
    /// Get a stream of all events occurring on the network.
    ///
    /// The stream may include events unrelated to the Polkadot protocol - the user of this
    /// function should filter only for events related to the
    /// [`VALIDATION_PROTOCOL_NAME`](VALIDATION_PROTOCOL_NAME)
    /// or [`COLLATION_PROTOCOL_NAME`](COLLATION_PROTOCOL_NAME)
    fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent>;

    /// Get access to an underlying sink for all network actions.
    ///
    /// The returned sink borrows `self` for as long as it is alive.
    fn action_sink<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>;

    /// Report a given peer as either beneficial (+) or costly (-) according to the given scalar.
    ///
    /// The default implementation sends a [`NetworkAction::ReputationChange`] through
    /// [`Network::action_sink`] and resolves to the result of that send.
    fn report_peer(
        &mut self,
        who: PeerId,
        cost_benefit: ReputationChange,
    ) -> BoxFuture<SubsystemResult<()>> {
        let action = NetworkAction::ReputationChange(who, cost_benefit);
        Box::pin(async move { self.action_sink().send(action).await })
    }

    /// Write a notification to a peer on the given peer-set's protocol.
    ///
    /// The default implementation sends a [`NetworkAction::WriteNotification`] through
    /// [`Network::action_sink`] and resolves to the result of that send.
    fn write_notification(
        &mut self,
        who: PeerId,
        peer_set: PeerSet,
        message: Vec<u8>,
    ) -> BoxFuture<SubsystemResult<()>> {
        let action = NetworkAction::WriteNotification(who, peer_set, message);
        Box::pin(async move { self.action_sink().send(action).await })
    }
}
impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn action_sink<'a>(
&'a mut self,
) -> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>> {
use futures::task::{Context, Poll};
// wrapper around a NetworkService to make it act like a sink.
struct ActionSink<'b>(&'b sc_network::NetworkService<Block, Hash>);
impl<'b> Sink<NetworkAction> for ActionSink<'b> {
type Error = SubsystemError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
match action {
NetworkAction::ReputationChange(peer, cost_benefit) => {
tracing::debug!(
target: LOG_TARGET,
"Changing reputation: {:?} for {}",
cost_benefit,
peer
);
self.0.report_peer(peer, cost_benefit)
}
NetworkAction::WriteNotification(peer, peer_set, message) => self
.0
.write_notification(peer, peer_set.into_protocol_name(), message),
}
Ok(())
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<SubsystemResult<()>> {
Poll::Ready(Ok(()))
}
}
Box::pin(ActionSink(&**self))
}
}