1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
use std::net::SocketAddr; use std::thread; use std::sync::Arc; use std::sync::mpsc::{channel, Sender, Receiver}; use std::sync::atomic::{AtomicBool, Ordering}; use sync::{create_sync_peers, create_local_sync_node, create_sync_connection_factory, SyncListener}; use primitives::hash::H256; use util::{init_db, node_table_path}; use {config, p2p, PROTOCOL_VERSION, PROTOCOL_MINIMUM}; use super::super::rpc; enum BlockNotifierTask { NewBlock(H256), Stop, } struct BlockNotifier { tx: Sender<BlockNotifierTask>, is_synchronizing: Arc<AtomicBool>, worker_thread: Option<thread::JoinHandle<()>>, } impl BlockNotifier { pub fn new(block_notify_command: String) -> Self { let (tx, rx) = channel(); let is_synchronizing = Arc::new(AtomicBool::default()); BlockNotifier { tx: tx, is_synchronizing: is_synchronizing.clone(), worker_thread: Some(thread::Builder::new() .name("Block notification thread".to_owned()) .spawn(move || BlockNotifier::worker(rx, block_notify_command)) .expect("Error creating block notification thread")) } } fn worker(rx: Receiver<BlockNotifierTask>, block_notify_command: String) { for cmd in rx { match cmd { BlockNotifierTask::NewBlock(new_block_hash) => { let new_block_hash = new_block_hash.to_reversed_str(); let command = block_notify_command.replace("%s", &new_block_hash); let c_command = ::std::ffi::CString::new(command.clone()).unwrap(); unsafe { use libc::system; let err = system(c_command.as_ptr()); if err != 0 { error!(target: "pbtc", "Block notification command {} exited with error code {}", command, err); } } }, BlockNotifierTask::Stop => { break } } } trace!(target: "pbtc", "Block notification thread stopped"); } } impl SyncListener for BlockNotifier { fn synchronization_state_switched(&self, is_synchronizing: bool) { self.is_synchronizing.store(is_synchronizing, Ordering::SeqCst); } fn best_storage_block_inserted(&self, block_hash: &H256) { if !self.is_synchronizing.load(Ordering::SeqCst) { self.tx.send(BlockNotifierTask::NewBlock(block_hash.clone())) .expect("Block notification thread have the same lifetime as `BlockNotifier`") } } } impl Drop for BlockNotifier { fn drop(&mut self) { if let Some(join_handle) = self.worker_thread.take() { let _ = self.tx.send(BlockNotifierTask::Stop); join_handle.join().expect("Clean shutdown."); } } } pub fn start(cfg: config::Config) -> Result<(), String> { let mut el = p2p::event_loop(); init_db(&cfg)?; let nodes_path = node_table_path(&cfg); let p2p_cfg = p2p::Config { threads: cfg.p2p_threads, inbound_connections: cfg.inbound_connections, outbound_connections: cfg.outbound_connections, connection: p2p::NetConfig { protocol_version: PROTOCOL_VERSION, protocol_minimum: PROTOCOL_MINIMUM, magic: cfg.consensus.magic(), local_address: SocketAddr::new(cfg.host, cfg.port), services: cfg.services, user_agent: cfg.user_agent, start_height: 0, relay: true, }, peers: cfg.connect.map_or_else(|| vec![], |x| vec![x]), seeds: cfg.seednodes, node_table_path: nodes_path, preferable_services: cfg.services, internet_protocol: cfg.internet_protocol, }; let sync_peers = create_sync_peers(); let local_sync_node = create_local_sync_node(cfg.consensus, cfg.db.clone(), sync_peers.clone(), cfg.verification_params); let sync_connection_factory = create_sync_connection_factory(sync_peers.clone(), local_sync_node.clone()); if let Some(block_notify_command) = cfg.block_notify_command { local_sync_node.install_sync_listener(Box::new(BlockNotifier::new(block_notify_command))); } let p2p = try!(p2p::P2P::new(p2p_cfg, sync_connection_factory, el.handle()).map_err(|x| x.to_string())); let rpc_deps = rpc::Dependencies { network: cfg.network, storage: cfg.db, local_sync_node: local_sync_node, p2p_context: p2p.context().clone(), remote: el.remote(), }; let _rpc_server = try!(rpc::new_http(cfg.rpc_config, rpc_deps)); try!(p2p.run().map_err(|_| "Failed to start p2p module")); el.run(p2p::forever()).unwrap(); Ok(()) }