From e7a50ff52ab007ad5282e525fb77c3dfd1a3c59a Mon Sep 17 00:00:00 2001 From: Robert Habermeier <rphmeier@gmail.com> Date: Wed, 17 Oct 2018 15:56:20 -0700 Subject: [PATCH] generalize some import_queue params --- substrate/core/client/src/client.rs | 4 +- substrate/core/network/src/import_queue.rs | 237 ++++++++++----------- substrate/core/network/src/service.rs | 19 +- substrate/core/network/src/test/mod.rs | 11 +- 4 files changed, 135 insertions(+), 136 deletions(-) diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 81b59e2f7ab..9122e1692cc 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -892,7 +892,9 @@ impl<B, E, Block> Client<B, E, Block> where /// TODO [snd] possibly implement this on blockchain::Backend and just redirect here /// Returns `Ok(None)` if `target_hash` is not found in search space. /// TODO [snd] write down time complexity - pub fn best_containing(&self, target_hash: Block::Hash, maybe_max_number: Option<NumberFor<Block>>) -> error::Result<Option<Block::Hash>> { + pub fn best_containing(&self, target_hash: Block::Hash, maybe_max_number: Option<NumberFor<Block>>) + -> error::Result<Option<Block::Hash>> + { let target_header = { match self.backend.blockchain().header(BlockId::Hash(target_hash))? { Some(x) => x, diff --git a/substrate/core/network/src/import_queue.rs b/substrate/core/network/src/import_queue.rs index 26ad83e852f..79cf19ecc02 100644 --- a/substrate/core/network/src/import_queue.rs +++ b/substrate/core/network/src/import_queue.rs @@ -65,15 +65,9 @@ pub trait ImportQueue<B: BlockT>: Send + Sync { /// /// This is called automatically by the network service when synchronization /// begins. - - fn start<E>( - &self, - _sync: Weak<RwLock<ChainSync<B>>>, - _service: Weak<E>, - _chain: Weak<Client<B>> - ) -> Result<(), Error> where + fn start<L>(&self, _link: L) -> Result<(), Error> where Self: Sized, - E: 'static + ExecuteInContext<B>, + L: 'static + Link<B>, { Ok(()) } @@ -138,18 +132,16 @@ impl<B: BlockT> AsyncImportQueueData<B> { } impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> { - fn start<E: 'static + ExecuteInContext<B>>( + fn start<L: 'static + Link<B>>( &self, - sync: Weak<RwLock<ChainSync<B>>>, - service: Weak<E>, - chain: Weak<Client<B>> + link: L, ) -> Result<(), Error> { debug_assert!(self.handle.lock().is_none()); let qdata = self.data.clone(); let verifier = self.verifier.clone(); *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { - import_thread(sync, service, chain, qdata, verifier) + import_thread(link, qdata, verifier) }).map_err(|err| Error::from(ErrorKind::Io(err)))?); Ok(()) } @@ -211,10 +203,8 @@ impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> { } /// Blocks import thread. -fn import_thread<B: BlockT, E: ExecuteInContext<B>, V: Verifier<B>>( - sync: Weak<RwLock<ChainSync<B>>>, - service: Weak<E>, - chain: Weak<Client<B>>, +fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>( + link: L, qdata: Arc<AsyncImportQueueData<B>>, verifier: Arc<V> ) { @@ -236,91 +226,89 @@ fn import_thread<B: BlockT, E: ExecuteInContext<B>, V: Verifier<B>>( } }; - match (sync.upgrade(), service.upgrade(), chain.upgrade()) { - (Some(sync), Some(service), Some(chain)) => { - let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); - if !import_many_blocks( - &mut SyncLink{chain: &sync, client: &*chain, context: &*service}, - Some(&*qdata), - new_blocks, - verifier.clone(), - ) { - break; - } - - let mut queue_blocks = qdata.queue_blocks.write(); - for blocks_hash in blocks_hashes { - queue_blocks.remove(&blocks_hash); - } - }, - _ => break, + let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); + if !import_many_blocks( + &link, + Some(&*qdata), + new_blocks, + verifier.clone(), + ) { + break; + } + + let mut queue_blocks = qdata.queue_blocks.write(); + for blocks_hash in blocks_hashes { + queue_blocks.remove(&blocks_hash); } } trace!(target: "sync", "Stopping import thread"); } -/// ChainSync link trait. -trait SyncLinkApi<B: BlockT> { + +/// Hooks that the verification queue can use to influence the synchronization +/// algorithm. +pub trait Link<B: BlockT>: Send { /// Get chain reference. fn chain(&self) -> &Client<B>; /// Block imported. - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>); + fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>); /// Maintain sync. - fn maintain_sync(&mut self); + fn maintain_sync(&self); /// Disconnect from peer. - fn useless_peer(&mut self, who: NodeIndex, reason: &str); + fn useless_peer(&self, who: NodeIndex, reason: &str); /// Disconnect from peer and restart sync. - fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str); + fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str); /// Restart sync. - fn restart(&mut self); + fn restart(&self); } - -/// Link with the ChainSync service. -struct SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext<B>> { - pub chain: &'a RwLock<ChainSync<B>>, - pub client: &'a Client<B>, - pub context: &'a E, +/// A link implementation that connects to the network. +pub struct NetworkLink<B: BlockT, E: ExecuteInContext<B>> { + /// The client handle. + pub(crate) client: Arc<Client<B>>, + /// The chain-sync handle + pub(crate) sync: Weak<RwLock<ChainSync<B>>>, + /// Network context. + pub(crate) context: Weak<E>, } -impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLink<'a, B, E> { - /// Execute closure with locked ChainSync. - fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&mut self, closure: F) { - let service = self.context; - let sync = self.chain; - service.execute_in_context(move |protocol| { - let mut sync = sync.write(); - closure(&mut *sync, protocol) - }); +impl<B: BlockT, E: ExecuteInContext<B>> NetworkLink<B, E> { + /// Execute closure with locked ChainSync. + fn with_sync<F: Fn(&mut ChainSync<B>, &mut Context<B>)>(&self, closure: F) { + if let (Some(sync), Some(service)) = (self.sync.upgrade(), self.context.upgrade()) { + service.execute_in_context(move |protocol| { + let mut sync = sync.write(); + closure(&mut *sync, protocol) + }); + } } } -impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext<B>> SyncLinkApi<B> for SyncLink<'a, B, E> { - +impl<B: BlockT, E: ExecuteInContext<B>> Link<B> for NetworkLink<B, E> { fn chain(&self) -> &Client<B> { - self.client + &*self.client } - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor<B>) { + fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>) { self.with_sync(|sync, _| sync.block_imported(&hash, number)) } - fn maintain_sync(&mut self) { + fn maintain_sync(&self) { self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) } - fn useless_peer(&mut self, who: NodeIndex, reason: &str) { + fn useless_peer(&self, who: NodeIndex, reason: &str) { self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason))) } - fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str) { + fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { self.with_sync(|sync, protocol| { protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless? sync.restart(protocol); }) } - fn restart(&mut self) { + fn restart(&self) { self.with_sync(|sync, protocol| sync.restart(protocol)) } } @@ -352,8 +340,8 @@ enum BlockImportError { } /// Import a bunch of blocks. -fn import_many_blocks<'a, B: BlockT, V: Verifier<B>>( - link: &mut SyncLinkApi<B>, +fn import_many_blocks<'a, B: BlockT, L: Link<B>, V: Verifier<B>>( + link: &L, qdata: Option<&AsyncImportQueueData<B>>, blocks: (BlockOrigin, Vec<BlockData<B>>), verifier: Arc<V> @@ -473,7 +461,7 @@ fn import_single_block<B: BlockT, V: Verifier<B>>( /// Process single block import result. fn process_import_result<'a, B: BlockT>( - link: &mut SyncLinkApi<B>, + link: &Link<B>, result: Result<BlockImportResult<B::Hash, <<B as BlockT>::Header as HeaderT>::Number>, BlockImportError> ) -> usize { @@ -545,7 +533,7 @@ unsafe impl<B: BlockT> Sync for ImportCB<B> {} #[cfg(any(test, feature = "test-helpers"))] /// A Verifier that accepts all blocks and passes them on with the configured -/// finality to be imported. +/// finality to be imported. pub struct PassThroughVerifier(pub bool); #[cfg(any(test, feature = "test-helpers"))] @@ -585,25 +573,19 @@ impl<B: BlockT, V: Verifier<B>> SyncImportQueue<B, V> { #[cfg(any(test, feature = "test-helpers"))] impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImportQueue<B, V> { - fn start<E: 'static + ExecuteInContext<B>>( + fn start<L: 'static + Link<B>>( &self, - sync: Weak<RwLock<ChainSync<B>>>, - service: Weak<E>, - chain: Weak<Client<B>> + link: L, ) -> Result<(), Error> { let v = self.0.clone(); - self.1.set(Box::new(move | origin, new_blocks | { + self.1.set(Box::new(move |origin, new_blocks| { let verifier = v.clone(); - match (sync.upgrade(), service.upgrade(), chain.upgrade()) { - (Some(sync), Some(service), Some(chain)) => - import_many_blocks( - &mut SyncLink{chain: &sync, client: &*chain, context: &*service}, - None, - (origin, new_blocks), - verifier, - ), - _ => false - } + import_many_blocks( + &link, + None, + (origin, new_blocks), + verifier, + ) })); Ok(()) } @@ -635,40 +617,51 @@ pub mod tests { use test_client::runtime::{Block, Hash}; use on_demand::tests::DummyExecutor; use runtime_primitives::generic::BlockId; + use std::cell::Cell; use super::*; - struct TestLink { chain: Arc<Client<Block>>, - imported: usize, - maintains: usize, - disconnects: usize, - restarts: usize, + imported: Cell<usize>, + maintains: Cell<usize>, + disconnects: Cell<usize>, + restarts: Cell<usize>, } impl TestLink { fn new() -> TestLink { TestLink { chain: Arc::new(test_client::new()), - imported: 0, - maintains: 0, - disconnects: 0, - restarts: 0, + imported: Cell::new(0), + maintains: Cell::new(0), + disconnects: Cell::new(0), + restarts: Cell::new(0), } } fn total(&self) -> usize { - self.imported + self.maintains + self.disconnects + self.restarts + self.imported.get() + self.maintains.get() + self.disconnects.get() + self.restarts.get() } } - impl SyncLinkApi<Block> for TestLink { + impl Link<Block> for TestLink { fn chain(&self) -> &Client<Block> { &*self.chain } - fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; } - fn maintain_sync(&mut self) { self.maintains += 1; } - fn useless_peer(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; } - fn note_useless_and_restart_sync(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; self.restarts += 1; } - fn restart(&mut self) { self.restarts += 1; } + fn block_imported(&self, _hash: &Hash, _number: NumberFor<Block>) { + self.imported.set(self.imported.get() + 1); + } + fn maintain_sync(&self) { + self.maintains.set(self.maintains.get() + 1); + } + fn useless_peer(&self, _: NodeIndex, _: &str) { + self.disconnects.set(self.disconnects.get() + 1); + } + fn note_useless_and_restart_sync(&self, id: NodeIndex, r: &str) { + self.useless_peer(id, r); + self.restart(); + } + fn restart(&self) { + self.restarts.set(self.restarts.get() + 1); + } } fn prepare_good_block() -> (client::Client<test_client::Backend, test_client::Executor, Block>, Hash, u64, BlockData<Block>) { @@ -729,39 +722,39 @@ pub mod tests { #[test] fn process_import_result_works() { - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); assert_eq!(link.total(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); assert_eq!(link.total(), 1); - assert_eq!(link.imported, 1); + assert_eq!(link.imported.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); assert_eq!(link.total(), 1); - assert_eq!(link.imported, 1); + assert_eq!(link.imported.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); assert_eq!(link.total(), 1); - assert_eq!(link.disconnects, 1); + assert_eq!(link.disconnects.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0); assert_eq!(link.total(), 1); - assert_eq!(link.disconnects, 1); + assert_eq!(link.disconnects.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::UnknownParent)), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::UnknownParent)), 0); assert_eq!(link.total(), 1); - assert_eq!(link.restarts, 1); + assert_eq!(link.restarts.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::<Block>(&mut link, Err(BlockImportError::Error)), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::<Block>(&link, Err(BlockImportError::Error)), 0); assert_eq!(link.total(), 1); - assert_eq!(link.restarts, 1); + assert_eq!(link.restarts.get(), 1); } #[test] @@ -782,9 +775,7 @@ pub mod tests { fn async_import_queue_drops() { let verifier = Arc::new(PassThroughVerifier(true)); let queue = BasicQueue::new(verifier); - let service = Arc::new(DummyExecutor); - let chain = Arc::new(test_client::new()); - queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak<Client<Block>>).unwrap(); + queue.start(TestLink::new()).unwrap(); drop(queue); } } diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 5fd479c167e..52db914d6ea 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -129,7 +129,7 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> { params: Params<B, S, H>, protocol_id: ProtocolId, import_queue: I, - ) -> Result<Arc<Service<B, S, H>>, Error> { + ) -> Result<Arc<Service<B, S, H>>, Error> { let chain = params.chain.clone(); let import_queue = Arc::new(import_queue); let handler = Arc::new(Protocol::new( @@ -144,20 +144,23 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> { let registered = RegisteredProtocol::new(protocol_id, &versions[..]); let (thread, network) = start_thread(params.network_config, handler.clone(), registered)?; - let sync = Arc::new(Service { + let service = Arc::new(Service { network, protocol_id, handler, bg_thread: Some(thread), }); - import_queue.start( - Arc::downgrade(sync.handler.sync()), - Arc::downgrade(&sync), - Arc::downgrade(&chain) - )?; + // connect the import-queue to the network service. + let link = ::import_queue::NetworkLink { + client: chain, + sync: Arc::downgrade(service.handler.sync()), + context: Arc::downgrade(&service), + }; + + import_queue.start(link)?; - Ok(sync) + Ok(service) } /// Called when a new block is imported by the client. diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 243e63e2b38..50e644c2612 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -158,10 +158,13 @@ impl Peer { // Update the sync state to the latest chain state. let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - self.import_queue.start( - Arc::downgrade(&self.sync.sync()), - Arc::downgrade(&self.executor), - Arc::downgrade(&self.sync.context_data().chain)).expect("Test ImportQueue always starts"); + let network_link = ::import_queue::NetworkLink { + client: self.sync.context_data().chain.clone(), + sync: Arc::downgrade(self.sync.sync()), + context: Arc::downgrade(&self.executor), + }; + + self.import_queue.start(network_link).expect("Test ImportQueue always starts"); self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); } -- GitLab