validation.rs 23.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! The "validation session" networking code built on top of the base network service.
//!
//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called
//! each time a validation session begins on a new chain head.
21

22
23
24
25
26
use sr_primitives::traits::ProvideRuntimeApi;
use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{
	self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
27
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
28
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
29
30
31
32
use polkadot_primitives::parachain::{
	Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId,
	ValidatorId, PoVBlock, ValidatorIndex,
};
33

34
use futures::prelude::*;
35
use futures::future::{self, Executor as FutureExecutor};
36
use futures::sync::mpsc;
37
use futures::sync::oneshot::{self, Receiver};
38

39
40
use std::collections::hash_map::{HashMap, Entry};
use std::io;
41
42
use std::sync::Arc;

43
use arrayvec::ArrayVec;
44
45
use tokio::runtime::TaskExecutor;
use parking_lot::Mutex;
46
use log::warn;
47

48
49
use crate::router::Router;
use crate::gossip::{POLKADOT_ENGINE_ID, RegisteredMessageValidator, MessageValidationData};
50

51
52
use super::PolkadotProtocol;

53
54
pub use polkadot_validation::Incoming;

55
56
57
58
59
60
61
62
63
/// An executor suitable for dispatching async consensus tasks.
pub trait Executor {
	fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F);
}

/// A wrapped `futures::future::Executor`.
///
/// The wrapped executor is exposed as the public tuple field.
pub struct WrappedExecutor<T>(pub T);

impl<T> Executor for WrappedExecutor<T>
64
	where T: FutureExecutor<Box<dyn Future<Item=(),Error=()> + Send + 'static>>
65
66
67
{
	fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F) {
		if let Err(e) = self.0.execute(Box::new(f)) {
68
			warn!(target: "validation", "could not spawn consensus task: {:?}", e);
69
70
71
72
73
74
75
76
77
78
		}
	}
}

impl Executor for TaskExecutor {
	fn spawn<F: Future<Item=(),Error=()> + Send + 'static>(&self, f: F) {
		TaskExecutor::spawn(self, f)
	}
}

79
80
81
82
83
84
85
86
87
88
89
/// A gossip network subservice.
pub trait GossipService {
	fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage);
}

impl GossipService for consensus_gossip::ConsensusGossip<Block> {
	fn send_message(&mut self, ctx: &mut NetContext<Block>, who: &PeerId, message: ConsensusMessage) {
		consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
	}
}

90
91
92
/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
	/// Get a stream of gossip messages for a given hash.
93
	fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification>;
94
95
96
97

	/// Gossip a message on given topic.
	fn gossip_message(&self, topic: Hash, message: Vec<u8>);

98
99
	/// Execute a closure with the gossip service.
	fn with_gossip<F: Send + 'static>(&self, with: F)
100
		where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext<Block>);
101
102
103

	/// Execute a closure with the polkadot protocol.
	fn with_spec<F: Send + 'static>(&self, with: F)
104
		where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext<Block>);
105
106
107
}

impl NetworkService for super::NetworkService {
108
	fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
109
110
		let (tx, rx) = std::sync::mpsc::channel();

111
		super::NetworkService::with_gossip(self, move |gossip, _| {
112
			let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
113
114
115
116
117
118
119
120
121
122
			let _ = tx.send(inner_rx);
		});

		match rx.recv() {
			Ok(rx) => rx,
			Err(_) => mpsc::unbounded().1, // return empty channel.
		}
	}

	fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
123
124
125
126
127
128
		self.gossip_consensus_message(
			topic,
			POLKADOT_ENGINE_ID,
			message,
			GossipMessageRecipient::BroadcastToAll,
		);
129
130
	}

131
	fn with_gossip<F: Send + 'static>(&self, with: F)
132
		where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext<Block>)
133
134
135
	{
		super::NetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
	}
136
137
138
139
140
141
142

	fn with_spec<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut PolkadotProtocol, &mut NetContext<Block>)
	{
		super::NetworkService::with_spec(self, with)
	}
}
143

144
145
146
147
148
149
150
151
/// Parameters describing a validation session to be instantiated.
pub struct SessionParams {
	/// The local session key, if there is one.
	pub local_session_key: Option<SessionKey>,
	/// The parent hash.
	pub parent_hash: Hash,
	/// The authorities.
	pub authorities: Vec<SessionKey>,
}

/// Wrapper around the network service
155
pub struct ValidationNetwork<P, E, N, T> {
156
	network: Arc<N>,
157
	api: Arc<P>,
158
	executor: T,
159
	message_validator: RegisteredMessageValidator,
160
	exit: E,
161
162
}

163
impl<P, E, N, T> ValidationNetwork<P, E, N, T> {
164
165
166
167
168
169
170
171
172
	/// Create a new consensus networking object.
	pub fn new(
		network: Arc<N>,
		exit: E,
		message_validator: RegisteredMessageValidator,
		api: Arc<P>,
		executor: T,
	) -> Self {
		ValidationNetwork { network, exit, message_validator, api, executor }
173
174
175
	}
}

176
impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
177
	fn clone(&self) -> Self {
178
		ValidationNetwork {
179
			network: self.network.clone(),
180
			exit: self.exit.clone(),
181
			api: self.api.clone(),
182
			executor: self.executor.clone(),
183
			message_validator: self.message_validator.clone(),
184
185
186
187
		}
	}
}

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
	P: ProvideRuntimeApi + Send + Sync + 'static,
	P::Api: ParachainHost<Block>,
	E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
	N: NetworkService,
	T: Clone + Executor + Send + Sync + 'static,
{
	/// Instantiate session data fetcher at a parent hash.
	///
	/// If the used session key is new, it will be broadcast to peers.
	/// If a validation session was already instantiated at this parent hash,
	/// the underlying instance will be shared.
	///
	/// If there was already a validation session instantiated and a different
	/// session key was set, then the new key will be ignored.
	///
	/// This implies that there can be multiple services intantiating validation
	/// session instances safely, but they should all be coordinated on which session keys
	/// are being used.
	pub fn instantiate_session(&self, params: SessionParams)
		-> oneshot::Receiver<SessionDataFetcher<P, E, N, T>>
	{
		let parent_hash = params.parent_hash;
		let network = self.network.clone();
		let api = self.api.clone();
		let task_executor = self.executor.clone();
		let exit = self.exit.clone();
		let message_validator = self.message_validator.clone();
216
217
218
219
220
		let index_mapping = params.authorities
			.iter()
			.enumerate()
			.map(|(i, k)| (i as ValidatorIndex, k.clone()))
			.collect();
221
222
223

		let (tx, rx) = oneshot::channel();

224
225
226
227
228
229
230
231
232
233
234
235
236
		{
			let message_validator = self.message_validator.clone();
			let authorities = params.authorities.clone();
			self.network.with_gossip(move |gossip, ctx| {
				message_validator.note_session(
					parent_hash,
					MessageValidationData { authorities, index_mapping },
					|peer_id, message| gossip.send_message(ctx, peer_id, message),
				);
			});
		}

		self.network.with_spec(move |spec, ctx| {
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
			let session = spec.new_validation_session(ctx, params);
			let _ = tx.send(SessionDataFetcher {
				network,
				api,
				task_executor,
				parent_hash,
				knowledge: session.knowledge().clone(),
				exit,
				message_validator,
			});
		});

		rx
	}
}

253
/// A long-lived network which can create parachain statement  routing processes on demand.
254
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
255
256
	P: ProvideRuntimeApi + Send + Sync + 'static,
	P::Api: ParachainHost<Block>,
257
	E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
258
	N: NetworkService,
259
	T: Clone + Executor + Send + Sync + 'static,
