From 0302a1fc5e511cf4027c9f417f796076651f7c84 Mon Sep 17 00:00:00 2001 From: Robert Habermeier <rphmeier@gmail.com> Date: Mon, 29 Oct 2018 15:56:10 +0100 Subject: [PATCH] refactor import queue to use explicit block import references --- substrate/core/network/src/import_queue.rs | 61 +++++++++++++--------- substrate/core/network/src/service.rs | 2 - substrate/core/network/src/test/mod.rs | 38 +++++++++++--- 3 files changed, 67 insertions(+), 34 deletions(-) diff --git a/substrate/core/network/src/import_queue.rs b/substrate/core/network/src/import_queue.rs index 4a6775a2a9d..42fcb1b6d6b 100644 --- a/substrate/core/network/src/import_queue.rs +++ b/substrate/core/network/src/import_queue.rs @@ -34,14 +34,16 @@ use primitives::AuthorityId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}; pub use blocks::BlockData; -use chain::Client; +use client::error::Error as ClientError; use error::{ErrorKind, Error}; use protocol::Context; use service::ExecuteInContext; use sync::ChainSync; -pub use consensus::{ImportBlock, ImportResult, BlockOrigin}; +pub use consensus::{ImportBlock, BlockImport, ImportResult, BlockOrigin}; +/// Shared block import struct used by the queue. +pub type SharedBlockImport<B> = Arc<dyn BlockImport<B,Error=ClientError> + Send + Sync>; #[cfg(any(test, feature = "test-helpers"))] use std::cell::RefCell; @@ -98,6 +100,7 @@ pub struct BasicQueue<B: BlockT, V: 'static + Verifier<B>> { handle: Mutex<Option<::std::thread::JoinHandle<()>>>, data: Arc<AsyncImportQueueData<B>>, verifier: Arc<V>, + block_import: SharedBlockImport<B>, } /// Locks order: queue, queue_blocks, best_importing_number @@ -111,11 +114,12 @@ struct AsyncImportQueueData<B: BlockT> { impl<B: BlockT, V: Verifier<B>> BasicQueue<B, V> { /// Instantiate a new basic queue, with given verifier. - pub fn new(verifier: Arc<V>) -> Self { + pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>) -> Self { Self { handle: Mutex::new(None), data: Arc::new(AsyncImportQueueData::new()), verifier, + block_import, } } } @@ -141,8 +145,9 @@ impl<B: BlockT, V: 'static + Verifier<B>> ImportQueue<B> for BasicQueue<B, V> { let qdata = self.data.clone(); let verifier = self.verifier.clone(); + let block_import = self.block_import.clone(); *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { - import_thread(link, qdata, verifier) + import_thread(block_import, link, qdata, verifier) }).map_err(|err| Error::from(ErrorKind::Io(err)))?); Ok(()) } @@ -205,6 +210,7 @@ impl<B: BlockT, V: 'static + Verifier<B>> Drop for BasicQueue<B, V> { /// Blocks import thread. fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>( + block_import: SharedBlockImport<B>, link: L, qdata: Arc<AsyncImportQueueData<B>>, verifier: Arc<V> @@ -229,6 +235,7 @@ fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>( let blocks_hashes: Vec<B::Hash> = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); if !import_many_blocks( + &*block_import, &link, Some(&*qdata), new_blocks, @@ -249,8 +256,6 @@ fn import_thread<B: BlockT, L: Link<B>, V: Verifier<B>>( /// Hooks that the verification queue can use to influence the synchronization /// algorithm. pub trait Link<B: BlockT>: Send { - /// Get chain reference. - fn chain(&self) -> &Client<B>; /// Block imported. fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { } /// Maintain sync. @@ -265,8 +270,6 @@ pub trait Link<B: BlockT>: Send { /// A link implementation that connects to the network. pub struct NetworkLink<B: BlockT, E: ExecuteInContext<B>> { - /// The client handle. - pub(crate) client: Arc<Client<B>>, /// The chain-sync handle pub(crate) sync: Weak<RwLock<ChainSync<B>>>, /// Network context. @@ -286,10 +289,6 @@ impl<B: BlockT, E: ExecuteInContext<B>> NetworkLink<B, E> { } impl<B: BlockT, E: ExecuteInContext<B>> Link<B> for NetworkLink<B, E> { - fn chain(&self) -> &Client<B> { - &*self.client - } - fn block_imported(&self, hash: &B::Hash, number: NumberFor<B>) { self.with_sync(|sync, _| sync.block_imported(&hash, number)) } @@ -342,6 +341,7 @@ enum BlockImportError { /// Import a bunch of blocks. fn import_many_blocks<'a, B: BlockT, L: Link<B>, V: Verifier<B>>( + import_handle: &BlockImport<B, Error=ClientError>, link: &L, qdata: Option<&AsyncImportQueueData<B>>, blocks: (BlockOrigin, Vec<BlockData<B>>), @@ -365,7 +365,7 @@ fn import_many_blocks<'a, B: BlockT, L: Link<B>, V: Verifier<B>>( // Blocks in the response/drain should be in ascending order. for block in blocks { let import_result = import_single_block( - link.chain(), + import_handle, blocks_origin.clone(), block, verifier.clone(), @@ -389,7 +389,7 @@ fn import_many_blocks<'a, B: BlockT, L: Link<B>, V: Verifier<B>>( /// Single block import function. fn import_single_block<B: BlockT, V: Verifier<B>>( - chain: &Client<B>, + import_handle: &BlockImport<B,Error=ClientError>, block_origin: BlockOrigin, block: BlockData<B>, verifier: Arc<V> @@ -431,7 +431,7 @@ fn import_single_block<B: BlockT, V: Verifier<B>>( BlockImportError::VerificationFailed(peer, msg) })?; - match chain.import(import_block, new_authorities) { + match import_handle.import_block(import_block, new_authorities) { Ok(ImportResult::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); Ok(BlockImportResult::ImportedKnown(hash, number)) @@ -558,15 +558,25 @@ impl<B: BlockT> Verifier<B> for PassThroughVerifier { } } -#[cfg(any(test, feature = "test-helpers"))] /// Blocks import queue that is importing blocks in the same thread. /// The boolean value indicates whether blocks should be imported without instant finality. -pub struct SyncImportQueue<B: BlockT, V: Verifier<B>>(Arc<V>, ImportCB<B>); +#[cfg(any(test, feature = "test-helpers"))] +pub struct SyncImportQueue<B: BlockT, V: Verifier<B>> { + verifier: Arc<V>, + link: ImportCB<B>, + block_import: SharedBlockImport<B>, +} + #[cfg(any(test, feature = "test-helpers"))] impl<B: BlockT, V: Verifier<B>> SyncImportQueue<B, V> { - /// Create a new SyncImportQueue wrapping the given Verifier - pub fn new(verifier: Arc<V>) -> Self { - SyncImportQueue(verifier, ImportCB::new()) + /// Create a new SyncImportQueue wrapping the given Verifier and block import + /// handle. + pub fn new(verifier: Arc<V>, block_import: SharedBlockImport<B>) -> Self { + SyncImportQueue { + verifier, + link: ImportCB::new(), + block_import, + } } } @@ -577,10 +587,12 @@ impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImpor &self, link: L, ) -> Result<(), Error> { - let v = self.0.clone(); - self.1.set(Box::new(move |origin, new_blocks| { + let v = self.verifier.clone(); + let import_handle = self.block_import.clone(); + self.link.set(Box::new(move |origin, new_blocks| { let verifier = v.clone(); import_many_blocks( + &*import_handle, &link, None, (origin, new_blocks), @@ -605,7 +617,7 @@ impl<B: 'static + BlockT, V: 'static + Verifier<B>> ImportQueue<B> for SyncImpor } fn import_blocks(&self, origin: BlockOrigin, blocks: Vec<BlockData<B>>) { - self.1.call(origin, blocks); + self.link.call(origin, blocks); } } @@ -644,7 +656,6 @@ pub mod tests { } impl Link<Block> for TestLink { - fn chain(&self) -> &Client<Block> { &*self.chain } fn block_imported(&self, _hash: &Hash, _number: NumberFor<Block>) { self.imported.set(self.imported.get() + 1); } @@ -762,7 +773,9 @@ pub mod tests { let qdata = AsyncImportQueueData::new(); let verifier = Arc::new(PassThroughVerifier(true)); qdata.is_stopping.store(true, Ordering::SeqCst); + let client = test_client::new(); assert!(!import_many_blocks( + &client, &mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block]), diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 0e46f50998b..976fb8bc8e9 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -130,7 +130,6 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> { protocol_id: ProtocolId, import_queue: I, ) -> Result<Arc<Service<B, S, H>>, Error> { - let chain = params.chain.clone(); let import_queue = Arc::new(import_queue); let handler = Arc::new(Protocol::new( params.config, @@ -153,7 +152,6 @@ impl<B: BlockT + 'static, S: Specialization<B>, H: ExHashT> Service<B, S, H> { // connect the import-queue to the network service. let link = ::import_queue::NetworkLink { - client: chain, sync: Arc::downgrade(service.handler.sync()), context: Arc::downgrade(&service), }; diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 34eb873cc64..b852ceab80e 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use parking_lot::RwLock; use client; +use client::error::Error as ClientError; use client::block_builder::BlockBuilder; use runtime_primitives::generic::BlockId; use io::SyncIo; @@ -38,7 +39,7 @@ use import_queue::{SyncImportQueue, PassThroughVerifier, Verifier}; use consensus::BlockOrigin; use specialization::Specialization; use consensus_gossip::ConsensusGossip; -use import_queue::ImportQueue; +use import_queue::{BlockImport, ImportQueue}; use service::ExecuteInContext; use test_client; @@ -163,7 +164,6 @@ impl<V: 'static + Verifier<Block>> Peer<V> { let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); let network_link = ::import_queue::NetworkLink { - client: self.sync.context_data().chain.clone(), sync: Arc::downgrade(self.sync.sync()), context: Arc::downgrade(&self.executor), }; @@ -235,16 +235,31 @@ impl<V: 'static + Verifier<Block>> Peer<V> { pub fn generate_blocks<F>(&self, count: usize, origin: BlockOrigin, mut edit_block: F) where F: FnMut(&mut BlockBuilder<test_client::Backend, test_client::Executor, Block, Blake2Hasher>) { - for _ in 0 .. count { + use blocks::BlockData; + + for _ in 0..count { let mut builder = self.client.new_block().unwrap(); edit_block(&mut builder); let block = builder.bake().unwrap(); let hash = block.header.hash(); trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash); let header = block.header.clone(); - self.client.justify_and_import(origin, block).unwrap(); - self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), hash, &header); + + // NOTE: if we use a non-synchronous queue in the test-net in the future, + // this may not work. + self.import_queue.import_blocks(origin, vec![BlockData { + origin: None, + block: ::message::BlockData::<Block> { + hash, + header: Some(header), + body: Some(block.extrinsics), + receipt: None, + message_queue: None, + justification: None, + }, + }]); } + } /// Push blocks to the peer (simplified: with or without a TX) @@ -308,7 +323,12 @@ pub trait TestNetFactory: Sized { fn mut_peers<F: Fn(&mut Vec<Arc<Peer<Self::Verifier>>>)>(&mut self, closure: F ); fn started(&self) -> bool; - fn set_started(&mut self, now: bool); + fn set_started(&mut self, now: bool); + + /// Get custom block import handle for fresh client. + fn make_block_import(&self, client: Arc<PeersClient>) -> Arc<BlockImport<Block,Error=ClientError> + Send + Sync> { + client + } fn default_config() -> ProtocolConfig { ProtocolConfig::default() @@ -330,7 +350,9 @@ pub trait TestNetFactory: Sized { let client = Arc::new(test_client::new()); let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(client.clone(), config); - let import_queue = Arc::new(SyncImportQueue::new(verifier)); + let block_import = self.make_block_import(client.clone()); + + let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import)); let specialization = DummySpecialization { gossip: ConsensusGossip::new(), }; @@ -471,7 +493,7 @@ impl TestNetFactory for TestNet { started: false } } - + fn make_verifier(&self, _client: Arc<PeersClient>, _config: &ProtocolConfig) -> Arc<Self::Verifier> { -- GitLab