From 9fa5d28e0be79d66b5f2d0245fdae7fc44ff8165 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= <bkchr@users.noreply.github.com> Date: Mon, 21 Jun 2021 21:51:31 +0200 Subject: [PATCH] Only fetch one collation at a time per relay parent (#3333) * Only fetch one collation at a time per relay parent Before a validator would fetch all collations that were advertised to him. This pr changes the behavior to always just fetch one collation at a time. If fetching fails, the validator will start fetching one of the other collations. * Use enum to be more explicit * Review comments --- .../src/validator_side/mod.rs | 206 +++-- .../src/validator_side/tests.rs | 864 ++++++------------ .../protocol/src/request_response/mod.rs | 2 +- .../protocol/src/request_response/request.rs | 5 +- 4 files changed, 396 insertions(+), 681 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs index 5070402edab..51890d5fa6d 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -164,6 +164,7 @@ struct PerRequest { span: Option<jaeger::Span>, } +#[derive(Debug)] struct CollatingPeerState { collator_id: CollatorId, para_id: ParaId, @@ -172,6 +173,7 @@ struct CollatingPeerState { last_active: Instant, } +#[derive(Debug)] enum PeerState { // The peer has connected at the given instant. Connected(Instant), @@ -186,6 +188,7 @@ enum AdvertisementError { UndeclaredCollator, } +#[derive(Debug)] struct PeerData { view: View, state: PeerState, @@ -465,8 +468,7 @@ struct PendingCollation { impl PendingCollation { fn new(relay_parent: Hash, para_id: &ParaId, peer_id: &PeerId) -> Self { - let commitments_hash = None; - Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash } + Self { relay_parent, para_id: para_id.clone(), peer_id: peer_id.clone(), commitments_hash: None } } } @@ -477,6 +479,32 @@ type PendingCollationFetch = ( std::result::Result<(CandidateReceipt, PoV), oneshot::Canceled>, ); +/// The status of the collations in [`CollationsPerRelayParent`]. +#[derive(Debug)] +enum CollationStatus { + /// We are waiting for a collation to be advertised to us. + Waiting, + /// We are currently fetching a collation. + Fetching, + /// We have seconded a collation. + Seconded, +} + +impl Default for CollationStatus { + fn default() -> Self { + Self::Waiting + } +} + +/// Information about collations per relay parent. +#[derive(Default)] +struct CollationsPerRelayParent { + /// What is the current status in regards to a collation for this relay parent? + status: CollationStatus, + /// Collation that were advertised to us, but we did not yet fetch. + unfetched_collations: Vec<(PendingCollation, CollatorId)>, +} + /// All state relevant for the validator side of the protocol lives here. #[derive(Default)] struct State { @@ -503,7 +531,10 @@ struct State { span_per_relay_parent: HashMap<Hash, PerLeafSpan>, /// Keep track of all fetch collation requests - collations: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>, + collation_fetches: FuturesUnordered<BoxFuture<'static, PendingCollationFetch>>, + + /// Information about the collations per relay parent. + collations_per_relay_parent: HashMap<Hash, CollationsPerRelayParent>, /// Keep track of all pending candidate collations pending_candidates: HashMap<Hash, CollationEvent>, @@ -528,19 +559,20 @@ async fn disconnect_peer(ctx: &mut impl SubsystemContext, peer_id: PeerId) { } /// Another subsystem has requested to fetch collations on a particular leaf for some para. -async fn fetch_collation<Context>( - ctx: &mut Context, +async fn fetch_collation( + ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>, state: &mut State, pc: PendingCollation, - tx: oneshot::Sender<(CandidateReceipt, PoV)> -) -where - Context: SubsystemContext<Message = CollatorProtocolMessage> -{ + id: CollatorId, +) { + let (tx, rx) = oneshot::channel(); + let PendingCollation { relay_parent, para_id, peer_id, .. } = pc; if state.peer_data.get(&peer_id).map_or(false, |d| d.has_advertised(&relay_parent)) { request_collation(ctx, state, relay_parent, para_id, peer_id, tx).await; } + + state.collation_fetches.push(rx.map(|r| ((id, pc), r)).boxed()); } /// Report a collator for some malicious actions. @@ -770,22 +802,26 @@ where ?relay_parent, "Received advertise collation", ); - let (tx, rx) = oneshot::channel::<( - CandidateReceipt, - PoV, - )>(); let pending_collation = PendingCollation::new( relay_parent, ¶_id, &origin, ); - fetch_collation(ctx, state, pending_collation.clone(), tx).await; - let future = rx.map(|r| - ((id, pending_collation), r) - ); - state.collations.push(Box::pin(future)); + let collations = state.collations_per_relay_parent.entry(relay_parent).or_default(); + + match collations.status { + CollationStatus::Fetching => + collations.unfetched_collations.push((pending_collation, id)), + CollationStatus::Waiting => { + collations.status = CollationStatus::Fetching; + drop(collations); + + fetch_collation(ctx, state, pending_collation.clone(), id).await; + }, + CollationStatus::Seconded => {}, + } } Err(error) => { tracing::debug!( @@ -824,6 +860,8 @@ async fn remove_relay_parent( state.pending_candidates.retain(|k, _| { k != &relay_parent }); + + state.collations_per_relay_parent.remove(&relay_parent); Ok(()) } @@ -973,6 +1011,10 @@ where let PendingCollation { relay_parent, peer_id, .. } = pending_collation; note_good_collation(ctx, &state.peer_data, collator_id).await; notify_collation_seconded(ctx, peer_id, relay_parent, stmt).await; + + if let Some(collations) = state.collations_per_relay_parent.get_mut(&parent) { + collations.status = CollationStatus::Seconded; + } } else { tracing::debug!( target: LOG_TARGET, @@ -982,12 +1024,11 @@ where } } Invalid(parent, candidate_receipt) => { - if match state.pending_candidates.get(&parent) { - Some(collation_event) - if Some(candidate_receipt.commitments_hash) == collation_event.1.commitments_hash - => true, - _ => false, - } { + if state.pending_candidates + .get(&parent) + .map(|e| e.1.commitments_hash == Some(candidate_receipt.commitments_hash)) + .unwrap_or_default() + { if let Some((id, _)) = state.pending_candidates.remove(&parent) { report_collator(ctx, &state.peer_data, id).await; } @@ -1022,7 +1063,6 @@ pub(crate) async fn run<Context>( let mut state = State { metrics, - ..Default::default() }; @@ -1053,53 +1093,9 @@ pub(crate) async fn run<Context>( _ = next_inactivity_stream.next() => { disconnect_inactive_peers(&mut ctx, &eviction_policy, &state.peer_data).await; } - res = state.collations.next() => { - // If no prior collation for this relay parent has been seconded, then - // memoize the collation_event for that relay_parent, such that we may - // notify the collator of their successful second backing - if let Some((relay_parent, collation_event)) = match res { - Some( - (mut collation_event, Ok((candidate_receipt, pov))) - ) => { - let relay_parent = &collation_event.1.relay_parent; - // Verify whether this relay_parent has already been seconded - if state.pending_candidates.get(relay_parent).is_none() { - // Forward Candidate Receipt and PoV to candidate backing [CB] - collation_event.1 - .commitments_hash = Some(candidate_receipt.commitments_hash); - ctx.send_message( - CandidateBackingMessage::Second( - relay_parent.clone(), - candidate_receipt, - pov, - ).into() - ).await; - Some((relay_parent.clone(), collation_event)) - } else { - tracing::debug!( - target: LOG_TARGET, - relay_parent = ?relay_parent, - collator_id = ?collation_event.0, - "Collation for this relay parent has already been seconded.", - ); - None - } - } - Some( - (collation_event, _) - ) => { - let (id, pending_collation) = collation_event; - tracing::debug!( - target: LOG_TARGET, - relay_parent = ?pending_collation.relay_parent, - collator_id = ?id, - "Collation fetching has timed out.", - ); - None - } - _ => None, - } { - state.pending_candidates.insert(relay_parent, collation_event); + res = state.collation_fetches.next() => { + if let Some(res) = res { + handle_collation_fetched_result(&mut ctx, &mut state, res).await; } } } @@ -1119,6 +1115,68 @@ pub(crate) async fn run<Context>( Ok(()) } +/// Handle a fetched collation result. +async fn handle_collation_fetched_result( + ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>, + state: &mut State, + (mut collation_event, res): PendingCollationFetch, +) { + // If no prior collation for this relay parent has been seconded, then + // memoize the collation_event for that relay_parent, such that we may + // notify the collator of their successful second backing + let relay_parent = collation_event.1.relay_parent; + + let (candidate_receipt, pov) = match res { + Ok(res) => res, + Err(e) => { + tracing::debug!( + target: LOG_TARGET, + relay_parent = ?collation_event.1.relay_parent, + para_id = ?collation_event.1.para_id, + peer_id = ?collation_event.1.peer_id, + collator_id = ?collation_event.0, + error = ?e, + "Failed to fetch collation.", + ); + + let (next_try, id) = if let Some(collations) = state.collations_per_relay_parent.get_mut(&relay_parent) { + if let Some(next_try) = collations.unfetched_collations.pop() { + next_try + } else if matches!(collations.status, CollationStatus::Fetching) { + collations.status = CollationStatus::Waiting; + return + } else { + tracing::error!( + target: LOG_TARGET, + status = ?collations.status, + "Expected status `CollationStatus::Fetching` but got unexpected status." + ); + return + } + } else { + return + }; + + fetch_collation(ctx, state, next_try, id).await; + + return + }, + }; + + if let Entry::Vacant(entry) = state.pending_candidates.entry(relay_parent) { + collation_event.1.commitments_hash = Some(candidate_receipt.commitments_hash); + ctx.send_message( + CandidateBackingMessage::Second( + relay_parent.clone(), + candidate_receipt, + pov, + ).into() + ).await; + + entry.insert(collation_event); + } +} + // This issues `NetworkBridge` notifications to disconnect from all inactive peers at the // earliest possible point. This does not yet clean up any metadata, as that will be done upon // receipt of the `PeerDisconnected` event. @@ -1145,7 +1203,7 @@ async fn poll_collation_response<Context>( metrics: &Metrics, spans: &HashMap<Hash, PerLeafSpan>, pending_collation: &PendingCollation, - per_req: &mut PerRequest + per_req: &mut PerRequest, ) -> bool where diff --git a/polkadot/node/network/collator-protocol/src/validator_side/tests.rs b/polkadot/node/network/collator-protocol/src/validator_side/tests.rs index a6cfde80ba2..dc79c575cde 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/tests.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/tests.rs @@ -32,8 +32,8 @@ use polkadot_node_primitives::BlockData; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; use polkadot_subsystem::messages::{RuntimeApiMessage, RuntimeApiRequest}; -use polkadot_node_network_protocol::{our_view, ObservedRole, - request_response::Requests +use polkadot_node_network_protocol::{ + our_view, ObservedRole, request_response::{Requests, ResponseSender}, }; const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50); @@ -256,6 +256,135 @@ async fn respond_to_core_info_queries( ); } +/// Assert that the next message is a `CandidateBacking(Second())`. +async fn assert_candidate_backing_second( + virtual_overseer: &mut VirtualOverseer, + expected_relay_parent: Hash, + expected_para_id: ParaId, + expected_pov: &PoV, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::CandidateBacking(CandidateBackingMessage::Second(relay_parent, candidate_receipt, incoming_pov) + ) => { + assert_eq!(expected_relay_parent, relay_parent); + assert_eq!(expected_para_id, candidate_receipt.descriptor.para_id); + assert_eq!(*expected_pov, incoming_pov); + }); +} + +/// Assert that a collator got disconnected. +async fn assert_collator_disconnect( + virtual_overseer: &mut VirtualOverseer, + expected_peer: PeerId, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert_eq!(expected_peer, peer); + assert_eq!(PeerSet::Collation, peer_set); + } + ); +} + +/// Assert that the given collators got disconnected. +async fn assert_collators_disconnect( + virtual_overseer: &mut VirtualOverseer, + expected_peers: &[PeerId], +) { + for _ in expected_peers { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert!(expected_peers.contains(&peer), "Unexpected collator disconnected: {:?}", peer); + assert_eq!(PeerSet::Collation, peer_set); + } + ); + } +} + +/// Assert that a fetch collation request was send. +async fn assert_fetch_collation_request( + virtual_overseer: &mut VirtualOverseer, + relay_parent: Hash, + para_id: ParaId, +) -> ResponseSender { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) + ) => { + let req = reqs.into_iter().next() + .expect("There should be exactly one request"); + match req { + Requests::CollationFetching(req) => { + let payload = req.payload; + assert_eq!(payload.relay_parent, relay_parent); + assert_eq!(payload.para_id, para_id); + req.pending_response + } + _ => panic!("Unexpected request"), + } + }) +} + +/// Connect and declare a collator +async fn connect_and_declare_collator( + virtual_overseer: &mut VirtualOverseer, + peer: PeerId, + collator: CollatorPair, + para_id: ParaId, +) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer.clone(), + ObservedRole::Full, + None, + ), + ) + ).await; + + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer.clone(), + protocol_v1::CollatorProtocolMessage::Declare( + collator.public(), + para_id, + collator.sign(&protocol_v1::declare_signature_payload(&peer)), + ) + ) + ) + ).await; +} + +/// Advertise a collation. +async fn advertise_collation( + virtual_overseer: &mut VirtualOverseer, + peer: PeerId, + relay_parent: Hash, +) { + overseer_send( + virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer, + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + relay_parent, + ) + ) + ) + ).await; +} + // As we receive a relevant advertisement act on it and issue a collation request. #[test] fn act_on_advertisement() { @@ -280,58 +409,11 @@ fn act_on_advertisement() { let peer_b = PeerId::random(); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b, - ObservedRole::Full, - None, - ), - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - pair.public(), - test_state.chain_ids[0], - pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), - ) - ) - ) - ).await; + connect_and_declare_collator(&mut virtual_overseer, peer_b.clone(), pair.clone(), test_state.chain_ids[0]).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - ) - ) - ) - ).await; + advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, test_state.relay_parent); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - } - _ => panic!("Unexpected request"), - } - }); + assert_fetch_collation_request(&mut virtual_overseer, test_state.relay_parent, test_state.chain_ids[0]).await; virtual_overseer }); @@ -359,54 +441,18 @@ fn collator_reporting_works() { let peer_b = PeerId::random(); let peer_c = PeerId::random(); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b, - ObservedRole::Full, - None, - ), - ) - ).await; - - overseer_send( + connect_and_declare_collator( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_c, - ObservedRole::Full, - None, - ), - ) + peer_b.clone(), + test_state.collators[0].clone(), + test_state.chain_ids[0].clone(), ).await; - overseer_send( + connect_and_declare_collator( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[0].public(), - test_state.chain_ids[0], - test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)), - ), - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_c.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[1].public(), - test_state.chain_ids[0], - test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)), - ), - ) - ) + peer_c.clone(), + test_state.collators[1].clone(), + test_state.chain_ids[0].clone(), ).await; overseer_send( @@ -482,12 +528,11 @@ fn collator_authentication_verification_works() { // A test scenario that takes the following steps // - Two collators connect, declare themselves and advertise a collation relevant to // our view. -// - This results subsystem acting upon these advertisements and issuing two messages to -// the CandidateBacking subsystem. -// - CandidateBacking requests both of the collations. -// - Collation protocol requests these collations. -// - The collations are sent to it. -// - Collations are fetched correctly. +// - Collation protocol should request one PoV. +// - Collation protocol should disconnect both collators after having received the collation. +// - The same collators connect again and send povs for a different relay parent. +// - Collation protocol will request one PoV, but we will cancel it. +// - Collation protocol should request the second PoV. #[test] fn fetch_collations_works() { let test_state = TestState::default(); @@ -497,137 +542,140 @@ fn fetch_collations_works() { mut virtual_overseer, } = test_harness; + let second = Hash::random(); + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) + NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent, second]) ), ).await; + respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; let peer_b = PeerId::random(); let peer_c = PeerId::random(); - overseer_send( + connect_and_declare_collator( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b, - ObservedRole::Full, - None, - ), - ) + peer_b.clone(), + test_state.collators[0].clone(), + test_state.chain_ids[0].clone(), ).await; - overseer_send( + connect_and_declare_collator( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_c, - ObservedRole::Full, - None, - ), - ) + peer_c.clone(), + test_state.collators[1].clone(), + test_state.chain_ids[0].clone(), ).await; - overseer_send( + advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await; + advertise_collation(&mut virtual_overseer, peer_c.clone(), test_state.relay_parent).await; + + let response_channel = assert_fetch_collation_request( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[0].public(), - test_state.chain_ids[0], - test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)), - ) - ) - ) + test_state.relay_parent, + test_state.chain_ids[0], ).await; - overseer_send( + assert!( + overseer_recv_with_timeout(&mut &mut virtual_overseer, Duration::from_millis(30)).await.is_none(), + "There should not be sent any other PoV request while the first one wasn't finished", + ); + + let pov = PoV { block_data: BlockData(vec![]) }; + let mut candidate_a = CandidateReceipt::default(); + candidate_a.descriptor.para_id = test_state.chain_ids[0]; + candidate_a.descriptor.relay_parent = test_state.relay_parent; + response_channel.send(Ok( + CollationFetchingResponse::Collation( + candidate_a.clone(), + pov.clone(), + ).encode() + )).expect("Sending response should succeed"); + + assert_candidate_backing_second( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_c.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[1].public(), - test_state.chain_ids[0], - test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)), - ) - ) - ) + test_state.relay_parent, + test_state.chain_ids[0], + &pov, ).await; + assert_collators_disconnect(&mut virtual_overseer, &[peer_b.clone(), peer_c.clone()]).await; + overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - ) - ) - ) + CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer_b.clone())), ).await; overseer_send( &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_c.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - ) - ) - ) + CollatorProtocolMessage::NetworkBridgeUpdateV1(NetworkBridgeEvent::PeerDisconnected(peer_c.clone())), + ).await; + + let peer_b = PeerId::random(); + let peer_c = PeerId::random(); + + connect_and_declare_collator( + &mut virtual_overseer, + peer_b.clone(), + test_state.collators[2].clone(), + test_state.chain_ids[0].clone(), + ).await; + + connect_and_declare_collator( + &mut virtual_overseer, + peer_c.clone(), + test_state.collators[3].clone(), + test_state.chain_ids[0].clone(), + ).await; + + advertise_collation(&mut virtual_overseer, peer_b.clone(), second).await; + advertise_collation(&mut virtual_overseer, peer_c.clone(), second).await; + + // Dropping the response channel should lead to fetching the second collation. + assert_fetch_collation_request( + &mut virtual_overseer, + second, + test_state.chain_ids[0], ).await; - let response_channel = assert_matches!( + assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, test_state.relay_parent); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - req.pending_response - } - _ => panic!("Unexpected request"), + AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer( + peer, + rep, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(rep, COST_REQUEST_TIMED_OUT); } - }); + ); + + let response_channel = assert_fetch_collation_request( + &mut virtual_overseer, + second, + test_state.chain_ids[0], + ).await; + let pov = PoV { block_data: BlockData(vec![1]) }; let mut candidate_a = CandidateReceipt::default(); candidate_a.descriptor.para_id = test_state.chain_ids[0]; - candidate_a.descriptor.relay_parent = test_state.relay_parent; + candidate_a.descriptor.relay_parent = second; response_channel.send(Ok( CollationFetchingResponse::Collation( candidate_a.clone(), - PoV { - block_data: BlockData(vec![]), - }, + pov.clone(), ).encode() )).expect("Sending response should succeed"); - let _ = assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, test_state.relay_parent); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - req.pending_response - } - _ => panic!("Unexpected request"), - } - }); + assert_candidate_backing_second( + &mut virtual_overseer, + second, + test_state.chain_ids[0], + &pov, + ).await; virtual_overseer }); @@ -657,59 +705,10 @@ fn inactive_disconnected() { let peer_b = PeerId::random(); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b.clone(), - ObservedRole::Full, - None, - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - pair.public(), - test_state.chain_ids[0], - pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), - ) - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - ) - ) - ) - ).await; + connect_and_declare_collator(&mut virtual_overseer, peer_b.clone(), pair.clone(), test_state.chain_ids[0]).await; + advertise_collation(&mut virtual_overseer, peer_b.clone(), test_state.relay_parent).await; - let _ = assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, test_state.relay_parent); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - req.pending_response - } - _ => panic!("Unexpected request"), - } - }); + assert_fetch_collation_request(&mut virtual_overseer, test_state.relay_parent, test_state.chain_ids[0]).await; Delay::new(ACTIVITY_TIMEOUT * 3).await; @@ -724,16 +723,7 @@ fn inactive_disconnected() { } ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( - peer, - peer_set, - )) => { - assert_eq!(peer, peer_b); - assert_eq!(peer_set, PeerSet::Collation); - } - ); + assert_collator_disconnect(&mut virtual_overseer, peer_b.clone()).await; virtual_overseer }); } @@ -767,74 +757,17 @@ fn activity_extends_life() { let peer_b = PeerId::random(); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b.clone(), - ObservedRole::Full, - None, - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - pair.public(), - test_state.chain_ids[0], - pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), - ) - ) - ) - ).await; + connect_and_declare_collator(&mut virtual_overseer, peer_b.clone(), pair.clone(), test_state.chain_ids[0]).await; Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - hash_a, - ) - ) - ) - ).await; + advertise_collation(&mut virtual_overseer, peer_b.clone(), hash_a).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, hash_a); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - } - _ => panic!("Unexpected request"), - } - }); + assert_fetch_collation_request(&mut virtual_overseer, hash_a, test_state.chain_ids[0]).await; Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - hash_b - ) - ) - ) - ).await; + advertise_collation(&mut virtual_overseer, peer_b.clone(), hash_b).await; assert_matches!( overseer_recv(&mut virtual_overseer).await, @@ -847,35 +780,11 @@ fn activity_extends_life() { } ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, hash_b); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - } - _ => panic!("Unexpected request"), - } - }); + assert_fetch_collation_request(&mut virtual_overseer, hash_b, test_state.chain_ids[0]).await; Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - hash_c, - ) - ) - ) - ).await; + advertise_collation(&mut virtual_overseer, peer_b.clone(), hash_c).await; assert_matches!( overseer_recv(&mut virtual_overseer).await, @@ -888,21 +797,7 @@ fn activity_extends_life() { } ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, hash_c); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - } - _ => panic!("Unexpected request"), - } - }); + assert_fetch_collation_request(&mut virtual_overseer, hash_c, test_state.chain_ids[0]).await; Delay::new(ACTIVITY_TIMEOUT * 3 / 2).await; @@ -917,16 +812,8 @@ fn activity_extends_life() { } ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( - peer, - peer_set, - )) => { - assert_eq!(peer, peer_b); - assert_eq!(peer_set, PeerSet::Collation); - } - ); + assert_collator_disconnect(&mut virtual_overseer, peer_b.clone()).await; + virtual_overseer }); } @@ -962,16 +849,8 @@ fn disconnect_if_no_declare() { ) ).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( - peer, - peer_set, - )) => { - assert_eq!(peer, peer_b); - assert_eq!(peer_set, PeerSet::Collation); - } - ); + assert_collator_disconnect(&mut virtual_overseer, peer_b.clone()).await; + virtual_overseer }) } @@ -1034,16 +913,8 @@ fn disconnect_if_wrong_declare() { } ); - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( - peer, - peer_set, - )) => { - assert_eq!(peer, peer_b); - assert_eq!(peer_set, PeerSet::Collation); - } - ); + assert_collator_disconnect(&mut virtual_overseer, peer_b.clone()).await; + virtual_overseer }) } @@ -1070,30 +941,7 @@ fn view_change_clears_old_collators() { let peer_b = PeerId::random(); - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b.clone(), - ObservedRole::Full, - None, - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - pair.public(), - test_state.chain_ids[0], - pair.sign(&protocol_v1::declare_signature_payload(&peer_b)), - ) - ) - ) - ).await; + connect_and_declare_collator(&mut virtual_overseer, peer_b.clone(), pair.clone(), test_state.chain_ids[0]).await; let hash_b = Hash::repeat_byte(69); @@ -1107,202 +955,8 @@ fn view_change_clears_old_collators() { test_state.group_rotation_info = test_state.group_rotation_info.bump_rotation(); respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( - peer, - peer_set, - )) => { - assert_eq!(peer, peer_b); - assert_eq!(peer_set, PeerSet::Collation); - } - ); - virtual_overseer - }) -} - -// A test scenario that takes the following steps -// - Two collators connect, declare themselves and advertise a collation relevant to -// our view. -// - This results subsystem acting upon these advertisements and issuing two messages to -// the CandidateBacking subsystem. -// - CandidateBacking requests both of the collations. -// - Collation protocol requests these collations. -// - The collations are sent to it. -// - Collations are fetched correctly. -#[test] -fn seconding_works() { - let test_state = TestState::default(); - - test_harness(|test_harness| async move { - let TestHarness { - mut virtual_overseer, - } = test_harness; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::OurViewChange(our_view![test_state.relay_parent]) - ), - ).await; - - respond_to_core_info_queries(&mut virtual_overseer, &test_state).await; - - let peer_b = PeerId::random(); - let peer_c = PeerId::random(); - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_b, - ObservedRole::Full, - None, - ), - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - peer_c, - ObservedRole::Full, - None, - ), - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[0].public(), - test_state.chain_ids[0], - test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)), - ) - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_c.clone(), - protocol_v1::CollatorProtocolMessage::Declare( - test_state.collators[1].public(), - test_state.chain_ids[0], - test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)), - ) - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_b.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - ) - ) - ) - ).await; - - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerMessage( - peer_c.clone(), - protocol_v1::CollatorProtocolMessage::AdvertiseCollation( - test_state.relay_parent, - ) - ) - ) - ).await; - - let response_channel = assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, test_state.relay_parent); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - req.pending_response - } - _ => panic!("Unexpected request"), - } - }); - - let mut candidate_a = CandidateReceipt::default(); - // Memoize PoV data to ensure we receive the right one - let pov = PoV { - block_data: BlockData(vec![1, 2, 3, 4, 5]), - }; - candidate_a.descriptor.para_id = test_state.chain_ids[0]; - candidate_a.descriptor.relay_parent = test_state.relay_parent; - response_channel.send(Ok( - CollationFetchingResponse::Collation( - candidate_a.clone(), - pov.clone(), - ).encode() - )).expect("Sending response should succeed"); - - let response_channel = assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::ImmediateError) - ) => { - let req = reqs.into_iter().next() - .expect("There should be exactly one request"); - match req { - Requests::CollationFetching(req) => { - let payload = req.payload; - assert_eq!(payload.relay_parent, test_state.relay_parent); - assert_eq!(payload.para_id, test_state.chain_ids[0]); - req.pending_response - } - _ => panic!("Unexpected request"), - } - }); - - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::CandidateBacking(CandidateBackingMessage::Second(relay_parent, candidate_receipt, incoming_pov) - ) => { - assert_eq!(relay_parent, test_state.relay_parent); - assert_eq!(candidate_receipt.descriptor.para_id, test_state.chain_ids[0]); - assert_eq!(incoming_pov, pov); - }); - - let mut candidate_b = CandidateReceipt::default(); - candidate_b.descriptor.para_id = test_state.chain_ids[0]; - candidate_b.descriptor.relay_parent = test_state.relay_parent; - - // Send second collation to ensure first collation gets seconded - response_channel.send(Ok( - CollationFetchingResponse::Collation( - candidate_b.clone(), - PoV { - block_data: BlockData(vec![]), - }, - ).encode() - )).expect("Sending response should succeed after seconding"); - - // Ensure we don't receive any message related to candidate backing - // All Peers should get disconnected after successful Candidate Backing Message - assert_matches!( - overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(_, _) - ) => {}); + assert_collator_disconnect(&mut virtual_overseer, peer_b.clone()).await; virtual_overseer - }); + }) } diff --git a/polkadot/node/network/protocol/src/request_response/mod.rs b/polkadot/node/network/protocol/src/request_response/mod.rs index f4ca6b336f2..2ad57889103 100644 --- a/polkadot/node/network/protocol/src/request_response/mod.rs +++ b/polkadot/node/network/protocol/src/request_response/mod.rs @@ -44,7 +44,7 @@ pub use sc_network::config::RequestResponseConfig; /// All requests that can be sent to the network bridge. pub mod request; -pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient, OutgoingResult}; +pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient, OutgoingResult, ResponseSender}; ///// Multiplexer for incoming requests. // pub mod multiplexer; diff --git a/polkadot/node/network/protocol/src/request_response/request.rs b/polkadot/node/network/protocol/src/request_response/request.rs index f8df5ee4a15..4e6456627bd 100644 --- a/polkadot/node/network/protocol/src/request_response/request.rs +++ b/polkadot/node/network/protocol/src/request_response/request.rs @@ -29,6 +29,9 @@ use crate::UnifiedReputationChange; use super::{v1, Protocol}; +/// Used by the network to send us a response to a request. +pub type ResponseSender = oneshot::Sender<Result<Vec<u8>, network::RequestFailure>>; + /// Common properties of any `Request`. pub trait IsRequest { /// Each request has a corresponding `Response`. @@ -109,7 +112,7 @@ pub struct OutgoingRequest<Req> { /// The actual request to send over the wire. pub payload: Req, /// Sender which is used by networking to get us back a response. - pub pending_response: oneshot::Sender<Result<Vec<u8>, network::RequestFailure>>, + pub pending_response: ResponseSender, } /// Any error that can occur when sending a request. -- GitLab