260
{
261
	type Error = String;
262
	type TableRouter = Router<P, E, N, T>;
263
	type BuildTableRouter = Box<Future<Item=Self::TableRouter,Error=String> + Send>;
264

265
266
267
	fn communication_for(
		&self,
		table: Arc<SharedTable>,
Gav Wood's avatar
Gav Wood committed
268
		authorities: &[ValidatorId],
269
	) -> Self::BuildTableRouter {
270
271
		let parent_hash = table.consensus_parent_hash().clone();
		let local_session_key = table.session_key();
272

273
274
275
276
277
		let build_fetcher = self.instantiate_session(SessionParams {
			local_session_key: Some(local_session_key),
			parent_hash,
			authorities: authorities.to_vec(),
		});
278
		let message_validator = self.message_validator.clone();
279

280
		let executor = self.executor.clone();
281
282
283
284
285
286
		let work = build_fetcher
			.map_err(|e| format!("{:?}", e))
			.map(move |fetcher| {
				let table_router = Router::new(
					table,
					fetcher,
287
					message_validator,
288
				);
289

290
291
292
293
				let table_router_clone = table_router.clone();
				let work = table_router.checked_statements()
					.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
				executor.spawn(work);
294

295
296
297
298
				table_router
			});

		Box::new(work)
299
300
301
	}
}

302
303
304
305
306
/// Error when the network appears to be down.
///
/// `AwaitingCollation` resolves to this error when polling either of its
/// oneshot receivers fails.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct NetworkDown;

/// A future that resolves when a collation is received.
307
308
309
310
pub struct AwaitingCollation {
	outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
	inner: Option<::futures::sync::oneshot::Receiver<Collation>>
}
311
312
313
314
315
316

impl Future for AwaitingCollation {
	type Item = Collation;
	type Error = NetworkDown;

	fn poll(&mut self) -> Poll<Collation, NetworkDown> {
317
318
319
320
321
		if let Some(ref mut inner) = self.inner {
			return inner
				.poll()
				.map_err(|_| NetworkDown)
		}
322
323
324
325
326
327
328
		match self.outer.poll() {
			Ok(futures::Async::Ready(inner)) => {
				self.inner = Some(inner);
				self.poll()
			},
			Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady),
			Err(_) => Err(NetworkDown)
329
		}
330
331
332
	}
}

333
impl<P, E: Clone, N, T: Clone> Collators for ValidationNetwork<P, E, N, T> where
334
335
336
	P: ProvideRuntimeApi + Send + Sync + 'static,
	P::Api: ParachainHost<Block>,
	N: NetworkService,
337
{
338
339
	type Error = NetworkDown;
	type Collation = AwaitingCollation;
340

341
	fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation {
342
343
344
345
346
347
		let (tx, rx) = ::futures::sync::oneshot::channel();
		self.network.with_spec(move |spec, _| {
			let collation = spec.await_collation(relay_parent, parachain);
			let _ = tx.send(collation);
		});
		AwaitingCollation{outer: rx, inner: None}
348
349
	}

350

Gav Wood's avatar
Gav Wood committed
351
	fn note_bad_collator(&self, collator: CollatorId) {
352
		self.network.with_spec(move |spec, ctx| spec.disconnect_bad_collator(ctx, collator));
353
	}
354
}
355
356
357

#[derive(Default)]
struct KnowledgeEntry {
Gav Wood's avatar
Gav Wood committed
358
359
	knows_block_data: Vec<ValidatorId>,
	knows_extrinsic: Vec<ValidatorId>,
360
	pov: Option<PoVBlock>,
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
	extrinsic: Option<Extrinsic>,
}

/// Tracks knowledge of peers.
pub(crate) struct Knowledge {
	// one entry per candidate, keyed by the candidate hash.
	candidates: HashMap<Hash, KnowledgeEntry>,
}

impl Knowledge {
	/// Create a new knowledge instance.
	pub(crate) fn new() -> Self {
		Knowledge {
			candidates: HashMap::new(),
		}
	}

