// Copyright 2017-2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Main entry point of the sc-network crate.
//!
//! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`].
//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in
//! order for the network to advance.
//! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an
//! `Arc<NetworkService>` by calling [`NetworkWorker::service`].
//!
//! The methods of the [`NetworkService`] are implemented by sending a message over a channel,
//! which is then processed by [`NetworkWorker::poll`].

use std::{collections::{HashMap, HashSet}, fs, marker::PhantomData, io, path::Path}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; use sp_consensus::import_queue::{ImportQueue, Link}; use sp_consensus::import_queue::{BlockImportResult, BlockImportError}; use futures::{prelude::*, sync::mpsc}; use futures03::TryFutureExt as _; use log::{warn, error, info}; use libp2p::{PeerId, Multiaddr, kad::record}; use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::swarm::NetworkBehaviour; use parking_lot::Mutex; use sc_peerset::PeersetHandle; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; use crate::{behaviour::{Behaviour, BehaviourOut}, config::{parse_str_addr, parse_addr}}; use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; use crate::{transport, config::NonReservedPeerMode, ReputationChange}; use crate::config::{Params, TransportConfig}; use crate::error::Error; use crate::protocol::{self, Protocol, Context, PeerInfo}; use crate::protocol::{event::Event, light_dispatch::{AlwaysBadChecker, RequestData}}; use crate::protocol::specialization::NetworkSpecialization; use crate::protocol::sync::SyncState; /// Minimum Requirements for a Hash within Networking pub trait ExHashT: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} impl<T> ExHashT for T where T: std::hash::Hash + Eq + std::fmt::Debug + Clone + Send + Sync + 'static {} /// Transaction pool interface pub trait TransactionPool<H: ExHashT, B: BlockT>: Send + Sync { /// Get transactions from the pool that are ready to be propagated. fn transactions(&self) -> Vec<(H, B::Extrinsic)>; /// Get hash of transaction. fn hash_of(&self, transaction: &B::Extrinsic) -> H; /// Import a transaction into the pool. /// /// Peer reputation is changed by reputation_change if transaction is accepted by the pool. 
fn import( &self, report_handle: ReportHandle, who: PeerId, reputation_change_good: ReputationChange, reputation_change_bad: ReputationChange, transaction: B::Extrinsic, ); /// Notify the pool about transactions broadcast. fn on_broadcasted(&self, propagations: HashMap<H, Vec<String>>); } /// A cloneable handle for reporting cost/benefits of peers. #[derive(Clone)] pub struct ReportHandle { inner: PeersetHandle, // wraps it so we don't have to worry about breaking API. } impl From<PeersetHandle> for ReportHandle { fn from(peerset_handle: PeersetHandle) -> Self { ReportHandle { inner: peerset_handle } } } impl ReportHandle { /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { self.inner.report_peer(who, cost_benefit); } } /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> { /// Number of peers we're connected to. num_connected: Arc<AtomicUsize>, /// The local external addresses. external_addresses: Arc<Mutex<Vec<Multiaddr>>>, /// Are we actively catching up with the chain? is_major_syncing: Arc<AtomicBool>, /// Local copy of the `PeerId` of the local node. local_peer_id: PeerId, /// Bandwidth logging system. Can be queried to know the average bandwidth consumed. bandwidth: Arc<transport::BandwidthSinks>, /// Peerset manager (PSM); manages the reputation of nodes and indicates the network which /// nodes it should be connected to or not. peerset: PeersetHandle, /// Channel that sends messages to the actual worker. to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, S>>, /// Marker to pin the `H` generic. Serves no purpose except to not break backwards /// compatibility. _marker: PhantomData<H>, } impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkWorker<B, S, H> { /// Creates the network service. 
/// /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. pub fn new(params: Params<B, S, H>) -> Result<NetworkWorker<B, S, H>, Error> { let (to_worker, from_worker) = mpsc::unbounded(); if let Some(ref path) = params.network_config.net_config_path { fs::create_dir_all(Path::new(path))?; } // List of multiaddresses that we know in the network. let mut known_addresses = Vec::new(); let mut bootnodes = Vec::new(); let mut reserved_nodes = Vec::new(); // Process the bootnodes. for bootnode in params.network_config.boot_nodes.iter() { match parse_str_addr(bootnode) { Ok((peer_id, addr)) => { bootnodes.push(peer_id.clone()); known_addresses.push((peer_id, addr)); }, Err(_) => warn!(target: "sub-libp2p", "Not a valid bootnode address: {}", bootnode), } } // Check for duplicate bootnodes. known_addresses.iter() .try_for_each(|(peer_id, addr)| if let Some(other) = known_addresses .iter() .find(|o| o.1 == *addr && o.0 != *peer_id) { Err(Error::DuplicateBootnode { address: addr.clone(), first_id: peer_id.clone(), second_id: other.0.clone(), }) } else { Ok(()) } )?; // Initialize the reserved peers. for reserved in params.network_config.reserved_nodes.iter() { if let Ok((peer_id, addr)) = parse_str_addr(reserved) { reserved_nodes.push(peer_id.clone()); known_addresses.push((peer_id, addr)); } else { warn!(target: "sub-libp2p", "Not a valid reserved node address: {}", reserved); } } let peerset_config = sc_peerset::PeersetConfig { in_peers: params.network_config.in_peers, out_peers: params.network_config.out_peers, bootnodes, reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, }; // Private and public keys configuration. 
let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); let local_peer_id = local_public.clone().into_peer_id(); info!(target: "sub-libp2p", "Local node identity is: {}", local_peer_id.to_base58()); let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let (protocol, peerset_handle) = Protocol::new( protocol::ProtocolConfig { roles: params.roles, max_parallel_downloads: params.network_config.max_parallel_downloads, }, params.chain, params.on_demand.as_ref().map(|od| od.checker().clone()) .unwrap_or(Arc::new(AlwaysBadChecker)), params.specialization, params.transaction_pool, params.finality_proof_provider, params.finality_proof_request_builder, params.protocol_id, peerset_config, params.block_announce_validator )?; // Build the swarm. let (mut swarm, bandwidth) = { let user_agent = format!( "{} ({})", params.network_config.client_version, params.network_config.node_name ); let behaviour = Behaviour::new( protocol, user_agent, local_public, known_addresses, match params.network_config.transport { TransportConfig::MemoryOnly => false, TransportConfig::Normal { enable_mdns, .. } => enable_mdns, }, match params.network_config.transport { TransportConfig::MemoryOnly => false, TransportConfig::Normal { allow_private_ipv4, .. } => allow_private_ipv4, }, ); let (transport, bandwidth) = { let (config_mem, config_wasm) = match params.network_config.transport { TransportConfig::MemoryOnly => (true, None), TransportConfig::Normal { wasm_external_transport, .. } => (false, wasm_external_transport) }; transport::build_transport(local_identity, config_mem, config_wasm) }; (Swarm::<B, S, H>::new(transport, behaviour, local_peer_id.clone()), bandwidth) }; // Listen on multiaddresses. 
for addr in ¶ms.network_config.listen_addresses { if let Err(err) = Swarm::<B, S, H>::listen_on(&mut swarm, addr.clone()) { warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) } } // Add external addresses. for addr in ¶ms.network_config.public_addresses { Swarm::<B, S, H>::add_external_address(&mut swarm, addr.clone()); } let external_addresses = Arc::new(Mutex::new(Vec::new())); let service = Arc::new(NetworkService { bandwidth, external_addresses: external_addresses.clone(), num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), peerset: peerset_handle, local_peer_id, to_worker: to_worker.clone(), _marker: PhantomData, }); Ok(NetworkWorker { external_addresses, num_connected, is_major_syncing, network_service: swarm, service, import_queue: params.import_queue, from_worker, light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), event_streams: Vec::new(), }) } /// Returns the downloaded bytes per second averaged over the past few seconds. pub fn average_download_per_sec(&self) -> u64 { self.service.bandwidth.average_download_per_sec() } /// Returns the uploaded bytes per second averaged over the past few seconds. pub fn average_upload_per_sec(&self) -> u64 { self.service.bandwidth.average_upload_per_sec() } /// Returns the number of peers we're connected to. pub fn num_connected_peers(&self) -> usize { self.network_service.user_protocol().num_connected_peers() } /// Returns the number of peers we're connected to and that are being queried. pub fn num_active_peers(&self) -> usize { self.network_service.user_protocol().num_active_peers() } /// Current global sync state. pub fn sync_state(&self) -> SyncState { self.network_service.user_protocol().sync_state() } /// Target sync block number. pub fn best_seen_block(&self) -> Option<NumberFor<B>> { self.network_service.user_protocol().best_seen_block() } /// Number of peers participating in syncing. 
pub fn num_sync_peers(&self) -> u32 { self.network_service.user_protocol().num_sync_peers() } /// Number of blocks in the import queue. pub fn num_queued_blocks(&self) -> u32 { self.network_service.user_protocol().num_queued_blocks() } /// Adds an address for a node. pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) { self.network_service.add_known_address(peer_id, addr); } /// Return a `NetworkService` that can be shared through the code base and can be used to /// manipulate the worker. pub fn service(&self) -> &Arc<NetworkService<B, S, H>> { &self.service } /// You must call this when a new block is imported by the client. pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, data: Vec<u8>, is_best: bool) { self.network_service.user_protocol_mut().on_block_imported(hash, &header, data, is_best); } /// You must call this when a new block is finalized by the client. pub fn on_block_finalized(&mut self, hash: B::Hash, header: B::Header) { self.network_service.user_protocol_mut().on_block_finalized(hash, &header); } /// Get network state. /// /// **Note**: Use this only for debugging. This API is unstable. There are warnings literaly /// everywhere about this. Please don't use this function to retrieve actual information. 
pub fn network_state(&mut self) -> NetworkState { let swarm = &mut self.network_service; let open = swarm.user_protocol().open_peers().cloned().collect::<Vec<_>>(); let connected_peers = { let swarm = &mut *swarm; open.iter().filter_map(move |peer_id| { let known_addresses = NetworkBehaviour::addresses_of_peer(&mut **swarm, peer_id) .into_iter().collect(); let endpoint = if let Some(e) = swarm.node(peer_id).map(|i| i.endpoint()) { e.clone().into() } else { error!(target: "sub-libp2p", "Found state inconsistency between custom protocol \ and debug information about {:?}", peer_id); return None }; Some((peer_id.to_base58(), NetworkStatePeer { endpoint, version_string: swarm.node(peer_id) .and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), latest_ping_time: swarm.node(peer_id).and_then(|i| i.latest_ping()), enabled: swarm.user_protocol().is_enabled(&peer_id), open: swarm.user_protocol().is_open(&peer_id), known_addresses, })) }).collect() }; let not_connected_peers = { let swarm = &mut *swarm; let list = swarm.known_peers().filter(|p| open.iter().all(|n| n != *p)) .cloned().collect::<Vec<_>>(); list.into_iter().map(move |peer_id| { (peer_id.to_base58(), NetworkStateNotConnectedPeer { version_string: swarm.node(&peer_id) .and_then(|i| i.client_version().map(|s| s.to_owned())).clone(), latest_ping_time: swarm.node(&peer_id).and_then(|i| i.latest_ping()), known_addresses: NetworkBehaviour::addresses_of_peer(&mut **swarm, &peer_id) .into_iter().collect(), }) }).collect() }; NetworkState { peer_id: Swarm::<B, S, H>::local_peer_id(&swarm).to_base58(), listened_addresses: Swarm::<B, S, H>::listeners(&swarm).cloned().collect(), external_addresses: Swarm::<B, S, H>::external_addresses(&swarm).cloned().collect(), average_download_per_sec: self.service.bandwidth.average_download_per_sec(), average_upload_per_sec: self.service.bandwidth.average_upload_per_sec(), connected_peers, not_connected_peers, peerset: swarm.user_protocol_mut().peerset_debug_info(), } } /// Get 
currently connected peers. pub fn peers_debug_info(&mut self) -> Vec<(PeerId, PeerInfo<B>)> { self.network_service.user_protocol_mut() .peers_info() .map(|(id, info)| (id.clone(), info.clone())) .collect() } /// Removes a `PeerId` from the list of reserved peers. pub fn remove_reserved_peer(&self, peer: PeerId) { self.service.remove_reserved_peer(peer); } /// Adds a `PeerId` and its address as reserved. The string should encode the address /// and peer ID of the remote node. pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> { self.service.add_reserved_peer(peer) } } impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkService<B, S, H> { /// Writes a message on an open notifications channel. Has no effect if the notifications /// channel with this protocol name is closed. /// /// > **Note**: The reason why this is a no-op in the situation where we have no channel is /// > that we don't guarantee message delivery anyway. Networking issues can cause /// > connections to drop at any time, and higher-level logic shouldn't differentiate /// > between the remote voluntarily closing a substream or a network error /// > preventing the message from being delivered. /// /// The protocol must have been registered with `register_notifications_protocol`. /// pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::WriteNotification { target, engine_id, message, }); } /// Returns a stream containing the events that happen on the network. /// /// If this method is called multiple times, the events are duplicated. /// /// The stream never ends (unless the `NetworkWorker` gets shut down). 
pub fn event_stream(&self) -> impl Stream<Item = Event, Error = ()> { // Note: when transitioning to stable futures, remove the `Error` entirely let (tx, rx) = mpsc::unbounded(); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); rx } /// Registers a new notifications protocol. /// /// After that, you can call `write_notifications`. /// /// Please call `event_stream` before registering a protocol, otherwise you may miss events /// about the protocol that you have registered. /// /// You are very strongly encouraged to call this method very early on. Any connection open /// will retain the protocols that were registered then, and not any new one. pub fn register_notifications_protocol( &self, engine_id: ConsensusEngineId, ) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, }); } /// You must call this when new transactons are imported by the transaction pool. /// /// The latest transactions will be fetched from the `TransactionPool` that was passed at /// initialization as part of the configuration. pub fn trigger_repropagate(&self) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsics); } /// Make sure an important block is propagated to peers. /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. This function forces such an announcement. pub fn announce_block(&self, hash: B::Hash, data: Vec<u8>) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data)); } /// Report a given peer as either beneficial (+) or costly (-) according to the /// given scalar. pub fn report_peer(&self, who: PeerId, cost_benefit: ReputationChange) { self.peerset.report_peer(who, cost_benefit); } /// Disconnect from a node as soon as possible. /// /// This triggers the same effects as if the connection had closed itself spontaneously. 
pub fn disconnect_peer(&self, who: PeerId) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who)); } /// Request a justification for the given block from the network. /// /// On success, the justification will be passed to the import queue that was part at /// initialization as part of the configuration. pub fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::RequestJustification(hash.clone(), number)); } /// Execute a closure with the chain-specific network specialization. pub fn with_spec<F>(&self, f: F) where F: FnOnce(&mut S, &mut dyn Context<B>) + Send + 'static { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::ExecuteWithSpec(Box::new(f))); } /// Are we in the process of downloading the chain? pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) } /// Start getting a value from the DHT. /// /// This will generate either a `ValueFound` or a `ValueNotFound` event and pass it as an /// item on the [`NetworkWorker`] stream. pub fn get_value(&self, key: &record::Key) { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::GetValue(key.clone())); } /// Start putting a value in the DHT. /// /// This will generate either a `ValuePut` or a `ValuePutFailed` event and pass it as an /// item on the [`NetworkWorker`] stream. pub fn put_value(&self, key: record::Key, value: Vec<u8>) { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::PutValue(key, value)); } /// Connect to unreserved peers and allow unreserved peers to connect. pub fn accept_unreserved_peers(&self) { self.peerset.set_reserved_only(false); } /// Disconnect from unreserved peers and deny new unreserved peers to connect. pub fn deny_unreserved_peers(&self) { self.peerset.set_reserved_only(true); } /// Removes a `PeerId` from the list of reserved peers. 
pub fn remove_reserved_peer(&self, peer: PeerId) { self.peerset.remove_reserved_peer(peer); } /// Adds a `PeerId` and its address as reserved. The string should encode the address /// and peer ID of the remote node. pub fn add_reserved_peer(&self, peer: String) -> Result<(), String> { let (peer_id, addr) = parse_str_addr(&peer).map_err(|e| format!("{:?}", e))?; self.peerset.add_reserved_peer(peer_id.clone()); let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); Ok(()) } /// Configure an explicit fork sync request. /// Note that this function should not be used for recent blocks. /// Sync should be able to download all the recent forks normally. /// `set_sync_fork_request` should only be used if external code detects that there's /// a stale fork missing. /// Passing empty `peers` set effectively removes the sync request. pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number)); } /// Modify a peerset priority group. pub fn set_priority_group(&self, group_id: String, peers: HashSet<Multiaddr>) -> Result<(), String> { let peers = peers.into_iter().map(|p| { parse_addr(p).map_err(|e| format!("{:?}", e)) }).collect::<Result<Vec<(PeerId, Multiaddr)>, String>>()?; let peer_ids = peers.iter().map(|(peer_id, _addr)| peer_id.clone()).collect(); self.peerset.set_priority_group(group_id, peer_ids); for (peer_id, addr) in peers.into_iter() { let _ = self .to_worker .unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr)); } Ok(()) } /// Returns the number of peers we're connected to. 
pub fn num_connected(&self) -> usize { self.num_connected.load(Ordering::Relaxed) } } impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> sp_consensus::SyncOracle for NetworkService<B, S, H> { fn is_major_syncing(&mut self) -> bool { NetworkService::is_major_syncing(self) } fn is_offline(&mut self) -> bool { self.num_connected.load(Ordering::Relaxed) == 0 } } impl<'a, B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> sp_consensus::SyncOracle for &'a NetworkService<B, S, H> { fn is_major_syncing(&mut self) -> bool { NetworkService::is_major_syncing(self) } fn is_offline(&mut self) -> bool { self.num_connected.load(Ordering::Relaxed) == 0 } } /// Trait for providing information about the local network state pub trait NetworkStateInfo { /// Returns the local external addresses. fn external_addresses(&self) -> Vec<Multiaddr>; /// Returns the local Peer ID. fn local_peer_id(&self) -> PeerId; } impl<B, S, H> NetworkStateInfo for NetworkService<B, S, H> where B: sp_runtime::traits::Block, S: NetworkSpecialization<B>, H: ExHashT, { /// Returns the local external addresses. fn external_addresses(&self) -> Vec<Multiaddr> { self.external_addresses.lock().clone() } /// Returns the local Peer ID. fn local_peer_id(&self) -> PeerId { self.local_peer_id.clone() } } /// Messages sent from the `NetworkService` to the `NetworkWorker`. /// /// Each entry corresponds to a method of `NetworkService`. 
enum ServiceToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
    // Sent by `NetworkService::trigger_repropagate`.
    PropagateExtrinsics,
    // Sent by `NetworkService::request_justification`.
    RequestJustification(B::Hash, NumberFor<B>),
    // Sent by `NetworkService::announce_block`.
    AnnounceBlock(B::Hash, Vec<u8>),
    // Sent by `NetworkService::with_spec`.
    ExecuteWithSpec(Box<dyn FnOnce(&mut S, &mut dyn Context<B>) + Send>),
    // Sent by `NetworkService::get_value`.
    GetValue(record::Key),
    // Sent by `NetworkService::put_value`.
    PutValue(record::Key, Vec<u8>),
    // Sent by `NetworkService::add_reserved_peer` and `NetworkService::set_priority_group`.
    AddKnownAddress(PeerId, Multiaddr),
    // Sent by `NetworkService::set_sync_fork_request`.
    SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
    // Sent by `NetworkService::event_stream`; carries the sending half of the new stream.
    EventStream(mpsc::UnboundedSender<Event>),
    // Sent by `NetworkService::write_notification`.
    WriteNotification {
        message: Vec<u8>,
        engine_id: ConsensusEngineId,
        target: PeerId,
    },
    // Sent by `NetworkService::register_notifications_protocol`.
    RegisterNotifProtocol {
        engine_id: ConsensusEngineId,
    },
    // Sent by `NetworkService::disconnect_peer`.
    DisconnectPeer(PeerId),
}

