lib.rs 24.6 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Polkadot-specific network implementation.
//!
19
20
//! This manages routing for parachain statements, parachain block and outgoing message
//! data fetching, communication between collators and validators, and more.
21

22
mod collator_pool;
23
mod local_collations;
24
mod router;
25
pub mod validation;
26
pub mod gossip;
27

28
use codec::{Decode, Encode};
29
use futures::channel::oneshot;
30
use futures::prelude::*;
31
use polkadot_primitives::{Block, Hash, Header};
32
use polkadot_primitives::parachain::{
33
34
	Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
	StructuredUnroutedIngress, ValidatorId, OutgoingMessages, ErasureChunk,
35
};
36
use sc_network::{
37
	PeerId, RequestId, Context, StatusMessage as GenericFullStatus,
38
	specialization::NetworkSpecialization as Specialization,
Gavin Wood's avatar
Gavin Wood committed
39
};
40
use sc_network_gossip::TopicNotification;
41
use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey};
42
use self::collator_pool::{CollatorPool, Role, Action};
43
use self::local_collations::LocalCollations;
44
use log::{trace, debug, warn};
45
46

use std::collections::{HashMap, HashSet};
47
48
use std::pin::Pin;
use std::task::{Context as PollContext, Poll};
49

50
use crate::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator};
51

52
53
54
#[cfg(test)]
mod tests;

55
mod cost {
56
57
58
59
60
61
62
63
64
	use sc_network::ReputationChange as Rep;
	pub(super) const UNEXPECTED_MESSAGE: Rep = Rep::new(-200, "Polkadot: Unexpected message");
	pub(super) const UNEXPECTED_ROLE: Rep = Rep::new(-200, "Polkadot: Unexpected role");
	pub(super) const INVALID_FORMAT: Rep = Rep::new(-200, "Polkadot: Bad message");

	pub(super) const UNKNOWN_PEER: Rep = Rep::new(-50, "Polkadot: Unknown peer");
	pub(super) const COLLATOR_ALREADY_KNOWN: Rep = Rep::new( -100, "Polkadot: Known collator");
	pub(super) const BAD_COLLATION: Rep = Rep::new(-1000, "Polkadot: Bad collation");
	pub(super) const BAD_POV_BLOCK: Rep = Rep::new(-1000, "Polkadot: Bad POV block");
65
66
67
}

mod benefit {
68
69
70
71
72
73
74
75
	use sc_network::ReputationChange as Rep;
	pub(super) const EXPECTED_MESSAGE: Rep = Rep::new(20, "Polkadot: Expected message");
	pub(super) const VALID_FORMAT: Rep = Rep::new(20, "Polkadot: Valid message format");

	pub(super) const KNOWN_PEER: Rep = Rep::new(5, "Polkadot: Known peer");
	pub(super) const NEW_COLLATOR: Rep = Rep::new(10, "Polkadot: New collator");
	pub(super) const GOOD_COLLATION: Rep = Rep::new(100, "Polkadot: Good collation");
	pub(super) const GOOD_POV_BLOCK: Rep = Rep::new(100, "Polkadot: Good POV block");
76
77
}

78
79
80
type FullStatus = GenericFullStatus<Block>;

/// Specialization of the network service for the polkadot protocol.
81
pub type PolkadotNetworkService = sc_network::NetworkService<Block, PolkadotProtocol, Hash>;
82
83
84
85
86
87
88
89
90

/// Basic functionality that a network has to fulfill.
pub trait NetworkService: Send + Sync + 'static {
	/// Get a stream of gossip messages for a given hash.
	fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream;

	/// Gossip a message on given topic.
	fn gossip_message(&self, topic: Hash, message: GossipMessage);

91
92
	/// Send a message to a specific peer we're connected to.
	fn send_message(&self, who: PeerId, message: GossipMessage);
93
94
95

	/// Execute a closure with the polkadot protocol.
	fn with_spec<F: Send + 'static>(&self, with: F)
96
		where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context<Block>);
97
98
}

99
100
101
102
103
104
105
/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait.
///
/// For any wrapped [`NetworkService`] type it implements a [`ProvideGossipMessages`].
/// For more details see documentation of [`ProvideGossipMessages`].
///
/// [`NetworkService`]: ./trait.NetworkService.html
/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html
106
107
#[derive(Clone)]
pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator);
108

109
impl av_store::ProvideGossipMessages for AvailabilityNetworkShim {
110
	fn gossip_messages_for(&self, topic: Hash)
111
		-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
112
	{
113
114
		self.0.gossip_messages_for(topic)
			.filter_map(|(msg, _)| async move {
115
				match msg {
116
117
118
					GossipMessage::ErasureChunk(chunk) => {
						Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk))
					},
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
					_ => None,
				}
			})
			.boxed()
	}

	fn gossip_erasure_chunk(
		&self,
		relay_parent: Hash,
		candidate_hash: Hash,
		erasure_root: Hash,
		chunk: ErasureChunk
	) {
		let topic = av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index);
		self.0.gossip_message(
			topic,
			GossipMessage::ErasureChunk(ErasureChunkMessage {
				chunk,
				relay_parent,
				candidate_hash,
			})
		)
	}
}

144
145
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
146
	topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>,
147
148
149
150
}

impl GossipMessageStream {
	/// Create a new instance with the given topic stream.
151
	pub fn new(topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>) -> Self {
152
		Self {
153
			topic_stream,
154
155
156
157
158
159
160
		}
	}
}

impl Stream for GossipMessageStream {
	type Item = (GossipMessage, Option<PeerId>);

161
162
163
	fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll<Option<Self::Item>> {
		let this = Pin::into_inner(self);

164
		loop {
165
166
167
168
			let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) {
				Poll::Ready(Some(msg)) => msg,
				Poll::Ready(None) => return Poll::Ready(None),
				Poll::Pending => return Poll::Pending,
169
170
171
172
			};

			debug!(target: "validation", "Processing statement for live validation leaf-work");
			if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) {
173
				return Poll::Ready(Some((gmsg, msg.sender)))
174
175
176
177
			}
		}
	}
}
178
179

/// Status of a Polkadot node.
180
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
181
pub struct Status {
Gav Wood's avatar
Gav Wood committed
182
	collating_for: Option<(CollatorId, ParaId)>,
183
184
}

185
struct PoVBlockRequest {
186
	attempted_peers: HashSet<ValidatorId>,
187
	validation_leaf: Hash,
188
189
	candidate_hash: Hash,
	block_data_hash: Hash,
190
	sender: oneshot::Sender<PoVBlock>,
191
	canon_roots: StructuredUnroutedIngress,
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
}

impl PoVBlockRequest {
	// Attempt to process a response. If the provided block is invalid,
	// this returns an error result containing the unmodified request.
	//
	// If `Ok(())` is returned, that indicates that the request has been processed.
	fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> {
		if pov_block.block_data.hash() != self.block_data_hash {
			return Err(self);
		}

		match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) {
			Ok(()) => {
				let _ = self.sender.send(pov_block);
				Ok(())
			}
			Err(_) => Err(self)
		}
	}
212
213
}

214
215
216
217
218
// ensures collator-protocol messages are sent in correct order.
// session key must be sent before collator role.
enum CollatorState {
	Fresh,
	RolePending(Role),
219
	Primed(Option<Role>),
220
221
222
}

impl CollatorState {
223
224
	fn send_key<F: FnMut(Message)>(&mut self, key: ValidatorId, mut f: F) {
		f(Message::ValidatorId(key));
225
		if let CollatorState::RolePending(role) = *self {
226
			f(Message::CollatorRole(role));
227
			*self = CollatorState::Primed(Some(role));
228
229
230
231
		}
	}

	fn set_role<F: FnMut(Message)>(&mut self, role: Role, mut f: F) {
232
		if let CollatorState::Primed(ref mut r) = *self {
233
			f(Message::CollatorRole(role));
234
			*r = Some(role);
235
236
237
238
		} else {
			*self = CollatorState::RolePending(role);
		}
	}
239
240
241
242
243
244
245
246

