lib.rs 26.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Polkadot-specific network implementation.
//!
19
20
//! This manages routing for parachain statements, parachain block and outgoing message
//! data fetching, communication between collators and validators, and more.
21

22
mod collator_pool;
23
mod local_collations;
24
mod router;
25
pub mod validation;
26
pub mod gossip;
27

28
use codec::{Decode, Encode};
Ashley's avatar
Ashley committed
29
30
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
31
use polkadot_primitives::{Block, Hash, Header};
32
use polkadot_primitives::parachain::{
33
34
	Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
	StructuredUnroutedIngress, ValidatorId, OutgoingMessages, ErasureChunk,
35
};
36
use sc_network::{
37
	PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
38
	specialization::NetworkSpecialization as Specialization,
Gavin Wood's avatar
Gavin Wood committed
39
};
40
use sc_network::consensus_gossip::{
41
42
43
	self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey};
44
use self::collator_pool::{CollatorPool, Role, Action};
45
use self::local_collations::LocalCollations;
46
use log::{trace, debug, warn};
47
48

use std::collections::{HashMap, HashSet};
Ashley's avatar
Ashley committed
49
50
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};
51

52
use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage};
53

54
55
56
#[cfg(test)]
mod tests;

57
mod cost {
58
59
60
61
62
63
64
65
66
	use sc_network::ReputationChange as Rep;
	pub(super) const UNEXPECTED_MESSAGE: Rep = Rep::new(-200, "Polkadot: Unexpected message");
	pub(super) const UNEXPECTED_ROLE: Rep = Rep::new(-200, "Polkadot: Unexpected role");
	pub(super) const INVALID_FORMAT: Rep = Rep::new(-200, "Polkadot: Bad message");

	pub(super) const UNKNOWN_PEER: Rep = Rep::new(-50, "Polkadot: Unknown peer");
	pub(super) const COLLATOR_ALREADY_KNOWN: Rep = Rep::new( -100, "Polkadot: Known collator");
	pub(super) const BAD_COLLATION: Rep = Rep::new(-1000, "Polkadot: Bad collation");
	pub(super) const BAD_POV_BLOCK: Rep = Rep::new(-1000, "Polkadot: Bad POV block");
67
68
69
}

mod benefit {
70
71
72
73
74
75
76
77
	use sc_network::ReputationChange as Rep;
	pub(super) const EXPECTED_MESSAGE: Rep = Rep::new(20, "Polkadot: Expected message");
	pub(super) const VALID_FORMAT: Rep = Rep::new(20, "Polkadot: Valid message format");

	pub(super) const KNOWN_PEER: Rep = Rep::new(5, "Polkadot: Known peer");
	pub(super) const NEW_COLLATOR: Rep = Rep::new(10, "Polkadot: New collator");
	pub(super) const GOOD_COLLATION: Rep = Rep::new(100, "Polkadot: Good collation");
	pub(super) const GOOD_POV_BLOCK: Rep = Rep::new(100, "Polkadot: Good POV block");
78
79
}

80
81
82
type FullStatus = GenericFullStatus<Block>;

/// Specialization of the network service for the polkadot protocol.
83
pub type PolkadotNetworkService = sc_network::NetworkService<Block, PolkadotProtocol, Hash>;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
	/// Get a stream of gossip messages for a given hash.
	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;

	/// Gossip a message on given topic.
	fn gossip_message(&self, topic: Hash, message: GossipMessage);

	/// Execute a closure with the gossip service.
	fn with_gossip<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>);

	/// Execute a closure with the polkadot protocol.
	fn with_spec<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
}

102
103
104
105
106
107
108
109
110
111
112
113
114
/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait.
///
/// For any wrapped [`NetworkService`] type it implements a [`ProvideGossipMessages`].
/// For more details see documentation of [`ProvideGossipMessages`].
///
/// [`NetworkService`]: ./trait.NetworkService.html
/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html
pub struct AvailabilityNetworkShim<T>(pub std::sync::Arc<T>);

impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
	where T: NetworkService
{
	fn gossip_messages_for(&self, topic: Hash)
Ashley's avatar
Ashley committed
115
		-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
116
	{
Ashley's avatar
Ashley committed
117
		self.0.gossip_messages_for(topic)
118
			.filter_map(|(msg, _)| async move {
119
				match msg {
120
121
122
					GossipMessage::ErasureChunk(chunk) => {
						Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk))
					},
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
					_ => None,
				}
			})
			.boxed()
	}

	fn gossip_erasure_chunk(
		&self,
		relay_parent: Hash,
		candidate_hash: Hash,
		erasure_root: Hash,
		chunk: ErasureChunk
	) {
		let topic = av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index);
		self.0.gossip_message(
			topic,
			GossipMessage::ErasureChunk(ErasureChunkMessage {
				chunk,
				relay_parent,
				candidate_hash,
			})
		)
	}
}

impl<T> Clone for AvailabilityNetworkShim<T> {
	fn clone(&self) -> Self {
		AvailabilityNetworkShim(self.0.clone())
	}
}

154
155
156
157
158
159
160
161
162
163
164
165
166
167
impl NetworkService for PolkadotNetworkService {
	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
		let (tx, rx) = std::sync::mpsc::channel();

		PolkadotNetworkService::with_gossip(self, move |gossip, _| {
			let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic);
			let _ = tx.send(inner_rx);
		});

		let topic_stream = match rx.recv() {
			Ok(rx) => rx,
			Err(_) => mpsc::unbounded().1, // return empty channel.
		};

Ashley's avatar
Ashley committed
168
		GossipMessageStream::new(topic_stream.boxed())
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
	}

	fn gossip_message(&self, topic: Hash, message: GossipMessage) {
		self.gossip_consensus_message(
			topic,
			POLKADOT_ENGINE_ID,
			message.encode(),
			GossipMessageRecipient::BroadcastToAll,
		);
	}

	fn with_gossip<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut dyn GossipService, &mut dyn Context<Block>)
	{
		PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx))
	}

	fn with_spec<F: Send + 'static>(&self, with: F)
		where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>)
	{
		PolkadotNetworkService::with_spec(self, with)
	}
}

/// A gossip network subservice.
pub trait GossipService {
	fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage);
	fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage);
}

impl GossipService for consensus_gossip::ConsensusGossip<Block> {
	fn send_message(&mut self, ctx: &mut dyn Context<Block>, who: &PeerId, message: ConsensusMessage) {
		consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message)
	}

	fn multicast(&mut self, ctx: &mut dyn Context<Block>, topic: &Hash, message: ConsensusMessage) {
		consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false)
	}
}

/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
Ashley's avatar
Ashley committed
211
	topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>,
212
213
214
215
}

impl GossipMessageStream {
	/// Create a new instance with the given topic stream.
Ashley's avatar
Ashley committed
216
	pub fn new(topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>) -> Self {
217
		Self {
218
			topic_stream,
219
220
221
222
		}
	}
}

Ashley's avatar
Ashley committed
223
impl Stream for GossipMessageStream {
224
225
	type Item = (GossipMessage, Option<PeerId>);

Ashley's avatar
Ashley committed
226
227
228
	fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll<Option<Self::Item>> {
		let this = Pin::into_inner(self);

229
		loop {
Ashley's avatar
Ashley committed
230
231
232
233
			let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) {
				Poll::Ready(Some(msg)) => msg,
				Poll::Ready(None) => return Poll::Ready(None),
				Poll::Pending => return Poll::Pending,
234
235
236
237
			};

			debug!(target: "validation", "Processing statement for live validation leaf-work");
			if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
Ashley's avatar
Ashley committed
238
				return Poll::Ready(Some((gmsg, msg.sender)))
239
240
241
242
			}
		}
	}
}
243
244

/// Status of a Polkadot node.
245
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
246
pub struct Status {
Gav Wood's avatar
Gav Wood committed
247
	collating_for: Option<(CollatorId, ParaId)>,
248
249
}

250
struct PoVBlockRequest {
251
	attempted_peers: HashSet<ValidatorId>,
252
	validation_leaf: Hash,
253
254
	candidate_hash: Hash,
	block_data_hash: Hash,
Ashley's avatar
Ashley committed
255
	sender: oneshot::Sender<PoVBlock>,
256
	canon_roots: StructuredUnroutedIngress,
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
}

impl PoVBlockRequest {
	// Attempt to process a response. If the provided block is invalid,
	// this returns an error result containing the unmodified request.
	//
	// If `Ok(())` is returned, that indicates that the request has been processed.
	fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> {
		if pov_block.block_data.hash() != self.block_data_hash {
			return Err(self);
		}

		match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
			Ok(()) => {
				let _ = self.sender.send(pov_block);
				Ok(())
			}
			Err(_) => Err(self)
		}
	}
277
278
}

279
280
281
282
283
// ensures collator-protocol messages are sent in correct order.
// session key must be sent before collator role.
enum CollatorState {
	Fresh,
	RolePending(Role),
284
	Primed(Option<Role>),
285
286
287
}

impl CollatorState {
288
289
	fn send_key<F: FnMut(Message)>(&mut self, key: ValidatorId, mut f: F) {
		f(Message::ValidatorId(key));
290
		if let CollatorState::RolePending(role) = *self {
291
			f(Message::CollatorRole(role));
292
			*self = CollatorState::Primed(Some(role));
293
294
295
296
		}
	}

	fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
297
		if let CollatorState::Primed(ref mut r) = *self {
298
			f(Message::CollatorRole(role));
299
			*r = Some(role);
300
301
302
303
		} else {
			*self = CollatorState::RolePending(role);
		}
	}
304
305
306
307
308
309
310
311

	fn role(&self) -> Option<Role> {
		match *self {
			CollatorState::Fresh => None,
			CollatorState::RolePending(role) => Some(role),
			CollatorState::Primed(role) => role,
		}
	}
312
313
}

314
struct PeerInfo {
Gav Wood's avatar
Gav Wood committed
315
316
	collating_for: Option<(CollatorId, ParaId)>,
	validator_keys: RecentValidatorIds,
317
	claimed_validator: bool,
318
	collator_state: CollatorState,
319
320
}

321
322
323
impl PeerInfo {
	fn should_send_key(&self) -> bool {
		self.claimed_validator || self.collating_for.is_some()
324
325
326
327
	}
}

/// Polkadot-specific messages.
328
#[derive(Debug, Encode, Decode)]
329
pub enum Message {
330
331
	/// As a validator, tell the peer your current session key.
	// TODO: do this with a cryptographic proof of some kind
332
	// https://github.com/paritytech/polkadot/issues/47
333
	ValidatorId(ValidatorId),
334
335
336
337
	/// Requesting parachain proof-of-validation block (relay_parent, candidate_hash).
	RequestPovBlock(RequestId, Hash, Hash),
	/// Provide requested proof-of-validation block data by candidate hash or nothing if unknown.
	PovBlock(RequestId, Option<PoVBlock>),
338
339
340
341
	/// Tell a collator their role.
	CollatorRole(Role),
	/// A collation provided by a peer. Relay parent and collation.
	Collation(Hash, Collation),
342
343
}

344
fn send_polkadot_message(ctx: &mut dyn Context<Block>, to: PeerId, message: Message) {
345
	trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message);
346
	let encoded = message.encode();
347
	ctx.send_chain_specific(to, encoded)
348
349
350
351
}

/// Polkadot protocol attachment for substrate.
pub struct PolkadotProtocol {
352
	peers: HashMap<PeerId, PeerInfo>,
Gav Wood's avatar
Gav Wood committed
353
	collating_for: Option<(CollatorId, ParaId)>,
354
	collators: CollatorPool,
355
	validators: HashMap<ValidatorId, PeerId>,
356
	local_collations: LocalCollations<Collation>,
357
	live_validation_leaves: LiveValidationLeaves,
358
359
	in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
	pending: Vec<PoVBlockRequest>,
360
	availability_store: Option<av_store::Store>,
361
362
363
364
365
	next_req_id: u64,
}

impl PolkadotProtocol {
	/// Instantiate a polkadot protocol handler.
Gav Wood's avatar
Gav Wood committed
366
	pub fn new(collating_for: Option<(CollatorId, ParaId)>) -> Self {
367
368
		PolkadotProtocol {
			peers: HashMap::new(),
369
			collators: CollatorPool::new(),
370
371
372
			collating_for,
			validators: HashMap::new(),
			local_collations: LocalCollations::new(),
373
			live_validation_leaves: LiveValidationLeaves::new(),
374
375
			in_flight: HashMap::new(),
			pending: Vec::new(),
376
			availability_store: None,
377
378
379
380
381
			next_req_id: 1,
		}
	}

	/// Fetch block data by candidate receipt.
382
383
	fn fetch_pov_block(
		&mut self,
384
		ctx: &mut dyn Context<Block>,
385
386
		candidate: &CandidateReceipt,
		relay_parent: Hash,
387
		canon_roots: StructuredUnroutedIngress,
Ashley's avatar
Ashley committed
388
389
	) -> oneshot::Receiver<PoVBlock> {
		let (tx, rx) = oneshot::channel();
390

391
		self.pending.push(PoVBlockRequest {
392
			attempted_peers: Default::default(),
393
			validation_leaf: relay_parent,
394
395
396
			candidate_hash: candidate.hash(),
			block_data_hash: candidate.block_data_hash,
			sender: tx,
397
			canon_roots,
398
399
400
401
402
403
		});

		self.dispatch_pending_requests(ctx);
		rx
	}

404
405
	/// Note new leaf to do validation work at
	fn new_validation_leaf_work(
406
		&mut self,
407
		ctx: &mut dyn Context<Block>,
408
409
		params: validation::LeafWorkParams,
	) -> validation::LiveValidationLeaf {
410

411
412
		let (work, new_local) = self.live_validation_leaves
			.new_validation_leaf(params);
413
414

		if let Some(new_local) = new_local {
415
			for (id, peer_data) in self.peers.iter_mut()
416
				.filter(|&(_, ref info)| info.should_send_key())
417
			{
Gav Wood's avatar
Gav Wood committed
418
				peer_data.collator_state.send_key(new_local.clone(), |msg| send_polkadot_message(
419
					ctx,
420
					id.clone(),
421
422
					msg
				));
423
424
			}
		}
425

426
		work
427
	}
428

429
430
	// true indicates that it was removed actually.
	fn remove_validation_session(&mut self, parent_hash: Hash) -> bool {
431
		self.live_validation_leaves.remove(parent_hash)
432
433
	}

434
	fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context<Block>) {
435
		let mut new_pending = Vec::new();
436
437
438
439
		let validator_keys = &mut self.validators;
		let next_req_id = &mut self.next_req_id;
		let in_flight = &mut self.in_flight;

440
		for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) {
441
			let parent = pending.validation_leaf;
442
			let c_hash = pending.candidate_hash;
443

444
			let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x {
445
446
				Ok(data @ &_) => {
					// answer locally.
447
					let _ = pending.sender.send(data.clone());
448
					None
449
				}
450
451
				Err(Some(known_keys)) => {
					let next_peer = known_keys.iter()
452
						.filter_map(|x| validator_keys.get(x).map(|id| (x.clone(), id.clone())))
Gav Wood's avatar
Gav Wood committed
453
						.find(|&(ref key, _)| pending.attempted_peers.insert(key.clone()))
454
455
456
457
458
459
460
461
462
						.map(|(_, id)| id);

					// dispatch to peer
					if let Some(who) = next_peer {
						let req_id = *next_req_id;
						*next_req_id += 1;

						send_polkadot_message(
							ctx,
463
							who.clone(),
464
							Message::RequestPovBlock(req_id, parent, c_hash),
465
466
467
468
469
470
471
472
						);

						in_flight.insert((req_id, who), pending);

						None
					} else {
						Some(pending)
					}
473
				}
474
				Err(None) => None, // no such known validation leaf-work. prune out.
475
			});
476

477
478
479
			if let Some(pending) = still_pending {
				new_pending.push(pending);
			}
480
481
482
483
484
		}

		self.pending = new_pending;
	}

485
	fn on_polkadot_message(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, msg: Message) {
Gav Wood's avatar
Gav Wood committed
486
		trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
487
		match msg {
488
			Message::ValidatorId(key) => self.on_session_key(ctx, who, key),
489
			Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
490
				let pov_block = self.live_validation_leaves.with_pov_block(
491
492
493
494
495
496
497
498
					&relay_parent,
					&candidate_hash,
					|res| res.ok().map(|b| b.clone()),
				);

				send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
			}
			Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data),
Gav Wood's avatar
Gav Wood committed
499
500
			Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
			Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
501
502
503
		}
	}

