Newer
Older
})
.await;
if !receive_availability.await.map_err(Error::Oneshot)? {
// If the data is not available, we disregard the dispute votes.
// This is an indication that the dispute does not correspond to any included
// candidate and that it should be ignored.
//
// We expect that if the candidate is truly disputed that the higher-level network
// code will retry.
tracing::debug!(
target: LOG_TARGET,
"Recovering availability failed - invalid import."
);
return Ok(ImportStatementsResult::InvalidImport)
metrics.on_open();
if concluded_valid {
metrics.on_concluded_valid();
}
if concluded_invalid {
metrics.on_concluded_invalid();
}
}
// Only write when updated and vote is available.
overlay_db.write_recent_disputes(recent_disputes);
}
overlay_db.write_candidate_votes(session, candidate_hash, votes.into());
Ok(ImportStatementsResult::ValidImport)
fn find_controlled_validator_indices(
keystore: &LocalKeystore,
validators: &[ValidatorId],
) -> HashSet<ValidatorIndex> {
let mut controlled = HashSet::new();
for (index, validator) in validators.iter().enumerate() {
if keystore.key_pair::<ValidatorPair>(validator).ok().flatten().is_none() {
continue
}
controlled.insert(ValidatorIndex(index as _));
}
controlled
}
async fn issue_local_statement(
ctx: &mut impl SubsystemContext,
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
state: &mut State,
candidate_hash: CandidateHash,
candidate_receipt: CandidateReceipt,
session: SessionIndex,
valid: bool,
now: Timestamp,
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
) -> Result<(), Error> {
// Load session info.
let info = match state.rolling_session_window.session_info(session) {
None => {
tracing::warn!(
target: LOG_TARGET,
session,
"Missing info for session which has an active dispute",
);
return Ok(())
},
Some(info) => info,
};
let validators = info.validators.clone();
let votes = overlay_db
.load_candidate_votes(session, &candidate_hash)?
.map(CandidateVotes::from)
.unwrap_or_else(|| CandidateVotes {
candidate_receipt: candidate_receipt.clone(),
valid: Vec::new(),
invalid: Vec::new(),
});
// Sign a statement for each validator index we control which has
// not already voted. This should generally be maximum 1 statement.
let voted_indices = votes.voted_indices();
let mut statements = Vec::new();
let voted_indices: HashSet<_> = voted_indices.into_iter().collect();
let controlled_indices = find_controlled_validator_indices(&state.keystore, &validators[..]);
for index in controlled_indices {
if voted_indices.contains(&index) {
continue
}
let keystore = state.keystore.clone() as Arc<_>;
let res = SignedDisputeStatement::sign_explicit(
&keystore,
valid,
candidate_hash,
session,
validators[index.0 as usize].clone(),
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
)
.await;
match res {
Ok(Some(signed_dispute_statement)) => {
statements.push((signed_dispute_statement, index));
},
Ok(None) => {},
Err(e) => {
tracing::error!(
target: LOG_TARGET,
err = ?e,
"Encountered keystore error while signing dispute statement",
);
},
}
}
// Get our message out:
for (statement, index) in &statements {
let dispute_message = match make_dispute_message(info, &votes, statement.clone(), *index) {
Err(err) => {
tracing::debug!(target: LOG_TARGET, ?err, "Creating dispute message failed.");
continue
},
Ok(dispute_message) => dispute_message,
};
ctx.send_message(DisputeDistributionMessage::SendDispute(dispute_message)).await;
}
// Do import
if !statements.is_empty() {
ctx,
overlay_db,
state,
candidate_hash,
MaybeCandidateReceipt::Provides(candidate_receipt),
session,
statements,
now,
Err(_) => {
tracing::error!(
target: LOG_TARGET,
?candidate_hash,
?session,
"pending confirmation receiver got dropped by `handle_import_statements` for our own votes!"
);
},
Ok(ImportStatementsResult::InvalidImport) => {
tracing::error!(
target: LOG_TARGET,
?candidate_hash,
?session,
"`handle_import_statements` considers our own votes invalid!"
);
},
Ok(ImportStatementsResult::ValidImport) => {
tracing::trace!(
target: LOG_TARGET,
?candidate_hash,
?session,
"`handle_import_statements` successfully imported our vote!"
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
}
Ok(())
}
#[derive(Debug, thiserror::Error)]
enum DisputeMessageCreationError {
#[error("There was no opposite vote available")]
NoOppositeVote,
#[error("Found vote had an invalid validator index that could not be found")]
InvalidValidatorIndex,
#[error("Statement found in votes had invalid signature.")]
InvalidStoredStatement,
#[error(transparent)]
InvalidStatementCombination(DisputeMessageCheckError),
}
fn make_dispute_message(
info: &SessionInfo,
votes: &CandidateVotes,
our_vote: SignedDisputeStatement,
our_index: ValidatorIndex,
) -> Result<DisputeMessage, DisputeMessageCreationError> {
let validators = &info.validators;
let (valid_statement, valid_index, invalid_statement, invalid_index) =
if let DisputeStatement::Valid(_) = our_vote.statement() {
let (statement_kind, validator_index, validator_signature) =
votes.invalid.get(0).ok_or(DisputeMessageCreationError::NoOppositeVote)?.clone();
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Invalid(statement_kind),
our_vote.candidate_hash().clone(),
our_vote.session_index(),
validators
.get(validator_index.0 as usize)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature,
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(our_vote, our_index, other_vote, validator_index)
} else {
let (statement_kind, validator_index, validator_signature) =
votes.valid.get(0).ok_or(DisputeMessageCreationError::NoOppositeVote)?.clone();
let other_vote = SignedDisputeStatement::new_checked(
DisputeStatement::Valid(statement_kind),
our_vote.candidate_hash().clone(),
our_vote.session_index(),
validators
.get(validator_index.0 as usize)
.ok_or(DisputeMessageCreationError::InvalidValidatorIndex)?
.clone(),
validator_signature,
)
.map_err(|()| DisputeMessageCreationError::InvalidStoredStatement)?;
(other_vote, validator_index, our_vote, our_index)
};
DisputeMessage::from_signed_statements(
valid_statement,
valid_index,
invalid_statement,
invalid_index,
votes.candidate_receipt.clone(),
info,
)
.map_err(DisputeMessageCreationError::InvalidStatementCombination)
}
/// Determine the the best block and its block number.
/// Assumes `block_descriptions` are sorted from the one
/// with the lowest `BlockNumber` to the highest.
fn determine_undisputed_chain(
overlay_db: &mut OverlayedBackend<'_, impl Backend>,
base_number: BlockNumber,
block_descriptions: Vec<BlockDescription>,
) -> Result<(BlockNumber, Hash), Error> {
let last = block_descriptions
.last()
.map(|e| (base_number + block_descriptions.len() as BlockNumber, e.block_hash))
.unwrap_or((base_number, base_hash));
// Fast path for no disputes.
let recent_disputes = match overlay_db.load_recent_disputes()? {
None => return Ok(last),
Some(a) if a.is_empty() => return Ok(last),
Some(a) => a,
};
let is_possibly_invalid = |session, candidate_hash| {
recent_disputes
.get(&(session, candidate_hash))
.map_or(false, |status| status.is_possibly_invalid())
};
for (i, BlockDescription { session, candidates, .. }) in block_descriptions.iter().enumerate() {
if candidates.iter().any(|c| is_possibly_invalid(*session, *c)) {
if i == 0 {
return Ok((base_number, base_hash))
return Ok((base_number + i as BlockNumber, block_descriptions[i - 1].block_hash))
}
}
}
Ok(last)
}