Newer
Older
/// Management interval (in ms)
pub const MANAGEMENT_INTERVAL_MS: u64 = 10 * 1000;
/// Response time to decrease peer score
const DEFAULT_PEER_FAILURE_INTERVAL_MS: u32 = 5 * 1000;
const DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS: u32 = 20 * 60 * 1000;
const DEFAULT_UNKNOWN_BLOCKS_MAX_LEN: usize = 16;
/// Peers management configuration
pub struct ManagePeersConfig {
/// Time interval (in milliseconds) to wait answer from the peer before penalizing && reexecuting tasks
pub failure_interval_ms: u32,
}
impl Default for ManagePeersConfig {
fn default() -> Self {
ManagePeersConfig {
failure_interval_ms: DEFAULT_PEER_FAILURE_INTERVAL_MS,
}
}
}
/// Unknown blocks management configuration
pub struct ManageUnknownBlocksConfig {
/// Time interval (in milliseconds) to wait before removing unknown blocks from in-memory pool
pub removal_time_ms: u32,
/// Maximal # of unknown blocks in the in-memory pool
pub max_number: usize,
}
impl Default for ManageUnknownBlocksConfig {
fn default() -> Self {
ManageUnknownBlocksConfig {
removal_time_ms: DEFAULT_UNKNOWN_BLOCK_REMOVAL_TIME_MS,
max_number: DEFAULT_UNKNOWN_BLOCKS_MAX_LEN,
}
}
}
/// Manage stalled synchronization peers tasks
pub fn manage_synchronization_peers(config: &ManagePeersConfig, peers: &mut Peers) -> Option<Vec<H256>> {
let mut blocks_to_request: Vec<H256> = Vec::new();
// reset tasks for peers, which has not responded during given period
for (worst_peer_index, worst_peer_time) in peers.active_peers_order() {
// check if peer has not responded within given time
let time_diff = now - worst_peer_time;
if time_diff <= config.failure_interval_ms as f64 / 1000f64 {
break;
}
// decrease score && move to the idle queue
warn!(target: "sync", "Failed to get response from peer#{} in {} seconds", worst_peer_index, time_diff);
let peer_tasks = peers.reset_tasks(worst_peer_index);
blocks_to_request.extend(peer_tasks);
// if peer failed many times => forget it
if peers.on_peer_failure(worst_peer_index) {
warn!(target: "sync", "Too many failures for peer#{}. Excluding from synchronization", worst_peer_index);
if blocks_to_request.is_empty() { None } else { Some(blocks_to_request) }
pub fn manage_unknown_orphaned_blocks(config: &ManageUnknownBlocksConfig, unknown_blocks: &mut LinkedHashMap<H256, f64>) -> Option<Vec<H256>> {
let mut unknown_to_remove: Vec<H256> = Vec::new();
let mut remove_num = if unknown_blocks.len() > config.max_number { unknown_blocks.len() - config.max_number } else { 0 };
let now = precise_time_s();
for (hash, time) in unknown_blocks.iter() {
// remove oldest blocks if there are more unknown blocks that we can hold in memory
if remove_num > 0 {
unknown_to_remove.push(hash.clone());
remove_num -= 1;
continue;
}
// check if block is unknown for too long
let time_diff = now - time;
if time_diff <= config.removal_time_ms as f64 / 1000f64 {
break;
}
unknown_to_remove.push(hash.clone());
}
// remove unknown blocks
for unknown_block in unknown_to_remove.iter() {
unknown_blocks.remove(unknown_block);
}
if unknown_to_remove.is_empty() { None } else { Some(unknown_to_remove) }
}
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#[cfg(test)]
mod tests {
use time::precise_time_s;
use linked_hash_map::LinkedHashMap;
use super::{ManagePeersConfig, ManageUnknownBlocksConfig, manage_synchronization_peers, manage_unknown_orphaned_blocks};
use synchronization_peers::Peers;
use primitives::hash::H256;
#[test]
fn manage_good_peer() {
let config = ManagePeersConfig { failure_interval_ms: 1000, };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0), H256::from(1)]);
peers.on_block_received(1, &H256::from(0));
assert_eq!(manage_synchronization_peers(&config, &mut peers), None);
assert_eq!(peers.idle_peers(), vec![]);
}
#[test]
fn manage_bad_peers() {
use std::thread::sleep;
use std::time::Duration;
let config = ManagePeersConfig { failure_interval_ms: 0, };
let mut peers = Peers::new();
peers.on_blocks_requested(1, &vec![H256::from(0)]);
peers.on_blocks_requested(2, &vec![H256::from(1)]);
sleep(Duration::from_millis(1));
let managed_tasks = manage_synchronization_peers(&config, &mut peers).expect("managed tasks");
assert!(managed_tasks.contains(&H256::from(0)));
assert!(managed_tasks.contains(&H256::from(1)));
let idle_peers = peers.idle_peers();
assert_eq!(2, idle_peers.len());
assert!(idle_peers.contains(&1));
assert!(idle_peers.contains(&2));
}
#[test]
fn manage_unknown_blocks_good() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 1000, max_number: 100 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), None);
assert_eq!(unknown_blocks.len(), 1);
}
#[test]
fn manage_unknown_blocks_by_time() {
use std::thread::sleep;
use std::time::Duration;
let config = ManageUnknownBlocksConfig { removal_time_ms: 0, max_number: 100 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
sleep(Duration::from_millis(1));
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)]));
assert_eq!(unknown_blocks.len(), 0);
}
#[test]
fn manage_unknown_blocks_by_max_number() {
let config = ManageUnknownBlocksConfig { removal_time_ms: 100, max_number: 1 };
let mut unknown_blocks: LinkedHashMap<H256, f64> = LinkedHashMap::new();
unknown_blocks.insert(H256::from(0), precise_time_s());
unknown_blocks.insert(H256::from(1), precise_time_s());
assert_eq!(manage_unknown_orphaned_blocks(&config, &mut unknown_blocks), Some(vec![H256::from(0)]));
assert_eq!(unknown_blocks.len(), 1);
}
}