	/// Note a statement seen from another validator.
Gav Wood's avatar
Gav Wood committed
378
	pub(crate) fn note_statement(&mut self, from: ValidatorId, statement: &Statement) {
379
380
381
		// those proposing the candidate or declaring it valid know everything.
		// those claiming it invalid do not have the extrinsic data as it is
		// generated by valid execution.
382
383
		match *statement {
			GenericStatement::Candidate(ref c) => {
thiolliere's avatar
thiolliere committed
384
				let entry = self.candidates.entry(c.hash()).or_insert_with(Default::default);
Gav Wood's avatar
Gav Wood committed
385
				entry.knows_block_data.push(from.clone());
386
387
				entry.knows_extrinsic.push(from);
			}
388
			GenericStatement::Valid(ref hash) => {
thiolliere's avatar
thiolliere committed
389
				let entry = self.candidates.entry(*hash).or_insert_with(Default::default);
Gav Wood's avatar
Gav Wood committed
390
				entry.knows_block_data.push(from.clone());
391
392
393
				entry.knows_extrinsic.push(from);
			}
			GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash)
394
395
396
397
398
399
400
				.or_insert_with(Default::default)
				.knows_block_data
				.push(from),
		}
	}

	/// Note a candidate collated or seen locally.
401
	pub(crate) fn note_candidate(&mut self, hash: Hash, pov: Option<PoVBlock>, extrinsic: Option<Extrinsic>) {
402
		let entry = self.candidates.entry(hash).or_insert_with(Default::default);
403
		entry.pov = entry.pov.take().or(pov);
404
405
406
407
		entry.extrinsic = entry.extrinsic.take().or(extrinsic);
	}
}

408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
/// Receiver for incoming data.
///
/// Wraps a shared oneshot receiver, so the handle can be cloned.
#[derive(Clone)]
pub struct IncomingReceiver {
	inner: future::Shared<Receiver<Incoming>>
}

impl Future for IncomingReceiver {
	type Item = Incoming;
	type Error = io::Error;

	/// Poll the shared receiver, cloning the `Incoming` out of the shared
	/// item when it is ready. A receiver error is reported as an `io::Error`.
	fn poll(&mut self) -> Poll<Incoming, io::Error> {
		self.inner.poll()
			.map(|state| state.map(|shared| Incoming::clone(&*shared)))
			.map_err(|_| io::Error::new(
				io::ErrorKind::Other,
				"Sending end of channel hung up",
			))
	}
}

430
/// A current validation session instance.
431
#[derive(Clone)]
432
pub(crate) struct ValidationSession {
433
	parent_hash: Hash,
434
	knowledge: Arc<Mutex<Knowledge>>,
435
	local_session_key: Option<ValidatorId>,
436
437
}

438
impl ValidationSession {
439
	/// Create a new validation session instance. Needs to be attached to the
Yuanchao Sun's avatar
Yuanchao Sun committed
440
	/// network.
441
	pub(crate) fn new(params: SessionParams) -> Self {
442
		ValidationSession {
443
444
445
			parent_hash: params.parent_hash,
			knowledge: Arc::new(Mutex::new(Knowledge::new())),
			local_session_key: params.local_session_key,
446
447
448
		}
	}

449
450
451
452
453
454
	/// Get a handle to the shared knowledge relative to this consensus
	/// instance.
	pub(crate) fn knowledge(&self) -> &Arc<Mutex<Knowledge>> {
		&self.knowledge
	}

455
	// execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities
456
	// we believe should have the data.
457
458
	fn with_pov_block<F, U>(&self, hash: &Hash, f: F) -> U
		where F: FnOnce(Result<&PoVBlock, &[ValidatorId]>) -> U
459
460
461
462
	{
		let knowledge = self.knowledge.lock();
		let res = knowledge.candidates.get(hash)
			.ok_or(&[] as &_)
463
			.and_then(|entry| entry.pov.as_ref().ok_or(&entry.knows_block_data[..]));
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479

		f(res)
	}
}

// Number of recent session keys retained.
//
// 3 is chosen because sessions change infrequently and usually
// only the last 2 (current session and "last" session) are relevant.
// The extra is an error boundary.
const RECENT_SESSIONS: usize = 3;

/// Result when inserting recent session key.
#[derive(PartialEq, Eq)]
pub(crate) enum InsertedRecentKey {
	/// Key was already known.
	AlreadyKnown,
	/// Key was new and pushed out optional old item.
Gav Wood's avatar
Gav Wood committed
480
	New(Option<ValidatorId>),
481
482
483
484
}

/// Wrapper for managing recent session keys.
#[derive(Default)]
Gav Wood's avatar
Gav Wood committed
485
486
pub(crate) struct RecentValidatorIds {
	inner: ArrayVec<[ValidatorId; RECENT_SESSIONS]>,
487
488
}

Gav Wood's avatar
Gav Wood committed
489
impl RecentValidatorIds {
490
491
	/// Insert a new session key. This returns one to be pushed out if the
	/// set is full.
Gav Wood's avatar
Gav Wood committed
492
	pub(crate) fn insert(&mut self, key: ValidatorId) -> InsertedRecentKey {
493
494
495
496
497
498
499
500
501
502
503
504
505
		if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown }

		let old = if self.inner.len() == RECENT_SESSIONS {
			Some(self.inner.remove(0))
		} else {
			None
		};

		self.inner.push(key);
		InsertedRecentKey::New(old)
	}

	/// As a slice.
Gav Wood's avatar
Gav Wood committed
506
	pub(crate) fn as_slice(&self) -> &[ValidatorId] {
507
508
509
		&*self.inner
	}

Gav Wood's avatar
Gav Wood committed
510
	fn remove(&mut self, key: &ValidatorId) {
511
512
513
514
		self.inner.retain(|k| k != key)
	}
}

515
516
/// Manages requests and keys for live validation session instances.
pub(crate) struct LiveValidationSessions {
517
	// recent local session keys.
Gav Wood's avatar
Gav Wood committed
518
	recent: RecentValidatorIds,
519
520
	// live validation session instances, on `parent_hash`. refcount retained alongside.
	live_instances: HashMap<Hash, (usize, ValidationSession)>,
521
522
}

523
524
impl LiveValidationSessions {
	/// Create a new `LiveValidationSessions`
525
	pub(crate) fn new() -> Self {
526
		LiveValidationSessions {
527
528
529
530
531
			recent: Default::default(),
			live_instances: HashMap::new(),
		}
	}

532
	/// Note new validation session. If the used session key is new,
533
	/// it returns it to be broadcasted to peers.
534
535
536
	///
	/// If there was already a validation session instantiated and a different
	/// session key was set, then the new key will be ignored.
537
	pub(crate) fn new_validation_session(
538
		&mut self,
539
540
541
542
543
544
545
546
547
548
549
550
551
552
		params: SessionParams,
	) -> (ValidationSession, Option<ValidatorId>) {
		let parent_hash = params.parent_hash.clone();

		let key = params.local_session_key.clone();
		let recent = &mut self.recent;

		let mut check_new_key = || {
			let inserted_key = key.clone().map(|key| recent.insert(key));
			if let Some(InsertedRecentKey::New(_)) = inserted_key {
				key.clone()
			} else {
				None
			}
553
554
		};

555
556
557
558
559
560
561
562
563
564
565
566
567
568
		if let Some(&mut (ref mut rc, ref mut prev)) = self.live_instances.get_mut(&parent_hash) {
			let maybe_new = if prev.local_session_key.is_none() {
				prev.local_session_key = key.clone();
				check_new_key()
			} else {
				None
			};

			*rc += 1;
			return (prev.clone(), maybe_new)
		}

		let session = ValidationSession::new(params);
		self.live_instances.insert(parent_hash, (1, session.clone()));
569

570
		(session, check_new_key())
571
572
	}

573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
	/// Remove validation session. true indicates that it was actually removed.
	pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool {
		let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) {
			entry.get_mut().0 -= 1;
			if entry.get().0 == 0 {
				let (_, session) = entry.remove();
				Some(session)
			} else {
				None
			}
		} else {
			None
		};

		let session = match maybe_removed {
			None => return false,
			Some(s) => s,
		};

		if let Some(ref key) = session.local_session_key {
593
			let key_still_used = self.live_instances.values()
594
				.any(|c| c.1.local_session_key.as_ref() == Some(key));
595
596

			if !key_still_used {
597
				self.recent.remove(key)
598
599
			}
		}
600
601

		true
602
603
604
	}

	/// Recent session keys as a slice.
Gav Wood's avatar
Gav Wood committed
605
	pub(crate) fn recent_keys(&self) -> &[ValidatorId] {
606
607
608
		self.recent.as_slice()
	}

609
610
	/// Call a closure with pov-data from validation session at parent hash for a given
	/// candidate-receipt hash.
611
612
613
614
	///
	/// This calls the closure with `Some(data)` where the session and data are live,
	/// `Err(Some(keys))` when the session is live but the data unknown, with a list of keys
	/// who have the data, and `Err(None)` where the session is unknown.
615
616
	pub(crate) fn with_pov_block<F, U>(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U
		where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U
617
618
	{
		match self.live_instances.get(parent_hash) {
619
			Some(c) => c.1.with_pov_block(c_hash, |res| f(res.map_err(Some))),
620
621
622
623
624
			None => f(Err(None))
		}
	}
}

625
/// Receiver for block data.
626
627
628
pub struct PoVReceiver {
	outer: Receiver<Receiver<PoVBlock>>,
	inner: Option<Receiver<PoVBlock>>
629
630
}

631
632
impl Future for PoVReceiver {
	type Item = PoVBlock;
633
634
	type Error = io::Error;

635
	fn poll(&mut self) -> Poll<PoVBlock, io::Error> {
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
		let map_err = |_| io::Error::new(
			io::ErrorKind::Other,
			"Sending end of channel hung up",
		);

		if let Some(ref mut inner) = self.inner {
			return inner.poll().map_err(map_err);
		}
		match self.outer.poll().map_err(map_err)? {
			Async::Ready(inner) => {
				self.inner = Some(inner);
				self.poll()
			}
			Async::NotReady => Ok(Async::NotReady),
		}
	}
}

/// Can fetch data for a given validation session
pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
	// the network service.
	network: Arc<N>,
	// the runtime API.
	api: Arc<P>,
	// the exit future.
	exit: E,
	// the executor.
	task_executor: T,
	// the shared knowledge.
	knowledge: Arc<Mutex<Knowledge>>,
	// the parent hash of the session.
	parent_hash: Hash,
	// NOTE(review): stored and cloned but not read anywhere in this file — confirm it is used elsewhere.
	message_validator: RegisteredMessageValidator,
}

impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> {
	/// The parent hash this fetcher is bound to.
	pub(crate) fn parent_hash(&self) -> Hash {
		self.parent_hash
	}

	/// A handle to the shared knowledge.
	pub(crate) fn knowledge(&self) -> &Arc<Mutex<Knowledge>> {
		let Self { ref knowledge, .. } = *self;
		knowledge
	}

	/// The exit future.
	pub(crate) fn exit(&self) -> &E {
		let Self { ref exit, .. } = *self;
		exit
	}

	/// A handle to the network service.
	pub(crate) fn network(&self) -> &Arc<N> {
		let Self { ref network, .. } = *self;
		network
	}

	/// The executor.
	pub(crate) fn executor(&self) -> &T {
		let Self { ref task_executor, .. } = *self;
		task_executor
	}

	/// A handle to the runtime API.
	pub(crate) fn api(&self) -> &Arc<P> {
		let Self { ref api, .. } = *self;
		api
	}
}

impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E, N, T> {
	fn clone(&self) -> Self {
		SessionDataFetcher {
			network: self.network.clone(),
			api: self.api.clone(),
			task_executor: self.task_executor.clone(),
			parent_hash: self.parent_hash.clone(),
			knowledge: self.knowledge.clone(),
			exit: self.exit.clone(),
			message_validator: self.message_validator.clone(),
		}
	}
}

impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
	P::Api: ParachainHost<Block>,
	N: NetworkService,
	T: Clone + Executor + Send + 'static,
	E: Future<Item=(),Error=()> + Clone + Send + 'static,
{
717
718
719
	/// Fetch PoV block for the given candidate receipt.
	pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
		let parachain = candidate.parachain_index.clone();
720
		let parent_hash = self.parent_hash;
721
722
723
724
725
726
727
728
729
730
731

		let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
			.map_err(|e|
				format!(
					"Cannot fetch ingress for parachain {:?} at {:?}: {:?}",
					parachain,
					parent_hash,
					e,
				)
			);

732
733
734
		let candidate = candidate.clone();
		let (tx, rx) = ::futures::sync::oneshot::channel();
		self.network.with_spec(move |spec, ctx| {
735
736
737
738
			if let Ok(Some(canon_roots)) = canon_roots {
				let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash, canon_roots);
				let _ = tx.send(inner_rx);
			}
739
		});
740
		PoVReceiver { outer: rx, inner: None }
741
742
743
	}
}

744
745
746
#[cfg(test)]
mod tests {
	use super::*;
	use substrate_primitives::crypto::UncheckedInto;

	// the first three keys fit without pushing anything out; the fourth
	// pushes out the oldest. re-inserting any key reports it as known.
	#[test]
	fn last_keys_works() {
		let a: ValidatorId = [1; 32].unchecked_into();
		let b: ValidatorId = [2; 32].unchecked_into();
		let c: ValidatorId = [3; 32].unchecked_into();
		let d: ValidatorId = [4; 32].unchecked_into();

		let mut recent = RecentValidatorIds::default();

		for key in vec![a.clone(), b, c] {
			assert!(
				recent.insert(key.clone()) == InsertedRecentKey::New(None),
				"is new, not at capacity",
			);
			assert!(
				recent.insert(key) == InsertedRecentKey::AlreadyKnown,
				"not new",
			);
		}

		assert!(
			recent.insert(d.clone()) == InsertedRecentKey::New(Some(a)),
			"is new, and at capacity",
		);
		assert!(
			recent.insert(d) == InsertedRecentKey::AlreadyKnown,
			"not new",
		);
	}

	// sessions at the same parent hash share their knowledge; only the first
	// key provided for a session is reported as new.
	#[test]
	fn add_new_sessions_works() {
		let mut live_sessions = LiveValidationSessions::new();
		let key_a: ValidatorId = [0; 32].unchecked_into();
		let key_b: ValidatorId = [1; 32].unchecked_into();
		let parent_hash = [0xff; 32].into();

		let (first, new_key) = live_sessions.new_validation_session(SessionParams {
			parent_hash,
			local_session_key: None,
			authorities: Vec::new(),
		});
		let knowledge = first.knowledge().clone();
		assert!(new_key.is_none());

		let (second, new_key) = live_sessions.new_validation_session(SessionParams {
			parent_hash,
			local_session_key: Some(key_a.clone()),
			authorities: Vec::new(),
		});
		// check that knowledge points to the same place.
		assert!(Arc::ptr_eq(second.knowledge(), &knowledge));
		assert!(new_key == Some(key_a.clone()));

		let (third, new_key) = live_sessions.new_validation_session(SessionParams {
			parent_hash,
			local_session_key: Some(key_b.clone()),
			authorities: Vec::new(),
		});
		assert!(Arc::ptr_eq(third.knowledge(), &knowledge));
		assert!(new_key.is_none());
	}
}