Unverified Commit c429e15c authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

refactor View to include finalized_number (#2128)

* refactor View to include finalized_number

* guide: update the NetworkBridge on BlockFinalized

* av-store: fix the tests

* actually fix tests

* grumbles

* ignore macro doctest

* use Hash::repeat_bytes more consistently

* broadcast empty leaves updates as well

* fix issuing view updates on empty leaves updates
parent d3a0c571
Pipeline #117500 passed with stages
in 26 minutes
...@@ -145,7 +145,7 @@ impl CollationGenerationSubsystem { ...@@ -145,7 +145,7 @@ impl CollationGenerationSubsystem {
} }
false false
} }
Ok(Signal(BlockFinalized(_))) => false, Ok(Signal(BlockFinalized(..))) => false,
Err(err) => { Err(err) => {
tracing::error!( tracing::error!(
target: LOG_TARGET, target: LOG_TARGET,
......
...@@ -538,8 +538,8 @@ where ...@@ -538,8 +538,8 @@ where
process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?; process_block_activated(ctx, &subsystem.inner, activated, &subsystem.metrics).await?;
} }
} }
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => { FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
process_block_finalized(subsystem, ctx, &subsystem.inner, hash).await?; process_block_finalized(subsystem, &subsystem.inner, number).await?;
} }
FromOverseer::Communication { msg } => { FromOverseer::Communication { msg } => {
process_message(subsystem, ctx, msg).await?; process_message(subsystem, ctx, msg).await?;
...@@ -564,20 +564,14 @@ where ...@@ -564,20 +564,14 @@ where
/// The state of data has to be changed from /// The state of data has to be changed from
/// `CandidateState::Included` to `CandidateState::Finalized` and their pruning times have /// `CandidateState::Included` to `CandidateState::Finalized` and their pruning times have
/// to be updated to `now` + keep_finalized_{block, chunk}_for`. /// to be updated to `now` + keep_finalized_{block, chunk}_for`.
#[tracing::instrument(level = "trace", skip(subsystem, ctx, db), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(subsystem, db), fields(subsystem = LOG_TARGET))]
async fn process_block_finalized<Context>( async fn process_block_finalized(
subsystem: &AvailabilityStoreSubsystem, subsystem: &AvailabilityStoreSubsystem,
ctx: &mut Context,
db: &Arc<dyn KeyValueDB>, db: &Arc<dyn KeyValueDB>,
hash: Hash, block_number: BlockNumber,
) -> Result<(), Error> ) -> Result<(), Error> {
where
Context: SubsystemContext<Message=AvailabilityStoreMessage>
{
let _timer = subsystem.metrics.time_process_block_finalized(); let _timer = subsystem.metrics.time_process_block_finalized();
let block_number = get_block_number(ctx, hash).await?;
if let Some(mut pov_pruning) = pov_pruning(db) { if let Some(mut pov_pruning) = pov_pruning(db) {
// Since the records are sorted by time in which they need to be pruned and not by block // Since the records are sorted by time in which they need to be pruned and not by block
// numbers we have to iterate through the whole collection here. // numbers we have to iterate through the whole collection here.
......
...@@ -274,7 +274,7 @@ fn store_block_works() { ...@@ -274,7 +274,7 @@ fn store_block_works() {
let test_state = TestState::default(); let test_state = TestState::default();
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness; let TestHarness { mut virtual_overseer } = test_harness;
let candidate_hash = CandidateHash(Hash::from([1; 32])); let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let validator_index = 5; let validator_index = 5;
let n_validators = 10; let n_validators = 10;
...@@ -328,7 +328,7 @@ fn store_pov_and_query_chunk_works() { ...@@ -328,7 +328,7 @@ fn store_pov_and_query_chunk_works() {
test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move { test_harness(test_state.pruning_config.clone(), store.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness; let TestHarness { mut virtual_overseer } = test_harness;
let candidate_hash = CandidateHash(Hash::from([1; 32])); let candidate_hash = CandidateHash(Hash::repeat_byte(1));
let n_validators = 10; let n_validators = 10;
let pov = PoV { let pov = PoV {
...@@ -543,20 +543,9 @@ fn stored_data_kept_until_finalized() { ...@@ -543,20 +543,9 @@ fn stored_data_kept_until_finalized() {
overseer_signal( overseer_signal(
&mut virtual_overseer, &mut virtual_overseer,
OverseerSignal::BlockFinalized(new_leaf) OverseerSignal::BlockFinalized(new_leaf, 10)
).await; ).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, new_leaf);
tx.send(Ok(Some(10))).unwrap();
}
);
// Wait for a half of the time finalized data should be available for // Wait for a half of the time finalized data should be available for
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await; Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
...@@ -658,20 +647,9 @@ fn stored_chunk_kept_until_finalized() { ...@@ -658,20 +647,9 @@ fn stored_chunk_kept_until_finalized() {
overseer_signal( overseer_signal(
&mut virtual_overseer, &mut virtual_overseer,
OverseerSignal::BlockFinalized(new_leaf) OverseerSignal::BlockFinalized(new_leaf, 10)
).await; ).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, new_leaf);
tx.send(Ok(Some(10))).unwrap();
}
);
// Wait for a half of the time finalized data should be available for // Wait for a half of the time finalized data should be available for
Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await; Delay::new(test_state.pruning_config.keep_finalized_block_for / 2).await;
...@@ -812,21 +790,9 @@ fn forkfullness_works() { ...@@ -812,21 +790,9 @@ fn forkfullness_works() {
overseer_signal( overseer_signal(
&mut virtual_overseer, &mut virtual_overseer,
OverseerSignal::BlockFinalized(new_leaf_1) OverseerSignal::BlockFinalized(new_leaf_1, 5)
).await; ).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockNumber(
hash,
tx,
)) => {
assert_eq!(hash, new_leaf_1);
tx.send(Ok(Some(5))).unwrap();
}
);
// Data of both candidates should be still present in the DB. // Data of both candidates should be still present in the DB.
assert_eq!( assert_eq!(
query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(), query_available_data(&mut virtual_overseer, candidate_1_hash).await.unwrap(),
......
...@@ -1157,7 +1157,7 @@ mod tests { ...@@ -1157,7 +1157,7 @@ mod tests {
let mut head_data = HashMap::new(); let mut head_data = HashMap::new();
head_data.insert(chain_a, HeadData(vec![4, 5, 6])); head_data.insert(chain_a, HeadData(vec![4, 5, 6]));
let relay_parent = Hash::from([5; 32]); let relay_parent = Hash::repeat_byte(5);
let signing_context = SigningContext { let signing_context = SigningContext {
session_index: 1, session_index: 1,
......
...@@ -95,7 +95,7 @@ async fn run( ...@@ -95,7 +95,7 @@ async fn run(
loop { loop {
match ctx.recv().await? { match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {} FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {} FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication { msg } => match msg { FromOverseer::Communication { msg } => match msg {
CandidateValidationMessage::ValidateFromChainState( CandidateValidationMessage::ValidateFromChainState(
......
...@@ -89,7 +89,7 @@ where ...@@ -89,7 +89,7 @@ where
match ctx.recv().await? { match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Communication { msg } => match msg { FromOverseer::Communication { msg } => match msg {
ChainApiMessage::BlockNumber(hash, response_channel) => { ChainApiMessage::BlockNumber(hash, response_channel) => {
let _timer = subsystem.metrics.time_block_number(); let _timer = subsystem.metrics.time_block_number();
......
...@@ -152,7 +152,7 @@ async fn run<Client>( ...@@ -152,7 +152,7 @@ async fn run<Client>(
req = ctx.recv().fuse() => match req? { req = ctx.recv().fuse() => match req? {
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {},
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Communication { msg } => match msg { FromOverseer::Communication { msg } => match msg {
RuntimeApiMessage::Request(relay_parent, request) => { RuntimeApiMessage::Request(relay_parent, request) => {
subsystem.spawn_request(relay_parent, request); subsystem.spawn_request(relay_parent, request);
......
...@@ -416,7 +416,7 @@ where ...@@ -416,7 +416,7 @@ where
.filter(|(_peer, view)| { .filter(|(_peer, view)| {
// collect all direct interests of a peer w/o ancestors // collect all direct interests of a peer w/o ancestors
state state
.cached_live_candidates_unioned(view.0.iter()) .cached_live_candidates_unioned(view.heads.iter())
.contains_key(&candidate_hash) .contains_key(&candidate_hash)
}) })
.map(|(peer, _view)| peer.clone()) .map(|(peer, _view)| peer.clone())
...@@ -619,7 +619,7 @@ where ...@@ -619,7 +619,7 @@ where
let _timer = metrics.time_process_incoming_peer_message(); let _timer = metrics.time_process_incoming_peer_message();
// obtain the set of candidates we are interested in based on our current view // obtain the set of candidates we are interested in based on our current view
let live_candidates = state.cached_live_candidates_unioned(state.view.0.iter()); let live_candidates = state.cached_live_candidates_unioned(state.view.heads.iter());
// check if the candidate is of interest // check if the candidate is of interest
let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) { let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) {
...@@ -707,7 +707,7 @@ where ...@@ -707,7 +707,7 @@ where
.filter(|(_peer, view)| { .filter(|(_peer, view)| {
// peers view must contain the candidate hash too // peers view must contain the candidate hash too
state state
.cached_live_candidates_unioned(view.0.iter()) .cached_live_candidates_unioned(view.heads.iter())
.contains_key(&message_id.0) .contains_key(&message_id.0)
}) })
.map(|(peer, _)| -> PeerId { peer.clone() }) .map(|(peer, _)| -> PeerId { peer.clone() })
...@@ -781,7 +781,7 @@ impl AvailabilityDistributionSubsystem { ...@@ -781,7 +781,7 @@ impl AvailabilityDistributionSubsystem {
})) => { })) => {
// handled at view change // handled at view change
} }
FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {} FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}
FromOverseer::Signal(OverseerSignal::Conclude) => { FromOverseer::Signal(OverseerSignal::Conclude) => {
return Ok(()); return Ok(());
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
use super::*; use super::*;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks};
use polkadot_node_network_protocol::ObservedRole; use polkadot_node_network_protocol::{view, ObservedRole};
use polkadot_node_subsystem_util::TimeoutExt; use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{ use polkadot_primitives::v1::{
AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex, AvailableData, BlockData, CandidateCommitments, CandidateDescriptor, GroupIndex,
...@@ -33,11 +33,6 @@ use sp_application_crypto::AppKey; ...@@ -33,11 +33,6 @@ use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
macro_rules! view {
( $( $hash:expr ),* $(,)? ) => [
View(vec![ $( $hash.clone() ),* ])
];
}
macro_rules! delay { macro_rules! delay {
($delay:expr) => { ($delay:expr) => {
......
...@@ -212,8 +212,8 @@ impl BitfieldDistribution { ...@@ -212,8 +212,8 @@ impl BitfieldDistribution {
// defer the cleanup to the view change // defer the cleanup to the view change
} }
} }
FromOverseer::Signal(OverseerSignal::BlockFinalized(hash)) => { FromOverseer::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
tracing::trace!(target: LOG_TARGET, hash = %hash, "block finalized"); tracing::trace!(target: LOG_TARGET, hash = %hash, number = %number, "block finalized");
} }
FromOverseer::Signal(OverseerSignal::Conclude) => { FromOverseer::Signal(OverseerSignal::Conclude) => {
tracing::trace!(target: LOG_TARGET, "Conclude"); tracing::trace!(target: LOG_TARGET, "Conclude");
...@@ -770,13 +770,7 @@ mod test { ...@@ -770,13 +770,7 @@ mod test {
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use polkadot_node_network_protocol::ObservedRole; use polkadot_node_network_protocol::{view, ObservedRole};
macro_rules! view {
( $( $hash:expr ),* $(,)? ) => [
View(vec![ $( $hash.clone() ),* ])
];
}
macro_rules! launch { macro_rules! launch {
($fut:expr) => { ($fut:expr) => {
...@@ -833,7 +827,7 @@ mod test { ...@@ -833,7 +827,7 @@ mod test {
let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None) let validator = SyncCryptoStore::sr25519_generate_new(&*keystore, ValidatorId::ID, None)
.expect("generating sr25519 key not to fail"); .expect("generating sr25519 key not to fail");
state.per_relay_parent = view.0.iter().map(|relay_parent| {( state.per_relay_parent = view.heads.iter().map(|relay_parent| {(
relay_parent.clone(), relay_parent.clone(),
PerRelayParentData { PerRelayParentData {
signing_context: signing_context.clone(), signing_context: signing_context.clone(),
......
...@@ -37,7 +37,7 @@ use polkadot_subsystem::messages::{ ...@@ -37,7 +37,7 @@ use polkadot_subsystem::messages::{
BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage,
CollatorProtocolMessage, CollatorProtocolMessage,
}; };
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash, BlockNumber};
use polkadot_node_network_protocol::{ use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1 ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1
}; };
...@@ -254,6 +254,7 @@ enum Action { ...@@ -254,6 +254,7 @@ enum Action {
ReportPeer(PeerId, ReputationChange), ReportPeer(PeerId, ReputationChange),
ActiveLeaves(ActiveLeavesUpdate), ActiveLeaves(ActiveLeavesUpdate),
BlockFinalized(BlockNumber),
PeerConnected(PeerSet, PeerId, ObservedRole), PeerConnected(PeerSet, PeerId, ObservedRole),
PeerDisconnected(PeerSet, PeerId), PeerDisconnected(PeerSet, PeerId),
...@@ -274,6 +275,8 @@ fn action_from_overseer_message( ...@@ -274,6 +275,8 @@ fn action_from_overseer_message(
match res { match res {
Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves))) Ok(FromOverseer::Signal(OverseerSignal::ActiveLeaves(active_leaves)))
=> Action::ActiveLeaves(active_leaves), => Action::ActiveLeaves(active_leaves),
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, number)))
=> Action::BlockFinalized(number),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort, Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => Action::Abort,
Ok(FromOverseer::Communication { msg }) => match msg { Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
...@@ -284,8 +287,6 @@ fn action_from_overseer_message( ...@@ -284,8 +287,6 @@ fn action_from_overseer_message(
NetworkBridgeMessage::ConnectToValidators { validator_ids, connected } NetworkBridgeMessage::ConnectToValidators { validator_ids, connected }
=> Action::ConnectToValidators { validator_ids, connected }, => Action::ConnectToValidators { validator_ids, connected },
}, },
Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_)))
=> Action::Nop,
Err(e) => { Err(e) => {
tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error"); tracing::warn!(target: LOG_TARGET, err = ?e, "Shutting down Network Bridge due to error");
Action::Abort Action::Abort
...@@ -348,21 +349,25 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action { ...@@ -348,21 +349,25 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
} }
} }
fn construct_view(live_heads: &[Hash]) -> View { fn construct_view(live_heads: &[Hash], finalized_number: BlockNumber) -> View {
View(live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect()) View {
heads: live_heads.iter().rev().take(MAX_VIEW_HEADS).cloned().collect(),
finalized_number
}
} }
#[tracing::instrument(level = "trace", skip(net, ctx, validation_peers, collation_peers), fields(subsystem = LOG_TARGET))] #[tracing::instrument(level = "trace", skip(net, ctx, validation_peers, collation_peers), fields(subsystem = LOG_TARGET))]
async fn update_view( async fn update_our_view(
net: &mut impl Network, net: &mut impl Network,
ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>, ctx: &mut impl SubsystemContext<Message = NetworkBridgeMessage>,
live_heads: &[Hash], live_heads: &[Hash],
local_view: &mut View, local_view: &mut View,
finalized_number: BlockNumber,
validation_peers: &HashMap<PeerId, PeerData>, validation_peers: &HashMap<PeerId, PeerData>,
collation_peers: &HashMap<PeerId, PeerData>, collation_peers: &HashMap<PeerId, PeerData>,
) -> SubsystemResult<()> { ) -> SubsystemResult<()> {
let new_view = construct_view(live_heads); let new_view = construct_view(live_heads, finalized_number);
if *local_view == new_view { return Ok(()) } if *local_view == new_view { return Ok(()) }
*local_view = new_view.clone(); *local_view = new_view.clone();
...@@ -413,7 +418,7 @@ async fn handle_peer_messages<M>( ...@@ -413,7 +418,7 @@ async fn handle_peer_messages<M>(
for message in messages { for message in messages {
outgoing_messages.push(match message { outgoing_messages.push(match message {
WireMessage::ViewUpdate(new_view) => { WireMessage::ViewUpdate(new_view) => {
if new_view.0.len() > MAX_VIEW_HEADS { if new_view.heads.len() > MAX_VIEW_HEADS {
net.report_peer( net.report_peer(
peer.clone(), peer.clone(),
MALFORMED_VIEW_COST, MALFORMED_VIEW_COST,
...@@ -580,7 +585,8 @@ where ...@@ -580,7 +585,8 @@ where
// Most recent heads are at the back. // Most recent heads are at the back.
let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS); let mut live_heads: Vec<Hash> = Vec::with_capacity(MAX_VIEW_HEADS);
let mut local_view = View(Vec::new()); let mut local_view = View::default();
let mut finalized_number = 0;
let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut validation_peers: HashMap<PeerId, PeerData> = HashMap::new();
let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new(); let mut collation_peers: HashMap<PeerId, PeerData> = HashMap::new();
...@@ -638,16 +644,27 @@ where ...@@ -638,16 +644,27 @@ where
live_heads.extend(activated); live_heads.extend(activated);
live_heads.retain(|h| !deactivated.contains(h)); live_heads.retain(|h| !deactivated.contains(h));
update_view( update_our_view(
&mut network_service, &mut network_service,
&mut ctx, &mut ctx,
&live_heads, &live_heads,
&mut local_view, &mut local_view,
finalized_number,
&validation_peers, &validation_peers,
&collation_peers, &collation_peers,
).await?; ).await?;
} }
Action::BlockFinalized(number) => {
debug_assert!(finalized_number < number);
// we don't send the view updates here, but delay them until the next `Action::ActiveLeaves`
// otherwise it might break assumptions of some of the subsystems
// that we never send the same `ActiveLeavesUpdate`
// this is fine, we will get `Action::ActiveLeaves` on block finalization anyway
finalized_number = number;
},
Action::PeerConnected(peer_set, peer, role) => { Action::PeerConnected(peer_set, peer, role) => {
let peer_map = match peer_set { let peer_map = match peer_set {
PeerSet::Validation => &mut validation_peers, PeerSet::Validation => &mut validation_peers,
...@@ -660,7 +677,7 @@ where ...@@ -660,7 +677,7 @@ where
hash_map::Entry::Occupied(_) => continue, hash_map::Entry::Occupied(_) => continue,
hash_map::Entry::Vacant(vacant) => { hash_map::Entry::Vacant(vacant) => {
let _ = vacant.insert(PeerData { let _ = vacant.insert(PeerData {
view: View(Vec::new()), view: View::default(),
}); });
match peer_set { match peer_set {
...@@ -669,7 +686,7 @@ where ...@@ -669,7 +686,7 @@ where
NetworkBridgeEvent::PeerConnected(peer.clone(), role), NetworkBridgeEvent::PeerConnected(peer.clone(), role),
NetworkBridgeEvent::PeerViewChange( NetworkBridgeEvent::PeerViewChange(
peer, peer,
View(Default::default()), View::default(),
), ),
], ],
&mut ctx, &mut ctx,
...@@ -679,7 +696,7 @@ where ...@@ -679,7 +696,7 @@ where
NetworkBridgeEvent::PeerConnected(peer.clone(), role), NetworkBridgeEvent::PeerConnected(peer.clone(), role),
NetworkBridgeEvent::PeerViewChange( NetworkBridgeEvent::PeerViewChange(
peer, peer,
View(Default::default()), View::default(),
), ),
], ],
&mut ctx, &mut ctx,
...@@ -753,6 +770,7 @@ mod tests { ...@@ -753,6 +770,7 @@ mod tests {
use polkadot_node_subsystem_test_helpers::{ use polkadot_node_subsystem_test_helpers::{
SingleItemSink, SingleItemStream, TestSubsystemContextHandle, SingleItemSink, SingleItemStream, TestSubsystemContextHandle,
}; };
use polkadot_node_network_protocol::view;
use sc_network::Multiaddr; use sc_network::Multiaddr;
use sp_keyring::Sr25519Keyring; use sp_keyring::Sr25519Keyring;
...@@ -978,7 +996,7 @@ mod tests { ...@@ -978,7 +996,7 @@ mod tests {
ObservedRole::Full, ObservedRole::Full,
).await; ).await;
let hash_a = Hash::from([1; 32]);