diff --git a/core/finality-grandpa/src/communication.rs b/core/finality-grandpa/src/communication.rs index fe6e2a0477a6f96cf8467b51448ce86cf59e02a5..b1378944e4cc70e22f9138cf23eb678928211b30 100644 --- a/core/finality-grandpa/src/communication.rs +++ b/core/finality-grandpa/src/communication.rs @@ -33,24 +33,29 @@ fn localized_payload<E: Encode>(round: u64, set_id: u64, message: &E) -> Vec<u8> (message, round, set_id).encode() } +#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] +struct Round(u64); +#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] +struct SetId(u64); + enum Broadcast<Block: BlockT> { - // set_id, round, encoded commit. - Commit(u64, u64, Vec<u8>), - // set_id, round, encoded signed message. - Message(u64, u64, Vec<u8>), - // set_id, round, announcement of block hash that should be downloaded - Announcement(u64, u64, Block::Hash), - // set_id, round being dropped. - DropRound(u64, u64), + // round, set id, encoded commit. + Commit(Round, SetId, Vec<u8>), + // round, set id, encoded signed message. + Message(Round, SetId, Vec<u8>), + // round, set id, announcement of block hash that should be downloaded + Announcement(Round, SetId, Block::Hash), + // round, set id being dropped. + DropRound(Round, SetId), } impl<Block: BlockT> Broadcast<Block> { - fn set_id(&self) -> u64 { + fn set_id(&self) -> SetId { match *self { - Broadcast::Commit(s, _, _) => s, - Broadcast::Message(s, _, _) => s, - Broadcast::Announcement(s, _, _) => s, - Broadcast::DropRound(s, _) => s, + Broadcast::Commit(_, s, _) => s, + Broadcast::Message(_, s, _) => s, + Broadcast::Announcement(_, s, _) => s, + Broadcast::DropRound(_, s) => s, } } } @@ -66,9 +71,9 @@ pub(crate) fn rebroadcasting_network<B: BlockT, N: Network<B>>(network: N) -> (B ( BroadcastWorker { interval: Interval::new_interval(REBROADCAST_PERIOD), - set_id: 0, // will be overwritten on first item to broadcast. + set_id: SetId(0), // will be overwritten on first item to broadcast. last_commit: None, - round_messages: (0, Vec::new()), + round_messages: (Round(0), Vec::new()), announcements: HashMap::new(), network: network.clone(), incoming_broadcast: rx, @@ -85,10 +90,10 @@ pub(crate) fn rebroadcasting_network<B: BlockT, N: Network<B>>(network: N) -> (B #[must_use = "network rebroadcast future must be driven to completion"] pub(crate) struct BroadcastWorker<B: BlockT, N: Network<B>> { interval: Interval, - set_id: u64, - last_commit: Option<(u64, Vec<u8>)>, - round_messages: (u64, Vec<Vec<u8>>), - announcements: HashMap<B::Hash, u64>, + set_id: SetId, + last_commit: Option<(Round, Vec<u8>)>, + round_messages: (Round, Vec<Vec<u8>>), + announcements: HashMap<B::Hash, Round>, network: N, incoming_broadcast: mpsc::UnboundedReceiver<Broadcast<B>>, } @@ -115,17 +120,18 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { } if rebroadcast { - if let Some((c_round, ref c_commit)) = self.last_commit { - self.network.send_commit(c_round, self.set_id, c_commit.clone()); + let SetId(set_id) = self.set_id; + if let Some((Round(c_round), ref c_commit)) = self.last_commit { + self.network.send_commit(c_round, set_id, c_commit.clone()); } - let round = self.round_messages.0; + let Round(round) = self.round_messages.0; for message in self.round_messages.1.iter().cloned() { - self.network.send_message(round, self.set_id, message); + self.network.send_message(round, set_id, message); } - for (&announce_hash, &round) in &self.announcements { - self.network.announce(round, self.set_id, announce_hash); + for (&announce_hash, &Round(round)) in &self.announcements { + self.network.announce(round, set_id, announce_hash); } } } @@ -139,22 +145,24 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { if item.set_id() > self.set_id { self.set_id = item.set_id(); self.last_commit = None; - self.round_messages = (0, Vec::new()); + self.round_messages = (Round(0), Vec::new()); self.announcements.clear(); } match item { - Broadcast::Commit(set_id, round, commit) => { + Broadcast::Commit(round, set_id, commit) => { if self.set_id == set_id { - if round >= self.last_commit.as_ref().map_or(0, |&(r, _)| r) { + if round >= self.last_commit.as_ref() + .map_or(Round(0), |&(r, _)| r) + { self.last_commit = Some((round, commit.clone())); } } // always send out to network. - self.network.send_commit(round, self.set_id, commit); + self.network.send_commit(round.0, self.set_id.0, commit); } - Broadcast::Message(set_id, round, message) => { + Broadcast::Message(round, set_id, message) => { if self.set_id == set_id { if round > self.round_messages.0 { self.round_messages = (round, vec![message.clone()]); @@ -166,20 +174,20 @@ impl<B: BlockT, N: Network<B>> Future for BroadcastWorker<B, N> { } // always send out to network. - self.network.send_message(round, set_id, message); + self.network.send_message(round.0, set_id.0, message); } - Broadcast::Announcement(set_id, round, hash) => { + Broadcast::Announcement(round, set_id, hash) => { if self.set_id == set_id { self.announcements.insert(hash, round); } // always send out. - self.network.announce(round, set_id, hash); + self.network.announce(round.0, set_id.0, hash); } - Broadcast::DropRound(set_id, round) => { + Broadcast::DropRound(round, set_id) => { // stop making announcements for any dead rounds. self.announcements.retain(|_, &mut r| r > round); - self.network.drop_messages(round, set_id); + self.network.drop_messages(round.0, set_id.0); } } } @@ -196,11 +204,11 @@ impl<B: BlockT, N: Network<B>> Network<B> for BroadcastHandle<B, N> { } fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>) { - let _ = self.relay.unbounded_send(Broadcast::Message(set_id, round, message)); + let _ = self.relay.unbounded_send(Broadcast::Message(Round(round), SetId(set_id), message)); } fn drop_messages(&self, round: u64, set_id: u64) { - let _ = self.relay.unbounded_send(Broadcast::DropRound(set_id, round)); + let _ = self.relay.unbounded_send(Broadcast::DropRound(Round(round), SetId(set_id))); } fn commit_messages(&self, set_id: u64) -> Self::In { @@ -208,11 +216,13 @@ impl<B: BlockT, N: Network<B>> Network<B> for BroadcastHandle<B, N> { } fn send_commit(&self, round: u64, set_id: u64, message: Vec<u8>) { - let _ = self.relay.unbounded_send(Broadcast::Commit(round, set_id, message)); + let _ = self.relay.unbounded_send(Broadcast::Commit(Round(round), SetId(set_id), message)); } fn announce(&self, round: u64, set_id: u64, block: B::Hash) { - let _ = self.relay.unbounded_send(Broadcast::Announcement(round, set_id, block)); + let _ = self.relay.unbounded_send( + Broadcast::Announcement(Round(round), SetId(set_id), block) + ); } }