collator_pool.rs 10.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Bridge between the network and consensus service for getting collations to it.

19
use codec::{Encode, Decode};
20
21
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
22
use substrate_network::PeerId;
23
24
25
26
27
28
29
30
use futures::sync::oneshot;

use std::collections::hash_map::{HashMap, Entry};
use std::time::{Duration, Instant};

const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5);

/// The role of the collator. Whether they're the primary or backup for this parachain.
31
#[derive(PartialEq, Debug, Clone, Copy, Encode, Decode)]
32
33
pub enum Role {
	/// Primary collators should send collations whenever it's time.
34
	Primary = 0,
35
	/// Backup collators should not.
36
37
38
	Backup = 1,
}

39
40
41
42
43
/// A maintenance action for the collator set.
#[derive(PartialEq, Debug)]
#[allow(dead_code)]
pub enum Action {
	/// Disconnect the given collator.
Gav Wood's avatar
Gav Wood committed
44
	Disconnect(CollatorId),
45
	/// Give the collator a new role.
Gav Wood's avatar
Gav Wood committed
46
	NewRole(CollatorId, Role),
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
}

/// A time-stamped container for the collations and requests queued under one key.
struct CollationSlot {
	/// Moment the slot was created; its lifetime is measured from here.
	live_at: Instant,
	/// What is currently queued in the slot.
	entries: SlotEntries,
}

impl CollationSlot {
	/// Create an empty (`Blank`) slot stamped with the current time.
	fn blank_now() -> Self {
		let live_at = Instant::now();
		let entries = SlotEntries::Blank;

		CollationSlot { live_at, entries }
	}

	/// Whether the slot should be kept at time `now`.
	///
	/// Returns `true` only while `now` is strictly earlier than `live_at + COLLATION_LIFETIME`;
	/// at exactly that instant the slot is already considered expired.
	fn stay_alive(&self, now: Instant) -> bool {
		let expires_at = self.live_at + COLLATION_LIFETIME;
		now < expires_at
	}
}

/// What a collation slot currently holds.
///
/// A slot is in exactly one of three states: empty, holding collations nobody has asked for
/// yet, or holding requesters for whom no collation has arrived yet.
enum SlotEntries {
	/// Nothing received and nobody waiting.
	Blank,
	/// Collations received but not queried yet.
	/// This list is never left empty: once drained, the state goes back to `Blank`.
	Pending(Vec<Collation>),
	/// Requesters waiting for the next collation to arrive.
	Awaiting(Vec<oneshot::Sender<Collation>>),
}

impl SlotEntries {
	/// Record a newly received collation.
	///
	/// With no requesters waiting, the collation is queued in `Pending`. If requesters are
	/// waiting, every one of them is sent the collation and the state returns to `Blank`;
	/// the collation is not retained afterwards.
	fn received_collation(&mut self, collation: Collation) {
		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
			SlotEntries::Blank => SlotEntries::Pending(vec![collation]),
			SlotEntries::Pending(mut cs) => {
				cs.push(collation);
				SlotEntries::Pending(cs)
			}
			SlotEntries::Awaiting(mut senders) => {
				// The last waiter takes ownership of the collation itself, so only the
				// other waiters cost a clone. Delivery order is unchanged.
				let last = senders.pop();

				for sender in senders {
					// An error here means the requester dropped its receiving end;
					// there is nobody left to notify, so it is ignored.
					let _ = sender.send(collation.clone());
				}

				if let Some(sender) = last {
					let _ = sender.send(collation);
				}

				SlotEntries::Blank
			}
		};
	}

	/// Register a requester for a collation.
	///
	/// If collations are already pending, the most recently received one is removed and sent
	/// straight away (the state becomes `Blank` once the last one is handed out). Otherwise
	/// the sender is queued until the next collation arrives.
	fn await_with(&mut self, sender: oneshot::Sender<Collation>) {
		*self = match ::std::mem::replace(self, SlotEntries::Blank) {
			SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]),
			SlotEntries::Awaiting(mut senders) => {
				senders.push(sender);
				SlotEntries::Awaiting(senders)
			}
			SlotEntries::Pending(mut cs) => {
				let next_collation = cs.pop().expect("empty variant is always `Blank`; qed");
				// A failed send (receiver already dropped) is deliberately ignored.
				let _ = sender.send(next_collation);

				if cs.is_empty() {
					SlotEntries::Blank
				} else {
					SlotEntries::Pending(cs)
				}
			}
		};
	}
}

struct ParachainCollators {
Gav Wood's avatar
Gav Wood committed
115
116
	primary: CollatorId,
	backup: Vec<CollatorId>,
117
118
119
120
}

/// Manages connected collators and role assignments from the perspective of a validator.
pub struct CollatorPool {
121
	collators: HashMap<CollatorId, (ParaId, PeerId)>,
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
	parachain_collators: HashMap<ParaId, ParachainCollators>,
	collations: HashMap<(Hash, ParaId), CollationSlot>,
}

