Unverified commit c836fb85, authored by André Silva, committed by GitHub
Browse files

collator-protocol: add message authentication (#2635)

* collator: authenticate collator protocol messages

* fix tests compilation

* node: verify collator protocol signatures in tests

* collator: fix tests

* implementers-guide: update CollatorProtocol messages

* collator: add test for verification of collator protocol signatures

* node: remove fixmes

* node: remove signature from `AdvertiseCollation` message

* node: add magic constant to Declare message signature payload
parent 73a146fd
Pipeline #130442 failed with stages
in 27 minutes and 2 seconds
......@@ -5461,6 +5461,7 @@ dependencies = [
"polkadot-primitives",
"sp-core",
"sp-keyring",
"sp-runtime",
"thiserror",
"tracing",
]
......
......@@ -1306,7 +1306,8 @@ mod tests {
// peer A gets reported for sending a collation message.
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into()
Sr25519Keyring::Alice.public().into(),
Default::default(),
);
let message = protocol_v1::CollationProtocol::CollatorProtocol(
......@@ -1572,7 +1573,8 @@ mod tests {
{
let collator_protocol_message = protocol_v1::CollatorProtocolMessage::Declare(
Sr25519Keyring::Alice.public().into()
Sr25519Keyring::Alice.public().into(),
Default::default(),
);
let message = protocol_v1::CollationProtocol::CollatorProtocol(
......
......@@ -5,17 +5,20 @@ authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
always-assert = "0.1.2"
futures = "0.3.12"
tracing = "0.1.25"
thiserror = "1.0.23"
futures-timer = "3"
thiserror = "1.0.23"
tracing = "0.1.25"
sp-core = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
always-assert = "0.1.2"
[dev-dependencies]
log = "0.4.13"
......
......@@ -19,11 +19,11 @@ use std::collections::{HashMap, HashSet};
use super::{LOG_TARGET, Result};
use futures::{select, FutureExt, channel::oneshot};
use sp_core::Pair;
use polkadot_primitives::v1::{
CandidateHash, CandidateReceipt, CollatorId, CompressedPoV, CoreIndex,
CoreState, Hash, Id as ParaId,
PoV, ValidatorId
CandidateHash, CandidateReceipt, CollatorPair, CompressedPoV, CoreIndex, CoreState, Hash,
Id as ParaId, PoV, ValidatorId
};
use polkadot_subsystem::{
jaeger, PerLeafSpan,
......@@ -209,10 +209,12 @@ struct Collation {
status: CollationStatus,
}
#[derive(Default)]
struct State {
/// Our id.
our_id: CollatorId,
/// Our network peer id.
local_peer_id: PeerId,
/// Our collator pair.
collator_pair: CollatorPair,
/// The para this collator is collating on.
/// Starts as `None` and is updated with every `CollateOn` message.
......@@ -250,6 +252,25 @@ struct State {
}
impl State {
/// Creates a new `State` instance with the given parameters and setting all remaining
/// state fields to their default values (i.e. empty).
fn new(local_peer_id: PeerId, collator_pair: CollatorPair, metrics: Metrics) -> State {
State {
local_peer_id,
collator_pair,
metrics,
collating_on: Default::default(),
peer_views: Default::default(),
view: Default::default(),
span_per_relay_parent: Default::default(),
collations: Default::default(),
collation_result_senders: Default::default(),
our_validators_groups: Default::default(),
declared_at: Default::default(),
connection_requests: Default::default(),
}
}
/// Returns `true` if the given `peer` is interested in the leaf that is represented by `relay_parent`.
fn peer_interested_in_leaf(&self, peer: &PeerId, relay_parent: &Hash) -> bool {
self.peer_views.get(peer).map(|v| v.contains(relay_parent)).unwrap_or(false)
......@@ -407,7 +428,12 @@ async fn declare(
state: &mut State,
peer: PeerId,
) {
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(state.our_id.clone());
let declare_signature_payload = protocol_v1::declare_signature_payload(&state.local_peer_id);
let wire_message = protocol_v1::CollatorProtocolMessage::Declare(
state.collator_pair.public(),
state.collator_pair.sign(&declare_signature_payload),
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
......@@ -490,7 +516,10 @@ async fn advertise_collation(
},
}
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(relay_parent, collating_on);
let wire_message = protocol_v1::CollatorProtocolMessage::AdvertiseCollation(
relay_parent,
collating_on,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
......@@ -676,7 +705,7 @@ async fn handle_incoming_peer_message(
use protocol_v1::CollatorProtocolMessage::*;
match msg {
Declare(_) => {
Declare(_, _) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
......@@ -861,20 +890,17 @@ async fn handle_our_view_change(
}
/// The collator protocol collator side main loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(skip(ctx, collator_pair, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run(
mut ctx: impl SubsystemContext<Message = CollatorProtocolMessage>,
our_id: CollatorId,
local_peer_id: PeerId,
collator_pair: CollatorPair,
metrics: Metrics,
) -> Result<()> {
use FromOverseer::*;
use OverseerSignal::*;
let mut state = State {
metrics,
our_id,
..Default::default()
};
let mut state = State::new(local_peer_id, collator_pair, metrics);
loop {
select! {
......@@ -907,27 +933,31 @@ pub(crate) async fn run(
mod tests {
use super::*;
use std::{time::Duration, sync::Arc};
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures::{executor, future, Future, channel::mpsc};
use futures::{channel::mpsc, executor, future, Future};
use sp_core::{crypto::Pair, Decode};
use sp_keyring::Sr25519Keyring;
use sp_runtime::traits::AppVerify;
use polkadot_primitives::v1::{
BlockData, CandidateDescriptor, CollatorPair, ScheduledCore,
ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId,
SessionIndex, SessionInfo,
};
use polkadot_subsystem::{ActiveLeavesUpdate, messages::{RuntimeApiMessage, RuntimeApiRequest}, jaeger};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers as test_helpers;
use polkadot_node_network_protocol::{
our_view,
view,
request_response::request::IncomingRequest,
};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_primitives::v1::{
AuthorityDiscoveryId, BlockData, CandidateDescriptor, CollatorPair, GroupRotationInfo,
ScheduledCore, SessionIndex, SessionInfo, ValidatorIndex,
};
use polkadot_subsystem::{
jaeger,
messages::{RuntimeApiMessage, RuntimeApiRequest},
ActiveLeavesUpdate,
};
use polkadot_subsystem_testhelpers as test_helpers;
#[derive(Default)]
struct TestCandidateBuilder {
......@@ -961,7 +991,8 @@ mod tests {
validator_groups: (Vec<Vec<ValidatorIndex>>, GroupRotationInfo),
relay_parent: Hash,
availability_core: CoreState,
our_collator_pair: CollatorPair,
local_peer_id: PeerId,
collator_pair: CollatorPair,
session_index: SessionIndex,
}
......@@ -1008,7 +1039,8 @@ mod tests {
let relay_parent = Hash::random();
let our_collator_pair = CollatorPair::generate().0;
let local_peer_id = PeerId::random();
let collator_pair = CollatorPair::generate().0;
Self {
para_id,
......@@ -1019,7 +1051,8 @@ mod tests {
validator_groups,
relay_parent,
availability_core,
our_collator_pair,
local_peer_id,
collator_pair,
session_index: 1,
}
}
......@@ -1094,7 +1127,8 @@ mod tests {
}
fn test_harness<T: Future<Output = ()>>(
collator_id: CollatorId,
local_peer_id: PeerId,
collator_pair: CollatorPair,
test: impl FnOnce(TestHarness) -> T,
) {
let _ = env_logger::builder()
......@@ -1113,7 +1147,7 @@ mod tests {
let (context, virtual_overseer) = test_helpers::make_subsystem_context(pool.clone());
let subsystem = run(context, collator_id, Metrics::default());
let subsystem = run(context, local_peer_id, collator_pair, Metrics::default());
let test_fut = test(TestHarness { virtual_overseer });
......@@ -1355,8 +1389,12 @@ mod tests {
assert_eq!(to[0], *peer);
assert_matches!(
wire_message,
protocol_v1::CollatorProtocolMessage::Declare(collator_id) => {
assert_eq!(collator_id, test_state.our_collator_pair.public());
protocol_v1::CollatorProtocolMessage::Declare(collator_id, signature) => {
assert!(signature.verify(
&*protocol_v1::declare_signature_payload(&test_state.local_peer_id),
&collator_id),
);
assert_eq!(collator_id, test_state.collator_pair.public());
}
);
}
......@@ -1406,8 +1444,10 @@ mod tests {
#[test]
fn advertise_and_send_collation() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
setup_system(&mut virtual_overseer, &test_state).await;
......@@ -1520,8 +1560,10 @@ mod tests {
#[test]
fn collators_are_registered_correctly_at_validators() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.validator_peer_id[0].clone();
......@@ -1542,8 +1584,10 @@ mod tests {
#[test]
fn collations_are_only_advertised_to_validators_with_correct_view() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
......@@ -1583,8 +1627,10 @@ mod tests {
#[test]
fn collate_on_two_different_relay_chain_blocks() {
let mut test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
......@@ -1628,8 +1674,10 @@ mod tests {
#[test]
fn validator_reconnect_does_not_advertise_a_second_time() {
let test_state = TestState::default();
let local_peer_id = test_state.local_peer_id.clone();
let collator_pair = test_state.collator_pair.clone();
test_harness(test_state.our_collator_pair.public(), |test_harness| async move {
test_harness(local_peer_id, collator_pair, |test_harness| async move {
let mut virtual_overseer = test_harness.virtual_overseer;
let peer = test_state.current_group_validator_peer_ids()[0].clone();
......
......@@ -25,20 +25,13 @@ use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use thiserror::Error;
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange as Rep};
use polkadot_node_subsystem_util::{self as util, metrics::prometheus};
use polkadot_primitives::v1::CollatorPair;
use polkadot_subsystem::{
Subsystem, SubsystemContext, SubsystemError, SpawnedSubsystem,
errors::RuntimeApiError,
messages::{
AllMessages, CollatorProtocolMessage, NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::v1::CollatorId;
use polkadot_node_subsystem_util::{
self as util,
metrics::prometheus,
messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage},
SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
};
mod collator_side;
......@@ -77,7 +70,7 @@ pub enum ProtocolSide {
/// Validators operate on the relay chain.
Validator(CollatorEvictionPolicy, validator_side::Metrics),
/// Collators operate on a parachain.
Collator(CollatorId, collator_side::Metrics),
Collator(PeerId, CollatorPair, collator_side::Metrics),
}
/// The collator protocol subsystem.
......@@ -107,9 +100,10 @@ impl CollatorProtocolSubsystem {
policy,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
ProtocolSide::Collator(local_peer_id, collator_pair, metrics) => collator_side::run(
ctx,
id,
local_peer_id,
collator_pair,
metrics,
).await,
}.map_err(|e| {
......
......@@ -16,34 +16,35 @@
use std::{collections::{HashMap, HashSet}, sync::Arc, task::Poll};
use std::time::{Duration, Instant};
use futures::{FutureExt, channel::oneshot, future::{Fuse, FusedFuture, BoxFuture, Either}};
use futures::StreamExt;
use futures_timer::Delay;
use always_assert::never;
use futures::{
channel::oneshot, future::{BoxFuture, Either, Fuse, FusedFuture}, FutureExt, StreamExt,
};
use futures_timer::Delay;
use polkadot_primitives::v1::{
Id as ParaId, CandidateReceipt, CollatorId, Hash, PoV,
use polkadot_node_network_protocol::{
request_response as req_res, v1 as protocol_v1,
peer_set::PeerSet,
request_response::{
request::{Recipient, RequestError},
v1::{CollationFetchingRequest, CollationFetchingResponse},
OutgoingRequest, Requests,
},
OurView, PeerId, UnifiedReputationChange as Rep, View,
};
use polkadot_node_primitives::{SignedFullStatement, Statement};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_primitives::v1::{CandidateReceipt, CollatorId, Hash, Id as ParaId, PoV};
use polkadot_subsystem::{
jaeger, PerLeafSpan,
FromOverseer, OverseerSignal, SubsystemContext,
jaeger,
messages::{
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, NetworkBridgeMessage,
NetworkBridgeEvent, IfDisconnected,
AllMessages, CandidateSelectionMessage, CollatorProtocolMessage, IfDisconnected,
NetworkBridgeEvent, NetworkBridgeMessage,
},
FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext,
};
use polkadot_node_network_protocol::{
OurView, PeerId, UnifiedReputationChange as Rep, View,
peer_set::PeerSet,
request_response::{OutgoingRequest, Requests, request::{Recipient, RequestError}}, v1 as protocol_v1
};
use polkadot_node_network_protocol::request_response::v1::{CollationFetchingRequest, CollationFetchingResponse};
use polkadot_node_network_protocol::request_response as req_res;
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_primitives::{Statement, SignedFullStatement};
use super::{modify_reputation, LOG_TARGET, Result};
use super::{modify_reputation, Result, LOG_TARGET};
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
/// Message could not be decoded properly.
......@@ -51,6 +52,7 @@ const COST_CORRUPTED_MESSAGE: Rep = Rep::CostMinor("Message was corrupt");
/// Network errors that originated at the remote host should have same cost as timeout.
const COST_NETWORK_ERROR: Rep = Rep::CostMinor("Some network error");
const COST_REQUEST_TIMED_OUT: Rep = Rep::CostMinor("A collation request has timed out");
const COST_INVALID_SIGNATURE: Rep = Rep::Malicious("Invalid network message signature");
const COST_REPORT_BAD: Rep = Rep::Malicious("A collator was reported by another subsystem");
const BENEFIT_NOTIFY_GOOD: Rep = Rep::BenefitMinor("A collator was noted good by another subsystem");
......@@ -422,13 +424,19 @@ where
Context: SubsystemContext<Message = CollatorProtocolMessage>
{
use protocol_v1::CollatorProtocolMessage::*;
use sp_runtime::traits::AppVerify;
if let Some(d) = state.peer_data.get_mut(&origin) {
d.note_active();
}
match msg {
Declare(id) => {
Declare(id, signature) => {
if !signature.verify(&*protocol_v1::declare_signature_payload(&origin), &id) {
modify_reputation(ctx, origin, COST_INVALID_SIGNATURE).await;
return;
}
tracing::debug!(
target: LOG_TARGET,
peer_id = ?origin,
......@@ -1015,7 +1023,6 @@ mod tests {
)
).await;
let peer_b = PeerId::random();
overseer_send(
......@@ -1023,7 +1030,10 @@ mod tests {
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
protocol_v1::CollatorProtocolMessage::Declare(
pair.public(),
pair.sign(&protocol_v1::declare_signature_payload(&peer_b)),
)
)
)
).await;
......@@ -1083,6 +1093,7 @@ mod tests {
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[0].public(),
test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)),
),
)
)
......@@ -1095,6 +1106,7 @@ mod tests {
peer_c.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[1].public(),
test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)),
),
)
)
......@@ -1132,6 +1144,44 @@ mod tests {
});
}
// Test that the signature carried by a `Declare` message is verified: a peer whose
// signature does not cover the expected payload gets reported.
#[test]
fn collator_authentication_verification_works() {
    let test_state = TestState::default();

    test_harness(|harness| async move {
        let mut virtual_overseer = harness.virtual_overseer;
        let peer_b = PeerId::random();

        // The peer declares itself, but signs an arbitrary payload (`[42]`) instead of
        // the declare signature payload derived from its peer id.
        let bad_declare = protocol_v1::CollatorProtocolMessage::Declare(
            test_state.collators[0].public(),
            test_state.collators[0].sign(&[42]),
        );
        let incoming = CollatorProtocolMessage::NetworkBridgeUpdateV1(
            NetworkBridgeEvent::PeerMessage(peer_b.clone(), bad_declare),
        );
        overseer_send(&mut virtual_overseer, incoming).await;

        // The next outgoing message must be a reputation report against that peer,
        // carrying the invalid-signature cost.
        assert_matches!(
            overseer_recv(&mut virtual_overseer).await,
            AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep)) => {
                assert_eq!(peer, peer_b);
                assert_eq!(rep, COST_INVALID_SIGNATURE);
            }
        );
    });
}
// A test scenario that takes the following steps
// - Two collators connect, declare themselves and advertise a collation relevant to
// our view.
......@@ -1167,6 +1217,7 @@ mod tests {
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[0].public(),
test_state.collators[0].sign(&protocol_v1::declare_signature_payload(&peer_b)),
)
)
)
......@@ -1179,6 +1230,7 @@ mod tests {
peer_c.clone(),
protocol_v1::CollatorProtocolMessage::Declare(
test_state.collators[1].public(),
test_state.collators[1].sign(&protocol_v1::declare_signature_payload(&peer_c)),
)
)
)
......@@ -1366,7 +1418,10 @@ mod tests {
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
protocol_v1::CollatorProtocolMessage::Declare(
pair.public(),
pair.sign(&protocol_v1::declare_signature_payload(&peer_b)),
)
)
)
).await;
......@@ -1455,7 +1510,10 @@ mod tests {
CollatorProtocolMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerMessage(
peer_b.clone(),
protocol_v1::CollatorProtocolMessage::Declare(pair.public()),
protocol_v1::CollatorProtocolMessage::Declare(
pair.public(),
pair.sign(&protocol_v1::declare_signature_payload(&peer_b)),
)
)
)
).await;
......
......@@ -288,14 +288,20 @@ impl View {
/// v1 protocol types.
pub mod v1 {
use polkadot_primitives::v1::{AvailableData, CandidateHash, CandidateIndex, CollatorId, CompressedPoV, ErasureChunk, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex};
use std::convert::TryFrom;
use parity_scale_codec::{Decode, Encode};
use super::RequestId;
use polkadot_node_primitives::{
SignedFullStatement,
approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
SignedFullStatement,
};
use polkadot_primitives::v1::{
AvailableData, CandidateHash, CandidateIndex, CollatorId, CompressedPoV,
CollatorSignature, ErasureChunk, Hash, Id as ParaId, SignedAvailabilityBitfield,
ValidatorIndex,
};
use parity_scale_codec::{Encode, Decode};