/// Main network worker. Must be polled in order for the network to advance.
///
/// You are encouraged to poll this in a separate background thread or task.
#[must_use = "The NetworkWorker must be polled in order for the network to work"]
pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> {
    /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
    external_addresses: Arc<Mutex<Vec<Multiaddr>>>,
    /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
    num_connected: Arc<AtomicUsize>,
    /// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
    is_major_syncing: Arc<AtomicBool>,
    /// The network service that can be extracted and shared through the codebase.
    service: Arc<NetworkService<B, S, H>>,
    /// The *actual* network.
    network_service: Swarm<B, S, H>,
    /// The import queue that was passed as initialization.
    import_queue: Box<dyn ImportQueue<B>>,
    /// Messages from the `NetworkService` and that must be processed.
    from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, S>>,
    /// Receiver for queries from the light client that must be processed.
    light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
    /// Senders for events that happen on the network.
    event_streams: Vec<mpsc::UnboundedSender<Event>>,
}

impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for NetworkWorker<B, S, H> {
    type Item = ();
    type Error = io::Error;

    /// Drives the network one step.
    ///
    /// In order: polls the import queue, forwards pending light client requests, processes
    /// every queued message from the `NetworkService`, processes every ready output of the
    /// swarm, and finally refreshes the values shared with the `NetworkService`.
    ///
    /// Resolves with `Ok(Async::Ready(()))` once the channel from the `NetworkService` has
    /// ended or errored, and with `Err` if polling the swarm returns an error. Otherwise
    /// returns `Ok(Async::NotReady)`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Poll the import queue for actions to perform.
        // The closure always yields `Pending` and the outcome of the wrapping future is
        // discarded; the wrapper is what provides the `cx` handed to `poll_actions`.
        let _ = futures03::future::poll_fn(|cx| {
            self.import_queue.poll_actions(cx, &mut NetworkLink {
                protocol: &mut self.network_service,
            });
            std::task::Poll::Pending::<Result<(), ()>>
        }).compat().poll();

        // Check for new incoming light client requests.
        if let Some(light_client_rqs) = self.light_client_rqs.as_mut() {
            while let Ok(Async::Ready(Some(rq))) = light_client_rqs.poll() {
                self.network_service.user_protocol_mut().add_light_client_request(rq);
            }
        }

        loop {
            // Process the next message coming from the `NetworkService`.
            let msg = match self.from_worker.poll() {
                Ok(Async::Ready(Some(msg))) => msg,
                // The channel has ended or errored: the worker future is finished.
                Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
                Ok(Async::NotReady) => break,
            };

            match msg {
                ServiceToWorkerMsg::ExecuteWithSpec(task) => {
                    let protocol = self.network_service.user_protocol_mut();
                    let (mut context, spec) = protocol.specialization_lock();
                    task(spec, &mut context);
                },
                ServiceToWorkerMsg::AnnounceBlock(hash, data) =>
                    self.network_service.user_protocol_mut().announce_block(hash, data),
                ServiceToWorkerMsg::RequestJustification(hash, number) =>
                    self.network_service.user_protocol_mut().request_justification(&hash, number),
                ServiceToWorkerMsg::PropagateExtrinsics =>
                    self.network_service.user_protocol_mut().propagate_extrinsics(),
                ServiceToWorkerMsg::GetValue(key) =>
                    self.network_service.get_value(&key),
                ServiceToWorkerMsg::PutValue(key, value) =>
                    self.network_service.put_value(key, value),
                ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
                    self.network_service.add_known_address(peer_id, addr),
                ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) =>
                    self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
                ServiceToWorkerMsg::EventStream(sender) =>
                    self.event_streams.push(sender),
                ServiceToWorkerMsg::WriteNotification { message, engine_id, target } =>
                    self.network_service.user_protocol_mut().write_notification(target, engine_id, message),
                ServiceToWorkerMsg::RegisterNotifProtocol { engine_id } => {
                    let events = self.network_service.user_protocol_mut().register_notifications_protocol(engine_id);
                    // Broadcast each returned event; senders whose send fails are dropped
                    // from the list.
                    for event in events {
                        self.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok());
                    }
                },
                ServiceToWorkerMsg::DisconnectPeer(who) =>
                    self.network_service.user_protocol_mut().disconnect_peer(&who),
            }
        }

        loop {
            // Process the next action coming from the network.
            let poll_value = self.network_service.poll();

            match poll_value {
                Ok(Async::NotReady) => break,
                Ok(Async::Ready(Some(BehaviourOut::BlockImport(origin, blocks)))) =>
                    self.import_queue.import_blocks(origin, blocks),
                Ok(Async::Ready(Some(BehaviourOut::JustificationImport(origin, hash, nb, justification)))) =>
                    self.import_queue.import_justification(origin, hash, nb, justification),
                Ok(Async::Ready(Some(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)))) =>
                    self.import_queue.import_finality_proof(origin, hash, nb, proof),
                Ok(Async::Ready(Some(BehaviourOut::Event(ev)))) => {
                    // Broadcast the event; senders whose send fails are dropped from the list.
                    self.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
                },
                // An ended swarm stream is ignored here; the loop simply polls again.
                Ok(Async::Ready(None)) => {},
                Err(err) => {
                    error!(target: "sync", "Error in the network: {:?}", err);
                    return Err(err)
                }
            };
        }

        // Update the variables shared with the `NetworkService`.
        self.num_connected.store(self.network_service.user_protocol_mut().num_connected_peers(), Ordering::Relaxed);
        {
            // Collect first so that the lock is only taken for the assignment.
            let external_addresses = Swarm::<B, S, H>::external_addresses(&self.network_service).cloned().collect();
            *self.external_addresses.lock() = external_addresses;
        }
        self.is_major_syncing.store(match self.network_service.user_protocol_mut().sync_state() {
            SyncState::Idle => false,
            SyncState::Downloading => true,
        }, Ordering::Relaxed);

        Ok(Async::NotReady)
    }
}

