Skip to content
Snippets Groups Projects
Commit 4eaf3161 authored by Svyatoslav Nikolsky's avatar Svyatoslav Nikolsky
Browse files

handle reorganizations in BlocksWriter

parent cbdb97a0
No related merge requests found
use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use chain;
use storage;
use miner::MemoryPool;
use network::ConsensusParams;
use primitives::hash::H256;
use super::Error;
use synchronization_chain::Chain;
use synchronization_verifier::{Verifier, SyncVerifier, VerificationTask,
VerificationSink, BlockVerificationSink, TransactionVerificationSink};
use types::StorageRef;
use types::{PeerIndex, StorageRef};
use utils::OrphanBlocksPool;
use VerificationParameters;
......@@ -24,27 +26,27 @@ pub struct BlocksWriter {
/// Blocks verifier
verifier: SyncVerifier<BlocksWriterSink>,
/// Verification events receiver
sink: Arc<BlocksWriterSinkData>,
sink: Arc<Mutex<BlocksWriterSinkData>>,
}
/// Verification events receiver
struct BlocksWriterSink {
/// Reference to blocks writer data
data: Arc<BlocksWriterSinkData>,
data: Arc<Mutex<BlocksWriterSinkData>>,
}
/// Blocks writer data
struct BlocksWriterSinkData {
/// Blocks storage
storage: StorageRef,
/// Synchronization chain.
chain: Chain,
/// Last verification error
err: Mutex<Option<Error>>,
err: Option<Error>,
}
impl BlocksWriter {
/// Create new synchronous blocks writer
pub fn new(storage: StorageRef, consensus: ConsensusParams, verification_params: VerificationParameters) -> BlocksWriter {
let sink_data = Arc::new(BlocksWriterSinkData::new(storage.clone()));
let sink_data = Arc::new(Mutex::new(BlocksWriterSinkData::new(storage.clone())));
let sink = Arc::new(BlocksWriterSink::new(sink_data.clone()));
let verifier = SyncVerifier::new(consensus, storage.clone(), sink, verification_params);
BlocksWriter {
......@@ -88,7 +90,7 @@ impl BlocksWriter {
impl BlocksWriterSink {
/// Create new verification events receiver
pub fn new(data: Arc<BlocksWriterSinkData>) -> Self {
pub fn new(data: Arc<Mutex<BlocksWriterSinkData>>) -> Self {
BlocksWriterSink {
data: data,
}
......@@ -99,14 +101,14 @@ impl BlocksWriterSinkData {
/// Create new blocks writer data
pub fn new(storage: StorageRef) -> Self {
BlocksWriterSinkData {
storage: storage,
err: Mutex::new(None),
chain: Chain::new(storage, Arc::new(RwLock::new(MemoryPool::new()))),
err: None,
}
}
/// Take last verification error
pub fn error(&self) -> Option<Error> {
self.err.lock().take()
pub fn error(&mut self) -> Option<Error> {
self.err.take()
}
}
......@@ -115,19 +117,16 @@ impl VerificationSink for BlocksWriterSink {
impl BlockVerificationSink for BlocksWriterSink {
fn on_block_verification_success(&self, block: chain::IndexedBlock) -> Option<Vec<VerificationTask>> {
let hash = block.hash().clone();
if let Err(err) = self.data.storage.insert(block) {
*self.data.err.lock() = Some(Error::Database(err));
}
if let Err(err) = self.data.storage.canonize(&hash) {
*self.data.err.lock() = Some(Error::Database(err));
let mut data = self.data.lock();
if let Err(err) = data.chain.insert_best_block(block) {
data.err = Some(Error::Database(err));
}
None
}
fn on_block_verification_error(&self, err: &str, _hash: &H256) {
*self.data.err.lock() = Some(Error::Verification(err.into()));
self.data.lock().err = Some(Error::Verification(err.into()));
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment