// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Bridge between the network and consensus service for getting collations to it.

use parity_codec::{Encode, Decode};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
use futures::sync::oneshot;

use std::collections::hash_map::{HashMap, Entry};
use std::time::{Duration, Instant};

const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);

/// The role of the collator. Whether they're the primary or backup for this parachain.
30
#[derive(PartialEq, Debug, Clone, Copy, Encode, Decode)]
31
32
pub enum Role {
	/// Primary collators should send collations whenever it's time.
33
	Primary = 0,
34
	/// Backup collators should not.
35
36
37
	Backup = 1,
}

38
39
40
41
42
/// A maintenance action for the collator set.
#[derive(PartialEq, Debug)]
#[allow(dead_code)]
pub enum Action {
	/// Disconnect the given collator.
Gav Wood's avatar
Gav Wood committed
43
	Disconnect(CollatorId),
44
	/// Give the collator a new role.
Gav Wood's avatar
Gav Wood committed
45
	NewRole(CollatorId, Role),
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
}

/// The collation state held for one `(relay parent, parachain)` key, together with the
/// instant at which the slot was created.
struct CollationSlot {
	/// Instant at which this slot was created.
	live_at: Instant,
	/// Collations received, or waiters registered, for this slot.
	entries: SlotEntries,
}

impl CollationSlot {
	/// Creates an empty slot timestamped with the current instant.
	fn blank_now() -> Self {
		let live_at = Instant::now();

		Self { live_at, entries: SlotEntries::Blank }
	}

	/// Returns `true` while `now` is strictly earlier than `live_at + COLLATION_LIFETIME`.
	fn stay_alive(&self, now: Instant) -> bool {
		let expires_at = self.live_at + COLLATION_LIFETIME;

		now < expires_at
	}
}

/// The contents of a collation slot.
///
/// The `Pending` and `Awaiting` vectors are never left empty by the methods below: a slot
/// whose last element is consumed goes back to `Blank`.
enum SlotEntries {
	/// Neither collations nor waiters are stored.
	Blank,
	/// Collations that have been received but not queried yet.
	Pending(Vec<Collation>),
	/// Waiters for the next collation to arrive.
	Awaiting(Vec<oneshot::Sender<Collation>>),
}

impl SlotEntries {
	/// Records a newly received collation.
	///
	/// If there are waiters, every one of them is sent a copy and the slot becomes `Blank`
	/// (the collation is not retained). Otherwise the collation is appended to the pending list.
	/// Send failures (receiver already dropped) are ignored.
	fn received_collation(&mut self, collation: Collation) {
		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
			SlotEntries::Blank => SlotEntries::Pending(vec![collation]),
			SlotEntries::Pending(mut cs) => {
				cs.push(collation);
				SlotEntries::Pending(cs)
			}
			SlotEntries::Awaiting(mut senders) => {
				// The final waiter takes ownership of the original, so only
				// `senders.len() - 1` clones are made. Delivery order is unchanged.
				let last = senders.pop();
				for sender in senders {
					let _ = sender.send(collation.clone());
				}
				if let Some(sender) = last {
					let _ = sender.send(collation);
				}

				SlotEntries::Blank
			}
		};
	}

	/// Registers a waiter for a collation.
	///
	/// If collations are pending, the most recently received one is sent to `sender`
	/// immediately and removed from the slot. Otherwise the sender is queued until the next
	/// call to `received_collation`. A send failure (receiver already dropped) is ignored.
	fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
			SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]),
			SlotEntries::Awaiting(mut senders) => {
				senders.push(sender);
				SlotEntries::Awaiting(senders)
			}
			SlotEntries::Pending(mut cs) => {
				// `pop` takes from the end of the vector, i.e. the latest collation pushed.
				let next_collation = cs.pop().expect("empty variant is always `Blank`; qed");
				let _ = sender.send(next_collation);

				if cs.is_empty() {
					SlotEntries::Blank
				} else {
					SlotEntries::Pending(cs)
				}
			}
		};
	}
}

