Unverified Commit 3a17550c authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Make sure we send the validator key to collators on status (#968)

Before the validator only send the keys if it was updated and thus the
collators would "never" be informed about the key of the validator.
parent 42ee7471
Pipeline #86301 canceled with stages
in 9 minutes and 19 seconds
...@@ -130,7 +130,7 @@ enum BackgroundToWorkerMsg { ...@@ -130,7 +130,7 @@ enum BackgroundToWorkerMsg {
} }
/// Operations that a handle to an underlying network service should provide. /// Operations that a handle to an underlying network service should provide.
trait NetworkServiceOps: Send + Sync { pub trait NetworkServiceOps: Send + Sync {
/// Report the peer as having a particular positive or negative value. /// Report the peer as having a particular positive or negative value.
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange); fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
...@@ -193,10 +193,18 @@ impl GossipOps for RegisteredMessageValidator { ...@@ -193,10 +193,18 @@ impl GossipOps for RegisteredMessageValidator {
} }
/// An async handle to the network service. /// An async handle to the network service.
#[derive(Clone)] pub struct Service<N = PolkadotNetworkService> {
pub struct Service {
sender: mpsc::Sender<ServiceToWorkerMsg>, sender: mpsc::Sender<ServiceToWorkerMsg>,
network_service: Arc<dyn NetworkServiceOps>, network_service: Arc<N>,
}
impl<N> Clone for Service<N> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
network_service: self.network_service.clone(),
}
}
} }
/// Registers the protocol. /// Registers the protocol.
...@@ -209,7 +217,7 @@ pub fn start<C, Api, SP>( ...@@ -209,7 +217,7 @@ pub fn start<C, Api, SP>(
chain_context: C, chain_context: C,
api: Arc<Api>, api: Arc<Api>,
executor: SP, executor: SP,
) -> Result<Service, futures::task::SpawnError> where ) -> Result<Service<PolkadotNetworkService>, futures::task::SpawnError> where
C: ChainContext + 'static, C: ChainContext + 'static,
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static, Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>, Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
...@@ -292,14 +300,14 @@ pub fn start<C, Api, SP>( ...@@ -292,14 +300,14 @@ pub fn start<C, Api, SP>(
} }
/// The Polkadot protocol status message. /// The Polkadot protocol status message.
#[derive(Debug, Encode, Decode)] #[derive(Debug, Encode, Decode, PartialEq)]
pub struct Status { pub struct Status {
version: u32, // protocol version. version: u32, // protocol version.
collating_for: Option<(CollatorId, ParaId)>, collating_for: Option<(CollatorId, ParaId)>,
} }
/// Polkadot-specific messages from peer to peer. /// Polkadot-specific messages from peer to peer.
#[derive(Debug, Encode, Decode)] #[derive(Debug, Encode, Decode, PartialEq)]
pub enum Message { pub enum Message {
/// Exchange status with a peer. This should be the first message sent. /// Exchange status with a peer. This should be the first message sent.
#[codec(index = "0")] #[codec(index = "0")]
...@@ -451,6 +459,11 @@ impl RecentValidatorIds { ...@@ -451,6 +459,11 @@ impl RecentValidatorIds {
fn as_slice(&self) -> &[ValidatorId] { fn as_slice(&self) -> &[ValidatorId] {
&*self.inner &*self.inner
} }
/// Returns the last inserted session key.
fn latest(&self) -> Option<&ValidatorId> {
self.inner.last()
}
} }
struct ProtocolHandler { struct ProtocolHandler {
...@@ -582,7 +595,19 @@ impl ProtocolHandler { ...@@ -582,7 +595,19 @@ impl ProtocolHandler {
let role = self.collators let role = self.collators
.on_new_collator(collator_id, para_id, remote.clone()); .on_new_collator(collator_id, para_id, remote.clone());
let service = &self.service; let service = &self.service;
let send_key = peer.should_send_key();
if let Some(c_state) = peer.collator_state_mut() { if let Some(c_state) = peer.collator_state_mut() {
if send_key {
if let Some(key) = self.local_keys.latest() {
c_state.send_key(key.clone(), |msg| service.write_notification(
remote.clone(),
POLKADOT_ENGINE_ID,
msg.encode(),
));
}
}
c_state.set_role(role, |msg| service.write_notification( c_state.set_role(role, |msg| service.write_notification(
remote.clone(), remote.clone(),
POLKADOT_ENGINE_ID, POLKADOT_ENGINE_ID,
...@@ -1323,7 +1348,7 @@ struct RouterInner { ...@@ -1323,7 +1348,7 @@ struct RouterInner {
sender: mpsc::Sender<ServiceToWorkerMsg>, sender: mpsc::Sender<ServiceToWorkerMsg>,
} }
impl Service { impl<N: NetworkServiceOps> Service<N> {
/// Register an availablility-store that the network can query. /// Register an availablility-store that the network can query.
pub fn register_availability_store(&self, store: av_store::Store) { pub fn register_availability_store(&self, store: av_store::Store) {
let _ = self.sender.clone() let _ = self.sender.clone()
...@@ -1373,7 +1398,7 @@ impl Service { ...@@ -1373,7 +1398,7 @@ impl Service {
} }
} }
impl ParachainNetwork for Service { impl<N> ParachainNetwork for Service<N> {
type Error = mpsc::SendError; type Error = mpsc::SendError;
type TableRouter = Router; type TableRouter = Router;
type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>; type BuildTableRouter = Pin<Box<dyn Future<Output=Result<Router,Self::Error>> + Send>>;
...@@ -1403,7 +1428,7 @@ impl ParachainNetwork for Service { ...@@ -1403,7 +1428,7 @@ impl ParachainNetwork for Service {
} }
} }
impl Collators for Service { impl<N> Collators for Service<N> {
type Error = future::Either<mpsc::SendError, oneshot::Canceled>; type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
type Collation = Pin<Box<dyn Future<Output = Result<Collation, Self::Error>> + Send>>; type Collation = Pin<Box<dyn Future<Output = Result<Collation, Self::Error>> + Send>>;
...@@ -1425,7 +1450,7 @@ impl Collators for Service { ...@@ -1425,7 +1450,7 @@ impl Collators for Service {
} }
} }
impl av_store::ErasureNetworking for Service { impl<N> av_store::ErasureNetworking for Service<N> {
type Error = future::Either<mpsc::SendError, oneshot::Canceled>; type Error = future::Either<mpsc::SendError, oneshot::Canceled>;
fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32) fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32)
......
...@@ -37,7 +37,7 @@ use futures::executor::LocalPool; ...@@ -37,7 +37,7 @@ use futures::executor::LocalPool;
use futures::task::LocalSpawnExt; use futures::task::LocalSpawnExt;
#[derive(Default)] #[derive(Default)]
struct MockNetworkOps { pub struct MockNetworkOps {
recorded: Mutex<Recorded>, recorded: Mutex<Recorded>,
} }
...@@ -188,7 +188,7 @@ sp_api::mock_impl_runtime_apis! { ...@@ -188,7 +188,7 @@ sp_api::mock_impl_runtime_apis! {
} }
} }
impl super::Service { impl super::Service<MockNetworkOps> {
async fn connect_peer(&mut self, peer: PeerId, roles: Roles) { async fn connect_peer(&mut self, peer: PeerId, roles: Roles) {
self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap(); self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap();
} }
...@@ -222,7 +222,7 @@ impl super::Service { ...@@ -222,7 +222,7 @@ impl super::Service {
} }
fn test_setup(config: Config) -> ( fn test_setup(config: Config) -> (
Service, Service<MockNetworkOps>,
MockGossip, MockGossip,
LocalPool, LocalPool,
impl Future<Output = ()> + 'static, impl Future<Output = ()> + 'static,
...@@ -264,7 +264,7 @@ fn worker_task_shuts_down_when_sender_dropped() { ...@@ -264,7 +264,7 @@ fn worker_task_shuts_down_when_sender_dropped() {
/// is handled. This helper functions checks multiple times that the given instance is dropped. Even /// is handled. This helper functions checks multiple times that the given instance is dropped. Even
/// if the first round fails, the second one should be successful as the consensus instance drop /// if the first round fails, the second one should be successful as the consensus instance drop
/// should be already handled this time. /// should be already handled this time.
fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) { fn wait_for_instance_drop(service: &mut Service<MockNetworkOps>, pool: &mut LocalPool, instance: Hash) {
let mut try_counter = 0; let mut try_counter = 0;
let max_tries = 3; let max_tries = 3;
...@@ -363,7 +363,6 @@ fn collation_is_received_with_dropped_router() { ...@@ -363,7 +363,6 @@ fn collation_is_received_with_dropped_router() {
}))); })));
} }
#[test] #[test]
fn validator_peer_cleaned_up() { fn validator_peer_cleaned_up() {
let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
...@@ -575,3 +574,32 @@ fn fetches_pov_block_from_gossip() { ...@@ -575,3 +574,32 @@ fn fetches_pov_block_from_gossip() {
pool.run_until(test_work).unwrap(); pool.run_until(test_work).unwrap();
} }
#[test]
fn validator_sends_key_to_collator_on_status() {
let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
let peer = PeerId::random();
let peer_clone = peer.clone();
let validator_key = Sr25519Keyring::Alice.pair();
let validator_id = ValidatorId::from(validator_key.public());
let validator_id_clone = validator_id.clone();
let collator_id = CollatorId::from(Sr25519Keyring::Bob.public());
let para_id = ParaId::from(100);
let mut service_clone = service.clone();
pool.spawner().spawn_local(worker_task).unwrap();
pool.run_until(async move {
service_clone.synchronize(move |proto| { proto.local_keys.insert(validator_id_clone); }).await;
service_clone.connect_peer(peer_clone.clone(), Roles::AUTHORITY).await;
service_clone.peer_message(peer_clone.clone(), Message::Status(Status {
version: VERSION,
collating_for: Some((collator_id, para_id)),
})).await;
});
let expected_msg = Message::ValidatorId(validator_id.clone());
assert!(service.network_service.recorded.lock().notifications.iter().any(|(p, notification)| {
peer == *p && *notification == expected_msg
}));
}
...@@ -25,7 +25,7 @@ use std::time::Duration; ...@@ -25,7 +25,7 @@ use std::time::Duration;
use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance}; use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance};
#[cfg(feature = "full-node")] #[cfg(feature = "full-node")]
use polkadot_network::{legacy::gossip::Known, protocol as network_protocol}; use polkadot_network::{legacy::gossip::Known, protocol as network_protocol};
use service::{error::{Error as ServiceError}, ServiceBuilder}; use service::{error::Error as ServiceError, ServiceBuilder};
use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider};
use inherents::InherentDataProviders; use inherents::InherentDataProviders;
use sc_executor::native_executor_instance; use sc_executor::native_executor_instance;
...@@ -103,11 +103,9 @@ where ...@@ -103,11 +103,9 @@ where
<Self as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>, <Self as sp_api::ApiExt<Block>>::StateBackend: sp_api::StateBackend<BlakeTwo256>,
{} {}
pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static pub trait RuntimeExtrinsic: codec::Codec + Send + Sync + 'static {}
{}
impl<E> RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static impl<E> RuntimeExtrinsic for E where E: codec::Codec + Send + Sync + 'static {}
{}
/// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network. /// Can be called for a `Configuration` to check if it is a configuration for the `Kusama` network.
pub trait IsKusama { pub trait IsKusama {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment