Unverified Commit cd7b3da3 authored by Andronik Ordian's avatar Andronik Ordian Committed by GitHub
Browse files

av-store: clean up StoreAvailableData message (#3984)

* av-store: clean up StoreAvailableData message

* fmt

* use named fields
parent 9ee47759
Pipeline #160203 passed with stages
in 37 minutes and 44 seconds
......@@ -1074,19 +1074,18 @@ fn process_message(
},
}
},
AvailabilityStoreMessage::StoreAvailableData(
candidate,
_our_index,
AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
n_validators,
available_data,
tx,
) => {
} => {
subsystem.metrics.on_chunks_received(n_validators as _);
let _timer = subsystem.metrics.time_store_available_data();
let res =
store_available_data(&subsystem, candidate, n_validators as _, available_data);
store_available_data(&subsystem, candidate_hash, n_validators as _, available_data);
match res {
Ok(()) => {
......
......@@ -420,13 +420,12 @@ fn store_block_works() {
};
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
Some(validator_index),
n_validators,
available_data.clone(),
available_data: available_data.clone(),
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
assert_eq!(rx.await.unwrap(), Ok(()));
......@@ -474,13 +473,12 @@ fn store_pov_and_query_chunk_works() {
erasure::obtain_chunks_v1(n_validators as _, &available_data).unwrap();
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
None,
n_validators,
available_data,
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
......@@ -521,13 +519,12 @@ fn query_all_chunks_works() {
{
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_hash_1,
None,
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash: candidate_hash_1,
n_validators,
available_data,
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
assert_eq!(rx.await.unwrap(), Ok(()));
......@@ -610,13 +607,12 @@ fn stored_but_not_included_data_is_pruned() {
};
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
None,
n_validators,
available_data.clone(),
available_data: available_data.clone(),
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
......@@ -663,13 +659,12 @@ fn stored_data_kept_until_finalized() {
let block_number = 10;
let (tx, rx) = oneshot::channel();
let block_msg = AvailabilityStoreMessage::StoreAvailableData(
let block_msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
None,
n_validators,
available_data.clone(),
available_data: available_data.clone(),
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg: block_msg }).await;
......@@ -899,26 +894,24 @@ fn forkfullness_works() {
};
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_1_hash,
None,
let msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash: candidate_1_hash,
n_validators,
available_data_1.clone(),
available_data: available_data_1.clone(),
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg }).await;
rx.await.unwrap().unwrap();
let (tx, rx) = oneshot::channel();
let msg = AvailabilityStoreMessage::StoreAvailableData(
candidate_2_hash,
None,
let msg = AvailabilityStoreMessage::StoreAvailableData {
candidate_hash: candidate_2_hash,
n_validators,
available_data_2.clone(),
available_data: available_data_2.clone(),
tx,
);
};
virtual_overseer.send(FromOverseer::Communication { msg }).await;
......
......@@ -294,20 +294,18 @@ fn table_attested_to_backed(
async fn store_available_data(
sender: &mut JobSender<impl SubsystemSender>,
id: Option<ValidatorIndex>,
n_validators: u32,
candidate_hash: CandidateHash,
available_data: AvailableData,
) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
sender
.send_message(AvailabilityStoreMessage::StoreAvailableData(
.send_message(AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
id,
n_validators,
available_data,
tx,
))
})
.await;
let _ = rx.await.map_err(Error::StoreAvailableData)?;
......@@ -321,7 +319,6 @@ async fn store_available_data(
// This returns `Err()` iff there is an internal error. Otherwise, it returns either `Ok(Ok(()))` or `Ok(Err(_))`.
async fn make_pov_available(
sender: &mut JobSender<impl SubsystemSender>,
validator_index: Option<ValidatorIndex>,
n_validators: usize,
pov: Arc<PoV>,
candidate_hash: CandidateHash,
......@@ -347,14 +344,7 @@ async fn make_pov_available(
{
let _span = span.as_ref().map(|s| s.child("store-data").with_candidate(candidate_hash));
store_available_data(
sender,
validator_index,
n_validators as u32,
candidate_hash,
available_data,
)
.await?;
store_available_data(sender, n_validators as u32, candidate_hash, available_data).await?;
}
Ok(Ok(()))
......@@ -409,7 +399,6 @@ struct BackgroundValidationParams<S: overseer::SubsystemSender<AllMessages>, F>
candidate: CandidateReceipt,
relay_parent: Hash,
pov: PoVData,
validator_index: Option<ValidatorIndex>,
n_validators: usize,
span: Option<jaeger::Span>,
make_command: F,
......@@ -427,7 +416,6 @@ async fn validate_and_make_available(
candidate,
relay_parent,
pov,
validator_index,
n_validators,
span,
make_command,
......@@ -484,7 +472,6 @@ async fn validate_and_make_available(
} else {
let erasure_valid = make_pov_available(
&mut sender,
validator_index,
n_validators,
pov.clone(),
candidate.hash(),
......@@ -719,7 +706,6 @@ impl CandidateBackingJob {
candidate: candidate.clone(),
relay_parent: self.parent,
pov: PoVData::Ready(pov),
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Second,
......@@ -1033,7 +1019,6 @@ impl CandidateBackingJob {
candidate: attesting.candidate,
relay_parent: self.parent,
pov,
validator_index: self.table_context.validator.as_ref().map(|v| v.index()),
n_validators: self.table_context.validators.len(),
span,
make_command: ValidatedCandidateCommand::Attest,
......
......@@ -336,7 +336,7 @@ fn backing_second_works() {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
) if candidate_hash == candidate.hash() => {
tx.send(Ok(())).unwrap();
}
......@@ -495,7 +495,7 @@ fn backing_works() {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
) if candidate_hash == candidate_a.hash() => {
tx.send(Ok(())).unwrap();
}
......@@ -853,7 +853,7 @@ fn backing_misbehavior_works() {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
) if candidate_hash == candidate_a.hash() => {
tx.send(Ok(())).unwrap();
}
......@@ -1027,7 +1027,7 @@ fn backing_dont_second_invalid() {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreAvailableData(candidate_hash, _, _, _, tx)
AvailabilityStoreMessage::StoreAvailableData { candidate_hash, tx, .. }
) if candidate_hash == candidate_b.hash() => {
tx.send(Ok(())).unwrap();
}
......
......@@ -248,13 +248,12 @@ async fn participate(
// we dispatch a request to store the available data for the candidate. we
// want to maximize data availability for other potential checkers involved
// in the dispute
ctx.send_message(AvailabilityStoreMessage::StoreAvailableData(
ctx.send_message(AvailabilityStoreMessage::StoreAvailableData {
candidate_hash,
None,
n_validators,
available_data.clone(),
store_available_data_tx,
))
available_data: available_data.clone(),
tx: store_available_data_tx,
})
.await;
match store_available_data_rx.await? {
......
......@@ -150,13 +150,7 @@ async fn fetch_validation_code(virtual_overseer: &mut VirtualOverseer) {
async fn store_available_data(virtual_overseer: &mut VirtualOverseer, success: bool) {
assert_matches!(
virtual_overseer.recv().await,
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData(
_,
_,
_,
_,
tx,
)) => {
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreAvailableData { tx, .. }) => {
if success {
tx.send(Ok(())).unwrap();
} else {
......
......@@ -486,17 +486,19 @@ pub enum AvailabilityStoreMessage {
tx: oneshot::Sender<Result<(), ()>>,
},
/// Store a `AvailableData` in the AV store.
/// If `ValidatorIndex` is present store corresponding chunk also.
/// Store a `AvailableData` and all of its chunks in the AV store.
///
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
StoreAvailableData(
CandidateHash,
Option<ValidatorIndex>,
u32,
AvailableData,
oneshot::Sender<Result<(), ()>>,
),
StoreAvailableData {
/// A hash of the candidate this `available_data` belongs to.
candidate_hash: CandidateHash,
/// The number of validators in the session.
n_validators: u32,
/// The `AvailableData` itself.
available_data: AvailableData,
/// Sending side of the channel to send result to.
tx: oneshot::Sender<Result<(), ()>>,
},
}
impl AvailabilityStoreMessage {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment