// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Availability Recovery Subsystem of Polkadot.

#![warn(missing_docs)]

use std::collections::HashMap;
use std::time::Duration;
use std::pin::Pin;

use futures::{channel::{oneshot, mpsc}, prelude::*, stream::FuturesUnordered};
use futures_timer::Delay;
use lru::LruCache;
use rand::seq::SliceRandom;
use streamunordered::{StreamUnordered, StreamYield};

use polkadot_primitives::v1::{
	AuthorityDiscoveryId, AvailableData, CandidateReceipt, CandidateHash,
	Hash, ErasureChunk, ValidatorId, ValidatorIndex,
	SessionInfo, SessionIndex, BlakeTwo256, HashT, GroupIndex,
};
use polkadot_subsystem::{
	SubsystemContext, SubsystemResult, SubsystemError, Subsystem, SpawnedSubsystem, FromOverseer,
	OverseerSignal, ActiveLeavesUpdate,
	errors::RecoveryError,
	jaeger,
	messages::{
		AvailabilityStoreMessage, AvailabilityRecoveryMessage, AllMessages, NetworkBridgeMessage,
		NetworkBridgeEvent,
	},
};
use polkadot_node_network_protocol::{
	peer_set::PeerSet, v1 as protocol_v1, PeerId, RequestId, UnifiedReputationChange as Rep,
};
use polkadot_node_subsystem_util::{
	Timeout, TimeoutExt,
	request_session_info_ctx,
};
use polkadot_erasure_coding::{branches, branch_hash, recovery_threshold, obtain_chunks_v1};
mod error;

#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "parachain::availability-recovery";

const COST_MERKLE_PROOF_INVALID: Rep = Rep::CostMinor("Merkle proof was invalid");
const COST_UNEXPECTED_CHUNK: Rep = Rep::CostMinor("Peer has sent an unexpected chunk");
const COST_INVALID_AVAILABLE_DATA: Rep = Rep::CostMinor("Peer provided invalid available data");

// How many parallel requests an interaction should have going at once.
const N_PARALLEL: usize = 50;

// Size of the LRU cache where we keep recovered data.
const LRU_SIZE: usize = 16;

// A timeout for a chunk request.
#[cfg(not(test))]
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);

#[cfg(test)]
const CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);

// A timeout for a full data request.
#[cfg(not(test))]
const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);

#[cfg(test)]
const FULL_DATA_REQUEST_TIMEOUT: Duration = Duration::from_millis(100);

// The interval at which we poll and clean up awaited data.
const AWAITED_CLEANUP_INTERVAL: Duration = Duration::from_secs(1);

/// The Availability Recovery Subsystem.
pub struct AvailabilityRecoverySubsystem {
	fast_path: bool,
}

/// A response received from a peer: the peer's id, the index of the validator the data was
/// requested from, and the data itself.
type DataResponse<T> = (PeerId, ValidatorIndex, T);

/// Awaited data from the network.
enum Awaited {
	Chunk(AwaitedData<ErasureChunk>),
	FullData(AwaitedData<AvailableData>),
}

impl Awaited {
	fn is_canceled(&self) -> bool {
		match *self {
			Awaited::Chunk(ref c) => c.response.is_canceled(),
			Awaited::FullData(ref fd) => fd.response.is_canceled(),
		}
	}

	/// Token to cancel the connection request to the validator.
	fn token(&self) -> usize {
		match *self {
			Awaited::Chunk(ref c) => c.token,
			Awaited::FullData(ref fd) => fd.token,
		}
	}
}

/// Data we keep around for network data that we are awaiting.
struct AwaitedData<T> {
	/// Index of the validator we have requested this data from.
	validator_index: ValidatorIndex,

	/// The hash of the candidate the data belongs to.
	candidate_hash: CandidateHash,

	/// Token to cancel the connection request to the validator.
	token: usize,

	/// Result sender.
	response: oneshot::Sender<DataResponse<T>>,
}

/// Accumulate all awaiting sides for some particular `AvailableData`.
struct InteractionHandle {
	awaiting: Vec<oneshot::Sender<Result<AvailableData, RecoveryError>>>,
}

/// A message received by main code from an async `Interaction` task.
#[derive(Debug)]
enum FromInteraction {
	/// An interaction concluded.
	Concluded(CandidateHash, Result<AvailableData, RecoveryError>),

	/// Make a request of a particular chunk from a particular validator.
	MakeChunkRequest(
		AuthorityDiscoveryId,
		CandidateHash,
		ValidatorIndex,
		oneshot::Sender<DataResponse<ErasureChunk>>,
	),

	/// Make a request of the full data from a particular validator.
	MakeFullDataRequest(
		AuthorityDiscoveryId,
		CandidateHash,
		ValidatorIndex,
		oneshot::Sender<DataResponse<AvailableData>>,
	),

	/// Report a peer.
	ReportPeer(
		PeerId,
		Rep,
	),
}
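
#[cfg(test)]
mod from_interaction_channel_sketch {
	// A minimal sketch (hypothetical values) of the channel protocol above: interaction tasks
	// send `FromInteraction` messages over a bounded mpsc channel and the main `State` loop
	// drains them, e.g. to forward a `ReportPeer` to the network bridge.
	use super::{FromInteraction, PeerId, COST_UNEXPECTED_CHUNK};
	use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};

	#[test]
	fn report_peer_round_trip() {
		let (mut tx, mut rx) = mpsc::channel(16);
		block_on(async {
			tx.send(FromInteraction::ReportPeer(PeerId::random(), COST_UNEXPECTED_CHUNK))
				.await
				.expect("receiver is alive");
			assert!(matches!(rx.next().await, Some(FromInteraction::ReportPeer(_, _))));
		});
	}
}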

struct RequestFromBackersPhase {
	// A random shuffling of the validators from the backing group, indicating the order in
	// which we connect to them and request the full available data.
	shuffled_backers: Vec<ValidatorIndex>,
}

struct RequestChunksPhase {
	// A random shuffling of the validators, indicating the order in which we connect to them
	// and request their chunks.
	shuffling: Vec<ValidatorIndex>,
	received_chunks: HashMap<ValidatorIndex, ErasureChunk>,
	requesting_chunks: FuturesUnordered<Timeout<oneshot::Receiver<DataResponse<ErasureChunk>>>>,
}

struct InteractionParams {
	/// Discovery ids of `validators`.
	validator_authority_keys: Vec<AuthorityDiscoveryId>,

	/// Validators relevant to this `Interaction`.
	validators: Vec<ValidatorId>,

	/// The number of pieces needed.
	threshold: usize,

	/// A hash of the relevant candidate.
	candidate_hash: CandidateHash,

	/// The root of the erasure encoding of the para block.
	erasure_root: Hash,
}
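
#[cfg(test)]
mod threshold_sketch {
	// A minimal sketch of the `threshold` field above (hedged: the exact formula is owned by
	// `polkadot_erasure_coding::recovery_threshold`): for any reasonable validator-set size,
	// the number of pieces needed to reconstruct is at least one and never exceeds the set size.
	use polkadot_erasure_coding::recovery_threshold;

	#[test]
	fn threshold_is_bounded_by_validator_count() {
		for n_validators in 2..=100usize {
			let threshold = recovery_threshold(n_validators)
				.expect("2..=100 validators is a supported range");
			assert!(threshold >= 1);
			assert!(threshold <= n_validators);
		}
	}
}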

enum InteractionPhase {
	RequestFromBackers(RequestFromBackersPhase),
	RequestChunks(RequestChunksPhase),
}

/// The state of a single interaction reconstructing available data.
struct Interaction {
	/// A communication channel with the `State`.
	to_state: mpsc::Sender<FromInteraction>,

	/// The parameters of the interaction.
	params: InteractionParams,

	/// The phase of the interaction.
	phase: InteractionPhase,
}

impl RequestFromBackersPhase {
	fn new(mut backers: Vec<ValidatorIndex>) -> Self {
		backers.shuffle(&mut rand::thread_rng());

		RequestFromBackersPhase {
			shuffled_backers: backers,
		}
	}

	// Run this phase to completion, returning `true` if data was successfully recovered and
	// false otherwise.
	async fn run(
		&mut self,
		params: &InteractionParams,
		to_state: &mut mpsc::Sender<FromInteraction>
	) -> Result<bool, mpsc::SendError> {
		loop {
			// Pop the next backer, and proceed to next phase if we're out.
			let validator_index = match self.shuffled_backers.pop() {
				None => return Ok(false),
				Some(i) => i,
			};

			let (tx, rx) = oneshot::channel();

			// Request data.
			to_state.send(FromInteraction::MakeFullDataRequest(
				params.validator_authority_keys[validator_index.0 as usize].clone(),
				params.candidate_hash.clone(),
				validator_index,
				tx,
			)).await?;

			match rx.timeout(FULL_DATA_REQUEST_TIMEOUT).await {
				Some(Ok((peer_id, _validator_index, data))) => {
					if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
						to_state.send(
							FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
						).await?;

						return Ok(true);
					} else {
						to_state.send(FromInteraction::ReportPeer(
							peer_id.clone(),
							COST_INVALID_AVAILABLE_DATA,
						)).await?;
					}
				}
				Some(Err(e)) => {
					tracing::debug!(
						target: LOG_TARGET,
						err = ?e,
						"A response channel was cancelled while waiting for full data",
					);
				}
				None => {
					tracing::debug!(
						target: LOG_TARGET,
						"A full data request has timed out",
					);
				}
			}
		}
	}
}

impl RequestChunksPhase {
	fn new(n_validators: u32) -> Self {
		let mut shuffling: Vec<_> = (0..n_validators).map(ValidatorIndex).collect();
		shuffling.shuffle(&mut rand::thread_rng());

		RequestChunksPhase {
			shuffling,
			received_chunks: HashMap::new(),
			requesting_chunks: FuturesUnordered::new(),
		}
	}

	async fn launch_parallel_requests(
		&mut self,
		params: &InteractionParams,
		to_state: &mut mpsc::Sender<FromInteraction>,
	) -> Result<(), mpsc::SendError> {
		while self.requesting_chunks.len() < N_PARALLEL {
			if let Some(validator_index) = self.shuffling.pop() {
				let (tx, rx) = oneshot::channel();

				to_state.send(FromInteraction::MakeChunkRequest(
					params.validator_authority_keys[validator_index.0 as usize].clone(),
					params.candidate_hash.clone(),
					validator_index,
					tx,
				)).await?;

				self.requesting_chunks.push(rx.timeout(CHUNK_REQUEST_TIMEOUT));
			} else {
				break;
			}
		}

		Ok(())
	}

	async fn wait_for_chunks(
		&mut self,
		params: &InteractionParams,
		to_state: &mut mpsc::Sender<FromInteraction>,
	) -> Result<(), mpsc::SendError> {
		// Return early if there are no chunk requests in flight, so we don't wait on an empty set.
		if self.requesting_chunks.is_empty() {
			return Ok(());
		}

		// Poll for new updates from requesting_chunks.
		while let Some(request_result) = self.requesting_chunks.next().await {
			match request_result {
				Some(Ok((peer_id, validator_index, chunk))) => {
					// Check the Merkle proof of each received chunk; any failure leads to a
					// `FromInteraction::ReportPeer` message.

					// We need to check that the validator index matches the chunk index and
					// not blindly trust the data from an untrusted peer.
					if validator_index != chunk.index {
						to_state.send(FromInteraction::ReportPeer(
							peer_id.clone(),
							COST_MERKLE_PROOF_INVALID,
						)).await?;

						continue;
					}


					if let Ok(anticipated_hash) = branch_hash(
						&params.erasure_root,
						&chunk.proof,
						chunk.index.0 as usize,
					) {
						let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);

						if erasure_chunk_hash != anticipated_hash {
							to_state.send(FromInteraction::ReportPeer(
								peer_id.clone(),
								COST_MERKLE_PROOF_INVALID,
							)).await?;
						} else {
							self.received_chunks.insert(validator_index, chunk);
						}
					} else {
						to_state.send(FromInteraction::ReportPeer(
							peer_id.clone(),
							COST_MERKLE_PROOF_INVALID,
						)).await?;
					}
				}
				Some(Err(e)) => {
					tracing::debug!(
						target: LOG_TARGET,
						err = ?e,
						"A response channel was cancelled while waiting for a chunk",
					);
				}
				None => {
					tracing::debug!(
						target: LOG_TARGET,
						"A chunk request has timed out",
					);
				}
			}
		}

		Ok(())
	}

	async fn run(
		&mut self,
		params: &InteractionParams,
		to_state: &mut mpsc::Sender<FromInteraction>,
	) -> Result<(), mpsc::SendError> {
		loop {
			if is_unavailable(
				self.received_chunks.len(),
				self.requesting_chunks.len(),
				self.shuffling.len(),
				params.threshold,
			) {
				to_state.send(FromInteraction::Concluded(
					params.candidate_hash,
					Err(RecoveryError::Unavailable),
				)).await?;

				return Ok(());
			}

			self.launch_parallel_requests(params, to_state).await?;
			self.wait_for_chunks(params, to_state).await?;

			// If received_chunks has at least `threshold` entries, attempt to recover the data.
			// If that fails, or a re-encoding of it doesn't match the expected erasure root,
			// issue a FromInteraction::Concluded(Err(RecoveryError::Invalid)).
			// Otherwise, issue a FromInteraction::Concluded(Ok(data)).
			if self.received_chunks.len() >= params.threshold {
				let concluded = match polkadot_erasure_coding::reconstruct_v1(
					params.validators.len(),
					self.received_chunks.values().map(|c| (&c.chunk[..], c.index.0 as usize)),
				) {
					Ok(data) => {
						if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
							FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
						} else {
							FromInteraction::Concluded(
								params.candidate_hash.clone(),
								Err(RecoveryError::Invalid),
							)
						}
					}
					Err(_) => FromInteraction::Concluded(
						params.candidate_hash.clone(),
						Err(RecoveryError::Invalid),
					),
				};

				to_state.send(concluded).await?;
				return Ok(());
			}
		}
	}
}

/// Returns `true` if recovery is no longer possible: even if every request currently in flight
/// and every validator we have not asked yet produced a chunk, we would still be short of the
/// reconstruction threshold.
const fn is_unavailable(
	received_chunks: usize,
	requesting_chunks: usize,
	unrequested_validators: usize,
	threshold: usize,
) -> bool {
	received_chunks + requesting_chunks + unrequested_validators < threshold
}
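
#[cfg(test)]
mod unavailability_arithmetic_sketch {
	// A minimal sketch (hypothetical numbers) of the arithmetic above: recovery is declared
	// unavailable once the chunks already received, the requests still in flight, and the
	// validators not yet asked can no longer add up to the threshold.
	use super::is_unavailable;

	#[test]
	fn threshold_arithmetic() {
		// 2 received + 3 in flight + 1 unrequested = 6 >= threshold of 5: recovery may still succeed.
		assert!(!is_unavailable(2, 3, 1, 5));
		// 2 received + 1 in flight + 1 unrequested = 4 < threshold of 5: declared unavailable.
		assert!(is_unavailable(2, 1, 1, 5));
	}
}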

fn reconstructed_data_matches_root(
	n_validators: usize,
	expected_root: &Hash,
	data: &AvailableData,
) -> bool {
	let chunks = match obtain_chunks_v1(n_validators, data) {
		Ok(chunks) => chunks,
		Err(e) => {
			tracing::debug!(
				target: LOG_TARGET,
				err = ?e,
				"Failed to obtain chunks",
			);
			return false;
		}
	};

	let branches = branches(&chunks);

	branches.root() == *expected_root
}

impl Interaction {
	async fn run(mut self) -> error::Result<()> {
		loop {
			// These only fail if we cannot reach the underlying subsystem, in which case there is
			// nothing meaningful we can do.
			match self.phase {
				InteractionPhase::RequestFromBackers(ref mut from_backers) => {
					if from_backers.run(&self.params, &mut self.to_state).await
						.map_err(error::Error::ClosedToState)?
					{
						break Ok(())
					} else {
						self.phase = InteractionPhase::RequestChunks(
							RequestChunksPhase::new(self.params.validators.len() as _)
						);
					}
				}
				InteractionPhase::RequestChunks(ref mut from_all) => {
					break from_all.run(&self.params, &mut self.to_state).await
						.map_err(error::Error::ClosedToState)
				}
			}
		}
	}
}

struct State {
	/// Each interaction is implemented as its own async task,
	/// and these handles are for communicating with them.
	interactions: HashMap<CandidateHash, InteractionHandle>,

	/// A recent block hash for which state should be available.
	live_block_hash: Hash,

	/// We are waiting for these validators to connect; as soon as they do,
	/// we issue the requests for the data we are awaiting.
	discovering_validators: HashMap<AuthorityDiscoveryId, Vec<Awaited>>,

	/// Requests for the data we are interested in that we have issued
	/// to already-connected validators.
	live_requests: HashMap<RequestId, (PeerId, Awaited)>,

	/// Derive request ids from this.
	next_request_id: RequestId,

	/// Streams of `(AuthorityDiscoveryId, PeerId)` pairs for validators we are connecting to.
	connecting_validators: StreamUnordered<mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>>,

	/// Sender for interaction communication. This is cloned and given to interactions that are spun up.
	from_interaction_tx: mpsc::Sender<FromInteraction>,

	/// Receiver for messages from interactions.
	from_interaction_rx: mpsc::Receiver<FromInteraction>,

	/// An LRU cache of recently recovered data.
	availability_lru: LruCache<CandidateHash, Result<AvailableData, RecoveryError>>,
}

impl Default for State {
	fn default() -> Self {
		let (from_interaction_tx, from_interaction_rx) = mpsc::channel(16);

		Self {
			from_interaction_tx,
			from_interaction_rx,
			interactions: HashMap::new(),
			live_block_hash: Hash::default(),
			discovering_validators: HashMap::new(),
			live_requests: HashMap::new(),
			next_request_id: 0,
			connecting_validators: StreamUnordered::new(),
			availability_lru: LruCache::new(LRU_SIZE),
		}
	}
}
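
#[cfg(test)]
mod availability_lru_sketch {
	// A minimal sketch (hypothetical keys and values) of the eviction behaviour that
	// `State::availability_lru` relies on: once the cache holds its capacity of entries,
	// inserting another one drops the least recently used result, so only recent recoveries
	// stay cached.
	use lru::LruCache;

	#[test]
	fn least_recently_used_entry_is_evicted() {
		let mut cache: LruCache<u32, u32> = LruCache::new(2);
		cache.put(1, 10);
		cache.put(2, 20);
		cache.put(3, 30); // Capacity is 2, so key 1 is evicted.
		assert!(cache.get(&1).is_none());
		assert_eq!(cache.get(&2), Some(&20));
		assert_eq!(cache.get(&3), Some(&30));
	}
}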

impl<C> Subsystem<C> for AvailabilityRecoverySubsystem
	where C: SubsystemContext<Message = AvailabilityRecoveryMessage>
{
	fn start(self, ctx: C) -> SpawnedSubsystem {
		let future = self.run(ctx)
			.map_err(|e| SubsystemError::with_origin("availability-recovery", e))
			.boxed();
		SpawnedSubsystem {
			name: "availability-recovery-subsystem",
			future,
		}
	}
}

/// Handles a signal from the overseer.
async fn handle_signal(
	state: &mut State,
	signal: OverseerSignal,
) -> SubsystemResult<bool> {
	match signal {
		OverseerSignal::Conclude => Ok(true),
		OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
			// If `activated` is non-empty, set `state.live_block_hash` to the first activated block.
			if let Some(hash) = activated.get(0) {
				state.live_block_hash = hash.0;
			}

			Ok(false)
		}
		OverseerSignal::BlockFinalized(_, _) => Ok(false)
	}
}

/// Report a reputation change for a peer.
async fn report_peer(
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	peer: PeerId,
	rep: Rep,
) {
	ctx.send_message(AllMessages::NetworkBridge(NetworkBridgeMessage::ReportPeer(peer, rep))).await;
}

/// Machinery around launching interactions into the background.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn launch_interaction(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	session_index: SessionIndex,
	session_info: SessionInfo,
	receipt: CandidateReceipt,
	backing_group: Option<GroupIndex>,
	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
	let to_state = state.from_interaction_tx.clone();

	let candidate_hash = receipt.hash();
	state.interactions.insert(
		candidate_hash.clone(),
		InteractionHandle {
			awaiting: vec![response_sender],
		}
	);

	let params = InteractionParams {
		validator_authority_keys: session_info.discovery_keys.clone(),
		validators: session_info.validators.clone(),
		threshold: recovery_threshold(session_info.validators.len())?,
		candidate_hash,
		erasure_root: receipt.descriptor.erasure_root,
	};

	let phase = backing_group
		.and_then(|g| session_info.validator_groups.get(g.0 as usize))
		.map(|group| InteractionPhase::RequestFromBackers(
			RequestFromBackersPhase::new(group.clone())
		))
		.unwrap_or_else(|| InteractionPhase::RequestChunks(
			RequestChunksPhase::new(params.validators.len() as _)
		));

	let interaction = Interaction {
		to_state,
		params,
		phase,
	};

	let future = async move {
		if let Err(e) = interaction.run().await {
			tracing::debug!(
				target: LOG_TARGET,
				err = ?e,
				"Interaction finished with an error",
			);
		}
	}.boxed();

	if let Err(e) = ctx.spawn("recovery interaction", future).await {
		tracing::warn!(
			target: LOG_TARGET,
			err = ?e,
			"Failed to spawn a recovery interaction task",
		);
	}

	Ok(())
}

/// Handles an availability recovery request.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_recover(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	receipt: CandidateReceipt,
	session_index: SessionIndex,
	backing_group: Option<GroupIndex>,
	response_sender: oneshot::Sender<Result<AvailableData, RecoveryError>>,
) -> error::Result<()> {
	let candidate_hash = receipt.hash();

	let mut span = jaeger::candidate_hash_span(&candidate_hash, "availability-recovery");
	span.add_stage(jaeger::Stage::AvailabilityRecovery);

	if let Some(result) = state.availability_lru.get(&candidate_hash) {
		if let Err(e) = response_sender.send(result.clone()) {
			tracing::warn!(
				target: LOG_TARGET,
				err = ?e,
				"Error responding with an availability recovery result",
			);
		}
		return Ok(());
	}

	if let Some(interaction) = state.interactions.get_mut(&candidate_hash) {
		interaction.awaiting.push(response_sender);
		return Ok(());
	}

	let _span = span.child("not-cached");
	let session_info = request_session_info_ctx(
		state.live_block_hash,
		session_index,
		ctx,
	).await?.await.map_err(error::Error::CanceledSessionInfo)??;

	let _span = span.child("session-info-ctx-received");
	match session_info {
		Some(session_info) => {
			launch_interaction(
				state,
				ctx,
				session_index,
				session_info,
				receipt,
				backing_group,
				response_sender,
			).await
		}
		None => {
			tracing::warn!(
				target: LOG_TARGET,
				"SessionInfo is `None` at {}", state.live_block_hash,
			);
			response_sender
				.send(Err(RecoveryError::Unavailable))
				.map_err(|_| error::Error::CanceledResponseSender)?;
			Ok(())
		}
	}
}

/// Queries a chunk from av-store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_chunk(
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	candidate_hash: CandidateHash,
	validator_index: ValidatorIndex,
) -> error::Result<Option<ErasureChunk>> {
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
	)).await;

	Ok(rx.await.map_err(error::Error::CanceledQueryChunk)?)
}

/// Queries the full `AvailableData` from av-store.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn query_full_data(
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	candidate_hash: CandidateHash,
) -> error::Result<Option<AvailableData>> {
	let (tx, rx) = oneshot::channel();
	ctx.send_message(AllMessages::AvailabilityStore(
		AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx),
	)).await;

	Ok(rx.await.map_err(error::Error::CanceledQueryFullData)?)
}

/// Handles a message from an interaction.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_from_interaction(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	from_interaction: FromInteraction,
) -> error::Result<()> {
	match from_interaction {
		FromInteraction::Concluded(candidate_hash, result) => {
			// Load the entry from the interactions map.
			// It should always exist, barring logic errors.
			if let Some(interaction) = state.interactions.remove(&candidate_hash) {
				// Send the result to each member of awaiting.
				for awaiting in interaction.awaiting {
					if let Err(_) = awaiting.send(result.clone()) {
						tracing::debug!(
							target: LOG_TARGET,
							"An awaiting side of the interaction has been canceled",
						);
					}
				}
			} else {
				tracing::warn!(
					target: LOG_TARGET,
					"Interaction under candidate hash {} is missing",
					candidate_hash,
				);
			}

			state.availability_lru.put(candidate_hash, result);
		}
		FromInteraction::MakeChunkRequest(id, candidate_hash, validator_index, response) => {
			let (tx, rx) = mpsc::channel(2);

			let message = NetworkBridgeMessage::ConnectToValidators {
				validator_ids: vec![id.clone()],
				peer_set: PeerSet::Validation,
				connected: tx,
			};

			ctx.send_message(AllMessages::NetworkBridge(message)).await;

			let token = state.connecting_validators.push(rx);

			state.discovering_validators.entry(id).or_default().push(Awaited::Chunk(AwaitedData {
				validator_index,
				candidate_hash,
				token,
				response,
			}));
		}
		FromInteraction::MakeFullDataRequest(id, candidate_hash, validator_index, response) => {
			let (tx, rx) = mpsc::channel(2);

			let message = NetworkBridgeMessage::ConnectToValidators {
				validator_ids: vec![id.clone()],
				peer_set: PeerSet::Validation,
				connected: tx,
			};

			ctx.send_message(AllMessages::NetworkBridge(message)).await;

			let token = state.connecting_validators.push(rx);

			state.discovering_validators.entry(id).or_default().push(Awaited::FullData(AwaitedData {
				validator_index,
				candidate_hash,
				token,
				response,
			}));
		}
		FromInteraction::ReportPeer(peer_id, rep) => {
			report_peer(ctx, peer_id, rep).await;
		}
	}

	Ok(())
}

/// Handles a network bridge update.
#[tracing::instrument(level = "trace", skip(ctx, state), fields(subsystem = LOG_TARGET))]
async fn handle_network_update(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	update: NetworkBridgeEvent<protocol_v1::AvailabilityRecoveryMessage>,
) -> error::Result<()> {
	match update {
		NetworkBridgeEvent::PeerMessage(peer, message) => {
			match message {
				protocol_v1::AvailabilityRecoveryMessage::RequestChunk(
					request_id,
					candidate_hash,
					validator_index,
				) => {
					// Issue a
					// AvailabilityStore::QueryChunk(candidate-hash, validator_index, response)
					// message.
					let chunk = query_chunk(ctx, candidate_hash, validator_index).await?;

					tracing::trace!(
						target: LOG_TARGET,
						"Responding({}) to chunk request req_id={} candidate={} index={}",
						chunk.is_some(),
						request_id,
						candidate_hash,
						validator_index.0,
					);

					// Whatever the result, respond with an
					// AvailabilityRecoveryMessage::Chunk(request_id, chunk) wire message.
					let wire_message = protocol_v1::AvailabilityRecoveryMessage::Chunk(
						request_id,
						chunk,
					);

					ctx.send_message(AllMessages::NetworkBridge(
						NetworkBridgeMessage::SendValidationMessage(
							vec![peer],
							protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
						),
					)).await;
				}
				protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
					match state.live_requests.remove(&request_id) {
						None => {
							// If there doesn't exist one, report the peer and return.
							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
						}
						Some((peer_id, Awaited::Chunk(awaited_chunk))) if peer_id == peer => {
							tracing::trace!(
								target: LOG_TARGET,
								"Received chunk response({}) req_id={} candidate={} index={}",
								chunk.is_some(),
								request_id,
								awaited_chunk.candidate_hash,
								awaited_chunk.validator_index.0,
							);

							// If there exists an entry under r_id, remove it.
							// Send the chunk response on the awaited_chunk for the interaction to handle.
							if let Some(chunk) = chunk {
								if awaited_chunk.response.send(
									(peer_id, awaited_chunk.validator_index, chunk)
								).is_err() {
									tracing::debug!(
										target: LOG_TARGET,
										"A sending side of the recovery request is closed",
									);
								}
							}
						}
						Some(a) => {
							// If the peer in the entry doesn't match the sending peer,
							// reinstate the entry, report the peer, and return
							state.live_requests.insert(request_id, a);
							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
						}
					}
				}
				protocol_v1::AvailabilityRecoveryMessage::RequestFullData(
					request_id,
					candidate_hash,
				) => {
					// Issue a
					// AvailabilityStore::QueryAvailableData(candidate-hash, response)
					// message.
					let full_data = query_full_data(ctx, candidate_hash).await?;

					tracing::trace!(
						target: LOG_TARGET,
						"Responding({}) to full data request req_id={} candidate={}",
						full_data.is_some(),
						request_id,
						candidate_hash,
					);

					// Whatever the result, respond with an
					// AvailabilityRecoveryMessage::FullData(request_id, full_data) wire message.
					let wire_message = protocol_v1::AvailabilityRecoveryMessage::FullData(
						request_id,
						full_data,
					);

					ctx.send_message(AllMessages::NetworkBridge(
						NetworkBridgeMessage::SendValidationMessage(
							vec![peer],
							protocol_v1::ValidationProtocol::AvailabilityRecovery(wire_message),
						),
					)).await;
				}
				protocol_v1::AvailabilityRecoveryMessage::FullData(request_id, data) => {
					match state.live_requests.remove(&request_id) {
						None => {
							// If there doesn't exist one, report the peer and return.
							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
						}
						Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
							tracing::trace!(
								target: LOG_TARGET,
								"Received full data response({}) req_id={} candidate={}",
								data.is_some(),
								request_id,
								awaited.candidate_hash,
							);

							// If there exists an entry under r_id, remove it.
							// Send the response on the awaited for the interaction to handle.
							if let Some(data) = data {
								if awaited.response.send((peer_id, awaited.validator_index, data)).is_err() {
									tracing::debug!(
										target: LOG_TARGET,
										"A sending side of the recovery request is closed",
									);
								}
							}
						}
						Some(a) => {
							// If the peer in the entry doesn't match the sending peer,
							// reinstate the entry, report the peer, and return
							state.live_requests.insert(request_id, a);
							report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
						}
					}
				}
			}
		}
		// We do not really need to track the peers' views in this subsystem
		// since the peers are _required_ to have the data we are interested in.
		NetworkBridgeEvent::PeerViewChange(_, _) => {}
		NetworkBridgeEvent::OurViewChange(_) => {}
		// All peer connections are handled via validator discovery API.
		NetworkBridgeEvent::PeerConnected(_, _) => {}
		NetworkBridgeEvent::PeerDisconnected(_) => {}
	}

	Ok(())
}

/// Issues a request to a validator that we have been waiting for, now that it has connected to us.
async fn issue_request(
	state: &mut State,
	ctx: &mut impl SubsystemContext<Message = AvailabilityRecoveryMessage>,
	peer_id: PeerId,
	awaited: Awaited,
) -> error::Result<()> {
	let request_id = state.next_request_id;
	state.next_request_id += 1;

	let wire_message = match awaited {
		Awaited::Chunk(ref awaited_chunk) => {
			tracing::trace!(