	fn role(&self) -> Option<Role> {
		match *self {
			CollatorState::Fresh => None,
			CollatorState::RolePending(role) => Some(role),
			CollatorState::Primed(role) => role,
		}
	}
247
248
}

249
struct PeerInfo {
Gav Wood's avatar
Gav Wood committed
250
251
	collating_for: Option<(CollatorId, ParaId)>,
	validator_keys: RecentValidatorIds,
252
	claimed_validator: bool,
253
	collator_state: CollatorState,
254
255
}

256
257
258
impl PeerInfo {
	fn should_send_key(&self) -> bool {
		self.claimed_validator || self.collating_for.is_some()
259
260
261
262
	}
}

/// Polkadot-specific messages.
263
#[derive(Debug, Encode, Decode)]
264
pub enum Message {
265
266
	/// As a validator, tell the peer your current session key.
	// TODO: do this with a cryptographic proof of some kind
267
	// https://github.com/paritytech/polkadot/issues/47
268
	ValidatorId(ValidatorId),
269
270
271
272
	/// Requesting parachain proof-of-validation block (relay_parent, candidate_hash).
	RequestPovBlock(RequestId, Hash, Hash),
	/// Provide requested proof-of-validation block data by candidate hash or nothing if unknown.
	PovBlock(RequestId, Option<PoVBlock>),
273
274
275
276
	/// Tell a collator their role.
	CollatorRole(Role),
	/// A collation provided by a peer. Relay parent and collation.
	Collation(Hash, Collation),
277
278
}

279
fn send_polkadot_message(ctx: &mut dyn Context<Block>, to: PeerId, message: Message) {
280
	trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message);
281
	let encoded = message.encode();
282
	ctx.send_chain_specific(to, encoded)
283
284
285
286
}

/// Polkadot protocol attachment for substrate.
pub struct PolkadotProtocol {
287
	peers: HashMap<PeerId, PeerInfo>,
Gav Wood's avatar
Gav Wood committed
288
	collating_for: Option<(CollatorId, ParaId)>,
289
	collators: CollatorPool,
290
	validators: HashMap<ValidatorId, PeerId>,
291
	local_collations: LocalCollations<Collation>,
292
	live_validation_leaves: LiveValidationLeaves,
293
294
	in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>,
	pending: Vec<PoVBlockRequest>,
295
	availability_store: Option<av_store::Store>,
296
297
298
299
300
	next_req_id: u64,
}

impl PolkadotProtocol {
	/// Instantiate a polkadot protocol handler.
Gav Wood's avatar
Gav Wood committed
301
	pub fn new(collating_for: Option<(CollatorId, ParaId)>) -> Self {
302
303
		PolkadotProtocol {
			peers: HashMap::new(),
304
			collators: CollatorPool::new(),
305
306
307
			collating_for,
			validators: HashMap::new(),
			local_collations: LocalCollations::new(),
308
			live_validation_leaves: LiveValidationLeaves::new(),
309
310
			in_flight: HashMap::new(),
			pending: Vec::new(),
311
			availability_store: None,
312
313
314
315
316
			next_req_id: 1,
		}
	}

	/// Fetch block data by candidate receipt.
317
318
	fn fetch_pov_block(
		&mut self,
319
		ctx: &mut dyn Context<Block>,
320
321
		candidate: &CandidateReceipt,
		relay_parent: Hash,
322
		canon_roots: StructuredUnroutedIngress,
323
	) -> oneshot::Receiver<PoVBlock> {
324
325
		let (tx, rx) = oneshot::channel();

326
		self.pending.push(PoVBlockRequest {
327
			attempted_peers: Default::default(),
328
			validation_leaf: relay_parent,
329
330
331
			candidate_hash: candidate.hash(),
			block_data_hash: candidate.block_data_hash,
			sender: tx,
332
			canon_roots,
333
334
335
336
337
338
		});

		self.dispatch_pending_requests(ctx);
		rx
	}

339
340
	/// Note new leaf to do validation work at
	fn new_validation_leaf_work(
341
		&mut self,
342
		ctx: &mut dyn Context<Block>,
343
344
		params: validation::LeafWorkParams,
	) -> validation::LiveValidationLeaf {
345

346
347
		let (work, new_local) = self.live_validation_leaves
			.new_validation_leaf(params);
348
349

		if let Some(new_local) = new_local {
350
			for (id, peer_data) in self.peers.iter_mut()
351
				.filter(|&(_, ref info)| info.should_send_key())
352
			{
Gav Wood's avatar
Gav Wood committed
353
				peer_data.collator_state.send_key(new_local.clone(), |msg| send_polkadot_message(
354
					ctx,
355
					id.clone(),
356
357
					msg
				));
358
359
			}
		}
360

361
		work
362
	}
363

364
365
	// true indicates that it was removed actually.
	fn remove_validation_session(&mut self, parent_hash: Hash) -> bool {
366
		self.live_validation_leaves.remove(parent_hash)
367
368
	}

369
	fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context<Block>) {
370
		let mut new_pending = Vec::new();
371
372
373
374
		let validator_keys = &mut self.validators;
		let next_req_id = &mut self.next_req_id;
		let in_flight = &mut self.in_flight;

375
		for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) {
376
			let parent = pending.validation_leaf;
377
			let c_hash = pending.candidate_hash;
378

379
			let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x {
380
381
				Ok(data @ &_) => {
					// answer locally.
382
					let _ = pending.sender.send(data.clone());
383
					None
384
				}
385
386
				Err(Some(known_keys)) => {
					let next_peer = known_keys.iter()
387
						.filter_map(|x| validator_keys.get(x).map(|id| (x.clone(), id.clone())))
Gav Wood's avatar
Gav Wood committed
388
						.find(|&(ref key, _)| pending.attempted_peers.insert(key.clone()))
389
390
391
392
393
394
395
396
397
						.map(|(_, id)| id);

					// dispatch to peer
					if let Some(who) = next_peer {
						let req_id = *next_req_id;
						*next_req_id += 1;

						send_polkadot_message(
							ctx,
398
							who.clone(),
399
							Message::RequestPovBlock(req_id, parent, c_hash),
400
401
402
403
404
405
406
407
						);

						in_flight.insert((req_id, who), pending);

						None
					} else {
						Some(pending)
					}
408
				}
409
				Err(None) => None, // no such known validation leaf-work. prune out.
410
			});
411

412
413
414
			if let Some(pending) = still_pending {
				new_pending.push(pending);
			}
415
416
417
418
419
		}

		self.pending = new_pending;
	}

420
	fn on_polkadot_message(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, msg: Message) {
Gav Wood's avatar
Gav Wood committed
421
		trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg);
422
		match msg {
423
			Message::ValidatorId(key) => self.on_session_key(ctx, who, key),
424
			Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => {
425
				let pov_block = self.live_validation_leaves.with_pov_block(
426
427
428
429
430
431
432
433
					&relay_parent,
					&candidate_hash,
					|res| res.ok().map(|b| b.clone()),
				);

				send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block));
			}
			Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data),
Gav Wood's avatar
Gav Wood committed
434
435
			Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation),
			Message::CollatorRole(role) => self.on_new_role(ctx, who, role),
436
437
438
		}
	}

439
	fn on_session_key(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, key: ValidatorId) {
440
		{
Gav Wood's avatar
Gav Wood committed
441
			let info = match self.peers.get_mut(&who) {
442
443
				Some(peer) => peer,
				None => {
Gav Wood's avatar
Gav Wood committed
444
					trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
445
446
447
448
449
					return
				}
			};

			if !info.claimed_validator {
450
				ctx.report_peer(who, cost::UNEXPECTED_MESSAGE);
451
452
				return;
			}
453
			ctx.report_peer(who.clone(), benefit::EXPECTED_MESSAGE);
454

455
			let local_collations = &mut self.local_collations;
Gav Wood's avatar
Gav Wood committed
456
			let new_collations = match info.validator_keys.insert(key.clone()) {
457
458
459
460
				InsertedRecentKey::AlreadyKnown => Vec::new(),
				InsertedRecentKey::New(Some(old_key)) => {
					self.validators.remove(&old_key);
					local_collations.fresh_key(&old_key, &key)
461
				}
462
				InsertedRecentKey::New(None) => info.collator_state.role()
Gav Wood's avatar
Gav Wood committed
463
					.map(|r| local_collations.note_validator_role(key.clone(), r))
464
465
					.unwrap_or_else(Vec::new),
			};
466

467
468
469
			for (relay_parent, collation) in new_collations {
				send_polkadot_message(
					ctx,
470
					who.clone(),
471
472
					Message::Collation(relay_parent, collation),
				)
473
			}
474

Gav Wood's avatar
Gav Wood committed
475
			self.validators.insert(key, who);
476
		}
477
478

		self.dispatch_pending_requests(ctx);
479
480
	}

481
482
	fn on_pov_block(
		&mut self,
483
		ctx: &mut dyn Context<Block>,
484
485
486
487
		who: PeerId,
		req_id: RequestId,
		pov_block: Option<PoVBlock>,
	) {
488
		match self.in_flight.remove(&(req_id, who.clone())) {
489
			Some(mut req) => {
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
				match pov_block {
					Some(pov_block) => {
						match req.process_response(pov_block) {
							Ok(()) => {
								ctx.report_peer(who, benefit::GOOD_POV_BLOCK);
								return;
							}
							Err(r) => {
								ctx.report_peer(who, cost::BAD_POV_BLOCK);
								req = r;
							}
						}
					},
					None => {
						ctx.report_peer(who, benefit::EXPECTED_MESSAGE);
505
506
507
508
509
510
					}
				}

				self.pending.push(req);
				self.dispatch_pending_requests(ctx);
			}
511
			None => ctx.report_peer(who, cost::UNEXPECTED_MESSAGE),
512
513
514
515
		}
	}

	// when a validator sends us (a collator) a new role.
516
	fn on_new_role(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, role: Role) {
517
		let info = match self.peers.get_mut(&who) {
518
519
			Some(peer) => peer,
			None => {
Gav Wood's avatar
Gav Wood committed
520
				trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who);
521
522
523
524
				return
			}
		};

525
526
		debug!(target: "p_net", "New collator role {:?} from {}", role, who);

527
		if info.validator_keys.as_slice().is_empty() {
528
			ctx.report_peer(who, cost::UNEXPECTED_ROLE)
529
530
531
532
533
534
535
536
537
		} else {
			// update role for all saved session keys for this validator.
			let local_collations = &mut self.local_collations;
			for (relay_parent, collation) in info.validator_keys
				.as_slice()
				.iter()
				.cloned()
				.flat_map(|k| local_collations.note_validator_role(k, role))
			{
538
				debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent);
539
540
				send_polkadot_message(
					ctx,
541
					who.clone(),
542
543
					Message::Collation(relay_parent, collation),
				)
544
			}
545
546
		}
	}
547
548
549
550
551

	/// Convert the given `CollatorId` to a `PeerId`.
	pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
		self.collators.collator_id_to_peer_id(collator_id)
	}
552
553
554
555
}

impl Specialization<Block> for PolkadotProtocol {
	fn status(&self) -> Vec<u8> {
556
		Status { collating_for: self.collating_for.clone() }.encode()
557
558
	}

559
	fn on_connect(&mut self, ctx: &mut dyn Context<Block>, who: PeerId, status: FullStatus) {
560
561
		let local_status = Status::decode(&mut &status.chain_status[..])
			.unwrap_or_else(|_| Status { collating_for: None });
562

563
		let validator = status.roles.contains(sc_network::config::Roles::AUTHORITY);
564
565

		let mut peer_info = PeerInfo {
Gav Wood's avatar
Gav Wood committed
566
			collating_for: local_status.collating_for.clone(),
567
			validator_keys: Default::default(),
568
569
570
571
			claimed_validator: validator,
			collator_state: CollatorState::Fresh,
		};

572
		if let Some((ref acc_id, ref para_id)) = local_status.collating_for {
573
			if self.collator_peer(acc_id.clone()).is_some() {
574
				ctx.report_peer(who, cost::COLLATOR_ALREADY_KNOWN);
575
576
				return
			}
577
			ctx.report_peer(who.clone(), benefit::NEW_COLLATOR);
578

579
580
581
582
583
			let collator_role = self.collators.on_new_collator(
				acc_id.clone(),
				para_id.clone(),
				who.clone(),
			);
584
585

			peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
586
				ctx,
587
				who.clone(),
588
589
				msg,
			));
590
591
		}

