validation.rs 10.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Tests and helpers for validation networking.
18

19
20
#![allow(unused)]

21
use crate::validation::{NetworkService, GossipService};
22
use substrate_network::Context as NetContext;
23
use substrate_network::consensus_gossip::TopicNotification;
24
use substrate_primitives::{NativeOrEncoded, ExecutionContext};
Gav Wood's avatar
Gav Wood committed
25
use substrate_keyring::AuthorityKeyring;
26
use crate::PolkadotProtocol;
27

28
use polkadot_validation::{SharedTable, MessagesFrom, Network};
Gav Wood's avatar
Gav Wood committed
29
use polkadot_primitives::{SessionKey, Block, Hash, Header, BlockId};
30
31
use polkadot_primitives::parachain::{
	Id as ParaId, Chain, DutyRoster, ParachainHost, OutgoingMessage,
32
	ValidatorId, StructuredUnroutedIngress, BlockIngressRoots,
33
};
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
use parking_lot::Mutex;
use substrate_client::error::Result as ClientResult;
use substrate_client::runtime_api::{Core, RuntimeVersion, ApiExt};
use sr_primitives::traits::{ApiRef, ProvideRuntimeApi};

use std::collections::HashMap;
use std::sync::Arc;
use futures::{prelude::*, sync::mpsc};
use tokio::runtime::{Runtime, TaskExecutor};

use super::TestContext;

#[derive(Clone, Copy)]
struct NeverExit;

impl Future for NeverExit {
	type Item = ();
	type Error = ();

	fn poll(&mut self) -> Poll<(), ()> {
		Ok(Async::NotReady)
	}
}

58
59
60
61
62
63
64
fn clone_gossip(n: &TopicNotification) -> TopicNotification {
	TopicNotification {
		message: n.message.clone(),
		sender: n.sender.clone(),
	}
}

65
struct GossipRouter {
66
67
68
69
	incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>,
	incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
	outgoing: Vec<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
	messages: Vec<(Hash, TopicNotification)>,
70
71
72
}

impl GossipRouter {
73
	fn add_message(&mut self, topic: Hash, message: TopicNotification) {
74
		self.outgoing.retain(|&(ref o_topic, ref sender)| {
75
			o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok()
76
77
78
79
		});
		self.messages.push((topic, message));
	}

80
	fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender<TopicNotification>) {
81
82
		for message in self.messages.iter()
			.filter(|&&(ref t, _)| t == &topic)
83
			.map(|&(_, ref msg)| clone_gossip(msg))
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
		{
			if let Err(_) = sender.unbounded_send(message) { return }
		}

		self.outgoing.push((topic, sender));
	}
}

impl Future for GossipRouter {
	type Item = ();
	type Error = ();

	fn poll(&mut self) -> Poll<(), ()> {
		loop {
			match self.incoming_messages.poll().unwrap() {
				Async::Ready(Some((topic, message))) => self.add_message(topic, message),
				Async::Ready(None) => panic!("ended early."),
				Async::NotReady => break,
			}
		}

		loop {
			match self.incoming_streams.poll().unwrap() {
				Async::Ready(Some((topic, sender))) => self.add_outgoing(topic, sender),
				Async::Ready(None) => panic!("ended early."),
				Async::NotReady => break,
			}
		}

		Ok(Async::NotReady)
	}
}


#[derive(Clone)]
struct GossipHandle {
120
121
	send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>,
	send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender<TopicNotification>)>,
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
}

fn make_gossip() -> (GossipRouter, GossipHandle) {
	let (message_tx, message_rx) = mpsc::unbounded();
	let (listener_tx, listener_rx) = mpsc::unbounded();

	(
		GossipRouter {
			incoming_messages: message_rx,
			incoming_streams: listener_rx,
			outgoing: Vec::new(),
			messages: Vec::new(),
		},
		GossipHandle { send_message: message_tx, send_listener: listener_tx },
	)
}

struct TestNetwork {
	proto: Arc<Mutex<PolkadotProtocol>>,
	gossip: GossipHandle,
}

impl NetworkService for TestNetwork {
145
	fn gossip_messages_for(&self, topic: Hash) -> mpsc::UnboundedReceiver<TopicNotification> {
146
147
148
149
150
		let (tx, rx) = mpsc::unbounded();
		let _  = self.gossip.send_listener.unbounded_send((topic, tx));
		rx
	}

151
	fn gossip_message(&self, topic: Hash, message: Vec<u8>) {
152
153
		let notification = TopicNotification { message, sender: None };
		let _ = self.gossip.send_message.unbounded_send((topic, notification));
154
155
	}

156
	fn with_gossip<F: Send + 'static>(&self, with: F)
157
		where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext<Block>)
158
159
160
	{
		unimplemented!()
	}
161
162

	fn with_spec<F: Send + 'static>(&self, with: F)
163
		where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext<Block>)
164
165
166
167
	{
		let mut context = TestContext::default();
		let res = with(&mut *self.proto.lock(), &mut context);
		// TODO: send context to worker for message routing.
168
		// https://github.com/paritytech/polkadot/issues/215
169
170
171
172
173
174
		res
	}
}

#[derive(Default)]
struct ApiData {
Gav Wood's avatar
Gav Wood committed
175
	validators: Vec<ValidatorId>,
176
177
	duties: Vec<Chain>,
	active_parachains: Vec<ParaId>,
178
	ingress: HashMap<ParaId, StructuredUnroutedIngress>,
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
}

#[derive(Default, Clone)]
struct TestApi {
	data: Arc<Mutex<ApiData>>,
}

struct RuntimeApi {
	data: Arc<Mutex<ApiData>>,
}

impl ProvideRuntimeApi for TestApi {
	type Api = RuntimeApi;

	fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> {
		RuntimeApi { data: self.data.clone() }.into()
	}
}

