Unverified Commit c61383a7 authored by asynchronous rob's avatar asynchronous rob Committed by GitHub
Browse files

Evict inactive peers from the collator protocol peer-set (#2680)

* malicious reputation cost is fatal

* make ReportBad a malicious cost

* futures control-flow for cleaning up inactive collator peers

* guide: network bridge updates

* add `PeerDisconnected` message

* guide: update

* reverse order

* remember to match

* implement disconnect peer in network bridge

* implement disconnect_inactive_peers

* test

* remove println

* don't hardcore policy

* add fuse outside of loop

* use default eviction policy
parent 12d44b14
Pipeline #130269 canceled with stages
in 14 minutes and 41 seconds
......@@ -5365,6 +5365,7 @@ dependencies = [
"assert_matches",
"env_logger 0.8.2",
"futures 0.3.12",
"futures-timer 3.0.2",
"log",
"polkadot-node-network-protocol",
"polkadot-node-primitives",
......
......@@ -57,6 +57,9 @@ pub(crate) enum Action {
/// Report a peer to the network implementation (decreasing/increasing its reputation).
ReportPeer(PeerId, UnifiedReputationChange),
/// Disconnect a peer from the given peer-set without affecting their reputation.
DisconnectPeer(PeerId, PeerSet),
/// A subsystem updates us on the relay chain leaves we consider active.
///
/// Implementation will send `WireMessage::ViewUpdate` message to peers as appropriate to the
......@@ -119,6 +122,9 @@ impl From<polkadot_subsystem::SubsystemResult<FromOverseer<NetworkBridgeMessage>
}
Ok(FromOverseer::Communication { msg }) => match msg {
NetworkBridgeMessage::ReportPeer(peer, rep) => Action::ReportPeer(peer, rep),
NetworkBridgeMessage::DisconnectPeer(peer, peer_set) => {
Action::DisconnectPeer(peer, peer_set)
}
NetworkBridgeMessage::SendValidationMessage(peers, msg) => {
Action::SendValidationMessages(vec![(peers, msg)])
}
......
......@@ -297,6 +297,16 @@ where
bridge.network_service.report_peer(peer, rep).await?
}
Action::DisconnectPeer(peer, peer_set) => {
tracing::debug!(
target: LOG_TARGET,
action = "DisconnectPeer",
?peer,
peer_set = ?peer_set,
);
bridge.network_service.disconnect_peer(peer, peer_set);
}
Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => {
tracing::debug!(
target: LOG_TARGET,
......
......@@ -90,6 +90,8 @@ where
pub enum NetworkAction {
/// Note a change in reputation for a peer.
ReputationChange(PeerId, Rep),
/// Disconnect a peer from the given peer-set.
DisconnectPeer(PeerId, PeerSet),
/// Write a notification to a given peer on the given peer-set.
WriteNotification(PeerId, PeerSet, Vec<u8>),
}
......@@ -130,6 +132,20 @@ pub trait Network: Send + 'static {
.boxed()
}
/// Disconnect a given peer from the peer set specified without harming reputation.
fn disconnect_peer(
&mut self,
who: PeerId,
peer_set: PeerSet,
) -> BoxFuture<SubsystemResult<()>> {
async move {
self.action_sink()
.send(NetworkAction::DisconnectPeer(who, peer_set))
.await
}
.boxed()
}
/// Write a notification to a peer on the given peer-set's protocol.
fn write_notification(
&mut self,
......@@ -179,6 +195,9 @@ impl Network for Arc<NetworkService<Block, Hash>> {
);
self.0.report_peer(peer, cost_benefit.into_base_rep())
}
NetworkAction::DisconnectPeer(peer, peer_set) => self
.0
.disconnect_peer(peer, peer_set.into_protocol_name()),
NetworkAction::WriteNotification(peer, peer_set, message) => self
.0
.write_notification(peer, peer_set.into_protocol_name(), message),
......
......@@ -8,6 +8,7 @@ edition = "2018"
futures = "0.3.12"
tracing = "0.1.25"
thiserror = "1.0.23"
futures-timer = "3"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
......
......@@ -20,6 +20,8 @@
#![deny(missing_docs, unused_crate_dependencies)]
#![recursion_limit="256"]
use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use thiserror::Error;
......@@ -60,10 +62,20 @@ enum Error {
type Result<T> = std::result::Result<T, Error>;
/// A collator eviction policy - how fast to evict collators which are inactive.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy(pub Duration);
impl Default for CollatorEvictionPolicy {
fn default() -> Self {
CollatorEvictionPolicy(Duration::from_secs(24))
}
}
/// What side of the collator protocol is being engaged
pub enum ProtocolSide {
/// Validators operate on the relay chain.
Validator(validator_side::Metrics),
Validator(CollatorEvictionPolicy, validator_side::Metrics),
/// Collators operate on a parachain.
Collator(CollatorId, collator_side::Metrics),
}
......@@ -90,8 +102,9 @@ impl CollatorProtocolSubsystem {
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator(metrics) => validator_side::run(
ProtocolSide::Validator(policy, metrics) => validator_side::run(
ctx,
policy,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
......
......@@ -15,8 +15,11 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll};
use std::time::{Duration, Instant};
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture}};
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture, Either}};
use futures::StreamExt;
use futures_timer::Delay;
use always_assert::never;
use polkadot_primitives::v1::{
......@@ -32,6 +35,7 @@ use polkadot_subsystem::{
};
use polkadot_node_network_protocol::{
OurView, PeerId, UnifiedReputationChange as Rep, View,
peer_set::PeerSet,
request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1
};
use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse};
......@@ -47,13 +51,19 @@ const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
/// Network errors that originated at the remote host should have same cost as timeout.
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out");
const COST_REPORT_BAD: Rep = Rep::CostMajor("A collator was reported by another subsystem");
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
// How often to check all peers with activity.
#[cfg(not(test))]
const ACTIVITY_POLL: Duration = Duration::from_secs(1);
#[cfg(test)]
const ACTIVITY_POLL: Duration = Duration::from_millis(10);
#[derive(Clone, Default)]
pub struct Metrics(Option<MetricsInner>);
impl Metrics {
fn on_request(&self, succeeded: std::result::Result<(), ()>) {
if let Some(metrics) = &self.0 {
......@@ -130,14 +140,42 @@ struct PerRequest {
span: Option<jaeger::Span>,
}
struct PeerData {
view: View,
last_active: Instant,
}
impl PeerData {
fn new(view: View) -> Self {
PeerData {
view,
last_active: Instant::now(),
}
}
fn note_active(&mut self) {
self.last_active = Instant::now();
}
fn active_since(&self, instant: Instant) -> bool {
self.last_active >= instant
}
}
impl Default for PeerData {
fn default() -> Self {
PeerData::new(Default::default())
}
}
/// All state relevant for the validator side of the protocol lives here.
#[derive(Default)]
struct State {
/// Our own view.
view: OurView,
/// Track all active collators and their views.
peer_views: HashMap<PeerId, View>,
/// Track all active collators and their data.
peer_data: HashMap<PeerId, PeerData>,
/// Peers that have declared themselves as collators.
known_collators: HashMap<PeerId, CollatorId>,
......@@ -263,18 +301,18 @@ async fn handle_peer_view_change(
peer_id: PeerId,
view: View,
) -> Result<()> {
let current = state.peer_views.entry(peer_id.clone()).or_default();
let current = state.peer_data.entry(peer_id.clone()).or_default();
let removed: Vec<_> = current.difference(&view).cloned().collect();
let removed: Vec<_> = current.view.difference(&view).cloned().collect();
*current = view;
current.view = view;
if let Some(advertisements) = state.advertisements.get_mut(&peer_id) {
advertisements.retain(|(_, relay_parent)| !removed.contains(relay_parent));
}
for removed in removed.into_iter() {
state.requested_collations.retain(|k, _| k.0 != removed);
state.requested_collations.retain(|k, _| k.0 != removed || k.2 != peer_id);
}
Ok(())
......@@ -385,6 +423,10 @@ where
{
use protocol_v1::CollatorProtocolMessage::*;
if let Some(d) = state.peer_data.get_mut(&origin) {
d.note_active();
}
match msg {
Declare(id) => {
tracing::debug!(
......@@ -392,12 +434,39 @@ where
peer_id = ?origin,
"Declared as collator",
);
state.known_collators.insert(origin.clone(), id);
state.peer_views.entry(origin).or_default();
if state.known_collators.insert(origin.clone(), id).is_some() {
modify_reputation(ctx, origin.clone(), COST_UNEXPECTED_MESSAGE).await;
}
}
AdvertiseCollation(relay_parent, para_id) => {
let _span = state.span_per_relay_parent.get(&relay_parent).map(|s| s.child("advertise-collation"));
state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent));
if !state.view.contains(&relay_parent) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
%para_id,
?relay_parent,
"Advertise collation out of view",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
return;
}
if !state.advertisements.entry(origin.clone()).or_default().insert((para_id, relay_parent)) {
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
%para_id,
?relay_parent,
"Multiple collations for same relay-parent advertised",
);
modify_reputation(ctx, origin, COST_UNEXPECTED_MESSAGE).await;
return;
}
if let Some(collator) = state.known_collators.get(&origin) {
tracing::debug!(
......@@ -488,13 +557,12 @@ where
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(_id, _role) => {
// A peer has connected. Until it issues a `Declare` message we do not
// want to track it's view or take any other actions.
PeerConnected(peer_id, _role) => {
state.peer_data.entry(peer_id).or_default();
},
PeerDisconnected(peer_id) => {
state.known_collators.remove(&peer_id);
state.peer_views.remove(&peer_id);
state.peer_data.remove(&peer_id);
},
PeerViewChange(peer_id, view) => {
handle_peer_view_change(state, peer_id, view).await?;
......@@ -573,14 +641,26 @@ where
}
}
// wait until next inactivity check. returns the instant for the following check.
async fn wait_until_next_check(last_poll: Instant) -> Instant {
let now = Instant::now();
let next_poll = last_poll + ACTIVITY_POLL;
if next_poll > now {
Delay::new(next_poll - now).await
}
Instant::now()
}
/// The main run loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
eviction_policy: crate::CollatorEvictionPolicy,
metrics: Metrics,
) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>
) -> Result<()>
where Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use FromOverseer::*;
use OverseerSignal::*;
......@@ -590,23 +670,50 @@ where
..Default::default()
};
let next_inactivity_stream = futures::stream::unfold(
Instant::now() + ACTIVITY_POLL,
|next_check| async move { Some(((), wait_until_next_check(next_check).await)) }
).fuse();
futures::pin_mut!(next_inactivity_stream);
loop {
if let Poll::Ready(msg) = futures::poll!(ctx.recv()) {
let msg = msg?;
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
match msg {
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await,
Signal(BlockFinalized(..)) => {}
Signal(ActiveLeaves(_)) => {}
Signal(Conclude) => { break }
let res = {
let s = futures::future::select(ctx.recv().fuse(), next_inactivity_stream.next().fuse());
if let Poll::Ready(res) = futures::poll!(s) {
Some(match res {
Either::Left((msg, _)) => Either::Left(msg?),
Either::Right((_, _)) => Either::Right(()),
})
} else {
None
}
continue;
};
match res {
Some(Either::Left(msg)) => {
tracing::trace!(target: LOG_TARGET, msg = ?msg, "received a message");
match msg {
Communication { msg } => process_msg(&mut ctx, msg, &mut state).await,
Signal(BlockFinalized(..)) => {}
Signal(ActiveLeaves(_)) => {}
Signal(Conclude) => { break }
}
continue
}
Some(Either::Right(())) => {
disconnect_inactive_peers(&mut ctx, eviction_policy, &state.peer_data).await;
continue
}
None => {}
}
let mut retained_requested = HashSet::new();
for ((hash, para_id, peer_id), per_req) in state.requested_collations.iter_mut() {
// Despite the await, this won't block:
// Despite the await, this won't block on the response itself.
let finished = poll_collation_response(
&mut ctx, &state.metrics, &state.span_per_relay_parent,
hash, para_id, peer_id, per_req
......@@ -621,6 +728,28 @@ where
Ok(())
}
// This issues `NetworkBridge` notifications to disconnect from all inactive peers at the
// earliest possible point. This does not yet clean up any metadata, as that will be done upon
// receipt of the `PeerDisconnected` event.
async fn disconnect_inactive_peers(
ctx: &mut impl SubsystemContext,
eviction_policy: crate::CollatorEvictionPolicy,
peers: &HashMap<PeerId, PeerData>,
) {
let cutoff = match Instant::now().checked_sub(eviction_policy.0) {
None => return,
Some(i) => i,
};
for (peer, peer_data) in peers {
if !peer_data.active_since(cutoff) {
ctx.send_message(
NetworkBridgeMessage::DisconnectPeer(peer.clone(), PeerSet::Collation).into()
).await;
}
}
}
/// Poll collation response, return immediately if there is none.
///
/// Ready responses are handled, by logging and decreasing peer's reputation on error and by
......@@ -761,10 +890,12 @@ mod tests {
use polkadot_primitives::v1::{BlockData, CollatorPair, CompressedPoV};
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::{our_view,
use polkadot_node_network_protocol::{our_view, ObservedRole,
request_response::Requests
};
const ACTIVITY_TIMEOUT: Duration = Duration::from_millis(50);
#[derive(Clone)]
struct TestState {
chain_ids: Vec<ParaId>,
......@@ -813,7 +944,11 @@ mod tests {
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, Metrics::default());
let subsystem = run(
context,
crate::CollatorEvictionPolicy(ACTIVITY_TIMEOUT),
Metrics::default(),
);
let test_fut = test(TestHarness { virtual_overseer });
......@@ -823,7 +958,7 @@ mod tests {
executor::block_on(future::select(test_fut, subsystem));
}
const TIMEOUT: Duration = Duration::from_millis(100);
const TIMEOUT: Duration = Duration::from_millis(200);
async fn overseer_send(
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
......@@ -1191,4 +1326,208 @@ mod tests {
assert_eq!(collation_1.0, candidate_b);
});
}
#[test]
fn inactive_disconnected() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;
let pair = CollatorPair::generate().0;
tracing::trace!("activating");
let hash_a = test_state.relay_parent;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![hash_a])
)
).await;
let peer_b = PeerId::random();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
)
)
).await;
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
test_state.relay_parent,
test_state.chain_ids[0],
)
)
)
).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateSelection(CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator,
)
) => {
assert_eq!(relay_parent, test_state.relay_parent);
assert_eq!(para_id, test_state.chain_ids[0]);
assert_eq!(collator, pair.public());
});
Delay::new(ACTIVITY_TIMEOUT * 2).await;
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridge(NetworkBridgeMessage::DisconnectPeer(
peer,
peer_set,
)) => {
assert_eq!(peer, peer_b);
assert_eq!(peer_set, PeerSet::Collation);
}
)
});
}
#[test]
fn activity_extends_life() {
let test_state = TestState::default();
test_harness(|test_harness| async move {
let TestHarness {
mut virtual_overseer,
} = test_harness;
let pair = CollatorPair::generate().0;
tracing::trace!("activating");
let hash_a = test_state.relay_parent;
let hash_b = Hash::repeat_byte(1);
let hash_c = Hash::repeat_byte(2);
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::OurViewChange(our_view![hash_a, hash_b, hash_c])
)
).await;
let peer_b = PeerId::random();
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
)
)
).await;
Delay::new(ACTIVITY_TIMEOUT * 2 / 3).await;
overseer_send(