592
593
		// send session keys.
		if peer_info.should_send_key() {
594
			for local_session_key in self.live_validation_leaves.recent_keys() {
Gav Wood's avatar
Gav Wood committed
595
				peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message(
596
					ctx,
597
					who.clone(),
598
599
600
					msg,
				));
			}
601
602
		}

603
		self.peers.insert(who, peer_info);
604
605
606
		self.dispatch_pending_requests(ctx);
	}

607
	fn on_disconnect(&mut self, ctx: &mut dyn Context<Block>, who: PeerId) {
Gav Wood's avatar
Gav Wood committed
608
		if let Some(info) = self.peers.remove(&who) {
609
			if let Some((acc_id, _)) = info.collating_for {
610
				let new_primary = self.collators.on_disconnect(acc_id)
611
					.and_then(|new_primary| self.collator_peer(new_primary));
612

613
614
				if let Some((new_primary, primary_info)) = new_primary {
					primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message(
615
						ctx,
616
						new_primary.clone(),
617
618
						msg,
					));
619
620
621
				}
			}

622
623
624
			for key in info.validator_keys.as_slice().iter() {
				self.validators.remove(key);
				self.local_collations.on_disconnect(key);
625
626
627
628
629
			}

			{
				let pending = &mut self.pending;
				self.in_flight.retain(|&(_, ref peer), val| {
Gav Wood's avatar
Gav Wood committed
630
					let retain = peer != &who;
631
					if !retain {
632
						// swap with a dummy value which will be dropped immediately.
633
						let (sender, _) = oneshot::channel();
634
						pending.push(::std::mem::replace(val, PoVBlockRequest {
635
							attempted_peers: Default::default(),
636
							validation_leaf: Default::default(),
637
638
							candidate_hash: Default::default(),
							block_data_hash: Default::default(),
639
							canon_roots: StructuredUnroutedIngress(Vec::new()),
640
641
642
643
644
645
646
647
648
649
650
							sender,
						}));
					}

					retain
				});
			}
			self.dispatch_pending_requests(ctx);
		}
	}

651
652
653
654
	fn on_message(
		&mut self,
		ctx: &mut dyn Context<Block>,
		who: PeerId,
655
		message: Vec<u8>,
656
	) {
657
		match Message::decode(&mut &message[..]) {
658
			Ok(msg) => {
659
660
661
				ctx.report_peer(who.clone(), benefit::VALID_FORMAT);
				self.on_polkadot_message(ctx, who, msg)
			},
662
			Err(_) => {
663
664
				trace!(target: "p_net", "Bad message from {}", who);
				ctx.report_peer(who, cost::INVALID_FORMAT);
665
666
667
668
			}
		}
	}

669
	fn maintain_peers(&mut self, ctx: &mut dyn Context<Block>) {
670
		self.collators.collect_garbage(None);
671
		self.local_collations.collect_garbage(None);
672
		self.dispatch_pending_requests(ctx);
673
674
675
676

		for collator_action in self.collators.maintain_peers() {
			match collator_action {
				Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator),
677
678
				Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) {
					info.collator_state.set_role(role, |msg| send_polkadot_message(
679
						ctx,
680
						collator.clone(),
681
682
						msg,
					))
683
684
685
686
687
				},
			}
		}
	}

688
	fn on_block_imported(&mut self, _ctx: &mut dyn Context<Block>, hash: Hash, header: &Header) {
689
		self.collators.collect_garbage(Some(&hash));
690
		self.local_collations.collect_garbage(Some(&header.parent_hash));
691
692
693
694
695
	}
}