/// The libp2p swarm, customized for our needs.
type Swarm<B, S, H> = libp2p::swarm::Swarm<
    Boxed<(PeerId, StreamMuxerBox), io::Error>,
    Behaviour<B, S, H>
>;

// Implementation of `import_queue::Link` trait using the available local variables.
struct NetworkLink<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
    // Swarm borrowed for the duration of one `poll_actions` call; every callback below goes
    // through its `user_protocol_mut()`.
    protocol: &'a mut Swarm<B, S, H>,
}

impl<'a, B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Link<B> for NetworkLink<'a, B, S, H> {
    // Forwards the batch import results to the protocol unchanged.
    fn blocks_processed(
        &mut self,
        imported: usize,
        count: usize,
        results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
    ) {
        self.protocol.user_protocol_mut().blocks_processed(imported, count, results)
    }

    // Forwards the result to the protocol. On failure the peer that provided the
    // justification is logged, disconnected and reported with a fatal reputation change.
    fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
        self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success);
        if !success {
            info!("Invalid justification provided by {} for #{}", who, hash);
            self.protocol.user_protocol_mut().disconnect_peer(&who);
            self.protocol.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid justification"));
        }
    }

    // Forwards the request to the protocol unchanged.
    fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
        self.protocol.user_protocol_mut().request_justification(hash, number)
    }

    // Forwards the request to the protocol unchanged.
    fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
        self.protocol.user_protocol_mut().request_finality_proof(hash, number)
    }

    // Forwards the result to the protocol. On failure the peer that provided the finality
    // proof is logged, disconnected and reported with a fatal reputation change.
    fn finality_proof_imported(
        &mut self,
        who: PeerId,
        request_block: (B::Hash, NumberFor<B>),
        finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
    ) {
        // Computed up front because `finalization_result` is moved into the call below.
        let success = finalization_result.is_ok();
        self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result);
        if !success {
            info!("Invalid finality proof provided by {} for #{}", who, request_block.0);
            self.protocol.user_protocol_mut().disconnect_peer(&who);
            self.protocol.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid finality proof"));
        }
    }
}