Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
polkadot
Commits
15cc9127
Unverified
Commit
15cc9127
authored
Apr 01, 2021
by
Andronik Ordian
Committed by
GitHub
Apr 01, 2021
Browse files
gossip: move authorities request to runtime api subsystem (#2798)
parent
ecb2920b
Pipeline
#132430
canceled with stages
Changes
9
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Cargo.lock
View file @
15cc9127
...
...
@@ -5636,9 +5636,7 @@ dependencies = [
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-api",
"sp-application-crypto",
"sp-authority-discovery",
"sp-keystore",
"tracing",
]
...
...
@@ -5890,6 +5888,7 @@ dependencies = [
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sp-api",
"sp-authority-discovery",
"sp-consensus-babe",
"sp-core",
"tracing",
...
...
node/core/runtime-api/Cargo.toml
View file @
15cc9127
...
...
@@ -11,6 +11,7 @@ memory-lru = "0.1.0"
parity-util-mem
=
{
version
=
"0.9.0"
,
default-features
=
false
}
sp-api
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-authority-discovery
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-core
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-consensus-babe
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
...
...
node/core/runtime-api/src/cache.rs
View file @
15cc9127
...
...
@@ -19,6 +19,7 @@ use polkadot_primitives::v1::{
CoreState
,
GroupRotationInfo
,
InboundDownwardMessage
,
InboundHrmpMessage
,
Hash
,
PersistedValidationData
,
Id
as
ParaId
,
OccupiedCoreAssumption
,
SessionIndex
,
SessionInfo
,
ValidationCode
,
ValidatorId
,
ValidatorIndex
,
AuthorityDiscoveryId
,
};
use
sp_consensus_babe
::
Epoch
;
use
parity_util_mem
::{
MallocSizeOf
,
MallocSizeOfExt
};
...
...
@@ -28,6 +29,7 @@ use memory_lru::{MemoryLruCache, ResidentSize};
use
std
::
collections
::
btree_map
::
BTreeMap
;
const
AUTHORITIES_CACHE_SIZE
:
usize
=
128
*
1024
;
const
VALIDATORS_CACHE_SIZE
:
usize
=
64
*
1024
;
const
VALIDATOR_GROUPS_CACHE_SIZE
:
usize
=
64
*
1024
;
const
AVAILABILITY_CORES_CACHE_SIZE
:
usize
=
64
*
1024
;
...
...
@@ -59,7 +61,18 @@ impl<T> ResidentSize for DoesNotAllocate<T> {
}
}
// this is an ugly workaround for `AuthorityDiscoveryId`
// not implementing `MallocSizeOf`
struct
VecOfDoesNotAllocate
<
T
>
(
Vec
<
T
>
);
impl
<
T
>
ResidentSize
for
VecOfDoesNotAllocate
<
T
>
{
fn
resident_size
(
&
self
)
->
usize
{
std
::
mem
::
size_of
::
<
T
>
()
*
self
.0
.capacity
()
}
}
pub
(
crate
)
struct
RequestResultCache
{
authorities
:
MemoryLruCache
<
Hash
,
VecOfDoesNotAllocate
<
AuthorityDiscoveryId
>>
,
validators
:
MemoryLruCache
<
Hash
,
ResidentSizeOf
<
Vec
<
ValidatorId
>>>
,
validator_groups
:
MemoryLruCache
<
Hash
,
ResidentSizeOf
<
(
Vec
<
Vec
<
ValidatorIndex
>>
,
GroupRotationInfo
)
>>
,
availability_cores
:
MemoryLruCache
<
Hash
,
ResidentSizeOf
<
Vec
<
CoreState
>>>
,
...
...
@@ -79,6 +92,7 @@ pub(crate) struct RequestResultCache {
impl
Default
for
RequestResultCache
{
fn
default
()
->
Self
{
Self
{
authorities
:
MemoryLruCache
::
new
(
AUTHORITIES_CACHE_SIZE
),
validators
:
MemoryLruCache
::
new
(
VALIDATORS_CACHE_SIZE
),
validator_groups
:
MemoryLruCache
::
new
(
VALIDATOR_GROUPS_CACHE_SIZE
),
availability_cores
:
MemoryLruCache
::
new
(
AVAILABILITY_CORES_CACHE_SIZE
),
...
...
@@ -98,6 +112,14 @@ impl Default for RequestResultCache {
}
impl
RequestResultCache
{
pub
(
crate
)
fn
authorities
(
&
mut
self
,
relay_parent
:
&
Hash
)
->
Option
<&
Vec
<
AuthorityDiscoveryId
>>
{
self
.authorities
.get
(
relay_parent
)
.map
(|
v
|
&
v
.0
)
}
pub
(
crate
)
fn
cache_authorities
(
&
mut
self
,
relay_parent
:
Hash
,
authorities
:
Vec
<
AuthorityDiscoveryId
>
)
{
self
.authorities
.insert
(
relay_parent
,
VecOfDoesNotAllocate
(
authorities
));
}
pub
(
crate
)
fn
validators
(
&
mut
self
,
relay_parent
:
&
Hash
)
->
Option
<&
Vec
<
ValidatorId
>>
{
self
.validators
.get
(
relay_parent
)
.map
(|
v
|
&
v
.0
)
}
...
...
@@ -212,6 +234,7 @@ impl RequestResultCache {
}
pub
(
crate
)
enum
RequestResult
{
Authorities
(
Hash
,
Vec
<
AuthorityDiscoveryId
>
),
Validators
(
Hash
,
Vec
<
ValidatorId
>
),
ValidatorGroups
(
Hash
,
(
Vec
<
Vec
<
ValidatorIndex
>>
,
GroupRotationInfo
)),
AvailabilityCores
(
Hash
,
Vec
<
CoreState
>
),
...
...
node/core/runtime-api/src/lib.rs
View file @
15cc9127
...
...
@@ -34,6 +34,7 @@ use polkadot_node_subsystem_util::metrics::{self, prometheus};
use
polkadot_primitives
::
v1
::{
Block
,
BlockId
,
Hash
,
ParachainHost
};
use
sp_api
::
ProvideRuntimeApi
;
use
sp_authority_discovery
::
AuthorityDiscoveryApi
;
use
sp_core
::
traits
::
SpawnNamed
;
use
sp_consensus_babe
::
BabeApi
;
...
...
@@ -83,7 +84,7 @@ impl<Client> RuntimeApiSubsystem<Client> {
impl
<
Client
,
Context
>
Subsystem
<
Context
>
for
RuntimeApiSubsystem
<
Client
>
where
Client
:
ProvideRuntimeApi
<
Block
>
+
Send
+
'static
+
Sync
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
+
AuthorityDiscoveryApi
<
Block
>
,
Context
:
SubsystemContext
<
Message
=
RuntimeApiMessage
>
{
fn
start
(
self
,
ctx
:
Context
)
->
SpawnedSubsystem
{
...
...
@@ -96,12 +97,14 @@ impl<Client, Context> Subsystem<Context> for RuntimeApiSubsystem<Client> where
impl
<
Client
>
RuntimeApiSubsystem
<
Client
>
where
Client
:
ProvideRuntimeApi
<
Block
>
+
Send
+
'static
+
Sync
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
+
AuthorityDiscoveryApi
<
Block
>
,
{
fn
store_cache
(
&
mut
self
,
result
:
RequestResult
)
{
use
RequestResult
::
*
;
match
result
{
Authorities
(
relay_parent
,
authorities
)
=>
self
.requests_cache
.cache_authorities
(
relay_parent
,
authorities
),
Validators
(
relay_parent
,
validators
)
=>
self
.requests_cache
.cache_validators
(
relay_parent
,
validators
),
ValidatorGroups
(
relay_parent
,
groups
)
=>
...
...
@@ -160,6 +163,8 @@ impl<Client> RuntimeApiSubsystem<Client> where
}
match
request
{
Request
::
Authorities
(
sender
)
=>
query!
(
authorities
(),
sender
)
.map
(|
sender
|
Request
::
Authorities
(
sender
)),
Request
::
Validators
(
sender
)
=>
query!
(
validators
(),
sender
)
.map
(|
sender
|
Request
::
Validators
(
sender
)),
Request
::
ValidatorGroups
(
sender
)
=>
query!
(
validator_groups
(),
sender
)
...
...
@@ -263,7 +268,7 @@ async fn run<Client>(
mut
subsystem
:
RuntimeApiSubsystem
<
Client
>
,
)
->
SubsystemResult
<
()
>
where
Client
:
ProvideRuntimeApi
<
Block
>
+
Send
+
Sync
+
'static
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
+
AuthorityDiscoveryApi
<
Block
>
,
{
loop
{
select!
{
...
...
@@ -291,7 +296,7 @@ fn make_runtime_api_request<Client>(
)
->
Option
<
RequestResult
>
where
Client
:
ProvideRuntimeApi
<
Block
>
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
,
Client
::
Api
:
ParachainHost
<
Block
>
+
BabeApi
<
Block
>
+
AuthorityDiscoveryApi
<
Block
>
,
{
let
_timer
=
metrics
.time_make_runtime_api_request
();
...
...
@@ -327,6 +332,7 @@ where
}
match
request
{
Request
::
Authorities
(
sender
)
=>
query!
(
Authorities
,
authorities
(),
sender
),
Request
::
Validators
(
sender
)
=>
query!
(
Validators
,
validators
(),
sender
),
Request
::
ValidatorGroups
(
sender
)
=>
query!
(
ValidatorGroups
,
validator_groups
(),
sender
),
Request
::
AvailabilityCores
(
sender
)
=>
query!
(
AvailabilityCores
,
availability_cores
(),
sender
),
...
...
@@ -416,7 +422,7 @@ mod tests {
ValidatorId
,
ValidatorIndex
,
GroupRotationInfo
,
CoreState
,
PersistedValidationData
,
Id
as
ParaId
,
OccupiedCoreAssumption
,
SessionIndex
,
ValidationCode
,
CommittedCandidateReceipt
,
CandidateEvent
,
InboundDownwardMessage
,
BlockNumber
,
InboundHrmpMessage
,
SessionInfo
,
BlockNumber
,
InboundHrmpMessage
,
SessionInfo
,
AuthorityDiscoveryId
,
};
use
polkadot_node_subsystem_test_helpers
as
test_helpers
;
use
sp_core
::
testing
::
TaskExecutor
;
...
...
@@ -428,6 +434,7 @@ mod tests {
#[derive(Default,
Clone)]
struct
MockRuntimeApi
{
authorities
:
Vec
<
AuthorityDiscoveryId
>
,
validators
:
Vec
<
ValidatorId
>
,
validator_groups
:
Vec
<
Vec
<
ValidatorIndex
>>
,
availability_cores
:
Vec
<
CoreState
>
,
...
...
@@ -582,6 +589,36 @@ mod tests {
None
}
}
impl
AuthorityDiscoveryApi
<
Block
>
for
MockRuntimeApi
{
fn
authorities
(
&
self
)
->
Vec
<
AuthorityDiscoveryId
>
{
self
.authorities
.clone
()
}
}
}
#[test]
fn
requests_authorities
()
{
let
(
ctx
,
mut
ctx_handle
)
=
test_helpers
::
make_subsystem_context
(
TaskExecutor
::
new
());
let
runtime_api
=
Arc
::
new
(
MockRuntimeApi
::
default
());
let
relay_parent
=
[
1
;
32
]
.into
();
let
spawner
=
sp_core
::
testing
::
TaskExecutor
::
new
();
let
subsystem
=
RuntimeApiSubsystem
::
new
(
runtime_api
.clone
(),
Metrics
(
None
),
spawner
);
let
subsystem_task
=
run
(
ctx
,
subsystem
)
.map
(|
x
|
x
.unwrap
());
let
test_task
=
async
move
{
let
(
tx
,
rx
)
=
oneshot
::
channel
();
ctx_handle
.send
(
FromOverseer
::
Communication
{
msg
:
RuntimeApiMessage
::
Request
(
relay_parent
,
Request
::
Authorities
(
tx
))
})
.await
;
assert_eq!
(
rx
.await
.unwrap
()
.unwrap
(),
runtime_api
.authorities
);
ctx_handle
.send
(
FromOverseer
::
Signal
(
OverseerSignal
::
Conclude
))
.await
;
};
futures
::
executor
::
block_on
(
future
::
join
(
subsystem_task
,
test_task
));
}
#[test]
...
...
node/network/gossip-support/Cargo.toml
View file @
15cc9127
...
...
@@ -5,10 +5,8 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition
=
"2018"
[dependencies]
sp-api
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-application-crypto
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-keystore
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
sp-authority-discovery
=
{
git
=
"https://github.com/paritytech/substrate"
,
branch
=
"master"
}
polkadot-node-network-protocol
=
{
path
=
"../protocol"
}
polkadot-node-subsystem
=
{
path
=
"../../subsystem"
}
...
...
node/network/gossip-support/src/lib.rs
View file @
15cc9127
...
...
@@ -19,9 +19,6 @@
//! the gossiping subsystems on every new session.
use
futures
::{
channel
::
mpsc
,
FutureExt
as
_
};
use
std
::
sync
::
Arc
;
use
sp_api
::
ProvideRuntimeApi
;
use
sp_authority_discovery
::
AuthorityDiscoveryApi
;
use
polkadot_node_subsystem
::{
messages
::{
GossipSupportMessage
,
...
...
@@ -34,7 +31,7 @@ use polkadot_node_subsystem_util::{
self
as
util
,
};
use
polkadot_primitives
::
v1
::{
Hash
,
SessionIndex
,
AuthorityDiscoveryId
,
Block
,
BlockId
,
Hash
,
SessionIndex
,
AuthorityDiscoveryId
,
};
use
polkadot_node_network_protocol
::{
peer_set
::
PeerSet
,
PeerId
};
use
sp_keystore
::{
CryptoStore
,
SyncCryptoStorePtr
};
...
...
@@ -43,8 +40,7 @@ use sp_application_crypto::{Public, AppKey};
const
LOG_TARGET
:
&
str
=
"parachain::gossip-support"
;
/// The Gossip Support subsystem.
pub
struct
GossipSupport
<
Client
>
{
client
:
Arc
<
Client
>
,
pub
struct
GossipSupport
{
keystore
:
SyncCryptoStorePtr
,
}
...
...
@@ -55,15 +51,10 @@ struct State {
_last_connection_request
:
Option
<
mpsc
::
Receiver
<
(
AuthorityDiscoveryId
,
PeerId
)
>>
,
}
impl
<
Client
>
GossipSupport
<
Client
>
where
Client
:
ProvideRuntimeApi
<
Block
>
,
Client
::
Api
:
AuthorityDiscoveryApi
<
Block
>
,
{
impl
GossipSupport
{
/// Create a new instance of the [`GossipSupport`] subsystem.
pub
fn
new
(
keystore
:
SyncCryptoStorePtr
,
client
:
Arc
<
Client
>
)
->
Self
{
pub
fn
new
(
keystore
:
SyncCryptoStorePtr
)
->
Self
{
Self
{
client
,
keystore
,
}
}
...
...
@@ -74,7 +65,7 @@ where
Context
:
SubsystemContext
<
Message
=
GossipSupportMessage
>
,
{
let
mut
state
=
State
::
default
();
let
Self
{
client
,
keystore
}
=
self
;
let
Self
{
keystore
}
=
self
;
loop
{
let
message
=
match
ctx
.recv
()
.await
{
Ok
(
message
)
=>
message
,
...
...
@@ -96,7 +87,7 @@ where
tracing
::
trace!
(
target
:
LOG_TARGET
,
"active leaves signal"
);
let
leaves
=
activated
.into_iter
()
.map
(|
a
|
a
.hash
);
if
let
Err
(
e
)
=
state
.handle_active_leaves
(
&
mut
ctx
,
client
.clone
(),
&
keystore
,
leaves
)
.await
{
if
let
Err
(
e
)
=
state
.handle_active_leaves
(
&
mut
ctx
,
&
keystore
,
leaves
)
.await
{
tracing
::
debug!
(
target
:
LOG_TARGET
,
error
=
?
e
);
}
}
...
...
@@ -109,18 +100,12 @@ where
}
}
async
fn
determine_relevant_authorities
<
Client
>
(
c
lient
:
Arc
<
Client
>
,
async
fn
determine_relevant_authorities
(
c
tx
:
&
mut
impl
SubsystemContext
,
relay_parent
:
Hash
,
)
->
Result
<
Vec
<
AuthorityDiscoveryId
>
,
util
::
Error
>
where
Client
:
ProvideRuntimeApi
<
Block
>
,
Client
::
Api
:
AuthorityDiscoveryApi
<
Block
>
,
{
let
api
=
client
.runtime_api
();
let
result
=
api
.authorities
(
&
BlockId
::
Hash
(
relay_parent
))
.map_err
(|
e
|
util
::
Error
::
RuntimeApi
(
format!
(
"{:?}"
,
e
)
.into
()));
result
)
->
Result
<
Vec
<
AuthorityDiscoveryId
>
,
util
::
Error
>
{
let
authorities
=
util
::
request_authorities_ctx
(
relay_parent
,
ctx
)
.await
?
.await
??
;
Ok
(
authorities
)
}
/// Return an error if we're not a validator in the given set (do not have keys).
...
...
@@ -143,17 +128,12 @@ impl State {
/// 1. Determine if the current session index has changed.
/// 2. If it has, determine relevant validators
/// and issue a connection request.
async
fn
handle_active_leaves
<
Client
>
(
async
fn
handle_active_leaves
(
&
mut
self
,
ctx
:
&
mut
impl
SubsystemContext
,
client
:
Arc
<
Client
>
,
keystore
:
&
SyncCryptoStorePtr
,
leaves
:
impl
Iterator
<
Item
=
Hash
>
,
)
->
Result
<
(),
util
::
Error
>
where
Client
:
ProvideRuntimeApi
<
Block
>
,
Client
::
Api
:
AuthorityDiscoveryApi
<
Block
>
,
{
)
->
Result
<
(),
util
::
Error
>
{
for
leaf
in
leaves
{
let
current_index
=
util
::
request_session_index_for_child_ctx
(
leaf
,
ctx
)
.await
?
.await
??
;
let
maybe_new_session
=
match
self
.last_session_index
{
...
...
@@ -163,7 +143,7 @@ impl State {
if
let
Some
((
new_session
,
relay_parent
))
=
maybe_new_session
{
tracing
::
debug!
(
target
:
LOG_TARGET
,
%
new_session
,
"New session detected"
);
let
authorities
=
determine_relevant_authorities
(
c
lient
.clone
()
,
relay_parent
)
.await
?
;
let
authorities
=
determine_relevant_authorities
(
c
tx
,
relay_parent
)
.await
?
;
ensure_i_am_an_authority
(
keystore
,
&
authorities
)
.await
?
;
tracing
::
debug!
(
target
:
LOG_TARGET
,
num
=
?
authorities
.len
(),
"Issuing a connection request"
);
...
...
@@ -182,11 +162,9 @@ impl State {
}
}
impl
<
Client
,
Context
>
Subsystem
<
Context
>
for
GossipSupport
<
Client
>
impl
<
Context
>
Subsystem
<
Context
>
for
GossipSupport
where
Context
:
SubsystemContext
<
Message
=
GossipSupportMessage
>
+
Sync
+
Send
,
Client
:
ProvideRuntimeApi
<
Block
>
+
Send
+
'static
+
Sync
,
Client
::
Api
:
AuthorityDiscoveryApi
<
Block
>
,
{
fn
start
(
self
,
ctx
:
Context
)
->
SpawnedSubsystem
{
let
future
=
self
.run
(
ctx
)
...
...
node/service/src/lib.rs
View file @
15cc9127
...
...
@@ -570,7 +570,6 @@ where
),
gossip_support
:
GossipSupportSubsystem
::
new
(
keystore
.clone
(),
runtime_client
.clone
(),
),
};
...
...
node/subsystem-util/src/lib.rs
View file @
15cc9127
...
...
@@ -39,6 +39,7 @@ use polkadot_primitives::v1::{
CandidateEvent
,
CommittedCandidateReceipt
,
CoreState
,
EncodeAs
,
PersistedValidationData
,
GroupRotationInfo
,
Hash
,
Id
as
ParaId
,
OccupiedCoreAssumption
,
SessionIndex
,
Signed
,
SigningContext
,
ValidationCode
,
ValidatorId
,
ValidatorIndex
,
SessionInfo
,
AuthorityDiscoveryId
,
};
use
sp_core
::{
traits
::
SpawnNamed
,
Public
};
use
sp_application_crypto
::
AppKey
;
...
...
@@ -166,6 +167,7 @@ macro_rules! specialize_requests {
}
specialize_requests!
{
fn
request_authorities
()
->
Vec
<
AuthorityDiscoveryId
>
;
Authorities
;
fn
request_validators
()
->
Vec
<
ValidatorId
>
;
Validators
;
fn
request_validator_groups
()
->
(
Vec
<
Vec
<
ValidatorIndex
>>
,
GroupRotationInfo
);
ValidatorGroups
;
fn
request_availability_cores
()
->
Vec
<
CoreState
>
;
AvailabilityCores
;
...
...
@@ -247,6 +249,7 @@ macro_rules! specialize_requests_ctx {
}
specialize_requests_ctx!
{
fn
request_authorities_ctx
()
->
Vec
<
AuthorityDiscoveryId
>
;
Authorities
;
fn
request_validators_ctx
()
->
Vec
<
ValidatorId
>
;
Validators
;
fn
request_validator_groups_ctx
()
->
(
Vec
<
Vec
<
ValidatorIndex
>>
,
GroupRotationInfo
);
ValidatorGroups
;
fn
request_availability_cores_ctx
()
->
Vec
<
CoreState
>
;
AvailabilityCores
;
...
...
node/subsystem/src/messages.rs
View file @
15cc9127
...
...
@@ -437,6 +437,8 @@ pub type RuntimeApiSender<T> = oneshot::Sender<Result<T, crate::errors::RuntimeA
/// A request to the Runtime API subsystem.
#[derive(Debug)]
pub
enum
RuntimeApiRequest
{
/// Get the next, current and some previous authority discovery set deduplicated.
Authorities
(
RuntimeApiSender
<
Vec
<
AuthorityDiscoveryId
>>
),
/// Get the current validator set.
Validators
(
RuntimeApiSender
<
Vec
<
ValidatorId
>>
),
/// Get the validator groups and group rotation info.
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment