diff --git a/substrate/client/service/src/chain_ops.rs b/substrate/client/service/src/chain_ops.rs index 0297ad5c9053ed84357588bf7e9552c7f799ff0d..cb4ed24b60b624c78a6640ab25ba4cd1ceff7a6b 100644 --- a/substrate/client/service/src/chain_ops.rs +++ b/substrate/client/service/src/chain_ops.rs @@ -39,7 +39,9 @@ use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageC use sc_client_api::{StorageProvider, BlockBackend, UsageProvider}; use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap}; -use std::{thread, time::{Duration, Instant}}; +use std::time::{Duration, Instant}; +use futures_timer::Delay; +use std::task::Poll; use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer}; use std::convert::{TryFrom, TryInto}; use sp_runtime::traits::{CheckedDiv, Saturating}; @@ -272,14 +274,14 @@ enum ImportState<R, B> where /// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up. WaitingForImportQueueToCatchUp{ block_iter: BlockIter<R, B>, - delay: Duration, + delay: Delay, block: SignedBlock<B> }, // We have added all the blocks to the queue but they are still being processed. WaitingForImportQueueToFinish{ num_expected_blocks: Option<u64>, read_block_count: u64, - delay: Duration, + delay: Delay, }, } @@ -373,7 +375,7 @@ impl< // The iterator is over: we now need to wait for the import queue to finish. let num_expected_blocks = block_iter.num_expected_blocks(); let read_block_count = block_iter.read_block_count(); - let delay = Duration::from_millis(DELAY_TIME); + let delay = Delay::new(Duration::from_millis(DELAY_TIME)); state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay}); }, Some(block_result) => { @@ -383,7 +385,7 @@ impl< if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS { // The queue is full, so do not add this block and simply wait until // the queue has made some progress. - let delay = Duration::from_millis(DELAY_TIME); + let delay = Delay::new(Duration::from_millis(DELAY_TIME)); state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block}); } else { // Queue is not full, we can keep on adding blocks to the queue. @@ -392,18 +394,26 @@ impl< } } Err(e) => { - return std::task::Poll::Ready( + return Poll::Ready( Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e)))) } } } } }, - ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block} => { + ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => { let read_block_count = block_iter.read_block_count(); if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS { - thread::sleep(delay); // Queue is still full, so wait until there is room to insert our block. + match Pin::new(&mut delay).poll(cx) { + Poll::Pending => { + state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block}); + return Poll::Pending + }, + Poll::Ready(_) => { + delay.reset(Duration::from_millis(DELAY_TIME)); + }, + } state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block}); } else { // Queue is no longer full, so we can add our block to the queue. @@ -412,7 +422,7 @@ impl< state = Some(ImportState::Reading{block_iter}); } }, - ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay} => { + ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => { // All the blocks have been added to the queue, which doesn't mean they // have all been properly imported. if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) { @@ -421,10 +431,20 @@ impl< "🎉 Imported {} blocks. Best: #{}", read_block_count, client.chain_info().best_number ); - return std::task::Poll::Ready(Ok(())) + return Poll::Ready(Ok(())) } else { - thread::sleep(delay); // Importing is not done, we still have to wait for the queue to finish. + // Wait for the delay, because we know the queue is lagging behind. + match Pin::new(&mut delay).poll(cx) { + Poll::Pending => { + state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay}); + return Poll::Pending + }, + Poll::Ready(_) => { + delay.reset(Duration::from_millis(DELAY_TIME)); + }, + } + state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay}); } } @@ -436,7 +456,7 @@ impl< speedometer.notify_user(best_number); if link.has_error { - return std::task::Poll::Ready(Err( + return Poll::Ready(Err( Error::Other( format!("Stopping after #{} blocks because of an error", link.imported_blocks) ) @@ -444,7 +464,7 @@ impl< } cx.waker().wake_by_ref(); - std::task::Poll::Pending + Poll::Pending }); Box::pin(import) } @@ -477,7 +497,7 @@ impl< let client = &self.client; if last < block { - return std::task::Poll::Ready(Err("Invalid block range specified".into())); + return Poll::Ready(Err("Invalid block range specified".into())); } if !wrote_header { @@ -501,19 +521,19 @@ impl< } }, // Reached end of the chain. - None => return std::task::Poll::Ready(Ok(())), + None => return Poll::Ready(Ok(())), } if (block % 10000.into()).is_zero() { info!("#{}", block); } if block == last { - return std::task::Poll::Ready(Ok(())); + return Poll::Ready(Ok(())); } block += One::one(); // Re-schedule the task in order to continue the operation. cx.waker().wake_by_ref(); - std::task::Poll::Pending + Poll::Pending }); Box::pin(export)