impl Core<Block> for RuntimeApi {
199
	fn Core_version_runtime_api_impl(
200
201
202
203
204
205
206
207
208
		&self,
		_: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<RuntimeVersion>> {
		unimplemented!("Not required for testing!")
	}

209
	fn Core_execute_block_runtime_api_impl(
210
211
212
213
214
215
216
217
218
		&self,
		_: &BlockId,
		_: ExecutionContext,
		_: Option<(Block)>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<()>> {
		unimplemented!("Not required for testing!")
	}

219
	fn Core_initialize_block_runtime_api_impl(
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
		&self,
		_: &BlockId,
		_: ExecutionContext,
		_: Option<&Header>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<()>> {
		unimplemented!("Not required for testing!")
	}
}

impl ApiExt<Block> for RuntimeApi {
	fn map_api_result<F: FnOnce(&Self) -> Result<R, E>, R, E>(
		&self,
		_: F
	) -> Result<R, E> {
		unimplemented!("Not required for testing!")
	}

	fn runtime_version_at(&self, _: &BlockId) -> ClientResult<RuntimeVersion> {
		unimplemented!("Not required for testing!")
	}
241
242
243
244
245
246

	fn record_proof(&mut self) { }

	fn extract_proof(&mut self) -> Option<Vec<Vec<u8>>> {
		None
	}
247
248
249
}

impl ParachainHost<Block> for RuntimeApi {
250
	fn ParachainHost_validators_runtime_api_impl(
251
252
253
254
255
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
Gav Wood's avatar
Gav Wood committed
256
	) -> ClientResult<NativeOrEncoded<Vec<ValidatorId>>> {
257
258
259
		Ok(NativeOrEncoded::Native(self.data.lock().validators.clone()))
	}

260
	fn ParachainHost_duty_roster_runtime_api_impl(
261
262
263
264
265
266
267
268
269
270
271
272
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<DutyRoster>> {

		Ok(NativeOrEncoded::Native(DutyRoster {
			validator_duty: self.data.lock().duties.clone(),
		}))
	}

273
	fn ParachainHost_active_parachains_runtime_api_impl(
274
275
276
277
278
279
280
281
282
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<()>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<Vec<ParaId>>> {
		Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone()))
	}

283
	fn ParachainHost_parachain_head_runtime_api_impl(
284
285
286
287
288
289
290
291
292
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<ParaId>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<Option<Vec<u8>>>> {
		Ok(NativeOrEncoded::Native(Some(Vec::new())))
	}

293
	fn ParachainHost_parachain_code_runtime_api_impl(
294
295
296
297
298
299
300
301
302
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		_: Option<ParaId>,
		_: Vec<u8>,
	) -> ClientResult<NativeOrEncoded<Option<Vec<u8>>>> {
		Ok(NativeOrEncoded::Native(Some(Vec::new())))
	}

303
	fn ParachainHost_ingress_runtime_api_impl(
304
305
306
307
308
		&self,
		_at: &BlockId,
		_: ExecutionContext,
		id: Option<ParaId>,
		_: Vec<u8>,
309
	) -> ClientResult<NativeOrEncoded<Option<StructuredUnroutedIngress>>> {
310
311
312
313
314
		let id = id.unwrap();
		Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned()))
	}
}

315
type TestValidationNetwork = crate::validation::ValidationNetwork<
316
317
318
319
320
321
322
323
324
	TestApi,
	NeverExit,
	TestNetwork,
	TaskExecutor,
>;

struct Built {
	gossip: GossipRouter,
	api_handle: Arc<Mutex<ApiData>>,
325
	networks: Vec<TestValidationNetwork>,
326
327
328
329
330
331
332
333
334
335
336
337
338
}

fn build_network(n: usize, executor: TaskExecutor) -> Built {
	let (gossip_router, gossip_handle) = make_gossip();
	let api_handle = Arc::new(Mutex::new(Default::default()));
	let runtime_api = Arc::new(TestApi { data: api_handle.clone() });

	let networks = (0..n).map(|_| {
		let net = Arc::new(TestNetwork {
			proto: Arc::new(Mutex::new(PolkadotProtocol::new(None))),
			gossip: gossip_handle.clone(),
		});

339
340
		let message_val = crate::gossip::RegisteredMessageValidator::new_test(
			|_hash: &_| Some(crate::gossip::Known::Leaf),
341
			Box::new(|_, _| {}),
342
343
		);

344
		TestValidationNetwork::new(
345
346
			net,
			NeverExit,
347
			message_val,
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
			runtime_api.clone(),
			executor.clone(),
		)
	});

	let networks: Vec<_> = networks.collect();

	Built {
		gossip: gossip_router,
		api_handle,
		networks,
	}
}

#[derive(Default)]
struct IngressBuilder {
	egress: HashMap<(ParaId, ParaId), Vec<Vec<u8>>>,
}

impl IngressBuilder {
	fn add_messages(&mut self, source: ParaId, messages: &[OutgoingMessage]) {
		for message in messages {
			let target = message.target;
			self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone());
		}
	}

375
	fn build(self) -> HashMap<ParaId, BlockIngressRoots> {
376
377
378
		let mut map = HashMap::new();
		for ((source, target), messages) in self.egress {
			map.entry(target).or_insert_with(Vec::new)
379
				.push((source, polkadot_validation::message_queue_root(&messages)));
380
381
382
383
384
385
		}

		for roots in map.values_mut() {
			roots.sort_by_key(|&(para_id, _)| para_id);
		}

386
		map.into_iter().map(|(k, v)| (k, BlockIngressRoots(v))).collect()
387
388
389
	}
}

Gav Wood's avatar
Gav Wood committed
390
fn make_table(data: &ApiData, local_key: &AuthorityKeyring, parent_hash: Hash) -> Arc<SharedTable> {
391
392
393
	use ::av_store::Store;

	let store = Store::new_in_memory();
394
	let (group_info, _) = ::polkadot_validation::make_group_info(
395
		DutyRoster { validator_duty: data.duties.clone() },
Gav Wood's avatar
Gav Wood committed
396
397
		&data.validators, // only possible as long as parachain crypto === aura crypto
		SessionKey::from(*local_key)
398
399
400
	).unwrap();

	Arc::new(SharedTable::new(
401
		data.validators.as_slice(),
402
403
404
405
		group_info,
		Arc::new(local_key.pair()),
		parent_hash,
		store,
406
		None,
407
408
	))
}