504
	fn on_session_key(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, key: ValidatorId) {
505
		{
Gav Wood's avatar
Gav Wood committed
506
			let info = match self.peers.get_mut(&who) {
507
508
				Some(peer) => peer,
				None => {
Gav Wood's avatar
Gav Wood committed
509
					trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
510
511
512
513
514
					return
				}
			};

			if !info.claimed_validator {
515
				ctx.report_peer(who, cost::UNEXPECTED_MESSAGE);
516
517
				return;
			}
518
			ctx.report_peer(who.clone(), benefit::EXPECTED_MESSAGE);
519

520
			let local_collations = &mut self.local_collations;
Gav Wood's avatar
Gav Wood committed
521
			let new_collations = match info.validator_keys.insert(key.clone()) {
522
523
524
525
				InsertedRecentKey::AlreadyKnown => Vec::new(),
				InsertedRecentKey::New(Some(old_key)) => {
					self.validators.remove(&old_key);
					local_collations.fresh_key(&old_key, &key)
526
				}
527
				InsertedRecentKey::New(None) => info.collator_state.role()
Gav Wood's avatar
Gav Wood committed
528
					.map(|r| local_collations.note_validator_role(key.clone(), r))
529
530
					.unwrap_or_else(Vec::new),
			};
531

532
533
534
			for (relay_parent, collation) in new_collations {
				send_polkadot_message(
					ctx,
535
					who.clone(),
536
537
					Message::Collation(relay_parent, collation),
				)
538
			}
539

Gav Wood's avatar
Gav Wood committed
540
			self.validators.insert(key, who);
541
		}
542
543

		self.dispatch_pending_requests(ctx);
544
545
	}

546
547
	fn on_pov_block(
		&mut self,
548
		ctx: &mut dyn Context<Block>,
549
550
551
552
		who: PeerId,
		req_id: RequestId,
		pov_block: Option<PoVBlock>,
	) {
553
		match self.in_flight.remove(&(req_id, who.clone())) {
554
			Some(mut req) => {
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
				match pov_block {
					Some(pov_block) => {
						match req.process_response(pov_block) {
							Ok(()) => {
								ctx.report_peer(who, benefit::GOOD_POV_BLOCK);
								return;
							}
							Err(r) => {
								ctx.report_peer(who, cost::BAD_POV_BLOCK);
								req = r;
							}
						}
					},
					None => {
						ctx.report_peer(who, benefit::EXPECTED_MESSAGE);
570
571
572
573
574
575
					}
				}

				self.pending.push(req);
				self.dispatch_pending_requests(ctx);
			}
576
			None => ctx.report_peer(who, cost::UNEXPECTED_MESSAGE),
577
578
579
580
		}
	}

	// when a validator sends us (a collator) a new role.
581
	fn on_new_role(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, role: Role) {
582
		let info = match self.peers.get_mut(&who) {
583
584
			Some(peer) => peer,
			None => {
Gav Wood's avatar
Gav Wood committed
585
				trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
586
587
588
589
				return
			}
		};

590
591
		debug!(target: "p_net", "New collator role {:?} from {}", role, who);

592
		if info.validator_keys.as_slice().is_empty() {
593
			ctx.report_peer(who, cost::UNEXPECTED_ROLE)
594
595
596
597
598
599
600
601
602
		} else {
			// update role for all saved session keys for this validator.
			let local_collations = &mut self.local_collations;
			for (relay_parent, collation) in info.validator_keys
				.as_slice()
				.iter()
				.cloned()
				.flat_map(|k| local_collations.note_validator_role(k, role))
			{
603
				debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent);
604
605
				send_polkadot_message(
					ctx,
606
					who.clone(),
607
608
					Message::Collation(relay_parent, collation),
				)
609
			}
610
611
		}
	}
612
613
614
615
616

	/// Convert the given `CollatorId` to a `PeerId`.
	pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
		self.collators.collator_id_to_peer_id(collator_id)
	}
617
618
619
620
}

impl Specialization<Block> for PolkadotProtocol {
	fn status(&self) -> Vec<u8> {
621
		Status { collating_for: self.collating_for.clone() }.encode()
622
623
	}

624
	fn on_connect(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, status: FullStatus) {
625
626
		let local_status = Status::decode(&mut &status.chain_status[..])
			.unwrap_or_else(|_| Status { collating_for: None });
627

628
		let validator = status.roles.contains(sc_network::config::Roles::AUTHORITY);
629
630

		let mut peer_info = PeerInfo {
Gav Wood's avatar
Gav Wood committed
631
			collating_for: local_status.collating_for.clone(),
632
			validator_keys: Default::default(),
633
634
635
636
			claimed_validator: validator,
			collator_state: CollatorState::Fresh,
		};

637
		if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
638
			if self.collator_peer(acc_id.clone()).is_some() {
639
				ctx.report_peer(who, cost::COLLATOR_ALREADY_KNOWN);
640
641
				return
			}
642
			ctx.report_peer(who.clone(), benefit::NEW_COLLATOR);
643

644
645
646
647
648
			let collator_role = self.collators.on_new_collator(
				acc_id.clone(),
				para_id.clone(),
				who.clone(),
			);
649
650

			peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
651
				ctx,
652
				who.clone(),
653
654
				msg,
			));
655
656
		}

657
658
		// send session keys.
		if peer_info.should_send_key() {
659
			for local_session_key in self.live_validation_leaves.recent_keys() {
Gav Wood's avatar
Gav Wood committed
660
				peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message(
661
					ctx,
662
					who.clone(),
663
664
665
					msg,
				));
			}
666
667
		}

668
		self.peers.insert(who, peer_info);
669
670
671
		self.dispatch_pending_requests(ctx);
	}

672
	fn on_disconnect(&mut self, ctx: &mut dyn Context<Block>, who: PeerId) {
Gav Wood's avatar
Gav Wood committed
673
		if let Some(info) = self.peers.remove(&who) {
674
			if let Some((acc_id, _)) = info.collating_for {
675
				let new_primary = self.collators.on_disconnect(acc_id)
676
					.and_then(|new_primary| self.collator_peer(new_primary));
677

678
679
				if let Some((new_primary, primary_info)) = new_primary {
					primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message(
680
						ctx,
681
						new_primary.clone(),
682
683
						msg,
					));
684
685
686
				}
			}

687
688
689
			for key in info.validator_keys.as_slice().iter() {
				self.validators.remove(key);
				self.local_collations.on_disconnect(key);
690
691
692
693
694
			}

			{
				let pending = &mut self.pending;
				self.in_flight.retain(|&(_, ref peer), val| {
Gav Wood's avatar
Gav Wood committed
695
					let retain = peer != &who;
696
					if !retain {
697
						// swap with a dummy value which will be dropped immediately.
Ashley's avatar
Ashley committed
698
						let (sender, _) = oneshot::channel();
699
						pending.push(::std::mem::replace(val, PoVBlockRequest {
700
							attempted_peers: Default::default(),
701
							validation_leaf: Default::default(),
702
703
							candidate_hash: Default::default(),
							block_data_hash: Default::default(),
704
							canon_roots: StructuredUnroutedIngress(Vec::new()),
705
706
707
708
709
710
711
712
713
714
715
							sender,
						}));
					}

					retain
				});
			}
			self.dispatch_pending_requests(ctx);
		}
	}

716
717
718
719
	fn on_message(
		&mut self,
		ctx: &mut dyn Context<Block>,
		who: PeerId,
720
		message: Vec<u8>,
721
	) {
722
		match Message::decode(&mut &message[..]) {
723
			Ok(msg) => {
724
725
726
				ctx.report_peer(who.clone(), benefit::VALID_FORMAT);
				self.on_polkadot_message(ctx, who, msg)
			},
727
			Err(_) => {
728
729
				trace!(target: "p_net", "Bad message from {}", who);
				ctx.report_peer(who, cost::INVALID_FORMAT);
730
731
732
733
			}
		}
	}

734
	fn maintain_peers(&mut self, ctx: &mut dyn Context<Block>) {
735
		self.collators.collect_garbage(None);
736
		self.local_collations.collect_garbage(None);
737
		self.dispatch_pending_requests(ctx);
738
739
740
741

		for collator_action in self.collators.maintain_peers() {
			match collator_action {
				Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
742
743
				Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) {
					info.collator_state.set_role(role, |msg| send_polkadot_message(
744
						ctx,
745
						collator.clone(),
746
747
						msg,
					))
748
749
750
751
752
				},
			}
		}
	}

753
	fn on_block_imported(&mut self, _ctx: &mut dyn Context<Block>, hash: Hash, header: &Header) {
754
		self.collators.collect_garbage(Some(&hash));
755
		self.local_collations.collect_garbage(Some(&header.parent_hash));
756
757
758
759
760
	}
}

