Unverified Commit 40f10656 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Store `PeerId`s in collator pool (#299)

parent 8088e5b9
Pipeline #41201 passed with stages
in 10 minutes and 10 seconds
......@@ -19,6 +19,7 @@
use parity_codec::{Encode, Decode};
use polkadot_primitives::Hash;
use polkadot_primitives::parachain::{CollatorId, Id as ParaId, Collation};
use substrate_network::PeerId;
use futures::sync::oneshot;
use std::collections::hash_map::{HashMap, Entry};
......@@ -117,7 +118,7 @@ struct ParachainCollators {
/// Manages connected collators and role assignments from the perspective of a validator.
pub struct CollatorPool {
collators: HashMap<CollatorId, ParaId>,
collators: HashMap<CollatorId, (ParaId, PeerId)>,
parachain_collators: HashMap<ParaId, ParachainCollators>,
collations: HashMap<(Hash, ParaId), CollationSlot>,
}
......@@ -133,8 +134,8 @@ impl CollatorPool {
}
/// Call when a new collator is authenticated. Returns the role.
pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId) -> Role {
self.collators.insert(collator_id.clone(), para_id);
pub fn on_new_collator(&mut self, collator_id: CollatorId, para_id: ParaId, peer_id: PeerId) -> Role {
self.collators.insert(collator_id.clone(), (para_id, peer_id));
match self.parachain_collators.entry(para_id) {
Entry::Vacant(vacant) => {
vacant.insert(ParachainCollators {
......@@ -155,7 +156,7 @@ impl CollatorPool {
/// Called when a collator disconnects. If it was the primary, returns a new primary for that
/// parachain.
pub fn on_disconnect(&mut self, collator_id: CollatorId) -> Option<CollatorId> {
self.collators.remove(&collator_id).and_then(|para_id| match self.parachain_collators.entry(para_id) {
self.collators.remove(&collator_id).and_then(|(para_id, _)| match self.parachain_collators.entry(para_id) {
Entry::Vacant(_) => None,
Entry::Occupied(mut occ) => {
if occ.get().primary == collator_id {
......@@ -182,7 +183,7 @@ impl CollatorPool {
/// The collator should be registered for the parachain of the collation as a precondition of this function.
/// The collation should have been checked for integrity of signature before passing to this function.
pub fn on_collation(&mut self, collator_id: CollatorId, relay_parent: Hash, collation: Collation) {
if let Some(para_id) = self.collators.get(&collator_id) {
if let Some((para_id, _)) = self.collators.get(&collator_id) {
debug_assert_eq!(para_id, &collation.receipt.parachain_index);
// TODO: punish if not primary? (https://github.com/paritytech/polkadot/issues/213)
......@@ -240,8 +241,8 @@ mod tests {
let bad_primary: CollatorId = [0; 32].unchecked_into();
let good_backup: CollatorId = [1; 32].unchecked_into();
assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone()), Role::Primary);
assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone()), Role::Backup);
assert_eq!(pool.on_new_collator(bad_primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
assert_eq!(pool.on_new_collator(good_backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup.clone()));
assert_eq!(pool.on_disconnect(good_backup), None);
}
......@@ -253,8 +254,8 @@ mod tests {
let primary = [0; 32].unchecked_into();
let backup: CollatorId = [1; 32].unchecked_into();
assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary);
assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone()), Role::Backup);
assert_eq!(pool.on_new_collator(primary, para_id.clone(), PeerId::random()), Role::Primary);
assert_eq!(pool.on_new_collator(backup.clone(), para_id.clone(), PeerId::random()), Role::Backup);
assert_eq!(pool.on_disconnect(backup), None);
assert!(pool.parachain_collators.get(&para_id).unwrap().backup.is_empty());
}
......@@ -263,10 +264,11 @@ mod tests {
fn await_before_collation() {
let mut pool = CollatorPool::new();
let para_id: ParaId = 5.into();
let peer_id = PeerId::random();
let primary: CollatorId = [0; 32].unchecked_into();
let relay_parent = [1; 32].into();
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone()), Role::Primary);
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), peer_id.clone()), Role::Primary);
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
pool.await_collation(relay_parent, para_id, tx1);
......@@ -274,7 +276,7 @@ mod tests {
pool.on_collation(primary.clone(), relay_parent, Collation {
receipt: CandidateReceipt {
parachain_index: para_id,
collator: primary.into(),
collator: primary.clone().into(),
signature: Default::default(),
head_data: HeadData(vec![1, 2, 3]),
egress_queue_roots: vec![],
......@@ -287,6 +289,7 @@ mod tests {
rx1.wait().unwrap();
rx2.wait().unwrap();
assert_eq!(pool.collators.get(&primary).map(|ids| &ids.1).unwrap(), &peer_id);
}
#[test]
......@@ -296,7 +299,7 @@ mod tests {
let primary: CollatorId = [0; 32].unchecked_into();
let relay_parent = [1; 32].into();
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone()), Role::Primary);
assert_eq!(pool.on_new_collator(primary.clone(), para_id.clone(), PeerId::random()), Role::Primary);
pool.on_collation(primary.clone(), relay_parent, Collation {
receipt: CandidateReceipt {
......
......@@ -491,7 +491,11 @@ impl Specialization<Block> for PolkadotProtocol {
}
ctx.report_peer(who.clone(), benefit::NEW_COLLATOR);
let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone());
let collator_role = self.collators.on_new_collator(
acc_id.clone(),
para_id.clone(),
who.clone(),
);
peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message(
ctx,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment