From 9ad9f7eebd573c514bd54397dc548575462488bd Mon Sep 17 00:00:00 2001
From: Pierre Krieger <pierre.krieger1708@gmail.com>
Date: Wed, 5 Jun 2019 09:15:54 +0200
Subject: [PATCH] Minor sync refactoring (#2767)

* Make maintain_sync private

* Remove sync::Context::peer_info

* Print errors if sync state mismatch

* Line width
---
 substrate/core/network/src/protocol.rs |  18 +-
 substrate/core/network/src/sync.rs     | 294 +++++++++++++------------
 2 files changed, 166 insertions(+), 146 deletions(-)

diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index 569f1df0309..2281b9da7fb 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -119,7 +119,7 @@ pub struct ProtocolStatus<B: BlockT> {
 }
 
 /// Peer information
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 struct Peer<B: BlockT, H: ExHashT> {
 	info: PeerInfo<B>,
 	/// Current block request, if any.
@@ -342,10 +342,6 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext<B> for ProtocolContext<'a,
 		self.network_out.disconnect_peer(who)
 	}
 
-	fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> {
-		self.context_data.peers.get(who).map(|p| p.info.clone())
-	}
-
 	fn client(&self) -> &dyn Client<B> {
 		&*self.context_data.chain
 	}
@@ -906,9 +902,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 			status.version
 		};
 
+		let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
 		self.on_demand_core.on_connect(&mut network_out, who.clone(), status.roles, status.best_number);
 		let mut context = ProtocolContext::new(&mut self.context_data, network_out);
-		self.sync.new_peer(&mut context, who.clone());
+		self.sync.new_peer(&mut context, who.clone(), info);
 		if protocol_version > 2 {
 			self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles);
 		}
@@ -1188,16 +1185,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 		processed_blocks: Vec<B::Hash>,
 		has_error: bool
 	) {
-		self.sync.blocks_processed(processed_blocks, has_error);
-		let mut context =
-			ProtocolContext::new(&mut self.context_data, network_out);
-		self.sync.maintain_sync(&mut context);
+		let mut context = ProtocolContext::new(&mut self.context_data, network_out);
+		self.sync.blocks_processed(&mut context, processed_blocks, has_error);
 	}
 
 	/// Restart the sync process.
 	pub fn restart(&mut self, network_out: &mut dyn NetworkOut<B>) {
+		let peers = self.context_data.peers.clone();
 		let mut context = ProtocolContext::new(&mut self.context_data, network_out);
-		self.sync.restart(&mut context);
+		self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone()));
 	}
 
 	/// Notify about successful import of the given block.
diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs
index 9b327c1a204..62cb0ce14d6 100644
--- a/substrate/core/network/src/sync.rs
+++ b/substrate/core/network/src/sync.rs
@@ -33,7 +33,7 @@
 use std::cmp::max;
 use std::ops::Range;
 use std::collections::{HashMap, VecDeque};
-use log::{debug, trace, warn, info};
+use log::{debug, trace, warn, info, error};
 use crate::protocol::PeerInfo as ProtocolPeerInfo;
 use network_libp2p::PeerId;
 use client::{BlockStatus, ClientInfo};