struct ParachainCollators {
Gav Wood's avatar
Gav Wood committed
114
115
	primary: CollatorId,
	backup: Vec<CollatorId>,
116
117
118
119
}

/// Manages connected collators and role assignments from the perspective of a validator.
pub struct CollatorPool {
Gav Wood's avatar
Gav Wood committed
120
	collators: HashMap<CollatorId, ParaId>,
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
	parachain_collators: HashMap<ParaId, ParachainCollators>,
	collations: HashMap<(Hash, ParaId), CollationSlot>,
}

impl CollatorPool {
	/// Create a new `CollatorPool` object.
	pub fn new() -> Self {
		CollatorPool {
			collators: HashMap::new(),
			parachain_collators: HashMap::new(),
			collations: HashMap::new(),
		}
	}

	/// Call when a new collator is authenticated. Returns the role.
Gav Wood's avatar
Gav Wood committed
136
137
	pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId) -> Role {
		self.collators.insert(collator_id.clone(), para_id);
138
139
140
		match self.parachain_collators.entry(para_id) {
			Entry::Vacant(vacant) => {
				vacant.insert(ParachainCollators {
Gav Wood's avatar
Gav Wood committed
141
					primary: collator_id,
142
143
144
145
146
147
					backup: Vec::new(),
				});

				Role::Primary
			},
			Entry::Occupied(mut occupied) => {
Gav Wood's avatar
Gav Wood committed
148
				occupied.get_mut().backup.push(collator_id);
149
150
151
152
153
154
155
156

				Role::Backup
			}
		}
	}

	/// Called when a collator disconnects. If it was the primary, returns a new primary for that
	/// parachain.
Gav Wood's avatar
Gav Wood committed
157
158
	pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
		self.collators.remove(&collator_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
159
160
			Entry::Vacant(_) => None,
			Entry::Occupied(mut occ) => {
Gav Wood's avatar
Gav Wood committed
161
				if occ.get().primary == collator_id {
162
163
164
165
166
167
					if occ.get().backup.is_empty() {
						occ.remove();
						None
					} else {
						let mut collators = occ.get_mut();
						collators.primary = collators.backup.pop().expect("backup non-empty; qed");
Gav Wood's avatar
Gav Wood committed
168
						Some(collators.primary.clone())
169
170
					}
				} else {
Gav Wood's avatar
Gav Wood committed
171
					let pos = occ.get().backup.iter().position(|a| a == &collator_id)
172
173
174
175
176
177
178
179
180
181
182
183
						.expect("registered collator always present in backup if not primary; qed");

					occ.get_mut().backup.remove(pos);
					None
				}
			}
		})
	}

	/// Called when a collation is received.
	/// The collator should be registered for the parachain of the collation as a precondition of this function.
	/// The collation should have been checked for integrity of signature before passing to this function.
Gav Wood's avatar
Gav Wood committed
184
185
	pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
		if let Some(para_id) = self.collators.get(&collator_id) {
186
187
			debug_assert_eq!(para_id, &collation.receipt.parachain_index);

188
			// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208

			self.collations.entry((relay_parent, para_id.clone()))
				.or_insert_with(CollationSlot::blank_now)
				.entries
				.received_collation(collation);
		}
	}

	/// Registers `sender` to receive a collation for `para_id` at `relay_parent`.
	///
	/// The slot for that key is created if it does not exist yet; the sender is then handed
	/// to the slot's entries via `await_with`.
	pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
		let key = (relay_parent, para_id);
		let slot = self.collations.entry(key).or_insert_with(CollationSlot::blank_now);

		slot.entries.await_with(sender);
	}

	/// Call periodically to perform collator set maintenance.
	/// Returns a set of actions to perform on the network level.
	///
	/// Currently a placeholder: no maintenance is performed and the returned list is always empty.
	pub fn maintain_peers(&mut self) -> Vec<Action> {
		// TODO: rearrange periodically to new primary, evaluate based on latency etc.
		// https://github.com/paritytech/polkadot/issues/214
		Vec::new()
	}

	/// Called when a block with the given hash has been imported.
	///
	/// Removes every collation slot whose relay parent equals `chain_head` (when one is
	/// given), as well as every slot for which `stay_alive` reports `false` at the current instant.
	pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
		let now = Instant::now();

		self.collations.retain(|key, slot| {
			let is_chain_head = chain_head.map_or(false, |head| *head == key.0);

			!is_chain_head && slot.stay_alive(now)
		});
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use substrate_primitives::crypto::UncheckedInto;
	use polkadot_primitives::parachain::{
		CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
	};
	use futures::Future;

	/// Builds a PoV block around `block_data` with an empty ingress.
	fn make_pov(block_data: Vec<u8>) -> PoVBlock {
		PoVBlock {
			block_data: BlockData(block_data),
			ingress: ConsolidatedIngress(Vec::new()),
		}
	}

	/// Builds the fixed dummy collation shared by the delivery tests.
	fn make_collation(para_id: ParaId, collator: CollatorId) -> Collation {
		Collation {
			receipt: CandidateReceipt {
				parachain_index: para_id,
				collator,
				signature: Default::default(),
				head_data: HeadData(vec![1, 2, 3]),
				balance_uploads: vec![],
				egress_queue_roots: vec![],
				fees: 0,
				block_data_hash: [3; 32].into(),
			},
			pov: make_pov(vec![4, 5, 6]),
		}
	}

	#[test]
	fn disconnect_primary_gives_new_primary() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let bad_primary: CollatorId = [0; 32].unchecked_into();
		let good_backup: CollatorId = [1; 32].unchecked_into();

		assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id), Role::Primary);
		assert_eq!(pool.on_new_collator(good_backup.clone(), para_id), Role::Backup);
		assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
		assert_eq!(pool.on_disconnect(good_backup), None);
	}

	#[test]
	fn disconnect_backup_removes_from_pool() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary = [0; 32].unchecked_into();
		let backup: CollatorId = [1; 32].unchecked_into();

		assert_eq!(pool.on_new_collator(primary, para_id), Role::Primary);
		assert_eq!(pool.on_new_collator(backup.clone(), para_id), Role::Backup);
		assert_eq!(pool.on_disconnect(backup), None);
		assert!(pool.parachain_collators.get(&para_id).unwrap().backup.is_empty());
	}

	#[test]
	fn await_before_collation() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();

		assert_eq!(pool.on_new_collator(primary.clone(), para_id), Role::Primary);
		let (tx1, rx1) = oneshot::channel();
		let (tx2, rx2) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx1);
		pool.await_collation(relay_parent, para_id, tx2);
		pool.on_collation(primary.clone(), relay_parent, make_collation(para_id, primary));

		// Both waiters registered before the collation arrived must be served.
		rx1.wait().unwrap();
		rx2.wait().unwrap();
	}

	#[test]
	fn collate_before_await() {
		let mut pool = CollatorPool::new();
		let para_id: ParaId = 5.into();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();

		assert_eq!(pool.on_new_collator(primary.clone(), para_id), Role::Primary);

		pool.on_collation(primary.clone(), relay_parent, make_collation(para_id, primary));

		// A waiter arriving after the collation is served from the pending list.
		let (tx, rx) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx);
		rx.wait().unwrap();
	}

	#[test]
	fn slot_stay_alive() {
		let slot = CollationSlot::blank_now();
		let now = slot.live_at;

		assert!(slot.stay_alive(now));
		assert!(slot.stay_alive(now + Duration::from_secs(10)));
		assert!(!slot.stay_alive(now + COLLATION_LIFETIME));
		assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10)));
	}
}