diff --git a/substrate/core/consensus/common/src/import_queue.rs b/substrate/core/consensus/common/src/import_queue.rs index 209e27e5e209bf757d348647f0a518edefe1c56f..a2da3ed2e55adbfadc1411b5105ff3b20101c5db 100644 --- a/substrate/core/consensus/common/src/import_queue.rs +++ b/substrate/core/consensus/common/src/import_queue.rs @@ -42,6 +42,15 @@ use runtime_primitives::Justification; use crate::error::Error as ConsensusError; use parity_codec::alloc::collections::hash_map::HashMap; +/// Reputation change for peers which send us a block with an incomplete header. +const INCOMPLETE_HEADER_REPUTATION_CHANGE: i32 = -(1 << 20); +/// Reputation change for peers which send us a block which we fail to verify. +const VERIFICATION_FAIL_REPUTATION_CHANGE: i32 = -(1 << 20); +/// Reputation change for peers which send us a bad block. +const BAD_BLOCK_REPUTATION_CHANGE: i32 = -(1 << 29); +/// Reputation change for peers which send us a block with bad justifications. +const BAD_JUSTIFICATION_REPUTATION_CHANGE: i32 = -(1 << 16); + /// Shared block import struct used by the queue. pub type SharedBlockImport<B> = Arc<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>; @@ -362,23 +371,30 @@ impl<B: BlockT> BlockImporter<B> { if aux.bad_justification { if let Some(peer) = who { - link.useless_peer(peer, "Sent block with bad justification to import"); + info!("Sent block with bad justification to import"); + link.report_peer(peer, BAD_JUSTIFICATION_REPUTATION_CHANGE); } } }, Err(BlockImportError::IncompleteHeader(who)) => { if let Some(peer) = who { - link.note_useless_and_restart_sync(peer, "Sent block with incomplete header to import"); + info!("Peer sent block with incomplete header to import"); + link.report_peer(peer, INCOMPLETE_HEADER_REPUTATION_CHANGE); + link.restart(); } }, Err(BlockImportError::VerificationFailed(who, e)) => { if let Some(peer) = who { - link.note_useless_and_restart_sync(peer, &format!("Verification failed: {}", e)); + info!("Verification failed from peer: {}", e); + link.report_peer(peer, VERIFICATION_FAIL_REPUTATION_CHANGE); + link.restart(); } }, Err(BlockImportError::BadBlock(who)) => { if let Some(peer) = who { - link.note_useless_and_restart_sync(peer, "Sent us a bad block"); + info!("Bad block"); + link.report_peer(peer, BAD_BLOCK_REPUTATION_CHANGE); + link.restart(); } }, Err(BlockImportError::UnknownParent) | Err(BlockImportError::Error) => { @@ -515,10 +531,8 @@ pub trait Link<B: BlockT>: Send { fn clear_justification_requests(&self) {} /// Request a justification for the given block. fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {} - /// Disconnect from peer. - fn useless_peer(&self, _who: Origin, _reason: &str) {} - /// Disconnect from peer and restart sync. - fn note_useless_and_restart_sync(&self, _who: Origin, _reason: &str) {} + /// Adjusts the reputation of the given peer. + fn report_peer(&self, _who: Origin, _reputation_change: i32) {} /// Restart sync. fn restart(&self) {} /// Synchronization request has been processed. @@ -650,13 +664,9 @@ mod tests { fn block_imported(&self, _hash: &Hash, _number: NumberFor<Block>) { let _ = self.sender.send(LinkMsg::BlockImported); } - fn useless_peer(&self, _: Origin, _: &str) { + fn report_peer(&self, _: Origin, _: i32) { let _ = self.sender.send(LinkMsg::Disconnected); } - fn note_useless_and_restart_sync(&self, id: Origin, r: &str) { - self.useless_peer(id, r); - self.restart(); - } fn restart(&self) { let _ = self.sender.send(LinkMsg::Restarted); } diff --git a/substrate/core/network-libp2p/src/behaviour.rs b/substrate/core/network-libp2p/src/behaviour.rs index 1b4bdd309c88072d79fe9c174c834fe88b96fbdb..7091cd072933c331925fdd42d79d504a083d0dc4 100644 --- a/substrate/core/network-libp2p/src/behaviour.rs +++ b/substrate/core/network-libp2p/src/behaviour.rs @@ -29,7 +29,7 @@ use libp2p::mdns::{Mdns, MdnsEvent}; use libp2p::multiaddr::Protocol; use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; use log::{debug, info, trace, warn}; -use std::{borrow::Cow, cmp, fmt, time::Duration}; +use std::{borrow::Cow, cmp, time::Duration}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_timer::{Delay, clock::Clock}; use void; @@ -451,27 +451,3 @@ where Async::NotReady } } - -/// The severity of misbehaviour of a peer that is reported. -#[derive(Debug, PartialEq, Eq, Clone)] -pub enum Severity { - /// Peer is timing out. Could be bad connectivity of overload of work on either of our sides. - Timeout, - /// Peer has been notably useless. E.g. unable to answer a request that we might reasonably consider - /// it could answer. - Useless(String), - /// Peer has behaved in an invalid manner. This doesn't necessarily need to be Byzantine, but peer - /// must have taken concrete action in order to behave in such a way which is wantanly invalid. - Bad(String), -} - -impl fmt::Display for Severity { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - Severity::Timeout => write!(fmt, "Timeout"), - Severity::Useless(r) => write!(fmt, "Useless ({})", r), - Severity::Bad(r) => write!(fmt, "Bad ({})", r), - } - } -} - diff --git a/substrate/core/network-libp2p/src/lib.rs b/substrate/core/network-libp2p/src/lib.rs index 639a74933d2a62395c251ac9e29c1556d5311660..3876ac9e5fb42f086c858fd66e046e42d1c0b240 100644 --- a/substrate/core/network-libp2p/src/lib.rs +++ b/substrate/core/network-libp2p/src/lib.rs @@ -25,7 +25,6 @@ mod custom_proto; mod service_task; mod transport; -pub use crate::behaviour::Severity; pub use crate::config::*; pub use crate::custom_proto::{CustomMessage, RegisteredProtocol}; pub use crate::config::{NetworkConfiguration, NodeKeyConfig, Secret, NonReservedPeerMode}; diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 9f1267118c23785afca03d5a86ddca24e4908366..668b30de79544443f7c66098458c9f6f7af44e7e 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -24,7 +24,7 @@ use std::time; use log::{trace, debug}; use futures::sync::mpsc; use lru_cache::LruCache; -use network_libp2p::{Severity, PeerId}; +use network_libp2p::PeerId; use runtime_primitives::traits::{Block as BlockT, Hash, HashFor}; use runtime_primitives::ConsensusEngineId; pub use crate::message::generic::{Message, ConsensusMessage}; @@ -35,6 +35,15 @@ use crate::config::Roles; const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30); +/// Reputation change when a peer sends us a gossip message that we didn't know about. +const GOSSIP_SUCCESS_REPUTATION_CHANGE: i32 = 1 << 4; +/// Reputation change when a peer sends us a gossip message that we already knew about. +const DUPLICATE_GOSSIP_REPUTATION_CHANGE: i32 = -(1 << 2); +/// Reputation change when a peer sends us a gossip message for an unknown engine, whatever that +/// means. +const UNKNOWN_GOSSIP_REPUTATION_CHANGE: i32 = -(1 << 6); +/// Reputation change when a peer sends a message from a topic it isn't registered on. +const UNREGISTERED_TOPIC_REPUTATION_CHANGE: i32 = -(1 << 10); struct PeerConsensus<H> { known_messages: HashSet<H>, @@ -389,6 +398,7 @@ impl<B: BlockT> ConsensusGossip<B> { if self.known_messages.contains_key(&message_hash) { trace!(target:"gossip", "Ignored already known message from {}", who); + protocol.report_peer(who.clone(), DUPLICATE_GOSSIP_REPUTATION_CHANGE); return; } @@ -406,15 +416,14 @@ impl<B: BlockT> ConsensusGossip<B> { Some(ValidationResult::Discard) => None, None => { trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who); - protocol.report_peer( - who, - Severity::Useless(format!("Sent unknown consensus engine id")), - ); + protocol.report_peer(who.clone(), UNKNOWN_GOSSIP_REPUTATION_CHANGE); + protocol.disconnect_peer(who); return; } }; if let Some((topic, keep)) = validation_result { + protocol.report_peer(who.clone(), GOSSIP_SUCCESS_REPUTATION_CHANGE); if let Some(ref mut peer) = self.peers.get_mut(&who) { peer.known_messages.insert(message_hash); if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { @@ -437,6 +446,7 @@ impl<B: BlockT> ConsensusGossip<B> { } } else { trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); + protocol.report_peer(who.clone(), UNREGISTERED_TOPIC_REPUTATION_CHANGE); } } else { trace!(target:"gossip", "Handled valid one hop message from peer {}", who); diff --git a/substrate/core/network/src/lib.rs b/substrate/core/network/src/lib.rs index 1e3a7f1552b21de1bbf651d06c85af43265a323f..4c538e2cc0ee1c201a66674c0df697336e3720ae 100644 --- a/substrate/core/network/src/lib.rs +++ b/substrate/core/network/src/lib.rs @@ -19,7 +19,7 @@ //! Substrate-specific P2P networking: synchronizing blocks, propagating BFT messages. //! Allows attachment of an optional subprotocol for chain-specific requests. -//! +//! //! **Important**: This crate is unstable and the API and usage may change. //! @@ -49,7 +49,7 @@ pub use protocol::{ProtocolStatus, PeerInfo, Context}; pub use sync::{Status as SyncStatus, SyncState}; pub use network_libp2p::{ identity, multiaddr, - ProtocolId, Severity, Multiaddr, + ProtocolId, Multiaddr, NetworkState, NetworkStatePeer, NetworkStateNotConnectedPeer, NetworkStatePeerEndpoint, NodeKeyConfig, Secret, Secp256k1Secret, Ed25519Secret, build_multiaddr, PeerId, PublicKey diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs index ca2d04a662ab26b9a44a2ee61d1b8be100bda1ac..6bd303e93ebc453dfd59a646edc59d1543b9469b 100644 --- a/substrate/core/network/src/on_demand.rs +++ b/substrate/core/network/src/on_demand.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::{Instant, Duration}; -use log::trace; +use log::{trace, info}; use futures::{Async, Future, Poll}; use futures::sync::oneshot::{channel, Receiver, Sender as OneShotSender}; use linked_hash_map::LinkedHashMap; @@ -29,7 +29,7 @@ use client::error::Error as ClientError; use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest, RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; use crate::message; -use network_libp2p::{Severity, PeerId}; +use network_libp2p::PeerId; use crate::config::Roles; use crate::service::{NetworkChan, NetworkMsg}; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; @@ -38,6 +38,8 @@ use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor}; const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); /// Default request retry count. const RETRY_COUNT: usize = 1; +/// Reputation change for a peer when a request timed out. +const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); /// On-demand service API. pub trait OnDemandService<Block: BlockT>: Send + Sync { @@ -175,8 +177,9 @@ impl<B: BlockT> OnDemand<B> where let request = match core.remove(peer.clone(), request_id) { Some(request) => request, None => { - let reason = format!("Invalid remote {} response from peer", rtype); - self.send(NetworkMsg::ReportPeer(peer.clone(), Severity::Bad(reason))); + info!("Invalid remote {} response from peer {}", rtype, peer); + self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value())); + self.send(NetworkMsg::DisconnectPeer(peer.clone())); core.remove_peer(peer); return; }, @@ -186,8 +189,9 @@ impl<B: BlockT> OnDemand<B> where let (retry_count, retry_request_data) = match try_accept(request) { Accept::Ok => (retry_count, None), Accept::CheckFailed(error, retry_request_data) => { - let reason = format!("Failed to check remote {} response from peer: {}", rtype, error); - self.send(NetworkMsg::ReportPeer(peer.clone(), Severity::Bad(reason))); + info!("Failed to check remote {} response from peer {}: {}", rtype, peer, error); + self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value())); + self.send(NetworkMsg::DisconnectPeer(peer.clone())); core.remove_peer(peer); if retry_count > 0 { @@ -199,8 +203,9 @@ impl<B: BlockT> OnDemand<B> where } }, Accept::Unexpected(retry_request_data) => { - let reason = format!("Unexpected response to remote {} from peer", rtype); - self.send(NetworkMsg::ReportPeer(peer.clone(), Severity::Bad(reason))); + info!("Unexpected response to remote {} from peer", rtype); + self.send(NetworkMsg::ReportPeer(peer.clone(), i32::min_value())); + self.send(NetworkMsg::DisconnectPeer(peer.clone())); core.remove_peer(peer); (retry_count, Some(retry_request_data)) @@ -244,7 +249,8 @@ impl<B> OnDemandService<B> for OnDemand<B> where fn maintain_peers(&self) { let mut core = self.core.lock(); for bad_peer in core.maintain_peers() { - self.send(NetworkMsg::ReportPeer(bad_peer, Severity::Timeout)); + self.send(NetworkMsg::ReportPeer(bad_peer.clone(), TIMEOUT_REPUTATION_CHANGE)); + self.send(NetworkMsg::DisconnectPeer(bad_peer)); } core.dispatch(self); } @@ -532,7 +538,7 @@ pub mod tests { RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof}; use crate::config::Roles; use crate::message; - use network_libp2p::{PeerId, Severity}; + use network_libp2p::PeerId; use crate::service::{network_channel, NetworkPort, NetworkMsg}; use super::{REQUEST_TIMEOUT, OnDemand, OnDemandService}; use test_client::runtime::{changes_trie_config, Block, Header}; @@ -603,15 +609,11 @@ pub mod tests { } } - fn assert_disconnected_peer(network_port: NetworkPort<Block>, expected_severity: Severity) { + fn assert_disconnected_peer(network_port: NetworkPort<Block>) { let mut disconnect_count = 0; while let Ok(msg) = network_port.receiver().try_recv() { match msg { - NetworkMsg::ReportPeer(_, severity) => { - if severity == expected_severity { - disconnect_count = disconnect_count + 1; - } - }, + NetworkMsg::DisconnectPeer(_) => disconnect_count = disconnect_count + 1, _ => {}, } } @@ -672,7 +674,7 @@ pub mod tests { on_demand.maintain_peers(); assert!(on_demand.core.lock().idle_peers.is_empty()); assert_eq!(vec![peer1.clone()], on_demand.core.lock().active_peers.keys().cloned().collect::<Vec<_>>()); - assert_disconnected_peer(network_port, Severity::Timeout); + assert_disconnected_peer(network_port); } #[test] @@ -691,7 +693,7 @@ pub mod tests { retry_count: None, }); receive_call_response(&*on_demand, peer0, 1); - assert_disconnected_peer(network_port, Severity::Bad("Invalid remote call response from peer".to_string())); + assert_disconnected_peer(network_port); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -711,7 +713,7 @@ pub mod tests { on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); receive_call_response(&*on_demand, peer0.clone(), 0); - assert_disconnected_peer(network_port, Severity::Bad("Failed to check remote call response from peer: Backend error: Test error".to_string())); + assert_disconnected_peer(network_port); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } @@ -724,7 +726,7 @@ pub mod tests { on_demand.on_connect(peer0.clone(), Roles::FULL, 1000); receive_call_response(&*on_demand, peer0, 0); - assert_disconnected_peer(network_port, Severity::Bad("Invalid remote call response from peer".to_string())); + assert_disconnected_peer(network_port); } #[test] @@ -747,7 +749,7 @@ pub mod tests { id: 0, proof: vec![vec![2]], }); - assert_disconnected_peer(network_port, Severity::Bad("Unexpected response to remote read from peer".to_string())); + assert_disconnected_peer(network_port); assert_eq!(on_demand.core.lock().pending_requests.len(), 1); } diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs index de10d3d935309f844791af68f142c46acd5beff4..a16a9783f7bee80a5f702eae3d2461896db9e863 100644 --- a/substrate/core/network/src/protocol.rs +++ b/substrate/core/network/src/protocol.rs @@ -17,7 +17,7 @@ use crossbeam_channel::{self as channel, Receiver, Sender, select}; use futures::sync::mpsc; use parking_lot::Mutex; -use network_libp2p::{PeerId, Severity}; +use network_libp2p::PeerId; use primitives::storage::StorageKey; use runtime_primitives::{generic::BlockId, ConsensusEngineId}; use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor, Zero}; @@ -36,7 +36,7 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::{cmp, num::NonZeroUsize, thread, time}; -use log::{trace, debug, warn}; +use log::{trace, debug, warn, error}; use crate::chain::Client; use client::light::fetcher::ChangesProof; use crate::{error, util::LruHashSet}; @@ -61,6 +61,24 @@ const MAX_BLOCK_DATA_RESPONSE: u32 = 128; /// and disconnect to free connection slot. const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192; +/// Reputation change when a peer is "clogged", meaning that it's not fast enough to process our +/// messages. +const CLOGGED_PEER_REPUTATION_CHANGE: i32 = -(1 << 12); +/// Reputation change when a peer doesn't respond in time to our messages. +const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 10); +/// Reputation change when a peer sends us a status message while we already received one. +const UNEXPECTED_STATUS_REPUTATION_CHANGE: i32 = -(1 << 20); +/// Reputation change when we are a light client and a peer is behind us. +const PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE: i32 = -(1 << 8); +/// Reputation change when a peer sends us an extrinsic that we didn't know about. +const NEW_EXTRINSIC_REPUTATION_CHANGE: i32 = 1 << 7; +/// Reputation change when a peer sends us a block. We don't know whether this block is valid or +/// already known to us. Since this has a small cost, we decrease the reputation of the node, and +/// will increase it back later if the import is successful. +const BLOCK_ANNOUNCE_REPUTATION_CHANGE: i32 = -(1 << 2); +/// We sent an RPC query to the given node, but it failed. +const RPC_FAILED_REPUTATION_CHANGE: i32 = -(1 << 12); + // Lock must always be taken in order declared here. pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> { status_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<ProtocolStatus<B>>>>>, @@ -139,8 +157,12 @@ pub trait Context<B: BlockT> { /// Get a reference to the client. fn client(&self) -> &crate::chain::Client<B>; - /// Point out that a peer has been malign or irresponsible or appeared lazy. - fn report_peer(&mut self, who: PeerId, reason: Severity); + /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or + /// irresponsible or appeared lazy. + fn report_peer(&mut self, who: PeerId, reputation: i32); + + /// Force disconnecting from a peer. Use this when a peer misbehaved. + fn disconnect_peer(&mut self, who: PeerId); /// Get peer info. fn peer_info(&self, peer: &PeerId) -> Option<PeerInfo<B>>; @@ -168,8 +190,12 @@ impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { } impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B, H> { - fn report_peer(&mut self, who: PeerId, reason: Severity) { - self.network_chan.send(NetworkMsg::ReportPeer(who, reason)) + fn report_peer(&mut self, who: PeerId, reputation: i32) { + self.network_chan.send(NetworkMsg::ReportPeer(who, reputation)) + } + + fn disconnect_peer(&mut self, who: PeerId) { + self.network_chan.send(NetworkMsg::DisconnectPeer(who)) } fn peer_info(&self, who: &PeerId) -> Option<PeerInfo<B>> { @@ -468,9 +494,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { if request.as_ref().map_or(false, |(_, r)| r.id == response.id) { return request.map(|(_, r)| r) } - trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id,); - let severity = Severity::Bad("Unexpected response packet received from peer".to_string()); - self.network_chan.send(NetworkMsg::ReportPeer(who, severity)) + trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id); + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + self.network_chan.send(NetworkMsg::DisconnectPeer(who)); } None } @@ -602,7 +628,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. pub fn on_clogged_peer(&self, who: PeerId, _msg: Option<Message<B>>) { - // We don't do anything but print some diagnostics for now. + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE)); + + // Print some diagnostics. if let Some(peer) = self.context_data.peers.get(&who) { debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \ known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})", @@ -739,9 +767,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); for p in aborting { - let _ = self - .network_chan - .send(NetworkMsg::ReportPeer(p, Severity::Timeout)); + let _ = self.network_chan.send(NetworkMsg::DisconnectPeer(p.clone())); + let _ = self.network_chan.send(NetworkMsg::ReportPeer(p, TIMEOUT_REPUTATION_CHANGE)); } } @@ -751,31 +778,23 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { let protocol_version = { if self.context_data.peers.contains_key(&who) { debug!("Unexpected status packet from {}", who); + self.network_chan.send(NetworkMsg::ReportPeer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE)); return; } if status.genesis_hash != self.genesis_hash { - let reason = format!( - "Peer is on different chain (our genesis: {} theirs: {})", - self.genesis_hash, status.genesis_hash - ); trace!( target: "protocol", "Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash ); - self.network_chan.send(NetworkMsg::ReportPeer( - who, - Severity::Bad(reason), - )); + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + self.network_chan.send(NetworkMsg::DisconnectPeer(who)); return; } if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { - let reason = format!("Peer using unsupported protocol version {}", status.version); trace!(target: "protocol", "Peer {:?} using unsupported protocol version {}", who, status.version); - self.network_chan.send(NetworkMsg::ReportPeer( - who, - Severity::Bad(reason), - )); + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + self.network_chan.send(NetworkMsg::DisconnectPeer(who)); return; } if self.config.roles & Roles::LIGHT == Roles::LIGHT { @@ -791,13 +810,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .checked_sub(status.best_number.as_()) .unwrap_or(0); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { - self.network_chan.send(NetworkMsg::ReportPeer( - who, - Severity::Useless( - "Peer is far behind us and will unable to serve light requests" - .to_string(), - ), - )); + debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE)); + self.network_chan.send(NetworkMsg::DisconnectPeer(who)); return; } } @@ -818,7 +833,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { peer_info }, None => { - debug!(target: "sync", "Received status from previously unconnected node {}", who); + error!(target: "sync", "Received status from previously unconnected node {}", who); return; }, }; @@ -859,6 +874,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { if let Some(hash) = self.transaction_pool.import(&t) { + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE)); peer.known_extrinsics.insert(hash); } else { trace!(target: "sync", "Extrinsic rejected"); @@ -971,10 +987,11 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { .map(|s| s.on_block_announce(who.clone(), *header.number())); self.sync.on_block_announce( &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), - who, + who.clone(), hash, &header, ); + self.network_chan.send(NetworkMsg::ReportPeer(who, BLOCK_ANNOUNCE_REPUTATION_CHANGE)); } fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { @@ -1025,6 +1042,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> { Err(error) => { trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}", request.id, who, request.method, request.block, error); + self.network_chan.send(NetworkMsg::ReportPeer(who.clone(), RPC_FAILED_REPUTATION_CHANGE)); Default::default() } }; diff --git a/substrate/core/network/src/service.rs b/substrate/core/network/src/service.rs index 4f4d3d838a2da8ee7cdf45441f0dd0bb7ae0373c..46fba70071a95ffd2cb67385db99d79bab32979f 100644 --- a/substrate/core/network/src/service.rs +++ b/substrate/core/network/src/service.rs @@ -19,10 +19,10 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::{io, thread}; -use log::{warn, debug, error, trace, info}; +use log::{warn, debug, error, info}; use futures::{Async, Future, Stream, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use network_libp2p::{ProtocolId, NetworkConfiguration, Severity}; +use network_libp2p::{ProtocolId, NetworkConfiguration}; use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, ServiceEvent as NetworkServiceEvent}; use network_libp2p::{RegisteredProtocol, NetworkState}; use peerset::PeersetHandle; @@ -98,8 +98,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> { fn justification_imported(&self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) { let _ = self.protocol_sender.send(ProtocolMsg::JustificationImportResult(hash.clone(), number, success)); if !success { - let reason = Severity::Bad(format!("Invalid justification provided for #{}", hash).to_string()); - let _ = self.network_sender.send(NetworkMsg::ReportPeer(who, reason)); + info!("Invalid justification provided by {} for #{}", who, hash); + let _ = self.network_sender.send(NetworkMsg::ReportPeer(who.clone(), i32::min_value())); + let _ = self.network_sender.send(NetworkMsg::DisconnectPeer(who.clone())); } } @@ -111,16 +112,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>> Link<B> for NetworkLink<B, S> { let _ = self.protocol_sender.send(ProtocolMsg::RequestJustification(hash.clone(), number)); } - fn useless_peer(&self, who: PeerId, reason: &str) { - trace!(target:"sync", "Useless peer {}, {}", who, reason); - self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); - } - - fn note_useless_and_restart_sync(&self, who: PeerId, reason: &str) { - trace!(target:"sync", "Bad peer {}, {}", who, reason); - // is this actually malign or just useless? - self.network_sender.send(NetworkMsg::ReportPeer(who, Severity::Useless(reason.to_string()))); - let _ = self.protocol_sender.send(ProtocolMsg::RestartSync); + fn report_peer(&self, who: PeerId, reputation_change: i32) { + self.network_sender.send(NetworkMsg::ReportPeer(who, reputation_change)); } fn restart(&self) { @@ -473,8 +466,10 @@ impl<B: BlockT + 'static> NetworkPort<B> { pub enum NetworkMsg<B: BlockT + 'static> { /// Send an outgoing custom message. Outgoing(PeerId, Message<B>), - /// Report a peer. - ReportPeer(PeerId, Severity), + /// Disconnect a peer we're connected to, or do nothing if we're not connected. + DisconnectPeer(PeerId), + /// Performs a reputation adjustement on a peer. + ReportPeer(PeerId, i32), /// Synchronization response. #[cfg(any(test, feature = "test-helpers"))] Synchronized, @@ -529,29 +524,12 @@ fn run_thread<B: BlockT + 'static>( loop { match network_port.take_one_message() { Ok(None) => break, - Ok(Some(NetworkMsg::Outgoing(who, outgoing_message))) => { - network_service - .lock() - .send_custom_message(&who, outgoing_message); - }, - Ok(Some(NetworkMsg::ReportPeer(who, severity))) => { - match severity { - Severity::Bad(message) => { - info!(target: "sync", "Banning {:?} because {:?}", who, message); - network_service.lock().drop_node(&who); - // temporary: make sure the peer gets dropped from the peerset - peerset.report_peer(who, i32::min_value()); - }, - Severity::Useless(message) => { - debug!(target: "sync", "Dropping {:?} because {:?}", who, message); - network_service.lock().drop_node(&who) - }, - Severity::Timeout => { - debug!(target: "sync", "Dropping {:?} because it timed out", who); - network_service.lock().drop_node(&who) - }, - } - }, + Ok(Some(NetworkMsg::Outgoing(who, outgoing_message))) => + network_service.lock().send_custom_message(&who, outgoing_message), + Ok(Some(NetworkMsg::ReportPeer(who, reputation))) => + peerset.report_peer(who, reputation), + Ok(Some(NetworkMsg::DisconnectPeer(who))) => + network_service.lock().drop_node(&who), #[cfg(any(test, feature = "test-helpers"))] Ok(Some(NetworkMsg::Synchronized)) => {} diff --git a/substrate/core/network/src/sync.rs b/substrate/core/network/src/sync.rs index 59b2aabeed44e35f1e1e2d8871bd8e38ebf9cb28..16e8609858d9715091685780753bc409ecac7428 100644 --- a/substrate/core/network/src/sync.rs +++ b/substrate/core/network/src/sync.rs @@ -17,10 +17,10 @@ use std::cmp::max; use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; -use log::{debug, trace, warn}; +use log::{debug, trace, info, warn}; use crate::protocol::Context; use fork_tree::ForkTree; -use network_libp2p::{Severity, PeerId}; +use network_libp2p::PeerId; use client::{BlockStatus, ClientInfo}; use consensus::BlockOrigin; use consensus::import_queue::{ImportQueue, IncomingBlock}; @@ -47,6 +47,12 @@ const JUSTIFICATION_RETRY_WAIT: Duration = Duration::from_secs(10); const ANNOUNCE_HISTORY_SIZE: usize = 64; // Max number of blocks to download for unknown forks. const MAX_UNKNOWN_FORK_DOWNLOAD_LEN: u32 = 32; +/// Reputation change when a peer sent us a status message that led to a database read error. +const BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE: i32 = -(1 << 16); +/// Reputation change when a peer failed to answer our legitimate ancestry block search. +const ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE: i32 = -(1 << 9); +/// Reputation change when a peer sent us a status message with a different genesis than us. +const GENESIS_MISMATCH_REPUTATION_CHANGE: i32 = i32::min_value() + 1; #[derive(Debug)] struct PeerSync<B: BlockT> { @@ -470,16 +476,18 @@ impl<B: BlockT> ChainSync<B> { match (status, info.best_number) { (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); - let reason = format!("Error legimimately reading blockchain status: {:?}", e); - protocol.report_peer(who, Severity::Useless(reason)); + protocol.report_peer(who.clone(), BLOCKCHAIN_STATUS_READ_ERROR_REPUTATION_CHANGE); + protocol.disconnect_peer(who); }, (Ok(BlockStatus::KnownBad), _) => { - let reason = format!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number); - protocol.report_peer(who, Severity::Bad(reason)); + info!("New peer with known bad best block {} ({}).", info.best_hash, info.best_number); + protocol.report_peer(who.clone(), i32::min_value()); + protocol.disconnect_peer(who); }, (Ok(BlockStatus::Unknown), b) if b == As::sa(0) => { - let reason = format!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); - protocol.report_peer(who, Severity::Bad(reason)); + info!("New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); + protocol.report_peer(who.clone(), i32::min_value()); + protocol.disconnect_peer(who); }, (Ok(BlockStatus::Unknown), _) if self.queue_blocks.len() > MAJOR_SYNC_BLOCKS => { // when actively syncing the common point moves too fast. @@ -638,13 +646,15 @@ impl<B: BlockT> ChainSync<B> { maybe_our_block_hash.map_or(false, |x| x == block.hash) }, (None, _) => { - trace!(target:"sync", "Invalid response when searching for ancestor from {}", who); - protocol.report_peer(who, Severity::Bad("Invalid response when searching for ancestor".to_string())); + debug!(target: "sync", "Invalid response when searching for ancestor from {}", who); + protocol.report_peer(who.clone(), i32::min_value()); + protocol.disconnect_peer(who); return; }, (_, Err(e)) => { - let reason = format!("Error answering legitimate blockchain query: {:?}", e); - protocol.report_peer(who, Severity::Useless(reason)); + info!("Error answering legitimate blockchain query: {:?}", e); + protocol.report_peer(who.clone(), ANCESTRY_BLOCK_ERROR_REPUTATION_CHANGE); + protocol.disconnect_peer(who); return; }, }; @@ -653,7 +663,8 @@ impl<B: BlockT> ChainSync<B> { } if !block_hash_match && num == As::sa(0) { trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", who); - protocol.report_peer(who, Severity::Bad("Ancestry search: genesis mismatch for peer".to_string())); + protocol.report_peer(who.clone(), GENESIS_MISMATCH_REPUTATION_CHANGE); + protocol.disconnect_peer(who); return; } if let Some((next_state, next_block_num)) = Self::handle_ancestor_search_state(state, num, block_hash_match) { @@ -710,13 +721,10 @@ impl<B: BlockT> ChainSync<B> { match response.blocks.into_iter().next() { Some(response) => { if hash != response.hash { - let msg = format!( - "Invalid block justification provided: requested: {:?} got: {:?}", - hash, - response.hash, - ); - - protocol.report_peer(who, Severity::Bad(msg)); + info!("Invalid block justification provided by {}: requested: {:?} got: {:?}", + who, hash, response.hash); + protocol.report_peer(who.clone(), i32::min_value()); + protocol.disconnect_peer(who); return; } diff --git a/substrate/core/network/src/test/mod.rs b/substrate/core/network/src/test/mod.rs index 0954fbaed0f949287158edf69cfc142e4e983456..2b85c198e3a14d87b2a743bf90cdf0dc79b451b8 100644 --- a/substrate/core/network/src/test/mod.rs +++ b/substrate/core/network/src/test/mod.rs @@ -157,12 +157,8 @@ impl<S: NetworkSpecialization<Block>> Link<Block> for TestLink<S> { self.link.request_justification(hash, number); } - fn useless_peer(&self, who: PeerId, reason: &str) { - self.link.useless_peer(who, reason); - } - - fn note_useless_and_restart_sync(&self, who: PeerId, reason: &str) { - self.link.note_useless_and_restart_sync(who, reason); + fn report_peer(&self, who: PeerId, reputation_change: i32) { + self.link.report_peer(who, reputation_change); } fn restart(&self) { @@ -704,6 +700,7 @@ pub trait TestNetFactory: Sized { let need_continue = self.route_single(true, None, &|msg| match *msg { NetworkMsg::Outgoing(_, crate::message::generic::Message::Status(_)) => true, NetworkMsg::Outgoing(_, _) => false, + NetworkMsg::DisconnectPeer(_) | NetworkMsg::ReportPeer(_, _) | NetworkMsg::Synchronized => true, }); if !need_continue { @@ -747,7 +744,7 @@ pub trait TestNetFactory: Sized { peers[recipient_pos].receive_message(&peer.peer_id, packet); }, - NetworkMsg::ReportPeer(who, _) => { + NetworkMsg::DisconnectPeer(who) => { if disconnect { to_disconnect.insert(who); }