diff --git a/substrate/core/service/src/chain_ops.rs b/substrate/core/service/src/chain_ops.rs index ea0311f3b56cbd4075e7febd95bf862edf7f2311..953bb8c3751ec6e59570b36f733d32e606e8cf6c 100644 --- a/substrate/core/service/src/chain_ops.rs +++ b/substrate/core/service/src/chain_ops.rs @@ -21,7 +21,7 @@ use futures::Future; use log::{info, warn}; use runtime_primitives::generic::{SignedBlock, BlockId}; -use runtime_primitives::traits::{As, Block, Header}; +use runtime_primitives::traits::{As, Block, Header, NumberFor}; use consensus_common::import_queue::{ImportQueue, IncomingBlock, Link}; use network::message; @@ -66,7 +66,10 @@ pub fn export_blocks<F, E, W>( }); info!("Exporting blocks from #{} to #{}", block, last); if !json { - output.write(&(last - block + As::sa(1)).encode())?; + let last_: u64 = last.as_(); + let block_: u64 = block.as_(); + let len: u64 = last_ - block_ + 1; + output.write(&len.encode())?; } loop { @@ -95,6 +98,25 @@ pub fn export_blocks<F, E, W>( Ok(()) } +struct WaitLink { + wait_send: std::sync::mpsc::Sender<()>, +} + +impl WaitLink { + fn new(wait_send: std::sync::mpsc::Sender<()>) -> WaitLink { + WaitLink { + wait_send, + } + } +} + +impl<B: Block> Link<B> for WaitLink { + fn block_imported(&self, _hash: &B::Hash, _number: NumberFor<B>) { + self.wait_send.send(()) + .expect("Unable to notify main process; if the main process panicked then this thread would already be dead as well. qed."); + } +} + /// Import blocks from a binary stream. pub fn import_blocks<F, E, R>( mut config: FactoryFullConfiguration<F>, @@ -103,13 +125,13 @@ pub fn import_blocks<F, E, R>( ) -> error::Result<()> where F: ServiceFactory, E: Future<Item=(),Error=()> + Send + 'static, R: Read, { - struct DummyLink; - impl<B: Block> Link<B> for DummyLink { } - let client = new_client::<F>(&config)?; // FIXME #1134 this shouldn't need a mutable config. let queue = components::FullComponents::<F>::build_import_queue(&mut config, client.clone())?; - queue.start(DummyLink)?; + + let (wait_send, wait_recv) = std::sync::mpsc::channel(); + let wait_link = WaitLink::new(wait_send); + queue.start(wait_link)?; let (exit_send, exit_recv) = std::sync::mpsc::channel(); ::std::thread::spawn(move || { @@ -117,7 +139,7 @@ pub fn import_blocks<F, E, R>( let _ = exit_send.send(()); }); - let count: u32 = Decode::decode(&mut input).ok_or("Error reading file")?; + let count: u64 = Decode::decode(&mut input).ok_or("Error reading file")?; info!("Importing {} blocks", count); let mut block_count = 0; for b in 0 .. count { @@ -155,6 +177,14 @@ pub fn import_blocks<F, E, R>( info!("#{}", b); } } + + let mut blocks_imported = 0; + while blocks_imported < count { + wait_recv.recv() + .expect("Importing thread has panicked. Then the main process will die before this can be reached. qed."); + blocks_imported += 1; + } + info!("Imported {} blocks. Best: #{}", block_count, client.info()?.chain.best_number); Ok(())