router.rs 12.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

17
//! Statement routing and validation statement table router implementation.
//!
//! During the attestation process, validators exchange statements on validity and availability
//! of parachain candidates.
//!
//! The `Router` in this file hooks into the underlying network to fulfill
//! the `TableRouter` trait from `polkadot-validation`, which is expected to call into a shared statement table
//! and dispatch evaluation work as necessary when new statements come in.

26
use sp_runtime::traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT};
27
use polkadot_validation::{
28
	SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
29
};
30
use polkadot_primitives::{Block, Hash};
31
use polkadot_primitives::parachain::{
32
	OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, ErasureChunk,
33
};
34
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage};
35

36
use futures::prelude::*;
37
use futures::{task::SpawnExt, future::{ready, select}};
38
use parking_lot::Mutex;
39
use log::{debug, trace};
40

41
42
use std::collections::{HashMap, HashSet};
use std::io;
43
use std::sync::Arc;
44
use std::pin::Pin;
45

46
use crate::validation::{LeafWorkDataFetcher, Executor};
47
use crate::NetworkService;
48

49
50
/// Derive the gossip topic used for attestations built on top of `parent_hash`.
///
/// The topic is the `BlakeTwo256` hash of the parent hash bytes followed by the
/// ASCII tag `attestations`.
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
	let parent_bytes: &[u8] = parent_hash.as_ref();
	let preimage = [parent_bytes, &b"attestations"[..]].concat();

	BlakeTwo256::hash(&preimage)
}

57
/// Create a `Stream` of checked messages.
58
59
60
61
62
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
63
	impl Stream<Item=SignedStatement> {
64
65
66
67
68
	// spin up a task in the background that processes all incoming statements
	// validation has been done already by the gossip validator.
	// this will block internally until the gossip messages stream is obtained.
	network.gossip_messages_for(topic)
		.filter_map(|msg| match msg.0 {
69
70
			GossipMessage::Statement(s) => ready(Some(s.signed_statement)),
			_ => ready(None)
71
72
73
		})
}

74
/// Table routing implementation.
75
pub struct Router<P, E, N: NetworkService, T> {
76
	table: Arc<SharedTable>,
77
	attestation_topic: Hash,
78
	fetcher: LeafWorkDataFetcher<P, E, N, T>,
79
	deferred_statements: Arc<Mutex<DeferredStatements>>,
80
	message_validator: RegisteredMessageValidator,
81
82
}

83
impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
84
85
	pub(crate) fn new(
		table: Arc<SharedTable>,
86
		fetcher: LeafWorkDataFetcher<P, E, N, T>,
87
		message_validator: RegisteredMessageValidator,
88
	) -> Self {
89
		let parent_hash = fetcher.parent_hash();
90
91
		Router {
			table,
92
			fetcher,
93
			attestation_topic: attestation_topic(parent_hash),
94
			deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())),
95
			message_validator,
96
97
98
		}
	}

99
	/// Return a `Stream` of checked messages. These should be imported into the router
100
	/// with `import_statement`.
101
102
103
104
	///
	/// The returned stream will not terminate, so it is required to make sure that the stream is
	/// dropped when it is not required anymore. Otherwise, it will stick around in memory
	/// infinitely.
105
	pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement> {
106
		checked_statements(&**self.network(), self.attestation_topic)
107
108
	}

109
110
111
112
113
114
	fn parent_hash(&self) -> Hash {
		self.fetcher.parent_hash()
	}

	fn network(&self) -> &Arc<N> {
		self.fetcher.network()
115
116
117
	}
}

118
impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
119
120
121
	fn clone(&self) -> Self {
		Router {
			table: self.table.clone(),
122
			fetcher: self.fetcher.clone(),
123
			attestation_topic: self.attestation_topic,
124
			deferred_statements: self.deferred_statements.clone(),
125
			message_validator: self.message_validator.clone(),
126
127
128
129
		}
	}
}

130
impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> where
Gavin Wood's avatar
Gavin Wood committed
131
	P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
132
133
	N: NetworkService,
	T: Clone + Executor + Send + 'static,
134
	E: Future<Output=()> + Clone + Send + Unpin + 'static,
