diff --git a/prdoc/pr_7011.prdoc b/prdoc/pr_7011.prdoc new file mode 100644 index 0000000000000000000000000000000000000000..55fe0c73ca091365fedec3dfe43a6c7988d9679f --- /dev/null +++ b/prdoc/pr_7011.prdoc @@ -0,0 +1,16 @@ +title: 'sync: Send already connected peers to new subscribers' +doc: +- audience: Node Dev + description: |- + Introduce `SyncEvent::InitialPeers` message sent to new subscribers to allow them correctly tracking sync peers. This resolves a race condition described in https://github.com/paritytech/polkadot-sdk/issues/6573#issuecomment-2563091343. + + Fixes https://github.com/paritytech/polkadot-sdk/issues/6573. +crates: +- name: sc-network-gossip + bump: major +- name: sc-network-statement + bump: patch +- name: sc-network-sync + bump: major +- name: sc-network-transactions + bump: patch diff --git a/substrate/client/network-gossip/src/bridge.rs b/substrate/client/network-gossip/src/bridge.rs index 2daf1e49ee4b49c0e45f3efd0baeef1488b319c6..bff258a9a011bbf9cb6b1bcee319044f54b830de 100644 --- a/substrate/client/network-gossip/src/bridge.rs +++ b/substrate/client/network-gossip/src/bridge.rs @@ -254,10 +254,12 @@ impl<B: BlockT> Future for GossipEngine<B> { match sync_event_stream { Poll::Ready(Some(event)) => match event { - SyncEvent::PeerConnected(remote) => - this.network.add_set_reserved(remote, this.protocol.clone()), - SyncEvent::PeerDisconnected(remote) => - this.network.remove_set_reserved(remote, this.protocol.clone()), + SyncEvent::InitialPeers(peer_ids) => + this.network.add_set_reserved(peer_ids, this.protocol.clone()), + SyncEvent::PeerConnected(peer_id) => + this.network.add_set_reserved(vec![peer_id], this.protocol.clone()), + SyncEvent::PeerDisconnected(peer_id) => + this.network.remove_set_reserved(peer_id, this.protocol.clone()), }, // The sync event stream closed. Do the same for [`GossipValidator`]. Poll::Ready(None) => { diff --git a/substrate/client/network-gossip/src/lib.rs b/substrate/client/network-gossip/src/lib.rs index 20d9922200c2c3c6cf394692099a21c01b540bd0..2ec573bf9e3ef8056f6fc6309110aa26d7e7e44b 100644 --- a/substrate/client/network-gossip/src/lib.rs +++ b/substrate/client/network-gossip/src/lib.rs @@ -82,15 +82,18 @@ mod validator; /// Abstraction over a network. pub trait Network<B: BlockT>: NetworkPeers + NetworkEventStream { - fn add_set_reserved(&self, who: PeerId, protocol: ProtocolName) { - let addr = Multiaddr::empty().with(Protocol::P2p(*who.as_ref())); - let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect()); + fn add_set_reserved(&self, peer_ids: Vec<PeerId>, protocol: ProtocolName) { + let addrs = peer_ids + .into_iter() + .map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into()))) + .collect(); + let result = self.add_peers_to_reserved_set(protocol, addrs); if let Err(err) = result { log::error!(target: "gossip", "add_set_reserved failed: {}", err); } } - fn remove_set_reserved(&self, who: PeerId, protocol: ProtocolName) { - let result = self.remove_peers_from_reserved_set(protocol, iter::once(who).collect()); + fn remove_set_reserved(&self, peer_id: PeerId, protocol: ProtocolName) { + let result = self.remove_peers_from_reserved_set(protocol, iter::once(peer_id).collect()); if let Err(err) = result { log::error!(target: "gossip", "remove_set_reserved failed: {}", err); } diff --git a/substrate/client/network/statement/src/lib.rs b/substrate/client/network/statement/src/lib.rs index df93788696e381b18e80cff73646bb1594636386..586a15cadd68eeb8d0615c01c7bbf33177a5113a 100644 --- a/substrate/client/network/statement/src/lib.rs +++ b/substrate/client/network/statement/src/lib.rs @@ -33,7 +33,8 @@ use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered, FutureExt} use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network::{ config::{NonReservedPeerMode, SetConfig}, - error, multiaddr, + error, + multiaddr::{Multiaddr, Protocol}, peer_store::PeerStoreProvider, service::{ traits::{NotificationEvent, NotificationService, ValidationResult}, @@ -296,9 +297,19 @@ where fn handle_sync_event(&mut self, event: SyncEvent) { match event { - SyncEvent::PeerConnected(remote) => { - let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) - .collect::<multiaddr::Multiaddr>(); + SyncEvent::InitialPeers(peer_ids) => { + let addrs = peer_ids + .into_iter() + .map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into()))) + .collect(); + let result = + self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs); + if let Err(err) = result { + log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err); + } + }, + SyncEvent::PeerConnected(peer_id) => { + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); let result = self.network.add_peers_to_reserved_set( self.protocol_name.clone(), iter::once(addr).collect(), @@ -307,10 +318,10 @@ where log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err); } }, - SyncEvent::PeerDisconnected(remote) => { + SyncEvent::PeerDisconnected(peer_id) => { let result = self.network.remove_peers_from_reserved_set( self.protocol_name.clone(), - iter::once(remote).collect(), + iter::once(peer_id).collect(), ); if let Err(err) = result { log::error!(target: LOG_TARGET, "Failed to remove reserved peer: {err}"); diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index 0c39ea0b93c040c17870fb7011fe667d4af0317c..4003361525e18c45d73cb95c20376c337251cade 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -656,7 +656,11 @@ where ToServiceCommand::SetSyncForkRequest(peers, hash, number) => { self.strategy.set_sync_fork_request(peers, &hash, number); }, - ToServiceCommand::EventStream(tx) => self.event_streams.push(tx), + ToServiceCommand::EventStream(tx) => { + let _ = tx + .unbounded_send(SyncEvent::InitialPeers(self.peers.keys().cloned().collect())); + self.event_streams.push(tx); + }, ToServiceCommand::RequestJustification(hash, number) => self.strategy.request_justification(&hash, number), ToServiceCommand::ClearJustificationRequests => diff --git a/substrate/client/network/sync/src/types.rs b/substrate/client/network/sync/src/types.rs index 5745a34378df68f65446953b55c8a0ce39f3c3d0..a72a2f7c1ffe475fc4c78263a0766be484af7663 100644 --- a/substrate/client/network/sync/src/types.rs +++ b/substrate/client/network/sync/src/types.rs @@ -127,6 +127,10 @@ where /// Syncing-related events that other protocols can subscribe to. pub enum SyncEvent { + /// All connected peers that the syncing implementation is tracking. + /// Always sent as the first message to the stream. + InitialPeers(Vec<PeerId>), + /// Peer that the syncing implementation is tracking connected. PeerConnected(PeerId), diff --git a/substrate/client/network/transactions/src/lib.rs b/substrate/client/network/transactions/src/lib.rs index 44fa702ef6d4f7bff2e4046728508922ab1fa00a..49f429a04ee2e61861d3871cfc042df92e23c5c0 100644 --- a/substrate/client/network/transactions/src/lib.rs +++ b/substrate/client/network/transactions/src/lib.rs @@ -35,7 +35,8 @@ use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; use sc_network::{ config::{NonReservedPeerMode, ProtocolId, SetConfig}, - error, multiaddr, + error, + multiaddr::{Multiaddr, Protocol}, peer_store::PeerStoreProvider, service::{ traits::{NotificationEvent, NotificationService, ValidationResult}, @@ -377,9 +378,19 @@ where fn handle_sync_event(&mut self, event: SyncEvent) { match event { - SyncEvent::PeerConnected(remote) => { - let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) - .collect::<multiaddr::Multiaddr>(); + SyncEvent::InitialPeers(peer_ids) => { + let addrs = peer_ids + .into_iter() + .map(|peer_id| Multiaddr::empty().with(Protocol::P2p(peer_id.into()))) + .collect(); + let result = + self.network.add_peers_to_reserved_set(self.protocol_name.clone(), addrs); + if let Err(err) = result { + log::error!(target: LOG_TARGET, "Add reserved peers failed: {}", err); + } + }, + SyncEvent::PeerConnected(peer_id) => { + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); let result = self.network.add_peers_to_reserved_set( self.protocol_name.clone(), iter::once(addr).collect(), @@ -388,10 +399,10 @@ where log::error!(target: LOG_TARGET, "Add reserved peer failed: {}", err); } }, - SyncEvent::PeerDisconnected(remote) => { + SyncEvent::PeerDisconnected(peer_id) => { let result = self.network.remove_peers_from_reserved_set( self.protocol_name.clone(), - iter::once(remote).collect(), + iter::once(peer_id).collect(), ); if let Err(err) = result { log::error!(target: LOG_TARGET, "Remove reserved peer failed: {}", err);