impl CollatorPool {
	/// Create a new, empty `CollatorPool`: no collators, no role assignments, no collations.
	pub fn new() -> Self {
		let collators = HashMap::new();
		let parachain_collators = HashMap::new();
		let collations = HashMap::new();

		CollatorPool { collators, parachain_collators, collations }
	}

	/// Call when a new collator is authenticated. Returns the role.
137
138
	pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role {
		self.collators.insert(collator_id.clone(), (para_id, peer_id));
139
140
141
		match self.parachain_collators.entry(para_id) {
			Entry::Vacant(vacant) => {
				vacant.insert(ParachainCollators {
Gav Wood's avatar
Gav Wood committed
142
					primary: collator_id,
143
144
145
146
147
148
					backup: Vec::new(),
				});

				Role::Primary
			},
			Entry::Occupied(mut occupied) => {
Gav Wood's avatar
Gav Wood committed
149
				occupied.get_mut().backup.push(collator_id);
150
151
152
153
154
155
156
157

				Role::Backup
			}
		}
	}

	/// Called when a collator disconnects. If it was the primary, returns a new primary for that
	/// parachain.
Gav Wood's avatar
Gav Wood committed
158
	pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
159
		self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) {
160
161
			Entry::Vacant(_) => None,
			Entry::Occupied(mut occ) => {
Gav Wood's avatar
Gav Wood committed
162
				if occ.get().primary == collator_id {
163
164
165
166
167
168
					if occ.get().backup.is_empty() {
						occ.remove();
						None
					} else {
						let mut collators = occ.get_mut();
						collators.primary = collators.backup.pop().expect("backup non-empty; qed");
Gav Wood's avatar
Gav Wood committed
169
						Some(collators.primary.clone())
170
171
					}
				} else {
Gav Wood's avatar
Gav Wood committed
172
					let pos = occ.get().backup.iter().position(|a| a == &collator_id)
173
174
175
176
177
178
179
180
181
182
183
184
						.expect("registered collator always present in backup if not primary; qed");

					occ.get_mut().backup.remove(pos);
					None
				}
			}
		})
	}

	/// Called when a collation is received.
	/// The collator should be registered for the parachain of the collation as a precondition of this function.
	/// The collation should have been checked for integrity of signature before passing to this function.
Gav Wood's avatar
Gav Wood committed
185
	pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
186
		if let Some((para_id, _)) = self.collators.get(&collator_id) {
187
188
			debug_assert_eq!(para_id, &collation.receipt.parachain_index);

189
			// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209

			self.collations.entry((relay_parent, para_id.clone()))
				.or_insert_with(CollationSlot::blank_now)
				.entries
				.received_collation(collation);
		}
	}

	/// Wait for a collation from a parachain.
	///
	/// `sender` is resolved with a collation for `para_id` on top of `relay_parent`: at once
	/// if one is already pending in the slot, otherwise when the next one is received. The
	/// slot is created if it does not exist yet.
	pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender<Collation>) {
		let key = (relay_parent, para_id);
		let slot = self.collations.entry(key).or_insert_with(CollationSlot::blank_now);

		slot.entries.await_with(sender);
	}

	/// Call periodically to perform collator set maintenance.
	/// Returns a set of actions to perform on the network level.
	///
	/// Currently a placeholder: no maintenance is done and the returned list is always empty.
	pub fn maintain_peers(&mut self) -> Vec<Action> {
		// TODO: rearrange periodically to new primary, evaluate based on latency etc.
		// https://github.com/paritytech/polkadot/issues/214
		vec![]
	}

	/// Called when a block with given hash has been imported.
	///
	/// Drops every collation slot whose relay-parent hash equals `chain_head`, as well as
	/// every slot that has outlived its lifetime. With `chain_head == None` only expired
	/// slots are dropped.
	pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) {
		let now = Instant::now();

		self.collations.retain(|key, slot| {
			let keyed_by_head = chain_head == Some(&key.0);
			!keyed_by_head && slot.stay_alive(now)
		});
	}
219
220
221
222
223

	/// Convert the given `CollatorId` to a `PeerId`.
	///
	/// Returns `None` if no collator with that id is registered.
	pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
		let (_, peer_id) = self.collators.get(collator_id)?;
		Some(peer_id)
	}
224
225
226
227
228
}

