diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock index b0440ecb4168974bc06c4db5296d8620a9106485..66baf41ecde442694656c82640cfa95c3c8e31d9 100644 --- a/polkadot/Cargo.lock +++ b/polkadot/Cargo.lock @@ -5365,6 +5365,7 @@ dependencies = [ "assert_matches", "env_logger 0.8.2", "futures 0.3.12", + "futures-timer 3.0.2", "log", "polkadot-node-network-protocol", "polkadot-node-primitives", diff --git a/polkadot/node/network/bridge/src/action.rs b/polkadot/node/network/bridge/src/action.rs index a1fa2750f89ce33b3ac7353e0b9631e4e70653dc..cabba1f9df1f2c106ceb463a57f0a5b15233cf0c 100644 --- a/polkadot/node/network/bridge/src/action.rs +++ b/polkadot/node/network/bridge/src/action.rs @@ -57,6 +57,9 @@ pub(crate) enum Action { /// Report a peer to the network implementation (decreasing/increasing its reputation). ReportPeer(PeerId, UnifiedReputationChange), + /// Disconnect a peer from the given peer-set without affecting their reputation. + DisconnectPeer(PeerId, PeerSet), + /// A subsystem updates us on the relay chain leaves we consider active. /// /// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the @@ -119,6 +122,9 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage> } Ok(FromOverseer::Communication { msg }) => match msg { NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep), + NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => { + Action::DisconnectPeer(peer, peer_set) + } NetworkBridgeMessage::SendValidationMessage(peers, msg) => { Action::SendValidationMessages(vec![(peers, msg)]) } diff --git a/polkadot/node/network/bridge/src/lib.rs b/polkadot/node/network/bridge/src/lib.rs index 1f3aa03a557aad94e30d985d37171673ce385e17..81ef905f93ecfd8ab2f385e26a8074ec7d3281d2 100644 --- a/polkadot/node/network/bridge/src/lib.rs +++ b/polkadot/node/network/bridge/src/lib.rs @@ -297,6 +297,16 @@ where bridge.network_service.report_peer(peer, rep).await? } + Action::DisconnectPeer(peer, peer_set) => { + tracing::debug!( + target: LOG_TARGET, + action = "DisconnectPeer", + ?peer, + peer_set = ?peer_set, + ); + bridge.network_service.disconnect_peer(peer, peer_set); + } + Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { tracing::debug!( target: LOG_TARGET, diff --git a/polkadot/node/network/bridge/src/network.rs b/polkadot/node/network/bridge/src/network.rs index 28a32a5d33f7dd3d54c373728e8d28157097c7b3..d85e4819de274f1993470e536a477fa208ea2165 100644 --- a/polkadot/node/network/bridge/src/network.rs +++ b/polkadot/node/network/bridge/src/network.rs @@ -90,6 +90,8 @@ where pub enum NetworkAction { /// Note a change in reputation for a peer. ReputationChange(PeerId, Rep), + /// Disconnect a peer from the given peer-set. + DisconnectPeer(PeerId, PeerSet), /// Write a notification to a given peer on the given peer-set. WriteNotification(PeerId, PeerSet, Vec<u8>), } @@ -130,6 +132,20 @@ pub trait Network: Send + 'static { .boxed() } + /// Disconnect a given peer from the peer set specified without harming reputation. + fn disconnect_peer( + &mut self, + who: PeerId, + peer_set: PeerSet, + ) -> BoxFuture<SubsystemResult<()>> { + async move { + self.action_sink() + .send(NetworkAction::DisconnectPeer(who, peer_set)) + .await + } + .boxed() + } + /// Write a notification to a peer on the given peer-set's protocol. fn write_notification( &mut self, @@ -179,6 +195,9 @@ impl Network for Arc<NetworkService<Block, Hash>> { ); self.0.report_peer(peer, cost_benefit.into_base_rep()) } + NetworkAction::DisconnectPeer(peer, peer_set) => self + .0 + .disconnect_peer(peer, peer_set.into_protocol_name()), NetworkAction::WriteNotification(peer, peer_set, message) => self .0 .write_notification(peer, peer_set.into_protocol_name(), message), diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index 9f95fb94ac3a373b357b36989d5e09116a449d15..98efa7d240301b374f4be69c2bb399d78a01e170 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" futures = "0.3.12" tracing = "0.1.25" thiserror = "1.0.23" +futures-timer = "3" polkadot-primitives = { path = "../../../primitives" } polkadot-node-network-protocol = { path = "../../network/protocol" } diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs index d4c4facdf9e7da492e5cf36bae2784cd8e1e214b..8bd7995c5fc92445f9f989d5d76554bf7039c110 100644 --- a/polkadot/node/network/collator-protocol/src/lib.rs +++ b/polkadot/node/network/collator-protocol/src/lib.rs @@ -20,6 +20,8 @@ #![deny(missing_docs, unused_crate_dependencies)] #![recursion_limit="256"] +use std::time::Duration; + use futures::{channel::oneshot, FutureExt, TryFutureExt}; use thiserror::Error; @@ -60,10 +62,20 @@ enum Error { type Result<T> = std::result::Result<T, Error>; +/// A collator eviction policy - how fast to evict collators which are inactive. +#[derive(Debug, Clone, Copy)] +pub struct CollatorEvictionPolicy(pub Duration); + +impl Default for CollatorEvictionPolicy { + fn default() -> Self { + CollatorEvictionPolicy(Duration::from_secs(24)) + } +} + /// What side of the collator protocol is being engaged pub enum ProtocolSide { /// Validators operate on the relay chain. - Validator(validator_side::Metrics), + Validator(CollatorEvictionPolicy, validator_side::Metrics), /// Collators operate on a parachain. Collator(CollatorId, collator_side::Metrics), } @@ -90,8 +102,9 @@ impl CollatorProtocolSubsystem { Context: SubsystemContext<Message = CollatorProtocolMessage>, { match self.protocol_side { - ProtocolSide::Validator(metrics) => validator_side::run( + ProtocolSide::Validator(policy, metrics) => validator_side::run( ctx, + policy, metrics, ).await, ProtocolSide::Collator(id, metrics) => collator_side::run( diff --git a/polkadot/node/network/collator-protocol/src/validator_side.rs b/polkadot/node/network/collator-protocol/src/validator_side.rs index 8cec1ed99d6d2f3c94bbda0111120d5c7601159d..3678e858b3285468942ec65d9f39f3196600301a 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side.rs @@ -15,8 +15,11 @@ // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll}; +use std::time::{Duration, Instant}; -use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture}}; +use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture, Either}}; +use futures::StreamExt; +use futures_timer::Delay; use always_assert::never; use polkadot_primitives::v1::{ @@ -32,6 +35,7 @@ use polkadot_subsystem::{ }; use polkadot_node_network_protocol::{ OurView, PeerId, UnifiedReputationChange as Rep, View, + peer_set::PeerSet, request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1 }; use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse}; @@ -47,13 +51,19 @@ const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt"); /// Network errors that originated at the remote host should have same cost as timeout. const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error"); const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out"); -const COST_REPORT_BAD: Rep = Rep::CostMajor("A collator was reported by another subsystem"); +const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem"); const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem"); +// How often to check all peers with activity. +#[cfg(not(test))] +const ACTIVITY_POLL: Duration = Duration::from_secs(1); + +#[cfg(test)] +const ACTIVITY_POLL: Duration = Duration::from_millis(10); + #[derive(Clone, Default)] pub struct Metrics(Option<MetricsInner>); - impl Metrics { fn on_request(&self, succeeded: std::result::Result<(), ()>) { if let Some(metrics) = &self.0 { @@ -130,14 +140,42 @@ struct PerRequest { span: Option<jaeger::Span>, } +struct PeerData { + view: View, + last_active: Instant, +} + +impl PeerData { + fn new(view: View) -> Self { + PeerData { + view, + last_active: Instant::now(), + } + } + + fn note_active(&mut self) { + self.last_active = Instant::now(); + } + + fn active_since(&self, instant: Instant) -> bool { + self.last_active >= instant + } +} + +impl Default for PeerData { + fn default() -> Self { + PeerData::new(Default::default()) + } +} + /// All state relevant for the validator side of the protocol lives here. #[derive(Default)] struct State { /// Our own view. view: OurView, - /// Track all active collators and their views. - peer_views: HashMap<PeerId, View>, + /// Track all active collators and their data. + peer_data: HashMap<PeerId, PeerData>, /// Peers that have declared themselves as collators. known_collators: HashMap<PeerId, CollatorId>, @@ -263,18 +301,18 @@ async fn handle_peer_view_change( peer_id: PeerId, view: View, ) -> Result<()> { - let current = state.peer_views.entry(peer_id.clone()).or_default(); + let current = state.peer_data.entry(peer_id.clone()).or_default(); - let removed: Vec<_> = current.difference(&view).cloned().collect(); + let removed: Vec<_> = current.view.difference(&view).cloned().collect(); - *current = view; + current.view = view; if let Some(advertisements) = state.advertisements.get_mut(&peer_id) { advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent)); } for removed in removed.into_iter() { - state.requested_collations.retain(|k, _| k.0 != removed); + state.requested_collations.retain(|k, _| k.0 != removed || k.2 != peer_id); } Ok(()) @@ -385,6 +423,10 @@ where { use protocol_v1::CollatorProtocolMessage::*; + if let Some(d) = state.peer_data.get_mut(&origin) { + d.note_active(); + } + match msg { Declare(id) => { tracing::debug!( @@ -392,12 +434,39 @@ where peer_id = ?origin, "Declared as collator", ); - state.known_collators.insert(origin.clone(), id); - state.peer_views.entry(origin).or_default(); + + if state.known_collators.insert(origin.clone(), id).is_some() { + modify_reputation(ctx, origin.clone(), COST_UNEXPECTED_MESSAGE).await; + } } AdvertiseCollation(relay_parent, para_id) => { let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation")); - state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)); + + if !state.view.contains(&relay_parent) { + tracing::debug!( + target: LOG_TARGET, + peer_id = ?origin, + %para_id, + ?relay_parent, + "Advertise collation out of view", + ); + + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return; + } + + if !state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)) { + tracing::debug!( + target: LOG_TARGET, + peer_id = ?origin, + %para_id, + ?relay_parent, + "Multiple collations for same relay-parent advertised", + ); + + modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await; + return; + } if let Some(collator) = state.known_collators.get(&origin) { tracing::debug!( @@ -488,13 +557,12 @@ where use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(_id, _role) => { - // A peer has connected. Until it issues a `Declare` message we do not - // want to track it's view or take any other actions. + PeerConnected(peer_id, _role) => { + state.peer_data.entry(peer_id).or_default(); }, PeerDisconnected(peer_id) => { state.known_collators.remove(&peer_id); - state.peer_views.remove(&peer_id); + state.peer_data.remove(&peer_id); }, PeerViewChange(peer_id, view) => { handle_peer_view_change(state, peer_id, view).await?; @@ -573,14 +641,26 @@ where } } +// wait until next inactivity check. returns the instant for the following check. +async fn wait_until_next_check(last_poll: Instant) -> Instant { + let now = Instant::now(); + let next_poll = last_poll + ACTIVITY_POLL; + + if next_poll > now { + Delay::new(next_poll - now).await + } + + Instant::now() +} + /// The main run loop. #[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))] pub(crate) async fn run<Context>( mut ctx: Context, + eviction_policy: crate::CollatorEvictionPolicy, metrics: Metrics, - ) -> Result<()> -where - Context: SubsystemContext<Message = CollatorProtocolMessage> +) -> Result<()> + where Context: SubsystemContext<Message = CollatorProtocolMessage> { use FromOverseer::*; use OverseerSignal::*; @@ -590,23 +670,50 @@ where ..Default::default() }; + let next_inactivity_stream = futures::stream::unfold( + Instant::now() + ACTIVITY_POLL, + |next_check| async move { Some(((), wait_until_next_check(next_check).await)) } + ).fuse(); + + futures::pin_mut!(next_inactivity_stream); + loop { - if let Poll::Ready(msg) = futures::poll!(ctx.recv()) { - let msg = msg?; - tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message"); - - match msg { - Communication { msg } => process_msg(&mut ctx, msg, &mut state).await, - Signal(BlockFinalized(..)) => {} - Signal(ActiveLeaves(_)) => {} - Signal(Conclude) => { break } + let res = { + let s = futures::future::select(ctx.recv().fuse(), next_inactivity_stream.next().fuse()); + + if let Poll::Ready(res) = futures::poll!(s) { + Some(match res { + Either::Left((msg, _)) => Either::Left(msg?), + Either::Right((_, _)) => Either::Right(()), + }) + } else { + None } - continue; + }; + + match res { + Some(Either::Left(msg)) => { + tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message"); + + match msg { + Communication { msg } => process_msg(&mut ctx, msg, &mut state).await, + Signal(BlockFinalized(..)) => {} + Signal(ActiveLeaves(_)) => {} + Signal(Conclude) => { break } + } + + continue + } + Some(Either::Right(())) => { + disconnect_inactive_peers(&mut ctx, eviction_policy, &state.peer_data).await; + continue + } + None => {} } let mut retained_requested = HashSet::new(); for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() { - // Despite the await, this won't block: + // Despite the await, this won't block on the response itself. let finished = poll_collation_response( &mut ctx, &state.metrics, &state.span_per_relay_parent, hash, para_id, peer_id, per_req @@ -621,6 +728,28 @@ where Ok(()) } +// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the +// earliest possible point. This does not yet clean up any metadata, as that will be done upon +// receipt of the `PeerDisconnected` event. +async fn disconnect_inactive_peers( + ctx: &mut impl SubsystemContext, + eviction_policy: crate::CollatorEvictionPolicy, + peers: &HashMap<PeerId, PeerData>, +) { + let cutoff = match Instant::now().checked_sub(eviction_policy.0) { + None => return, + Some(i) => i, + }; + + for (peer, peer_data) in peers { + if !peer_data.active_since(cutoff) { + ctx.send_message( + NetworkBridgeMessage::DisconnectPeer(peer.clone(), PeerSet::Collation).into() + ).await; + } + } +} + /// Poll collation response, return immediately if there is none. /// /// Ready responses are handled, by logging and decreasing peer's reputation on error and by @@ -761,10 +890,12 @@ mod tests { use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV}; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::{our_view, + use polkadot_node_network_protocol::{our_view, ObservedRole, request_response::Requests }; + const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50); + #[derive(Clone)] struct TestState { chain_ids: Vec<ParaId>, @@ -813,7 +944,11 @@ mod tests { let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone()); - let subsystem = run(context, Metrics::default()); + let subsystem = run( + context, + crate::CollatorEvictionPolicy(ACTIVITY_TIMEOUT), + Metrics::default(), + ); let test_fut = test(TestHarness { virtual_overseer }); @@ -823,7 +958,7 @@ mod tests { executor::block_on(future::select(test_fut, subsystem)); } - const TIMEOUT: Duration = Duration::from_millis(100); + const TIMEOUT: Duration = Duration::from_millis(200); async fn overseer_send( overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>, @@ -1191,4 +1326,208 @@ mod tests { assert_eq!(collation_1.0, candidate_b); }); } + + #[test] + fn inactive_disconnected() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let pair = CollatorPair::generate().0; + tracing::trace!("activating"); + + let hash_a = test_state.relay_parent; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![hash_a]) + ) + ).await; + + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare(pair.public()), + ) + ) + ).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, pair.public()); + }); + + Delay::new(ACTIVITY_TIMEOUT * 2).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(peer_set, PeerSet::Collation); + } + ) + }); + } + + #[test] + fn activity_extends_life() { + let test_state = TestState::default(); + + test_harness(|test_harness| async move { + let TestHarness { + mut virtual_overseer, + } = test_harness; + + let pair = CollatorPair::generate().0; + tracing::trace!("activating"); + + let hash_a = test_state.relay_parent; + let hash_b = Hash::repeat_byte(1); + let hash_c = Hash::repeat_byte(2); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::OurViewChange(our_view![hash_a, hash_b, hash_c]) + ) + ).await; + + + let peer_b = PeerId::random(); + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerConnected( + peer_b.clone(), + ObservedRole::Full, + ) + ) + ).await; + + Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::Declare(pair.public()), + ) + ) + ).await; + + Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + test_state.relay_parent, + test_state.chain_ids[0], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, test_state.relay_parent); + assert_eq!(para_id, test_state.chain_ids[0]); + assert_eq!(collator, pair.public()); + }); + + Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdateV1( + NetworkBridgeEvent::PeerMessage( + peer_b.clone(), + protocol_v1::CollatorProtocolMessage::AdvertiseCollation( + hash_b, + test_state.chain_ids[1], + ) + ) + ) + ).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateSelection(CandidateSelectionMessage::Collation( + relay_parent, + para_id, + collator, + ) + ) => { + assert_eq!(relay_parent, hash_b); + assert_eq!(para_id, test_state.chain_ids[1]); + assert_eq!(collator, pair.public()); + }); + + Delay::new(ACTIVITY_TIMEOUT * 3 / 2).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer( + peer, + peer_set, + )) => { + assert_eq!(peer, peer_b); + assert_eq!(peer_set, PeerSet::Collation); + } + ) + }); + } } diff --git a/polkadot/node/network/protocol/src/reputation.rs b/polkadot/node/network/protocol/src/reputation.rs index 9eb706fd05cd7df233dcf316fa95da399adf39eb..774b00b6b694bee1e97f018287b2b046d8bef6d1 100644 --- a/polkadot/node/network/protocol/src/reputation.rs +++ b/polkadot/node/network/protocol/src/reputation.rs @@ -32,7 +32,7 @@ impl UnifiedReputationChange { Self::CostMajor(_) => -300_000, Self::CostMinorRepeated(_) => -200_000, Self::CostMajorRepeated(_) => -600_000, - Self::Malicious(_) => -1_000_000, + Self::Malicious(_) => i32::min_value(), Self::BenefitMajorFirst(_) => 300_000, Self::BenefitMajor(_) => 200_000, Self::BenefitMinorFirst(_) => 15_000, diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index 17b51419c618fdb7d2dc56a9c61713a383df3842..d475a390605e1170be74df363cb3d51eeff2767e 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -503,7 +503,7 @@ where collator_protocol: { let side = match is_collator { IsCollator::Yes(id) => ProtocolSide::Collator(id, Metrics::register(registry)?), - IsCollator::No => ProtocolSide::Validator(Metrics::register(registry)?), + IsCollator::No => ProtocolSide::Validator(Default::default(),Metrics::register(registry)?), }; CollatorProtocolSubsystem::new( side, diff --git a/polkadot/node/subsystem/src/messages.rs b/polkadot/node/subsystem/src/messages.rs index 0df612bd37f78eedd9302ce2332ce380a4df1c4d..b38a26a4f15752c455aaf662288dfc99e2fc87d9 100644 --- a/polkadot/node/subsystem/src/messages.rs +++ b/polkadot/node/subsystem/src/messages.rs @@ -211,6 +211,9 @@ pub enum NetworkBridgeMessage { /// Report a peer for their actions. ReportPeer(PeerId, UnifiedReputationChange), + /// Disconnect a peer from the given peer-set without affecting their reputation. + DisconnectPeer(PeerId, PeerSet), + /// Send a message to one or more peers on the validation peer-set. SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol), @@ -249,6 +252,7 @@ impl NetworkBridgeMessage { pub fn relay_parent(&self) -> Option<Hash> { match self { Self::ReportPeer(_, _) => None, + Self::DisconnectPeer(_, _) => None, Self::SendValidationMessage(_, _) => None, Self::SendCollationMessage(_, _) => None, Self::SendValidationMessages(_) => None, diff --git a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md index 7f3d3446d7e311cb21614f3b22c68afbb07fa6d9..3f65a847357b85c8e1914b852afd3d3bda055980 100644 --- a/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md +++ b/polkadot/roadmap/implementers-guide/src/node/utility/network-bridge.md @@ -86,6 +86,10 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on - Adjust peer reputation according to cost or benefit provided +### DisconnectPeer + +- Disconnect the peer from the peer-set requested, if connected. + ### SendValidationMessage / SendValidationMessages - Issue a corresponding `ProtocolMessage` to each listed peer on the validation peer-set. diff --git a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md index 8f01459140db6faf7d50938dd7b2a5b73a7f06e2..9ffab8a551ce55d847024430c2216aa80f6f0a63 100644 --- a/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/polkadot/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -328,7 +328,9 @@ enum PeerSet { enum NetworkBridgeMessage { /// Report a cost or benefit of a peer. Negative values are costs, positive are benefits. - ReportPeer(PeerSet, PeerId, cost_benefit: i32), + ReportPeer(PeerId, cost_benefit: i32), + /// Disconnect a peer from the given peer-set without affecting their reputation. + DisconnectPeer(PeerId, PeerSet), /// Send a message to one or more peers on the validation peerset. SendValidationMessage([PeerId], ValidationProtocolV1), /// Send a message to one or more peers on the collation peerset.