// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Bridge between the network and consensus service for getting collations to it.

use parity_codec::{Encode, Decode};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
use substrate_network::PeerId;
use futures::sync::oneshot;

use std::collections::hash_map::{HashMap, Entry};
use std::time::{Duration, Instant};

/// Maximum age of a collation slot: `CollationSlot::stay_alive` reports a slot as
/// expired once this much time has passed since the slot was created.
const COLLATION_LIFETIME: Duration = Duration::from_secs(5 * 60);

/// The role of the collator. Whether they're the primary or backup for this parachain.
31
#[derive(PartialEq, Debug, Clone, Copy, Encode, Decode)]
32
33
pub enum Role {
	/// Primary collators should send collations whenever it's time.
34
	Primary = 0,
35
	/// Backup collators should not.
36
37
38
	Backup = 1,
}

39
40
41
42
43
/// A maintenance action for the collator set.
#[derive(PartialEq, Debug)]
#[allow(dead_code)]
pub enum Action {
	/// Disconnect the given collator.
Gav Wood's avatar
Gav Wood committed
44
	Disconnect(CollatorId),
45
	/// Give the collator a new role.
Gav Wood's avatar
Gav Wood committed
46
	NewRole(CollatorId, Role),
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
}

/// Bookkeeping for the collations of one `(relay parent, parachain)` pair.
struct CollationSlot {
	/// Moment the slot was created; the reference point for its expiry.
	live_at: Instant,
	/// What the slot currently holds: nothing, stored collations, or waiters.
	entries: SlotEntries,
}

impl CollationSlot {
	/// Creates an empty slot whose lifetime starts at the moment of the call.
	fn blank_now() -> Self {
		let live_at = Instant::now();
		let entries = SlotEntries::Blank;
		CollationSlot { live_at, entries }
	}

	/// Returns `true` while `now` is strictly earlier than the slot's creation
	/// time plus `COLLATION_LIFETIME`; the exact expiry instant already counts
	/// as expired.
	fn stay_alive(&self, now: Instant) -> bool {
		let expires_at = self.live_at + COLLATION_LIFETIME;
		now < expires_at
	}
}

/// Contents of a collation slot.
enum SlotEntries {
	/// Nothing stored and nobody waiting.
	Blank,
	/// Collations that have arrived but have not been queried yet.
	Pending(Vec<Collation>),
	/// Senders waiting for the next collation to arrive.
	Awaiting(Vec<oneshot::Sender<Collation>>),
}

impl SlotEntries {
	/// Records a newly received collation.
	///
	/// * `Blank` becomes `Pending` holding just this collation.
	/// * `Pending` gets the collation appended.
	/// * `Awaiting` hands the collation to every waiting sender, in the order
	///   they registered, and resets the slot to `Blank`. Failed sends are
	///   ignored.
	fn received_collation(&mut self, collation: Collation) {
		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
			SlotEntries::Blank => SlotEntries::Pending(vec![collation]),
			SlotEntries::Pending(mut cs) => {
				cs.push(collation);
				SlotEntries::Pending(cs)
			}
			SlotEntries::Awaiting(mut senders) => {
				// The final waiter takes the collation by move, so only the
				// additional waiters cost a clone (none in the single-waiter case).
				let last = senders.pop();
				for sender in senders {
					let _ = sender.send(collation.clone());
				}
				if let Some(sender) = last {
					let _ = sender.send(collation);
				}

				SlotEntries::Blank
			}
		};
	}

	/// Registers `sender` as interested in a collation.
	///
	/// * `Blank` becomes `Awaiting` with this single sender.
	/// * `Awaiting` gets the sender appended.
	/// * `Pending` immediately sends the most recently stored collation to
	///   `sender` (a failed send is ignored) and falls back to `Blank` once
	///   no stored collation is left.
	fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
			SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]),
			SlotEntries::Awaiting(mut senders) => {
				senders.push(sender);
				SlotEntries::Awaiting(senders)
			}
			SlotEntries::Pending(mut cs) => {
				// `pop` takes from the back, i.e. the newest collation goes first.
				let next_collation = cs.pop().expect("empty variant is always `Blank`; qed");
				let _ = sender.send(next_collation);

				if cs.is_empty() {
					SlotEntries::Blank
				} else {
					SlotEntries::Pending(cs)
				}
			}
		};
	}
}

struct ParachainCollators {
Gav Wood's avatar
Gav Wood committed
115
116
	primary: CollatorId,
	backup: Vec<CollatorId>,
117
118
119
120
}

/// Manages connected collators and role assignments from the perspective of a validator.
pub struct CollatorPool {
121
	collators: HashMap<CollatorId, (ParaId, PeerId)>,
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
	parachain_collators: HashMap<ParaId, ParachainCollators>,
	collations: HashMap<(Hash, ParaId), CollationSlot>,
}

impl CollatorPool {
	/// Create a new `CollatorPool` object.
	pub fn new() -> Self {
		CollatorPool {
			collators: HashMap::new(),
			parachain_collators: HashMap::new(),
			collations: HashMap::new(),
		}
	}

	/// Call when a new collator is authenticated. Returns the role.
137
138
	pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role {
		self.collators.insert(collator_id.clone(), (para_id, peer_id));
139
140
141
		match self.parachain_collators.entry(para_id) {
			Entry::Vacant(vacant) => {
				vacant.insert(ParachainCollators {
Gav Wood's avatar
Gav Wood committed
142
					primary: collator_id,
143
144
145
146
147
148
					backup: Vec::new(),
				});

				Role::Primary
			},
			Entry::Occupied(mut occupied) => {
Gav Wood's avatar
Gav Wood committed
149
				occupied.get_mut().backup.push(collator_id);
150
151
152
153
154
155
156
157

				Role::Backup
			}
		}
	}

