Newer
Older
use std::collections::HashSet;
use orphan_blocks_pool::OrphanBlocksPool;
use orphan_transactions_pool::OrphanTransactionsPool;
/// Management interval (in ms)
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time before getting block to decrease peer score
const DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
/// Response time before getting inventory to decrease peer score
const DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
const DEFAULT_UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
/// Unknown orphan transaction removal time
const DEFAULT_ORPHAN_TRANSACTION_REMOVAL_TIME_MS: u32 = 10 * 60 * 1000;
/// Maximal number of orphaned transactions
const DEFAULT_ORPHAN_TRANSACTIONS_MAX_LEN: usize = 10000;
/// Peers management configuration
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait block from the peer before penalizing && reexecuting tasks
pub block_failure_interval_ms: u32,
/// Time interval (in milliseconds) to wait inventory from the peer before penalizing && reexecuting tasks
pub inventory_failure_interval_ms: u32,
}
impl Default for ManagePeersConfig {
fn default() -> Self {
ManagePeersConfig {
block_failure_interval_ms: DEFAULT_PEER_BLOCK_FAILURE_INTERVAL_MS,
inventory_failure_interval_ms: DEFAULT_PEER_INVENTORY_FAILURE_INTERVAL_MS,
}
}
}
/// Unknown blocks management configuration
pub struct ManageUnknownBlocksConfig {
/// Time interval (in milliseconds) to wait before removing unknown blocks from in-memory pool
pub removal_time_ms: u32,
/// Maximal # of unknown blocks in the in-memory pool
pub max_number: usize,
}
impl Default for ManageUnknownBlocksConfig {
fn default() -> Self {
ManageUnknownBlocksConfig {
removal_time_ms: DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS,
max_number: DEFAULT_UNKNOWN_BLOCKS_MAX_LEN,
}
}
}
/// Orphan transactions management configuration
pub struct ManageOrphanTransactionsConfig {
/// Time interval (in milliseconds) to wait before removing orphan transactions from orphan pool
pub removal_time_ms: u32,
/// Maximal # of unknown transactions in the orphan pool
pub max_number: usize,
}
impl Default for ManageOrphanTransactionsConfig {
fn default() -> Self {
ManageOrphanTransactionsConfig {
removal_time_ms: DEFAULT_ORPHAN_TRANSACTION_REMOVAL_TIME_MS,
max_number: DEFAULT_ORPHAN_TRANSACTIONS_MAX_LEN,
}
}
}
/// Manage stalled synchronization peers blocks tasks
pub fn manage_synchronization_peers_blocks(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
// reset tasks for peers, which has not responded during given period
for (worst_peer_index, worst_peer_time) in peers.ordered_blocks_requests() {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
if time_diff <= config.block_failure_interval_ms as f64 / 1000f64 {
break;
}
// decrease score && move to the idle queue
warn!(target: "sync", "Failed to get requested block from peer#{} in {} seconds", worst_peer_index, time_diff);
let peer_tasks = peers.reset_blocks_tasks(worst_peer_index);
blocks_to_request.extend(peer_tasks);
// if peer failed many times => forget it
if peers.on_peer_block_failure(worst_peer_index) {
warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }
/// Manage stalled synchronization peers inventory tasks
pub fn manage_synchronization_peers_inventory(config: &ManagePeersConfig, peers: &mut Peers) {
let now = precise_time_s();
// reset tasks for peers, which has not responded during given period
for (worst_peer_index, worst_peer_time) in peers.ordered_inventory_requests() {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
if time_diff <= config.inventory_failure_interval_ms as f64 / 1000f64 {
break;
}
peers.on_peer_inventory_failure(worst_peer_index);
}
}
pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, orphaned_blocks_pool: &mut OrphanBlocksPool) -> Option<Vec<H256>> {
let unknown_to_remove = {
let unknown_blocks = orphaned_blocks_pool.unknown_blocks();
let mut unknown_to_remove: HashSet<H256> = HashSet::new();
let mut remove_num = if unknown_blocks.len() > config.max_number { unknown_blocks.len() - config.max_number } else { 0 };
let now = precise_time_s();
// remove oldest blocks if there are more unknown blocks that we can hold in memory
if remove_num > 0 {
unknown_to_remove.insert(hash.clone());
remove_num -= 1;
continue;
}
// check if block is unknown for too long
let time_diff = now - time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
unknown_to_remove.insert(hash.clone());
unknown_to_remove
};
// remove unknown blocks
let unknown_to_remove: Vec<H256> = orphaned_blocks_pool.remove_blocks(&unknown_to_remove).into_iter()
.map(|t| t.0)
.collect();
if unknown_to_remove.is_empty() { None } else { Some(unknown_to_remove) }
}
/// Manage orphaned transactions
pub fn manage_orphaned_transactions(config: &ManageOrphanTransactionsConfig, orphaned_transactions_pool: &mut OrphanTransactionsPool) -> Option<Vec<H256>> {
let orphans_to_remove = {
let unknown_transactions = orphaned_transactions_pool.transactions();
let mut orphans_to_remove: Vec<H256> = Vec::new();
let mut remove_num = if unknown_transactions.len() > config.max_number { unknown_transactions.len() - config.max_number } else { 0 };
let now = precise_time_s();
// remove oldest blocks if there are more unknown blocks that we can hold in memory
if remove_num > 0 {
orphans_to_remove.push(hash.clone());
remove_num -= 1;
continue;
}
// check if block is unknown for too long
let time_diff = now - orphan_tx.insertion_time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
orphans_to_remove.push(hash.clone());
orphans_to_remove
};
let orphans_to_remove: Vec<H256> = orphaned_transactions_pool.remove_transactions(&orphans_to_remove).into_iter()
.map(|t| t.0)
.collect();
if orphans_to_remove.is_empty() { None } else { Some(orphans_to_remove) }
use std::collections::HashSet;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, ManageOrphanTransactionsConfig, manage_synchronization_peers_blocks,
manage_unknown_orphaned_blocks, manage_orphaned_transactions};
use chain::RepresentH256;
use synchronization_peers::Peers;
use primitives::hash::H256;
use test_data;
use orphan_blocks_pool::OrphanBlocksPool;
use orphan_transactions_pool::OrphanTransactionsPool;
#[test]
fn manage_good_peer() {
let config = ManagePeersConfig { block_failure_interval_ms: 1000, ..Default::default() };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
peers.on_block_received(1, &H256::from(0));
assert_eq!(manage_synchronization_peers_blocks(&config, &mut peers), None);
assert_eq!(peers.idle_peers_for_blocks(), vec![]);
}
#[test]
fn manage_bad_peers() {
use std::thread::sleep;
use std::time::Duration;
let config = ManagePeersConfig { block_failure_interval_ms: 0, ..Default::default() };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0)]);
peers.on_blocks_requested(2, &vec![H256::from(1)]);
sleep(Duration::from_millis(1));
let managed_tasks = manage_synchronization_peers_blocks(&config, &mut peers).expect("managed tasks");
assert!(managed_tasks.contains(&H256::from(0)));
assert!(managed_tasks.contains(&H256::from(1)));
let idle_peers = peers.idle_peers_for_blocks();
assert_eq!(2, idle_peers.len());
assert!(idle_peers.contains(&1));
assert!(idle_peers.contains(&2));
}
#[test]
fn manage_unknown_blocks_good() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 1000, max_number: 100 };
let mut pool = OrphanBlocksPool::new();
let block = test_data::genesis();
pool.insert_unknown_block(block.hash(), block);
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut pool), None);
assert_eq!(pool.len(), 1);
}
#[test]
fn manage_unknown_blocks_by_time() {
use std::thread::sleep;
use std::time::Duration;
let config = ManageUnknownBlocksConfig { removal_time_ms: 0, max_number: 100 };
let mut pool = OrphanBlocksPool::new();
let block = test_data::genesis();
let block_hash = block.hash();
pool.insert_unknown_block(block_hash.clone(), block);
sleep(Duration::from_millis(1));
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut pool), Some(vec![block_hash]));
assert_eq!(pool.len(), 0);
}
#[test]
fn manage_unknown_blocks_by_max_number() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 100, max_number: 1 };
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
let mut pool = OrphanBlocksPool::new();
let block1 = test_data::genesis();
let block1_hash = block1.hash();
let block2 = test_data::block_h2();
let block2_hash = block2.hash();
pool.insert_unknown_block(block1_hash.clone(), block1);
pool.insert_unknown_block(block2_hash.clone(), block2);
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut pool), Some(vec![block1_hash]));
assert_eq!(pool.len(), 1);
}
#[test]
fn manage_orphan_transactions_good() {
let config = ManageOrphanTransactionsConfig { removal_time_ms: 1000, max_number: 100 };
let mut pool = OrphanTransactionsPool::new();
let transaction = test_data::block_h170().transactions[1].clone();
let unknown_inputs: HashSet<H256> = transaction.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
pool.insert(transaction.hash(), transaction, unknown_inputs);
assert_eq!(manage_orphaned_transactions(&config, &mut pool), None);
assert_eq!(pool.len(), 1);
}
#[test]
fn manage_orphan_transactions_by_time() {
use std::thread::sleep;
use std::time::Duration;
let config = ManageOrphanTransactionsConfig { removal_time_ms: 0, max_number: 100 };
let mut pool = OrphanTransactionsPool::new();
let transaction = test_data::block_h170().transactions[1].clone();
let unknown_inputs: HashSet<H256> = transaction.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let transaction_hash = transaction.hash();
pool.insert(transaction_hash.clone(), transaction, unknown_inputs);
sleep(Duration::from_millis(1));
assert_eq!(manage_orphaned_transactions(&config, &mut pool), Some(vec![transaction_hash]));
assert_eq!(pool.len(), 0);
}
#[test]
fn manage_orphan_transactions_by_max_number() {
let config = ManageOrphanTransactionsConfig { removal_time_ms: 100, max_number: 1 };
let mut pool = OrphanTransactionsPool::new();
let transaction1 = test_data::block_h170().transactions[1].clone();
let unknown_inputs1: HashSet<H256> = transaction1.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let transaction1_hash = transaction1.hash();
let transaction2 = test_data::block_h182().transactions[1].clone();
let unknown_inputs2: HashSet<H256> = transaction2.inputs.iter().map(|i| i.previous_output.hash.clone()).collect();
let transaction2_hash = transaction2.hash();
pool.insert(transaction1_hash.clone(), transaction1, unknown_inputs1);
pool.insert(transaction2_hash.clone(), transaction2, unknown_inputs2);
assert_eq!(manage_orphaned_transactions(&config, &mut pool), Some(vec![transaction1_hash]));
assert_eq!(pool.len(), 1);