diff --git a/substrate/core/network/src/config.rs b/substrate/core/network/src/config.rs index 965aeba8c9f35c0b54ae1acdcc9c7d2ce7206e05..05cf27ca7550532786b6385f1234b86baffff418 100644 --- a/substrate/core/network/src/config.rs +++ b/substrate/core/network/src/config.rs @@ -39,7 +39,7 @@ pub struct Params<B: BlockT, S, H: ExHashT> { /// On-demand service reference. pub on_demand: Option<Arc<OnDemandService<B>>>, /// Transaction pool. - pub transaction_pool: Arc<TransactionPool<H, B>>, + pub transaction_pool: Arc<dyn TransactionPool<H, B>>, /// Protocol specialization. pub specialization: S, } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index cd7a14bdbbcbe4e79d2296801f6ec162bb64c178..c9e56362d1108f89da689a8711dfdb2ca2d3159d 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -35,7 +35,6 @@ use crate::specialization::NetworkSpecialization; use crate::sync::{ChainSync, Context as SyncContext, Status as SyncStatus, SyncState}; use crate::service::{TransactionPool, ExHashT}; use crate::config::{ProtocolConfig, Roles}; -use parking_lot::RwLock; use rustc_hex::ToHex; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; @@ -92,10 +91,6 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { context_data: ContextData<B, H>, // Connected peers pending Status message. handshaking_peers: HashMap<PeerId, HandshakingPeer>, - // Connected peers from whom we received a Status message, - // similar to context_data.peers but shared with the SyncProvider. - connected_peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>, - transaction_pool: Arc<TransactionPool<H, B>>, } /// A peer from whom we have received a Status message. @@ -261,18 +256,14 @@ struct ContextData<B: BlockT, H: ExHashT> { // All connected peers peers: HashMap<PeerId, Peer<B, H>>, pub chain: Arc<Client<B>>, - pub finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>, } impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Create a new instance. pub fn new( - connected_peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>, config: ProtocolConfig, chain: Arc<Client<B>>, - finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>, on_demand: Option<Arc<OnDemandService<B>>>, - transaction_pool: Arc<TransactionPool<H, B>>, specialization: S, ) -> error::Result<Protocol<B, S, H>> { let info = chain.info()?; @@ -284,7 +275,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { context_data: ContextData { peers: HashMap::new(), chain, - finality_proof_provider, }, on_demand, genesis_hash: info.chain.genesis_hash, @@ -292,8 +282,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { specialization: specialization, consensus_gossip: ConsensusGossip::new(), handshaking_peers: HashMap::new(), - connected_peers, - transaction_pool: transaction_pool, }) } @@ -319,13 +307,17 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { self.sync.status().is_offline() } - pub fn poll(&mut self, network_out: &mut dyn NetworkOut<B>) -> Poll<void::Void, void::Void> { + pub fn poll( + &mut self, + network_out: &mut dyn NetworkOut<B>, + transaction_pool: &(impl TransactionPool<H, B> + ?Sized) + ) -> Poll<void::Void, void::Void> { while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { self.tick(network_out); } while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { - self.propagate_extrinsics(network_out); + self.propagate_extrinsics(network_out, transaction_pool); } Ok(Async::NotReady) @@ -364,19 +356,21 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { peer.info.best_hash = info.best_hash; peer.info.best_number = info.best_number; } - let mut peers = self.connected_peers.write(); - if let Some(ref mut peer) = peers.get_mut(who) { - peer.peer_info.best_hash = info.best_hash; - peer.peer_info.best_number = info.best_number; - } } } + /// Returns information about all the peers we are connected to after the handshake message. + pub fn peers_info(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo<B>)> { + self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info)) + } + pub fn on_custom_message( &mut self, network_out: &mut dyn NetworkOut<B>, + transaction_pool: &(impl TransactionPool<H, B> + ?Sized), who: PeerId, - message: Message<B> + message: Message<B>, + finality_proof_provider: Option<&FinalityProofProvider<B>> ) -> CustomMessageOutcome<B> { match message { GenericMessage::Status(s) => self.on_status_message(network_out, who, s), @@ -397,7 +391,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { self.on_block_announce(network_out, who.clone(), announce); self.update_peer_info(&who); }, - GenericMessage::Transactions(m) => self.on_extrinsics(network_out, who, m), + GenericMessage::Transactions(m) => + self.on_extrinsics(network_out, transaction_pool, who, m), GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(who, response), GenericMessage::RemoteReadRequest(request) => self.on_remote_read_request(network_out, who, request), @@ -406,7 +401,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { GenericMessage::RemoteHeaderResponse(response) => self.on_remote_header_response(who, response), GenericMessage::RemoteChangesRequest(request) => self.on_remote_changes_request(network_out, who, request), GenericMessage::RemoteChangesResponse(response) => self.on_remote_changes_response(who, response), - GenericMessage::FinalityProofRequest(request) => self.on_finality_proof_request(network_out, who, request), + GenericMessage::FinalityProofRequest(request) => + self.on_finality_proof_request(network_out, who, request, finality_proof_provider), GenericMessage::FinalityProofResponse(response) => return self.on_finality_proof_response(network_out, who, response), GenericMessage::Consensus(msg) => { @@ -489,7 +485,6 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { // lock all the the peer lists so that add/remove peer events are in order let removed = { self.handshaking_peers.remove(&peer); - self.connected_peers.write().remove(&peer); self.context_data.peers.remove(&peer) }; if let Some(peer_data) = removed { @@ -734,16 +729,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let info = match self.handshaking_peers.remove(&who) { Some(_handshaking) => { - let peer_info = PeerInfo { + PeerInfo { protocol_version: status.version, roles: status.roles, best_hash: status.best_hash, best_number: status.best_number - }; - self.connected_peers - .write() - .insert(who.clone(), ConnectedPeer { peer_info: peer_info.clone() }); - peer_info + } }, None => { error!(target: "sync", "Received status from previously unconnected node {}", who); @@ -780,6 +771,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { fn on_extrinsics( &mut self, network_out: &mut dyn NetworkOut<B>, + transaction_pool: &(impl TransactionPool<H, B> + ?Sized), who: PeerId, extrinsics: message::Transactions<B::Extrinsic> ) { @@ -791,7 +783,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { - if let Some(hash) = self.transaction_pool.import(&t) { + if let Some(hash) = transaction_pool.import(&t) { network_out.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); peer.known_extrinsics.insert(hash); } else { @@ -802,7 +794,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { } /// Call when we must propagate ready extrinsics to peers. - pub fn propagate_extrinsics(&mut self, network_out: &mut dyn NetworkOut<B>) { + pub fn propagate_extrinsics( + &mut self, + network_out: &mut dyn NetworkOut<B>, + transaction_pool: &(impl TransactionPool<H, B> + ?Sized) + ) { debug!(target: "sync", "Propagating extrinsics"); // Accept transactions only when fully synced @@ -810,7 +806,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { return; } - let extrinsics = self.transaction_pool.transactions(); + let extrinsics = transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics @@ -830,7 +826,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { network_out.send_message(who.clone(), GenericMessage::Transactions(to_send)) } } - self.transaction_pool.on_broadcasted(propagated_to); + + transaction_pool.on_broadcasted(propagated_to); } /// Make sure an important block is propagated to peers. @@ -1203,9 +1200,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { network_out: &mut dyn NetworkOut<B>, who: PeerId, request: message::FinalityProofRequest<B::Hash>, + finality_proof_provider: Option<&FinalityProofProvider<B>> ) { trace!(target: "sync", "Finality proof request from {} for {}", who, request.block); - let finality_proof = self.context_data.finality_proof_provider.as_ref() + let finality_proof = finality_proof_provider.as_ref() .ok_or_else(|| String::from("Finality provider is not configured")) .and_then(|provider| provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string()) diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 1a966fc933c2b257f23fdac6e82ae8f3d11cc967..4516609c5e7f0ea33243f4ca48c61b60e029fca9 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -29,6 +29,7 @@ use peerset::PeersetHandle; use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use crate::chain::FinalityProofProvider; use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; use crate::protocol::{self, Context, CustomMessageOutcome, Protocol, ConnectedPeer}; @@ -41,6 +42,8 @@ use tokio::runtime::Builder as RuntimeBuilder; /// Interval at which we send status updates on the SyncProvider status stream. const STATUS_INTERVAL: Duration = Duration::from_millis(5000); +/// Interval at which we update the `peers` field on the main thread. +const CONNECTED_PEERS_INTERVAL: Duration = Duration::from_millis(500); pub use network_libp2p::PeerId; @@ -53,8 +56,13 @@ pub trait SyncProvider<B: BlockT>: Send + Sync { fn status(&self) -> mpsc::UnboundedReceiver<ProtocolStatus<B>>; /// Get network state. fn network_state(&self) -> NetworkState; - /// Get currently connected peers - fn peers(&self) -> Vec<(PeerId, PeerInfo<B>)>; + + /// Get currently connected peers. + /// + /// > **Warning**: This method can return outdated information and should only ever be used + /// > when obtaining outdated information is acceptable. + fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo<B>)>; + /// Are we in the process of downloading the chain? fn is_major_syncing(&self) -> bool; } @@ -206,12 +214,9 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>> = Arc::new(Default::default()); let protocol = Protocol::new( - peers.clone(), params.config, params.chain, - params.finality_proof_provider, params.on_demand, - params.transaction_pool, params.specialization, )?; let versions: Vec<_> = ((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect(); @@ -220,7 +225,10 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> Service<B, S> { is_offline.clone(), is_major_syncing.clone(), protocol, + peers.clone(), import_queue.clone(), + params.transaction_pool, + params.finality_proof_provider, network_port, protocol_rx, status_sinks.clone(), @@ -392,7 +400,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>> SyncProvider<B> for Servi self.network.lock().state() } - fn peers(&self) -> Vec<(PeerId, PeerInfo<B>)> { + fn peers_debug_info(&self) -> Vec<(PeerId, PeerInfo<B>)> { let peers = (*self.peers.read()).clone(); peers.into_iter().map(|(idx, connected)| (idx, connected.peer_info)).collect() } @@ -514,7 +522,10 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( is_offline: Arc<AtomicBool>, is_major_syncing: Arc<AtomicBool>, protocol: Protocol<B, S, H>, + peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>, import_queue: Box<ImportQueue<B>>, + transaction_pool: Arc<dyn TransactionPool<H, B>>, + finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>, network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>, protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, @@ -540,7 +551,10 @@ fn start_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( is_major_syncing, protocol, service_clone, + peers, import_queue, + transaction_pool, + finality_proof_provider, network_port, protocol_rx, status_sinks, @@ -567,7 +581,10 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( is_major_syncing: Arc<AtomicBool>, mut protocol: Protocol<B, S, H>, network_service: Arc<Mutex<NetworkService<Message<B>>>>, + peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<B>>>>, import_queue: Box<ImportQueue<B>>, + transaction_pool: Arc<dyn TransactionPool<H, B>>, + finality_proof_provider: Option<Arc<FinalityProofProvider<B>>>, mut network_port: mpsc::UnboundedReceiver<NetworkMsg<B>>, mut protocol_rx: mpsc::UnboundedReceiver<ProtocolMsg<B, S>>, status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, @@ -589,6 +606,8 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( // Interval at which we send status updates on the `status_sinks`. let mut status_interval = tokio::timer::Interval::new_interval(STATUS_INTERVAL); + // Interval at which we update the `connected_peers` Arc. + let mut connected_peers_interval = tokio::timer::Interval::new_interval(CONNECTED_PEERS_INTERVAL); futures::future::poll_fn(move || { while let Ok(Async::Ready(_)) = status_interval.poll() { @@ -596,7 +615,14 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( status_sinks.lock().retain(|sink| sink.unbounded_send(status.clone()).is_ok()); } - match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset)) { + while let Ok(Async::Ready(_)) = connected_peers_interval.poll() { + let infos = protocol.peers_info().map(|(id, info)| { + (id.clone(), ConnectedPeer { peer_info: info.clone() }) + }).collect(); + *peers.write() = infos; + } + + match protocol.poll(&mut Ctxt(&mut network_service.lock(), &peerset), &*transaction_pool) { Ok(Async::Ready(v)) => void::unreachable(v), Ok(Async::NotReady) => {} Err(err) => void::unreachable(err), @@ -646,7 +672,7 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( protocol.gossip_consensus_message(&mut network_out, topic, engine_id, message, recipient), ProtocolMsg::BlocksProcessed(hashes, has_error) => protocol.blocks_processed(&mut network_out, hashes, has_error), - ProtocolMsg::RestartSync => + ProtocolMsg::RestartSync => protocol.restart(&mut network_out), ProtocolMsg::AnnounceBlock(hash) => protocol.announce_block(&mut network_out, hash), @@ -664,7 +690,8 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( protocol.request_finality_proof(&mut network_out, &hash, number), ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => protocol.finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut network_out), + ProtocolMsg::PropagateExtrinsics => + protocol.propagate_extrinsics(&mut network_out, &*transaction_pool), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Tick => protocol.tick(&mut network_out), #[cfg(any(test, feature = "test-helpers"))] @@ -692,7 +719,13 @@ fn run_thread<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT>( CustomMessageOutcome::None }, Ok(Async::Ready(Some(NetworkServiceEvent::CustomMessage { peer_id, message, .. }))) => - protocol.on_custom_message(&mut network_out, peer_id, message), + protocol.on_custom_message( + &mut network_out, + &*transaction_pool, + peer_id, + message, + finality_proof_provider.as_ref().map(|p| &**p) + ), Ok(Async::Ready(Some(NetworkServiceEvent::Clogged { peer_id, messages, .. }))) => { debug!(target: "sync", "{} clogging messages:", messages.len()); for msg in messages.into_iter().take(5) { diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 82d54d40b0c5086b87e2a3faa65bc3478082cbf0..7cbbbf1370471ede5f21a102194bbb17d6000818 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -44,7 +44,7 @@ use crate::message::Message; use network_libp2p::PeerId; use parking_lot::{Mutex, RwLock}; use primitives::{H256, sr25519::Public as AuthorityId, Blake2Hasher}; -use crate::protocol::{ConnectedPeer, Context, Protocol, ProtocolStatus, CustomMessageOutcome, NetworkOut}; +use crate::protocol::{Context, Protocol, ProtocolStatus, CustomMessageOutcome, NetworkOut}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{AuthorityIdFor, Block as BlockT, Digest, DigestItem, Header, NumberFor}; use runtime_primitives::{Justification, ConsensusEngineId}; @@ -274,7 +274,6 @@ impl<S: NetworkSpecialization<Block>> Link<Block> for TestLink<S> { } pub struct Peer<D, S: NetworkSpecialization<Block>> { - peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>, peer_id: PeerId, client: PeersClient, net_proto_channel: ProtocolChannel<S>, @@ -411,7 +410,7 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> { Ok(Async::Ready(None)) => None, })) }); - + if self.use_tokio { fut.wait() } else { @@ -423,7 +422,6 @@ impl<S: NetworkSpecialization<Block>> ProtocolChannel<S> { impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { fn new( protocol_status: Arc<RwLock<ProtocolStatus<Block>>>, - peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>>, client: PeersClient, import_queue: Box<BasicQueue<Block>>, use_tokio: bool, @@ -447,7 +445,6 @@ impl<D, S: NetworkSpecialization<Block>> Peer<D, S> { import_queue.start(Box::new(network_link)).expect("Test ImportQueue always starts"); Peer { protocol_status, - peers, peer_id: PeerId::random(), client, import_queue, @@ -792,6 +789,8 @@ pub trait TestNetFactory: Sized { &mut self, protocol_status: Arc<RwLock<ProtocolStatus<Block>>>, import_queue: Box<BasicQueue<Block>>, + tx_pool: EmptyTransactionPool, + finality_proof_provider: Option<Arc<FinalityProofProvider<Block>>>, mut protocol: Protocol<Block, Self::Specialization, Hash>, network_sender: mpsc::UnboundedSender<NetworkMsg<Block>>, mut network_to_protocol_rx: mpsc::UnboundedReceiver<FromNetworkMsg<Block>>, @@ -825,7 +824,13 @@ pub trait TestNetFactory: Sized { CustomMessageOutcome::None }, Some(FromNetworkMsg::CustomMessage(peer_id, message)) => - protocol.on_custom_message(&mut Ctxt(&network_sender), peer_id, message), + protocol.on_custom_message( + &mut Ctxt(&network_sender), + &tx_pool, + peer_id, + message, + finality_proof_provider.as_ref().map(|p| &**p) + ), Some(FromNetworkMsg::Synchronize) => { let _ = network_sender.unbounded_send(NetworkMsg::Synchronized); CustomMessageOutcome::None @@ -876,7 +881,7 @@ pub trait TestNetFactory: Sized { ), ProtocolMsg::BlocksProcessed(hashes, has_error) => protocol.blocks_processed(&mut Ctxt(&network_sender), hashes, has_error), - ProtocolMsg::RestartSync => + ProtocolMsg::RestartSync => protocol.restart(&mut Ctxt(&network_sender)), ProtocolMsg::AnnounceBlock(hash) => protocol.announce_block(&mut Ctxt(&network_sender), hash), @@ -894,7 +899,8 @@ pub trait TestNetFactory: Sized { protocol.request_finality_proof(&mut Ctxt(&network_sender), &hash, number), ProtocolMsg::FinalityProofImportResult(requested_block, finalziation_result) => protocol.finality_proof_import_result(requested_block, finalziation_result), - ProtocolMsg::PropagateExtrinsics => protocol.propagate_extrinsics(&mut Ctxt(&network_sender)), + ProtocolMsg::PropagateExtrinsics => + protocol.propagate_extrinsics(&mut Ctxt(&network_sender), &tx_pool), #[cfg(any(test, feature = "test-helpers"))] ProtocolMsg::Tick => protocol.tick(&mut Ctxt(&network_sender)), #[cfg(any(test, feature = "test-helpers"))] @@ -905,7 +911,7 @@ pub trait TestNetFactory: Sized { } } - if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender)).unwrap() { + if let Async::Ready(_) = protocol.poll(&mut Ctxt(&network_sender), &tx_pool).unwrap() { return Ok(Async::Ready(())) } @@ -930,7 +936,6 @@ pub trait TestNetFactory: Sized { /// Add a full peer. fn add_full_peer(&mut self, config: &ProtocolConfig) { let client = Arc::new(test_client::new()); - let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(PeersClient::Full(client.clone()), config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Full(client.clone())); @@ -944,18 +949,14 @@ pub trait TestNetFactory: Sized { finality_proof_request_builder, )); let specialization = self::SpecializationFactory::create(); - let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>> = Arc::new(Default::default()); let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); let protocol = Protocol::new( - peers.clone(), config.clone(), client.clone(), - self.make_finality_proof_provider(PeersClient::Full(client.clone())), None, - tx_pool, specialization, ).unwrap(); @@ -963,13 +964,14 @@ pub trait TestNetFactory: Sized { self.add_peer( protocol_status.clone(), import_queue.clone(), + EmptyTransactionPool, + self.make_finality_proof_provider(PeersClient::Full(client.clone())), protocol, network_sender.clone(), network_to_protocol_rx, protocol_rx, Arc::new(Peer::new( protocol_status, - peers, PeersClient::Full(client), import_queue, self.uses_tokio(), @@ -988,7 +990,6 @@ pub trait TestNetFactory: Sized { config.roles = Roles::LIGHT; let client = Arc::new(test_client::new_light()); - let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(PeersClient::Light(client.clone()), &config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Light(client.clone())); @@ -1002,18 +1003,14 @@ pub trait TestNetFactory: Sized { finality_proof_request_builder, )); let specialization = self::SpecializationFactory::create(); - let peers: Arc<RwLock<HashMap<PeerId, ConnectedPeer<Block>>>> = Arc::new(Default::default()); let (network_to_protocol_sender, network_to_protocol_rx) = mpsc::unbounded(); let (protocol_sender, protocol_rx) = mpsc::unbounded(); let protocol = Protocol::new( - peers.clone(), config, client.clone(), - self.make_finality_proof_provider(PeersClient::Light(client.clone())), None, - tx_pool, specialization, ).unwrap(); @@ -1021,13 +1018,14 @@ pub trait TestNetFactory: Sized { self.add_peer( protocol_status.clone(), import_queue.clone(), + EmptyTransactionPool, + self.make_finality_proof_provider(PeersClient::Light(client.clone())), protocol, network_sender.clone(), network_to_protocol_rx, protocol_rx, Arc::new(Peer::new( protocol_status, - peers, PeersClient::Light(client), import_queue, self.uses_tokio(), diff --git a/substrate/core/network/src/test/sync.rs b/substrate/core/network/src/test/sync.rs index da42ae47582f1aa20f6862405c5bbaa1eb51f5d4..8462304e42f1582845fc9264ac399a987bf25925 100644 --- a/substrate/core/network/src/test/sync.rs +++ b/substrate/core/network/src/test/sync.rs @@ -45,7 +45,7 @@ fn sync_peers_works() { net.sync(); for peer in 0..3 { // Assert peers is up to date. - assert_eq!(net.peer(peer).peers.read().len(), 2); + assert_eq!(net.peer(peer).protocol_status.read().num_peers, 2); // And then disconnect. for other in 0..3 { if other != peer { @@ -56,8 +56,8 @@ fn sync_peers_works() { net.sync(); // Now peers are disconnected. for peer in 0..3 { - let peers = net.peer(peer).peers.read(); - assert_eq!(peers.len(), 0); + let status = net.peer(peer).protocol_status.read(); + assert_eq!(status.num_peers, 0); } } diff --git a/substrate/core/rpc/src/system/mod.rs b/substrate/core/rpc/src/system/mod.rs index 331d9cd85ba61739c3833c5ec2497f2cf2331988..46cbc99fabd610b5681d2f68b0364f042ef8bb34 100644 --- a/substrate/core/rpc/src/system/mod.rs +++ b/substrate/core/rpc/src/system/mod.rs @@ -110,14 +110,14 @@ impl<B: traits::Block> SystemApi<B::Hash, <B::Header as HeaderT>::Number> for Sy fn system_health(&self) -> Result<Health> { Ok(Health { - peers: self.sync.peers().len(), + peers: self.sync.peers_debug_info().len(), is_syncing: self.sync.is_major_syncing(), should_have_peers: self.should_have_peers, }) } fn system_peers(&self) -> Result<Vec<PeerInfo<B::Hash, <B::Header as HeaderT>::Number>>> { - Ok(self.sync.peers().into_iter().map(|(peer_id, p)| PeerInfo { + Ok(self.sync.peers_debug_info().into_iter().map(|(peer_id, p)| PeerInfo { peer_id: peer_id.to_base58(), roles: format!("{:?}", p.roles), protocol_version: p.protocol_version, diff --git a/substrate/core/rpc/src/system/tests.rs b/substrate/core/rpc/src/system/tests.rs index b4b71a7937af16e988e6e156a1bdecde1b05ecca..14cd421fd19e2770ab00d781a31344dc7f802e21 100644 --- a/substrate/core/rpc/src/system/tests.rs +++ b/substrate/core/rpc/src/system/tests.rs @@ -59,7 +59,7 @@ impl network::SyncProvider<Block> for Status { } } - fn peers(&self) -> Vec<(PeerId, NetworkPeerInfo<Block>)> { + fn peers_debug_info(&self) -> Vec<(PeerId, NetworkPeerInfo<Block>)> { let mut peers = vec![]; for _peer in 0..self.peers { peers.push( diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 7b73c370eee0b889c159ae4720b6b19dc8de0723..dc4676dc0aff1536648c2561865978fdf76491f7 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -195,7 +195,7 @@ pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) { service.network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } network.run_until_all_full(|_index, service| - service.network().peers().len() == NUM_NODES as usize - 1 + service.network().peers_debug_info().len() == NUM_NODES as usize - 1 ); network.runtime }; @@ -215,7 +215,7 @@ pub fn connectivity<F: ServiceFactory>(spec: FactoryChainSpec<F>) { address = node_id.clone(); } network.run_until_all_full(|_index, service| { - service.network().peers().len() == NUM_NODES as usize - 1 + service.network().peers_debug_info().len() == NUM_NODES as usize - 1 }); } temp.close().expect("Error removing temp dir");