Skip to content
GitLab
Explore
Sign in
Radu Popa
substrate
Compare revisions
9692a80520e3290708207534b5c8d5e6133d8fcc to 1fbcc94bcec7b0fcc038906593f40369bfc5704d
Expand all
Hide whitespace changes
Inline
Side-by-side
client/network-gossip/src/bridge.rs
View file @
1fbcc94b
...
...
@@ -69,7 +69,7 @@ impl<B: BlockT> GossipEngine<B> {
pub
fn
new
<
N
:
Network
<
B
>
+
Send
+
Clone
+
'static
>
(
network
:
N
,
engine_id
:
ConsensusEngineId
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
,
validator
:
Arc
<
dyn
Validator
<
B
>>
,
)
->
Self
where
B
:
'static
{
// We grab the event stream before registering the notifications protocol, otherwise we
...
...
@@ -333,7 +333,7 @@ mod tests {
unimplemented!
();
}
fn
register_notifications_protocol
(
&
self
,
_
:
ConsensusEngineId
,
_
:
Cow
<
'static
,
[
u8
]
>
)
{}
fn
register_notifications_protocol
(
&
self
,
_
:
ConsensusEngineId
,
_
:
Cow
<
'static
,
str
>
)
{}
fn
announce
(
&
self
,
_
:
B
::
Hash
,
_
:
Vec
<
u8
>
)
{
unimplemented!
();
...
...
@@ -362,7 +362,7 @@ mod tests {
let
mut
gossip_engine
=
GossipEngine
::
<
Block
>
::
new
(
network
.clone
(),
[
1
,
2
,
3
,
4
],
"my_protocol"
.as_bytes
()
,
"my_protocol"
,
Arc
::
new
(
AllowAll
{}),
);
...
...
@@ -390,7 +390,7 @@ mod tests {
let
mut
gossip_engine
=
GossipEngine
::
<
Block
>
::
new
(
network
.clone
(),
engine_id
.clone
(),
"my_protocol"
.as_bytes
()
,
"my_protocol"
,
Arc
::
new
(
AllowAll
{}),
);
...
...
@@ -525,7 +525,7 @@ mod tests {
let
mut
gossip_engine
=
GossipEngine
::
<
Block
>
::
new
(
network
.clone
(),
engine_id
.clone
(),
"my_protocol"
.as_bytes
()
,
"my_protocol"
,
Arc
::
new
(
TestValidator
{}),
);
...
...
client/network-gossip/src/lib.rs
View file @
1fbcc94b
...
...
@@ -87,7 +87,7 @@ pub trait Network<B: BlockT> {
fn
register_notifications_protocol
(
&
self
,
engine_id
:
ConsensusEngineId
,
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
);
/// Notify everyone we're connected to that we have the given block.
...
...
@@ -117,7 +117,7 @@ impl<B: BlockT, H: ExHashT> Network<B> for Arc<NetworkService<B, H>> {
fn
register_notifications_protocol
(
&
self
,
engine_id
:
ConsensusEngineId
,
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
)
{
NetworkService
::
register_notifications_protocol
(
self
,
engine_id
,
protocol_name
)
}
...
...
client/network-gossip/src/state_machine.rs
View file @
1fbcc94b
...
...
@@ -489,7 +489,7 @@ mod tests {
unimplemented!
();
}
fn
register_notifications_protocol
(
&
self
,
_
:
ConsensusEngineId
,
_
:
Cow
<
'static
,
[
u8
]
>
)
{}
fn
register_notifications_protocol
(
&
self
,
_
:
ConsensusEngineId
,
_
:
Cow
<
'static
,
str
>
)
{}
fn
announce
(
&
self
,
_
:
B
::
Hash
,
_
:
Vec
<
u8
>
)
{
unimplemented!
();
...
...
client/network/Cargo.toml
View file @
1fbcc94b
...
...
@@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
prost-build
=
"0.6.1"
[dependencies]
async-trait
=
"0.1"
async-std
=
{
version
=
"1.6.2"
,
features
=
[
"unstable"
]
}
bitflags
=
"1.2.0"
bs58
=
"0.3.1"
...
...
@@ -64,7 +65,7 @@ zeroize = "1.0.0"
[dependencies.libp2p]
version
=
"0.24.0"
default-features
=
false
features
=
[
"identify"
,
"kad"
,
"mdns-async-std"
,
"mplex"
,
"noise"
,
"ping"
,
"tcp-async-std"
,
"websocket"
,
"yamux"
]
features
=
[
"identify"
,
"kad"
,
"mdns-async-std"
,
"mplex"
,
"noise"
,
"ping"
,
"request-response"
,
"tcp-async-std"
,
"websocket"
,
"yamux"
]
[dev-dependencies]
assert_matches
=
"1.3"
...
...
client/network/src/behaviour.rs
View file @
1fbcc94b
...
...
@@ -16,7 +16,7 @@
use
crate
::{
config
::{
ProtocolId
,
Role
},
block_requests
,
light_client_handler
,
finality_requests
,
peer_info
,
discovery
::{
DiscoveryBehaviour
,
DiscoveryConfig
,
DiscoveryOut
},
peer_info
,
request_responses
,
discovery
::{
DiscoveryBehaviour
,
DiscoveryConfig
,
DiscoveryOut
},
protocol
::{
message
::{
self
,
Roles
},
CustomMessageOutcome
,
NotificationsSink
,
Protocol
},
ObservedRole
,
DhtEvent
,
ExHashT
,
};
...
...
@@ -39,6 +39,10 @@ use std::{
time
::
Duration
,
};
pub
use
crate
::
request_responses
::{
ResponseFailure
,
InboundFailure
,
RequestFailure
,
OutboundFailure
,
RequestId
,
SendRequestError
};
/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event
=
"BehaviourOut<B>"
,
poll_method
=
"poll"
)]
...
...
@@ -50,6 +54,8 @@ pub struct Behaviour<B: BlockT, H: ExHashT> {
peer_info
:
peer_info
::
PeerInfoBehaviour
,
/// Discovers nodes of the network.
discovery
:
DiscoveryBehaviour
,
/// Generic request-reponse protocols.
request_responses
:
request_responses
::
RequestResponsesBehaviour
,
/// Block request handling.
block_requests
:
block_requests
::
BlockRequests
<
B
>
,
/// Finality proof request handling.
...
...
@@ -76,22 +82,40 @@ pub enum BehaviourOut<B: BlockT> {
RandomKademliaStarted
(
ProtocolId
),
/// We have received a request from a peer and answered it.
AnsweredRequest
{
///
/// This event is generated for statistics purposes.
InboundRequest
{
/// Peer which sent us a request.
peer
:
PeerId
,
/// Protocol name of the request.
protocol
:
String
,
/// Time it took to build the response.
build_time
:
Duration
,
protocol
:
Cow
<
'static
,
str
>
,
/// If `Ok`, contains the time elapsed between when we received the request and when we
/// sent back the response. If `Err`, the error that happened.
result
:
Result
<
Duration
,
ResponseFailure
>
,
},
/// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
RequestFinished
{
/// Request that has succeeded.
request_id
:
RequestId
,
/// Response sent by the remote or reason for failure.
result
:
Result
<
Vec
<
u8
>
,
RequestFailure
>
,
},
/// Started a new request with the given node.
RequestStarted
{
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestStarted
{
peer
:
PeerId
,
/// Protocol name of the request.
protocol
:
String
,
},
/// Finished, successfully or not, a previously-started request.
RequestFinished
{
///
/// This event is for statistics purposes only. The request and response handling are entirely
/// internal to the behaviour.
OpaqueRequestFinished
{
/// Who we were requesting.
peer
:
PeerId
,
/// Protocol name of the request.
...
...
@@ -161,17 +185,20 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
finality_proof_requests
:
finality_requests
::
FinalityProofRequests
<
B
>
,
light_client_handler
:
light_client_handler
::
LightClientHandler
<
B
>
,
disco_config
:
DiscoveryConfig
,
)
->
Self
{
Behaviour
{
request_response_protocols
:
Vec
<
request_responses
::
ProtocolConfig
>
,
)
->
Result
<
Self
,
request_responses
::
RegisterError
>
{
Ok
(
Behaviour
{
substrate
,
peer_info
:
peer_info
::
PeerInfoBehaviour
::
new
(
user_agent
,
local_public_key
),
discovery
:
disco_config
.finish
(),
request_responses
:
request_responses
::
RequestResponsesBehaviour
::
new
(
request_response_protocols
.into_iter
())
?
,
block_requests
,
finality_proof_requests
,
light_client_handler
,
events
:
VecDeque
::
new
(),
role
,
}
}
)
}
/// Returns the list of nodes that we know exist in the network.
...
...
@@ -208,6 +235,16 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
self
.peer_info
.node
(
peer_id
)
}
/// Initiates sending a request.
///
/// An error is returned if we are not connected to the target peer of if the protocol doesn't
/// match one that has been registered.
pub
fn
send_request
(
&
mut
self
,
target
:
&
PeerId
,
protocol
:
&
str
,
request
:
Vec
<
u8
>
)
->
Result
<
RequestId
,
SendRequestError
>
{
self
.request_responses
.send_request
(
target
,
protocol
,
request
)
}
/// Registers a new notifications protocol.
///
/// Please call `event_stream` before registering a protocol, otherwise you may miss events
...
...
@@ -218,7 +255,7 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
pub
fn
register_notifications_protocol
(
&
mut
self
,
engine_id
:
ConsensusEngineId
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
,
)
{
// This is the message that we will send to the remote as part of the initial handshake.
// At the moment, we force this to be an encoded `Roles`.
...
...
@@ -298,18 +335,18 @@ Behaviour<B, H> {
CustomMessageOutcome
::
BlockRequest
{
target
,
request
}
=>
{
match
self
.block_requests
.send_request
(
&
target
,
request
)
{
block_requests
::
SendRequestOutcome
::
Ok
=>
{
self
.events
.push_back
(
BehaviourOut
::
RequestStarted
{
self
.events
.push_back
(
BehaviourOut
::
Opaque
RequestStarted
{
peer
:
target
,
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
(),
});
},
block_requests
::
SendRequestOutcome
::
Replaced
{
request_duration
,
..
}
=>
{
self
.events
.push_back
(
BehaviourOut
::
RequestFinished
{
self
.events
.push_back
(
BehaviourOut
::
Opaque
RequestFinished
{
peer
:
target
.clone
(),
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
(),
request_duration
,
});
self
.events
.push_back
(
BehaviourOut
::
RequestStarted
{
self
.events
.push_back
(
BehaviourOut
::
Opaque
RequestStarted
{
peer
:
target
,
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
(),
});
...
...
@@ -358,18 +395,39 @@ Behaviour<B, H> {
}
}
impl
<
B
:
BlockT
,
H
:
ExHashT
>
NetworkBehaviourEventProcess
<
request_responses
::
Event
>
for
Behaviour
<
B
,
H
>
{
fn
inject_event
(
&
mut
self
,
event
:
request_responses
::
Event
)
{
match
event
{
request_responses
::
Event
::
InboundRequest
{
peer
,
protocol
,
result
}
=>
{
self
.events
.push_back
(
BehaviourOut
::
InboundRequest
{
peer
,
protocol
,
result
,
});
}
request_responses
::
Event
::
RequestFinished
{
request_id
,
result
}
=>
{
self
.events
.push_back
(
BehaviourOut
::
RequestFinished
{
request_id
,
result
,
});
},
}
}
}
impl
<
B
:
BlockT
,
H
:
ExHashT
>
NetworkBehaviourEventProcess
<
block_requests
::
Event
<
B
>>
for
Behaviour
<
B
,
H
>
{
fn
inject_event
(
&
mut
self
,
event
:
block_requests
::
Event
<
B
>
)
{
match
event
{
block_requests
::
Event
::
AnsweredRequest
{
peer
,
total_handling_time
}
=>
{
self
.events
.push_back
(
BehaviourOut
::
Answere
dRequest
{
self
.events
.push_back
(
BehaviourOut
::
Inboun
dRequest
{
peer
,
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
(),
build_time
:
total_handling_time
,
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
()
.into
()
,
result
:
Ok
(
total_handling_time
)
,
});
},
block_requests
::
Event
::
Response
{
peer
,
original_request
:
_
,
response
,
request_duration
}
=>
{
self
.events
.push_back
(
BehaviourOut
::
RequestFinished
{
self
.events
.push_back
(
BehaviourOut
::
Opaque
RequestFinished
{
peer
:
peer
.clone
(),
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
(),
request_duration
,
...
...
@@ -381,7 +439,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<block_requests::Event<B
block_requests
::
Event
::
RequestTimeout
{
peer
,
request_duration
,
..
}
=>
{
// There doesn't exist any mechanism to report cancellations or timeouts yet, so
// we process them by disconnecting the node.
self
.events
.push_back
(
BehaviourOut
::
RequestFinished
{
self
.events
.push_back
(
BehaviourOut
::
Opaque
RequestFinished
{
peer
:
peer
.clone
(),
protocol
:
self
.block_requests
.protocol_name
()
.to_owned
(),
request_duration
,
...
...
client/network/src/config.rs
View file @
1fbcc94b
...
...
@@ -23,6 +23,7 @@
pub
use
crate
::
chain
::{
Client
,
FinalityProofProvider
};
pub
use
crate
::
on_demand_layer
::{
AlwaysBadChecker
,
OnDemand
};
pub
use
crate
::
request_responses
::{
IncomingRequest
,
ProtocolConfig
as
RequestResponseConfig
};
pub
use
libp2p
::{
identity
,
core
::
PublicKey
,
wasm_ext
::
ExtTransport
,
build_multiaddr
};
// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
...
...
@@ -34,9 +35,10 @@ use crate::ExHashT;
use
core
::{
fmt
,
iter
};
use
futures
::
future
;
use
libp2p
::
identity
::{
ed25519
,
Keypair
};
use
libp2p
::
wasm_ext
;
use
libp2p
::{
multiaddr
,
Multiaddr
,
PeerId
};
use
libp2p
::{
identity
::{
ed25519
,
Keypair
},
multiaddr
,
wasm_ext
,
Multiaddr
,
PeerId
,
};
use
prometheus_endpoint
::
Registry
;
use
sp_consensus
::{
block_validation
::
BlockAnnounceValidator
,
import_queue
::
ImportQueue
};
use
sp_runtime
::{
traits
::
Block
as
BlockT
,
ConsensusEngineId
};
...
...
@@ -413,7 +415,9 @@ pub struct NetworkConfiguration {
pub
node_key
:
NodeKeyConfig
,
/// List of notifications protocols that the node supports. Must also include a
/// `ConsensusEngineId` for backwards-compatibility.
pub
notifications_protocols
:
Vec
<
(
ConsensusEngineId
,
Cow
<
'static
,
[
u8
]
>
)
>
,
pub
notifications_protocols
:
Vec
<
(
ConsensusEngineId
,
Cow
<
'static
,
str
>
)
>
,
/// List of request-response protocols that the node supports.
pub
request_response_protocols
:
Vec
<
RequestResponseConfig
>
,
/// Maximum allowed number of incoming connections.
pub
in_peers
:
u32
,
/// Number of outgoing connections we're trying to maintain.
...
...
@@ -449,6 +453,7 @@ impl NetworkConfiguration {
boot_nodes
:
Vec
::
new
(),
node_key
,
notifications_protocols
:
Vec
::
new
(),
request_response_protocols
:
Vec
::
new
(),
in_peers
:
25
,
out_peers
:
75
,
reserved_nodes
:
Vec
::
new
(),
...
...
@@ -465,9 +470,7 @@ impl NetworkConfiguration {
allow_non_globals_in_dht
:
false
,
}
}
}
impl
NetworkConfiguration
{
/// Create new default configuration for localhost-only connection with random port (useful for testing)
pub
fn
new_local
()
->
NetworkConfiguration
{
let
mut
config
=
NetworkConfiguration
::
new
(
...
...
client/network/src/error.rs
View file @
1fbcc94b
...
...
@@ -21,7 +21,7 @@
use
crate
::
config
::
TransportConfig
;
use
libp2p
::{
PeerId
,
Multiaddr
};
use
std
::
fmt
;
use
std
::
{
borrow
::
Cow
,
fmt
}
;
/// Result type alias for the network.
pub
type
Result
<
T
>
=
std
::
result
::
Result
<
T
,
Error
>
;
...
...
@@ -61,6 +61,12 @@ pub enum Error {
/// The invalid addresses.
addresses
:
Vec
<
Multiaddr
>
,
},
/// The same request-response protocol has been registered multiple times.
#[display(fmt
=
"Request-response protocol registered multiple times: {}"
,
protocol)]
DuplicateRequestResponseProtocol
{
/// Name of the protocol registered multiple times.
protocol
:
Cow
<
'static
,
str
>
,
},
}
// Make `Debug` use the `Display` implementation.
...
...
@@ -78,6 +84,7 @@ impl std::error::Error for Error {
Error
::
DuplicateBootnode
{
..
}
=>
None
,
Error
::
Prometheus
(
ref
err
)
=>
Some
(
err
),
Error
::
AddressesForAnotherTransport
{
..
}
=>
None
,
Error
::
DuplicateRequestResponseProtocol
{
..
}
=>
None
,
}
}
}
client/network/src/gossip/tests.rs
View file @
1fbcc94b
...
...
@@ -130,14 +130,14 @@ fn build_nodes_one_proto()
let
listen_addr
=
config
::
build_multiaddr!
[
Memory
(
rand
::
random
::
<
u64
>
())];
let
(
node1
,
events_stream1
)
=
build_test_full_node
(
config
::
NetworkConfiguration
{
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
&
b
"/foo"
[
..
]
))],
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
"/foo"
))],
listen_addresses
:
vec!
[
listen_addr
.clone
()],
transport
:
config
::
TransportConfig
::
MemoryOnly
,
..
config
::
NetworkConfiguration
::
new_local
()
});
let
(
node2
,
events_stream2
)
=
build_test_full_node
(
config
::
NetworkConfiguration
{
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
&
b
"/foo"
[
..
]
))],
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
"/foo"
))],
listen_addresses
:
vec!
[],
reserved_nodes
:
vec!
[
config
::
MultiaddrWithPeerId
{
multiaddr
:
listen_addr
,
...
...
client/network/src/lib.rs
View file @
1fbcc94b
...
...
@@ -253,6 +253,7 @@ mod finality_requests;
mod
light_client_handler
;
mod
on_demand_layer
;
mod
protocol
;
mod
request_responses
;
mod
schema
;
mod
service
;
mod
transport
;
...
...
@@ -263,13 +264,10 @@ pub mod error;
pub
mod
gossip
;
pub
mod
network_state
;
pub
use
service
::{
NetworkService
,
NetworkWorker
};
pub
use
protocol
::
PeerInfo
;
pub
use
protocol
::
event
::{
Event
,
DhtEvent
,
ObservedRole
};
pub
use
protocol
::
sync
::
SyncState
;
pub
use
libp2p
::{
Multiaddr
,
PeerId
};
#[doc(inline)]
pub
use
libp2p
::
multiaddr
;
pub
use
libp2p
::{
multiaddr
,
Multiaddr
,
PeerId
};
pub
use
protocol
::{
event
::{
DhtEvent
,
Event
,
ObservedRole
},
sync
::
SyncState
,
PeerInfo
};
pub
use
service
::{
NetworkService
,
NetworkWorker
,
RequestFailure
,
OutboundFailure
};
pub
use
sc_peerset
::
ReputationChange
;
use
sp_runtime
::
traits
::{
Block
as
BlockT
,
NumberFor
};
...
...
client/network/src/protocol.rs
View file @
1fbcc94b
...
...
@@ -245,13 +245,13 @@ pub struct Protocol<B: BlockT, H: ExHashT> {
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour
:
GenericProto
,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine
:
HashMap
<
ConsensusEngineId
,
Cow
<
'static
,
[
u8
]
>>
,
protocol_name_by_engine
:
HashMap
<
ConsensusEngineId
,
Cow
<
'static
,
str
>>
,
/// For each protocol name, the legacy equivalent.
legacy_equiv_by_name
:
HashMap
<
Cow
<
'static
,
[
u8
]
>
,
Fallback
>
,
legacy_equiv_by_name
:
HashMap
<
Cow
<
'static
,
str
>
,
Fallback
>
,
/// Name of the protocol used for transactions.
transactions_protocol
:
Cow
<
'static
,
[
u8
]
>
,
transactions_protocol
:
Cow
<
'static
,
str
>
,
/// Name of the protocol used for block announces.
block_announces_protocol
:
Cow
<
'static
,
[
u8
]
>
,
block_announces_protocol
:
Cow
<
'static
,
str
>
,
/// Prometheus metrics.
metrics
:
Option
<
Metrics
>
,
/// The `PeerId`'s of all boot nodes.
...
...
@@ -417,19 +417,21 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
let
mut
legacy_equiv_by_name
=
HashMap
::
new
();
let
transactions_protocol
:
Cow
<
'static
,
[
u8
]
>
=
Cow
::
from
({
let
mut
proto
=
b"/"
.to_vec
();
proto
.extend
(
protocol_id
.as_ref
()
.as_bytes
());
proto
.extend
(
b"/transactions/1"
);
let
transactions_protocol
:
Cow
<
'static
,
str
>
=
Cow
::
from
({
let
mut
proto
=
String
::
new
();
proto
.push_str
(
"/"
);
proto
.push_str
(
protocol_id
.as_ref
());
proto
.push_str
(
"/transactions/1"
);
proto
});
behaviour
.register_notif_protocol
(
transactions_protocol
.clone
(),
Vec
::
new
());
legacy_equiv_by_name
.insert
(
transactions_protocol
.clone
(),
Fallback
::
Transactions
);
let
block_announces_protocol
:
Cow
<
'static
,
[
u8
]
>
=
Cow
::
from
({
let
mut
proto
=
b"/"
.to_vec
();
proto
.extend
(
protocol_id
.as_ref
()
.as_bytes
());
proto
.extend
(
b"/block-announces/1"
);
let
block_announces_protocol
:
Cow
<
'static
,
str
>
=
Cow
::
from
({
let
mut
proto
=
String
::
new
();
proto
.push_str
(
"/"
);
proto
.push_str
(
protocol_id
.as_ref
());
proto
.push_str
(
"/block-announces/1"
);
proto
});
behaviour
.register_notif_protocol
(
...
...
@@ -646,7 +648,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
messages
:
vec!
[(
msg
.engine_id
,
From
::
from
(
msg
.data
))],
}
}
else
{
warn
!
(
target
:
"sync"
,
"Received message on non-registered protocol: {:?}"
,
msg
.engine_id
);
debug
!
(
target
:
"sync"
,
"Received message on non-registered protocol: {:?}"
,
msg
.engine_id
);
CustomMessageOutcome
::
None
},
GenericMessage
::
ConsensusBatch
(
messages
)
=>
{
...
...
@@ -656,7 +658,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if
self
.protocol_name_by_engine
.contains_key
(
&
msg
.engine_id
)
{
Some
((
msg
.engine_id
,
From
::
from
(
msg
.data
)))
}
else
{
warn
!
(
target
:
"sync"
,
"Received message on non-registered protocol: {:?}"
,
msg
.engine_id
);
debug
!
(
target
:
"sync"
,
"Received message on non-registered protocol: {:?}"
,
msg
.engine_id
);
None
}
})
...
...
@@ -679,7 +681,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
fn
send_message
(
&
mut
self
,
who
:
&
PeerId
,
message
:
Option
<
(
Cow
<
'static
,
[
u8
]
>
,
Vec
<
u8
>
)
>
,
message
:
Option
<
(
Cow
<
'static
,
str
>
,
Vec
<
u8
>
)
>
,
legacy
:
Message
<
B
>
,
)
{
send_message
::
<
B
>
(
...
...
@@ -1076,7 +1078,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
pub
fn
register_notifications_protocol
<
'a
>
(
&
'a
mut
self
,
engine_id
:
ConsensusEngineId
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
,
handshake_message
:
Vec
<
u8
>
,
)
->
impl
Iterator
<
Item
=
(
&
'a
PeerId
,
Roles
,
&
'a
NotificationsSink
)
>
+
'a
{
let
protocol_name
=
protocol_name
.into
();
...
...
@@ -1607,7 +1609,7 @@ fn send_message<B: BlockT>(
behaviour
:
&
mut
GenericProto
,
stats
:
&
mut
HashMap
<&
'static
str
,
PacketStats
>
,
who
:
&
PeerId
,
message
:
Option
<
(
Cow
<
'static
,
[
u8
]
>
,
Vec
<
u8
>
)
>
,
message
:
Option
<
(
Cow
<
'static
,
str
>
,
Vec
<
u8
>
)
>
,
legacy_message
:
Message
<
B
>
,
)
{
let
encoded
=
legacy_message
.encode
();
...
...
@@ -1795,7 +1797,7 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
}
None
=>
{
error
!
(
target
:
"sub-libp2p"
,
"Received notification from unknown protocol {:?}"
,
protocol_name
);
debug
!
(
target
:
"sub-libp2p"
,
"Received notification from unknown protocol {:?}"
,
protocol_name
);
CustomMessageOutcome
::
None
}
}
...
...
client/network/src/protocol/generic_proto/behaviour.rs
View file @
1fbcc94b
...
...
@@ -120,7 +120,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols
:
Vec
<
(
Cow
<
'static
,
[
u8
]
>
,
Arc
<
RwLock
<
Vec
<
u8
>>>
)
>
,
notif_protocols
:
Vec
<
(
Cow
<
'static
,
str
>
,
Arc
<
RwLock
<
Vec
<
u8
>>>
)
>
,
/// Receiver for instructions about who to connect to or disconnect from.
peerset
:
sc_peerset
::
Peerset
,
...
...
@@ -322,7 +322,7 @@ pub enum GenericProtoOut {
/// Id of the peer the message came from.
peer_id
:
PeerId
,
/// Engine corresponding to the message.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
/// Message that has been received.
message
:
BytesMut
,
},
...
...
@@ -360,7 +360,7 @@ impl GenericProto {
/// will retain the protocols that were registered then, and not any new one.
pub
fn
register_notif_protocol
(
&
mut
self
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
,
handshake_msg
:
impl
Into
<
Vec
<
u8
>>
)
{
self
.notif_protocols
.push
((
protocol_name
.into
(),
Arc
::
new
(
RwLock
::
new
(
handshake_msg
.into
()))));
...
...
@@ -371,10 +371,10 @@ impl GenericProto {
/// Has no effect if the protocol is unknown.
pub
fn
set_notif_protocol_handshake
(
&
mut
self
,
protocol_name
:
&
[
u8
]
,
protocol_name
:
&
str
,
handshake_message
:
impl
Into
<
Vec
<
u8
>>
)
{
if
let
Some
(
protocol
)
=
self
.notif_protocols
.iter_mut
()
.find
(|(
name
,
_
)|
name
==
&
protocol_name
)
{
if
let
Some
(
protocol
)
=
self
.notif_protocols
.iter_mut
()
.find
(|(
name
,
_
)|
name
==
protocol_name
)
{
*
protocol
.1
.write
()
=
handshake_message
.into
();
}
}
...
...
@@ -551,7 +551,7 @@ impl GenericProto {
pub
fn
write_notification
(
&
mut
self
,
target
:
&
PeerId
,
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
message
:
impl
Into
<
Vec
<
u8
>>
,
encoded_fallback_message
:
Vec
<
u8
>
,
)
{
...
...
@@ -569,11 +569,11 @@ impl GenericProto {
target
:
"sub-libp2p"
,
"External API => Notification({:?}, {:?})"
,
target
,
str
::
from_utf8
(
&
protocol_name
)
protocol_name
,
);
trace!
(
target
:
"sub-libp2p"
,
"Handler({:?}) <= Packet"
,
target
);
notifs_sink
.send_sync_notification
(
&
protocol_name
,
protocol_name
,
encoded_fallback_message
,
message
);
...
...
@@ -1374,7 +1374,7 @@ impl NetworkBehaviour for GenericProto {
target
:
"sub-libp2p"
,
"Handler({:?}) => Notification({:?})"
,
source
,
str
::
from_utf8
(
&
protocol_name
)
protocol_name
,
);
trace!
(
target
:
"sub-libp2p"
,
"External API <= Message({:?}, {:?})"
,
protocol_name
,
source
);
let
event
=
GenericProtoOut
::
Notification
{
...
...
client/network/src/protocol/generic_proto/handler/group.rs
View file @
1fbcc94b
...
...
@@ -224,7 +224,7 @@ pub enum NotifsHandlerOut {
/// Received a message on a custom protocol substream.
Notification
{
/// Name of the protocol of the message.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
/// Message that has been received.
message
:
BytesMut
,
...
...
@@ -270,7 +270,7 @@ enum NotificationsSinkMessage {
/// Message emitted by [`NotificationsSink::reserve_notification`] and
/// [`NotificationsSink::write_notification_now`].
Notification
{
protocol_name
:
Vec
<
u8
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
encoded_fallback_message
:
Vec
<
u8
>
,
message
:
Vec
<
u8
>
,
},
...
...
@@ -311,13 +311,13 @@ impl NotificationsSink {
/// This method will be removed in a future version.
pub
fn
send_sync_notification
<
'a
>
(
&
'a
self
,
protocol_name
:
&
[
u8
]
,
protocol_name
:
Cow
<
'static
,
str
>
,
encoded_fallback_message
:
impl
Into
<
Vec
<
u8
>>
,
message
:
impl
Into
<
Vec
<
u8
>>
)
{
let
mut
lock
=
self
.inner.sync_channel
.lock
();
let
result
=
lock
.try_send
(
NotificationsSinkMessage
::
Notification
{
protocol_name
:
protocol_name
.to_owned
()
,
protocol_name
:
protocol_name
,
encoded_fallback_message
:
encoded_fallback_message
.into
(),
message
:
message
.into
()
});
...
...
@@ -336,12 +336,12 @@ impl NotificationsSink {
///
/// The protocol name is expected to be checked ahead of calling this method. It is a logic
/// error to send a notification using an unknown protocol.
pub
async
fn
reserve_notification
<
'a
>
(
&
'a
self
,
protocol_name
:
&
[
u8
]
)
->
Result
<
Ready
<
'a
>
,
()
>
{
pub
async
fn
reserve_notification
<
'a
>
(
&
'a
self
,
protocol_name
:
Cow
<
'static
,
str
>
)
->
Result
<
Ready
<
'a
>
,
()
>
{
let
mut
lock
=
self
.inner.async_channel
.lock
()
.await
;
let
poll_ready
=
future
::
poll_fn
(|
cx
|
lock
.poll_ready
(
cx
))
.await
;
if
poll_ready
.is_ok
()
{
Ok
(
Ready
{
protocol_name
:
protocol_name
.to_owned
()
,
lock
})
Ok
(
Ready
{
protocol_name
:
protocol_name
,
lock
})
}
else
{
Err
(())
}
...
...
@@ -355,7 +355,7 @@ pub struct Ready<'a> {
/// Guarded channel. The channel inside is guaranteed to not be full.
lock
:
FuturesMutexGuard
<
'a
,
mpsc
::
Sender
<
NotificationsSinkMessage
>>
,
/// Name of the protocol. Should match one of the protocols passed at initialization.
protocol_name
:
Vec
<
u8
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
}
impl
<
'a
>
Ready
<
'a
>
{
...
...
@@ -392,7 +392,7 @@ impl NotifsHandlerProto {
/// ourselves or respond to handshake from the remote.
pub
fn
new
(
legacy
:
RegisteredProtocol
,
list
:
impl
Into
<
Vec
<
(
Cow
<
'static
,
[
u8
]
>
,
Arc
<
RwLock
<
Vec
<
u8
>>>
)
>>
,
list
:
impl
Into
<
Vec
<
(
Cow
<
'static
,
str
>
,
Arc
<
RwLock
<
Vec
<
u8
>>>
)
>>
,
)
->
Self
{
let
list
=
list
.into
();
...
...
@@ -613,7 +613,7 @@ impl ProtocolsHandler for NotifsHandler {
message
}
=>
{
for
(
handler
,
_
)
in
&
mut
self
.out_handlers
{
if
handler
.protocol_name
()
==
&
protocol_name
[
..
]
&&
handler
.is_open
()
{
if
*
handler
.protocol_name
()
==
protocol_name
&&
handler
.is_open
()
{
handler
.send_or_discard
(
message
);
continue
'poll_notifs_sink
;
}
...
...
@@ -698,7 +698,7 @@ impl ProtocolsHandler for NotifsHandler {
if
self
.notifications_sink_rx
.is_some
()
{
let
msg
=
NotifsHandlerOut
::
Notification
{
message
,
protocol_name
:
handler
.protocol_name
()
.
to_owned
()
.into
(),
protocol_name
:
handler
.protocol_name
()
.
clone
(),
};
return
Poll
::
Ready
(
ProtocolsHandlerEvent
::
Custom
(
msg
));
}
...
...
client/network/src/protocol/generic_proto/handler/notif_in.rs
View file @
1fbcc94b
...
...
@@ -109,7 +109,7 @@ pub enum NotifsInHandlerOut {
impl
NotifsInHandlerProto
{
/// Builds a new `NotifsInHandlerProto`.
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
)
->
Self
{
NotifsInHandlerProto
{
in_protocol
:
NotificationsIn
::
new
(
protocol_name
),
...
...
@@ -136,7 +136,7 @@ impl IntoProtocolsHandler for NotifsInHandlerProto {
impl
NotifsInHandler
{
/// Returns the name of the protocol that we accept.
pub
fn
protocol_name
(
&
self
)
->
&
[
u8
]
{
pub
fn
protocol_name
(
&
self
)
->
&
Cow
<
'static
,
str
>
{
self
.in_protocol
.protocol_name
()
}
}
...
...
client/network/src/protocol/generic_proto/handler/notif_out.rs
View file @
1fbcc94b
...
...
@@ -57,13 +57,13 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
/// See the documentation of [`NotifsOutHandler`] for more information.
pub
struct
NotifsOutHandlerProto
{
/// Name of the protocol to negotiate.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
}
impl
NotifsOutHandlerProto
{
/// Builds a new [`NotifsOutHandlerProto`]. Will use the given protocol name for the
/// notifications substream.
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
)
->
Self
{
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
)
->
Self
{
NotifsOutHandlerProto
{
protocol_name
:
protocol_name
.into
(),
}
...
...
@@ -97,7 +97,7 @@ impl IntoProtocolsHandler for NotifsOutHandlerProto {
/// the remote for the purpose of sending notifications to it.
pub
struct
NotifsOutHandler
{
/// Name of the protocol to negotiate.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
/// Relationship with the node we're connected to.
state
:
State
,
...
...
@@ -220,7 +220,7 @@ impl NotifsOutHandler {
}
/// Returns the name of the protocol that we negotiate.
pub
fn
protocol_name
(
&
self
)
->
&
[
u8
]
{
pub
fn
protocol_name
(
&
self
)
->
&
Cow
<
'static
,
str
>
{
&
self
.protocol_name
}
...
...
client/network/src/protocol/generic_proto/upgrade/notifications.rs
View file @
1fbcc94b
...
...
@@ -50,7 +50,7 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
#[derive(Debug,
Clone)]
pub
struct
NotificationsIn
{
/// Protocol name to use when negotiating the substream.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
}
/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
...
...
@@ -58,7 +58,7 @@ pub struct NotificationsIn {
#[derive(Debug,
Clone)]
pub
struct
NotificationsOut
{
/// Protocol name to use when negotiating the substream.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
/// Message to send when we start the handshake.
initial_message
:
Vec
<
u8
>
,
}
...
...
@@ -100,14 +100,14 @@ pub struct NotificationsOutSubstream<TSubstream> {
impl
NotificationsIn
{
/// Builds a new potential upgrade.
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
)
->
Self
{
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
)
->
Self
{
NotificationsIn
{
protocol_name
:
protocol_name
.into
(),
}
}
/// Returns the name of the protocol that we accept.
pub
fn
protocol_name
(
&
self
)
->
&
[
u8
]
{
pub
fn
protocol_name
(
&
self
)
->
&
Cow
<
'static
,
str
>
{
&
self
.protocol_name
}
}
...
...
@@ -117,7 +117,11 @@ impl UpgradeInfo for NotificationsIn {
type
InfoIter
=
iter
::
Once
<
Self
::
Info
>
;
fn
protocol_info
(
&
self
)
->
Self
::
InfoIter
{
iter
::
once
(
self
.protocol_name
.clone
())
let
bytes
:
Cow
<
'static
,
[
u8
]
>
=
match
&
self
.protocol_name
{
Cow
::
Borrowed
(
s
)
=>
Cow
::
Borrowed
(
s
.as_bytes
()),
Cow
::
Owned
(
s
)
=>
Cow
::
Owned
(
s
.as_bytes
()
.to_vec
())
};
iter
::
once
(
bytes
)
}
}
...
...
@@ -144,7 +148,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let
mut
initial_message
=
vec!
[
0u8
;
initial_message_len
];
if
!
initial_message
.is_empty
()
{
socket
.read
(
&
mut
initial_message
)
.await
?
;
socket
.read
_exact
(
&
mut
initial_message
)
.await
?
;
}
let
substream
=
NotificationsInSubstream
{
...
...
@@ -244,7 +248,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,
impl
NotificationsOut
{
/// Builds a new potential upgrade.
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
,
initial_message
:
impl
Into
<
Vec
<
u8
>>
)
->
Self
{
pub
fn
new
(
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
,
initial_message
:
impl
Into
<
Vec
<
u8
>>
)
->
Self
{
let
initial_message
=
initial_message
.into
();
if
initial_message
.len
()
>
MAX_HANDSHAKE_SIZE
{
error!
(
target
:
"sub-libp2p"
,
"Outbound networking handshake is above allowed protocol limit"
);
...
...
@@ -262,7 +266,11 @@ impl UpgradeInfo for NotificationsOut {
type
InfoIter
=
iter
::
Once
<
Self
::
Info
>
;
fn
protocol_info
(
&
self
)
->
Self
::
InfoIter
{
iter
::
once
(
self
.protocol_name
.clone
())
let
bytes
:
Cow
<
'static
,
[
u8
]
>
=
match
&
self
.protocol_name
{
Cow
::
Borrowed
(
s
)
=>
Cow
::
Borrowed
(
s
.as_bytes
()),
Cow
::
Owned
(
s
)
=>
Cow
::
Owned
(
s
.as_bytes
()
.to_vec
())
};
iter
::
once
(
bytes
)
}
}
...
...
@@ -292,7 +300,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
let
mut
handshake
=
vec!
[
0u8
;
handshake_len
];
if
!
handshake
.is_empty
()
{
socket
.read
(
&
mut
handshake
)
.await
?
;
socket
.read
_exact
(
&
mut
handshake
)
.await
?
;
}
Ok
((
handshake
,
NotificationsOutSubstream
{
...
...
@@ -378,10 +386,11 @@ mod tests {
use
async_std
::
net
::{
TcpListener
,
TcpStream
};
use
futures
::{
prelude
::
*
,
channel
::
oneshot
};
use
libp2p
::
core
::
upgrade
;
use
std
::
borrow
::
Cow
;
#[test]
fn
basic_works
()
{
const
PROTO_NAME
:
&
'static
[
u8
]
=
b
"/test/proto/1"
;
const
PROTO_NAME
:
Cow
<
'static
,
str
>
=
Cow
::
Borrowed
(
"/test/proto/1"
)
;
let
(
listener_addr_tx
,
listener_addr_rx
)
=
oneshot
::
channel
();
let
client
=
async_std
::
task
::
spawn
(
async
move
{
...
...
@@ -420,7 +429,7 @@ mod tests {
fn
empty_handshake
()
{
// Check that everything still works when the handshake messages are empty.
const
PROTO_NAME
:
&
'static
[
u8
]
=
b
"/test/proto/1"
;
const
PROTO_NAME
:
Cow
<
'static
,
str
>
=
Cow
::
Borrowed
(
"/test/proto/1"
)
;
let
(
listener_addr_tx
,
listener_addr_rx
)
=
oneshot
::
channel
();
let
client
=
async_std
::
task
::
spawn
(
async
move
{
...
...
@@ -457,7 +466,7 @@ mod tests {
#[test]
fn
refused
()
{
const
PROTO_NAME
:
&
'static
[
u8
]
=
b
"/test/proto/1"
;
const
PROTO_NAME
:
Cow
<
'static
,
str
>
=
Cow
::
Borrowed
(
"/test/proto/1"
)
;
let
(
listener_addr_tx
,
listener_addr_rx
)
=
oneshot
::
channel
();
let
client
=
async_std
::
task
::
spawn
(
async
move
{
...
...
@@ -495,7 +504,7 @@ mod tests {
#[test]
fn
large_initial_message_refused
()
{
const
PROTO_NAME
:
&
'static
[
u8
]
=
b
"/test/proto/1"
;
const
PROTO_NAME
:
Cow
<
'static
,
str
>
=
Cow
::
Borrowed
(
"/test/proto/1"
)
;
let
(
listener_addr_tx
,
listener_addr_rx
)
=
oneshot
::
channel
();
let
client
=
async_std
::
task
::
spawn
(
async
move
{
...
...
@@ -526,7 +535,7 @@ mod tests {
#[test]
fn
large_handshake_refused
()
{
const
PROTO_NAME
:
&
'static
[
u8
]
=
b
"/test/proto/1"
;
const
PROTO_NAME
:
Cow
<
'static
,
str
>
=
Cow
::
Borrowed
(
"/test/proto/1"
)
;
let
(
listener_addr_tx
,
listener_addr_rx
)
=
oneshot
::
channel
();
let
client
=
async_std
::
task
::
spawn
(
async
move
{
...
...
client/network/src/request_responses.rs
0 → 100644
View file @
1fbcc94b
This diff is collapsed.
Click to expand it.
client/network/src/service.rs
View file @
1fbcc94b
...
...
@@ -29,7 +29,7 @@
use
crate
::{
ExHashT
,
NetworkStateInfo
,
behaviour
::{
Behaviour
,
BehaviourOut
},
behaviour
::{
self
,
Behaviour
,
BehaviourOut
},
config
::{
parse_str_addr
,
NonReservedPeerMode
,
Params
,
Role
,
TransportConfig
},
DhtEvent
,
discovery
::
DiscoveryConfig
,
...
...
@@ -42,7 +42,7 @@ use crate::{
protocol
::{
self
,
event
::
Event
,
NotifsHandlerError
,
LegacyConnectionKillError
,
NotificationsSink
,
Ready
,
sync
::
SyncState
,
PeerInfo
,
Protocol
},
transport
,
ReputationChange
,
};
use
futures
::
prelude
::
*
;
use
futures
::
{
channel
::
oneshot
,
prelude
::
*
}
;
use
libp2p
::{
PeerId
,
multiaddr
,
Multiaddr
};
use
libp2p
::
core
::{
ConnectedPoint
,
Executor
,
connection
::{
ConnectionError
,
PendingConnectionError
},
either
::
EitherError
};
use
libp2p
::
kad
::
record
;
...
...
@@ -76,6 +76,9 @@ use std::{
},
task
::
Poll
,
};
use
wasm_timer
::
Instant
;
pub
use
behaviour
::{
ResponseFailure
,
InboundFailure
,
RequestFailure
,
OutboundFailure
};
mod
out_events
;
#[cfg(test)]
...
...
@@ -102,7 +105,7 @@ pub struct NetworkService<B: BlockT + 'static, H: ExHashT> {
/// that peer. Updated by the [`NetworkWorker`].
peers_notifications_sinks
:
Arc
<
Mutex
<
HashMap
<
(
PeerId
,
ConsensusEngineId
),
NotificationsSink
>>>
,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine
:
Mutex
<
HashMap
<
ConsensusEngineId
,
Cow
<
'static
,
[
u8
]
>>>
,
protocol_name_by_engine
:
Mutex
<
HashMap
<
ConsensusEngineId
,
Cow
<
'static
,
str
>>>
,
/// Field extracted from the [`Metrics`] struct and necessary to report the
/// notifications-related metrics.
notifications_sizes_metric
:
Option
<
HistogramVec
>
,
...
...
@@ -309,16 +312,28 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
config
};
let
mut
behaviour
=
Behaviour
::
new
(
protocol
,
params
.role
,
user_agent
,
local_public
,
block_requests
,
finality_proof_requests
,
light_client_handler
,
discovery_config
);
let
mut
behaviour
=
{
let
result
=
Behaviour
::
new
(
protocol
,
params
.role
,
user_agent
,
local_public
,
block_requests
,
finality_proof_requests
,
light_client_handler
,
discovery_config
,
params
.network_config.request_response_protocols
,
);
match
result
{
Ok
(
b
)
=>
b
,
Err
(
crate
::
request_responses
::
RegisterError
::
DuplicateProtocol
(
proto
))
=>
{
return
Err
(
Error
::
DuplicateRequestResponseProtocol
{
protocol
:
proto
,
})
},
}
};
for
(
engine_id
,
protocol_name
)
in
&
params
.network_config.notifications_protocols
{
behaviour
.register_notifications_protocol
(
*
engine_id
,
protocol_name
.clone
());
...
...
@@ -404,6 +419,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peers_notifications_sinks
,
metrics
,
boot_node_ids
,
pending_requests
:
HashMap
::
with_capacity
(
128
),
})
}
...
...
@@ -630,7 +646,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
})
});
sink
.send_sync_notification
(
&
protocol_name
,
fallback
,
message
);
sink
.send_sync_notification
(
protocol_name
,
fallback
,
message
);
}
else
{
return
;
}
...
...
@@ -752,12 +768,50 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having
/// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory
pub
fn
event_stream
(
&
self
,
name
:
&
'static
str
)
->
impl
Stream
<
Item
=
Event
>
{
// Note: when transitioning to stable futures, remove the `Error` entirely
let
(
tx
,
rx
)
=
out_events
::
channel
(
name
);
let
_
=
self
.to_worker
.unbounded_send
(
ServiceToWorkerMsg
::
EventStream
(
tx
));
rx
}
/// Sends a single targeted request to a specific peer. On success, returns the response of
/// the peer.
///
/// Request-response protocols are a way to complement notifications protocols, but
/// notifications should remain the default ways of communicating information. For example, a
/// peer can announce something through a notification, after which the recipient can obtain
/// more information by performing a request.
/// As such, this function is meant to be called only with peers we are already connected to.
/// Calling this method with a `target` we are not connected to will *not* attempt to connect
/// to said peer.
///
/// No limit or throttling of concurrent outbound requests per peer and protocol are enforced.
/// Such restrictions, if desired, need to be enforced at the call site(s).
///
/// The protocol must have been registered through
/// [`NetworkConfiguration::request_response_protocols`].
pub
async
fn
request
(
&
self
,
target
:
PeerId
,
protocol
:
impl
Into
<
Cow
<
'static
,
str
>>
,
request
:
Vec
<
u8
>
)
->
Result
<
Vec
<
u8
>
,
RequestFailure
>
{
let
(
tx
,
rx
)
=
oneshot
::
channel
();
let
_
=
self
.to_worker
.unbounded_send
(
ServiceToWorkerMsg
::
Request
{
target
,
protocol
:
protocol
.into
(),
request
,
pending_response
:
tx
});
match
rx
.await
{
Ok
(
v
)
=>
v
,
// The channel can only be closed if the network worker no longer exists. If the
// network worker no longer exists, then all connections to `target` are necessarily
// closed, and we legitimately report this situation as a "ConnectionClosed".
Err
(
_
)
=>
Err
(
RequestFailure
::
Network
(
OutboundFailure
::
ConnectionClosed
)),
}
}
/// Registers a new notifications protocol.
///
/// After a protocol has been registered, you can call `write_notifications`.
...
...
@@ -774,7 +828,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
pub
fn
register_notifications_protocol
(
&
self
,
engine_id
:
ConsensusEngineId
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
[
u8
]
>>
,
protocol_name
:
impl
Into
<
Cow
<
'static
,
str
>>
,
)
{
let
protocol_name
=
protocol_name
.into
();
self
.protocol_name_by_engine
.lock
()
.insert
(
engine_id
,
protocol_name
.clone
());
...
...
@@ -1008,7 +1062,7 @@ pub struct NotificationSender {
sink
:
NotificationsSink
,
/// Name of the protocol on the wire.
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
/// Engine ID used for the fallback message.
engine_id
:
ConsensusEngineId
,
...
...
@@ -1022,7 +1076,7 @@ impl NotificationSender {
/// Returns a future that resolves when the `NotificationSender` is ready to send a notification.
pub
async
fn
ready
<
'a
>
(
&
'a
self
)
->
Result
<
NotificationSenderReady
<
'a
>
,
NotificationSenderError
>
{
Ok
(
NotificationSenderReady
{
ready
:
match
self
.sink
.reserve_notification
(
&
self
.protocol_name
)
.await
{
ready
:
match
self
.sink
.reserve_notification
(
self
.protocol_name
.clone
()
)
.await
{
Ok
(
r
)
=>
r
,
Err
(())
=>
return
Err
(
NotificationSenderError
::
Closed
),
},
...
...
@@ -1096,9 +1150,15 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
AddKnownAddress
(
PeerId
,
Multiaddr
),
SyncFork
(
Vec
<
PeerId
>
,
B
::
Hash
,
NumberFor
<
B
>
),
EventStream
(
out_events
::
Sender
),
Request
{
target
:
PeerId
,
protocol
:
Cow
<
'static
,
str
>
,
request
:
Vec
<
u8
>
,
pending_response
:
oneshot
::
Sender
<
Result
<
Vec
<
u8
>
,
RequestFailure
>>
,
},
RegisterNotifProtocol
{
engine_id
:
ConsensusEngineId
,
protocol_name
:
Cow
<
'static
,
[
u8
]
>
,
protocol_name
:
Cow
<
'static
,
str
>
,
},
DisconnectPeer
(
PeerId
),
UpdateChain
,
...
...
@@ -1132,6 +1192,13 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
metrics
:
Option
<
Metrics
>
,
/// The `PeerId`'s of all boot nodes.
boot_node_ids
:
Arc
<
HashSet
<
PeerId
>>
,
/// Requests started using [`NetworkService::request`]. Includes the channel to send back the
/// response, when the request has started, and the name of the protocol for diagnostic
/// purposes.
pending_requests
:
HashMap
<
behaviour
::
RequestId
,
(
oneshot
::
Sender
<
Result
<
Vec
<
u8
>
,
RequestFailure
>>
,
Instant
,
String
)
>
,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks
:
Arc
<
Mutex
<
HashMap
<
(
PeerId
,
ConsensusEngineId
),
NotificationsSink
>>>
,
...
...
@@ -1165,8 +1232,10 @@ struct Metrics {
peerset_num_requested
:
Gauge
<
U64
>
,
pending_connections
:
Gauge
<
U64
>
,
pending_connections_errors_total
:
CounterVec
<
U64
>
,
requests_in_total
:
HistogramVec
,
requests_out_finished
:
HistogramVec
,
requests_in_failure_total
:
CounterVec
<
U64
>
,
requests_in_success_total
:
HistogramVec
,
requests_out_failure_total
:
CounterVec
<
U64
>
,
requests_out_success_total
:
HistogramVec
,
requests_out_started_total
:
CounterVec
<
U64
>
,
}
...
...
@@ -1347,10 +1416,17 @@ impl Metrics {
),
&
[
"reason"
]
)
?
,
registry
)
?
,
requests_in_total
:
register
(
HistogramVec
::
new
(
requests_in_failure_total
:
register
(
CounterVec
::
new
(
Opts
::
new
(
"sub_libp2p_requests_in_failure_total"
,
"Total number of incoming requests that the node has failed to answer"
),
&
[
"protocol"
,
"reason"
]
)
?
,
registry
)
?
,
requests_in_success_total
:
register
(
HistogramVec
::
new
(
HistogramOpts
{
common_opts
:
Opts
::
new
(
"sub_libp2p_requests_in_total"
,
"sub_libp2p_requests_in_
success_
total"
,
"Total number of requests received and answered"
),
buckets
:
prometheus_endpoint
::
exponential_buckets
(
0.001
,
2.0
,
16
)
...
...
@@ -1358,11 +1434,18 @@ impl Metrics {
},
&
[
"protocol"
]
)
?
,
registry
)
?
,
requests_out_finished
:
register
(
HistogramVec
::
new
(
requests_out_failure_total
:
register
(
CounterVec
::
new
(
Opts
::
new
(
"sub_libp2p_requests_out_failure_total"
,
"Total number of requests that have failed"
),
&
[
"protocol"
,
"reason"
]
)
?
,
registry
)
?
,
requests_out_success_total
:
register
(
HistogramVec
::
new
(
HistogramOpts
{
common_opts
:
Opts
::
new
(
"sub_libp2p_requests_out_
finished
"
,
"
T
ime between a request's start and finish
(successful or not)
"
"sub_libp2p_requests_out_
success_total
"
,
"
For successful requests, t
ime between a request's start and finish"
),
buckets
:
prometheus_endpoint
::
exponential_buckets
(
0.001
,
2.0
,
16
)
.expect
(
"parameters are always valid values; qed"
),
...
...
@@ -1446,6 +1529,31 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this
.network_service
.user_protocol_mut
()
.set_sync_fork_request
(
peer_ids
,
&
hash
,
number
),
ServiceToWorkerMsg
::
EventStream
(
sender
)
=>
this
.event_streams
.push
(
sender
),
ServiceToWorkerMsg
::
Request
{
target
,
protocol
,
request
,
pending_response
}
=>
{
// Calling `send_request` can fail immediately in some circumstances.
// This is handled by sending back an error on the channel.
match
this
.network_service
.send_request
(
&
target
,
&
protocol
,
request
)
{
Ok
(
request_id
)
=>
{
if
let
Some
(
metrics
)
=
this
.metrics
.as_ref
()
{
metrics
.requests_out_started_total
.with_label_values
(
&
[
&
protocol
])
.inc
();
}
this
.pending_requests
.insert
(
request_id
,
(
pending_response
,
Instant
::
now
(),
protocol
.to_string
())
);
},
Err
(
behaviour
::
SendRequestError
::
NotConnected
)
=>
{
let
err
=
RequestFailure
::
Network
(
OutboundFailure
::
ConnectionClosed
);
let
_
=
pending_response
.send
(
Err
(
err
));
},
Err
(
behaviour
::
SendRequestError
::
UnknownProtocol
)
=>
{
let
err
=
RequestFailure
::
Network
(
OutboundFailure
::
UnsupportedProtocols
);
let
_
=
pending_response
.send
(
Err
(
err
));
},
}
},
ServiceToWorkerMsg
::
RegisterNotifProtocol
{
engine_id
,
protocol_name
}
=>
{
this
.network_service
.register_notifications_protocol
(
engine_id
,
protocol_name
);
...
...
@@ -1494,23 +1602,72 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
this
.import_queue
.import_finality_proof
(
origin
,
hash
,
nb
,
proof
);
},
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
Answere
dRequest
{
protocol
,
build_time
,
..
}))
=>
{
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
Inboun
dRequest
{
protocol
,
result
,
..
}))
=>
{
if
let
Some
(
metrics
)
=
this
.metrics
.as_ref
()
{
metrics
.requests_in_total
.with_label_values
(
&
[
&
protocol
])
.observe
(
build_time
.as_secs_f64
());
match
result
{
Ok
(
serve_time
)
=>
{
metrics
.requests_in_success_total
.with_label_values
(
&
[
&
protocol
])
.observe
(
serve_time
.as_secs_f64
());
}
Err
(
err
)
=>
{
let
reason
=
match
err
{
ResponseFailure
::
Busy
=>
"busy"
,
ResponseFailure
::
Network
(
InboundFailure
::
Timeout
)
=>
"timeout"
,
ResponseFailure
::
Network
(
InboundFailure
::
UnsupportedProtocols
)
=>
"unsupported"
,
};
metrics
.requests_in_failure_total
.with_label_values
(
&
[
&
protocol
,
reason
])
.inc
();
}
}
}
},
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
RequestFinished
{
request_id
,
result
}))
=>
{
if
let
Some
((
send_back
,
started
,
protocol
))
=
this
.pending_requests
.remove
(
&
request_id
)
{
if
let
Some
(
metrics
)
=
this
.metrics
.as_ref
()
{
match
&
result
{
Ok
(
_
)
=>
{
metrics
.requests_out_success_total
.with_label_values
(
&
[
&
protocol
])
.observe
(
started
.elapsed
()
.as_secs_f64
());
}
Err
(
err
)
=>
{
let
reason
=
match
err
{
RequestFailure
::
Refused
=>
"refused"
,
RequestFailure
::
Network
(
OutboundFailure
::
DialFailure
)
=>
"dial-failure"
,
RequestFailure
::
Network
(
OutboundFailure
::
Timeout
)
=>
"timeout"
,
RequestFailure
::
Network
(
OutboundFailure
::
ConnectionClosed
)
=>
"connection-closed"
,
RequestFailure
::
Network
(
OutboundFailure
::
UnsupportedProtocols
)
=>
"unsupported"
,
};
metrics
.requests_out_failure_total
.with_label_values
(
&
[
&
protocol
,
reason
])
.inc
();
}
}
}
let
_
=
send_back
.send
(
result
);
}
else
{
error!
(
"Request not in pending_requests"
);
}
},
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
RequestStarted
{
protocol
,
..
}))
=>
{
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
Opaque
RequestStarted
{
protocol
,
..
}))
=>
{
if
let
Some
(
metrics
)
=
this
.metrics
.as_ref
()
{
metrics
.requests_out_started_total
.with_label_values
(
&
[
&
protocol
])
.inc
();
}
},
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
RequestFinished
{
protocol
,
request_duration
,
..
}))
=>
{
Poll
::
Ready
(
SwarmEvent
::
Behaviour
(
BehaviourOut
::
Opaque
RequestFinished
{
protocol
,
request_duration
,
..
}))
=>
{
if
let
Some
(
metrics
)
=
this
.metrics
.as_ref
()
{
metrics
.requests_out_
finished
metrics
.requests_out_
success_total
.with_label_values
(
&
[
&
protocol
])
.observe
(
request_duration
.as_secs_f64
());
}
...
...
@@ -1635,14 +1792,14 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
let
reason
=
match
cause
{
Some
(
ConnectionError
::
IO
(
_
))
=>
"transport-error"
,
Some
(
ConnectionError
::
Handler
(
NodeHandlerWrapperError
::
Handler
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
B
(
EitherError
::
A
(
PingFailure
::
Timeout
)))))))))
=>
"ping-timeout"
,
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
B
(
EitherError
::
A
(
PingFailure
::
Timeout
)))))))))
)
=>
"ping-timeout"
,
Some
(
ConnectionError
::
Handler
(
NodeHandlerWrapperError
::
Handler
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
NotifsHandlerError
::
Legacy
(
LegacyConnectionKillError
)))))))))
=>
"force-closed"
,
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
NotifsHandlerError
::
Legacy
(
LegacyConnectionKillError
)))))))))
)
=>
"force-closed"
,
Some
(
ConnectionError
::
Handler
(
NodeHandlerWrapperError
::
Handler
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
NotifsHandlerError
::
SyncNotificationsClogged
))))))))
=>
"sync-notifications-clogged"
,
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
EitherError
::
A
(
NotifsHandlerError
::
SyncNotificationsClogged
))))))))
)
=>
"sync-notifications-clogged"
,
Some
(
ConnectionError
::
Handler
(
NodeHandlerWrapperError
::
Handler
(
_
)))
=>
"protocol-error"
,
Some
(
ConnectionError
::
Handler
(
NodeHandlerWrapperError
::
KeepAliveTimeout
))
=>
"keep-alive-timeout"
,
None
=>
"actively-closed"
,
...
...
@@ -1800,7 +1957,7 @@ impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
/// Turns bytes that are potentially UTF-8 into a reasonable representable string.
///
/// Meant to be used only for debugging or metrics-reporting purposes.
fn
maybe_utf8_bytes_to_string
(
id
:
&
[
u8
])
->
Cow
<
str
>
{
pub
(
crate
)
fn
maybe_utf8_bytes_to_string
(
id
:
&
[
u8
])
->
Cow
<
str
>
{
if
let
Ok
(
s
)
=
std
::
str
::
from_utf8
(
&
id
[
..
])
{
Cow
::
Borrowed
(
s
)
}
else
{
...
...
client/network/src/service/tests.rs
View file @
1fbcc94b
...
...
@@ -131,14 +131,14 @@ fn build_nodes_one_proto()
let
listen_addr
=
config
::
build_multiaddr!
[
Memory
(
rand
::
random
::
<
u64
>
())];
let
(
node1
,
events_stream1
)
=
build_test_full_node
(
config
::
NetworkConfiguration
{
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
&
b
"/foo"
[
..
]
))],
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
"/foo"
))],
listen_addresses
:
vec!
[
listen_addr
.clone
()],
transport
:
config
::
TransportConfig
::
MemoryOnly
,
..
config
::
NetworkConfiguration
::
new_local
()
});
let
(
node2
,
events_stream2
)
=
build_test_full_node
(
config
::
NetworkConfiguration
{
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
&
b
"/foo"
[
..
]
))],
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
"/foo"
))],
listen_addresses
:
vec!
[],
reserved_nodes
:
vec!
[
config
::
MultiaddrWithPeerId
{
multiaddr
:
listen_addr
,
...
...
@@ -281,7 +281,7 @@ fn lots_of_incoming_peers_works() {
let
listen_addr
=
config
::
build_multiaddr!
[
Memory
(
rand
::
random
::
<
u64
>
())];
let
(
main_node
,
_
)
=
build_test_full_node
(
config
::
NetworkConfiguration
{
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
&
b
"/foo"
[
..
]
))],
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
"/foo"
))],
listen_addresses
:
vec!
[
listen_addr
.clone
()],
in_peers
:
u32
::
max_value
(),
transport
:
config
::
TransportConfig
::
MemoryOnly
,
...
...
@@ -298,7 +298,7 @@ fn lots_of_incoming_peers_works() {
let
main_node_peer_id
=
main_node_peer_id
.clone
();
let
(
_dialing_node
,
event_stream
)
=
build_test_full_node
(
config
::
NetworkConfiguration
{
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
&
b
"/foo"
[
..
]
))],
notifications_protocols
:
vec!
[(
ENGINE_ID
,
From
::
from
(
"/foo"
))],
listen_addresses
:
vec!
[],
reserved_nodes
:
vec!
[
config
::
MultiaddrWithPeerId
{
multiaddr
:
listen_addr
.clone
(),
...
...
client/service/test/Cargo.toml
View file @
1fbcc94b
...
...
@@ -18,7 +18,7 @@ tokio = "0.1.22"
futures01
=
{
package
=
"futures"
,
version
=
"0.1.29"
}
log
=
"0.4.8"
env_logger
=
"0.7.0"
fdlimit
=
"0.
1.4
"
fdlimit
=
"0.
2.0
"
parking_lot
=
"0.10.0"
sc-light
=
{
version
=
"2.0.0-rc6"
,
path
=
"../../light"
}
sp-blockchain
=
{
version
=
"2.0.0-rc6"
,
path
=
"../../../primitives/blockchain"
}
...
...
frame/babe/src/lib.rs
View file @
1fbcc94b
...
...
@@ -255,7 +255,7 @@ decl_module! {
/// the equivocation proof and validate the given key ownership proof
/// against the extracted offender. If both are valid, the offence will
/// be reported.
#[weight
=
weight::
weight_for
_
report_equivocation::
<
T
>
()]
#[weight
=
weight_for
::
report_equivocation::
<
T
>
(
key_owner_proof
.
validator_count()
)]
fn
report_equivocation
(
origin
,
equivocation_proof
:
EquivocationProof
<
T
::
Header
>
,
...
...
@@ -278,7 +278,7 @@ decl_module! {
/// block authors will call it (validated in `ValidateUnsigned`), as such
/// if the block author is defined it will be defined as the equivocation
/// reporter.
#[weight
=
weight::
weight_for
_
report_equivocation::
<
T
>
()]
#[weight
=
weight_for
::
report_equivocation::
<
T
>
(
key_owner_proof
.
validator_count()
)]
fn
report_equivocation_unsigned
(
origin
,
equivocation_proof
:
EquivocationProof
<
T
::
Header
>
,
...
...
@@ -295,24 +295,35 @@ decl_module! {
}
}
mod
weight
{
mod
weight
_for
{
use
frame_support
::{
traits
::
Get
,
weights
::{
constants
::
WEIGHT_PER_MICROS
,
Weight
},
weights
::{
constants
::{
WEIGHT_PER_MICROS
,
WEIGHT_PER_NANOS
},
Weight
,
},
};
pub
fn
weight_for_report_equivocation
<
T
:
super
::
Trait
>
()
->
Weight
{
pub
fn
report_equivocation
<
T
:
super
::
Trait
>
(
validator_count
:
u32
)
->
Weight
{
// we take the validator set count from the membership proof to
// calculate the weight but we set a floor of 100 validators.
let
validator_count
=
validator_count
.max
(
100
)
as
u64
;
// worst case we are considering is that the given offender
// is backed by 200 nominators
const
MAX_NOMINATORS
:
u64
=
200
;
// checking membership proof
(
35
*
WEIGHT_PER_MICROS
)
.saturating_add
((
175
*
WEIGHT_PER_NANOS
)
.saturating_mul
(
validator_count
))
.saturating_add
(
T
::
DbWeight
::
get
()
.reads
(
5
))
// check equivocation proof
.saturating_add
(
110
*
WEIGHT_PER_MICROS
)
// report offence
.saturating_add
(
110
*
WEIGHT_PER_MICROS
)
// worst case we are considering is that the given offender
// is backed by 200 nominators
.saturating_add
(
T
::
DbWeight
::
get
()
.reads
(
14
+
3
*
200
))
.saturating_add
(
T
::
DbWeight
::
get
()
.writes
(
10
+
3
*
200
))
.saturating_add
(
25
*
WEIGHT_PER_MICROS
*
MAX_NOMINATORS
)
.saturating_add
(
T
::
DbWeight
::
get
()
.reads
(
14
+
3
*
MAX_NOMINATORS
))
.saturating_add
(
T
::
DbWeight
::
get
()
.writes
(
10
+
3
*
MAX_NOMINATORS
))
}
}
...
...
Prev
1
2
3
Next