	/// Called when a collator disconnects. If it was the primary, returns a new primary for that
	/// parachain.
Gav Wood's avatar
Gav Wood committed
158
	pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
159
		self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) {
160
161
			Entry::Vacant(_) => None,
			Entry::Occupied(mut occ) => {
Gav Wood's avatar
Gav Wood committed
162
				if occ.get().primary == collator_id {
163
164
165
166
167
168
					if occ.get().backup.is_empty() {
						occ.remove();
						None
					} else {
						let mut collators = occ.get_mut();
						collators.primary = collators.backup.pop().expect("backup non-empty; qed");
Gav Wood's avatar
Gav Wood committed
169
						Some(collators.primary.clone())
170
171
					}
				} else {
Gav Wood's avatar
Gav Wood committed
172
					let pos = occ.get().backup.iter().position(|a| a == &collator_id)
173
174
175
176
177
178
179
180
181
182
183
184
						.expect("registered collator always present in backup if not primary; qed");

					occ.get_mut().backup.remove(pos);
					None
				}
			}
		})
	}

	/// Called when a collation is received.
	/// The collator should be registered for the parachain of the collation as a precondition of this function.
	/// The collation should have been checked for integrity of signature before passing to this function.
Gav Wood's avatar
Gav Wood committed
185
	pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
186
		if let Some((para_id, _)) = self.collators.get(&collator_id) {
187
188
			debug_assert_eq!(para_id, &collation.receipt.parachain_index);

189
			// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209

			self.collations.entry((relay_parent, para_id.clone()))
				.or_insert_with(CollationSlot::blank_now)
				.entries
				.received_collation(collation);
		}
	}

	/// Wait for a collation from a parachain.
	pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
		self.collations.entry((relay_parent, para_id))
			.or_insert_with(CollationSlot::blank_now)
			.entries
			.await_with(sender);
	}

	/// Call periodically to perform collator set maintenance.
	/// Returns a set of actions to perform on the network level.
	pub fn maintain_peers(&mut self) -> Vec<Action> {
		// TODO: rearrange periodically to new primary, evaluate based on latency etc.
210
		// https://github.com/paritytech/polkadot/issues/214
211
212
213
214
215
216
217
218
219
220
221
222
223
		Vec::new()
	}

	/// called when a block with given hash has been imported.
	pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
		let now = Instant::now();
		self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use substrate_primitives::crypto::UncheckedInto;
	use polkadot_primitives::parachain::{
		CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
	};
	use futures::Future;

	/// Builds a proof-of-validity block carrying `block_data` and no ingress.
	fn make_pov(block_data: Vec<u8>) -> PoVBlock {
		PoVBlock {
			block_data: BlockData(block_data),
			ingress: ConsolidatedIngress(Vec::new()),
		}
	}

	/// Disconnecting the primary promotes the backup; disconnecting the last
	/// collator yields no replacement.
	#[test]
	fn disconnect_primary_gives_new_primary() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let bad_primary: CollatorId = [0; 32].unchecked_into();
		let good_backup: CollatorId = [1; 32].unchecked_into();

		assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
		assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
		assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
		assert_eq!(pool.on_disconnect(good_backup), None);
	}

	/// Disconnecting a backup removes it from the parachain's backup list.
	#[test]
	fn disconnect_backup_removes_from_pool() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary = [0; 32].unchecked_into();
		let backup: CollatorId = [1; 32].unchecked_into();

		assert_eq!(pool.on_new_collator(primary, para_id.clone(), PeerId::random()), Role::Primary);
		assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
		assert_eq!(pool.on_disconnect(backup), None);
		assert!(pool.parachain_collators.get(&para_id).unwrap().backup.is_empty());
	}

	/// Waiters registered before the collation arrives all receive it.
	#[test]
	fn await_before_collation() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let peer_id = PeerId::random();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();

		assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone()), Role::Primary);
		let (tx1, rx1) = oneshot::channel();
		let (tx2, rx2) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx1);
		pool.await_collation(relay_parent, para_id, tx2);
		pool.on_collation(primary.clone(), relay_parent, Collation {
			receipt: CandidateReceipt {
				parachain_index: para_id,
				collator: primary.clone().into(),
				signature: Default::default(),
				head_data: HeadData(vec![1, 2, 3]),
				egress_queue_roots: vec![],
				fees: 0,
				block_data_hash: [3; 32].into(),
				upward_messages: Vec::new(),
			},
			pov: make_pov(vec![4, 5, 6]),
		});

		rx1.wait().unwrap();
		rx2.wait().unwrap();
		assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id);
	}

	/// A collation that arrives first is handed to a waiter that registers later.
	#[test]
	fn collate_before_await() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();

		assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);

		pool.on_collation(primary.clone(), relay_parent, Collation {
			receipt: CandidateReceipt {
				parachain_index: para_id,
				collator: primary,
				signature: Default::default(),
				head_data: HeadData(vec![1, 2, 3]),
				egress_queue_roots: vec![],
				fees: 0,
				block_data_hash: [3; 32].into(),
				upward_messages: Vec::new(),
			},
			pov: make_pov(vec![4, 5, 6]),
		});

		let (tx, rx) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx);
		rx.wait().unwrap();
	}

	/// A slot is alive strictly before `COLLATION_LIFETIME` has elapsed.
	#[test]
	fn slot_stay_alive() {
		let slot = CollationSlot::blank_now();
		let now = slot.live_at;

		assert!(slot.stay_alive(now));
		assert!(slot.stay_alive(now + Duration::from_secs(10)));
		assert!(!slot.stay_alive(now + COLLATION_LIFETIME));
		assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10)));
	}
}