diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index 7fc709e4037971ae1e842983f8d197ef6ce83f87..f083673ee21a88ca888e9dffb3c85d642cedb758 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -16,7 +16,7 @@ //! On-demand requests service. -use std::collections::VecDeque; +use std::collections::{HashMap, VecDeque}; use std::sync::{Arc, Weak}; use std::time::{Instant, Duration}; use futures::{Async, Future, Poll}; @@ -41,7 +41,10 @@ const RETRY_COUNT: usize = 1; /// On-demand service API. pub trait OnDemandService<Block: BlockT>: Send + Sync { /// When new node is connected. - fn on_connect(&self, peer: NodeIndex, role: service::Roles); + fn on_connect(&self, peer: NodeIndex, role: service::Roles, best_number: NumberFor<Block>); + + /// When block is announced by the peer. + fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor<Block>); /// When node is disconnected. fn on_disconnect(&self, peer: NodeIndex); @@ -90,6 +93,7 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> { pending_requests: VecDeque<Request<B>>, active_peers: LinkedHashMap<NodeIndex, Request<B>>, idle_peers: VecDeque<NodeIndex>, + best_blocks: HashMap<NodeIndex, NumberFor<B>>, } struct Request<Block: BlockT> { @@ -141,6 +145,7 @@ impl<B: BlockT, E> OnDemand<B, E> where pending_requests: VecDeque::new(), active_peers: LinkedHashMap::new(), idle_peers: VecDeque::new(), + best_blocks: HashMap::new(), }) } } @@ -206,13 +211,19 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where E: service::ExecuteInContext<B>, B::Header: HeaderT, { - fn on_connect(&self, peer: NodeIndex, role: service::Roles) { + fn on_connect(&self, peer: NodeIndex, role: service::Roles, best_number: NumberFor<B>) { if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct? return; } let mut core = self.core.lock(); - core.add_peer(peer); + core.add_peer(peer, best_number); + core.dispatch(); + } + + fn on_block_announce(&self, peer: NodeIndex, best_number: NumberFor<B>) { + let mut core = self.core.lock(); + core.update_peer(peer, best_number); core.dispatch(); } @@ -329,11 +340,18 @@ impl<B, E> OnDemandCore<B, E> where E: service::ExecuteInContext<B>, B::Header: HeaderT, { - pub fn add_peer(&mut self, peer: NodeIndex) { + pub fn add_peer(&mut self, peer: NodeIndex, best_number: NumberFor<B>) { self.idle_peers.push_back(peer); + self.best_blocks.insert(peer, best_number); + } + + pub fn update_peer(&mut self, peer: NodeIndex, best_number: NumberFor<B>) { + self.best_blocks.insert(peer, best_number); } pub fn remove_peer(&mut self, peer: NodeIndex) { + self.best_blocks.remove(&peer); + if let Some(request) = self.active_peers.remove(&peer) { self.pending_requests.push_front(request); return; @@ -390,12 +408,35 @@ impl<B, E> OnDemandCore<B, E> where None => return, }; + let last_peer = self.idle_peers.back().cloned(); while !self.pending_requests.is_empty() { let peer = match self.idle_peers.pop_front() { Some(peer) => peer, None => return, }; + // check if request can (optimistically) be processed by the peer + let can_be_processed_by_peer = { + let request = self.pending_requests.front().expect("checked in loop condition; qed"); + let peer_best_block = self.best_blocks.get(&peer) + .expect("entries are inserted into best_blocks when peer is connected; + entries are removed from best_blocks when peer is disconnected; + peer is in idle_peers and thus connected; qed"); + request.required_block() <= *peer_best_block + }; + + if !can_be_processed_by_peer { + // return peer to the back of the queue + self.idle_peers.push_back(peer); + + // we have enumerated all peers and noone can handle request + if Some(peer) == last_peer { + break; + } + + continue; + } + let mut request = self.pending_requests.pop_front().expect("checked in loop condition; qed"); request.timestamp = Instant::now(); trace!(target: "sync", "Dispatching remote request {} to peer {}", request.id, peer); @@ -407,6 +448,15 @@ impl<B, E> OnDemandCore<B, E> where } impl<Block: BlockT> Request<Block> { + pub fn required_block(&self) -> NumberFor<Block> { + match self.data { + RequestData::RemoteHeader(ref data, _) => data.block, + RequestData::RemoteRead(ref data, _) => *data.header.number(), + RequestData::RemoteCall(ref data, _) => *data.header.number(), + RequestData::RemoteChanges(ref data, _) => data.max_block.0, + } + } + pub fn message(&self) -> message::Message<Block> { match self.data { RequestData::RemoteHeader(ref data, _) => @@ -545,19 +595,24 @@ pub mod tests { #[test] fn knows_about_peers_roles() { let (_, on_demand) = dummy(true); - on_demand.on_connect(0, Roles::LIGHT); - on_demand.on_connect(1, Roles::FULL); - on_demand.on_connect(2, Roles::AUTHORITY); + on_demand.on_connect(0, Roles::LIGHT, 1000); + on_demand.on_connect(1, Roles::FULL, 2000); + on_demand.on_connect(2, Roles::AUTHORITY, 3000); assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>()); + assert_eq!(on_demand.core.lock().best_blocks.get(&1), Some(&2000)); + assert_eq!(on_demand.core.lock().best_blocks.get(&2), Some(&3000)); } #[test] fn disconnects_from_idle_peer() { let (_, on_demand) = dummy(true); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 100); assert_eq!(1, total_peers(&*on_demand)); + assert!(!on_demand.core.lock().best_blocks.is_empty()); + on_demand.on_disconnect(0); assert_eq!(0, total_peers(&*on_demand)); + assert!(on_demand.core.lock().best_blocks.is_empty()); } #[test] @@ -566,8 +621,8 @@ pub mod tests { let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); - on_demand.on_connect(1, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); + on_demand.on_connect(1, Roles::FULL, 1000); assert_eq!(vec![0, 1], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>()); assert!(on_demand.core.lock().active_peers.is_empty()); @@ -593,7 +648,7 @@ pub mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -620,7 +675,7 @@ pub mod tests { retry_count: Some(1), }); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); receive_call_response(&*on_demand, &mut network, 0, 0); assert!(network.to_disconnect.contains(&0)); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); @@ -631,7 +686,7 @@ pub mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); receive_call_response(&*on_demand, &mut network, 0, 0); assert!(network.to_disconnect.contains(&0)); @@ -642,7 +697,7 @@ pub mod tests { let (_x, on_demand) = dummy(false); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -669,7 +724,7 @@ pub mod tests { let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); for i in 0..retry_count+1 { - on_demand.on_connect(i, Roles::FULL); + on_demand.on_connect(i, Roles::FULL, 1000); } let sync = Arc::new((Mutex::new(0), Mutex::new(0), Condvar::new())); @@ -708,7 +763,7 @@ pub mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_call(RemoteCallRequest { block: Default::default(), @@ -731,7 +786,7 @@ pub mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_read(RemoteReadRequest { header: dummy_header(), @@ -756,7 +811,7 @@ pub mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_header(RemoteHeaderRequest { cht_root: Default::default(), @@ -787,7 +842,7 @@ pub mod tests { let (_x, on_demand) = dummy(true); let queue = RwLock::new(VecDeque::new()); let mut network = TestIo::new(&queue, None); - on_demand.on_connect(0, Roles::FULL); + on_demand.on_connect(0, Roles::FULL, 1000); let response = on_demand.remote_changes(RemoteChangesRequest { changes_trie_config: changes_trie_config(), @@ -810,4 +865,53 @@ pub mod tests { }); thread.join().unwrap(); } + + #[test] + fn does_not_sends_request_to_peer_who_has_no_required_block() { + let (_x, on_demand) = dummy(true); + let queue = RwLock::new(VecDeque::new()); + let mut network = TestIo::new(&queue, None); + + on_demand.on_connect(1, Roles::FULL, 100); + + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 200, + retry_count: None, + }); + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 250, + retry_count: None, + }); + on_demand.remote_header(RemoteHeaderRequest { + cht_root: Default::default(), + block: 250, + retry_count: None, + }); + + on_demand.on_connect(2, Roles::FULL, 150); + + assert_eq!(vec![1, 2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>()); + assert_eq!(on_demand.core.lock().pending_requests.len(), 3); + + on_demand.on_block_announce(1, 250); + + assert_eq!(vec![2], on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>()); + assert_eq!(on_demand.core.lock().pending_requests.len(), 2); + + on_demand.on_block_announce(2, 250); + + assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); + assert_eq!(on_demand.core.lock().pending_requests.len(), 1); + + on_demand.on_remote_header_response(&mut network, 1, message::RemoteHeaderResponse { + id: 0, + header: Some(dummy_header()), + proof: vec![], + }); + + assert!(!on_demand.core.lock().idle_peers.iter().any(|_| true)); + assert_eq!(on_demand.core.lock().pending_requests.len(), 0); + } } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index 3a558bd359e5a427483b9e946a4d9f942b3b4cac..9d3174122979b4c01b5bf94bbb68f2da2cf6f59b 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -46,6 +46,10 @@ pub (crate) const CURRENT_PACKET_COUNT: u8 = 1; // Maximum allowed entries in `BlockResponse` const MAX_BLOCK_DATA_RESPONSE: u32 = 128; +/// When light node connects to the full node and the full node is behind light node +/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it unuseful +/// and disconnect to free connection slot. +const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT, S: Specialization<B>, H: ExHashT> { @@ -436,6 +440,16 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> { io.report_peer(who, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version))); return; } + if self.config.roles & Roles::LIGHT == Roles::LIGHT { + let self_best_block = self.context_data.chain.info().ok() + .and_then(|info| info.best_queued_number) + .unwrap_or_else(|| Zero::zero()); + let blocks_difference = self_best_block.as_().checked_sub(status.best_number.as_()).unwrap_or(0); + if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { + io.report_peer(who, Severity::Useless("Peer is far behind us and will unable to serve light requests")); + return; + } + } let peer = Peer { protocol_version: status.version, @@ -454,9 +468,9 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> { } let mut context = ProtocolContext::new(&self.context_data, io); + self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles, status.best_number)); self.sync.write().new_peer(&mut context, who); - self.specialization.write().on_connect(&mut context, who, status.clone()); - self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles)); + self.specialization.write().on_connect(&mut context, who, status); } /// Called when peer sends us new extrinsics @@ -559,6 +573,7 @@ impl<B: BlockT, S: Specialization<B>, H: ExHashT> Protocol<B, S, H> { peer.known_blocks.insert(hash.clone()); } } + self.on_demand.as_ref().map(|s| s.on_block_announce(who, *header.number())); self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), who, hash, &header); }