collator_pool.rs 10.5 KB
Newer Older
Shawn Tabrizi's avatar
Shawn Tabrizi committed
1
// Copyright 2018-2020 Parity Technologies (UK) Ltd.
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Bridge between the network and consensus service for getting collations to it.

19
use codec::{Encode, Decode};
20
21
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
22
use sc_network::PeerId;
23
use futures::channel::oneshot;
24
25

use std::collections::hash_map::{HashMap, Entry};
Gavin Wood's avatar
Gavin Wood committed
26
27
use std::time::Duration;
use wasm_timer::Instant;
28
29
30
31

const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);

/// The role of the collator. Whether they're the primary or backup for this parachain.
32
#[derive(PartialEq, Debug, Clone, Copy, Encode, Decode)]
33
34
pub enum Role {
	/// Primary collators should send collations whenever it's time.
35
	Primary = 0,
36
	/// Backup collators should not.
37
38
39
	Backup = 1,
}

40
41
42
43
44
/// A maintenance action for the collator set.
#[derive(PartialEq, Debug)]
#[allow(dead_code)]
pub enum Action {
	/// Disconnect the given collator.
Gav Wood's avatar
Gav Wood committed
45
	Disconnect(CollatorId),
46
	/// Give the collator a new role.
Gav Wood's avatar
Gav Wood committed
47
	NewRole(CollatorId, Role),
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
}

/// The collations (or pending requests for them) of one slot, stamped with the moment the
/// slot was created so that it can be expired later.
struct CollationSlot {
	/// Creation time of the slot; expiry is measured from here.
	live_at: Instant,
	/// What is currently queued in the slot.
	entries: SlotEntries,
}

impl CollationSlot {
	/// Builds an empty slot whose lifetime starts at the current instant.
	fn blank_now() -> Self {
		let created = Instant::now();

		Self {
			live_at: created,
			entries: SlotEntries::Blank,
		}
	}

	/// Whether the slot should be kept at time `now`.
	///
	/// The slot is kept strictly before `live_at + COLLATION_LIFETIME`; at that exact instant
	/// and afterwards it is considered expired.
	fn stay_alive(&self, now: Instant) -> bool {
		let expires_at = self.live_at + COLLATION_LIFETIME;

		now < expires_at
	}
}

/// The contents of a collation slot: either nothing, a backlog of collations nobody has asked
/// for yet, or a list of requesters waiting for the next collation.
enum SlotEntries {
	/// Nothing stored and nobody waiting.
	Blank,
	/// Collations that were received but not queried yet.
	Pending(Vec<Collation>),
	/// Requesters waiting for the next collation to arrive.
	Awaiting(Vec<oneshot::Sender<Collation>>),
}

impl SlotEntries {
	/// Records a newly received collation.
	///
	/// When requesters are waiting, every one of them is sent a clone of the collation and the
	/// slot goes back to `Blank` (the collation itself is not retained). Otherwise the collation
	/// is appended to the pending backlog. A failed send is ignored.
	fn received_collation(&mut self, collation: Collation) {
		match self {
			SlotEntries::Blank => *self = SlotEntries::Pending(vec![collation]),
			SlotEntries::Pending(backlog) => backlog.push(collation),
			SlotEntries::Awaiting(waiting) => {
				for requester in waiting.drain(..) {
					let _ = requester.send(collation.clone());
				}

				*self = SlotEntries::Blank;
			}
		}
	}

	/// Registers a requester for a collation.
	///
	/// When a backlog exists, the most recently received collation is removed from it and sent
	/// right away (a failed send is ignored); the slot becomes `Blank` once the backlog is
	/// drained. Otherwise the requester is queued until a collation arrives.
	fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
		match self {
			SlotEntries::Blank => *self = SlotEntries::Awaiting(vec![sender]),
			SlotEntries::Awaiting(waiting) => waiting.push(sender),
			SlotEntries::Pending(backlog) => {
				let newest = backlog.pop().expect("empty variant is always `Blank`; qed");
				let _ = sender.send(newest);

				// An empty backlog is always represented as `Blank`.
				if backlog.is_empty() {
					*self = SlotEntries::Blank;
				}
			}
		}
	}
}

struct ParachainCollators {
Gav Wood's avatar
Gav Wood committed
116
117
	primary: CollatorId,
	backup: Vec<CollatorId>,
118
119
120
121
}