impl PolkadotProtocol {
	// we received a collation from a peer
696
697
698
699
700
701
702
	fn on_collation(
		&mut self,
		ctx: &mut dyn Context<Block>,
		from: PeerId,
		relay_parent: Hash,
		collation: Collation
	) {
703
704
		let collation_para = collation.info.parachain_index;
		let collated_acc = collation.info.collator.clone();
705
706

		match self.peers.get(&from) {
707
			None => ctx.report_peer(from, cost::UNKNOWN_PEER),
708
709
710
711
712
713
714
			Some(peer_info) => {
				ctx.report_peer(from.clone(), benefit::KNOWN_PEER);
				match peer_info.collating_for {
					None => ctx.report_peer(from, cost::UNEXPECTED_MESSAGE),
					Some((ref acc_id, ref para_id)) => {
						ctx.report_peer(from.clone(), benefit::EXPECTED_MESSAGE);
						let structurally_valid = para_id == &collation_para && acc_id == &collated_acc;
715
						if structurally_valid && collation.info.check_signature().is_ok() {
716
717
718
719
720
721
722
							debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from);
							ctx.report_peer(from, benefit::GOOD_COLLATION);
							self.collators.on_collation(acc_id.clone(), relay_parent, collation)
						} else {
							ctx.report_peer(from, cost::INVALID_FORMAT)
						};
					}
723
724
725
726
727
728
729
				}
			},
		}
	}

	fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver<Collation> {
		let (tx, rx) = oneshot::channel();
730
		debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
731
732
733
734
735
		self.collators.await_collation(relay_parent, para_id, tx);
		rx
	}

	// get connected peer with given account ID for collation.
736
	fn collator_peer(&mut self, collator_id: CollatorId) -> Option<(PeerId, &mut PeerInfo)> {
737
738
739
		let check_info = |info: &PeerInfo| info
			.collating_for
			.as_ref()
Gav Wood's avatar
Gav Wood committed
740
			.map_or(false, |&(ref acc_id, _)| acc_id == &collator_id);
741
742

		self.peers
743
744
			.iter_mut()
			.filter(|&(_, ref info)| check_info(&**info))
745
			.map(|(who, info)| (who.clone(), info))
746
747
748
749
			.next()
	}

	// disconnect a collator by account-id.
750
	fn disconnect_bad_collator(&mut self, ctx: &mut dyn Context<Block>, collator_id: CollatorId) {
Gav Wood's avatar
Gav Wood committed
751
		if let Some((who, _)) = self.collator_peer(collator_id) {
752
			ctx.report_peer(who, cost::BAD_COLLATION)
753
754
755
756
757
758
		}
	}
}

impl PolkadotProtocol {
	/// Add a local collation and broadcast it to the necessary peers.
759
760
761
762
	///
	/// This should be called by a collator intending to get the locally-collated
	/// block into the hands of validators.
	/// It also places the outgoing message and block data in the local availability store.
763
764
	pub fn add_local_collation(
		&mut self,
765
		ctx: &mut dyn Context<Block>,
766
		relay_parent: Hash,
767
		targets: HashSet<ValidatorId>,
768
		collation: Collation,
769
		outgoing_targeted: OutgoingMessages,
770
	) -> impl Future<Output = ()> {
771
		debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
772
773
			relay_parent, collation.info.parachain_index);

774
775
		for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
			match self.validators.get(&primary) {
776
777
778
779
				Some(who) => {
					debug!(target: "p_net", "Sending local collation to {:?}", primary);
					send_polkadot_message(
						ctx,
780
						who.clone(),
781
782
783
						Message::Collation(relay_parent, cloned_collation),
					)
				},
784
785
786
				None =>
					warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
			}
787
		}
788

789
790
791
792
793
794
795
796
797
798
799
800
801
		let availability_store = self.availability_store.clone();
		let collation_cloned = collation.clone();

		async move {
			if let Some(availability_store) = availability_store {
				let _ = availability_store.make_available(av_store::Data {
					relay_parent,
					parachain_id: collation_cloned.info.parachain_index,
					block_data: collation_cloned.pov.block_data.clone(),
					outgoing_queues: Some(outgoing_targeted.clone().into()),
				}).await;
			}
		}
802
	}
803

804
805
806
807
	/// Give the network protocol a handle to an availability store, used for
	/// circulation of parachain data required for validation.
	pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) {
		self.availability_store = Some(availability_store);
808
	}
809
}