diff --git a/substrate/polkadot/network/src/lib.rs b/substrate/polkadot/network/src/lib.rs index 016a884fe9af63797ea040af8928b942ee92e9d7..8fbc6dba3a7b5757cc6d90caca494fe1fa9ede47 100644 --- a/substrate/polkadot/network/src/lib.rs +++ b/substrate/polkadot/network/src/lib.rs @@ -49,7 +49,7 @@ use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; -use substrate_network::{PeerId, RequestId, Context, Severity}; +use substrate_network::{NodeIndex, RequestId, Context, Severity}; use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; @@ -244,7 +244,7 @@ impl Decode for Message { } } -fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) { +fn send_polkadot_message(ctx: &mut Context<Block>, to: NodeIndex, message: Message) { trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message); let encoded = message.encode(); ctx.send_message(to, generic_message::Message::ChainSpecific(encoded)) @@ -252,14 +252,14 @@ fn send_polkadot_message(ctx: &mut Context<Block>, to: PeerId, message: Message) /// Polkadot protocol attachment for substrate. pub struct PolkadotProtocol { - peers: HashMap<PeerId, PeerInfo>, + peers: HashMap<NodeIndex, PeerInfo>, collating_for: Option<(AccountId, ParaId)>, consensus_gossip: ConsensusGossip<Block>, collators: CollatorPool, - validators: HashMap<SessionKey, PeerId>, + validators: HashMap<SessionKey, NodeIndex>, local_collations: LocalCollations<Collation>, live_consensus: Option<CurrentConsensus>, - in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, + in_flight: HashMap<(RequestId, NodeIndex), BlockDataRequest>, pending: Vec<BlockDataRequest>, next_req_id: u64, } @@ -352,17 +352,17 @@ impl PolkadotProtocol { .map(|(_, id)| id); // dispatch to peer - if let Some(peer_id) = next_peer { + if let Some(who) = next_peer { let req_id = self.next_req_id; self.next_req_id += 1; send_polkadot_message( ctx, - peer_id, + who, Message::RequestBlockData(req_id, pending.candidate_hash) ); - self.in_flight.insert((req_id, peer_id), pending); + self.in_flight.insert((req_id, who), pending); continue; } @@ -374,36 +374,36 @@ impl PolkadotProtocol { self.pending = new_pending; } - fn on_polkadot_message(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, raw: Vec<u8>, msg: Message) { - trace!(target: "p_net", "Polkadot message from {}: {:?}", peer_id, msg); + fn on_polkadot_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, raw: Vec<u8>, msg: Message) { + trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); match msg { Message::Statement(parent_hash, _statement) => - self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash), - Message::SessionKey(key) => self.on_session_key(ctx, peer_id, key), + self.consensus_gossip.on_chain_specific(ctx, who, raw, parent_hash), + Message::SessionKey(key) => self.on_session_key(ctx, who, key), Message::RequestBlockData(req_id, hash) => { let block_data = self.live_consensus.as_ref() .and_then(|c| c.block_data(&hash)); - send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data)); + send_polkadot_message(ctx, who, Message::BlockData(req_id, block_data)); } - Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), - Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), - Message::CollatorRole(role) => self.on_new_role(ctx, peer_id, role), + Message::BlockData(req_id, data) => self.on_block_data(ctx, who, req_id, data), + Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), + Message::CollatorRole(role) => self.on_new_role(ctx, who, role), } } - fn on_session_key(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, key: SessionKey) { + fn on_session_key(&mut self, ctx: &mut Context<Block>, who: NodeIndex, key: SessionKey) { { - let info = match self.peers.get_mut(&peer_id) { + let info = match self.peers.get_mut(&who) { Some(peer) => peer, None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); return } }; if !info.claimed_validator { - ctx.report_peer(peer_id, Severity::Bad("Session key broadcasted without setting authority role")); + ctx.report_peer(who, Severity::Bad("Session key broadcasted without setting authority role")); return; } @@ -413,20 +413,20 @@ impl PolkadotProtocol { for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) { send_polkadot_message( ctx, - peer_id, + who, Message::Collation(relay_parent, collation), ) } } - self.validators.insert(key, peer_id); + self.validators.insert(key, who); } self.dispatch_pending_requests(ctx); } - fn on_block_data(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, req_id: RequestId, data: Option<BlockData>) { - match self.in_flight.remove(&(req_id, peer_id)) { + fn on_block_data(&mut self, ctx: &mut Context<Block>, who: NodeIndex, req_id: RequestId, data: Option<BlockData>) { + match self.in_flight.remove(&(req_id, who)) { Some(req) => { if let Some(data) = data { if data.hash() == req.block_data_hash { @@ -438,29 +438,29 @@ impl PolkadotProtocol { self.pending.push(req); self.dispatch_pending_requests(ctx); } - None => ctx.report_peer(peer_id, Severity::Bad("Unexpected block data response")), + None => ctx.report_peer(who, Severity::Bad("Unexpected block data response")), } } // when a validator sends us (a collator) a new role. - fn on_new_role(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, role: Role) { - let info = match self.peers.get(&peer_id) { + fn on_new_role(&mut self, ctx: &mut Context<Block>, who: NodeIndex, role: Role) { + let info = match self.peers.get(&who) { Some(peer) => peer, None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); return } }; match info.validator_key { None => ctx.report_peer( - peer_id, + who, Severity::Bad("Sent collator role without registering first as validator"), ), Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { send_polkadot_message( ctx, - peer_id, + who, Message::Collation(relay_parent, collation), ) }, @@ -473,7 +473,7 @@ impl Specialization<Block> for PolkadotProtocol { Status { collating_for: self.collating_for.clone() }.encode() } - fn on_connect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, status: FullStatus) { + fn on_connect(&mut self, ctx: &mut Context<Block>, who: NodeIndex, status: FullStatus) { let local_status = match Status::decode(&mut &status.chain_status[..]) { Some(status) => status, None => { @@ -483,14 +483,14 @@ impl Specialization<Block> for PolkadotProtocol { if let Some((ref acc_id, ref para_id)) = local_status.collating_for { if self.collator_peer_id(acc_id.clone()).is_some() { - ctx.report_peer(peer_id, Severity::Useless("Unknown Polkadot-specific reason")); + ctx.report_peer(who, Severity::Useless("Unknown Polkadot-specific reason")); return } let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); send_polkadot_message( ctx, - peer_id, + who, Message::CollatorRole(collator_role), ); } @@ -498,17 +498,17 @@ impl Specialization<Block> for PolkadotProtocol { let validator = status.roles.contains(substrate_network::Roles::AUTHORITY); let send_key = validator || local_status.collating_for.is_some(); - self.peers.insert(peer_id, PeerInfo { + self.peers.insert(who, PeerInfo { collating_for: local_status.collating_for, validator_key: None, claimed_validator: validator, }); - self.consensus_gossip.new_peer(ctx, peer_id, status.roles); + self.consensus_gossip.new_peer(ctx, who, status.roles); if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { send_polkadot_message( ctx, - peer_id, + who, Message::SessionKey(consensus.local_session_key) ); } @@ -516,8 +516,8 @@ impl Specialization<Block> for PolkadotProtocol { self.dispatch_pending_requests(ctx); } - fn on_disconnect(&mut self, ctx: &mut Context<Block>, peer_id: PeerId) { - if let Some(info) = self.peers.remove(&peer_id) { + fn on_disconnect(&mut self, ctx: &mut Context<Block>, who: NodeIndex) { + if let Some(info) = self.peers.remove(&who) { if let Some((acc_id, _)) = info.collating_for { let new_primary = self.collators.on_disconnect(acc_id) .and_then(|new_primary| self.collator_peer_id(new_primary)); @@ -539,7 +539,7 @@ impl Specialization<Block> for PolkadotProtocol { { let pending = &mut self.pending; self.in_flight.retain(|&(_, ref peer), val| { - let retain = peer != &peer_id; + let retain = peer != &who; if !retain { let (sender, _) = oneshot::channel(); pending.push(::std::mem::replace(val, BlockDataRequest { @@ -554,24 +554,24 @@ impl Specialization<Block> for PolkadotProtocol { retain }); } - self.consensus_gossip.peer_disconnected(ctx, peer_id); + self.consensus_gossip.peer_disconnected(ctx, who); self.dispatch_pending_requests(ctx); } } - fn on_message(&mut self, ctx: &mut Context<Block>, peer_id: PeerId, message: message::Message<Block>) { + fn on_message(&mut self, ctx: &mut Context<Block>, who: NodeIndex, message: message::Message<Block>) { match message { generic_message::Message::BftMessage(msg) => { - trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", peer_id, msg); + trace!(target: "p_net", "Polkadot BFT message from {}: {:?}", who, msg); // TODO: check signature here? what if relevant block is unknown? - self.consensus_gossip.on_bft_message(ctx, peer_id, msg) + self.consensus_gossip.on_bft_message(ctx, who, msg) } generic_message::Message::ChainSpecific(raw) => { match Message::decode(&mut raw.as_slice()) { - Some(msg) => self.on_polkadot_message(ctx, peer_id, raw, msg), + Some(msg) => self.on_polkadot_message(ctx, who, raw, msg), None => { - trace!(target: "p_net", "Bad message from {}", peer_id); - ctx.report_peer(peer_id, Severity::Bad("Invalid polkadot protocol message format")); + trace!(target: "p_net", "Bad message from {}", who); + ctx.report_peer(who, Severity::Bad("Invalid polkadot protocol message format")); } } } @@ -611,7 +611,7 @@ impl Specialization<Block> for PolkadotProtocol { impl PolkadotProtocol { // we received a collation from a peer - fn on_collation(&mut self, ctx: &mut Context<Block>, from: PeerId, relay_parent: Hash, collation: Collation) { + fn on_collation(&mut self, ctx: &mut Context<Block>, from: NodeIndex, relay_parent: Hash, collation: Collation) { let collation_para = collation.receipt.parachain_index; let collated_acc = collation.receipt.collator; @@ -638,7 +638,7 @@ impl PolkadotProtocol { } // get connected peer with given account ID for collation. - fn collator_peer_id(&self, account_id: AccountId) -> Option<PeerId> { + fn collator_peer_id(&self, account_id: AccountId) -> Option<NodeIndex> { let check_info = |info: &PeerInfo| info .collating_for .as_ref() @@ -647,14 +647,14 @@ impl PolkadotProtocol { self.peers .iter() .filter(|&(_, info)| check_info(info)) - .map(|(peer_id, _)| *peer_id) + .map(|(who, _)| *who) .next() } // disconnect a collator by account-id. fn disconnect_bad_collator(&self, ctx: &mut Context<Block>, account_id: AccountId) { - if let Some(peer_id) = self.collator_peer_id(account_id) { - ctx.report_peer(peer_id, Severity::Bad("Consensus layer determined the given collator misbehaved")) + if let Some(who) = self.collator_peer_id(account_id) { + ctx.report_peer(who, Severity::Bad("Consensus layer determined the given collator misbehaved")) } } } @@ -670,9 +670,9 @@ impl PolkadotProtocol { ) { for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { match self.validators.get(&primary) { - Some(peer_id) => send_polkadot_message( + Some(who) => send_polkadot_message( ctx, - *peer_id, + *who, Message::Collation(relay_parent, cloned_collation), ), None => diff --git a/substrate/polkadot/network/src/tests.rs b/substrate/polkadot/network/src/tests.rs index 6d7fde3fe51236960a4b2e47ea30789d65cccdea..356d272e3f610961f7d7ba8de43bc6c59066a210 100644 --- a/substrate/polkadot/network/src/tests.rs +++ b/substrate/polkadot/network/src/tests.rs @@ -24,16 +24,16 @@ use polkadot_primitives::{Block, Hash, SessionKey}; use polkadot_primitives::parachain::{CandidateReceipt, HeadData, BlockData}; use substrate_primitives::H512; use codec::Encode; -use substrate_network::{Severity, PeerId, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage}; +use substrate_network::{Severity, NodeIndex, PeerInfo, ClientHandle, Context, Roles, message::Message as SubstrateMessage, specialization::Specialization, generic_message::Message as GenericMessage}; use std::sync::Arc; use futures::Future; #[derive(Default)] struct TestContext { - disabled: Vec<PeerId>, - disconnected: Vec<PeerId>, - messages: Vec<(PeerId, SubstrateMessage<Block>)>, + disabled: Vec<NodeIndex>, + disconnected: Vec<NodeIndex>, + messages: Vec<(NodeIndex, SubstrateMessage<Block>)>, } impl Context<Block> for TestContext { @@ -41,24 +41,24 @@ impl Context<Block> for TestContext { unimplemented!() } - fn report_peer(&mut self, peer: PeerId, reason: Severity) { + fn report_peer(&mut self, peer: NodeIndex, reason: Severity) { match reason { Severity::Bad(_) => self.disabled.push(peer), _ => self.disconnected.push(peer), } } - fn peer_info(&self, _peer: PeerId) -> Option<PeerInfo<Block>> { + fn peer_info(&self, _peer: NodeIndex) -> Option<PeerInfo<Block>> { unimplemented!() } - fn send_message(&mut self, peer_id: PeerId, data: SubstrateMessage<Block>) { - self.messages.push((peer_id, data)) + fn send_message(&mut self, who: NodeIndex, data: SubstrateMessage<Block>) { + self.messages.push((who, data)) } } impl TestContext { - fn has_message(&self, to: PeerId, message: Message) -> bool { + fn has_message(&self, to: NodeIndex, message: Message) -> bool { use substrate_network::generic_message::Message as GenericMessage; let encoded = message.encode(); @@ -91,7 +91,7 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus (c, knowledge) } -fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: PeerId, message: Message) { +fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: NodeIndex, message: Message) { let encoded = message.encode(); protocol.on_message(ctx, from, GenericMessage::ChainSpecific(encoded)); } @@ -209,19 +209,19 @@ fn fetches_from_those_with_knowledge() { fn remove_bad_collator() { let mut protocol = PolkadotProtocol::new(None); - let peer_id = 1; + let who = 1; let account_id = [2; 32].into(); let status = Status { collating_for: Some((account_id, 5.into())) }; { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_id, make_status(&status, Roles::NONE)); + protocol.on_connect(&mut ctx, who, make_status(&status, Roles::NONE)); } { let mut ctx = TestContext::default(); protocol.disconnect_bad_collator(&mut ctx, account_id); - assert!(ctx.disabled.contains(&peer_id)); + assert!(ctx.disabled.contains(&who)); } } diff --git a/substrate/substrate/network-libp2p/src/network_state.rs b/substrate/substrate/network-libp2p/src/network_state.rs index 789482554d12d22a61755ac4a199f5305c3ad181..2c1634ce91db68de1629e69f1561f208013becba 100644 --- a/substrate/substrate/network-libp2p/src/network_state.rs +++ b/substrate/substrate/network-libp2p/src/network_state.rs @@ -18,7 +18,7 @@ use bytes::Bytes; use fnv::{FnvHashMap, FnvHashSet}; use futures::sync::mpsc; use libp2p::core::{multiaddr::ToMultiaddr, Multiaddr, AddrComponent, Endpoint, UniqueConnec}; -use libp2p::core::{UniqueConnecState, PeerId as PeerstorePeerId, PublicKey}; +use libp2p::core::{UniqueConnecState, PeerId, PublicKey}; use libp2p::kad::KadConnecController; use libp2p::peerstore::{Peerstore, PeerAccess}; use libp2p::peerstore::json_peerstore::JsonPeerstore; @@ -26,7 +26,7 @@ use libp2p::peerstore::memory_peerstore::MemoryPeerstore; use libp2p::ping::Pinger; use libp2p::secio; use {Error, ErrorKind, NetworkConfiguration, NonReservedPeerMode}; -use {PeerId, ProtocolId, SessionInfo}; +use {NodeIndex, ProtocolId, SessionInfo}; use parking_lot::{Mutex, RwLock}; use rand::{self, Rng}; use std::cmp; @@ -46,7 +46,7 @@ const PEER_DISABLE_DURATION: Duration = Duration::from_secs(5 * 60); // Common struct shared throughout all the components of the service. pub struct NetworkState { /// Contains the information about the network. - peerstore: PeersStorage, + node_store: NodeStore, /// Active connections. connections: RwLock<Connections>, @@ -59,14 +59,14 @@ pub struct NetworkState { /// If true, only reserved peers can connect. reserved_only: atomic::AtomicBool, /// List of the IDs of the reserved peers. - reserved_peers: RwLock<FnvHashSet<PeerstorePeerId>>, + reserved_peers: RwLock<FnvHashSet<PeerId>>, - /// Each peer gets assigned a new unique ID. This ID increases linearly. - next_peer_id: atomic::AtomicUsize, + /// Each node we discover gets assigned a new unique ID. This ID increases linearly. + next_node_index: atomic::AtomicUsize, /// List of the IDs of the disabled peers. These peers will see their /// connections refused. Includes the time when the disabling expires. - disabled_peers: Mutex<FnvHashMap<PeerstorePeerId, Instant>>, + disabled_nodes: Mutex<FnvHashMap<PeerId, Instant>>, /// Local private key. local_private_key: secio::SecioKeyPair, @@ -74,7 +74,7 @@ pub struct NetworkState { local_public_key: PublicKey, } -enum PeersStorage { +enum NodeStore { /// Peers are stored in memory. Nothing is stored on disk. Memory(MemoryPeerstore), /// Peers are stored in a JSON file on the disk. @@ -84,10 +84,10 @@ enum PeersStorage { struct Connections { /// For each libp2p peer ID, the ID of the peer in the API we expose. /// Also corresponds to the index in `info_by_peer`. - peer_by_nodeid: FnvHashMap<PeerstorePeerId, usize>, + peer_by_nodeid: FnvHashMap<PeerId, usize>, /// For each peer ID, information about our connection to this peer. - info_by_peer: FnvHashMap<PeerId, PeerConnectionInfo>, + info_by_peer: FnvHashMap<NodeIndex, PeerConnectionInfo>, } struct PeerConnectionInfo { @@ -104,7 +104,7 @@ struct PeerConnectionInfo { ping_connec: UniqueConnec<Pinger>, /// Id of the peer. - id: PeerstorePeerId, + id: PeerId, /// True if this connection was initiated by us. /// Note that it is theoretically possible that we dial the remote at the @@ -130,7 +130,7 @@ struct PeerConnectionInfo { #[derive(Debug, Clone)] pub struct PeerInfo { /// Id of the peer. - pub id: PeerstorePeerId, + pub id: PeerId, /// True if this connection was initiated by us. /// Note that it is theoretically possible that we dial the remote at the @@ -172,21 +172,21 @@ impl NetworkState { let local_public_key = local_private_key.to_public_key(); // Build the storage for peers, including the bootstrap nodes. - let peerstore = if let Some(ref path) = config.net_config_path { + let node_store = if let Some(ref path) = config.net_config_path { let path = Path::new(path).join(NODES_FILE); - if let Ok(peerstore) = JsonPeerstore::new(path.clone()) { + if let Ok(node_store) = JsonPeerstore::new(path.clone()) { debug!(target: "sub-libp2p", "Initialized peer store for JSON \ file {:?}", path); - PeersStorage::Json(peerstore) + NodeStore::Json(node_store) } else { warn!(target: "sub-libp2p", "Failed to open peer storage {:?} \ ; peers won't be saved", path); - PeersStorage::Memory(MemoryPeerstore::empty()) + NodeStore::Memory(MemoryPeerstore::empty()) } } else { debug!(target: "sub-libp2p", "No peers file configured ; peers \ won't be saved"); - PeersStorage::Memory(MemoryPeerstore::empty()) + NodeStore::Memory(MemoryPeerstore::empty()) }; let reserved_peers = { @@ -195,7 +195,7 @@ impl NetworkState { Default::default() ); for peer in config.reserved_nodes.iter() { - let id = parse_and_add_to_peerstore(peer, &peerstore)?; + let id = parse_and_add_to_node_store(peer, &node_store)?; reserved_peers.insert(id); } RwLock::new(reserved_peers) @@ -205,7 +205,7 @@ impl NetworkState { config.reserved_nodes.len()); Ok(NetworkState { - peerstore, + node_store, min_peers: config.min_peers, max_peers: config.max_peers, connections: RwLock::new(Connections { @@ -214,8 +214,8 @@ impl NetworkState { }), reserved_only: atomic::AtomicBool::new(false), reserved_peers, - next_peer_id: atomic::AtomicUsize::new(0), - disabled_peers: Mutex::new(Default::default()), + next_node_index: atomic::AtomicUsize::new(0), + disabled_nodes: Mutex::new(Default::default()), local_private_key, local_public_key, }) @@ -234,13 +234,13 @@ impl NetworkState { /// Returns the ID of a random peer of the network. /// /// Returns `None` if we don't know any peer. - pub fn random_peer(&self) -> Option<PeerstorePeerId> { - // TODO: optimize by putting the operation directly in the peerstore + pub fn random_peer(&self) -> Option<PeerId> { + // TODO: optimize by putting the operation directly in the node_store // https://github.com/libp2p/rust-libp2p/issues/316 - let peers = match self.peerstore { - PeersStorage::Memory(ref mem) => + let peers = match self.node_store { + NodeStore::Memory(ref mem) => mem.peers().collect::<Vec<_>>(), - PeersStorage::Json(ref json) => + NodeStore::Json(ref json) => json.peers().collect::<Vec<_>>(), }; @@ -255,11 +255,11 @@ impl NetworkState { /// Returns all the IDs of the peers on the network we have knowledge of. /// /// This includes peers we are not connected to. - pub fn known_peers(&self) -> impl Iterator<Item = PeerstorePeerId> { - match self.peerstore { - PeersStorage::Memory(ref mem) => + pub fn known_peers(&self) -> impl Iterator<Item = PeerId> { + match self.node_store { + NodeStore::Memory(ref mem) => mem.peers().collect::<Vec<_>>().into_iter(), - PeersStorage::Json(ref json) => + NodeStore::Json(ref json) => json.peers().collect::<Vec<_>>().into_iter(), } } @@ -270,34 +270,32 @@ impl NetworkState { } /// Get a list of all connected peers by id. - pub fn connected_peers(&self) -> Vec<PeerId> { + pub fn connected_peers(&self) -> Vec<NodeIndex> { self.connections.read().peer_by_nodeid.values().cloned().collect() } - /// Returns true if the given `PeerId` is valid. + /// Returns true if the given `NodeIndex` is valid. /// - /// `PeerId`s are never reused, so once this function returns `false` it - /// will never return `true` again for the same `PeerId`. - pub fn is_peer_connected(&self, peer: PeerId) -> bool { + /// `NodeIndex`s are never reused, so once this function returns `false` it + /// will never return `true` again for the same `NodeIndex`. + pub fn is_peer_connected(&self, peer: NodeIndex) -> bool { self.connections.read().info_by_peer.contains_key(&peer) } /// Reports the ping of the peer. Returned later by `session_info()`. - /// No-op if the `peer_id` is not valid/expired. - pub fn report_ping_duration(&self, peer_id: PeerId, ping: Duration) { - let connections = self.connections.read(); - let info = match connections.info_by_peer.get(&peer_id) { + /// No-op if the `who` is not valid/expired. + pub fn report_ping_duration(&self, who: NodeIndex, ping: Duration) { + let mut connections = self.connections.write(); + let info = match connections.info_by_peer.get_mut(&who) { Some(info) => info, None => return, }; - *info.ping.lock() = Some(ping); } /// If we're connected to a peer with the given protocol, returns /// information about the connection. Otherwise, returns `None`. - pub fn session_info(&self, peer: PeerId, protocol: ProtocolId) - -> Option<SessionInfo> { + pub fn session_info(&self, peer: NodeIndex, protocol: ProtocolId) -> Option<SessionInfo> { let connections = self.connections.read(); let info = match connections.info_by_peer.get(&peer) { Some(info) => info, @@ -333,8 +331,7 @@ impl NetworkState { /// If we're connected to a peer with the given protocol, returns the /// protocol version. Otherwise, returns `None`. - pub fn protocol_version(&self, peer: PeerId, protocol: ProtocolId) - -> Option<u8> { + pub fn protocol_version(&self, peer: NodeIndex, protocol: ProtocolId) -> Option<u8> { let connections = self.connections.read(); let peer = match connections.info_by_peer.get(&peer) { Some(peer) => peer, @@ -348,8 +345,7 @@ impl NetworkState { } /// Equivalent to `session_info(peer).map(|info| info.client_version)`. - pub fn peer_client_version(&self, peer: PeerId, protocol: ProtocolId) - -> Option<String> { + pub fn peer_client_version(&self, peer: NodeIndex, protocol: ProtocolId) -> Option<String> { // TODO: implement more directly, without going through `session_info` self.session_info(peer, protocol) .map(|info| info.client_version) @@ -357,14 +353,14 @@ impl NetworkState { /// Adds an address discovered by Kademlia. /// Note that we don't have to be connected to a peer to add an address. - pub fn add_kad_discovered_addr(&self, node_id: &PeerstorePeerId, addr: Multiaddr) { + pub fn add_kad_discovered_addr(&self, node_id: &PeerId, addr: Multiaddr) { trace!(target: "sub-libp2p", "Peer store: adding address {} for {:?}", addr, node_id); - match self.peerstore { - PeersStorage::Memory(ref mem) => + match self.node_store { + NodeStore::Memory(ref mem) => mem.peer_or_create(node_id) .add_addr(addr, Duration::from_secs(3600)), - PeersStorage::Json(ref json) => + NodeStore::Json(ref json) => json.peer_or_create(node_id) .add_addr(addr, Duration::from_secs(3600)), } @@ -373,15 +369,14 @@ impl NetworkState { /// Signals that an address doesn't match the corresponding node ID. /// This removes the address from the peer store, so that it is not /// returned by `addrs_of_peer` again in the future. - pub fn set_invalid_kad_address(&self, node_id: &PeerstorePeerId, - addr: &Multiaddr) { + pub fn set_invalid_kad_address(&self, node_id: &PeerId, addr: &Multiaddr) { // TODO: blacklist the address? - match self.peerstore { - PeersStorage::Memory(ref mem) => + match self.node_store { + NodeStore::Memory(ref mem) => if let Some(mut peer) = mem.peer(node_id) { peer.rm_addr(addr.clone()) // TODO: cloning necessary? }, - PeersStorage::Json(ref json) => + NodeStore::Json(ref json) => if let Some(mut peer) = json.peer(node_id) { peer.rm_addr(addr.clone()) // TODO: cloning necessary? }, @@ -389,14 +384,14 @@ impl NetworkState { } /// Returns the known multiaddresses of a peer. - pub fn addrs_of_peer(&self, node_id: &PeerstorePeerId) -> Vec<Multiaddr> { - match self.peerstore { - PeersStorage::Memory(ref mem) => + pub fn addrs_of_peer(&self, node_id: &PeerId) -> Vec<Multiaddr> { + match self.node_store { + NodeStore::Memory(ref mem) => mem.peer(node_id) .into_iter() .flat_map(|p| p.addrs()) .collect::<Vec<_>>(), - PeersStorage::Json(ref json) => + NodeStore::Json(ref json) => json.peer(node_id) .into_iter() .flat_map(|p| p.addrs()) @@ -407,35 +402,35 @@ impl NetworkState { /// Sets information about a peer. pub fn set_peer_info( &self, - node_id: PeerstorePeerId, + node_id: PeerId, endpoint: Endpoint, client_version: String, local_addr: Multiaddr, remote_addr: Multiaddr - ) -> Result<PeerId, IoError> { + ) -> Result<NodeIndex, IoError> { let mut connections = self.connections.write(); - let peer_id = accept_connection(&mut connections, &self.next_peer_id, + let who = accept_connection(&mut connections, &self.next_node_index, node_id.clone(), endpoint)?; - let infos = connections.info_by_peer.get_mut(&peer_id) + let infos = connections.info_by_peer.get_mut(&who) .expect("Newly-created peer id is always valid"); infos.client_version = Some(client_version); infos.remote_address = Some(remote_addr); infos.local_address = Some(local_addr); - Ok(peer_id) + Ok(who) } /// Adds a peer to the internal peer store. /// Returns an error if the peer address is invalid. - pub fn add_peer(&self, peer: &str) -> Result<PeerstorePeerId, Error> { - parse_and_add_to_peerstore(peer, &self.peerstore) + pub fn add_peer(&self, peer: &str) -> Result<PeerId, Error> { + parse_and_add_to_node_store(peer, &self.node_store) } /// Adds a reserved peer to the list of reserved peers. /// Returns an error if the peer address is invalid. pub fn add_reserved_peer(&self, peer: &str) -> Result<(), Error> { - let id = parse_and_add_to_peerstore(peer, &self.peerstore)?; + let id = parse_and_add_to_node_store(peer, &self.node_store)?; self.reserved_peers.write().insert(id); Ok(()) } @@ -444,14 +439,14 @@ impl NetworkState { /// active connection to this peer. /// Returns an error if the peer address is invalid. pub fn remove_reserved_peer(&self, peer: &str) -> Result<(), Error> { - let id = parse_and_add_to_peerstore(peer, &self.peerstore)?; + let id = parse_and_add_to_node_store(peer, &self.node_store)?; self.reserved_peers.write().remove(&id); // Dropping the peer if we're in reserved mode. if self.reserved_only.load(atomic::Ordering::SeqCst) { let mut connections = self.connections.write(); - if let Some(peer_id) = connections.peer_by_nodeid.remove(&id) { - connections.info_by_peer.remove(&peer_id); + if let Some(who) = connections.peer_by_nodeid.remove(&id) { + connections.info_by_peer.remove(&who); } } @@ -486,7 +481,7 @@ impl NetworkState { } /// Returns true if we are connected to the given node. - pub fn has_connection(&self, node_id: &PeerstorePeerId) -> bool { + pub fn has_connection(&self, node_id: &PeerId) -> bool { let connections = self.connections.read(); connections.peer_by_nodeid.contains_key(node_id) } @@ -494,56 +489,56 @@ impl NetworkState { /// Obtains the `UniqueConnec` corresponding to the Kademlia connection to a peer. pub fn kad_connection( &self, - node_id: PeerstorePeerId - ) -> Result<(PeerId, UniqueConnec<KadConnecController>), IoError> { + node_id: PeerId + ) -> Result<(NodeIndex, UniqueConnec<KadConnecController>), IoError> { // TODO: check that the peer is disabled? should disabling a peer also prevent // kad from working? let mut connections = self.connections.write(); - let peer_id = accept_connection(&mut connections, &self.next_peer_id, + let who = accept_connection(&mut connections, &self.next_node_index, node_id, Endpoint::Listener)?; - let infos = connections.info_by_peer.get_mut(&peer_id) + let infos = connections.info_by_peer.get_mut(&who) .expect("Newly-created peer id is always valid"); let connec = infos.kad_connec.clone(); - Ok((peer_id, connec)) + Ok((who, connec)) } /// Obtains the `UniqueConnec` corresponding to the Ping connection to a peer. pub fn ping_connection( &self, - node_id: PeerstorePeerId - ) -> Result<(PeerId, UniqueConnec<Pinger>), IoError> { + node_id: PeerId + ) -> Result<(NodeIndex, UniqueConnec<Pinger>), IoError> { let mut connections = self.connections.write(); - let peer_id = accept_connection(&mut connections, &self.next_peer_id, + let who = accept_connection(&mut connections, &self.next_node_index, node_id, Endpoint::Listener)?; - let infos = connections.info_by_peer.get_mut(&peer_id) + let infos = connections.info_by_peer.get_mut(&who) .expect("Newly-created peer id is always valid"); let connec = infos.ping_connec.clone(); - Ok((peer_id, connec)) + Ok((who, connec)) } /// Cleans up inactive connections and returns a list of /// connections to ping. pub fn cleanup_and_prepare_ping( &self - ) -> Vec<(PeerId, PeerstorePeerId, UniqueConnec<Pinger>)> { + ) -> Vec<(NodeIndex, PeerId, UniqueConnec<Pinger>)> { let mut connections = self.connections.write(); let connections = &mut *connections; let peer_by_nodeid = &mut connections.peer_by_nodeid; let info_by_peer = &mut connections.info_by_peer; let mut ret = Vec::with_capacity(info_by_peer.len()); - info_by_peer.retain(|&peer_id, infos| { + info_by_peer.retain(|&who, infos| { // Remove the peer if neither Kad nor any protocol is alive. if !infos.kad_connec.is_alive() && !infos.protocols.iter().any(|(_, conn)| conn.is_alive()) { peer_by_nodeid.remove(&infos.id); trace!(target: "sub-libp2p", "Cleaning up expired peer \ - #{:?} ({:?})", peer_id, infos.id); + #{:?} ({:?})", who, infos.id); return false; } - ret.push((peer_id, infos.id.clone(), infos.ping_connec.clone())); + ret.push((who, infos.id.clone(), infos.ping_connec.clone())); true }); ret @@ -551,8 +546,8 @@ impl NetworkState { /// Try to add a new connection to a node in the list. /// - /// Returns a `PeerId` to allow further interfacing with this connection. - /// Note that all `PeerId`s are unique and never reused. + /// Returns a `NodeIndex` to allow further interfacing with this connection. + /// Note that all `NodeIndex`s are unique and never reused. /// /// Can return an error if we are refusing the connection to the remote. /// @@ -563,25 +558,23 @@ impl NetworkState { /// so by dropping this sender. pub fn custom_proto( &self, - node_id: PeerstorePeerId, + node_id: PeerId, protocol_id: ProtocolId, endpoint: Endpoint, - ) -> Result<(PeerId, UniqueConnec<(mpsc::UnboundedSender<Bytes>, u8)>), IoError> { + ) -> Result<(NodeIndex, UniqueConnec<(mpsc::UnboundedSender<Bytes>, u8)>), IoError> { let mut connections = self.connections.write(); - if is_peer_disabled(&self.disabled_peers, &node_id) { - debug!(target: "sub-libp2p", "Refusing node {:?} because it was \ - disabled", node_id); - return Err(IoError::new(IoErrorKind::PermissionDenied, - "disabled peer")) + if is_peer_disabled(&self.disabled_nodes, &node_id) { + debug!(target: "sub-libp2p", "Refusing node {:?} because it was disabled", node_id); + return Err(IoError::new(IoErrorKind::PermissionDenied, "disabled peer")) } - let peer_id = accept_connection(&mut connections, &self.next_peer_id, + let who = accept_connection(&mut connections, &self.next_node_index, node_id.clone(), endpoint)?; let num_open_connections = num_open_custom_connections(&connections); - let infos = connections.info_by_peer.get_mut(&peer_id) + let infos = connections.info_by_peer.get_mut(&who) .expect("Newly-created peer id is always valid"); let node_is_reserved = self.reserved_peers.read().contains(&infos.id); @@ -589,27 +582,24 @@ impl NetworkState { if self.reserved_only.load(atomic::Ordering::Relaxed) || num_open_connections >= self.max_peers { - debug!(target: "sub-libp2p", "Refusing node {:?} because we \ - reached the max number of peers", node_id); - return Err(IoError::new(IoErrorKind::PermissionDenied, - "maximum number of peers reached")) + debug!(target: "sub-libp2p", "Refusing node {:?} because we reached the max number of peers", node_id); + return Err(IoError::new(IoErrorKind::PermissionDenied, "maximum number of peers reached")) } } if let Some((_, ref uconn)) = infos.protocols.iter().find(|&(prot, _)| prot == &protocol_id) { - return Ok((peer_id, uconn.clone())) + return Ok((who, uconn.clone())) } let unique_connec = UniqueConnec::empty(); infos.protocols.push((protocol_id.clone(), unique_connec.clone())); - Ok((peer_id, unique_connec)) + Ok((who, unique_connec)) } /// Sends some data to the given peer, using the sender that was passed /// to the `UniqueConnec` of `custom_proto`. - pub fn send(&self, protocol: ProtocolId, peer_id: PeerId, message: Bytes) - -> Result<(), Error> { - if let Some(peer) = self.connections.read().info_by_peer.get(&peer_id) { + pub fn send(&self, protocol: ProtocolId, who: NodeIndex, message: Bytes) -> Result<(), Error> { + if let Some(peer) = self.connections.read().info_by_peer.get(&who) { let sender = peer.protocols.iter().find(|elem| elem.0 == protocol) .and_then(|e| e.1.poll()) .map(|e| e.0); @@ -622,37 +612,35 @@ impl NetworkState { // protocol. debug!(target: "sub-libp2p", "Tried to send message to peer {} for which we aren't connected with the requested protocol", - peer_id + who ); return Err(ErrorKind::PeerNotFound.into()) } } else { - debug!(target: "sub-libp2p", "Tried to send message to invalid peer ID {}", peer_id); + debug!(target: "sub-libp2p", "Tried to send message to invalid peer ID {}", who); return Err(ErrorKind::PeerNotFound.into()) } } /// Get the info on a peer, if there's an active connection. - pub fn peer_info(&self, who: PeerId) -> Option<PeerInfo> { + pub fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo> { self.connections.read().info_by_peer.get(&who).map(Into::into) } + /// Reports that an attempt to make a low-level ping of the peer failed. + pub fn report_ping_failed(&self, who: NodeIndex) { + self.drop_peer(who); + } + /// Disconnects a peer, if a connection exists (ie. drops the Kademlia /// controller, and the senders that were stored in the `UniqueConnec` of /// `custom_proto`). - pub fn drop_peer(&self, peer_id: PeerId, reason: Option<&str>) { + pub fn drop_peer(&self, who: NodeIndex) { let mut connections = self.connections.write(); - if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) { - if let Some(reason) = reason { - if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) { - debug!(target: "sub-libp2p", "Disconnected peer {} (version: {}, address: {}). {}", peer_id, client_version, remote_address, reason); - } else { - debug!(target: "sub-libp2p", "Disconnected peer {}. {}", peer_id, reason); - } - } - - trace!(target: "sub-libp2p", "Destroying peer #{} {:?} ; \ - kademlia = {:?} ; num_protos = {:?}", peer_id, peer_info.id, + if let Some(peer_info) = connections.info_by_peer.remove(&who) { + trace!(target: "sub-libp2p", "Destroying peer #{} {:?} ; kademlia = {:?} ; num_protos = {:?}", + who, + peer_info.id, peer_info.kad_connec.is_alive(), peer_info.protocols.iter().filter(|c| c.1.is_alive()).count()); // TODO: we manually clear the connections as a work-around for @@ -660,7 +648,7 @@ impl NetworkState { for c in peer_info.protocols.iter() { c.1.clear(); } peer_info.kad_connec.clear(); let old = connections.peer_by_nodeid.remove(&peer_info.id); - debug_assert_eq!(old, Some(peer_id)); + debug_assert_eq!(old, Some(who)); } } @@ -681,18 +669,18 @@ impl NetworkState { /// list of disabled peers, and drops any existing connections if /// necessary (ie. drops the sender that was stored in the `UniqueConnec` /// of `custom_proto`). - pub fn disable_peer(&self, peer_id: PeerId, reason: &str) { + pub fn ban_peer(&self, who: NodeIndex, reason: &str) { // TODO: what do we do if the peer is reserved? // TODO: same logging as in disconnect_peer let mut connections = self.connections.write(); - let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&peer_id) { + let peer_info = if let Some(peer_info) = connections.info_by_peer.remove(&who) { if let (&Some(ref client_version), &Some(ref remote_address)) = (&peer_info.client_version, &peer_info.remote_address) { - info!(target: "network", "Peer {} (version: {}, address: {}) disabled. {}", peer_id, client_version, remote_address, reason); + info!(target: "network", "Peer {} (version: {}, address: {}) disabled. {}", who, client_version, remote_address, reason); } else { - info!(target: "network", "Peer {} disabled. {}", peer_id, reason); + info!(target: "network", "Peer {} disabled. {}", who, reason); } let old = connections.peer_by_nodeid.remove(&peer_info.id); - debug_assert_eq!(old, Some(peer_id)); + debug_assert_eq!(old, Some(who)); peer_info } else { return @@ -700,7 +688,7 @@ impl NetworkState { drop(connections); let timeout = Instant::now() + PEER_DISABLE_DURATION; - self.disabled_peers.lock().insert(peer_info.id.clone(), timeout); + self.disabled_nodes.lock().insert(peer_info.id.clone(), timeout); } /// Flushes the caches to the disk. @@ -708,18 +696,16 @@ impl NetworkState { /// This is done in an atomical way, so that an error doesn't corrupt /// anything. pub fn flush_caches_to_disk(&self) -> Result<(), IoError> { - match self.peerstore { - PeersStorage::Memory(_) => Ok(()), - PeersStorage::Json(ref json) => + match self.node_store { + NodeStore::Memory(_) => Ok(()), + NodeStore::Json(ref json) => match json.flush() { Ok(()) => { - debug!(target: "sub-libp2p", "Flushed JSON peer store \ - to disk"); + debug!(target: "sub-libp2p", "Flushed JSON peer store to disk"); Ok(()) } Err(err) => { - warn!(target: "sub-libp2p", "Failed to flush changes \ - to JSON peer store: {}", err); + warn!(target: "sub-libp2p", "Failed to flush changes to JSON peer store: {}", err); Err(err) } } @@ -733,23 +719,22 @@ impl Drop for NetworkState { } } -/// Assigns a `PeerId` to a node, or returns an existing ID if any exists. +/// Assigns a `NodeIndex` to a node, or returns an existing ID if any exists. /// /// The function only accepts already-locked structs, so that we don't risk /// any deadlock. fn accept_connection( connections: &mut Connections, - next_peer_id: &atomic::AtomicUsize, - node_id: PeerstorePeerId, + next_node_index: &atomic::AtomicUsize, + node_id: PeerId, endpoint: Endpoint -) -> Result<PeerId, IoError> { +) -> Result<NodeIndex, IoError> { let peer_by_nodeid = &mut connections.peer_by_nodeid; let info_by_peer = &mut connections.info_by_peer; - let peer_id = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| { - let new_id = next_peer_id.fetch_add(1, atomic::Ordering::Relaxed); - trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}", - new_id, node_id); + let who = *peer_by_nodeid.entry(node_id.clone()).or_insert_with(|| { + let new_id = next_node_index.fetch_add(1, atomic::Ordering::Relaxed); + trace!(target: "sub-libp2p", "Creating new peer #{:?} for {:?}", new_id, node_id); info_by_peer.insert(new_id, PeerConnectionInfo { protocols: Vec::new(), // TODO: Vec::with_capacity(num_registered_protocols), @@ -765,13 +750,13 @@ fn accept_connection( new_id }); - Ok(peer_id) + Ok(who) } /// Returns true if a peer is disabled. fn is_peer_disabled( - list: &Mutex<FnvHashMap<PeerstorePeerId, Instant>>, - peer: &PeerstorePeerId + list: &Mutex<FnvHashMap<PeerId, Instant>>, + peer: &PeerId ) -> bool { let mut list = list.lock(); if let Some(timeout) = list.get(peer).cloned() { @@ -804,30 +789,32 @@ fn num_open_custom_connections(connections: &Connections) -> u32 { } /// Parses an address of the form `/ip4/x.x.x.x/tcp/x/p2p/xxxxxx`, and adds it -/// to the given peerstore. Returns the corresponding peer ID. -fn parse_and_add_to_peerstore(addr_str: &str, peerstore: &PeersStorage) - -> Result<PeerstorePeerId, Error> { +/// to the given node_store. Returns the corresponding peer ID. +fn parse_and_add_to_node_store( + addr_str: &str, + node_store: &NodeStore +) -> Result<PeerId, Error> { let mut addr = addr_str.to_multiaddr().map_err(|_| ErrorKind::AddressParse)?; - let peer_id = match addr.pop() { + let who = match addr.pop() { Some(AddrComponent::P2P(key)) | Some(AddrComponent::IPFS(key)) => - PeerstorePeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?, + PeerId::from_bytes(key).map_err(|_| ErrorKind::AddressParse)?, _ => return Err(ErrorKind::AddressParse.into()), }; // Registering the bootstrap node with a TTL of 100000 years TODO: wrong - match peerstore { - PeersStorage::Memory(ref peerstore) => - peerstore - .peer_or_create(&peer_id) + match node_store { + NodeStore::Memory(ref node_store) => + node_store + .peer_or_create(&who) .add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)), - PeersStorage::Json(ref peerstore) => - peerstore - .peer_or_create(&peer_id) + NodeStore::Json(ref node_store) => + node_store + .peer_or_create(&who) .add_addr(addr, Duration::from_secs(100000 * 365 * 24 * 3600)), } - Ok(peer_id) + Ok(who) } /// Obtains or generates the local private key using the configuration. @@ -848,9 +835,11 @@ fn obtain_private_key(config: &NetworkConfiguration) Ok(s) => Ok(s), Err(err) => { // Failed to fetch existing file ; generate a new key - trace!(target: "sub-libp2p", "Failed to load existing \ - secret key file {:?}, generating new key ; err = {:?}", - secret_path, err); + trace!(target: "sub-libp2p", + "Failed to load existing secret key file {:?}, generating new key ; err = {:?}", + secret_path, + err + ); Ok(gen_key_and_try_write_to_file(&secret_path)) } } @@ -861,8 +850,7 @@ fn obtain_private_key(config: &NetworkConfiguration) let mut key: [u8; 32] = [0; 32]; rand::rngs::EntropyRng::new().fill(&mut key); Ok(secio::SecioKeyPair::secp256k1_raw_key(&key) - .expect("randomly-generated key with correct len should \ - always be valid")) + .expect("randomly-generated key with correct len should always be valid")) } } } @@ -899,12 +887,17 @@ fn gen_key_and_try_write_to_file<P>(path: P) -> secio::SecioKeyPair Ok(mut file) => match file.write_all(&raw_key) { Ok(()) => (), - Err(err) => warn!(target: "sub-libp2p", "Failed to write \ - secret key in file {:?} ; err = {:?}", path.as_ref(), err), + Err(err) => warn!(target: "sub-libp2p", + "Failed to write secret key in file {:?} ; err = {:?}", + path.as_ref(), + err + ), }, - Err(err) => - warn!(target: "sub-libp2p", "Failed to store secret key in file \ - {:?} ; err = {:?}", path.as_ref(), err), + Err(err) => warn!(target: "sub-libp2p", + "Failed to store secret key in file {:?} ; err = {:?}", + path.as_ref(), + err + ), } secio_key @@ -943,13 +936,13 @@ mod tests { let state = NetworkState::new(&Default::default()).unwrap(); let example_peer = PublicKey::Rsa(vec![1, 2, 3, 4]).into_peer_id(); - let (peer_id, _) = state.custom_proto( + let (who, _) = state.custom_proto( example_peer.clone(), [1, 2, 3], Endpoint::Dialer ).unwrap(); - state.disable_peer(peer_id, "Just a test"); + state.ban_peer(who, "Just a test"); assert!(state.custom_proto( example_peer.clone(), diff --git a/substrate/substrate/network-libp2p/src/service.rs b/substrate/substrate/network-libp2p/src/service.rs index bc1dcc5d035022dc55169f00bc2ac72a1142baab..2d9ac8946667f80bf3085ec44c54b25c03c69596 100644 --- a/substrate/substrate/network-libp2p/src/service.rs +++ b/substrate/substrate/network-libp2p/src/service.rs @@ -16,7 +16,7 @@ use bytes::Bytes; use {Error, ErrorKind, NetworkConfiguration, NetworkProtocolHandler}; -use {NonReservedPeerMode, NetworkContext, Severity, PeerId, ProtocolId}; +use {NonReservedPeerMode, NetworkContext, Severity, NodeIndex, ProtocolId}; use parking_lot::{Mutex, RwLock}; use libp2p; use libp2p::multiaddr::{AddrComponent, Multiaddr}; @@ -77,7 +77,7 @@ struct Shared { kad_upgrade: KadConnecConfig, /// List of protocols available on the network. It is a logic error to - /// remote protocols from this list, and the code may assume that protocols + /// remove protocols from this list, and the code may assume that protocols /// stay at the same index forever. protocols: RwLock<RegisteredProtocols<Arc<NetworkProtocolHandler + Send + Sync>>>, @@ -149,8 +149,7 @@ impl NetworkService { // reach us self.shared.original_listened_addr.read().as_ref() .map(|addr| - format!("{}/p2p/{}", addr, - self.shared.kad_system.local_peer_id().to_base58()) + format!("{}/p2p/{}", addr, self.shared.kad_system.local_peer_id().to_base58()) ) } @@ -204,8 +203,7 @@ impl NetworkService { let fut = match init_thread(core.handle(), shared, timeouts_register_rx, close_rx) { Ok(future) => { - debug!(target: "sub-libp2p", "Successfully started \ - networking service"); + debug!(target: "sub-libp2p", "Successfully started networking service"); let _ = init_tx.send(Ok(())); future }, @@ -217,8 +215,7 @@ impl NetworkService { match core.run(fut) { Ok(()) => debug!(target: "sub-libp2p", "libp2p future finished"), - Err(err) => error!(target: "sub-libp2p", "error while running \ - libp2p: {:?}", err), + Err(err) => error!(target: "sub-libp2p", "error while running libp2p: {:?}", err), } }); @@ -234,8 +231,7 @@ impl NetworkService { if let Some((close_tx, join)) = self.bg_thread.lock().take() { let _ = close_tx.send(()); if let Err(e) = join.join() { - warn!(target: "sub-libp2p", "error while waiting on libp2p \ - background thread: {:?}", e); + warn!(target: "sub-libp2p", "error while waiting on libp2p background thread: {:?}", e); } } @@ -243,7 +239,7 @@ impl NetworkService { } /// Get a list of all connected peers by id. - pub fn connected_peers(&self) -> Vec<PeerId> { + pub fn connected_peers(&self) -> Vec<NodeIndex> { self.shared.network_state.connected_peers() } @@ -295,18 +291,18 @@ impl Drop for NetworkService { struct NetworkContextImpl { inner: Arc<Shared>, protocol: ProtocolId, - current_peer: Option<PeerId>, + current_peer: Option<NodeIndex>, } impl NetworkContext for NetworkContextImpl { - fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) { + fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) { self.send_protocol(self.protocol, peer, packet_id, data) } fn send_protocol( &self, protocol: ProtocolId, - peer: PeerId, + peer: NodeIndex, packet_id: PacketId, data: Vec<u8> ) { @@ -318,7 +314,8 @@ impl NetworkContext for NetworkContextImpl { message.extend_from_slice(&[packet_id]); message.extend_from_slice(&data); if self.inner.network_state.send(protocol, peer, message).is_err() { - self.inner.network_state.drop_peer(peer, Some("Sending to peer failed")); + debug!(target: "sub-libp2p", "Sending to peer {} failed. Dropping.", peer); + self.inner.network_state.drop_peer(peer); } } @@ -330,18 +327,24 @@ impl NetworkContext for NetworkContextImpl { } } - fn report_peer(&self, peer: PeerId, reason: Severity) { + fn report_peer(&self, peer: NodeIndex, reason: Severity) { if let Some(info) = self.inner.network_state.peer_info(peer) { if let (Some(client_version), Some(remote_address)) = (info.client_version, info.remote_address) { - info!(target: "sub-libp2p", "Peer {} ({} {}) reported by client: {}", peer, remote_address, client_version, reason); + info!(target: "sub-libp2p", + "Peer {} ({} {}) reported by client: {}", + peer, + remote_address, + client_version, + reason + ); } else { info!(target: "sub-libp2p", "Peer {} reported by client: {}", peer, reason); } } match reason { - Severity::Bad(reason) => self.inner.network_state.disable_peer(peer, reason), - Severity::Useless(reason) => self.inner.network_state.drop_peer(peer, Some(reason)), - Severity::Timeout => self.inner.network_state.drop_peer(peer, Some("Timeout waiting for response")), + Severity::Bad(reason) => self.inner.network_state.ban_peer(peer, reason), + Severity::Useless(_) => self.inner.network_state.drop_peer(peer), + Severity::Timeout => self.inner.network_state.drop_peer(peer), } } @@ -368,17 +371,17 @@ impl NetworkContext for NetworkContextImpl { Ok(()) } - fn peer_client_version(&self, peer: PeerId) -> String { + fn peer_client_version(&self, peer: NodeIndex) -> String { // Devp2p returns "unknown" on unknown peer ID, so we do the same. self.inner.network_state.peer_client_version(peer, self.protocol) .unwrap_or_else(|| "unknown".to_string()) } - fn session_info(&self, peer: PeerId) -> Option<SessionInfo> { + fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo> { self.inner.network_state.session_info(peer, self.protocol) } - fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> { + fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8> { self.inner.network_state.protocol_version(peer, protocol) } @@ -394,7 +397,9 @@ impl NetworkContext for NetworkContextImpl { fn init_thread( core: Handle, shared: Arc<Shared>, - timeouts_register_rx: mpsc::UnboundedReceiver<(Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken))>, + timeouts_register_rx: mpsc::UnboundedReceiver< + (Duration, (Arc<NetworkProtocolHandler + Send + Sync + 'static>, ProtocolId, TimerToken)) + >, close_rx: oneshot::Receiver<()> ) -> Result<impl Future<Item = (), Error = IoError>, Error> { // Build the transport layer. @@ -407,11 +412,10 @@ fn init_thread( let addr_resolver = { let shared = shared.clone(); - move |peer_id| { - let addrs = shared.network_state.addrs_of_peer(&peer_id); + move |who| { + let addrs = shared.network_state.addrs_of_peer(&who); for addr in &addrs { - trace!(target: "sub-libp2p", "{:?} resolved as {}", - peer_id, addr); + trace!(target: "sub-libp2p", "{:?} resolved as {}", who, addr); } addrs.into_iter() } @@ -473,8 +477,7 @@ fn init_thread( *shared.original_listened_addr.write() = Some(new_addr.clone()); }, Err(_) => { - warn!(target: "sub-libp2p", "Can't listen on {}, protocol not \ - supported", listen_addr); + warn!(target: "sub-libp2p", "Can't listen on {}, protocol not supported", listen_addr); return Err(ErrorKind::BadProtocol.into()) }, } @@ -482,14 +485,14 @@ fn init_thread( // Explicitely connect to _all_ the boostrap nodes as a temporary measure. for bootnode in shared.config.boot_nodes.iter() { match shared.network_state.add_peer(bootnode) { - Ok(peer_id) => { - trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id); + Ok(who) => { + trace!(target: "sub-libp2p", "Dialing bootnode {:?}", who); for proto in shared.protocols.read().0.clone().into_iter() { open_peer_custom_proto( shared.clone(), transport.clone(), proto, - peer_id.clone(), + who.clone(), &swarm_controller ) } @@ -508,7 +511,7 @@ fn init_thread( }; if let Ok(addr) = multi { - trace!(target: "sub-libp2p", "Missing PeerId for Bootnode {:}. Querying", bootnode); + trace!(target: "sub-libp2p", "Missing NodeIndex for Bootnode {:}. Querying", bootnode); for proto in shared.protocols.read().0.clone().into_iter() { connect_with_query_peer_id( shared.clone(), @@ -520,7 +523,7 @@ fn init_thread( } } else { warn!(target: "sub-libp2p", "Not a valid Bootnode Address {:}", bootnode); - continue; + continue; } }, Err(err) => warn!(target:"sub-libp2p", "Couldn't parse Bootnode Address: {}", err), @@ -560,8 +563,7 @@ fn init_thread( .select(close_rx.then(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err) .and_then(move |_| { - debug!(target: "sub-libp2p", "Networking ended ; disconnecting \ - all peers"); + debug!(target: "sub-libp2p", "Networking ended ; disconnecting all peers"); shared.network_state.disconnect_all(); Ok(()) })) @@ -593,8 +595,7 @@ fn listener_handle<'a, C>( where C: AsyncRead + AsyncWrite + 'a { match upgrade { FinalUpgrade::Kad(controller, kademlia_stream, client_addr) => { - trace!(target: "sub-libp2p", "Opened kademlia substream with {:?}", - client_addr); + trace!(target: "sub-libp2p", "Opened kademlia substream with {:?}", client_addr); match handle_kademlia_connection(shared, client_addr, controller, kademlia_stream) { Ok(fut) => Box::new(fut) as Box<_>, Err(err) => Box::new(future::err(err)) as Box<_>, @@ -624,8 +625,7 @@ fn listener_handle<'a, C>( let node_id = p2p_multiaddr_to_node_id(client_addr); match shared.network_state.ping_connection(node_id.clone()) { Ok((_, ping_connec)) => { - trace!(target: "sub-libp2p", "Successfully opened ping \ - substream with {:?}", node_id); + trace!(target: "sub-libp2p", "Successfully opened ping substream with {:?}", node_id); let fut = ping_connec.set_until(pinger, future); Box::new(fut) as Box<_> }, @@ -650,7 +650,7 @@ fn handle_kademlia_connection( kademlia_stream: Box<Stream<Item = KadIncomingRequest, Error = IoError>> ) -> Result<impl Future<Item = (), Error = IoError>, IoError> { let node_id = p2p_multiaddr_to_node_id(client_addr); - let (peer_id, kad_connec) = shared.network_state + let (who, kad_connec) = shared.network_state .kad_connection(node_id.clone())?; let node_id2 = node_id.clone(); @@ -679,8 +679,7 @@ fn handle_kademlia_connection( Ok(future::Loop::Continue(rest)) }) }).then(move |val| { - trace!(target: "sub-libp2p", "Closed Kademlia connection \ - with #{} {:?} => {:?}", peer_id, node_id2, val); + trace!(target: "sub-libp2p", "Closed Kademlia connection with #{} {:?} => {:?}", who, node_id2, val); val }); @@ -695,16 +694,16 @@ fn build_kademlia_response( ) -> Vec<KadPeer> { shared.kad_system .known_closest_peers(searched) - .map(move |peer_id| { - if peer_id == *shared.kad_system.local_peer_id() { + .map(move |who| { + if who == *shared.kad_system.local_peer_id() { KadPeer { - node_id: peer_id.clone(), + node_id: who.clone(), multiaddrs: shared.listened_addrs.read().clone(), connection_ty: KadConnectionType::Connected, } } else { - let addrs = shared.network_state.addrs_of_peer(&peer_id); - let connec_ty = if shared.network_state.has_connection(&peer_id) { + let addrs = shared.network_state.addrs_of_peer(&who); + let connec_ty = if shared.network_state.has_connection(&who) { // TODO: this only checks connections with substrate ; but what // if we're connected through Kademlia only? KadConnectionType::Connected @@ -713,7 +712,7 @@ fn build_kademlia_response( }; KadPeer { - node_id: peer_id.clone(), + node_id: who.clone(), multiaddrs: addrs, connection_ty: connec_ty, } @@ -749,7 +748,7 @@ fn handle_custom_connection( // TODO: is there a better way to refuse connections than to drop the // newly-opened substream? should we refuse the connection // beforehand? - let (peer_id, unique_connec) = match shared.network_state.custom_proto( + let (who, unique_connec) = match shared.network_state.custom_proto( node_id.clone(), protocol_id, custom_proto_out.endpoint, @@ -759,14 +758,17 @@ fn handle_custom_connection( }; if let UniqueConnecState::Full = unique_connec.state() { - debug!(target: "sub-libp2p", "Interrupting connection attempt to {:?} \ - with {:?} because we're already connected", node_id, custom_proto_out.protocol_id); + debug!(target: "sub-libp2p", + "Interrupting connection attempt to {:?} with {:?} because we're already connected", + node_id, + custom_proto_out.protocol_id + ); return future::Either::A(future::ok(())) } struct ProtoDisconnectGuard { inner: Arc<Shared>, - peer_id: PeerId, + who: NodeIndex, node_id: PeerstorePeerId, handler: Arc<NetworkProtocolHandler + Send + Sync>, protocol: ProtocolId @@ -774,27 +776,27 @@ fn handle_custom_connection( impl Drop for ProtoDisconnectGuard { fn drop(&mut self) { - debug!(target: "sub-libp2p", + info!(target: "sub-libp2p", "Node {:?} with peer ID {} through protocol {:?} disconnected", self.node_id, - self.peer_id, + self.who, self.protocol ); self.handler.disconnected(&NetworkContextImpl { inner: self.inner.clone(), protocol: self.protocol, - current_peer: Some(self.peer_id), - }, &self.peer_id); + current_peer: Some(self.who), + }, &self.who); // When any custom protocol drops, we drop the peer entirely. // TODO: is this correct? - self.inner.network_state.drop_peer(self.peer_id, Some("Remote end disconnected")); + self.inner.network_state.drop_peer(self.who); } } let dc_guard = ProtoDisconnectGuard { inner: shared.clone(), - peer_id, + who, node_id: node_id.clone(), handler: handler.clone(), protocol: protocol_id, @@ -810,8 +812,8 @@ fn handle_custom_connection( handler.read(&NetworkContextImpl { inner: shared.clone(), protocol: protocol_id, - current_peer: Some(peer_id.clone()), - }, &peer_id, packet_id, &data); + current_peer: Some(who.clone()), + }, &who, packet_id, &data); Ok(()) } }); @@ -824,15 +826,19 @@ fn handle_custom_connection( val }); - debug!(target: "sub-libp2p", "Successfully connected to {:?} (peer id \ - {}) with protocol {:?} version {}", node_id, peer_id, protocol_id, - custom_proto_out.protocol_version); + debug!(target: "sub-libp2p", + "Successfully connected to {:?} (peer id {}) with protocol {:?} version {}", + node_id, + who, + protocol_id, + custom_proto_out.protocol_version + ); handler.connected(&NetworkContextImpl { inner: shared.clone(), protocol: protocol_id, - current_peer: Some(peer_id), - }, &peer_id); + current_peer: Some(who), + }, &who); future::Either::B(final_fut) } @@ -869,10 +875,10 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T, let shared = shared.clone(); let transport = transport.clone(); let swarm_controller = swarm_controller.clone(); - move |peer_id| + move |who| obtain_kad_connection( shared.clone(), - peer_id.clone(), + who.clone(), transport.clone(), swarm_controller.clone() ) @@ -938,8 +944,7 @@ fn perform_kademlia_query<T, To, St, C>( let random_key = PublicKey::Ed25519((0 .. 32) .map(|_| -> u8 { rand::random() }).collect()); let random_peer_id = random_key.into_peer_id(); - trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", - random_peer_id); + trace!(target: "sub-libp2p", "Start kademlia discovery for {:?}", random_peer_id); shared.clone() .kad_system @@ -947,7 +952,7 @@ fn perform_kademlia_query<T, To, St, C>( let shared = shared.clone(); let transport = transport.clone(); let swarm_controller = swarm_controller.clone(); - move |peer_id| obtain_kad_connection(shared.clone(), peer_id.clone(), + move |who| obtain_kad_connection(shared.clone(), who.clone(), transport.clone(), swarm_controller.clone()) }) .filter_map(move |event| @@ -980,8 +985,10 @@ fn connect_to_nodes<T, To, St, C>( St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static, C: 'static { let num_slots = shared.network_state.should_open_outgoing_custom_connections(); - debug!(target: "sub-libp2p", "Outgoing connections cycle ; opening up to \ - {} outgoing connections", num_slots); + debug!(target: "sub-libp2p", + "Outgoing connections cycle ; opening up to {} outgoing connections", + num_slots + ); for _ in 0 .. num_slots { // Choose a random peer. We are potentially already connected to @@ -1034,8 +1041,11 @@ fn connect_with_query_peer_id<T, To, St, C>( .and_then(move |info| { let _ = process_identify_info(shared, &info, original_addr, endpoint, &base_transport); - trace!(target: "sub-libp2p", "Bootnode {:} found with peer id: {:?}", - addr2, info.info.public_key.into_peer_id()); + trace!(target: "sub-libp2p", + "Bootnode {:} found with peer id: {:?}", + addr2, + info.info.public_key.into_peer_id() + ); upgrade::apply(socket, proto, endpoint, client_addr) }) }) @@ -1050,15 +1060,19 @@ fn connect_with_query_peer_id<T, To, St, C>( .map_err({ let addr = addr.clone(); move |err| { - warn!(target: "sub-libp2p", "Error while dialing {:?} to query peer id: {:?}", - addr, err); + warn!(target: "sub-libp2p", + "Error while dialing {:?} to query peer id: {:?}", + addr, + err + ); err } }); - let _ = swarm_controller.dial(addr.clone(), with_err) - .map_err( move |err| warn!(target: "sub-libp2p", - "Error when querying peer node info {:} of {:}", err, addr)); + let _ = swarm_controller.dial(addr.clone(), with_err) + .map_err(move |err| + warn!(target: "sub-libp2p", "Error when querying peer node info {:} of {:}", err, addr) + ); } /// If necessary, dials the given address for the given protocol and using the @@ -1081,8 +1095,7 @@ fn open_peer_custom_proto<T, To, St, C>( // Don't connect to ourselves. // TODO: remove this eventually if &expected_peer_id == shared.kad_system.local_peer_id() { - trace!(target: "sub-libp2p", "Skipped connecting to {:?} because \ - it is ourselves", expected_peer_id); + trace!(target: "sub-libp2p", "Skipped connecting to {:?} because it is ourselves", expected_peer_id); return } @@ -1102,13 +1115,14 @@ fn open_peer_custom_proto<T, To, St, C>( if info.info.public_key.into_peer_id() == expected_peer_id { Ok(socket) } else { - debug!(target: "sub-libp2p", "Public key mismatch for \ - node {:?} with proto {:?}", expected_peer_id, proto_id); - trace!(target: "sub-libp2p", "Removing addr {} for {:?}", - original_addr, expected_peer_id); + debug!(target: "sub-libp2p", + "Public key mismatch for node {:?} with proto {:?}", + expected_peer_id, + proto_id + ); + trace!(target: "sub-libp2p", "Removing addr {} for {:?}", original_addr, expected_peer_id); shared.network_state.set_invalid_kad_address(&expected_peer_id, &original_addr); - Err(IoError::new(IoErrorKind::InvalidData, "public \ - key mismatch when identifyed peer")) + Err(IoError::new(IoErrorKind::InvalidData, "public key mismatch when identifyed peer")) } ) .and_then(move |socket| @@ -1128,32 +1142,39 @@ fn open_peer_custom_proto<T, To, St, C>( .map_err({ let node_id = node_id.clone(); move |err| { - debug!(target: "sub-libp2p", "Error while dialing \ - {:?} with custom proto: {:?}", node_id, err); + debug!(target: "sub-libp2p", "Error while dialing {:?} with custom proto: {:?}", node_id, err); err } }); match shared2.network_state.custom_proto(node_id.clone(), proto_id, Endpoint::Dialer) { - Ok((peer_id, unique_connec)) => { + Ok((who, unique_connec)) => { if !unique_connec.is_alive() { - trace!(target: "sub-libp2p", "Opening connection to #{} {:?} with \ - proto {:?}", peer_id, node_id, proto_id); + trace!(target: "sub-libp2p", + "Opening connection to #{} {:?} with proto {:?}", + who, + node_id, + proto_id + ); } // TODO: this future should be used let _ = unique_connec.get_or_dial(&swarm_controller, &addr, with_err); }, Err(err) => { - trace!(target: "sub-libp2p", "Error while opening connection to - {:?} with proto {:?} => {:?}", node_id, proto_id, err); + trace!(target: "sub-libp2p", + "Error while opening connection to {:?} with proto {:?} => {:?}", + node_id, + proto_id, + err + ); }, } } /// Obtain a Kademlia connection to the given peer. fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>, - peer_id: PeerstorePeerId, transport: T, swarm_controller: SwarmController<St>) + who: PeerstorePeerId, transport: T, swarm_controller: SwarmController<St>) -> impl Future<Item = KadConnecController, Error = IoError> where T: MuxedTransport<Output = TransportOutput<To>> + Clone + 'static, T::MultiaddrFuture: 'static, @@ -1161,7 +1182,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>, St: MuxedTransport<Output = FinalUpgrade<C>> + Clone + 'static, C: 'static { let kad_upgrade = shared.kad_upgrade.clone(); - let addr: Multiaddr = AddrComponent::P2P(peer_id.clone().into_bytes()).into(); + let addr: Multiaddr = AddrComponent::P2P(who.clone().into_bytes()).into(); let transport = transport .and_then(move |out, endpoint, client_addr| upgrade::apply(out.socket, kad_upgrade.clone(), @@ -1175,7 +1196,7 @@ fn obtain_kad_connection<T, To, St, C>(shared: Arc<Shared>, }); shared.network_state - .kad_connection(peer_id.clone()) + .kad_connection(who.clone()) .into_future() .map(move |(_, k)| k.get_or_dial(&swarm_controller, &addr, transport)) .flatten() @@ -1204,14 +1225,15 @@ fn process_identify_info( if let Some(mut ext_addr) = transport.nat_traversal(original_listened_addr, &info.observed_addr) { let mut listened_addrs = shared.listened_addrs.write(); if !listened_addrs.iter().any(|a| a == &ext_addr) { - trace!(target: "sub-libp2p", "NAT traversal: remote observes us as \ - {} ; registering {} as one of our own addresses", - info.observed_addr, ext_addr); + trace!(target: "sub-libp2p", + "NAT traversal: remote observes us as {}; registering {} as one of our own addresses", + info.observed_addr, + ext_addr + ); listened_addrs.push(ext_addr.clone()); ext_addr.append(AddrComponent::P2P(shared.kad_system .local_peer_id().clone().into_bytes())); - info!(target: "sub-libp2p", "New external node address: {}", - ext_addr); + info!(target: "sub-libp2p", "New external node address: {}", ext_addr); } } } @@ -1273,16 +1295,16 @@ fn ping_all<T, St, C>( C: 'static { let mut ping_futures = Vec::new(); - for (peer, peer_id, pinger) in shared.network_state.cleanup_and_prepare_ping() { + for (peer, who, pinger) in shared.network_state.cleanup_and_prepare_ping() { let shared = shared.clone(); - let addr = Multiaddr::from(AddrComponent::P2P(peer_id.clone().into_bytes())); + let addr = Multiaddr::from(AddrComponent::P2P(who.clone().into_bytes())); let fut = pinger .get_or_dial(&swarm_controller, &addr, transport.clone()) .and_then(move |mut p| { - trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, peer_id); + trace!(target: "sub-libp2p", "Pinging peer #{} aka. {:?}", peer, who); p.ping() - .map(|()| peer_id) + .map(|()| who) .map_err(|err| IoError::new(IoErrorKind::Other, err)) }); let ping_start_time = Instant::now(); @@ -1291,15 +1313,15 @@ fn ping_all<T, St, C>( match val { Err(err) => { trace!(target: "sub-libp2p", "Error while pinging #{:?} => {:?}", peer, err); - shared.network_state.drop_peer(peer, None); // None so that we don't print messages on such low-level issues. + shared.network_state.report_ping_failed(peer); // Return Ok, otherwise we would close the ping service Ok(()) }, - Ok(peer_id) => { + Ok(who) => { let elapsed = ping_start_time.elapsed(); trace!(target: "sub-libp2p", "Pong from #{:?} in {:?}", peer, elapsed); shared.network_state.report_ping_duration(peer, elapsed); - shared.kad_system.update_kbuckets(peer_id); + shared.kad_system.update_kbuckets(who); Ok(()) } } diff --git a/substrate/substrate/network-libp2p/src/traits.rs b/substrate/substrate/network-libp2p/src/traits.rs index fac9f1915a9505d5e6c21ab14381b5239f2091ca..3632bdac8351da910fce70a013e204664fcd19af 100644 --- a/substrate/substrate/network-libp2p/src/traits.rs +++ b/substrate/substrate/network-libp2p/src/traits.rs @@ -36,7 +36,8 @@ pub type ProtocolId = [u8; 3]; pub type NodeId = H512; /// Local (temporary) peer session ID. -pub type PeerId = usize; +/// RENAME TO NodeIndex +pub type NodeIndex = usize; /// Messages used to communitate with the event loop from other threads. #[derive(Clone)] @@ -62,9 +63,9 @@ pub enum NetworkIoMessage { /// Initliaze public interface. InitPublicInterface, /// Disconnect a peer. - Disconnect(PeerId), + Disconnect(NodeIndex), /// Disconnect and temporary disable peer. - DisablePeer(PeerId), + DisablePeer(NodeIndex), /// Network has been started with the host as the given enode. NetworkStarted(String), } @@ -240,16 +241,16 @@ impl<'a> fmt::Display for Severity<'a> { /// IO access point. This is passed to all IO handlers and provides an interface to the IO subsystem. pub trait NetworkContext { /// Send a packet over the network to another peer. - fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>); + fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>); /// Send a packet over the network to another peer using specified protocol. - fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>); + fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>); /// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing. fn respond(&self, packet_id: PacketId, data: Vec<u8>); /// Report peer. Depending on the report, peer may be disconnected and possibly banned. - fn report_peer(&self, peer: PeerId, reason: Severity); + fn report_peer(&self, peer: NodeIndex, reason: Severity); /// Check if the session is still active. fn is_expired(&self) -> bool; @@ -258,24 +259,24 @@ pub trait NetworkContext { fn register_timer(&self, token: TimerToken, delay: Duration) -> Result<(), Error>; /// Returns peer identification string - fn peer_client_version(&self, peer: PeerId) -> String; + fn peer_client_version(&self, peer: NodeIndex) -> String; /// Returns information on p2p session - fn session_info(&self, peer: PeerId) -> Option<SessionInfo>; + fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo>; /// Returns max version for a given protocol. - fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8>; + fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8>; /// Returns this object's subprotocol name. fn subprotocol_name(&self) -> ProtocolId; } impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { - fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) { + fn send(&self, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) { (**self).send(peer, packet_id, data) } - fn send_protocol(&self, protocol: ProtocolId, peer: PeerId, packet_id: PacketId, data: Vec<u8>) { + fn send_protocol(&self, protocol: ProtocolId, peer: NodeIndex, packet_id: PacketId, data: Vec<u8>) { (**self).send_protocol(protocol, peer, packet_id, data) } @@ -283,7 +284,7 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { (**self).respond(packet_id, data) } - fn report_peer(&self, peer: PeerId, reason: Severity) { + fn report_peer(&self, peer: NodeIndex, reason: Severity) { (**self).report_peer(peer, reason) } @@ -295,15 +296,15 @@ impl<'a, T> NetworkContext for &'a T where T: ?Sized + NetworkContext { (**self).register_timer(token, delay) } - fn peer_client_version(&self, peer: PeerId) -> String { + fn peer_client_version(&self, peer: NodeIndex) -> String { (**self).peer_client_version(peer) } - fn session_info(&self, peer: PeerId) -> Option<SessionInfo> { + fn session_info(&self, peer: NodeIndex) -> Option<SessionInfo> { (**self).session_info(peer) } - fn protocol_version(&self, protocol: ProtocolId, peer: PeerId) -> Option<u8> { + fn protocol_version(&self, protocol: ProtocolId, peer: NodeIndex) -> Option<u8> { (**self).protocol_version(protocol, peer) } @@ -319,11 +320,11 @@ pub trait NetworkProtocolHandler: Sync + Send { /// Initialize the handler fn initialize(&self, _io: &NetworkContext) {} /// Called when new network packet received. - fn read(&self, io: &NetworkContext, peer: &PeerId, packet_id: u8, data: &[u8]); + fn read(&self, io: &NetworkContext, peer: &NodeIndex, packet_id: u8, data: &[u8]); /// Called when new peer is connected. Only called when peer supports the same protocol. - fn connected(&self, io: &NetworkContext, peer: &PeerId); + fn connected(&self, io: &NetworkContext, peer: &NodeIndex); /// Called when a previously connected peer disconnects. - fn disconnected(&self, io: &NetworkContext, peer: &PeerId); + fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex); /// Timer function called after a timeout created with `NetworkContext::timeout`. fn timeout(&self, _io: &NetworkContext, _timer: TimerToken) {} } diff --git a/substrate/substrate/network-libp2p/tests/tests.rs b/substrate/substrate/network-libp2p/tests/tests.rs index c7c98d877535afbc3139f0ae29ff7210d3d5d463..fc07035f9d060e5041b1d5d3bef1307b533a95e1 100644 --- a/substrate/substrate/network-libp2p/tests/tests.rs +++ b/substrate/substrate/network-libp2p/tests/tests.rs @@ -66,12 +66,12 @@ impl NetworkProtocolHandler for TestProtocol { io.register_timer(0, Duration::from_millis(10)).unwrap(); } - fn read(&self, _io: &NetworkContext, _peer: &PeerId, packet_id: u8, data: &[u8]) { + fn read(&self, _io: &NetworkContext, _peer: &NodeIndex, packet_id: u8, data: &[u8]) { assert_eq!(packet_id, 33); self.packet.lock().extend(data); } - fn connected(&self, io: &NetworkContext, peer: &PeerId) { + fn connected(&self, io: &NetworkContext, peer: &NodeIndex) { if self.drop_session { io.report_peer(*peer, Severity::Bad("We are evil and just want to drop")) } else { @@ -79,7 +79,7 @@ impl NetworkProtocolHandler for TestProtocol { } } - fn disconnected(&self, _io: &NetworkContext, _peer: &PeerId) { + fn disconnected(&self, _io: &NetworkContext, _peer: &NodeIndex) { self.got_disconnect.store(true, AtomicOrdering::Relaxed); } diff --git a/substrate/substrate/network/src/blocks.rs b/substrate/substrate/network/src/blocks.rs index 075b2edc757dbb951bf5aca417c410e0a99aad3a..ffb95bf1f3f34797838745a5e97c11f67f6582af 100644 --- a/substrate/substrate/network/src/blocks.rs +++ b/substrate/substrate/network/src/blocks.rs @@ -19,7 +19,7 @@ use std::cmp; use std::ops::Range; use std::collections::{HashMap, BTreeMap}; use std::collections::hash_map::Entry; -use network_libp2p::PeerId; +use network_libp2p::NodeIndex; use runtime_primitives::traits::{Block as BlockT, NumberFor, As}; use message; @@ -29,7 +29,7 @@ const MAX_PARALLEL_DOWNLOADS: u32 = 1; #[derive(Debug, Clone, PartialEq, Eq)] pub struct BlockData<B: BlockT> { pub block: message::BlockData<B>, - pub origin: PeerId, + pub origin: NodeIndex, } #[derive(Debug)] @@ -55,7 +55,7 @@ impl<B: BlockT> BlockRangeState<B> { pub struct BlockCollection<B: BlockT> { /// Downloaded blocks. blocks: BTreeMap<NumberFor<B>, BlockRangeState<B>>, - peer_requests: HashMap<PeerId, NumberFor<B>>, + peer_requests: HashMap<NodeIndex, NumberFor<B>>, } impl<B: BlockT> BlockCollection<B> { @@ -74,7 +74,7 @@ impl<B: BlockT> BlockCollection<B> { } /// Insert a set of blocks into collection. - pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, peer_id: PeerId) { + pub fn insert(&mut self, start: NumberFor<B>, blocks: Vec<message::BlockData<B>>, who: NodeIndex) { if blocks.is_empty() { return; } @@ -92,11 +92,11 @@ impl<B: BlockT> BlockCollection<B> { _ => (), } - self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: peer_id, block: b }).collect())); + self.blocks.insert(start, BlockRangeState::Complete(blocks.into_iter().map(|b| BlockData { origin: who, block: b }).collect())); } /// Returns a set of block hashes that require a header download. The returned set is marked as being downloaded. - pub fn needed_blocks(&mut self, peer_id: PeerId, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>) -> Option<Range<NumberFor<B>>> { + pub fn needed_blocks(&mut self, who: NodeIndex, count: usize, peer_best: NumberFor<B>, common: NumberFor<B>) -> Option<Range<NumberFor<B>>> { // First block number that we need to download let first_different = common + As::sa(1); let count = As::sa(count as u64); @@ -125,11 +125,11 @@ impl<B: BlockT> BlockCollection<B> { }; // crop to peers best if range.start > peer_best { - trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best); + trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best); return None; } range.end = cmp::min(peer_best + As::sa(1), range.end); - self.peer_requests.insert(peer_id, range.start); + self.peer_requests.insert(who, range.start); self.blocks.insert(range.start, BlockRangeState::Downloading{ len: range.end - range.start, downloading: downloading + 1 }); if range.end <= range.start { panic!("Empty range {:?}, count={}, peer_best={}, common={}, blocks={:?}", range, count, peer_best, common, self.blocks); @@ -162,8 +162,8 @@ impl<B: BlockT> BlockCollection<B> { drained } - pub fn clear_peer_download(&mut self, peer_id: PeerId) { - match self.peer_requests.entry(peer_id) { + pub fn clear_peer_download(&mut self, who: NodeIndex) { + match self.peer_requests.entry(who) { Entry::Occupied(entry) => { let start = entry.remove(); let remove = match self.blocks.get_mut(&start) { diff --git a/substrate/substrate/network/src/consensus_gossip.rs b/substrate/substrate/network/src/consensus_gossip.rs index ed1dbf38d375eb44f7123fd728e84d891a9d495c..8e9065b3a7a4620e14bf7fe0410b5c0e9ab1227b 100644 --- a/substrate/substrate/network/src/consensus_gossip.rs +++ b/substrate/substrate/network/src/consensus_gossip.rs @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet}; use futures::sync::mpsc; use std::time::{Instant, Duration}; -use network_libp2p::PeerId; +use network_libp2p::NodeIndex; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; use runtime_primitives::generic::BlockId; use message::{self, generic::Message as GenericMessage}; @@ -51,7 +51,7 @@ struct MessageEntry<B: BlockT> { /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip<B: BlockT> { - peers: HashMap<PeerId, PeerConsensus<B::Hash>>, + peers: HashMap<NodeIndex, PeerConsensus<B::Hash>>, message_sink: Option<(mpsc::UnboundedSender<ConsensusMessage<B>>, B::Hash)>, messages: Vec<MessageEntry<B>>, message_hashes: HashSet<B::Hash>, @@ -74,9 +74,9 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { } /// Handle new connected peer. - pub fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId, roles: Roles) { + pub fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex, roles: Roles) { if roles.intersects(Roles::AUTHORITY | Roles::FULL) { - trace!(target:"gossip", "Registering {:?} {}", roles, peer_id); + trace!(target:"gossip", "Registering {:?} {}", roles, who); // Send out all known messages. // TODO: limit by size let mut known_messages = HashSet::new(); @@ -87,9 +87,9 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { ConsensusMessage::ChainSpecific(ref msg, _) => GenericMessage::ChainSpecific(msg.clone()), }; - protocol.send_message(peer_id, message); + protocol.send_message(who, message); } - self.peers.insert(peer_id, PeerConsensus { + self.peers.insert(who, PeerConsensus { known_messages, }); } @@ -115,16 +115,16 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { } /// Handles incoming BFT message, passing to stream and repropagating. - pub fn on_bft_message(&mut self, protocol: &mut Context<B>, peer_id: PeerId, message: message::LocalizedBftMessage<B>) { - if let Some((hash, message)) = self.handle_incoming(protocol, peer_id, ConsensusMessage::Bft(message)) { + pub fn on_bft_message(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: message::LocalizedBftMessage<B>) { + if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::Bft(message)) { // propagate to other peers. self.multicast(protocol, message, Some(hash)); } } /// Handles incoming chain-specific message and repropagates - pub fn on_chain_specific(&mut self, protocol: &mut Context<B>, peer_id: PeerId, message: Vec<u8>, parent_hash: B::Hash) { - if let Some((hash, message)) = self.handle_incoming(protocol, peer_id, ConsensusMessage::ChainSpecific(message, parent_hash)) { + pub fn on_chain_specific(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: Vec<u8>, parent_hash: B::Hash) { + if let Some((hash, message)) = self.handle_incoming(protocol, who, ConsensusMessage::ChainSpecific(message, parent_hash)) { // propagate to other peers. self.multicast(protocol, message, Some(hash)); } @@ -163,8 +163,8 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { } /// Call when a peer has been disconnected to stop tracking gossip status. - pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, peer_id: PeerId) { - self.peers.remove(&peer_id); + pub fn peer_disconnected(&mut self, _protocol: &mut Context<B>, who: NodeIndex) { + self.peers.remove(&who); } /// Prune old or no longer relevant consensus messages. @@ -195,7 +195,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { } } - fn handle_incoming(&mut self, protocol: &mut Context<B>, peer_id: PeerId, message: ConsensusMessage<B>) -> Option<(B::Hash, ConsensusMessage<B>)> { + fn handle_incoming(&mut self, protocol: &mut Context<B>, who: NodeIndex, message: ConsensusMessage<B>) -> Option<(B::Hash, ConsensusMessage<B>)> { let (hash, parent, message) = match message { ConsensusMessage::Bft(msg) => { let parent = msg.parent_hash; @@ -223,7 +223,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { }; if self.message_hashes.contains(&hash) { - trace!(target:"gossip", "Ignored already known message from {}", peer_id); + trace!(target:"gossip", "Ignored already known message from {}", who); return None; } @@ -234,14 +234,14 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { }, (Ok(info), Ok(Some(header))) => { if header.number() < &info.chain.best_number { - trace!(target:"gossip", "Ignored ancient message from {}, hash={}", peer_id, parent); + trace!(target:"gossip", "Ignored ancient message from {}, hash={}", who, parent); return None; } }, (Ok(_), Ok(None)) => {}, } - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { peer.known_messages.insert(hash); if let Some((sink, parent_hash)) = self.message_sink.take() { if parent == parent_hash { @@ -253,7 +253,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { self.message_sink = Some((sink, parent_hash)); } } else { - trace!(target:"gossip", "Ignored statement from unregistered peer {}", peer_id); + trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); return None; } diff --git a/substrate/substrate/network/src/import_queue.rs b/substrate/substrate/network/src/import_queue.rs index 585bbae5c1a70f0b1c2ee13803e6db1b83a8e917..08679ed9f5b5d34e15e8de33e074187de2bb13d5 100644 --- a/substrate/substrate/network/src/import_queue.rs +++ b/substrate/substrate/network/src/import_queue.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use parking_lot::{Condvar, Mutex, RwLock}; use client::{BlockOrigin, BlockStatus, ImportResult}; -use network_libp2p::{PeerId, Severity}; +use network_libp2p::{NodeIndex, Severity}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}; @@ -202,9 +202,9 @@ trait SyncLinkApi<B: BlockT> { /// Maintain sync. fn maintain_sync(&mut self); /// Disconnect from peer. - fn useless_peer(&mut self, peer_id: PeerId, reason: &str); + fn useless_peer(&mut self, who: NodeIndex, reason: &str); /// Disconnect from peer and restart sync. - fn note_useless_and_restart_sync(&mut self, peer_id: PeerId, reason: &str); + fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str); /// Restart sync. fn restart(&mut self); } @@ -233,9 +233,9 @@ enum BlockImportResult<H: ::std::fmt::Debug + PartialEq, N: ::std::fmt::Debug + #[derive(Debug, PartialEq)] enum BlockImportError { /// Disconnect from peer and continue import of next bunch of blocks. - Disconnect(PeerId), + Disconnect(NodeIndex), /// Disconnect from peer and restart sync. - DisconnectAndRestart(PeerId), + DisconnectAndRestart(NodeIndex), /// Restart sync. Restart, } @@ -356,16 +356,16 @@ fn process_import_result<'a, B: BlockT>( link.block_imported(&hash, number); 1 }, - Err(BlockImportError::Disconnect(peer_id)) => { + Err(BlockImportError::Disconnect(who)) => { // TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set. // This should contain an actual reason. - link.useless_peer(peer_id, "Import result was stated Disconnect"); + link.useless_peer(who, "Import result was stated Disconnect"); 0 }, - Err(BlockImportError::DisconnectAndRestart(peer_id)) => { + Err(BlockImportError::DisconnectAndRestart(who)) => { // TODO: FIXME: @arkpar BlockImport shouldn't be trying to manage the peer set. // This should contain an actual reason. - link.note_useless_and_restart_sync(peer_id, "Import result was stated DisconnectAndRestart"); + link.note_useless_and_restart_sync(who, "Import result was stated DisconnectAndRestart"); 0 }, Err(BlockImportError::Restart) => { @@ -408,13 +408,13 @@ impl<'a, B: 'static + BlockT, E: ExecuteInContext<B>> SyncLinkApi<B> for SyncLin self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) } - fn useless_peer(&mut self, peer_id: PeerId, reason: &str) { - self.with_sync(|_, protocol| protocol.report_peer(peer_id, Severity::Useless(reason))) + fn useless_peer(&mut self, who: NodeIndex, reason: &str) { + self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason))) } - fn note_useless_and_restart_sync(&mut self, peer_id: PeerId, reason: &str) { + fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str) { self.with_sync(|sync, protocol| { - protocol.report_peer(peer_id, Severity::Useless(reason)); // is this actually malign or just useless? + protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless? sync.restart(protocol); }) } @@ -490,8 +490,8 @@ pub mod tests { fn chain(&self) -> &Client<Block> { &*self.chain } fn block_imported(&mut self, _hash: &Hash, _number: NumberFor<Block>) { self.imported += 1; } fn maintain_sync(&mut self) { self.maintains += 1; } - fn useless_peer(&mut self, _: PeerId, _: &str) { self.disconnects += 1; } - fn note_useless_and_restart_sync(&mut self, _: PeerId, _: &str) { self.disconnects += 1; self.restarts += 1; } + fn useless_peer(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; } + fn note_useless_and_restart_sync(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; self.restarts += 1; } fn restart(&mut self) { self.restarts += 1; } } diff --git a/substrate/substrate/network/src/io.rs b/substrate/substrate/network/src/io.rs index d38827bee62ab8e71d1a15fac9f6848728e3449f..2ea5e4ffaf64d95b2a57b7d8be02cc058d967a60 100644 --- a/substrate/substrate/network/src/io.rs +++ b/substrate/substrate/network/src/io.rs @@ -14,21 +14,21 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? -use network_libp2p::{NetworkContext, Severity, PeerId, SessionInfo}; +use network_libp2p::{NetworkContext, Severity, NodeIndex, SessionInfo}; /// IO interface for the syncing handler. /// Provides peer connection management and an interface to the blockchain client. pub trait SyncIo { /// Report a peer for misbehaviour. - fn report_peer(&mut self, peer_id: PeerId, reason: Severity); + fn report_peer(&mut self, who: NodeIndex, reason: Severity); /// Send a packet to a peer. - fn send(&mut self, peer_id: PeerId, data: Vec<u8>); + fn send(&mut self, who: NodeIndex, data: Vec<u8>); /// Returns peer identifier string - fn peer_info(&self, peer_id: PeerId) -> String { - peer_id.to_string() + fn peer_info(&self, who: NodeIndex) -> String { + who.to_string() } /// Returns information on p2p session - fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo>; + fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo>; /// Check if the session is expired fn is_expired(&self) -> bool; } @@ -48,24 +48,24 @@ impl<'s> NetSyncIo<'s> { } impl<'s> SyncIo for NetSyncIo<'s> { - fn report_peer(&mut self, peer_id: PeerId, reason: Severity) { - self.network.report_peer(peer_id, reason); + fn report_peer(&mut self, who: NodeIndex, reason: Severity) { + self.network.report_peer(who, reason); } - fn send(&mut self, peer_id: PeerId, data: Vec<u8>) { - self.network.send(peer_id, 0, data) + fn send(&mut self, who: NodeIndex, data: Vec<u8>) { + self.network.send(who, 0, data) } - fn peer_session_info(&self, peer_id: PeerId) -> Option<SessionInfo> { - self.network.session_info(peer_id) + fn peer_session_info(&self, who: NodeIndex) -> Option<SessionInfo> { + self.network.session_info(who) } fn is_expired(&self) -> bool { self.network.is_expired() } - fn peer_info(&self, peer_id: PeerId) -> String { - self.network.peer_client_version(peer_id) + fn peer_info(&self, who: NodeIndex) -> String { + self.network.peer_client_version(who) } } diff --git a/substrate/substrate/network/src/lib.rs b/substrate/substrate/network/src/lib.rs index 8ea6ca3db94e16b46fac86141e3909ca9334de19..2c6b880013b31f3b7d78e3896bbc2862baac1fb3 100644 --- a/substrate/substrate/network/src/lib.rs +++ b/substrate/substrate/network/src/lib.rs @@ -59,7 +59,7 @@ pub use service::{Service, FetchFuture, ConsensusService, BftMessageStream, TransactionPool, Params, ManageNetwork, SyncProvider}; pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; -pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, PeerId, ProtocolId, ConnectionFilter, ConnectionDirection, Severity}; +pub use network_libp2p::{NonReservedPeerMode, NetworkConfiguration, NodeIndex, ProtocolId, ConnectionFilter, ConnectionDirection, Severity}; pub use message::{generic as generic_message, RequestId, BftMessage, LocalizedBftMessage, ConsensusVote, SignedConsensusVote, SignedConsensusMessage, SignedConsensusProposal, Status as StatusMessage}; pub use error::Error; pub use config::{Roles, ProtocolConfig}; diff --git a/substrate/substrate/network/src/on_demand.rs b/substrate/substrate/network/src/on_demand.rs index 9689a3f39d90fd841faadb6f9c104adf2c4823ab..8a8e76a3d0bbf3a9d031c284330b12806f4e26ba 100644 --- a/substrate/substrate/network/src/on_demand.rs +++ b/substrate/substrate/network/src/on_demand.rs @@ -28,7 +28,7 @@ use client; use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; use io::SyncIo; use message; -use network_libp2p::{Severity, PeerId}; +use network_libp2p::{Severity, NodeIndex}; use service; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT}; @@ -38,16 +38,16 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); /// On-demand service API. pub trait OnDemandService<Block: BlockT>: Send + Sync { /// When new node is connected. - fn on_connect(&self, peer: PeerId, role: service::Roles); + fn on_connect(&self, peer: NodeIndex, role: service::Roles); /// When node is disconnected. - fn on_disconnect(&self, peer: PeerId); + fn on_disconnect(&self, peer: NodeIndex); /// Maintain peers requests. fn maintain_peers(&self, io: &mut SyncIo); /// When call response is received from remote node. - fn on_remote_call_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse); + fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse); } /// On-demand requests service. Dispatches requests to appropriate peers. @@ -66,8 +66,8 @@ struct OnDemandCore<B: BlockT, E: service::ExecuteInContext<B>> { service: Weak<E>, next_request_id: u64, pending_requests: VecDeque<Request<B>>, - active_peers: LinkedHashMap<PeerId, Request<B>>, - idle_peers: VecDeque<PeerId>, + active_peers: LinkedHashMap<NodeIndex, Request<B>>, + idle_peers: VecDeque<NodeIndex>, } struct Request<Block: BlockT> { @@ -132,7 +132,7 @@ impl<B: BlockT, E> OnDemand<B, E> where } /// Try to accept response from given peer. - fn accept_response<F: FnOnce(Request<B>) -> Accept<B>>(&self, rtype: &str, io: &mut SyncIo, peer: PeerId, request_id: u64, try_accept: F) { + fn accept_response<F: FnOnce(Request<B>) -> Accept<B>>(&self, rtype: &str, io: &mut SyncIo, peer: NodeIndex, request_id: u64, try_accept: F) { let mut core = self.core.lock(); let request = match core.remove(peer, request_id) { Some(request) => request, @@ -165,7 +165,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where E: service::ExecuteInContext<B>, B::Header: HeaderT, { - fn on_connect(&self, peer: PeerId, role: service::Roles) { + fn on_connect(&self, peer: NodeIndex, role: service::Roles) { if !role.intersects(service::Roles::FULL | service::Roles::AUTHORITY) { // TODO: correct? return; } @@ -175,7 +175,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where core.dispatch(); } - fn on_disconnect(&self, peer: PeerId) { + fn on_disconnect(&self, peer: NodeIndex) { let mut core = self.core.lock(); core.remove_peer(peer); core.dispatch(); @@ -189,7 +189,7 @@ impl<B, E> OnDemandService<B> for OnDemand<B, E> where core.dispatch(); } - fn on_remote_call_response(&self, io: &mut SyncIo, peer: PeerId, response: message::RemoteCallResponse) { + fn on_remote_call_response(&self, io: &mut SyncIo, peer: NodeIndex, response: message::RemoteCallResponse) { self.accept_response("call", io, peer, response.id, |request| match request.data { RequestData::RemoteCall(request, sender) => match self.checker.check_execution_proof(&request, response.proof) { Ok(response) => { @@ -222,11 +222,11 @@ impl<B, E> OnDemandCore<B, E> where E: service::ExecuteInContext<B>, B::Header: HeaderT, { - pub fn add_peer(&mut self, peer: PeerId) { + pub fn add_peer(&mut self, peer: NodeIndex) { self.idle_peers.push_back(peer); } - pub fn remove_peer(&mut self, peer: PeerId) { + pub fn remove_peer(&mut self, peer: NodeIndex) { if let Some(request) = self.active_peers.remove(&peer) { self.pending_requests.push_front(request); return; @@ -237,7 +237,7 @@ impl<B, E> OnDemandCore<B, E> where } } - pub fn maintain_peers(&mut self) -> Vec<PeerId> { + pub fn maintain_peers(&mut self) -> Vec<NodeIndex> { let now = Instant::now(); let mut bad_peers = Vec::new(); loop { @@ -263,7 +263,7 @@ impl<B, E> OnDemandCore<B, E> where }); } - pub fn remove(&mut self, peer: PeerId, id: u64) -> Option<Request<B>> { + pub fn remove(&mut self, peer: NodeIndex, id: u64) -> Option<Request<B>> { match self.active_peers.entry(peer) { Entry::Occupied(entry) => match entry.get().id == id { true => { @@ -321,7 +321,7 @@ pub mod tests { use client; use client::light::fetcher::{Fetcher, FetchChecker, RemoteCallRequest}; use message; - use network_libp2p::PeerId; + use network_libp2p::NodeIndex; use service::{Roles, ExecuteInContext}; use test::TestIo; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; @@ -358,7 +358,7 @@ pub mod tests { core.idle_peers.len() + core.active_peers.len() } - fn receive_call_response(on_demand: &OnDemand<Block, DummyExecutor>, network: &mut TestIo, peer: PeerId, id: message::RequestId) { + fn receive_call_response(on_demand: &OnDemand<Block, DummyExecutor>, network: &mut TestIo, peer: NodeIndex, id: message::RequestId) { on_demand.on_remote_call_response(network, peer, message::RemoteCallResponse { id: id, proof: vec![vec![2]], diff --git a/substrate/substrate/network/src/protocol.rs b/substrate/substrate/network/src/protocol.rs index f09ca9c37e4509c7da9cc691aa0c2030bec52fc0..e798536c451a95e56ea7c58bda5bb30d4da447d4 100644 --- a/substrate/substrate/network/src/protocol.rs +++ b/substrate/substrate/network/src/protocol.rs @@ -21,7 +21,7 @@ use std::time; use parking_lot::RwLock; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, Hash, HashFor, As}; use runtime_primitives::generic::BlockId; -use network_libp2p::{PeerId, Severity}; +use network_libp2p::{NodeIndex, Severity}; use codec::{Encode, Decode}; use message::{self, Message}; @@ -55,7 +55,7 @@ pub struct Protocol<B: BlockT, S: Specialization<B>> { specialization: RwLock<S>, context_data: ContextData<B>, // Connected peers pending Status message. - handshaking_peers: RwLock<HashMap<PeerId, time::Instant>>, + handshaking_peers: RwLock<HashMap<NodeIndex, time::Instant>>, transaction_pool: Arc<TransactionPool<B>>, } /// Syncing status and statistics @@ -110,13 +110,13 @@ pub trait Context<B: BlockT> { fn client(&self) -> &::chain::Client<B>; /// Point out that a peer has been malign or irresponsible or appeared lazy. - fn report_peer(&mut self, peer_id: PeerId, reason: Severity); + fn report_peer(&mut self, who: NodeIndex, reason: Severity); /// Get peer info. - fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>>; + fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>>; /// Send a message to a peer. - fn send_message(&mut self, peer_id: PeerId, data: ::message::Message<B>); + fn send_message(&mut self, who: NodeIndex, data: ::message::Message<B>); } /// Protocol context. @@ -134,17 +134,17 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> { } /// Send a message to a peer. - pub fn send_message(&mut self, peer_id: PeerId, message: Message<B>) { - send_message(&self.context_data.peers, self.io, peer_id, message) + pub fn send_message(&mut self, who: NodeIndex, message: Message<B>) { + send_message(&self.context_data.peers, self.io, who, message) } /// Point out that a peer has been malign or irresponsible or appeared lazy. - pub fn report_peer(&mut self, peer_id: PeerId, reason: Severity) { - self.io.report_peer(peer_id, reason); + pub fn report_peer(&mut self, who: NodeIndex, reason: Severity) { + self.io.report_peer(who, reason); } /// Get peer info. - pub fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>> { + pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> { self.context_data.peers.read().get(&peer).map(|p| { PeerInfo { roles: p.roles, @@ -157,16 +157,16 @@ impl<'a, B: BlockT + 'a> ProtocolContext<'a, B> { } impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> { - fn send_message(&mut self, peer_id: PeerId, message: Message<B>) { - ProtocolContext::send_message(self, peer_id, message); + fn send_message(&mut self, who: NodeIndex, message: Message<B>) { + ProtocolContext::send_message(self, who, message); } - fn report_peer(&mut self, peer_id: PeerId, reason: Severity) { - ProtocolContext::report_peer(self, peer_id, reason); + fn report_peer(&mut self, who: NodeIndex, reason: Severity) { + ProtocolContext::report_peer(self, who, reason); } - fn peer_info(&self, peer_id: PeerId) -> Option<PeerInfo<B>> { - ProtocolContext::peer_info(self, peer_id) + fn peer_info(&self, who: NodeIndex) -> Option<PeerInfo<B>> { + ProtocolContext::peer_info(self, who) } fn client(&self) -> &Client<B> { @@ -177,7 +177,7 @@ impl<'a, B: BlockT + 'a> Context<B> for ProtocolContext<'a, B> { /// Data necessary to create a context. pub(crate) struct ContextData<B: BlockT> { // All connected peers - peers: RwLock<HashMap<PeerId, Peer<B>>>, + peers: RwLock<HashMap<NodeIndex, Peer<B>>>, chain: Arc<Client<B>>, } @@ -228,63 +228,63 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { } } - pub fn handle_packet(&self, io: &mut SyncIo, peer_id: PeerId, mut data: &[u8]) { + pub fn handle_packet(&self, io: &mut SyncIo, who: NodeIndex, mut data: &[u8]) { let message: Message<B> = match Decode::decode(&mut data) { Some(m) => m, None => { - trace!(target: "sync", "Invalid packet from {}", peer_id); - io.report_peer(peer_id, Severity::Bad("Peer sent us a packet with invalid format")); + trace!(target: "sync", "Invalid packet from {}", who); + io.report_peer(who, Severity::Bad("Peer sent us a packet with invalid format")); return; } }; match message { - GenericMessage::Status(s) => self.on_status_message(io, peer_id, s), - GenericMessage::BlockRequest(r) => self.on_block_request(io, peer_id, r), + GenericMessage::Status(s) => self.on_status_message(io, who, s), + GenericMessage::BlockRequest(r) => self.on_block_request(io, who, r), GenericMessage::BlockResponse(r) => { let request = { let mut peers = self.context_data.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { + if let Some(ref mut peer) = peers.get_mut(&who) { peer.request_timestamp = None; match mem::replace(&mut peer.block_request, None) { Some(r) => r, None => { - io.report_peer(peer_id, Severity::Bad("Unexpected response packet received from peer")); + io.report_peer(who, Severity::Bad("Unexpected response packet received from peer")); return; } } } else { - io.report_peer(peer_id, Severity::Bad("Unexpected packet received from peer")); + io.report_peer(who, Severity::Bad("Unexpected packet received from peer")); return; } }; if request.id != r.id { - trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", peer_id, request.id, r.id); + trace!(target: "sync", "Ignoring mismatched response packet from {} (expected {} got {})", who, request.id, r.id); return; } - self.on_block_response(io, peer_id, request, r); + self.on_block_response(io, who, request, r); }, - GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, peer_id, announce), - GenericMessage::Transactions(m) => self.on_extrinsics(io, peer_id, m), - GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, peer_id, request), - GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, peer_id, response), - other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), peer_id, other), + GenericMessage::BlockAnnounce(announce) => self.on_block_announce(io, who, announce), + GenericMessage::Transactions(m) => self.on_extrinsics(io, who, m), + GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(io, who, request), + GenericMessage::RemoteCallResponse(response) => self.on_remote_call_response(io, who, response), + other => self.specialization.write().on_message(&mut ProtocolContext::new(&self.context_data, io), who, other), } } - pub fn send_message(&self, io: &mut SyncIo, peer_id: PeerId, message: Message<B>) { - send_message::<B>(&self.context_data.peers, io, peer_id, message) + pub fn send_message(&self, io: &mut SyncIo, who: NodeIndex, message: Message<B>) { + send_message::<B>(&self.context_data.peers, io, who, message) } /// Called when a new peer is connected - pub fn on_peer_connected(&self, io: &mut SyncIo, peer_id: PeerId) { - trace!(target: "sync", "Connected {}: {}", peer_id, io.peer_info(peer_id)); - self.handshaking_peers.write().insert(peer_id, time::Instant::now()); - self.send_status(io, peer_id); + pub fn on_peer_connected(&self, io: &mut SyncIo, who: NodeIndex) { + trace!(target: "sync", "Connected {}: {}", who, io.peer_info(who)); + self.handshaking_peers.write().insert(who, time::Instant::now()); + self.send_status(io, who); } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: PeerId) { + pub fn on_peer_disconnected(&self, io: &mut SyncIo, peer: NodeIndex) { trace!(target: "sync", "Disconnecting {}: {}", peer, io.peer_info(peer)); // lock all the the peer lists so that add/remove peer events are in order @@ -305,7 +305,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { } } - fn on_block_request(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest<B>) { + fn on_block_request(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>) { trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?}", request.id, peer, request.from, request.to, request.max); let mut blocks = Vec::new(); let mut id = match request.from { @@ -351,7 +351,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { self.send_message(io, peer, GenericMessage::BlockResponse(response)) } - fn on_block_response(&self, io: &mut SyncIo, peer: PeerId, request: message::BlockRequest<B>, response: message::BlockResponse<B>) { + fn on_block_response(&self, io: &mut SyncIo, peer: NodeIndex, request: message::BlockRequest<B>, response: message::BlockResponse<B>) { // TODO: validate response trace!(target: "sync", "BlockResponse {} from {} with {} blocks", response.id, peer, response.blocks.len()); self.sync.write().on_block_data(&mut ProtocolContext::new(&self.context_data, io), peer, request, response); @@ -369,12 +369,12 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { { let peers = self.context_data.peers.read(); let handshaking_peers = self.handshaking_peers.read(); - for (peer_id, timestamp) in peers.iter() + for (who, timestamp) in peers.iter() .filter_map(|(id, peer)| peer.request_timestamp.as_ref().map(|r| (id, r))) .chain(handshaking_peers.iter()) { if (tick - *timestamp).as_secs() > REQUEST_TIMEOUT_SEC { - trace!(target: "sync", "Timeout {}", peer_id); - aborting.push(*peer_id); + trace!(target: "sync", "Timeout {}", who); + aborting.push(*who); } } } @@ -385,7 +385,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { } } - pub fn peer_info(&self, peer: PeerId) -> Option<PeerInfo<B>> { + pub fn peer_info(&self, peer: NodeIndex) -> Option<PeerInfo<B>> { self.context_data.peers.read().get(&peer).map(|p| { PeerInfo { roles: p.roles, @@ -397,26 +397,26 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { } /// Called by peer to report status - fn on_status_message(&self, io: &mut SyncIo, peer_id: PeerId, status: message::Status<B>) { - trace!(target: "sync", "New peer {} {:?}", peer_id, status); + fn on_status_message(&self, io: &mut SyncIo, who: NodeIndex, status: message::Status<B>) { + trace!(target: "sync", "New peer {} {:?}", who, status); if io.is_expired() { - trace!(target: "sync", "Status packet from expired session {}:{}", peer_id, io.peer_info(peer_id)); + trace!(target: "sync", "Status packet from expired session {}:{}", who, io.peer_info(who)); return; } { let mut peers = self.context_data.peers.write(); let mut handshaking_peers = self.handshaking_peers.write(); - if peers.contains_key(&peer_id) { - debug!(target: "sync", "Unexpected status packet from {}:{}", peer_id, io.peer_info(peer_id)); + if peers.contains_key(&who) { + debug!(target: "sync", "Unexpected status packet from {}:{}", who, io.peer_info(who)); return; } if status.genesis_hash != self.genesis_hash { - io.report_peer(peer_id, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash))); + io.report_peer(who, Severity::Bad(&format!("Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash))); return; } if status.version != CURRENT_VERSION { - io.report_peer(peer_id, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version))); + io.report_peer(who, Severity::Bad(&format!("Peer using unsupported protocol version {}", status.version))); return; } @@ -431,27 +431,27 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { known_blocks: HashSet::new(), next_request_id: 0, }; - peers.insert(peer_id.clone(), peer); - handshaking_peers.remove(&peer_id); - debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); + peers.insert(who.clone(), peer); + handshaking_peers.remove(&who); + debug!(target: "sync", "Connected {} {}", who, io.peer_info(who)); } let mut context = ProtocolContext::new(&self.context_data, io); - self.sync.write().new_peer(&mut context, peer_id); - self.specialization.write().on_connect(&mut context, peer_id, status.clone()); - self.on_demand.as_ref().map(|s| s.on_connect(peer_id, status.roles)); + self.sync.write().new_peer(&mut context, who); + self.specialization.write().on_connect(&mut context, who, status.clone()); + self.on_demand.as_ref().map(|s| s.on_connect(who, status.roles)); } /// Called when peer sends us new extrinsics - fn on_extrinsics(&self, _io: &mut SyncIo, peer_id: PeerId, extrinsics: message::Transactions<B::Extrinsic>) { + fn on_extrinsics(&self, _io: &mut SyncIo, who: NodeIndex, extrinsics: message::Transactions<B::Extrinsic>) { // Accept extrinsics only when fully synced if self.sync.read().status().state != SyncState::Idle { - trace!(target: "sync", "{} Ignoring extrinsics while syncing", peer_id); + trace!(target: "sync", "{} Ignoring extrinsics while syncing", who); return; } - trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), peer_id); + trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); let mut peers = self.context_data.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { + if let Some(ref mut peer) = peers.get_mut(&who) { for t in extrinsics { if let Some(hash) = self.transaction_pool.import(&t) { peer.known_extrinsics.insert(hash); @@ -473,7 +473,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { let mut propagated_to = HashMap::new(); let mut peers = self.context_data.peers.write(); - for (peer_id, ref mut peer) in peers.iter_mut() { + for (who, ref mut peer) in peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics .iter() .cloned() @@ -481,7 +481,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { .unzip(); if !to_send.is_empty() { - let node_id = io.peer_session_info(*peer_id).map(|info| match info.id { + let node_id = io.peer_session_info(*who).map(|info| match info.id { Some(id) => format!("{}@{:x}", info.remote_address, id), None => info.remote_address.clone(), }); @@ -491,15 +491,15 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { propagated_to.entry(hash).or_insert_with(Vec::new).push(id.clone()); } } - trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), peer_id); - self.send_message(io, *peer_id, GenericMessage::Transactions(to_send)); + trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); + self.send_message(io, *who, GenericMessage::Transactions(to_send)); } } self.transaction_pool.on_broadcasted(propagated_to); } /// Send Status message - fn send_status(&self, io: &mut SyncIo, peer_id: PeerId) { + fn send_status(&self, io: &mut SyncIo, who: NodeIndex) { if let Ok(info) = self.context_data.chain.info() { let status = message::generic::Status { version: CURRENT_VERSION, @@ -509,7 +509,7 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { best_hash: info.chain.best_hash, chain_status: self.specialization.read().status(), }; - self.send_message(io, peer_id, GenericMessage::Status(status)) + self.send_message(io, who, GenericMessage::Status(status)) } } @@ -533,16 +533,16 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { self.abort(); } - pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce<B::Header>) { + pub fn on_block_announce(&self, io: &mut SyncIo, who: NodeIndex, announce: message::BlockAnnounce<B::Header>) { let header = announce.header; let hash = header.hash(); { let mut peers = self.context_data.peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { + if let Some(ref mut peer) = peers.get_mut(&who) { peer.known_blocks.insert(hash.clone()); } } - self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), peer_id, hash, &header); + self.sync.write().on_block_announce(&mut ProtocolContext::new(&self.context_data, io), who, hash, &header); } pub fn on_block_imported(&self, io: &mut SyncIo, hash: B::Hash, header: &B::Header) { @@ -561,35 +561,35 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { // send out block announcements let mut peers = self.context_data.peers.write(); - for (peer_id, ref mut peer) in peers.iter_mut() { + for (who, ref mut peer) in peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { - trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id); - self.send_message(io, *peer_id, GenericMessage::BlockAnnounce(message::BlockAnnounce { + trace!(target: "sync", "Announcing block {:?} to {}", hash, who); + self.send_message(io, *who, GenericMessage::BlockAnnounce(message::BlockAnnounce { header: header.clone() })); } } } - fn on_remote_call_request(&self, io: &mut SyncIo, peer_id: PeerId, request: message::RemoteCallRequest<B::Hash>) { - trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, peer_id, request.method, request.block); + fn on_remote_call_request(&self, io: &mut SyncIo, who: NodeIndex, request: message::RemoteCallRequest<B::Hash>) { + trace!(target: "sync", "Remote call request {} from {} ({} at {})", request.id, who, request.method, request.block); let proof = match self.context_data.chain.execution_proof(&request.block, &request.method, &request.data) { Ok((_, proof)) => proof, Err(error) => { trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}", - request.id, peer_id, request.method, request.block, error); + request.id, who, request.method, request.block, error); Default::default() }, }; - self.send_message(io, peer_id, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { + self.send_message(io, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { id: request.id, proof, })); } - fn on_remote_call_response(&self, io: &mut SyncIo, peer_id: PeerId, response: message::RemoteCallResponse) { - trace!(target: "sync", "Remote call response {} from {}", response.id, peer_id); - self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, peer_id, response)); + fn on_remote_call_response(&self, io: &mut SyncIo, who: NodeIndex, response: message::RemoteCallResponse) { + trace!(target: "sync", "Remote call response {} from {}", response.id, who); + self.on_demand.as_ref().map(|s| s.on_remote_call_response(io, who, response)); } /// Execute a closure with access to a network context and specialization. @@ -600,11 +600,11 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> { } } -fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut SyncIo, peer_id: PeerId, mut message: Message<B>) { +fn send_message<B: BlockT>(peers: &RwLock<HashMap<NodeIndex, Peer<B>>>, io: &mut SyncIo, who: NodeIndex, mut message: Message<B>) { match &mut message { &mut GenericMessage::BlockRequest(ref mut r) => { let mut peers = peers.write(); - if let Some(ref mut peer) = peers.get_mut(&peer_id) { + if let Some(ref mut peer) = peers.get_mut(&who) { r.id = peer.next_request_id; peer.next_request_id = peer.next_request_id + 1; peer.block_request = Some(r.clone()); @@ -613,7 +613,7 @@ fn send_message<B: BlockT>(peers: &RwLock<HashMap<PeerId, Peer<B>>>, io: &mut Sy }, _ => (), } - io.send(peer_id, message.encode()); + io.send(who, message.encode()); } /// Hash a message. diff --git a/substrate/substrate/network/src/service.rs b/substrate/substrate/network/src/service.rs index f8b146d19fc8696602ca4d07eb907d67db57fbf9..77114e9d079c3b87183cc1dd3956c9326f54c549 100644 --- a/substrate/substrate/network/src/service.rs +++ b/substrate/substrate/network/src/service.rs @@ -19,7 +19,7 @@ use std::sync::Arc; use std::io; use std::time::Duration; use futures::sync::{oneshot, mpsc}; -use network_libp2p::{NetworkProtocolHandler, NetworkContext, PeerId, ProtocolId, +use network_libp2p::{NetworkProtocolHandler, NetworkContext, NodeIndex, ProtocolId, NetworkConfiguration , NonReservedPeerMode, ErrorKind}; use network_libp2p::{NetworkService}; use core_io::{TimerToken}; @@ -244,8 +244,8 @@ impl<B: BlockT + 'static, S: Specialization<B>> SyncProvider<B> for Service<B, S self.network.with_context_eval(self.protocol_id, |ctx| { let peer_ids = self.network.connected_peers(); - peer_ids.into_iter().filter_map(|peer_id| { - let session_info = match ctx.session_info(peer_id) { + peer_ids.into_iter().filter_map(|who| { + let session_info = match ctx.session_info(who) { None => return None, Some(info) => info, }; @@ -256,7 +256,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> SyncProvider<B> for Service<B, S capabilities: session_info.peer_capabilities.into_iter().map(|c| c.to_string()).collect(), remote_address: session_info.remote_address, local_address: session_info.local_address, - dot_info: self.handler.protocol.peer_info(peer_id), + dot_info: self.handler.protocol.peer_info(who), }) }).collect() }).unwrap_or_else(Vec::new) @@ -276,15 +276,15 @@ impl<B: BlockT + 'static, S: Specialization<B>> NetworkProtocolHandler for Proto .expect("Error registering transaction propagation timer"); } - fn read(&self, io: &NetworkContext, peer: &PeerId, _packet_id: u8, data: &[u8]) { + fn read(&self, io: &NetworkContext, peer: &NodeIndex, _packet_id: u8, data: &[u8]) { self.protocol.handle_packet(&mut NetSyncIo::new(io), *peer, data); } - fn connected(&self, io: &NetworkContext, peer: &PeerId) { + fn connected(&self, io: &NetworkContext, peer: &NodeIndex) { self.protocol.on_peer_connected(&mut NetSyncIo::new(io), *peer); } - fn disconnected(&self, io: &NetworkContext, peer: &PeerId) { + fn disconnected(&self, io: &NetworkContext, peer: &NodeIndex) { self.protocol.on_peer_disconnected(&mut NetSyncIo::new(io), *peer); } diff --git a/substrate/substrate/network/src/specialization.rs b/substrate/substrate/network/src/specialization.rs index 999c545291d887bc4be1da7997917a1689a2fa0d..3c04a367059cdfab6a94addb5393751b2a34a8cc 100644 --- a/substrate/substrate/network/src/specialization.rs +++ b/substrate/substrate/network/src/specialization.rs @@ -16,7 +16,7 @@ //! Specializations of the substrate network protocol to allow more complex forms of communication. -use ::PeerId; +use ::NodeIndex; use runtime_primitives::traits::Block as BlockT; use protocol::Context; @@ -29,13 +29,13 @@ pub trait Specialization<B: BlockT>: Send + Sync + 'static { fn on_start(&mut self) { } /// Called when a peer successfully handshakes. - fn on_connect(&mut self, ctx: &mut Context<B>, peer_id: PeerId, status: ::message::Status<B>); + fn on_connect(&mut self, ctx: &mut Context<B>, who: NodeIndex, status: ::message::Status<B>); /// Called when a peer is disconnected. If the peer ID is unknown, it should be ignored. - fn on_disconnect(&mut self, ctx: &mut Context<B>, peer_id: PeerId); + fn on_disconnect(&mut self, ctx: &mut Context<B>, who: NodeIndex); /// Called when a network-specific message arrives. - fn on_message(&mut self, ctx: &mut Context<B>, peer_id: PeerId, message: ::message::Message<B>); + fn on_message(&mut self, ctx: &mut Context<B>, who: NodeIndex, message: ::message::Message<B>); /// Called on abort. fn on_abort(&mut self) { } diff --git a/substrate/substrate/network/src/sync.rs b/substrate/substrate/network/src/sync.rs index 6096cd294a24f93f176b5c95ecd5c0dd2bc8cc3d..3cfcf07ef340fb574e27c992dd3e4d8b1948133e 100644 --- a/substrate/substrate/network/src/sync.rs +++ b/substrate/substrate/network/src/sync.rs @@ -17,7 +17,7 @@ use std::collections::HashMap; use std::sync::Arc; use protocol::Context; -use network_libp2p::{Severity, PeerId}; +use network_libp2p::{Severity, NodeIndex}; use client::{BlockStatus, BlockOrigin, ClientInfo}; use client::error::Error as ClientError; use blocks::{self, BlockCollection}; @@ -51,7 +51,7 @@ enum PeerSyncState<B: BlockT> { /// Relay chain sync strategy. pub struct ChainSync<B: BlockT> { genesis_hash: B::Hash, - peers: HashMap<PeerId, PeerSync<B>>, + peers: HashMap<NodeIndex, PeerSync<B>>, blocks: BlockCollection<B>, best_queued_number: NumberFor<B>, best_queued_hash: B::Hash, @@ -119,47 +119,47 @@ impl<B: BlockT> ChainSync<B> { } /// Handle new connected peer. - pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, peer_id: PeerId) { - if let Some(info) = protocol.peer_info(peer_id) { + pub(crate) fn new_peer(&mut self, protocol: &mut Context<B>, who: NodeIndex) { + if let Some(info) = protocol.peer_info(who) { match (block_status(&*protocol.client(), &*self.import_queue, info.best_hash), info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); - protocol.report_peer(peer_id, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e))); + protocol.report_peer(who, Severity::Useless(&format!("Error legimimately reading blockchain status: {:?}", e))); }, (Ok(BlockStatus::KnownBad), _) => { - protocol.report_peer(peer_id, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number))); + protocol.report_peer(who, Severity::Bad(&format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number))); }, (Ok(BlockStatus::Unknown), b) if b == As::sa(0) => { - protocol.report_peer(peer_id, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number))); + protocol.report_peer(who, Severity::Bad(&format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number))); }, (Ok(BlockStatus::Unknown), _) => { let our_best = self.best_queued_number; if our_best > As::sa(0) { debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); - self.peers.insert(peer_id, PeerSync { + self.peers.insert(who, PeerSync { common_hash: self.genesis_hash, common_number: As::sa(0), best_hash: info.best_hash, best_number: info.best_number, state: PeerSyncState::AncestorSearch(our_best), }); - Self::request_ancestry(protocol, peer_id, our_best) + Self::request_ancestry(protocol, who, our_best) } else { // We are at genesis, just start downloading debug!(target:"sync", "New peer with best hash {} ({}).", info.best_hash, info.best_number); - self.peers.insert(peer_id, PeerSync { + self.peers.insert(who, PeerSync { common_hash: self.genesis_hash, common_number: As::sa(0), best_hash: info.best_hash, best_number: info.best_number, state: PeerSyncState::Available, }); - self.download_new(protocol, peer_id) + self.download_new(protocol, who) } }, (Ok(BlockStatus::Queued), _) | (Ok(BlockStatus::InChain), _) => { debug!(target:"sync", "New peer with known best hash {} ({}).", info.best_hash, info.best_number); - self.peers.insert(peer_id, PeerSync { + self.peers.insert(who, PeerSync { common_hash: info.best_hash, common_number: info.best_number, best_hash: info.best_hash, @@ -171,27 +171,27 @@ impl<B: BlockT> ChainSync<B> { } } - pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, peer_id: PeerId, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) { - let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + pub(crate) fn on_block_data(&mut self, protocol: &mut Context<B>, who: NodeIndex, _request: message::BlockRequest<B>, response: message::BlockResponse<B>) { + let new_blocks = if let Some(ref mut peer) = self.peers.get_mut(&who) { match peer.state { PeerSyncState::DownloadingNew(start_block) => { - self.blocks.clear_peer_download(peer_id); + self.blocks.clear_peer_download(who); peer.state = PeerSyncState::Available; - self.blocks.insert(start_block, response.blocks, peer_id); + self.blocks.insert(start_block, response.blocks, who); self.blocks.drain(self.best_queued_number + As::sa(1)) }, PeerSyncState::DownloadingStale(_) => { peer.state = PeerSyncState::Available; response.blocks.into_iter().map(|b| blocks::BlockData { - origin: peer_id, + origin: who, block: b }).collect() }, PeerSyncState::AncestorSearch(n) => { match response.blocks.get(0) { Some(ref block) => { - trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, peer_id); + trace!(target: "sync", "Got ancestry block #{} ({}) from peer {}", n, block.hash, who); match protocol.client().block_hash(n) { Ok(Some(block_hash)) if block_hash == block.hash => { if peer.common_number < n { @@ -199,30 +199,30 @@ impl<B: BlockT> ChainSync<B> { peer.common_number = n; } peer.state = PeerSyncState::Available; - trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n); + trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", who, block.hash, n); vec![] }, Ok(our_best) if n > As::sa(0) => { - trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", peer_id, block.hash, n, our_best); + trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", who, block.hash, n, our_best); let n = n - As::sa(1); peer.state = PeerSyncState::AncestorSearch(n); - Self::request_ancestry(protocol, peer_id, n); + Self::request_ancestry(protocol, who, n); return; }, Ok(_) => { // genesis mismatch - trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id); - protocol.report_peer(peer_id, Severity::Bad("Ancestry search: genesis mismatch for peer")); + trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); + protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer")); return; }, Err(e) => { - protocol.report_peer(peer_id, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e))); + protocol.report_peer(who, Severity::Useless(&format!("Error answering legitimate blockchain query: {:?}", e))); return; } } }, None => { - trace!(target:"sync", "Invalid response when searching for ancestor from {}", peer_id); - protocol.report_peer(peer_id, Severity::Bad("Invalid response when searching for ancestor")); + trace!(target:"sync", "Invalid response when searching for ancestor from {}", who); + protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor")); return; } } @@ -241,7 +241,7 @@ impl<B: BlockT> ChainSync<B> { } pub fn maintain_sync(&mut self, protocol: &mut Context<B>) { - let peers: Vec<PeerId> = self.peers.keys().map(|p| *p).collect(); + let peers: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect(); for peer in peers { self.download_new(protocol, peer); } @@ -267,9 +267,9 @@ impl<B: BlockT> ChainSync<B> { self.block_imported(&hash, best_header.number().clone()) } - pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, peer_id: PeerId, hash: B::Hash, header: &B::Header) { + pub(crate) fn on_block_announce(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: B::Hash, header: &B::Header) { let number = *header.number(); - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { if number > peer.best_number { peer.best_number = number; peer.best_hash = hash; @@ -285,17 +285,17 @@ impl<B: BlockT> ChainSync<B> { let stale = number <= self.best_queued_number; if stale { if !self.is_known_or_already_downloading(protocol, header.parent_hash()) { - trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", peer_id, hash, header); + trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", who, hash, header); } else { - trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header); - self.download_stale(protocol, peer_id, &hash); + trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", who, hash, header); + self.download_stale(protocol, who, &hash); } } else { - trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", peer_id, hash, header); - self.download_new(protocol, peer_id); + trace!(target: "sync", "Downloading new block announced from {}: {} {:?}", who, hash, header); + self.download_new(protocol, who); } } else { - trace!(target: "sync", "Known block announce from {}: {}", peer_id, hash); + trace!(target: "sync", "Known block announce from {}: {}", who, hash); } } @@ -304,16 +304,16 @@ impl<B: BlockT> ChainSync<B> { || block_status(&*protocol.client(), &*self.import_queue, *hash).ok().map_or(false, |s| s != BlockStatus::Unknown) } - pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, peer_id: PeerId) { - self.blocks.clear_peer_download(peer_id); - self.peers.remove(&peer_id); + pub(crate) fn peer_disconnected(&mut self, protocol: &mut Context<B>, who: NodeIndex) { + self.blocks.clear_peer_download(who); + self.peers.remove(&who); self.maintain_sync(protocol); } pub(crate) fn restart(&mut self, protocol: &mut Context<B>) { self.import_queue.clear(); self.blocks.clear(); - let ids: Vec<PeerId> = self.peers.keys().map(|p| *p).collect(); + let ids: Vec<NodeIndex> = self.peers.keys().map(|p| *p).collect(); for id in ids { self.new_peer(protocol, id); } @@ -336,8 +336,8 @@ impl<B: BlockT> ChainSync<B> { } // Download old block. - fn download_stale(&mut self, protocol: &mut Context<B>, peer_id: PeerId, hash: &B::Hash) { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + fn download_stale(&mut self, protocol: &mut Context<B>, who: NodeIndex, hash: &B::Hash) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { match peer.state { PeerSyncState::Available => { let request = message::generic::BlockRequest { @@ -349,7 +349,7 @@ impl<B: BlockT> ChainSync<B> { max: Some(1), }; peer.state = PeerSyncState::DownloadingStale(*hash); - protocol.send_message(peer_id, GenericMessage::BlockRequest(request)); + protocol.send_message(who, GenericMessage::BlockRequest(request)); }, _ => (), } @@ -357,8 +357,8 @@ impl<B: BlockT> ChainSync<B> { } // Issue a request for a peer to download new blocks, if any are available - fn download_new(&mut self, protocol: &mut Context<B>, peer_id: PeerId) { - if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { + fn download_new(&mut self, protocol: &mut Context<B>, who: NodeIndex) { + if let Some(ref mut peer) = self.peers.get_mut(&who) { let import_status = self.import_queue.status(); // when there are too many blocks in the queue => do not try to download new blocks if import_status.importing_count > MAX_IMPORING_BLOCKS { @@ -367,11 +367,11 @@ impl<B: BlockT> ChainSync<B> { // we should not download already queued blocks let common_number = ::std::cmp::max(peer.common_number, import_status.best_importing_number); - trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", peer_id, common_number, peer.best_number); + trace!(target: "sync", "Considering new block download from {}, common block is {}, best is {:?}", who, common_number, peer.best_number); match peer.state { PeerSyncState::Available => { - if let Some(range) = self.blocks.needed_blocks(peer_id, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) { - trace!(target: "sync", "Requesting blocks from {}, ({} to {})", peer_id, range.start, range.end); + if let Some(range) = self.blocks.needed_blocks(who, MAX_BLOCKS_TO_REQUEST, peer.best_number, common_number) { + trace!(target: "sync", "Requesting blocks from {}, ({} to {})", who, range.start, range.end); let request = message::generic::BlockRequest { id: 0, fields: self.required_block_attributes.clone(), @@ -381,7 +381,7 @@ impl<B: BlockT> ChainSync<B> { max: Some((range.end - range.start).as_() as u32), }; peer.state = PeerSyncState::DownloadingNew(range.start); - protocol.send_message(peer_id, GenericMessage::BlockRequest(request)); + protocol.send_message(who, GenericMessage::BlockRequest(request)); } else { trace!(target: "sync", "Nothing to request"); } @@ -391,8 +391,8 @@ impl<B: BlockT> ChainSync<B> { } } - fn request_ancestry(protocol: &mut Context<B>, peer_id: PeerId, block: NumberFor<B>) { - trace!(target: "sync", "Requesting ancestry block #{} from {}", block, peer_id); + fn request_ancestry(protocol: &mut Context<B>, who: NodeIndex, block: NumberFor<B>) { + trace!(target: "sync", "Requesting ancestry block #{} from {}", block, who); let request = message::generic::BlockRequest { id: 0, fields: message::BlockAttributes::HEADER | message::BlockAttributes::JUSTIFICATION, @@ -401,7 +401,7 @@ impl<B: BlockT> ChainSync<B> { direction: message::Direction::Ascending, max: Some(1), }; - protocol.send_message(peer_id, GenericMessage::BlockRequest(request)); + protocol.send_message(who, GenericMessage::BlockRequest(request)); } } diff --git a/substrate/substrate/network/src/test/mod.rs b/substrate/substrate/network/src/test/mod.rs index 1011d970dcb397489d238ecd68dea14a81c45424..5a8d82d44839873c4dfc1004881c4c5006f7d1bc 100644 --- a/substrate/substrate/network/src/test/mod.rs +++ b/substrate/substrate/network/src/test/mod.rs @@ -28,7 +28,7 @@ use io::SyncIo; use protocol::{Context, Protocol}; use config::ProtocolConfig; use service::TransactionPool; -use network_libp2p::{PeerId, SessionInfo, Severity}; +use network_libp2p::{NodeIndex, SessionInfo, Severity}; use keyring::Keyring; use codec::Encode; use import_queue::tests::SyncImportQueue; @@ -41,29 +41,29 @@ pub struct DummySpecialization; impl Specialization<Block> for DummySpecialization { fn status(&self) -> Vec<u8> { vec![] } - fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: PeerId, _status: ::message::Status<Block>) { + fn on_connect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _status: ::message::Status<Block>) { } - fn on_disconnect(&mut self, _ctx: &mut Context<Block>, _peer_id: PeerId) { + fn on_disconnect(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex) { } - fn on_message(&mut self, _ctx: &mut Context<Block>, _peer_id: PeerId, _message: ::message::Message<Block>) { + fn on_message(&mut self, _ctx: &mut Context<Block>, _peer_id: NodeIndex, _message: ::message::Message<Block>) { } } pub struct TestIo<'p> { queue: &'p RwLock<VecDeque<TestPacket>>, - pub to_disconnect: HashSet<PeerId>, + pub to_disconnect: HashSet<NodeIndex>, packets: Vec<TestPacket>, - peers_info: HashMap<PeerId, String>, - _sender: Option<PeerId>, + peers_info: HashMap<NodeIndex, String>, + _sender: Option<NodeIndex>, } impl<'p> TestIo<'p> where { - pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<PeerId>) -> TestIo<'p> { + pub fn new(queue: &'p RwLock<VecDeque<TestPacket>>, sender: Option<NodeIndex>) -> TestIo<'p> { TestIo { queue: queue, _sender: sender, @@ -81,28 +81,28 @@ impl<'p> Drop for TestIo<'p> { } impl<'p> SyncIo for TestIo<'p> { - fn report_peer(&mut self, peer_id: PeerId, _reason: Severity) { - self.to_disconnect.insert(peer_id); + fn report_peer(&mut self, who: NodeIndex, _reason: Severity) { + self.to_disconnect.insert(who); } fn is_expired(&self) -> bool { false } - fn send(&mut self, peer_id: PeerId, data: Vec<u8>) { + fn send(&mut self, who: NodeIndex, data: Vec<u8>) { self.packets.push(TestPacket { data: data, - recipient: peer_id, + recipient: who, }); } - fn peer_info(&self, peer_id: PeerId) -> String { - self.peers_info.get(&peer_id) + fn peer_info(&self, who: NodeIndex) -> String { + self.peers_info.get(&who) .cloned() - .unwrap_or_else(|| peer_id.to_string()) + .unwrap_or_else(|| who.to_string()) } - fn peer_session_info(&self, _peer_id: PeerId) -> Option<SessionInfo> { + fn peer_session_info(&self, _peer_id: NodeIndex) -> Option<SessionInfo> { None } } @@ -110,7 +110,7 @@ impl<'p> SyncIo for TestIo<'p> { /// Mocked subprotocol packet pub struct TestPacket { data: Vec<u8>, - recipient: PeerId, + recipient: NodeIndex, } pub struct Peer { @@ -129,18 +129,18 @@ impl Peer { } /// Called on connection to other indicated peer. - fn on_connect(&self, other: PeerId) { + fn on_connect(&self, other: NodeIndex) { self.sync.on_peer_connected(&mut TestIo::new(&self.queue, Some(other)), other); } /// Called on disconnect from other indicated peer. - fn on_disconnect(&self, other: PeerId) { + fn on_disconnect(&self, other: NodeIndex) { let mut io = TestIo::new(&self.queue, Some(other)); self.sync.on_peer_disconnected(&mut io, other); } /// Receive a message from another peer. Return a set of peers to disconnect. - fn receive_message(&self, from: PeerId, msg: TestPacket) -> HashSet<PeerId> { + fn receive_message(&self, from: NodeIndex, msg: TestPacket) -> HashSet<NodeIndex> { let mut io = TestIo::new(&self.queue, Some(from)); self.sync.handle_packet(&mut io, from, &msg.data); self.flush(); @@ -219,7 +219,7 @@ impl TransactionPool<Block> for EmptyTransactionPool { pub struct TestNet { peers: Vec<Arc<Peer>>, started: bool, - disconnect_events: Vec<(PeerId, PeerId)>, //disconnected (initiated by, to) + disconnect_events: Vec<(NodeIndex, NodeIndex)>, //disconnected (initiated by, to) } impl TestNet { @@ -264,7 +264,7 @@ impl TestNet { self.peers[peer].start(); for client in 0..self.peers.len() { if peer != client { - self.peers[peer].on_connect(client as PeerId); + self.peers[peer].on_connect(client as NodeIndex); } } } @@ -278,17 +278,17 @@ impl TestNet { let disconnecting = { let recipient = packet.recipient; trace!("--- {} -> {} ---", peer, recipient); - let to_disconnect = self.peers[recipient].receive_message(peer as PeerId, packet); + let to_disconnect = self.peers[recipient].receive_message(peer as NodeIndex, packet); for d in &to_disconnect { // notify this that disconnecting peers are disconnecting - self.peers[recipient].on_disconnect(*d as PeerId); + self.peers[recipient].on_disconnect(*d as NodeIndex); self.disconnect_events.push((peer, *d)); } to_disconnect }; for d in &disconnecting { // notify other peers that this peer is disconnecting - self.peers[*d].on_disconnect(peer as PeerId); + self.peers[*d].on_disconnect(peer as NodeIndex); } }