/// Manages connected collators and role assignments from the perspective of a validator.
pub struct CollatorPool {
122
	collators: HashMap<CollatorId, (ParaId, PeerId)>,
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
	parachain_collators: HashMap<ParaId, ParachainCollators>,
	collations: HashMap<(Hash, ParaId), CollationSlot>,
}

impl CollatorPool {
	/// Create a new `CollatorPool` object.
	pub fn new() -> Self {
		CollatorPool {
			collators: HashMap::new(),
			parachain_collators: HashMap::new(),
			collations: HashMap::new(),
		}
	}

	/// Call when a new collator is authenticated. Returns the role.
138
139
	pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role {
		self.collators.insert(collator_id.clone(), (para_id, peer_id));
140
141
142
		match self.parachain_collators.entry(para_id) {
			Entry::Vacant(vacant) => {
				vacant.insert(ParachainCollators {
Gav Wood's avatar
Gav Wood committed
143
					primary: collator_id,
144
145
146
147
148
149
					backup: Vec::new(),
				});

				Role::Primary
			},
			Entry::Occupied(mut occupied) => {
Gav Wood's avatar
Gav Wood committed
150
				occupied.get_mut().backup.push(collator_id);
151
152
153
154
155
156
157
158

				Role::Backup
			}
		}
	}

	/// Called when a collator disconnects. If it was the primary, returns a new primary for that
	/// parachain.
Gav Wood's avatar
Gav Wood committed
159
	pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
160
		self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) {
161
162
			Entry::Vacant(_) => None,
			Entry::Occupied(mut occ) => {
Gav Wood's avatar
Gav Wood committed
163
				if occ.get().primary == collator_id {
164
165
166
167
168
169
					if occ.get().backup.is_empty() {
						occ.remove();
						None
					} else {
						let mut collators = occ.get_mut();
						collators.primary = collators.backup.pop().expect("backup non-empty; qed");
Gav Wood's avatar
Gav Wood committed
170
						Some(collators.primary.clone())
171
172
					}
				} else {
Gav Wood's avatar
Gav Wood committed
173
					let pos = occ.get().backup.iter().position(|a| a == &collator_id)
174
175
176
177
178
179
180
181
182
183
184
185
						.expect("registered collator always present in backup if not primary; qed");

					occ.get_mut().backup.remove(pos);
					None
				}
			}
		})
	}

	/// Called when a collation is received.
	/// The collator should be registered for the parachain of the collation as a precondition of this function.
	/// The collation should have been checked for integrity of signature before passing to this function.
Gav Wood's avatar
Gav Wood committed
186
	pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
187
		if let Some((para_id, _)) = self.collators.get(&collator_id) {
188
			debug_assert_eq!(para_id, &collation.info.parachain_index);
189

190
			// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210

			self.collations.entry((relay_parent, para_id.clone()))
				.or_insert_with(CollationSlot::blank_now)
				.entries
				.received_collation(collation);
		}
	}

	/// Wait for a collation from a parachain.
	///
	/// `sender` is handed to the slot for `(relay_parent, para_id)`, which is created on first
	/// use; the slot decides whether the request is answered immediately or queued.
	pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
		let key = (relay_parent, para_id);
		let slot = self.collations.entry(key).or_insert_with(CollationSlot::blank_now);

		slot.entries.await_with(sender);
	}

	/// Call periodically to perform collator set maintenance.
	/// Returns a set of actions to perform on the network level.
	///
	/// Currently a placeholder: no maintenance is performed and the returned list is always
	/// empty.
	pub fn maintain_peers(&mut self) -> Vec<Action> {
		// TODO: rearrange periodically to new primary, evaluate based on latency etc.
		// https://github.com/paritytech/polkadot/issues/214
		Vec::new()
	}

	/// Called when a block with given hash has been imported.
	///
	/// Drops every collation slot whose relay parent equals `chain_head`, as well as every slot
	/// that has outlived its lifetime. With `chain_head == None` only expired slots are dropped.
	pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
		let now = Instant::now();

		self.collations.retain(|(relay_parent, _), slot| {
			let built_on_head = chain_head == Some(relay_parent);

			!built_on_head && slot.stay_alive(now)
		});
	}
