Unverified commit e9a29ecc authored by Robert Klotzner, committed by GitHub

More secure `Signed` implementation (#2963)

* Remove signature verification in backing.

`SignedFullStatement` now signals that the signature has already been
checked.

* Remove unused check_payload function.

* Introduced unchecked signed variants.

* Fix inclusion to use unchecked variant.

* More unchecked variants.

* Use unchecked variants in protocols.

* Start fixing statement-distribution.

* Fixup statement distribution.

* Fix inclusion.

* Fix warning.

* Fix backing properly.

* Fix bitfield distribution.

* Make crypto store optional for `RuntimeInfo`.

* Factor out utility functions.

* get_group_rotation_info

* WIP: Collator cleanup + check signatures.

* Convenience signature checking functions.

* Check signature on collator-side.

* Fix warnings.

* Fix collator side tests.

* Get rid of warnings.

* Better Signed/UncheckedSigned implementation.

Also get rid of Encode/Decode for Signed! *party*

* Get rid of dead code.

* Move Signed in its own module.

* into_checked -> try_into_checked

* Fix merge.
parent 823f811c
Pipeline #136691 canceled in 5 minutes and 35 seconds
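To make the shape of this change easier to follow, below is a minimal, dependency-free sketch of the pattern the commit introduces: payloads arrive from the network as unchecked values and only become `Signed` after an explicit `try_into_checked` verification step, so downstream subsystems can rely on the type to prove the signature was already checked. The type name `UncheckedSigned` and the method `try_into_checked` mirror the commit; everything else — the toy signature scheme, the `u64` keys, the byte payload, and helper names such as `into_unchecked` — is invented for illustration and does not correspond to the real sr25519-based implementation.

#[derive(Debug, Clone, PartialEq, Eq)]
struct UncheckedSigned<T> {
    payload: T,
    validator_index: u32,
    signature: u64, // stand-in for a real sr25519 signature
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct Signed<T>(UncheckedSigned<T>);

impl<T: AsRef<[u8]>> UncheckedSigned<T> {
    // Verify the signature against `public_key`; only a successful check yields
    // the `Signed` wrapper (this mirrors the commit's `try_into_checked`).
    fn try_into_checked(self, public_key: u64) -> Result<Signed<T>, Self> {
        if toy_verify(self.payload.as_ref(), self.validator_index, self.signature, public_key) {
            Ok(Signed(self))
        } else {
            Err(self)
        }
    }

    // The index is readable before verification, but callers must treat it as a claim.
    fn unchecked_validator_index(&self) -> u32 {
        self.validator_index
    }
}

impl<T> Signed<T> {
    fn payload(&self) -> &T {
        &self.0.payload
    }

    // Drop back to the unchecked form, e.g. for putting the value on the wire.
    fn into_unchecked(self) -> UncheckedSigned<T> {
        self.0
    }
}

// Toy "signature": a checksum keyed by the public key. The real code verifies an
// sr25519 signature over the encoded payload plus a signing context.
fn toy_sign(payload: &[u8], index: u32, key: u64) -> u64 {
    payload
        .iter()
        .fold(key ^ u64::from(index), |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u64))
}

fn toy_verify(payload: &[u8], index: u32, signature: u64, key: u64) -> bool {
    toy_sign(payload, index, key) == signature
}

fn main() {
    let key = 42u64;
    let statement = b"Seconded(candidate)".to_vec();
    let wire = UncheckedSigned {
        signature: toy_sign(&statement, 7, key),
        payload: statement,
        validator_index: 7,
    };
    assert_eq!(wire.unchecked_validator_index(), 7);

    // Subsystems receiving network input must go through the check...
    let checked = wire.clone().try_into_checked(key).expect("valid signature");
    println!("verified payload: {:?}", checked.payload());
    println!("back on the wire: {:?}", checked.into_unchecked());

    // ...while a tampered payload never becomes `Signed`.
    let mut forged = wire;
    forged.payload = b"Valid(other_candidate)".to_vec();
    assert!(forged.try_into_checked(key).is_err());
}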
......@@ -5582,7 +5582,6 @@ dependencies = [
"futures 0.3.14",
"log",
"maplit",
"parity-scale-codec",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
......
......@@ -195,7 +195,6 @@ const fn group_quorum(n_validators: usize) -> usize {
#[derive(Default)]
struct TableContext {
signing_context: SigningContext,
validator: Option<Validator>,
groups: HashMap<ParaId, Vec<ValidatorIndex>>,
validators: Vec<ValidatorId>,
......@@ -870,7 +869,6 @@ impl CandidateBackingJob {
.with_candidate(statement.payload().candidate_hash())
.with_relay_parent(_relay_parent);
self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(&root_span, sender, statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
Err(e) => return Err(e),
......@@ -1028,22 +1026,6 @@ impl CandidateBackingJob {
Some(signed)
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn check_statement_signature(&self, statement: &SignedFullStatement) -> Result<(), Error> {
let idx = statement.validator_index().0 as usize;
if self.table_context.validators.len() > idx {
statement.check_signature(
&self.table_context.signing_context,
&self.table_context.validators[idx],
).map_err(|_| Error::InvalidSignature)?;
} else {
return Err(Error::InvalidSignature);
}
Ok(())
}
/// Insert or get the unbacked-span for the given candidate hash.
fn insert_or_get_unbacked_span(
&mut self,
......@@ -1204,7 +1186,6 @@ impl util::JobTrait for CandidateBackingJob {
let table_context = TableContext {
groups,
validators,
signing_context,
validator,
};
......@@ -1658,14 +1639,9 @@ mod tests {
AllMessages::StatementDistribution(
StatementDistributionMessage::Share(
parent_hash,
signed_statement,
_signed_statement,
)
) if parent_hash == test_state.relay_parent => {
signed_statement.check_signature(
&test_state.signing_context,
&test_state.validator_public[0],
).unwrap();
}
) if parent_hash == test_state.relay_parent => {}
);
assert_matches!(
......@@ -1708,11 +1684,6 @@ mod tests {
}.build();
let candidate_a_hash = candidate_a.hash();
let public0 = CryptoStore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID,
Some(&test_state.validators[0].to_seed()),
).await.expect("Insert key into keystore");
let public1 = CryptoStore::sr25519_generate_new(
&*test_state.keystore,
ValidatorId::ID,
......@@ -1795,10 +1766,9 @@ mod tests {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::StatementDistribution(
StatementDistributionMessage::Share(hash, stmt)
StatementDistributionMessage::Share(hash, _stmt)
) => {
assert_eq!(test_state.relay_parent, hash);
stmt.check_signature(&test_state.signing_context, &public0.into()).expect("Is signed correctly");
}
);
......@@ -2092,11 +2062,6 @@ mod tests {
signed_statement,
)
) if relay_parent == test_state.relay_parent => {
signed_statement.check_signature(
&test_state.signing_context,
&test_state.validator_public[0],
).unwrap();
assert_eq!(*signed_statement.payload(), Statement::Valid(candidate_a_hash));
}
);
......@@ -2257,11 +2222,6 @@ mod tests {
signed_statement,
)
) if parent_hash == test_state.relay_parent => {
signed_statement.check_signature(
&test_state.signing_context,
&test_state.validator_public[0],
).unwrap();
assert_eq!(*signed_statement.payload(), Statement::Seconded(candidate_b));
}
);
......@@ -2593,10 +2553,7 @@ mod tests {
use sp_core::Encode;
use std::convert::TryFrom;
let relay_parent = [1; 32].into();
let para_id = ParaId::from(10);
let session_index = 5;
let signing_context = SigningContext { parent_hash: relay_parent, session_index };
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
......@@ -2614,7 +2571,6 @@ mod tests {
};
let table_context = TableContext {
signing_context,
validator: None,
groups: validator_groups,
validators: validator_public.clone(),
......
......@@ -249,12 +249,12 @@ impl CandidateSelectionJob {
.with_relay_parent(_relay_parent);
self.handle_invalid(sender, candidate_receipt).await;
}
Some(CandidateSelectionMessage::Seconded(_relay_parent, statement)) => {
Some(CandidateSelectionMessage::Seconded(relay_parent, statement)) => {
let _span = span.child("handle-seconded")
.with_stage(jaeger::Stage::CandidateSelection)
.with_candidate(statement.payload().candidate_hash())
.with_relay_parent(_relay_parent);
self.handle_seconded(sender, statement).await;
.with_relay_parent(relay_parent);
self.handle_seconded(sender, relay_parent, statement).await;
}
None => break,
}
......@@ -345,6 +345,7 @@ impl CandidateSelectionJob {
async fn handle_seconded(
&mut self,
sender: &mut impl SubsystemSender,
relay_parent: Hash,
statement: SignedFullStatement,
) {
let received_from = match &self.seconded_candidate {
......@@ -368,7 +369,11 @@ impl CandidateSelectionJob {
.await;
sender.send_message(
CollatorProtocolMessage::NotifyCollationSeconded(received_from.clone(), statement).into()
CollatorProtocolMessage::NotifyCollationSeconded(
received_from.clone(),
relay_parent,
statement
).into()
).await;
}
}
......
......@@ -79,7 +79,7 @@ impl ParachainsInherentDataProvider {
let inherent_data = match res {
Ok(pd) => ParachainsInherentData {
bitfields: pd.bitfields,
bitfields: pd.bitfields.into_iter().map(Into::into).collect(),
backed_candidates: pd.backed_candidates,
disputes: pd.disputes,
parent_header,
......
......@@ -85,7 +85,7 @@ impl AvailabilityDistributionSubsystem {
/// Create a new instance of the availability distribution.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
let runtime = RuntimeInfo::new(keystore.clone());
let runtime = RuntimeInfo::new(Some(keystore.clone()));
Self { keystore, runtime, metrics }
}
......
......@@ -280,7 +280,7 @@ mod tests {
let (mut context, mut virtual_overseer) =
test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
let keystore = make_ferdie_keystore();
let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(keystore);
let mut runtime = polkadot_node_subsystem_util::runtime::RuntimeInfo::new(Some(keystore));
let (tx, rx) = oneshot::channel();
let testee = async {
......
......@@ -32,14 +32,14 @@ use futures::{
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_subsystem_util::request_availability_cores;
use polkadot_primitives::v1::{CandidateHash, CoreState, Hash, OccupiedCore};
use polkadot_node_subsystem_util::runtime::get_occupied_cores;
use polkadot_primitives::v1::{CandidateHash, Hash, OccupiedCore};
use polkadot_subsystem::{
messages::AllMessages, ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
};
use super::{error::recv_runtime, session_cache::SessionCache, LOG_TARGET, Metrics};
use crate::error::Error;
use super::{session_cache::SessionCache, LOG_TARGET, Metrics};
/// A task fetching a particular chunk.
mod fetch_task;
......@@ -125,7 +125,7 @@ impl Requester {
Context: SubsystemContext,
{
for ActivatedLeaf { hash: leaf, .. } in new_heads {
let cores = query_occupied_cores(ctx, leaf).await?;
let cores = get_occupied_cores(ctx, leaf).await?;
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
......@@ -226,25 +226,3 @@ impl Stream for Requester {
}
}
/// Query all hashes and descriptors of candidates pending availability at a particular block.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_occupied_cores<Context>(
ctx: &mut Context,
relay_parent: Hash,
) -> Result<Vec<OccupiedCore>, Error>
where
Context: SubsystemContext,
{
let cores = recv_runtime(request_availability_cores(relay_parent, ctx.sender()).await).await?;
Ok(cores
.into_iter()
.filter_map(|core_state| {
if let CoreState::Occupied(occupied) = core_state {
Some(occupied)
} else {
None
}
})
.collect())
}
......@@ -7,7 +7,6 @@ edition = "2018"
[dependencies]
futures = "0.3.12"
tracing = "0.1.25"
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
......
......@@ -22,7 +22,6 @@
#![deny(unused_crate_dependencies)]
use parity_scale_codec::{Decode, Encode};
use futures::{channel::oneshot, FutureExt};
use polkadot_subsystem::messages::*;
......@@ -49,7 +48,7 @@ const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Valid message");
/// Checked signed availability bitfield that is distributed
/// to other peers.
#[derive(Encode, Decode, Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
struct BitfieldGossipMessage {
/// The relay parent this message is relative to.
relay_parent: Hash,
......@@ -69,7 +68,7 @@ impl BitfieldGossipMessage {
{
protocol_v1::BitfieldDistributionMessage::Bitfield(
self.relay_parent,
self.signed_availability,
self.signed_availability.into(),
)
}
}
......@@ -392,19 +391,26 @@ async fn process_incoming_peer_message<Context>(
state: &mut ProtocolState,
metrics: &Metrics,
origin: PeerId,
message: BitfieldGossipMessage,
message: protocol_v1::BitfieldDistributionMessage,
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
let protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) = message;
tracing::trace!(
target: LOG_TARGET,
peer_id = %origin,
?relay_parent,
"received bitfield gossip from peer"
);
// we don't care about this, not part of our view.
if !state.view.contains(&message.relay_parent) {
if !state.view.contains(&relay_parent) {
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
return;
}
// Ignore anything the overseer did not tell this subsystem to work on.
let mut job_data = state.per_relay_parent.get_mut(&message.relay_parent);
let mut job_data = state.per_relay_parent.get_mut(&relay_parent);
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
......@@ -412,17 +418,19 @@ where
return;
};
let validator_index = bitfield.unchecked_validator_index();
let mut _span = job_data.span
.child("msg-received")
.with_peer_id(&origin)
.with_claimed_validator_index(message.signed_availability.validator_index())
.with_claimed_validator_index(validator_index)
.with_stage(jaeger::Stage::BitfieldDistribution);
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
tracing::trace!(
target: LOG_TARGET,
relay_parent = %message.relay_parent,
relay_parent = %relay_parent,
?origin,
"Validator set is empty",
);
......@@ -433,8 +441,7 @@ where
// Use the (untrusted) validator index provided by the signed payload
// and see if that one actually signed the availability bitset.
let signing_context = job_data.signing_context.clone();
let validator_index = message.signed_availability.validator_index().0 as usize;
let validator = if let Some(validator) = validator_set.get(validator_index) {
let validator = if let Some(validator) = validator_set.get(validator_index.0 as usize) {
validator.clone()
} else {
modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
......@@ -454,7 +461,7 @@ where
} else {
tracing::trace!(
target: LOG_TARGET,
validator_index,
?validator_index,
?origin,
"Duplicate message",
);
......@@ -468,22 +475,26 @@ where
if let Some(old_message) = one_per_validator.get(&validator) {
tracing::trace!(
target: LOG_TARGET,
validator_index,
?validator_index,
"already received a message for validator",
);
if old_message.signed_availability == message.signed_availability {
if old_message.signed_availability.as_unchecked() == &bitfield {
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
}
return;
}
if message
.signed_availability
.check_signature(&signing_context, &validator)
.is_err()
{
modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
return;
}
let signed_availability = match bitfield.try_into_checked(&signing_context, &validator) {
Err(_) => {
modify_reputation(ctx, origin, COST_SIGNATURE_INVALID).await;
return;
},
Ok(bitfield) => bitfield,
};
let message = BitfieldGossipMessage {
relay_parent,
signed_availability,
};
metrics.on_bitfield_received();
one_per_validator.insert(validator.clone(), message.clone());
......@@ -544,23 +555,8 @@ where
);
handle_our_view_change(state, view);
}
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
tracing::trace!(
target: LOG_TARGET,
peer_id = %remote,
?relay_parent,
"received bitfield gossip from peer"
);
let gossiped_bitfield = BitfieldGossipMessage {
relay_parent,
signed_availability: bitfield,
};
process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await;
}
}
}
NetworkBridgeEvent::PeerMessage(remote, message) =>
process_incoming_peer_message(ctx, state, metrics, remote, message).await,
}
}
......
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//
//! Error handling related code and Error/Result definitions.
use polkadot_node_primitives::UncheckedSignedFullStatement;
use polkadot_subsystem::SubsystemError;
use thiserror::Error;
use polkadot_node_subsystem_util::{Fault, runtime, unwrap_non_fatal};
use crate::LOG_TARGET;
/// General result.
pub type Result<T> = std::result::Result<T, Error>;
/// Result for fatal only failures.
pub type FatalResult<T> = std::result::Result<T, Fatal>;
/// Errors for statement distribution.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct Error(pub Fault<NonFatal, Fatal>);
impl From<NonFatal> for Error {
fn from(e: NonFatal) -> Self {
Self(Fault::from_non_fatal(e))
}
}
impl From<Fatal> for Error {
fn from(f: Fatal) -> Self {
Self(Fault::from_fatal(f))
}
}
impl From<runtime::Error> for Error {
fn from(o: runtime::Error) -> Self {
Self(Fault::from_other(o))
}
}
/// Fatal runtime errors.
#[derive(Debug, Error)]
pub enum Fatal {
/// Receiving subsystem message from overseer failed.
#[error("Receiving message from overseer failed")]
SubsystemReceive(#[source] SubsystemError),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::Fatal),
}
/// Errors for fetching of runtime information.
#[derive(Debug, Error)]
pub enum NonFatal {
/// Signature was invalid on received statement.
#[error("CollationSeconded contained statement with invalid signature.")]
InvalidStatementSignature(UncheckedSignedFullStatement),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[from] #[source] runtime::NonFatal),
}
/// Utility for eating top-level errors and logging them.
///
/// We basically always want to try and continue on error. This utility function is meant to
/// consume top-level errors by simply logging them.
pub fn log_error(result: Result<()>, ctx: &'static str)
-> FatalResult<()>
{
if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
tracing::warn!(target: LOG_TARGET, error = ?error, ctx)
}
Ok(())
}
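For orientation, here is a small self-contained model (not part of this diff) of how a helper like `log_error` is meant to be used: non-fatal problems are logged and the loop continues, while fatal ones propagate and end the subsystem. The `Fault` machinery and the real error types are replaced by plain enums so the example stands on its own; `handle_message` and `OverseerGone` are made-up names.

#[derive(Debug)]
enum NonFatal { InvalidSignature }

#[derive(Debug)]
enum Fatal { OverseerGone }

#[derive(Debug)]
enum Error { NonFatal(NonFatal), Fatal(Fatal) }

type Result<T> = std::result::Result<T, Error>;
type FatalResult<T> = std::result::Result<T, Fatal>;

// Log and swallow non-fatal errors, propagate fatal ones (same contract as `log_error`).
fn log_error(result: Result<()>, ctx: &'static str) -> FatalResult<()> {
    match result {
        Ok(()) => Ok(()),
        Err(Error::NonFatal(e)) => {
            eprintln!("[{}] non-fatal: {:?}", ctx, e);
            Ok(())
        }
        Err(Error::Fatal(f)) => Err(f),
    }
}

fn main() {
    // A bad signature from a peer should not take the subsystem down...
    assert!(log_error(Err(Error::NonFatal(NonFatal::InvalidSignature)), "handle_message").is_ok());
    // ...while losing the overseer connection should end the main loop.
    assert!(log_error(Err(Error::Fatal(Fatal::OverseerGone)), "handle_message").is_err());
}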
......@@ -22,41 +22,25 @@
use std::time::Duration;
use futures::{channel::oneshot, FutureExt, TryFutureExt};
use thiserror::Error;
use futures::{FutureExt, TryFutureExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::{PeerId, UnifiedReputationChange as Rep};
use polkadot_node_subsystem_util::{self as util, metrics::prometheus};
use polkadot_primitives::v1::CollatorPair;
use polkadot_subsystem::{
errors::RuntimeApiError,
messages::{AllMessages, CollatorProtocolMessage, NetworkBridgeMessage},
SpawnedSubsystem, Subsystem, SubsystemContext, SubsystemError,
};
mod error;
use error::Result;
mod collator_side;
mod validator_side;
const LOG_TARGET: &'static str = "parachain::collator-protocol";
#[derive(Debug, Error)]
enum Error {
#[error(transparent)]
Subsystem(#[from] SubsystemError),
#[error(transparent)]
Oneshot(#[from] oneshot::Canceled),
#[error(transparent)]
RuntimeApi(#[from] RuntimeApiError),
#[error(transparent)]
UtilError(#[from] util::Error),
#[error(transparent)]
Prometheus(#[from] prometheus::PrometheusError),
}
type Result<T> = std::result::Result<T, Error>;
/// A collator eviction policy - how fast to evict collators which are inactive.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy {
......@@ -124,9 +108,7 @@ impl CollatorProtocolSubsystem {
collator_pair,
metrics,
).await,
}.map_err(|e| {
SubsystemError::with_origin("collator-protocol", e).into()
})
}
}
}
......
......@@ -47,6 +47,8 @@ use polkadot_subsystem::{
FromOverseer, OverseerSignal, PerLeafSpan, SubsystemContext, SubsystemSender,
};
use crate::error::Fatal;
use super::{modify_reputation, Result, LOG_TARGET};
const COST_UNEXPECTED_MESSAGE: Rep = Rep::CostMinor("An unexpected message");
......@@ -540,6 +542,7 @@ async fn notify_collation_seconded(
ctx: &mut impl SubsystemContext<Message = CollatorProtocolMessage>,
peer_data: &HashMap<PeerId, PeerData>,
id: CollatorId,
relay_parent: Hash,
statement: SignedFullStatement,
) {
if !matches!(statement.payload(), Statement::Seconded(_)) {
......@@ -552,7 +555,7 @@ async fn notify_collation_seconded(
}
if let Some(peer_id) = collator_peer_id(peer_data, &id) {
let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(statement);
let wire_message = protocol_v1::CollatorProtocolMessage::CollationSeconded(relay_parent, statement.into());
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
......@@ -782,7 +785,7 @@ where
}
}
}
CollationSeconded(_) => {