#[cfg(test)]
mod tests {
	use super::*;
Gav Wood's avatar
Gav Wood committed
229
	use substrate_primitives::crypto::UncheckedInto;
230
231
232
	use polkadot_primitives::parachain::{
		CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
	};
233
234
	use futures::Future;

235
236
237
238
239
240
241
	/// Test helper: wrap raw block data into a `PoVBlock` with an empty ingress.
	fn make_pov(block_data: Vec<u8>) -> PoVBlock {
		let ingress = ConsolidatedIngress(Vec::new());
		let block_data = BlockData(block_data);

		PoVBlock { block_data, ingress }
	}

242
243
244
245
	/// Disconnecting the primary promotes the backup; disconnecting that one promotes nobody.
	#[test]
	fn disconnect_primary_gives_new_primary() {
		let para_id: ParaId = 5.into();
		let bad_primary: CollatorId = [0; 32].unchecked_into();
		let good_backup: CollatorId = [1; 32].unchecked_into();
		let mut pool = CollatorPool::new();

		// Registration order decides the roles.
		let first_role = pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random());
		assert_eq!(first_role, Role::Primary);
		let second_role = pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random());
		assert_eq!(second_role, Role::Backup);

		let promoted = pool.on_disconnect(bad_primary);
		assert_eq!(promoted, Some(good_backup.clone()));

		let nobody_left = pool.on_disconnect(good_backup);
		assert_eq!(nobody_left, None);
	}

	/// Disconnecting a backup promotes nobody and removes it from the backup list.
	#[test]
	fn disconnect_backup_removes_from_pool() {
		let para_id: ParaId = 5.into();
		let primary: CollatorId = [0; 32].unchecked_into();
		let backup: CollatorId = [1; 32].unchecked_into();
		let mut pool = CollatorPool::new();

		let first_role = pool.on_new_collator(primary, para_id.clone(), PeerId::random());
		assert_eq!(first_role, Role::Primary);
		let second_role = pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random());
		assert_eq!(second_role, Role::Backup);

		assert_eq!(pool.on_disconnect(backup), None);

		let group = pool.parachain_collators.get(&para_id).unwrap();
		assert!(group.backup.is_empty());
	}

	/// Requesters that queue up before a collation exists are all served once it arrives.
	#[test]
	fn await_before_collation() {
		let para_id: ParaId = 5.into();
		let peer_id = PeerId::random();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();
		let mut pool = CollatorPool::new();

		let role = pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone());
		assert_eq!(role, Role::Primary);

		// Two requesters register before anything has been collated.
		let (tx1, rx1) = oneshot::channel();
		let (tx2, rx2) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx1);
		pool.await_collation(relay_parent, para_id, tx2);

		let receipt = CandidateReceipt {
			parachain_index: para_id,
			collator: primary.clone().into(),
			signature: Default::default(),
			head_data: HeadData(vec![1, 2, 3]),
			egress_queue_roots: vec![],
			fees: 0,
			block_data_hash: [3; 32].into(),
			upward_messages: Vec::new(),
		};
		let collation = Collation {
			receipt,
			pov: make_pov(vec![4, 5, 6]),
		};
		pool.on_collation(primary.clone(), relay_parent, collation);

		// The single collation must reach both requesters.
		rx1.wait().unwrap();
		rx2.wait().unwrap();

		let registered_peer = pool.collators.get(&primary).map(|ids| &ids.1).unwrap();
		assert_eq!(registered_peer, &peer_id);
	}

	/// A collation that arrives before anyone asks is kept and handed to the first requester.
	#[test]
	fn collate_before_await() {
		let para_id: ParaId = 5.into();
		let primary: CollatorId = [0; 32].unchecked_into();
		let relay_parent = [1; 32].into();
		let mut pool = CollatorPool::new();

		let role = pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random());
		assert_eq!(role, Role::Primary);

		// Keep a copy of the id for the call below; the receipt takes ownership of `primary`.
		let author = primary.clone();
		let receipt = CandidateReceipt {
			parachain_index: para_id,
			collator: primary,
			signature: Default::default(),
			head_data: HeadData(vec![1, 2, 3]),
			egress_queue_roots: vec![],
			fees: 0,
			block_data_hash: [3; 32].into(),
			upward_messages: Vec::new(),
		};
		let collation = Collation {
			receipt,
			pov: make_pov(vec![4, 5, 6]),
		};
		pool.on_collation(author, relay_parent, collation);

		// The request comes in afterwards and is answered from the pending collation.
		let (tx, rx) = oneshot::channel();
		pool.await_collation(relay_parent, para_id, tx);
		rx.wait().unwrap();
	}

	/// A slot is alive strictly before `COLLATION_LIFETIME` has elapsed, and expired from then on.
	#[test]
	fn slot_stay_alive() {
		let slot = CollationSlot::blank_now();
		let created = slot.live_at;
		let ten_secs = Duration::from_secs(10);

		// Within the lifetime window.
		assert!(slot.stay_alive(created));
		assert!(slot.stay_alive(created + ten_secs));

		// At the boundary and beyond it.
		let expiry = created + COLLATION_LIFETIME;
		assert!(!slot.stay_alive(expiry));
		assert!(!slot.stay_alive(expiry + ten_secs));
	}
}