// This file is part of Substrate.

// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
//! Communication streams for the polite-grandpa networking protocol.
//!
//! GRANDPA nodes communicate over a gossip network, where messages are not sent to
//! peers until they have reached a given round.
//!
//! Rather than expressing protocol rules,
//! polite-grandpa just carries a notion of impoliteness. Nodes which pass some arbitrary
//! threshold of impoliteness are removed. Messages are either costly, or beneficial.
//!
//! For instance, it is _impolite_ to send the same message more than once.
//! In the future, there will be a fallback for allowing sending the same message
//! under certain conditions that are used to un-stick the protocol.

use futures::{prelude::*, channel::mpsc};
use log::{debug, trace};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use std::{pin::Pin, sync::Arc, task::{Context, Poll}};
use sp_core::traits::BareCryptoStorePtr;
use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use finality_grandpa::{voter, voter_set::VoterSet};
use sc_network::{NetworkService, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use parity_scale_codec::{Encode, Decode};
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO};
use crate::{
	CatchUp, Commit, CommunicationIn, CommunicationOutH,
	CompactCommit, Error, Message, SignedMessage,
use crate::environment::HasVoted;
	FullCatchUpMessage,
	FullCommitMessage,
	GossipMessage,
	GossipValidator,
	PeerReport,
	VoteMessage,
use sp_finality_grandpa::{
	AuthorityId, AuthoritySignature, SetId as SetIdNumber, RoundNumber,
use sp_utils::mpsc::TracingUnboundedReceiver;
mod periodic;

#[cfg(test)]
pub use sp_finality_grandpa::GRANDPA_ENGINE_ID;
/// Protocol name string that is handed to the gossip engine when the network
/// bridge constructs it.
pub const GRANDPA_PROTOCOL_NAME: &str = "/paritytech/grandpa/1";
// Cost scalars for reporting peers. Every value in this module is negative.
mod cost {
	use sc_network::ReputationChange as Rep;

	// Fixed costs, each paired with the reason string attached to the report.
	pub(super) const PAST_REJECTION: Rep = Rep::new(-50, "Grandpa: Past message");
	pub(super) const BAD_SIGNATURE: Rep = Rep::new(-100, "Grandpa: Bad signature");
	pub(super) const MALFORMED_CATCH_UP: Rep = Rep::new(-1000, "Grandpa: Malformed catch-up");
	pub(super) const MALFORMED_COMMIT: Rep = Rep::new(-1000, "Grandpa: Malformed commit");
	pub(super) const FUTURE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Future message");
	pub(super) const UNKNOWN_VOTER: Rep = Rep::new(-150, "Grandpa: Unknown voter");

	pub(super) const INVALID_VIEW_CHANGE: Rep = Rep::new(-500, "Grandpa: Invalid view change");

	// Plain per-unit scalars rather than ready-made reputation changes.
	// NOTE(review): presumably multiplied by a count (bytes, signatures,
	// blocks) where the misbehavior cost is computed; that code is not in view.
	pub(super) const PER_UNDECODABLE_BYTE: i32 = -5;
	pub(super) const PER_SIGNATURE_CHECKED: i32 = -25;
	pub(super) const PER_BLOCK_LOADED: i32 = -10;

	pub(super) const INVALID_CATCH_UP: Rep = Rep::new(-5000, "Grandpa: Invalid catch-up");
	pub(super) const INVALID_COMMIT: Rep = Rep::new(-5000, "Grandpa: Invalid commit");
	pub(super) const OUT_OF_SCOPE_MESSAGE: Rep = Rep::new(-500, "Grandpa: Out-of-scope message");
	pub(super) const CATCH_UP_REQUEST_TIMEOUT: Rep = Rep::new(-200, "Grandpa: Catch-up request timeout");

	// cost of answering a catch up request
	pub(super) const CATCH_UP_REPLY: Rep = Rep::new(-200, "Grandpa: Catch-up reply");
	pub(super) const HONEST_OUT_OF_SCOPE_CATCH_UP: Rep = Rep::new(-200, "Grandpa: Out-of-scope catch-up");
}

// Benefit scalars for reporting peers. Every value in this module is positive.
mod benefit {
	use sc_network::ReputationChange;

	pub(super) const NEIGHBOR_MESSAGE: ReputationChange =
		ReputationChange::new(100, "Grandpa: Neighbor message");
	pub(super) const ROUND_MESSAGE: ReputationChange =
		ReputationChange::new(100, "Grandpa: Round message");
	pub(super) const BASIC_VALIDATED_CATCH_UP: ReputationChange =
		ReputationChange::new(200, "Grandpa: Catch-up message");
	pub(super) const BASIC_VALIDATED_COMMIT: ReputationChange =
		ReputationChange::new(100, "Grandpa: Commit");

	// Plain per-unit scalar rather than a ready-made reputation change.
	pub(super) const PER_EQUIVOCATION: i32 = 10;
}

/// Bundles our local authority id together with a keystore in which that id
/// is available for signing.
pub struct LocalIdKeystore((AuthorityId, BareCryptoStorePtr));

impl LocalIdKeystore {
	/// Borrows the local authority id, i.e. the first element of the wrapped pair.
	fn local_id(&self) -> &AuthorityId {
		let (id, _) = &self.0;
		id
	}

	/// Borrows the keystore, i.e. the second element of the wrapped pair.
	fn keystore(&self) -> &BareCryptoStorePtr {
		let (_, store) = &self.0;
		store
	}
}

impl AsRef<BareCryptoStorePtr> for LocalIdKeystore {
	fn as_ref(&self) -> &BareCryptoStorePtr {
		self.keystore()
	}
}

impl From<(AuthorityId, BareCryptoStorePtr)> for LocalIdKeystore {
	fn from(inner: (AuthorityId, BareCryptoStorePtr)) -> LocalIdKeystore {
		LocalIdKeystore(inner)
	}
}

/// If the voter set is larger than this value some telemetry events are not
/// sent to avoid increasing resource usage on the node and flooding the
/// telemetry server (e.g. received votes, received commits).
const TELEMETRY_VOTERS_LIMIT: usize = 10;

/// A handle to the network.
///
/// Combines the capabilities needed for the `gossip_network::Network` trait
/// with the ability to set a fork sync request for a particular block.
pub trait Network<Block: BlockT>: GossipNetwork<Block> + Clone + Send + 'static {
	/// Notifies the sync service to try and sync the given block from the
	/// given peers.
	///
	/// If `peers` is empty then the underlying implementation should make a
	/// best effort to fetch the block from any peers it is connected to
	/// (NOTE: this assumption will change in the future #3629).
	fn set_sync_fork_request(
		&self,
		peers: Vec<sc_network::PeerId>,
		hash: Block::Hash,
		number: NumberFor<Block>,
	);
}

impl<B: BlockT, H: sc_network::ExHashT> Network<B> for Arc<NetworkService<B, H>> {
	/// Forwards the request unchanged to the inherent
	/// `NetworkService::set_sync_fork_request` of the service behind the `Arc`.
	fn set_sync_fork_request(
		&self,
		peers: Vec<sc_network::PeerId>,
		hash: B::Hash,
		number: NumberFor<B>,
	) {
		// Dereference through the `Arc` explicitly so the inherent method of
		// `NetworkService` is the one being called.
		let service: &NetworkService<B, H> = &**self;
		NetworkService::set_sync_fork_request(service, peers, hash, number)
	}
}

/// Derive the gossip topic for a round and set-id combo.
///
/// The topic is the hash (using the block header's hashing algorithm) of the
/// string `"<set_id>-<round>"`.
pub(crate) fn round_topic<B: BlockT>(round: RoundNumber, set_id: SetIdNumber) -> B::Hash {
	let label = format!("{}-{}", set_id, round);
	<<B::Header as HeaderT>::Hashing as HashT>::hash(label.as_bytes())
}

/// Derive the gossip topic for global messages on a set ID.
///
/// The topic is the hash (using the block header's hashing algorithm) of the
/// string `"<set_id>-GLOBAL"`.
pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
	let label = format!("{}-GLOBAL", set_id);
	<<B::Header as HeaderT>::Hashing as HashT>::hash(label.as_bytes())
}

/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
	/// Handle to the underlying network. Fork sync requests made through the
	/// bridge are forwarded to it.
	service: N,
	/// Gossip engine, shared behind a mutex with the outgoing message sinks
	/// that the bridge hands out.
	gossip_engine: Arc<Mutex<GossipEngine<B>>>,
	/// Gossip validator which the bridge informs about set and round changes
	/// (`note_set` / `note_round`).
	validator: Arc<GossipValidator<B>>,

	/// Sender side of the neighbor packet channel.
	///
	/// Packets sent into this channel are processed by the `NeighborPacketWorker` and passed on to
	/// the underlying `GossipEngine`.
	neighbor_sender: periodic::NeighborPacketSender<B>,

	/// `NeighborPacketWorker` processing packets sent through the `NeighborPacketSender`.
	//
	// `NetworkBridge` is required to be cloneable, thus one needs to be able to clone its children,
	// thus one has to wrap `neighbor_packet_worker` with an `Arc` `Mutex`.
	neighbor_packet_worker: Arc<Mutex<periodic::NeighborPacketWorker<B>>>,

	/// Receiver side of the peer report stream populated by the gossip validator, forwarded to the
	/// gossip engine.
	//
	// `NetworkBridge` is required to be cloneable, thus one needs to be able to clone its children,
	// thus one has to wrap gossip_validator_report_stream with an `Arc` `Mutex`. Given that it is
	// just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer
	// channel implementation.
	gossip_validator_report_stream: Arc<Mutex<TracingUnboundedReceiver<PeerReport>>>,
	// NOTE(review): the closing brace of this struct is not present in this view of the file.
impl<B: BlockT, N: Network<B>> Unpin for NetworkBridge<B, N> {}

impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
	/// Create a new NetworkBridge to the given NetworkService. Returns the service
	/// On creation it will register previous rounds' votes with the gossip
	/// service taken from the VoterSetState.
		service: N,
		config: crate::Config,
		set_state: crate::environment::SharedVoterSetState<B>,
		prometheus_registry: Option<&Registry>,
		let (validator, report_stream) = GossipValidator::new(
			config,
			set_state.clone(),
		let validator = Arc::new(validator);
		let gossip_engine = Arc::new(Mutex::new(GossipEngine::new(
			service.clone(),
			GRANDPA_ENGINE_ID,
			GRANDPA_PROTOCOL_NAME,
			validator.clone()
			// register all previous votes with the gossip service so that they're
			// available to peers potentially stuck on a previous round.
			let completed = set_state.read().completed_rounds();
			let (set_id, voters) = completed.set_info();
			validator.note_set(SetId(set_id), voters.to_vec(), |_, _| {});
			for round in completed.iter() {
				let topic = round_topic::<B>(round.number, set_id);

				// we need to note the round with the gossip validator otherwise
				// messages will be ignored.
				validator.note_round(Round(round.number), |_, _| {});
					let message = gossip::GossipMessage::Vote(
						gossip::VoteMessage::<B> {
							message: signed.clone(),
							round: Round(round.number),
							set_id: SetId(set_id),
						}
					);

					gossip_engine.lock().register_gossip_message(
						topic,
						message.encode(),
					);
				}

				trace!(target: "afg",
					"Registered {} messages for topic {:?} (round: {}, set_id: {})",
					round.votes.len(),
					topic,
					round.number,
					set_id,
				);
			}
		}

		let (neighbor_packet_worker, neighbor_packet_sender) = periodic::NeighborPacketWorker::new();
		NetworkBridge {
			service,
			gossip_engine,
			validator,
			neighbor_sender: neighbor_packet_sender,
			neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)),
			gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)),
	/// Note the beginning of a new round to the `GossipValidator`.
	pub(crate) fn note_round(
		voters: &VoterSet<AuthorityId>,
		// is a no-op if currently in that set.
		self.validator.note_set(
			set_id,
			voters.iter().map(|(v, _)| v.clone()).collect(),
			|to, neighbor| self.neighbor_sender.send(to, neighbor),
		self.validator.note_round(
			round,
			|to, neighbor| self.neighbor_sender.send(to, neighbor),
	/// Get a stream of signature-checked round messages from the network as well as a sink for round messages to the
	/// network all within the current set.
	pub(crate) fn round_communication(
		&self,
		keystore: Option<LocalIdKeystore>,
		round: Round,
		set_id: SetId,
		voters: Arc<VoterSet<AuthorityId>>,
		has_voted: HasVoted<B>,
	) -> (
		impl Stream<Item = SignedMessage<B>> + Unpin,
	) {
		self.note_round(
			round,
			set_id,
			&*voters,
		);
		let keystore = keystore.and_then(|ks| {
			let id = ks.local_id();
			if voters.contains(id) {
				Some(ks)
			} else {
				None
			}
		});

		let topic = round_topic::<B>(round.0, set_id.0);
		let incoming = self.gossip_engine.lock().messages_for(topic)
				let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);

				match decoded {
					Err(ref e) => {
						debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
						future::ready(None)
						if !voters.contains(&msg.message.id) {
							debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
							return future::ready(None);
						if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
							match &msg.message.message {
								PrimaryPropose(propose) => {
									telemetry!(CONSENSUS_INFO; "afg.received_propose";
										"voter" => ?format!("{}", msg.message.id),
										"target_number" => ?propose.target_number,
										"target_hash" => ?propose.target_hash,
									);
								},
								Prevote(prevote) => {
									telemetry!(CONSENSUS_INFO; "afg.received_prevote";
										"voter" => ?format!("{}", msg.message.id),
										"target_number" => ?prevote.target_number,
										"target_hash" => ?prevote.target_hash,
									);
								},
								Precommit(precommit) => {
									telemetry!(CONSENSUS_INFO; "afg.received_precommit";
										"voter" => ?format!("{}", msg.message.id),
										"target_number" => ?precommit.target_number,
										"target_hash" => ?precommit.target_hash,
									);
								},
							};
						}
						future::ready(Some(msg.message))
					}
					_ => {
						debug!(target: "afg", "Skipping unknown message type");
						future::ready(None)
		let (tx, out_rx) = mpsc::channel(0);
		let outgoing = OutgoingMessages::<B> {
			network: self.gossip_engine.clone(),
		// Combine incoming votes from external GRANDPA nodes with outgoing
		// votes from our own GRANDPA voter to have a single
		// vote-import-pipeline.
		let incoming = stream::select(incoming, out_rx);

		(incoming, outgoing)
	}

	/// Set up the global communication streams.
	pub(crate) fn global_communication(
		&self,
		set_id: SetId,
		voters: Arc<VoterSet<AuthorityId>>,
		is_voter: bool,
		impl Stream<Item = CommunicationIn<B>>,
		impl Sink<CommunicationOutH<B, B::Hash>, Error = Error> + Unpin,
		self.validator.note_set(
			set_id,
			voters.iter().map(|(v, _)| v.clone()).collect(),
			|to, neighbor| self.neighbor_sender.send(to, neighbor),

		let topic = global_topic::<B>(set_id.0);
		let incoming = incoming_global(
			topic,
			voters,
			self.validator.clone(),
			self.neighbor_sender.clone(),
		);
		let outgoing = CommitsOut::<B>::new(
			self.gossip_engine.clone(),
			self.validator.clone(),
			self.neighbor_sender.clone(),
		let outgoing = outgoing.with(|out| {
			let voter::CommunicationOut::Commit(round, commit) = out;
			future::ok((round, commit))

	/// Notifies the sync service to try and sync the given block from the given
	/// peers.
	///
	/// If the given vector of peers is empty then the underlying implementation
	/// should make a best effort to fetch the block from any peers it is
	/// connected to (NOTE: this assumption will change in the future #3629).
	pub(crate) fn set_sync_fork_request(
		&self,
		peers: Vec<sc_network::PeerId>,
		hash: B::Hash,
		number: NumberFor<B>
	) {
		Network::set_sync_fork_request(&self.service, peers, hash, number)
impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
	fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
			match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
				Poll::Ready(Some((to, packet))) => {
					self.gossip_engine.lock().send_message(to, packet.encode());
				Poll::Ready(None) => return Poll::Ready(
					Err(Error::Network("Neighbor packet worker stream closed.".into()))
				Poll::Pending => break,

		loop {
			match self.gossip_validator_report_stream.lock().poll_next_unpin(cx) {
				Poll::Ready(Some(PeerReport { who, cost_benefit })) => {
					self.gossip_engine.lock().report(who, cost_benefit);
				Poll::Ready(None) => return Poll::Ready(
					Err(Error::Network("Gossip validator report stream closed.".into()))
				),
				Poll::Pending => break,
		match self.gossip_engine.lock().poll_unpin(cx) {
			Poll::Ready(()) => return Poll::Ready(
				Err(Error::Network("Gossip engine future finished.".into()))
			),
fn incoming_global<B: BlockT>(
	gossip_engine: Arc<Mutex<GossipEngine<B>>>,
	topic: B::Hash,
	voters: Arc<VoterSet<AuthorityId>>,
	gossip_validator: Arc<GossipValidator<B>>,
	neighbor_sender: periodic::NeighborPacketSender<B>,
) -> impl Stream<Item = CommunicationIn<B>> {
	let process_commit = move |
		msg: FullCommitMessage<B>,
		mut notification: sc_network_gossip::TopicNotification,
		gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
		gossip_validator: &Arc<GossipValidator<B>>,
		voters: &VoterSet<AuthorityId>,
	| {
		if voters.len().get() <= TELEMETRY_VOTERS_LIMIT {
			let precommits_signed_by: Vec<String> =
				msg.message.auth_data.iter().map(move |(_, a)| {
					format!("{}", a)
				}).collect();

			telemetry!(CONSENSUS_INFO; "afg.received_commit";
				"contains_precommits_signed_by" => ?precommits_signed_by,
				"target_number" => ?msg.message.target_number.clone(),
				"target_hash" => ?msg.message.target_hash.clone(),
			);
		}

		if let Err(cost) = check_compact_commit::<B>(
			&msg.message,
			voters,
			msg.round,
			msg.set_id,
		) {
			if let Some(who) = notification.sender {
				gossip_engine.lock().report(who, cost);
		let round = msg.round;
		let set_id = msg.set_id;
		let commit = msg.message;
		let finalized_number = commit.target_number;
		let gossip_validator = gossip_validator.clone();
		let gossip_engine = gossip_engine.clone();
		let neighbor_sender = neighbor_sender.clone();
		let cb = move |outcome| match outcome {
			voter::CommitProcessingOutcome::Good(_) => {
				// if it checks out, gossip it. not accounting for
				// any discrepancy between the actual ghost and the claimed
				// finalized number.
				gossip_validator.note_commit_finalized(
					finalized_number,
					|to, neighbor| neighbor_sender.send(to, neighbor),
				gossip_engine.lock().gossip_message(topic, notification.message.clone(), false);
			}
			voter::CommitProcessingOutcome::Bad(_) => {
				// report peer and do not gossip.
				if let Some(who) = notification.sender.take() {
					gossip_engine.lock().report(who, cost::INVALID_COMMIT);
				}
			}
		};

		let cb = voter::Callback::Work(Box::new(cb));

		Some(voter::CommunicationIn::Commit(round.0, commit, cb))
	};

	let process_catch_up = move |
		msg: FullCatchUpMessage<B>,
		mut notification: sc_network_gossip::TopicNotification,
		gossip_engine: &Arc<Mutex<GossipEngine<B>>>,
		gossip_validator: &Arc<GossipValidator<B>>,
		voters: &VoterSet<AuthorityId>,
	| {
		let gossip_validator = gossip_validator.clone();
		let gossip_engine = gossip_engine.clone();

		if let Err(cost) = check_catch_up::<B>(
			&msg.message,
			voters,
			msg.set_id,
		) {
			if let Some(who) = notification.sender {
				gossip_engine.lock().report(who, cost);
			}

			return None;
		}

		let cb = move |outcome| {
			if let voter::CatchUpProcessingOutcome::Bad(_) = outcome {
				// report peer
				if let Some(who) = notification.sender.take() {
					gossip_engine.lock().report(who, cost::INVALID_CATCH_UP);
				}
			}

			gossip_validator.note_catch_up_message_processed();
		};

		let cb = voter::Callback::Work(Box::new(cb));

		Some(voter::CommunicationIn::CatchUp(msg.message, cb))
	};

	gossip_engine.clone().lock().messages_for(topic)
		.filter_map(|notification| {
			// this could be optimized by decoding piecewise.
			let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
			if let Err(ref e) = decoded {
				trace!(target: "afg", "Skipping malformed commit message {:?}: {}", notification, e);
			future::ready(decoded.map(move |d| (notification, d)).ok())
		})
		.filter_map(move |(notification, msg)| {
			future::ready(match msg {
				GossipMessage::Commit(msg) =>
					process_commit(msg, notification, &gossip_engine, &gossip_validator, &*voters),
				GossipMessage::CatchUp(msg) =>
					process_catch_up(msg, notification, &gossip_engine, &gossip_validator, &*voters),
				_ => {
					debug!(target: "afg", "Skipping unknown message type");
impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
	fn clone(&self) -> Self {
		NetworkBridge {
			gossip_engine: self.gossip_engine.clone(),
			validator: Arc::clone(&self.validator),
			neighbor_sender: self.neighbor_sender.clone(),
			neighbor_packet_worker: self.neighbor_packet_worker.clone(),
			gossip_validator_report_stream: self.gossip_validator_report_stream.clone(),
/// Newtype around a round number, so that it cannot be confused with other
/// plain integers such as a set ID.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
pub struct Round(pub RoundNumber);
/// Newtype around a set ID, so that it cannot be confused with other plain
/// integers such as a round number.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
pub struct SetId(pub SetIdNumber);
/// A sink for outgoing messages to the network. Any messages that are sent will
/// be replaced, as appropriate, according to the given `HasVoted`.
/// NOTE: The votes are stored unsigned, which means that the signatures need to
/// be "stable", i.e. we should end up with the exact same signed message if we
/// use the same raw message and key to sign. This is currently true for
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
/// be taken when switching to different key types.
pub(crate) struct OutgoingMessages<Block: BlockT> {
	round: RoundNumber,
	set_id: SetIdNumber,
	keystore: Option<LocalIdKeystore>,
	sender: mpsc::Sender<SignedMessage<Block>>,
	network: Arc<Mutex<GossipEngine<Block>>>,
impl<B: BlockT> Unpin for OutgoingMessages<B> {}

impl<Block: BlockT> Sink<Message<Block>> for OutgoingMessages<Block>
	fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		Sink::poll_ready(Pin::new(&mut self.sender), cx)
			.map(|elem| { elem.map_err(|e| {
				Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
			})})
	}

	fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
		// if we've voted on this round previously under the same key, send that vote instead
		match &mut msg {
			finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
				if let Some(propose) = self.has_voted.propose() {
					*vote = propose.clone();
				},
			finality_grandpa::Message::Prevote(ref mut vote) =>
				if let Some(prevote) = self.has_voted.prevote() {
					*vote = prevote.clone();
				},
			finality_grandpa::Message::Precommit(ref mut vote) =>
				if let Some(precommit) = self.has_voted.precommit() {
					*vote = precommit.clone();
				},
		}

		// when locals exist, sign messages on import
		if let Some(ref keystore) = self.keystore {
			let target_hash = *(msg.target().0);
			let signed = sp_finality_grandpa::sign_message(
				keystore.local_id().clone(),
				self.round,
				self.set_id,
			).ok_or_else(
				|| Error::Signing(format!(
					"Failed to sign GRANDPA vote for round {} targetting {:?}", self.round, target_hash
				))
			)?;
			let message = GossipMessage::Vote(VoteMessage::<Block> {
				message: signed.clone(),
				round: Round(self.round),
				set_id: SetId(self.set_id),
			});

			debug!(
				target: "afg",
				"Announcing block {} to peers which we voted on in round {} in set {}",
				target_hash,
				self.round,
				self.set_id,
			);

			telemetry!(
				CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers";
				"block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id,
			);

			// announce the block we voted on to our peers.
			self.network.lock().announce(target_hash, Vec::new());
			// propagate the message to peers
			let topic = round_topic::<Block>(self.round, self.set_id);
			self.network.lock().gossip_message(topic, message.encode(), false);

			// forward the message to the inner sender.
			return self.sender.start_send(signed).map_err(|e| {
				Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
			});
		};
	fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
		Sink::poll_close(Pin::new(&mut self.sender), cx)
			.map(|elem| { elem.map_err(|e| {
				Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
			})})
// checks a compact commit. returns the cost associated with processing it if
// the commit was bad.
fn check_compact_commit<Block: BlockT>(
	msg: &CompactCommit<Block>,
	round: Round,
	set_id: SetId,
) -> Result<(), ReputationChange> {
	// 4f + 1 = equivocations from f voters.
	let f = voters.total_weight() - voters.threshold();
	let full_threshold = (f + voters.total_weight()).0;
	// check total weight is not out of range.
	let mut total_weight = 0;
	for (_, ref id) in &msg.auth_data {
		if let Some(weight) = voters.get(id).map(|info| info.weight()) {
			total_weight += weight.get();
			if total_weight > full_threshold {
				return Err(cost::MALFORMED_COMMIT);
			}
		} else {
			debug!(target: "afg", "Skipping commit containing unknown voter {}", id);
			return Err(cost::MALFORMED_COMMIT);
		}
	}

	if total_weight < voters.threshold().get() {
		return Err(cost::MALFORMED_COMMIT);
	}

	// check signatures on all contained precommits.
	for (i, (precommit, &(ref sig, ref id))) in msg.precommits.iter()
		.zip(&msg.auth_data)
		.enumerate()
	{
		use crate::communication::gossip::Misbehavior;
		use finality_grandpa::Message as GrandpaMessage;
		if !sp_finality_grandpa::check_message_signature_with_buffer(
			&GrandpaMessage::Precommit(precommit.clone()),
			id,
			sig,
			round.0,
			set_id.0,
		) {
			debug!(target: "afg", "Bad commit message signature {}", id);
			telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id);
			let cost = Misbehavior::BadCommitMessage {
				signatures_checked: i as i32,
				blocks_loaded: 0,
				equivocations_caught: 0,
			}.cost();

			return Err(cost);
// checks a catch up. returns the cost associated with processing it if
// the catch up was bad.
fn check_catch_up<Block: BlockT>(
	msg: &CatchUp<Block>,
	voters: &VoterSet<AuthorityId>,
	set_id: SetId,
) -> Result<(), ReputationChange> {
	// 4f + 1 = equivocations from f voters.
	let f = voters.total_weight() - voters.threshold();
	let full_threshold = (f + voters.total_weight()).0;

	// Check that the total weight of a set of votes is within range.
	//
	// Walks `votes` (the ids of the voters that signed), summing each voter's
	// weight as found in `voters`. Returns `Err(cost::MALFORMED_CATCH_UP)` when:
	//   * an id is not part of `voters`,
	//   * the running total exceeds `full_threshold`, or
	//   * the final total is below `voters.threshold()`.
	// Returns `Ok(())` otherwise.
	fn check_weight<'a>(
		voters: &'a VoterSet<AuthorityId>,
		votes: impl Iterator<Item=&'a AuthorityId>,
		full_threshold: u64,
	) -> Result<(), ReputationChange> {
		let mut total_weight = 0;

		for id in votes {
			// `id` is already a reference, so it is passed through as-is.
			let info = match voters.get(id) {
				Some(info) => info,
				None => {
					debug!(target: "afg", "Skipping catch up message containing unknown voter {}", id);
					return Err(cost::MALFORMED_CATCH_UP);
				}
			};

			total_weight += info.weight().get();

			// Bail out as soon as the upper bound is crossed instead of
			// summing the remaining votes.
			if total_weight > full_threshold {
				return Err(cost::MALFORMED_CATCH_UP);
			}
		}

		if total_weight < voters.threshold().get() {
			return Err(cost::MALFORMED_CATCH_UP);
		}

		Ok(())
	}

	check_weight(
		voters,
		msg.prevotes.iter().map(|vote| &vote.id),
		full_threshold,
	)?;

	check_weight(
		voters,
		msg.precommits.iter().map(|vote| &vote.id),
		full_threshold,
	)?;

	// Checks the signature of every `(message, id, signature)` triple yielded
	// by `messages` for the given `round` and `set_id`.
	//
	// `signatures_checked` is a running counter supplied by the caller; it is
	// incremented before each individual check. On the first bad signature the
	// failure is logged and sent to telemetry, and the cost of
	// `Misbehavior::BadCatchUpMessage` (built from the counter so far) is
	// returned as the error. If every signature checks out, the updated
	// counter is returned so it can be fed into a subsequent call.
	fn check_signatures<'a, B, I>(
		messages: I,
		round: RoundNumber,
		set_id: SetIdNumber,
		mut signatures_checked: usize,
	) -> Result<usize, ReputationChange> where
		B: BlockT,
		I: Iterator<Item=(Message<B>, &'a AuthorityId, &'a AuthoritySignature)>,
	{
		use crate::communication::gossip::Misbehavior;

		for (msg, id, sig) in messages {
			// Count the signature before verifying it, so a failing signature
			// is included in the reported total.
			signatures_checked += 1;

			// NOTE(review): the `_with_buffer` suffix suggests this function
			// also expects a scratch-buffer argument that is not passed in
			// this view of the file — confirm against the
			// `sp_finality_grandpa` API.
			if !sp_finality_grandpa::check_message_signature_with_buffer(
				&msg,
				id,
				sig,
				round,
				set_id,
			) {
				debug!(target: "afg", "Bad catch up message signature {}", id);
				telemetry!(CONSENSUS_DEBUG; "afg.bad_catch_up_msg_signature"; "id" => ?id);

				let cost = Misbehavior::BadCatchUpMessage {
					signatures_checked: signatures_checked as i32,
				}.cost();

				return Err(cost);
			}
		}

		Ok(signatures_checked)
	}

	// check signatures on all contained prevotes.
	let signatures_checked = check_signatures::<Block, _>(
		msg.prevotes.iter().map(|vote| {
			(finality_grandpa::Message::Prevote(vote.prevote.clone()), &vote.id, &vote.signature)
		}),
		msg.round_number,
		set_id.0,
		0,
	)?;

	// check signatures on all contained precommits.
	let _ = check_signatures::<Block, _>(
		msg.precommits.iter().map(|vote| {
			(finality_grandpa::Message::Precommit(vote.precommit.clone()), &vote.id, &vote.signature)
		}),
		msg.round_number,
		set_id.0,
		signatures_checked,
/// An output sink for commit messages.
struct CommitsOut<Block: BlockT> {
	network: Arc<Mutex<GossipEngine<Block>>>,
	gossip_validator: Arc<GossipValidator<Block>>,
	neighbor_sender: periodic::NeighborPacketSender<Block>,
impl<Block: BlockT> CommitsOut<Block> {
	/// Create a new commit output stream.
	pub(crate) fn new(
		network: Arc<Mutex<GossipEngine<Block>>>,
		set_id: SetIdNumber,
		is_voter: bool,
		gossip_validator: Arc<GossipValidator<Block>>,
		neighbor_sender: periodic::NeighborPacketSender<Block>,
		CommitsOut {
			network,
			set_id: SetId(set_id),
			is_voter,
			gossip_validator,
impl<Block: BlockT> Sink<(RoundNumber, Commit<Block>)> for CommitsOut<Block> {
	type Error = Error;

	fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
		Poll::Ready(Ok(()))
	}
	fn start_send(self: Pin<&mut Self>, input: (RoundNumber, Commit<Block>)) -> Result<(), Self::Error> {
		}

		let (round, commit) = input;
		let round = Round(round);

		telemetry!(CONSENSUS_DEBUG; "afg.commit_issued";
			"target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash,
		);
		let (precommits, auth_data) = commit.precommits.into_iter()
			.map(|signed| (signed.precommit, (signed.signature, signed.id)))
			.unzip();

		let compact_commit = CompactCommit::<Block> {
			target_hash: commit.target_hash,
			target_number: commit.target_number,
			precommits,
			auth_data
		};

		let message = GossipMessage::Commit(FullCommitMessage::<Block> {
			set_id: self.set_id,
			message: compact_commit,
		});

		let topic = global_topic::<Block>(self.set_id.0);

		// the gossip validator needs to be made aware of the best commit-height we know of
		// before gossiping
		self.gossip_validator.note_commit_finalized(