220
221
222
223
224

	/// Convert the given `CollatorId` to a `PeerId`.
	///
	/// Returns `None` when no collator with that id is registered.
	pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
		let (_, peer_id) = self.collators.get(collator_id)?;

		Some(peer_id)
	}
225
226
227
228
229
}

#[cfg(test)]
mod tests {
	use super::*;
230
	use sp_core::crypto::UncheckedInto;
231
232
233
	use polkadot_primitives::parachain::{
		CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
	};
234
	use futures::executor::block_on;
235

236
237
238
239
240
241
242
	/// Wraps raw block data into a `PoVBlock` with an empty ingress.
	fn make_pov(block_data: Vec<u8>) -> PoVBlock {
		let block_data = BlockData(block_data);
		let ingress = ConsolidatedIngress(Vec::new());

		PoVBlock { block_data, ingress }
	}

243
244
245
246
	/// When the primary disconnects, the backup is promoted; once the last collator is gone
	/// the parachain no longer has an entry.
	#[test]
	fn disconnect_primary_gives_new_primary() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let bad_primary: CollatorId = [0; 32].unchecked_into();
		let good_backup: CollatorId = [1; 32].unchecked_into();

		assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
		assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
		assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
		assert_eq!(pool.on_disconnect(good_backup), None);
		// The promoted primary had no backup left, so the whole entry must be gone.
		assert!(pool.parachain_collators.get(&para_id).is_none());
	}

	/// A disconnecting backup yields no new primary and is removed from the backup list.
	#[test]
	fn disconnect_backup_removes_from_pool() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary = [0; 32].unchecked_into();
		let backup: CollatorId = [1; 32].unchecked_into();

		assert_eq!(pool.on_new_collator(primary, para_id.clone(), PeerId::random()), Role::Primary);
		assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
		assert_eq!(pool.on_disconnect(backup), None);
		assert!(pool.parachain_collators.get(&para_id).unwrap().backup.is_empty());
	}

	/// Requests registered before the collation arrives are all answered once it does.
	#[test]
	fn await_before_collation() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let peer_id = PeerId::random();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();

		assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone()), Role::Primary);
		let (tx1, rx1) = oneshot::channel();
		let (tx2, rx2) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx1);
		pool.await_collation(relay_parent, para_id, tx2);
		pool.on_collation(primary.clone(), relay_parent, Collation {
			info: CandidateReceipt {
				parachain_index: para_id,
				collator: primary.clone().into(),
				signature: Default::default(),
				head_data: HeadData(vec![1, 2, 3]),
				egress_queue_roots: vec![],
				fees: 0,
				block_data_hash: [3; 32].into(),
				upward_messages: Vec::new(),
				erasure_root: [1u8; 32].into(),
			}.into(),
			pov: make_pov(vec![4, 5, 6]),
		});

		block_on(rx1).unwrap();
		block_on(rx2).unwrap();
		// Go through the public accessor so that it is covered as well.
		assert_eq!(pool.collator_id_to_peer_id(&primary), Some(&peer_id));
	}

	/// A collation received before anybody asks for it is handed to the first request.
	#[test]
	fn collate_before_await() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();

		assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);

		pool.on_collation(primary.clone(), relay_parent, Collation {
			info: CandidateReceipt {
				parachain_index: para_id,
				collator: primary,
				signature: Default::default(),
				head_data: HeadData(vec![1, 2, 3]),
				egress_queue_roots: vec![],
				fees: 0,
				block_data_hash: [3; 32].into(),
				upward_messages: Vec::new(),
				erasure_root: [1u8; 32].into(),
			}.into(),
			pov: make_pov(vec![4, 5, 6]),
		});

		let (tx, rx) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx);
		block_on(rx).unwrap();
	}

	/// A slot is kept strictly before its lifetime has elapsed and dropped from then on.
	#[test]
	fn slot_stay_alive() {
		let slot = CollationSlot::blank_now();
		let created = slot.live_at;
		let ten_secs = Duration::from_secs(10);

		for &kept_at in &[created, created + ten_secs] {
			assert!(slot.stay_alive(kept_at));
		}

		for &expired_at in &[created + COLLATION_LIFETIME, created + COLLATION_LIFETIME + ten_secs] {
			assert!(!slot.stay_alive(expired_at));
		}
	}
}