impl PolkadotProtocol {
	// we received a collation from a peer
761
762
763
764
765
766
767
	fn on_collation(
		&mut self,
		ctx: &mut dyn Context<Block>,
		from: PeerId,
		relay_parent: Hash,
		collation: Collation
	) {
768
769
		let collation_para = collation.info.parachain_index;
		let collated_acc = collation.info.collator.clone();
770
771

		match self.peers.get(&from) {
772
			None => ctx.report_peer(from, cost::UNKNOWN_PEER),
773
774
775
776
777
778
779
			Some(peer_info) => {
				ctx.report_peer(from.clone(), benefit::KNOWN_PEER);
				match peer_info.collating_for {
					None => ctx.report_peer(from, cost::UNEXPECTED_MESSAGE),
					Some((ref acc_id, ref para_id)) => {
						ctx.report_peer(from.clone(), benefit::EXPECTED_MESSAGE);
						let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
780
						if structurally_valid && collation.info.check_signature().is_ok() {
781
782
783
784
785
786
787
							debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from);
							ctx.report_peer(from, benefit::GOOD_COLLATION);
							self.collators.on_collation(acc_id.clone(), relay_parent, collation)
						} else {
							ctx.report_peer(from, cost::INVALID_FORMAT)
						};
					}
788
789
790
791
792
				}
			},
		}
	}

Ashley's avatar
Ashley committed
793
794
	fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
		let (tx, rx) = oneshot::channel();
795
		debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
796
797
798
799
800
		self.collators.await_collation(relay_parent, para_id, tx);
		rx
	}

	// get connected peer with given account ID for collation.
801
	fn collator_peer(&mut self, collator_id: CollatorId) -> Option<(PeerId, &mut PeerInfo)> {
802
803
804
		let check_info = |info: &PeerInfo| info
			.collating_for
			.as_ref()
Gav Wood's avatar
Gav Wood committed
805
			.map_or(false, |&(ref acc_id, _)| acc_id == &collator_id);
806
807

		self.peers
808
809
			.iter_mut()
			.filter(|&(_, ref info)| check_info(&**info))
810
			.map(|(who, info)| (who.clone(), info))
811
812
813
814
			.next()
	}

	// disconnect a collator by account-id.
815
	fn disconnect_bad_collator(&mut self, ctx: &mut dyn Context<Block>, collator_id: CollatorId) {
Gav Wood's avatar
Gav Wood committed
816
		if let Some((who, _)) = self.collator_peer(collator_id) {
817
			ctx.report_peer(who, cost::BAD_COLLATION)
818
819
820
821
822
823
		}
	}
}

impl PolkadotProtocol {
	/// Add a local collation and broadcast it to the necessary peers.
824
825
826
827
	///
	/// This should be called by a collator intending to get the locally-collated
	/// block into the hands of validators.
	/// It also places the outgoing message and block data in the local availability store.
Ashley's avatar
Ashley committed
828
	pub fn add_local_collation(
829
		&mut self,
830
		ctx: &mut dyn Context<Block>,
831
		relay_parent: Hash,
832
		targets: HashSet<ValidatorId>,
833
		collation: Collation,
834
		outgoing_targeted: OutgoingMessages,
Ashley's avatar
Ashley committed
835
	) -> impl Future<Output = ()> {
836
		debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
837
838
			relay_parent, collation.info.parachain_index);

839
840
		for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
			match self.validators.get(&primary) {
841
842
843
844
				Some(who) => {
					debug!(target: "p_net", "Sending local collation to {:?}", primary);
					send_polkadot_message(
						ctx,
845
						who.clone(),
846
847
848
						Message::Collation(relay_parent, cloned_collation),
					)
				},
849
850
851
				None =>
					warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
			}
852
		}
Ashley's avatar
Ashley committed
853

Ashley's avatar
Ashley committed
854
855
856
857
858
859
860
861
862
863
864
865
866
		let availability_store = self.availability_store.clone();
		let collation_cloned = collation.clone();

		async move {
			if let Some(availability_store) = availability_store {
				let _ = availability_store.make_available(av_store::Data {
					relay_parent,
					parachain_id: collation_cloned.info.parachain_index,
					block_data: collation_cloned.pov.block_data.clone(),
					outgoing_queues: Some(outgoing_targeted.clone().into()),
				}).await;
			}
		}
867
	}
868

869
870
871
872
	/// Give the network protocol a handle to an availability store, used for
	/// circulation of parachain data required for validation.
	pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) {
		self.availability_store = Some(availability_store);
873
	}
874
}