Unverified Commit 639dfd67 authored by Bastian Köcher, committed by GitHub

Rework consensus instance communication with the network worker (#958)

Up to now, consensus instances used the main channel to communicate with
the background network worker. This led to a race condition when a local
collation was sent and the router was dropped before the future sending
that collation was driven to completion. This PR changes the
communication between the worker and the instances so that each instance
uses its own channel. This has the advantage that we no longer need an
extra `DropConsensusNetworking` message, as the networking is dropped
automatically when the last sender is dropped.
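
For illustration, here is a minimal, self-contained sketch of the channel pattern this commit adopts; it is not the PR's code, and names such as `InstanceMsg` are made up. Each consensus instance talks to the worker over its own `mpsc` channel, the worker polls all per-instance receivers through a `FuturesUnordered<StreamFuture<_>>` (in the real worker this happens inside its `select!` loop, next to the other channels), and a stream yielding `None` (all senders dropped) is the signal to tear the instance down:

```rust
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::stream::{FuturesUnordered, StreamExt};

#[derive(Debug)]
enum InstanceMsg {
    Collation(u32),
}

fn main() {
    block_on(async {
        // The worker keeps one `StreamFuture` per consensus instance.
        let mut receivers = FuturesUnordered::new();

        // One dedicated channel per instance; the receiver moves into the worker.
        let (mut tx, rx) = mpsc::channel::<InstanceMsg>(4);
        receivers.push(rx.into_future());

        tx.try_send(InstanceMsg::Collation(7)).unwrap();
        drop(tx); // last sender dropped => the stream will terminate

        // `FuturesUnordered` yields `(next_item, rest_of_stream)` pairs.
        while let Some((item, rest)) = receivers.next().await {
            match item {
                Some(msg) => {
                    println!("handling {:?}", msg);
                    // Re-queue the stream so later messages are observed.
                    receivers.push(rest.into_future());
                }
                // `None`: every sender is gone, so the instance's networking
                // can be dropped without any explicit teardown message.
                None => println!("instance torn down automatically"),
            }
        }
    });
}
```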
parent d6c5acc1
Pipeline #85857 passed with stages in 25 minutes and 4 seconds
@@ -19,6 +19,8 @@
 //! This manages routing for parachain statements, parachain block and outgoing message
 //! data fetching, communication between collators and validators, and more.
 
+#![recursion_limit="256"]
+
 use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT};
 
 pub mod legacy;
...
@@ -26,7 +26,8 @@ use codec::{Decode, Encode};
 use futures::channel::{mpsc, oneshot};
 use futures::future::Either;
 use futures::prelude::*;
-use futures::task::{Spawn, SpawnExt};
+use futures::task::{Spawn, SpawnExt, Context, Poll};
+use futures::stream::{FuturesUnordered, StreamFuture};
 use log::{debug, trace};
 use polkadot_primitives::{
@@ -76,8 +77,7 @@ enum ServiceToWorkerMsg {
	PeerDisconnected(PeerId),

	// service messages.
-	BuildConsensusNetworking(Arc<SharedTable>, Vec<ValidatorId>),
-	DropConsensusNetworking(Hash),
+	BuildConsensusNetworking(mpsc::Receiver<ServiceToWorkerMsg>, Arc<SharedTable>, Vec<ValidatorId>),
	SubmitValidatedCollation(
		AbridgedCandidateReceipt,
		PoVBlock,
@@ -782,6 +782,21 @@ fn send_peer_collations(
	}
 }

+/// Receives messages associated with a certain consensus networking instance.
+struct ConsensusNetworkingReceiver {
+	receiver: mpsc::Receiver<ServiceToWorkerMsg>,
+	/// The relay parent of this consensus network.
+	relay_parent: Hash,
+}
+
+impl Stream for ConsensusNetworkingReceiver {
+	type Item = ServiceToWorkerMsg;
+
+	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+		Pin::new(&mut self.receiver).poll_next(cx)
+	}
+}
+
 struct Worker<Api, Sp, Gossip> {
	protocol_handler: ProtocolHandler,
	api: Arc<Api>,
@@ -790,6 +805,7 @@ struct Worker<Api, Sp, Gossip> {
	background_to_main_sender: mpsc::Sender<BackgroundToWorkerMsg>,
	background_receiver: mpsc::Receiver<BackgroundToWorkerMsg>,
	service_receiver: mpsc::Receiver<ServiceToWorkerMsg>,
+	consensus_networking_receivers: FuturesUnordered<StreamFuture<ConsensusNetworkingReceiver>>,
 }

 impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
@@ -801,6 +817,7 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
	// spawns a background task to spawn consensus networking.
	fn build_consensus_networking(
		&mut self,
+		receiver: mpsc::Receiver<ServiceToWorkerMsg>,
		table: Arc<SharedTable>,
		authorities: Vec<ValidatorId>,
	) {
@@ -832,6 +849,9 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
			},
		);

+		let relay_parent = table.signing_context().parent_hash;
+		self.consensus_networking_receivers.push(ConsensusNetworkingReceiver { receiver, relay_parent }.into_future());
+
		// glue the incoming messages, shared table, and validation
		// work together.
		let _ = self.executor.spawn(statement_import_loop(
@@ -855,12 +875,8 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
			ServiceToWorkerMsg::PeerMessage(remote, messages) => {
				self.protocol_handler.on_raw_messages(remote, messages)
			}
-			ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => {
-				self.build_consensus_networking(table, authorities);
-			}
-			ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => {
-				self.protocol_handler.drop_consensus_networking(&relay_parent);
-			}
+			ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) => {
+				self.build_consensus_networking(receiver, table, authorities);
+			}
			ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
				let relay_parent = receipt.relay_parent;
@@ -985,6 +1001,16 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
				Some(msg) => self.handle_service_message(msg),
				None => return,
			},
+			consensus_service_msg = self.consensus_networking_receivers.next() => match consensus_service_msg {
+				Some((Some(msg), receiver)) => {
+					self.handle_service_message(msg);
+					self.consensus_networking_receivers.push(receiver.into_future());
+				},
+				Some((None, receiver)) => {
+					self.protocol_handler.drop_consensus_networking(&receiver.relay_parent);
+				},
+				None => {},
+			},
			background_msg = self.background_receiver.next() => match background_msg {
				Some(msg) => self.handle_background_message(msg),
				None => return,
@@ -1017,6 +1043,7 @@ async fn worker_loop<Api, Sp>(
		background_to_main_sender: background_tx,
		background_receiver: background_rx,
		service_receiver: receiver,
+		consensus_networking_receivers: Default::default(),
	};

	worker.main_loop().await
@@ -1296,24 +1323,6 @@ struct RouterInner {
	sender: mpsc::Sender<ServiceToWorkerMsg>,
 }

-impl Drop for RouterInner {
-	fn drop(&mut self) {
-		let res = self.sender.try_send(
-			ServiceToWorkerMsg::DropConsensusNetworking(self.relay_parent)
-		);
-
-		if let Err(e) = res {
-			assert!(
-				!e.is_full(),
-				"futures 0.3 guarantees at least one free slot in the capacity \
-				per sender; this is the first message sent via this sender; \
-				therefore we will not have to wait for capacity; qed"
-			);
-			// other error variants (disconnection) are fine here.
-		}
-	}
-}
-
 impl Service {
	/// Register an availability-store that the network can query.
	pub fn register_availability_store(&self, store: av_store::Store) {
@@ -1379,14 +1388,15 @@ impl ParachainNetwork for Service {
		let relay_parent = table.signing_context().parent_hash.clone();

		Box::pin(async move {
+			let (router_sender, receiver) = mpsc::channel(0);
			sender.send(
-				ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities)
+				ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities)
			).await?;

			Ok(Router {
				inner: Arc::new(RouterInner {
					relay_parent,
-					sender,
+					sender: router_sender,
				})
			})
		})
...
@@ -54,6 +54,7 @@ type GossipStreamEntry = (mpsc::UnboundedReceiver<TopicNotification>, oneshot::S
 #[derive(Default, Clone)]
 struct MockGossip {
	inner: Arc<Mutex<HashMap<Hash, GossipStreamEntry>>>,
+	gossip_messages: Arc<Mutex<HashMap<Hash, GossipMessage>>>,
 }

 impl MockGossip {
@@ -102,8 +103,8 @@ impl crate::legacy::GossipService for MockGossip {
		})
	}

-	fn gossip_message(&self, _topic: Hash, _message: GossipMessage) {
-	}
+	fn gossip_message(&self, topic: Hash, message: GossipMessage) {
+		self.gossip_messages.lock().insert(topic, message);
+	}

	fn send_message(&self, _who: PeerId, _message: GossipMessage) {
@@ -250,22 +251,6 @@ fn test_setup(config: Config) -> (
	(service, mock_gossip, pool, worker_task)
 }

-#[test]
-fn router_inner_drop_sends_worker_message() {
-	let parent = [1; 32].into();
-	let (sender, mut receiver) = mpsc::channel(0);
-	drop(RouterInner {
-		relay_parent: parent,
-		sender,
-	});
-
-	match receiver.try_next() {
-		Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x),
-		_ => panic!("message not sent"),
-	}
-}
-
 #[test]
 fn worker_task_shuts_down_when_sender_dropped() {
	let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
@@ -274,6 +259,30 @@ fn worker_task_shuts_down_when_sender_dropped() {
	let _ = pool.run_until(worker_task);
 }

+/// Given the async nature of the `select!` used in the main loop of the worker, and the fact
+/// that consensus instances use their own channels, we don't know when the synchronize message
+/// is handled. This helper function checks multiple times whether the given instance has been
+/// dropped; even if the first round fails, a later round should succeed, as the drop of the
+/// consensus instance should have been handled by then.
+fn wait_for_instance_drop(service: &mut Service, pool: &mut LocalPool, instance: Hash) {
+	let mut try_counter = 0;
+	let max_tries = 3;
+
+	while try_counter < max_tries {
+		let dropped = pool.run_until(service.synchronize(move |proto| {
+			!proto.consensus_instances.contains_key(&instance)
+		}));
+
+		if dropped {
+			return;
+		}
+
+		try_counter += 1;
+	}
+
+	panic!("Consensus instance `{}` wasn't dropped!", instance);
+}
+
 #[test]
 fn consensus_instances_cleaned_up() {
	let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
@@ -300,11 +309,61 @@ fn consensus_instances_cleaned_up() {
	drop(router);

-	assert!(pool.run_until(service.synchronize(move |proto| {
-		!proto.consensus_instances.contains_key(&relay_parent)
-	})));
+	wait_for_instance_drop(&mut service, &mut pool, relay_parent);
 }

+#[test]
+fn collation_is_received_with_dropped_router() {
+	let (mut service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
+	let relay_parent = [0; 32].into();
+	let topic = crate::legacy::gossip::attestation_topic(relay_parent);
+
+	let signing_context = SigningContext {
+		session_index: Default::default(),
+		parent_hash: relay_parent,
+	};
+	let table = Arc::new(SharedTable::new(
+		vec![Sr25519Keyring::Alice.public().into()],
+		HashMap::new(),
+		Some(Arc::new(Sr25519Keyring::Alice.pair().into())),
+		signing_context,
+		AvailabilityStore::new_in_memory(service.clone()),
+		None,
+	));
+
+	pool.spawner().spawn_local(worker_task).unwrap();
+
+	let router = pool.run_until(
+		service.build_table_router(table, &[])
+	).unwrap();
+
+	let receipt = AbridgedCandidateReceipt { relay_parent, ..Default::default() };
+	let local_collation_future = router.local_collation(
+		receipt,
+		PoVBlock { block_data: BlockData(Vec::new()) },
+		(0, &[]),
+	);
+
+	// Drop the router and make sure that the consensus instance is still alive
+	drop(router);
+
+	assert!(pool.run_until(service.synchronize(move |proto| {
+		proto.consensus_instances.contains_key(&relay_parent)
+	})));
+
+	// The gossip message should still be unknown
+	assert!(!gossip.gossip_messages.lock().contains_key(&topic));
+
+	pool.run_until(local_collation_future).unwrap();
+
+	// Make sure the instance is now dropped and the message was gossiped
+	wait_for_instance_drop(&mut service, &mut pool, relay_parent);
+	assert!(pool.run_until(service.synchronize(move |_| {
+		gossip.gossip_messages.lock().contains_key(&topic)
+	})));
+}
+
 #[test]
 fn validator_peer_cleaned_up() {
	let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None });
...
@@ -75,9 +75,10 @@ pub type ValidatorId = validator_app::Public;
 /// Index of the validator is used as a lightweight replacement of the `ValidatorId` when appropriate.
 pub type ValidatorIndex = u32;

-/// A Parachain validator keypair.
-#[cfg(feature = "std")]
-pub type ValidatorPair = validator_app::Pair;
+application_crypto::with_pair! {
+	/// A Parachain validator keypair.
+	pub type ValidatorPair = validator_app::Pair;
+}

 /// Signature with which parachain validators sign blocks.
 ///
...