135
{
136
	/// Import a statement whose signature has been checked already.
137
	pub(crate) fn import_statement(&self, statement: SignedStatement) {
138
		trace!(target: "p_net", "importing validation statement {:?}", statement.statement);
139

140
		// defer any statements for which we haven't imported the candidate yet
141
		let c_hash = {
142
			let candidate_data = match statement.statement {
143
				GenericStatement::Candidate(ref c) => Some(c.hash()),
144
145
				GenericStatement::Valid(ref hash)
					| GenericStatement::Invalid(ref hash)
146
					=> self.table.with_candidate(hash, |c| c.map(|_| *hash)),
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
			};
			match candidate_data {
				Some(x) => x,
				None => {
					self.deferred_statements.lock().push(statement);
					return;
				}
			}
		};

		// import all statements pending on this candidate
		let (mut statements, _traces) = if let GenericStatement::Candidate(_) = statement.statement {
			self.deferred_statements.lock().get_deferred(&c_hash)
		} else {
			(Vec::new(), Vec::new())
		};

		// prepend the candidate statement.
165
		debug!(target: "validation", "Importing statements about candidate {:?}", c_hash);
166
167
168
169
170
171
172
		statements.insert(0, statement);
		let producers: Vec<_> = self.table.import_remote_statements(
			self,
			statements.iter().cloned(),
		);
		// dispatch future work as necessary.
		for (producer, statement) in producers.into_iter().zip(statements) {
173
174
			if let Some(sender) = self.table.index_to_id(statement.sender) {
				self.fetcher.knowledge().lock().note_statement(sender, &statement.statement);
175

176
177
				if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
					trace!(target: "validation", "driving statement work to completion");
178
179
180
181

					let work = select(work.boxed(), self.fetcher.exit().clone())
						.map(drop);
					let _ = self.fetcher.executor().spawn(work);
182
				}
183
184
185
186
			}
		}
	}

187
	fn create_work<D>(&self, candidate_hash: Hash, producer: ParachainWork<D>)
188
		-> impl Future<Output=()> + Send + 'static
189
		where
190
		D: Future<Output=Result<PoVBlock,io::Error>> + Send + Unpin + 'static,
191
192
	{
		let table = self.table.clone();
193
194
		let network = self.network().clone();
		let knowledge = self.fetcher.knowledge().clone();
195
		let attestation_topic = self.attestation_topic;
196
		let parent_hash = self.parent_hash();
197
		let api = self.fetcher.api().clone();
198

199
200
201
202
203
		async move {
			match producer.prime(api).validate().await {
				Ok(validated) => {
					// store the data before broadcasting statements, so other peers can fetch.
					knowledge.lock().note_candidate(
204
					candidate_hash,
205
206
					Some(validated.0.pov_block().clone()),
					validated.0.outgoing_messages().cloned(),
207
					);
208

209
210
211
					// propagate the statement.
					// consider something more targeted than gossip in the future.
					let statement = GossipStatement::new(
212
					parent_hash,
213
					match table.import_validated(validated.0) {
214
215
					None => return,
					Some(s) => s,
216
					}
217
					);
218

219
220
221
222
223
224
225
					network.gossip_message(attestation_topic, statement.into());
				},
				Err(err) => {
					debug!(target: "p_net", "Failed to produce statements: {:?}", err);
				}
			}
		}
226
227
228
	}
}

229
230
231
232
impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> where
	P::Api: ParachainHost<Block>,
	N: NetworkService,
	T: Clone + Executor + Send + 'static,
233
	E: Future<Output=()> + Clone + Send + 'static,
234
{
235
	type Error = io::Error;
236
	type FetchValidationProof = Pin<Box<dyn Future<Output = Result<PoVBlock, io::Error>> + Send>>;
237

238
239
240
241
242
243
244
245
	// We have fetched from a collator and here the receipt should have been already formed.
	fn local_collation(
		&self,
		collation: Collation,
		receipt: CandidateReceipt,
		outgoing: OutgoingMessages,
		chunks: (ValidatorIndex, &[ErasureChunk])
	) {
246
		// produce a signed statement
247
248
		let hash = receipt.hash();
		let erasure_root = receipt.erasure_root;
249
		let validated = Validated::collated_local(
250
			receipt,
251
			collation.pov.clone(),
252
			outgoing.clone(),
253
254
		);

255
256
		let statement = GossipStatement::new(
			self.parent_hash(),
257
258
259
260
			match self.table.import_validated(validated) {
				None => return,
				Some(s) => s,
			},
261
		);
262

263
		// give to network to make available.
264
		self.fetcher.knowledge().lock().note_candidate(hash, Some(collation.pov), Some(outgoing));
265
		self.network().gossip_message(self.attestation_topic, statement.into());
266
267
268
269
270
271
272
273
274
275
276
277
278
279

		for chunk in chunks.1 {
			let relay_parent = self.parent_hash();
			let message = ErasureChunkMessage {
				chunk: chunk.clone(),
				relay_parent,
				candidate_hash: hash,
			};

			self.network().gossip_message(
				av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index),
				message.into()
			);
		}
280
281
	}

282
283
	fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof {
		self.fetcher.fetch_pov_block(candidate)
284
	}
285
286
}

287
impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
	/// Ask the network to remove the validation session for this router's parent hash.
	fn drop(&mut self) {
		// Copy the hash out first: the closure is `move` and must not borrow `self`.
		let ended_session = self.parent_hash();
		let network = self.network();

		network.with_spec(move |spec, _| {
			spec.remove_validation_session(ended_session);
		});
	}
}

// A unique trace for valid statements issued by a validator.
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
enum StatementTrace {
297
298
	Valid(ValidatorIndex, Hash),
	Invalid(ValidatorIndex, Hash),
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
}

// helper for deferring statements whose associated candidate is unknown.
//
// Statements are grouped by the candidate hash they refer to, and each
// (validator, candidate, verdict) combination is held at most once.
struct DeferredStatements {
	// Deferred `Valid`/`Invalid` statements, keyed by the candidate hash they refer to.
	deferred: HashMap<Hash, Vec<SignedStatement>>,
	// Traces of the statements currently held in `deferred`; used to drop duplicates on push.
	known_traces: HashSet<StatementTrace>,
}

impl DeferredStatements {
	fn new() -> Self {
		DeferredStatements {
			deferred: HashMap::new(),
			known_traces: HashSet::new(),
		}
	}

	fn push(&mut self, statement: SignedStatement) {
		let (hash, trace) = match statement.statement {
			GenericStatement::Candidate(_) => return,
Gav Wood's avatar
Gav Wood committed
318
319
			GenericStatement::Valid(hash) => (hash, StatementTrace::Valid(statement.sender.clone(), hash)),
			GenericStatement::Invalid(hash) => (hash, StatementTrace::Invalid(statement.sender.clone(), hash)),
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
		};

		if self.known_traces.insert(trace) {
			self.deferred.entry(hash).or_insert_with(Vec::new).push(statement);
		}
	}

	fn get_deferred(&mut self, hash: &Hash) -> (Vec<SignedStatement>, Vec<StatementTrace>) {
		match self.deferred.remove(hash) {
			None => (Vec::new(), Vec::new()),
			Some(deferred) => {
				let mut traces = Vec::new();
				for statement in deferred.iter() {
					let trace = match statement.statement {
						GenericStatement::Candidate(_) => continue,
Gav Wood's avatar
Gav Wood committed
335
336
						GenericStatement::Valid(hash) => StatementTrace::Valid(statement.sender.clone(), hash),
						GenericStatement::Invalid(hash) => StatementTrace::Invalid(statement.sender.clone(), hash),
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
					};

					self.known_traces.remove(&trace);
					traces.push(trace);
				}

				(deferred, traces)
			}
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	/// Pushing the same vote twice defers it once, and draining a candidate returns
	/// its statements and traces exactly once.
	#[test]
	fn deferred_statements_works() {
		let hash: Hash = [1; 32].into();
		let sender_index = 0;
		let statement = SignedStatement {
			statement: GenericStatement::Valid(hash),
			sender: sender_index,
			signature: Default::default(),
		};

		let mut deferred = DeferredStatements::new();

		// pre-push.
		let (signed, traces) = deferred.get_deferred(&hash);
		assert!(signed.is_empty());
		assert!(traces.is_empty());

		deferred.push(statement.clone());
		deferred.push(statement.clone());

		// draining: second push should have been ignored.
		let (signed, traces) = deferred.get_deferred(&hash);
		assert_eq!(signed, vec![statement]);
		assert_eq!(traces, vec![StatementTrace::Valid(sender_index, hash)]);

		// after draining
		let (signed, traces) = deferred.get_deferred(&hash);
		assert!(signed.is_empty());
		assert!(traces.is_empty());
	}
}