@@ -81,9 +81,6 @@ pub trait Context<B: BlockT> {
 	/// Force disconnecting from a peer. Use this when a peer misbehaved.
 	fn disconnect_peer(&mut self, who: PeerId);
 
-	/// Get peer info.
-	fn peer_info(&self, peer: &PeerId) -> Option<ProtocolPeerInfo<B>>;
-
 	/// Request a finality proof from a peer.
 	fn send_finality_proof_request(&mut self, who: PeerId, request: message::FinalityProofRequest<B::Hash>);
 
@@ -237,82 +234,90 @@ impl<B: BlockT> ChainSync<B> {
 	}
 
 	/// Handle new connected peer. Call this method whenever we connect to a new peer.
-	pub(crate) fn new_peer(&mut self, protocol: &mut dyn Context<B>, who: PeerId) {
-		if let Some(info) = protocol.peer_info(&who) {
-			// there's nothing sync can get from the node that has no blockchain data
-			// (the opposite is not true, but all requests are served at protocol level)
-			if !info.roles.is_full() {
-				return;
-			}
+	pub(crate) fn new_peer(
+		&mut self,
+		protocol: &mut dyn Context<B>,
+		who: PeerId,
+		info: ProtocolPeerInfo<B>
+	) {
+		// there's nothing sync can get from the node that has no blockchain data
+		// (the opposite is not true, but all requests are served at protocol level)
+		if !info.roles.is_full() {
+			return;
+		}
 
-			let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash);
-			match (status, info.best_number) {
-				(Err(e), _) => {
-					debug!(target:"sync", "Error reading blockchain: {:?}", e);
-					protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE);
-					protocol.disconnect_peer(who);
-				},
-				(Ok(BlockStatus::KnownBad), _) => {
-					info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
-					protocol.report_peer(who.clone(), i32::min_value());
-					protocol.disconnect_peer(who);
-				},
-				(Ok(BlockStatus::Unknown), b) if b.is_zero() => {
-					info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
-					protocol.report_peer(who.clone(), i32::min_value());
-					protocol.disconnect_peer(who);
-				},
-				(Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => {
-					// when actively syncing the common point moves too fast.
-					debug!(target:"sync", "New peer with unknown best hash {} ({}), assuming common block.", self.best_queued_hash, self.best_queued_number);
-					self.peers.insert(who, PeerSync {
-						common_number: self.best_queued_number,
+		let status = block_status(&*protocol.client(), &self.queue_blocks, info.best_hash);
+		match (status, info.best_number) {
+			(Err(e), _) => {
+				debug!(target:"sync", "Error reading blockchain: {:?}", e);
+				protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE);
+				protocol.disconnect_peer(who);
+			},
+			(Ok(BlockStatus::KnownBad), _) => {
+				info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number);
+				protocol.report_peer(who.clone(), i32::min_value());
+				protocol.disconnect_peer(who);
+			},
+			(Ok(BlockStatus::Unknown), b) if b.is_zero() => {
+				info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
+				protocol.report_peer(who.clone(), i32::min_value());
+				protocol.disconnect_peer(who);
+			},
+			(Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => {
+				// when actively syncing the common point moves too fast.
+				debug!(
+					target:"sync",
+					"New peer with unknown best hash {} ({}), assuming common block.",
+					self.best_queued_hash,
+					self.best_queued_number
+				);
+				self.peers.insert(who, PeerSync {
+					common_number: self.best_queued_number,
+					best_hash: info.best_hash,
+					best_number: info.best_number,
+					state: PeerSyncState::Available,
+					recently_announced: Default::default(),
+				});
+			}
+			(Ok(BlockStatus::Unknown), _) => {
+				let our_best = self.best_queued_number;
+				if our_best.is_zero() {
+					// We are at genesis, just start downloading
+					debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
+					self.peers.insert(who.clone(), PeerSync {
+						common_number: Zero::zero(),
 						best_hash: info.best_hash,
 						best_number: info.best_number,
 						state: PeerSyncState::Available,
 						recently_announced: Default::default(),
 					});
-				}
-				(Ok(BlockStatus::Unknown), _) => {
-					let our_best = self.best_queued_number;
-					if our_best.is_zero() {
-						// We are at genesis, just start downloading
-						debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number);
-						self.peers.insert(who.clone(), PeerSync {
-							common_number: Zero::zero(),
-							best_hash: info.best_hash,
-							best_number: info.best_number,
-							state: PeerSyncState::Available,
-							recently_announced: Default::default(),
-						});
-						self.download_new(protocol, who)
-					} else {
-						let common_best = ::std::cmp::min(our_best, info.best_number);
-						debug!(target:"sync",
-							"New peer with unknown best hash {} ({}), searching for common ancestor.",
-							info.best_hash,
-							info.best_number
-						);
-						self.peers.insert(who.clone(), PeerSync {
-							common_number: Zero::zero(),
-							best_hash: info.best_hash,
-							best_number: info.best_number,
-							state: PeerSyncState::AncestorSearch(common_best, AncestorSearchState::ExponentialBackoff(One::one())),
-							recently_announced: Default::default(),
-						});
-						Self::request_ancestry(protocol, who, common_best)
-					}
-				},
-				(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChainWithState), _) | (Ok(BlockStatus::InChainPruned), _) => {
-					debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
+					self.download_new(protocol, who)
+				} else {
+					let common_best = ::std::cmp::min(our_best, info.best_number);
+					debug!(target:"sync",
+						"New peer with unknown best hash {} ({}), searching for common ancestor.",
+						info.best_hash,
+						info.best_number
+					);
 					self.peers.insert(who.clone(), PeerSync {
-						common_number: info.best_number,
+						common_number: Zero::zero(),
 						best_hash: info.best_hash,
 						best_number: info.best_number,
-						state: PeerSyncState::Available,
+						state: PeerSyncState::AncestorSearch(common_best, AncestorSearchState::ExponentialBackoff(One::one())),
 						recently_announced: Default::default(),
 					});
+					Self::request_ancestry(protocol, who, common_best)
 				}
+			},
+			(Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChainWithState), _) | (Ok(BlockStatus::InChainPruned), _) => {
+				debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number);
+				self.peers.insert(who.clone(), PeerSync {
+					common_number: info.best_number,
+					best_hash: info.best_hash,
+					best_number: info.best_number,
+					state: PeerSyncState::Available,
+					recently_announced: Default::default(),
+				});
 			}
 		}
 	}
@@ -488,36 +493,41 @@ impl<B: BlockT> ChainSync<B> {
 		_request: message::BlockRequest<B>,
 		response: message::BlockResponse<B>,
 	) -> Option<(PeerId, B::Hash, NumberFor<B>, Justification)> {
-		if let Some(ref mut peer) = self.peers.get_mut(&who) {
-			if let PeerSyncState::DownloadingJustification(hash) = peer.state {
-				peer.state = PeerSyncState::Available;
+		let peer = if let Some(peer) = self.peers.get_mut(&who) {
+			peer
+		} else {
+			error!(target: "sync", "Called on_block_justification_data with a bad peer ID");
+			return None;
+		};
 
-				// we only request one justification at a time
-				match response.blocks.into_iter().next() {
-					Some(response) => {
-						if hash != response.hash {
-							info!("Invalid block justification provided by {}: requested: {:?} got: {:?}",
-								who, hash, response.hash);
-							protocol.report_peer(who.clone(), i32::min_value());
-							protocol.disconnect_peer(who);
-							return None;
-						}
+		if let PeerSyncState::DownloadingJustification(hash) = peer.state {
+			peer.state = PeerSyncState::Available;
 
-						return self.extra_requests.justifications().on_response(
-							who,
-							response.justification,
-						);
-					},
-					None => {
-						// we might have asked the peer for a justification on a block that we thought it had
-						// (regardless of whether it had a justification for it or not).
-						trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}",
-							who,
-							hash,
-						);
+			// we only request one justification at a time
+			match response.blocks.into_iter().next() {
+				Some(response) => {
+					if hash != response.hash {
+						info!("Invalid block justification provided by {}: requested: {:?} got: {:?}",
+							who, hash, response.hash);
+						protocol.report_peer(who.clone(), i32::min_value());
+						protocol.disconnect_peer(who);
 						return None;
-					},
-				}
+					}
+
+					return self.extra_requests.justifications().on_response(
+						who,
+						response.justification,
+					);
+				},
+				None => {
+					// we might have asked the peer for a justification on a block that we thought it had
+					// (regardless of whether it had a justification for it or not).
+					trace!(target: "sync", "Peer {:?} provided empty response for justification request {:?}",
+						who,
+						hash,
+					);
+					return None;
+				},
 			}
 		}
 
@@ -532,28 +542,33 @@ impl<B: BlockT> ChainSync<B> {
 		who: PeerId,
 		response: message::FinalityProofResponse<B::Hash>,
 	) -> Option<(PeerId, B::Hash, NumberFor<B>, Vec<u8>)> {
-		if let Some(ref mut peer) = self.peers.get_mut(&who) {
-			if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
-				peer.state = PeerSyncState::Available;
-
-				// we only request one finality proof at a time
-				if hash != response.block {
-					info!(
-						"Invalid block finality proof provided: requested: {:?} got: {:?}",
-						hash,
-						response.block,
-					);
+		let peer = if let Some(peer) = self.peers.get_mut(&who) {
+			peer
+		} else {
+			error!(target: "sync", "Called on_block_finality_proof_data with a bad peer ID");
+			return None;
+		};
 
-					protocol.report_peer(who.clone(), i32::min_value());
-					protocol.disconnect_peer(who);
-					return None;
-				}
+		if let PeerSyncState::DownloadingFinalityProof(hash) = peer.state {
+			peer.state = PeerSyncState::Available;
 
-				return self.extra_requests.finality_proofs().on_response(
-					who,
-					response.proof,
+			// we only request one finality proof at a time
+			if hash != response.block {
+				info!(
+					"Invalid block finality proof provided: requested: {:?} got: {:?}",
+					hash,
+					response.block,
 				);
+
+				protocol.report_peer(who.clone(), i32::min_value());
+				protocol.disconnect_peer(who);
+				return None;
 			}
+
+			return self.extra_requests.finality_proofs().on_response(
+				who,
+				response.proof,
+			);
 		}
 
 		self.maintain_sync(protocol);
@@ -563,17 +578,18 @@ impl<B: BlockT> ChainSync<B> {
 	/// A batch of blocks have been processed, with or without errors.
 	/// Call this when a batch of blocks have been processed by the import queue, with or without
 	/// errors.
-	pub fn blocks_processed(&mut self, processed_blocks: Vec<B::Hash>, has_error: bool) {
+	pub fn blocks_processed(&mut self, protocol: &mut Context<B>, processed_blocks: Vec<B::Hash>, has_error: bool) {
 		for hash in processed_blocks {
 			self.queue_blocks.remove(&hash);
 		}
 		if has_error {
 			self.best_importing_number = Zero::zero();
 		}
+		self.maintain_sync(protocol)
 	}
 
 	/// Maintain the sync process (download new blocks, fetch justifications).
-	pub fn maintain_sync(&mut self, protocol: &mut dyn Context<B>) {
+	fn maintain_sync(&mut self, protocol: &mut dyn Context<B>) {
 		let peers: Vec<PeerId> = self.peers.keys().map(|p| p.clone()).collect();
 		for peer in peers {
 			self.download_new(protocol, peer);
@@ -716,26 +732,28 @@ impl<B: BlockT> ChainSync<B> {
 		let ancient_parent = parent_status == BlockStatus::InChainPruned;
 
 		let known = self.is_known(protocol, &hash);
-		if let Some(ref mut peer) = self.peers.get_mut(&who) {
-			while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE {
-				peer.recently_announced.pop_front();
-			}
-			peer.recently_announced.push_back(hash.clone());
-			if number > peer.best_number {
-				// update their best block
-				peer.best_number = number;
-				peer.best_hash = hash;
-			}
-			if let PeerSyncState::AncestorSearch(_, _) = peer.state {
-				return false;
-			}
-			if header.parent_hash() == &self.best_queued_hash || known_parent {
-				peer.common_number = number - One::one();
-			} else if known {
-				peer.common_number = number
-			}
+		let peer = if let Some(peer) = self.peers.get_mut(&who) {
+			peer
 		} else {
+			error!(target: "sync", "Called on_block_announce with a bad peer ID");
 			return false;
+		};
+		while peer.recently_announced.len() >= ANNOUNCE_HISTORY_SIZE {
+			peer.recently_announced.pop_front();
+		}
+		peer.recently_announced.push_back(hash.clone());
+		if number > peer.best_number {
+			// update their best block
+			peer.best_number = number;
+			peer.best_hash = hash;
+		}
+		if let PeerSyncState::AncestorSearch(_, _) = peer.state {
+			return false;
+		}
+		if header.parent_hash() == &self.best_queued_hash || known_parent {
+			peer.common_number = number - One::one();
+		} else if known {
+			peer.common_number = number
 		}
 
 		// known block case
@@ -825,7 +843,11 @@ impl<B: BlockT> ChainSync<B> {
 	}
 
 	/// Restart the sync process.
-	pub(crate) fn restart(&mut self, protocol: &mut dyn Context<B>) {
+	pub(crate) fn restart(
+		&mut self,
+		protocol: &mut dyn Context<B>,
+		mut peer_info: impl FnMut(&PeerId) -> Option<ProtocolPeerInfo<B>>
+	) {
 		self.queue_blocks.clear();
 		self.best_importing_number = Zero::zero();
 		self.blocks.clear();
@@ -835,7 +857,9 @@ impl<B: BlockT> ChainSync<B> {
 		debug!(target:"sync", "Restarted with {} ({})", self.best_queued_number, self.best_queued_hash);
 		let ids: Vec<PeerId> = self.peers.drain().map(|(id, _)| id).collect();
 		for id in ids {
-			self.new_peer(protocol, id);
+			if let Some(info) = peer_info(&id) {
+				self.new_peer(protocol, id, info);
+			}
 		}
 	}
 
-- 
GitLab