validation.rs 11.7 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2019-2020 Parity Technologies (UK) Ltd.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Tests and helpers for validation networking.
18

19
20
#![allow(unused)]

21
use crate::gossip::GossipMessage;
22
23
use sc_network::{Context as NetContext, PeerId};
use sc_network_gossip::TopicNotification;
24
25
use sp_core::{NativeOrEncoded, ExecutionContext};
use sp_keyring::Sr25519Keyring;
26
use crate::{PolkadotProtocol, NetworkService, GossipMessageStream};
27

28
29
use polkadot_validation::{SharedTable, Network};
use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId};
30
use polkadot_primitives::parachain::{
31
	Id as ParaId, Chain, DutyRoster, ParachainHost, TargetedMessage,
32
	ValidatorId, StructuredUnroutedIngress, BlockIngressRoots, Status,
33
	FeeSchedule, HeadData, Retriable, CollatorId, ErasureChunk, CandidateReceipt,
34
};
35
use parking_lot::Mutex;
Gavin Wood's avatar
Gavin Wood committed
36
use sp_blockchain::Result as ClientResult;
37
use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi};
38
39
use sp_runtime::traits::{Block as BlockT, HasherFor, NumberFor};
use sp_state_machine::ChangesTrieState;
40
41
42

use std::collections::HashMap;
use std::sync::Arc;
43
44
use std::pin::Pin;
use std::task::{Poll, Context};
45
use futures::{prelude::*, channel::mpsc, future::{select, Either}};
46
use codec::Encode;
47

48
use super::{TestContext, TestChainContext};
49

50
type TaskExecutor = Arc<dyn futures::task::Spawn + Send + Sync>;
Gavin Wood's avatar
Gavin Wood committed
51

52
53
54
55
/// A future that never completes.
///
/// Used as the second type parameter of `TestValidationNetwork` and handed to
/// `TestValidationNetwork::new` by `build_network` (presumably as the exit signal,
/// so the networks under test are never told to shut down — confirm against
/// `ValidationNetwork`).
#[derive(Clone, Copy)]
struct NeverExit;

impl Future for NeverExit {
	type Output = ();

	/// Unconditionally reports `Pending`. The context is ignored, so no waker is
	/// ever registered on behalf of this future.
	fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
		Poll::Pending
	}
}

63
64
65
66
67
68
69
/// Builds a new `TopicNotification` holding copies of `n`'s message and sender.
///
/// NOTE(review): presumably this helper exists because `TopicNotification` does not
/// implement `Clone` — confirm against `sc_network_gossip`.
fn clone_gossip(n: &TopicNotification) -> TopicNotification {
	let TopicNotification { message, sender } = n;

	TopicNotification {
		message: message.clone(),
		sender: sender.clone(),
	}
}

70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async fn gossip_router(
	mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
	mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>
) {
	let mut outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)> = Vec::new();
	let mut messages = Vec::new();

	loop {
		match select(incoming_messages.next(), incoming_streams.next()).await {
			Either::Left((Some((topic, message)), _)) => {
				outgoing.retain(|&(ref o_topic, ref sender)| {
					o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
				});
				messages.push((topic, message));
			},
			Either::Right((Some((topic, sender)), _)) => {
				for message in messages.iter()
					.filter(|&&(ref t, _)| t == &topic)
					.map(|&(_, ref msg)| clone_gossip(msg))
				{
					if let Err(_) = sender.unbounded_send(message) { return }
				}

				outgoing.push((topic, sender));
			},
			Either::Left((None, _)) | Either::Right((None, _)) =>  panic!("ended early.")
96
97
98
99
100
101
		}
	}
}

#[derive(Clone)]
struct GossipHandle {
102
103
	send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
	send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
104
105
}

106
fn make_gossip() -> (impl Future<Output = ()>, GossipHandle) {
107
108
109
110
	let (message_tx, message_rx) = mpsc::unbounded();
	let (listener_tx, listener_rx) = mpsc::unbounded();

	(
111
		gossip_router(message_rx, listener_rx),
112
113
114
115
116
117
118
119
120
121
		GossipHandle { send_message: message_tx, send_listener: listener_tx },
	)
}

struct TestNetwork {
	proto: Arc<Mutex<PolkadotProtocol>>,
	gossip: GossipHandle,
}

impl NetworkService for TestNetwork {
122
	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
123
124
		let (tx, rx) = mpsc::unbounded();
		let _  = self.gossip.send_listener.unbounded_send((topic, tx));
125
		GossipMessageStream::new(rx.boxed())
126
127
	}

128
129
130
131
	fn send_message(&self, _: PeerId, _: GossipMessage) {
		unimplemented!()
	}

132
133
	fn gossip_message(&self, topic: Hash, message: GossipMessage) {
		let notification = TopicNotification { message: message.encode(), sender: None };
134
		let _ = self.gossip.send_message.unbounded_send((topic, notification));
135
136
137
	}

	fn with_spec<F: Send + 'static>(&self, with: F)
138
		where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext<Block>)
139
140
141
142
	{
		let mut context = TestContext::default();
		let res = with(&mut *self.proto.lock(), &mut context);
		// TODO: send context to worker for message routing.
143
		// https://github.com/paritytech/polkadot/issues/215
144
145
146
147
148
149
		res
	}
}

#[derive(Default)]
struct ApiData {
Gav Wood's avatar
Gav Wood committed
150
	validators: Vec<ValidatorId>,
151
	duties: Vec<Chain>,
152
	active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>,
153
	ingress: HashMap<ParaId, StructuredUnroutedIngress>,
154
155
156
157
158
159
160
161
162
163
164
}

#[derive(Default, Clone)]
struct TestApi {
	data: Arc<Mutex<ApiData>>,
}

struct RuntimeApi {
	data: Arc<Mutex<ApiData>>,
}

165
impl ProvideRuntimeApi<Block> for TestApi {
166
167
168
169
170
171
172
173
	type Api = RuntimeApi;

	fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
		RuntimeApi { data: self.data.clone() }.into()
	}
}

impl Core<Block> for RuntimeApi {
174
	fn Core_version_runtime_api_impl(
175
176
177
178
179
180
181
182
183
		&self,
		_: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<RuntimeVersion>> {
		unimplemented!("Not required for testing!")
	}

184
	fn Core_execute_block_runtime_api_impl(
185
186
187
188
189
190
191
192
193
		&self,
		_: &BlockId,
		_: ExecutionContext,
		_: Option<(Block)>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<()>> {
		unimplemented!("Not required for testing!")
	}

194
	fn Core_initialize_block_runtime_api_impl(
195
196
197
198
199
200
201
202
203
204
		&self,
		_: &BlockId,
		_: ExecutionContext,
		_: Option<&Header>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<()>> {
		unimplemented!("Not required for testing!")
	}
}

205
impl ApiErrorExt for RuntimeApi {
Gavin Wood's avatar
Gavin Wood committed
206
	type Error = sp_blockchain::Error;
207
208
209
210
}

impl ApiExt<Block> for RuntimeApi {
	type StateBackend = sp_state_machine::InMemoryBackend<sp_api::HasherFor<Block>>;
Gavin Wood's avatar
Gavin Wood committed
211

212
213
214
215
216
217
218
219
220
221
	fn map_api_result<F: FnOnce(&Self) -> Result<R, E>, R, E>(
		&self,
		_: F
	) -> Result<R, E> {
		unimplemented!("Not required for testing!")
	}

	fn runtime_version_at(&self, _: &BlockId) -> ClientResult<RuntimeVersion> {
		unimplemented!("Not required for testing!")
	}
222
223
224

	fn record_proof(&mut self) { }

Gavin Wood's avatar
Gavin Wood committed
225
	fn extract_proof(&mut self) -> Option<StorageProof> {
226
227
		None
	}
228

229
	fn into_storage_changes(
230
231
		&self,
		_: &Self::StateBackend,
232
		_: Option<&ChangesTrieState<HasherFor<Block>, NumberFor<Block>>>,
233
234
235
236
237
238
		_: <Block as sp_api::BlockT>::Hash,
	) -> std::result::Result<sp_api::StorageChanges<Self::StateBackend, Block>, String>
		where Self: Sized
	{
		unimplemented!("Not required for testing!")
	}
239
240
241
}

impl ParachainHost<Block> for RuntimeApi {
242
	fn ParachainHost_validators_runtime_api_impl(
243
244
245
246
247
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
Gav Wood's avatar
Gav Wood committed
248
	) -> ClientResult<NativeOrEncoded<Vec<ValidatorId>>> {
249
250
251
		Ok(NativeOrEncoded::Native(self.data.lock().validators.clone()))
	}

252
	fn ParachainHost_duty_roster_runtime_api_impl(
253
254
255
256
257
258
259
260
261
262
263
264
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<DutyRoster>> {

		Ok(NativeOrEncoded::Native(DutyRoster {
			validator_duty: self.data.lock().duties.clone(),
		}))
	}

265
	fn ParachainHost_active_parachains_runtime_api_impl(
266
267
268
269
270
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
271
	) -> ClientResult<NativeOrEncoded<Vec<(ParaId, Option<(CollatorId, Retriable)>)>>> {
272
273
274
		Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone()))
	}

275
	fn ParachainHost_parachain_status_runtime_api_impl(
276
277
278
279
280
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<ParaId>,
		_: Vec<u8>,
281
282
283
284
285
286
287
288
289
	) -> ClientResult<NativeOrEncoded<Option<Status>>> {
		Ok(NativeOrEncoded::Native(Some(Status {
			head_data: HeadData(Vec::new()),
			balance: 0,
			fee_schedule: FeeSchedule {
				base: 0,
				per_byte: 0,
			}
		})))
290
291
	}

292
	fn ParachainHost_parachain_code_runtime_api_impl(
293
294
295
296
297
298
299
300
301
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<ParaId>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<Option<Vec<u8>>>> {
		Ok(NativeOrEncoded::Native(Some(Vec::new())))
	}

302
	fn ParachainHost_ingress_runtime_api_impl(
303
304
305
		&self,
		_at: &BlockId,
		_: ExecutionContext,
306
		id: Option<(ParaId, Option<BlockNumber>)>,
307
		_: Vec<u8>,
308
	) -> ClientResult<NativeOrEncoded<Option<StructuredUnroutedIngress>>> {
309
		let (id, _) = id.unwrap();
310
311
		Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
	}
312
313
314
315
316
317
318
319
320
321

	fn ParachainHost_get_heads_runtime_api_impl(
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_extrinsics: Option<Vec<<Block as BlockT>::Extrinsic>>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<Option<Vec<CandidateReceipt>>>> {
		Ok(NativeOrEncoded::Native(Some(Vec::new())))
	}
322
323
}

324
type TestValidationNetwork = crate::validation::ValidationNetwork<
325
326
327
328
329
330
	TestApi,
	NeverExit,
	TaskExecutor,
>;

struct Built {
331
	gossip: Pin<Box<dyn Future<Output = ()>>>,
332
	api_handle: Arc<Mutex<ApiData>>,
333
	networks: Vec<TestValidationNetwork>,
334
335
336
337
338
339
340
341
342
343
344
345
346
}

fn build_network(n: usize, executor: TaskExecutor) -> Built {
	let (gossip_router, gossip_handle) = make_gossip();
	let api_handle = Arc::new(Mutex::new(Default::default()));
	let runtime_api = Arc::new(TestApi { data: api_handle.clone() });

	let networks = (0..n).map(|_| {
		let net = Arc::new(TestNetwork {
			proto: Arc::new(Mutex::new(PolkadotProtocol::new(None))),
			gossip: gossip_handle.clone(),
		});

347
		let message_val = crate::gossip::RegisteredMessageValidator::new_test(
348
			TestChainContext::default(),
349
			Box::new(|_, _| {}),
350
351
		);

352
		TestValidationNetwork::new(
353
			message_val,
354
			NeverExit,
355
356
357
358
359
360
361
362
			runtime_api.clone(),
			executor.clone(),
		)
	});

	let networks: Vec<_> = networks.collect();

	Built {
363
		gossip: gossip_router.boxed(),
364
365
366
367
368
369
370
371
372
373
374
		api_handle,
		networks,
	}
}

#[derive(Default)]
struct IngressBuilder {
	egress: HashMap<(ParaId, ParaId), Vec<Vec<u8>>>,
}

impl IngressBuilder {
375
	fn add_messages(&mut self, source: ParaId, messages: &[TargetedMessage]) {
376
377
378
379
380
381
		for message in messages {
			let target = message.target;
			self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone());
		}
	}

382
	fn build(self) -> HashMap<ParaId, BlockIngressRoots> {
383
384
385
		let mut map = HashMap::new();
		for ((source, target), messages) in self.egress {
			map.entry(target).or_insert_with(Vec::new)
386
				.push((source, polkadot_validation::message_queue_root(&messages)));
387
388
389
390
391
392
		}

		for roots in map.values_mut() {
			roots.sort_by_key(|&(para_id, _)| para_id);
		}

393
		map.into_iter().map(|(k, v)| (k, BlockIngressRoots(v))).collect()
394
395
396
	}
}

397
398
399
400
401
402
403
404
/// Gossip provider that yields no messages and discards everything it is given.
/// `make_table` hands it to the in-memory availability store.
#[derive(Clone)]
struct DummyGossipMessages;

use futures::stream;
impl av_store::ProvideGossipMessages for DummyGossipMessages {
	/// Returns a stream that ends immediately, whatever the topic.
	fn gossip_messages_for(
		&self,
		_topic: Hash
	) -> Pin<Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send>> {
		let nothing = stream::empty::<(Hash, Hash, ErasureChunk)>();
		nothing.boxed()
	}

	/// Discards the chunk.
	fn gossip_erasure_chunk(
		&self,
		_relay_parent: Hash,
		_candidate_hash: Hash,
		_erasure_root: Hash,
		_chunk: ErasureChunk,
	) {
	}
}

418
fn make_table(data: &ApiData, local_key: &Sr25519Keyring, parent_hash: Hash) -> Arc<SharedTable> {
419
	use av_store::Store;
420
	use sp_core::crypto::Pair;
421

422
423
	let sr_pair = local_key.pair();
	let local_key = polkadot_primitives::parachain::ValidatorPair::from(local_key.pair());
424
	let store = Store::new_in_memory(DummyGossipMessages);
425
	let (group_info, _) = ::polkadot_validation::make_group_info(
426
		DutyRoster { validator_duty: data.duties.clone() },
Gav Wood's avatar
Gav Wood committed
427
		&data.validators, // only possible as long as parachain crypto === aura crypto
428
		Some(sr_pair.public().into()),
429
430
431
	).unwrap();

	Arc::new(SharedTable::new(
432
		data.validators.clone(),
433
		group_info,
434
		Some(Arc::new(local_key)),
435
436
		parent_hash,
		store,
437
		None,
438
439
	))
}