// This file is part of Substrate.

// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::{
	ExHashT,
	chain::{Client, FinalityProofProvider},
	config::{BoxFinalityProofRequestBuilder, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
	error,
	utils::interval
};

use bytes::{Bytes, BytesMut};
use futures::{prelude::*, stream::FuturesUnordered};
use generic_proto::{GenericProto, GenericProtoOut};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use sp_core::{
	storage::{StorageKey, PrefixedStorageKey, ChildInfo, ChildType},
	hexdisplay::HexDisplay
};
use sp_consensus::{
	BlockOrigin,
	block_validation::BlockAnnounceValidator,
	import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
	Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, Message};
use message::generic::{Message as GenericMessage, ConsensusMessage, Roles};
use prometheus_endpoint::{Registry, Gauge, GaugeVec, HistogramVec, PrometheusError, Opts, register, U64};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::fmt::Write;
use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use sc_client_api::{ChangesProof, StorageProof};
use util::LruHashSet;
use wasm_timer::Instant;

mod generic_proto;
mod util;

pub mod message;
pub mod event;
pub mod sync;

pub use generic_proto::LegacyConnectionKillError;

const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Interval at which we propagate extrinsics;
const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900);

/// Maximim number of known block hashes to keep for a peer.
const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead
/// Maximim number of known extrinsic hashes to keep for a peer.
const MAX_KNOWN_EXTRINSICS: usize = 4096; // ~128kb per peer + overhead

/// Maximim number of transaction validation request we keep at any moment.
const MAX_PENDING_TRANSACTIONS: usize = 8192;

/// Current protocol version.
pub(crate) const CURRENT_VERSION: u32 = 6;
/// Lowest version we support
pub(crate) const MIN_VERSION: u32 = 3;

// Maximum allowed entries in `BlockResponse`
const MAX_BLOCK_DATA_RESPONSE: u32 = 128;
/// When light node connects to the full node and the full node is behind light node
/// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it not useful
/// and disconnect to free connection slot.
const LIGHT_MAXIMAL_BLOCKS_DIFFERENCE: u64 = 8192;

mod rep {
	use sc_peerset::ReputationChange as Rep;
	/// Reputation change when a peer is "clogged", meaning that it's not fast enough to process our
	/// messages.
	pub const CLOGGED_PEER: Rep = Rep::new(-(1 << 12), "Clogged message queue");
	/// Reputation change when a peer doesn't respond in time to our messages.
	pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
	/// Reputation change when a peer sends us a status message while we already received one.
	pub const UNEXPECTED_STATUS: Rep = Rep::new(-(1 << 20), "Unexpected status message");
	/// Reputation change when we are a light client and a peer is behind us.
	pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
	/// Reputation change when a peer sends us any extrinsic.
	///
	/// This forces node to verify it, thus the negative value here. Once extrinsic is verified,
	/// reputation change should be refunded with `ANY_EXTRINSIC_REFUND`
	pub const ANY_EXTRINSIC: Rep = Rep::new(-(1 << 4), "Any extrinsic");
	/// Reputation change when a peer sends us any extrinsic that is not invalid.
	pub const ANY_EXTRINSIC_REFUND: Rep = Rep::new(1 << 4, "Any extrinsic (refund)");
	/// Reputation change when a peer sends us an extrinsic that we didn't know about.
	pub const GOOD_EXTRINSIC: Rep = Rep::new(1 << 7, "Good extrinsic");
	/// Reputation change when a peer sends us a bad extrinsic.
	pub const BAD_EXTRINSIC: Rep = Rep::new(-(1 << 12), "Bad extrinsic");
	/// We sent an RPC query to the given node, but it failed.
	pub const RPC_FAILED: Rep = Rep::new(-(1 << 12), "Remote call failed");
	/// We received a message that failed to decode.
	pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
	/// We received an unexpected response.
	pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
	/// We received an unexpected extrinsic packet.
	pub const UNEXPECTED_EXTRINSICS: Rep = Rep::new_fatal("Unexpected extrinsics packet");
	/// We received an unexpected light node request.
	pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet");
	/// Peer has different genesis.
	pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch");
	/// Peer is on unsupported protocol version.
	pub const BAD_PROTOCOL: Rep = Rep::new_fatal("Unsupported protocol");
	/// Peer role does not match (e.g. light peer connecting to another light peer).
	pub const BAD_ROLE: Rep = Rep::new_fatal("Unsupported role");
	/// Peer response data does not have requested bits.
	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
}

struct Metrics {
	handshaking_peers: Gauge<U64>,
	obsolete_requests: Gauge<U64>,
	peers: Gauge<U64>,
	queued_blocks: Gauge<U64>,
	fork_targets: Gauge<U64>,
	finality_proofs: GaugeVec<U64>,
	justifications: GaugeVec<U64>,
}

impl Metrics {
	fn register(r: &Registry) -> Result<Self, PrometheusError> {
		Ok(Metrics {
			handshaking_peers: {
				let g = Gauge::new("sync_handshaking_peers", "Number of newly connected peers")?;
				register(g, r)?
			},
			obsolete_requests: {
				let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?;
				register(g, r)?
			},
			peers: {
				let g = Gauge::new("sync_peers", "Number of peers we sync with")?;
				register(g, r)?
			},
			queued_blocks: {
				let g = Gauge::new("sync_queued_blocks", "Number of blocks in import queue")?;
				register(g, r)?
			},
			fork_targets: {
				let g = Gauge::new("sync_fork_targets", "Number of fork sync targets")?;
				register(g, r)?
			},
			justifications: {
				let g = GaugeVec::new(
					Opts::new(
						"sync_extra_justifications",
						"Number of extra justifications requests"
					),
					&["status"],
				)?;
				register(g, r)?
			},
			finality_proofs: {
				let g = GaugeVec::new(
					Opts::new(
						"sync_extra_finality_proofs",
						"Number of extra finality proof requests",
					),
					&["status"],
				)?;
				register(g, r)?
			},
		})
	}
}

struct PendingTransaction {
	validation: TransactionImportFuture,
	peer_id: PeerId,
}

impl Future for PendingTransaction {
	type Output = (PeerId, TransactionImport);

	fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
		let this = Pin::into_inner(self);
		if let Poll::Ready(import_result) = this.validation.poll_unpin(cx) {
			return Poll::Ready((this.peer_id.clone(), import_result));
		}

		Poll::Pending
	}
}

// Lock must always be taken in order declared here.
pub struct Protocol<B: BlockT, H: ExHashT> {
	/// Interval at which we call `tick`.
	tick_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
	/// Interval at which we call `propagate_extrinsics`.
	propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>,
	/// Pending list of messages to return from `poll` as a priority.
	pending_messages: VecDeque<CustomMessageOutcome<B>>,
	/// Pending extrinsic verification tasks.
	pending_transactions: FuturesUnordered<PendingTransaction>,
	config: ProtocolConfig,
	genesis_hash: B::Hash,
	sync: ChainSync<B>,
	context_data: ContextData<B, H>,
	/// List of nodes for which we perform additional logging because they are important for the
	/// user.
	important_peers: HashSet<PeerId>,
	// Connected peers pending Status message.
	handshaking_peers: HashMap<PeerId, HandshakingPeer>,
	/// Used to report reputation changes.
	peerset_handle: sc_peerset::PeersetHandle,
	transaction_pool: Arc<dyn TransactionPool<H, B>>,
	/// When asked for a proof of finality, we use this struct to build one.
	finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
	/// Handles opening the unique substream and sending and receiving raw messages.
	behaviour: GenericProto,
	/// For each legacy gossiping engine ID, the corresponding new protocol name.
	protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
	/// For each protocol name, the legacy equivalent.
	legacy_equiv_by_name: HashMap<Cow<'static, [u8]>, Fallback>,
	/// Name of the protocol used for transactions.
	transactions_protocol: Cow<'static, [u8]>,
	/// Name of the protocol used for block announces.
	block_announces_protocol: Cow<'static, [u8]>,
	/// Prometheus metrics.
	metrics: Option<Metrics>,
	/// The `PeerId`'s of all boot nodes.
	boot_node_ids: Arc<HashSet<PeerId>>,
	/// If true, we send back requests as `CustomMessageOutcome` events. If false, we directly
	/// dispatch requests using the legacy substream.
	use_new_block_requests_protocol: bool,
}

#[derive(Default)]
struct PacketStats {
	bytes_in: u64,
	bytes_out: u64,
	count_in: u64,
	count_out: u64,
}

/// A peer that we are connected to
/// and from whom we have not yet received a Status message.
struct HandshakingPeer {
	timestamp: Instant,
}

/// Peer information
#[derive(Debug, Clone)]
struct Peer<B: BlockT, H: ExHashT> {
	info: PeerInfo<B>,
	/// Current block request, if any.
	block_request: Option<(Instant, message::BlockRequest<B>)>,
	/// Requests we are no longer interested in.
	obsolete_requests: HashMap<message::RequestId, Instant>,
	/// Holds a set of transactions known to this peer.
	known_extrinsics: LruHashSet<H>,
	/// Holds a set of blocks known to this peer.
	known_blocks: LruHashSet<B::Hash>,
	/// Request counter,
	next_request_id: message::RequestId,
}

/// Info about a peer's known state.
#[derive(Clone, Debug)]
pub struct PeerInfo<B: BlockT> {
	/// Roles
	pub roles: Roles,
	/// Protocol version
	pub protocol_version: u32,
	/// Peer best block hash
	pub best_hash: B::Hash,
	/// Peer best block number
	pub best_number: <B::Header as HeaderT>::Number,
}

/// Data necessary to create a context.
struct ContextData<B: BlockT, H: ExHashT> {
	// All connected peers
	peers: HashMap<PeerId, Peer<B, H>>,
	stats: HashMap<&'static str, PacketStats>,
	pub chain: Arc<dyn Client<B>>,
}

/// Configuration for the Substrate-specific part of the networking layer.
#[derive(Clone)]
pub struct ProtocolConfig {
	/// Assigned roles.
	pub roles: Roles,
	/// Maximum number of peers to ask the same blocks in parallel.
	pub max_parallel_downloads: u32,
}

impl Default for ProtocolConfig {
	fn default() -> ProtocolConfig {
		ProtocolConfig {
			roles: Roles::FULL,
			max_parallel_downloads: 5,
		}
	}
}

/// Handshake sent when we open a block announces substream.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)]
struct BlockAnnouncesHandshake<B: BlockT> {
	/// Roles of the node.
	roles: Roles,
	/// Best block number.
	best_number: NumberFor<B>,
	/// Best block hash.
	best_hash: B::Hash,
	/// Genesis block hash.
	genesis_hash: B::Hash,
}

impl<B: BlockT> BlockAnnouncesHandshake<B> {
	fn build(protocol_config: &ProtocolConfig, chain: &Arc<dyn Client<B>>) -> Self {
		let info = chain.info();
		BlockAnnouncesHandshake {
			genesis_hash: info.genesis_hash,
			roles: protocol_config.roles,
			best_number: info.best_number,
			best_hash: info.best_hash,
		}
	}
}

/// Fallback mechanism to use to send a notification if no substream is open.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Fallback {
	/// Use a `Message::Consensus` with the given engine ID.
	Consensus(ConsensusEngineId),
	/// The message is the bytes encoding of a `Transactions<E>` (which is itself defined as a `Vec<E>`).
	Transactions,
	/// The message is the bytes encoding of a `BlockAnnounce<H>`.
	BlockAnnounce,
}

impl<B: BlockT, H: ExHashT> Protocol<B, H> {
	/// Create a new instance.
	pub fn new(
		config: ProtocolConfig,
		local_peer_id: PeerId,
		chain: Arc<dyn Client<B>>,
		transaction_pool: Arc<dyn TransactionPool<H, B>>,
		finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
		finality_proof_request_builder: Option<BoxFinalityProofRequestBuilder<B>>,
		protocol_id: ProtocolId,
		peerset_config: sc_peerset::PeersetConfig,
		block_announce_validator: Box<dyn BlockAnnounceValidator<B> + Send>,
		metrics_registry: Option<&Registry>,
		boot_node_ids: Arc<HashSet<PeerId>>,
		use_new_block_requests_protocol: bool,
		queue_size_report: Option<HistogramVec>,
	) -> error::Result<(Protocol<B, H>, sc_peerset::PeersetHandle)> {
		let info = chain.info();
		let sync = ChainSync::new(
			config.roles,
			chain.clone(),
			&info,
			finality_proof_request_builder,
			block_announce_validator,
			config.max_parallel_downloads,
		);

		let important_peers = {
			let mut imp_p = HashSet::new();
			for reserved in peerset_config.priority_groups.iter().flat_map(|(_, l)| l.iter()) {
				imp_p.insert(reserved.clone());
			}
			imp_p.shrink_to_fit();
			imp_p
		};

		let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
		let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
		let mut behaviour = GenericProto::new(
			local_peer_id,
			protocol_id.clone(),
			versions,
			peerset,
			queue_size_report
		);

		let mut legacy_equiv_by_name = HashMap::new();

		let transactions_protocol: Cow<'static, [u8]> = Cow::from({
			let mut proto = b"/".to_vec();
			proto.extend(protocol_id.as_bytes());
			proto.extend(b"/transactions/1");
			proto
		});
		behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
		legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);

		let block_announces_protocol: Cow<'static, [u8]> = Cow::from({
			let mut proto = b"/".to_vec();
			proto.extend(protocol_id.as_bytes());
			proto.extend(b"/block-announces/1");
			proto
		});
		behaviour.register_notif_protocol(
			block_announces_protocol.clone(),
			BlockAnnouncesHandshake::build(&config, &chain).encode()
		);
		legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

		let protocol = Protocol {
			tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
			propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
			pending_messages: VecDeque::new(),
			pending_transactions: FuturesUnordered::new(),
			config,
			context_data: ContextData {
				peers: HashMap::new(),
				stats: HashMap::new(),
				chain,
			},
			genesis_hash: info.genesis_hash,
			sync,
			handshaking_peers: HashMap::new(),
			important_peers,
			transaction_pool,
			finality_proof_provider,
			peerset_handle: peerset_handle.clone(),
			behaviour,
			protocol_name_by_engine: HashMap::new(),
			legacy_equiv_by_name,
			transactions_protocol,
			block_announces_protocol,
			metrics: if let Some(r) = metrics_registry {
				Some(Metrics::register(r)?)
			} else {
				None
			},
			boot_node_ids,
			use_new_block_requests_protocol,
		};

		Ok((protocol, peerset_handle))
	}

	/// Returns the list of all the peers we have an open channel to.
	pub fn open_peers(&self) -> impl Iterator<Item = &PeerId> {
		self.behaviour.open_peers()
	}

	/// Returns true if we have a channel open with this node.
	pub fn is_open(&self, peer_id: &PeerId) -> bool {
		self.behaviour.is_open(peer_id)
	}

	/// Returns the list of all the peers that the peerset currently requests us to be connected to.
	pub fn requested_peers(&self) -> impl Iterator<Item = &PeerId> {
		self.behaviour.requested_peers()
	}

	/// Returns the number of discovered nodes that we keep in memory.
	pub fn num_discovered_peers(&self) -> usize {
		self.behaviour.num_discovered_peers()
	}

	/// Disconnects the given peer if we are connected to it.
	pub fn disconnect_peer(&mut self, peer_id: &PeerId) {
		self.behaviour.disconnect_peer(peer_id)
	}

	/// Returns true if we try to open protocols with the given peer.
	pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
		self.behaviour.is_enabled(peer_id)
	}

	/// Returns the state of the peerset manager, for debugging purposes.
	pub fn peerset_debug_info(&mut self) -> serde_json::Value {
		self.behaviour.peerset_debug_info()
	}

	/// Returns the number of peers we're connected to.
	pub fn num_connected_peers(&self) -> usize {
		self.context_data.peers.values().count()
	}

	/// Returns the number of peers we're connected to and that are being queried.
	pub fn num_active_peers(&self) -> usize {
		self.context_data
			.peers
			.values()
			.filter(|p| p.block_request.is_some())
			.count()
	}

	/// Current global sync state.
	pub fn sync_state(&self) -> SyncState {
		self.sync.status().state
	}

	/// Target sync block number.
	pub fn best_seen_block(&self) -> Option<NumberFor<B>> {
		self.sync.status().best_seen_block
	}

	/// Number of peers participating in syncing.
	pub fn num_sync_peers(&self) -> u32 {
		self.sync.status().num_peers
	}

	/// Number of blocks in the import queue.
	pub fn num_queued_blocks(&self) -> u32 {
		self.sync.status().queued_blocks
	}

	/// Number of processed blocks.
	pub fn num_processed_blocks(&self) -> usize {
		self.sync.num_processed_blocks()
	}

	/// Number of active sync requests.
	pub fn num_sync_requests(&self) -> usize {
		self.sync.num_sync_requests()
	}

	/// Sync local state with the blockchain state.
	pub fn update_chain(&mut self) {
		let info = self.context_data.chain.info();
		self.sync.update_chain_info(&info.best_hash, info.best_number);
	}

	fn update_peer_info(&mut self, who: &PeerId) {
		if let Some(info) = self.sync.peer_info(who) {
			if let Some(ref mut peer) = self.context_data.peers.get_mut(who) {
				peer.info.best_hash = info.best_hash;
				peer.info.best_number = info.best_number;
			}
		}
	}

	/// Returns information about all the peers we are connected to after the handshake message.
	pub fn peers_info(&self) -> impl Iterator<Item = (&PeerId, &PeerInfo<B>)> {
		self.context_data.peers.iter().map(|(id, peer)| (id, &peer.info))
	}

	pub fn on_custom_message(
		&mut self,
		who: PeerId,
		data: BytesMut,
	) -> CustomMessageOutcome<B> {

		let message = match <Message<B> as Decode>::decode(&mut &data[..]) {
			Ok(message) => message,
			Err(err) => {
				debug!(target: "sync", "Couldn't decode packet sent by {}: {:?}: {}", who, data, err.what());
				self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
				return CustomMessageOutcome::None;
			}
		};

		let mut stats = self.context_data.stats.entry(message.id()).or_default();
		stats.bytes_in += data.len() as u64;
		stats.count_in += 1;

		match message {
			GenericMessage::Status(s) => return self.on_status_message(who, s),
			GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
			GenericMessage::BlockResponse(r) => {
				let outcome = self.on_block_response(who.clone(), r);
				self.update_peer_info(&who);
				return outcome
			},
			GenericMessage::BlockAnnounce(announce) => {
				let outcome = self.on_block_announce(who.clone(), announce);
				self.update_peer_info(&who);
				return outcome;
			},
			GenericMessage::Transactions(m) =>
				self.on_extrinsics(who, m),
			GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request),
			GenericMessage::RemoteCallResponse(_) =>
				warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"),
			GenericMessage::RemoteReadRequest(request) =>
				self.on_remote_read_request(who, request),
			GenericMessage::RemoteReadResponse(_) =>
				warn!(target: "sub-libp2p", "Received unexpected RemoteReadResponse"),
			GenericMessage::RemoteHeaderRequest(request) =>
				self.on_remote_header_request(who, request),
			GenericMessage::RemoteHeaderResponse(_) =>
				warn!(target: "sub-libp2p", "Received unexpected RemoteHeaderResponse"),
			GenericMessage::RemoteChangesRequest(request) =>
				self.on_remote_changes_request(who, request),
			GenericMessage::RemoteChangesResponse(_) =>
				warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"),
			GenericMessage::FinalityProofRequest(request) =>
				self.on_finality_proof_request(who, request),
			GenericMessage::FinalityProofResponse(response) =>
				return self.on_finality_proof_response(who, response),
			GenericMessage::RemoteReadChildRequest(request) =>
				self.on_remote_read_child_request(who, request),
			GenericMessage::Consensus(msg) =>
				return if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
					CustomMessageOutcome::NotificationsReceived {
						remote: who,
						messages: vec![(msg.engine_id, From::from(msg.data))],
					}
				} else {
					warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
					CustomMessageOutcome::None
				},
			GenericMessage::ConsensusBatch(messages) => {
				let messages = messages
					.into_iter()
					.filter_map(|msg| {
						if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
							Some((msg.engine_id, From::from(msg.data)))
						} else {
							warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
							None
						}
					})
					.collect::<Vec<_>>();

				return if !messages.is_empty() {
					CustomMessageOutcome::NotificationsReceived {
						remote: who,
						messages,
					}
				} else {
					CustomMessageOutcome::None
				};
			},
		}

		CustomMessageOutcome::None
	}

	fn send_request(&mut self, who: &PeerId, message: Message<B>) {
		send_request::<B, H>(
			&mut self.behaviour,
			&mut self.context_data.stats,
			&mut self.context_data.peers,
			who,
			message,
		);
	}

	fn send_message(
		&mut self,
		who: &PeerId,
		message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
		legacy: Message<B>,
	) {
		send_message::<B>(
			&mut self.behaviour,
			&mut self.context_data.stats,
			who,
			message,
			legacy,
		);
	}

	fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest<B>) {
		update_peer_request::<B, H>(&mut self.context_data.peers, who, request)
	}

	/// Called when a new peer is connected
	pub fn on_peer_connected(&mut self, who: PeerId) {
		trace!(target: "sync", "Connecting {}", who);
		self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: Instant::now() });
		self.send_status(who);
	}

	/// Called by peer when it is disconnecting
	pub fn on_peer_disconnected(&mut self, peer: PeerId) -> CustomMessageOutcome<B> {
		if self.important_peers.contains(&peer) {
			warn!(target: "sync", "Reserved peer {} disconnected", peer);
		} else {
			trace!(target: "sync", "{} disconnected", peer);
		}

		// lock all the the peer lists so that add/remove peer events are in order
		let removed = {
			self.handshaking_peers.remove(&peer);
			self.context_data.peers.remove(&peer)
		};
		if let Some(_peer_data) = removed {
			self.sync.peer_disconnected(&peer);

			// Notify all the notification protocols as closed.
			CustomMessageOutcome::NotificationStreamClosed {
				remote: peer,
				protocols: self.protocol_name_by_engine.keys().cloned().collect(),
			}
		} else {
			CustomMessageOutcome::None
		}
	}

	/// Called as a back-pressure mechanism if the networking detects that the peer cannot process
	/// our messaging rate fast enough.
	pub fn on_clogged_peer(&self, who: PeerId, _msg: Option<Message<B>>) {
		self.peerset_handle.report_peer(who.clone(), rep::CLOGGED_PEER);

		// Print some diagnostics.
		if let Some(peer) = self.context_data.peers.get(&who) {
			debug!(target: "sync", "Clogged peer {} (protocol_version: {:?}; roles: {:?}; \
				known_extrinsics: {:?}; known_blocks: {:?}; best_hash: {:?}; best_number: {:?})",
				who, peer.info.protocol_version, peer.info.roles, peer.known_extrinsics, peer.known_blocks,
				peer.info.best_hash, peer.info.best_number);
		} else {
			debug!(target: "sync", "Peer clogged before being properly connected");
		}
	}

	fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest<B>) {
		trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}",
			request.id,
			peer,
			request.from,
			request.to,
			request.max,
			request.fields,
		);

		// sending block requests to the node that is unable to serve it is considered a bad behavior
		if !self.config.roles.is_full() {
			trace!(target: "sync", "Peer {} is trying to sync from the light node", peer);
			self.behaviour.disconnect_peer(&peer);
			self.peerset_handle.report_peer(peer, rep::UNEXPECTED_REQUEST);
			return;
		}

		let mut blocks = Vec::new();
		let mut id = match request.from {
			message::FromBlock::Hash(h) => BlockId::Hash(h),
			message::FromBlock::Number(n) => BlockId::Number(n),
		};
		let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize;
		let get_header = request.fields.contains(message::BlockAttributes::HEADER);
		let get_body = request.fields.contains(message::BlockAttributes::BODY);
		let get_justification = request
			.fields
			.contains(message::BlockAttributes::JUSTIFICATION);
		while let Some(header) = self.context_data.chain.header(id).unwrap_or(None) {
			if blocks.len() >= max {
				break;
			}
			let number = *header.number();
			let hash = header.hash();
			let parent_hash = *header.parent_hash();
			let justification = if get_justification {
				self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None)
			} else {
				None
			};
			let block_data = message::generic::BlockData {
				hash,
				header: if get_header { Some(header) } else { None },
				body: if get_body {
					self.context_data
						.chain
						.block_body(&BlockId::Hash(hash))
						.unwrap_or(None)
				} else {
					None
				},
				receipt: None,
				message_queue: None,
				justification,
			};
			// Stop if we don't have requested block body
			if get_body && block_data.body.is_none() {
				trace!(target: "sync", "Missing data for block request.");
				break;
			}
			blocks.push(block_data);
			match request.direction {
				message::Direction::Ascending => id = BlockId::Number(number + One::one()),
				message::Direction::Descending => {
					if number.is_zero() {
						break;
					}
					id = BlockId::Hash(parent_hash)
				}
			}
		}
		let response = message::generic::BlockResponse {
			id: request.id,
			blocks,
		};
		trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len());
		self.send_message(&peer, None, GenericMessage::BlockResponse(response))
	}

	/// Adjusts the reputation of a node.
	pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) {
		self.peerset_handle.report_peer(who, reputation)
	}

	/// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] being emitted.
	/// Must contain the same `PeerId` and request that have been emitted.
	pub fn on_block_response(
		&mut self,
		peer: PeerId,
		response: message::BlockResponse<B>,
	) -> CustomMessageOutcome<B> {
		let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
			if p.obsolete_requests.remove(&response.id).is_some() {
				trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
				return CustomMessageOutcome::None;
			}
			// Clear the request. If the response is invalid peer will be disconnected anyway.
			match p.block_request.take() {
				Some((_, request)) if request.id == response.id => request,
				Some(_) =>  {
					trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
					return CustomMessageOutcome::None;
				}
				None => {
					trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
					self.behaviour.disconnect_peer(&peer);
					self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
					return CustomMessageOutcome::None;
				}
			}
		} else {
			trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
			self.behaviour.disconnect_peer(&peer);
			self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
			return CustomMessageOutcome::None;
		};

		let blocks_range = || match (
			response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
			response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
		) {
			(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
			(Some(first), Some(_)) => format!(" ({})", first),
			_ => Default::default(),
		};
		trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
			response.id,
			peer,
			response.blocks.len(),
			blocks_range(),
		);

		if request.fields == message::BlockAttributes::JUSTIFICATION {
			match self.sync.on_block_justification(peer, response) {
				Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None,
				Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) =>
					CustomMessageOutcome::JustificationImport(peer, hash, number, justification),
				Err(sync::BadPeer(id, repu)) => {
					self.behaviour.disconnect_peer(&id);
					self.peerset_handle.report_peer(id, repu);
					CustomMessageOutcome::None
				}
			}
		} else {
			// Validate fields against the request.
			if request.fields.contains(message::BlockAttributes::HEADER) && response.blocks.iter().any(|b| b.header.is_none()) {
				self.behaviour.disconnect_peer(&peer);
				self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
				trace!(target: "sync", "Missing header for a block");
				return CustomMessageOutcome::None
			}
			if request.fields.contains(message::BlockAttributes::BODY) && response.blocks.iter().any(|b| b.body.is_none()) {
				self.behaviour.disconnect_peer(&peer);
				self.peerset_handle.report_peer(peer, rep::BAD_RESPONSE);
				trace!(target: "sync", "Missing body for a block");
				return CustomMessageOutcome::None
			}

			match self.sync.on_block_data(&peer, Some(request), response) {
				Ok(sync::OnBlockData::Import(origin, blocks)) =>
					CustomMessageOutcome::BlockImport(origin, blocks),
				Ok(sync::OnBlockData::Request(peer, mut req)) => {
					if self.use_new_block_requests_protocol {
						self.update_peer_request(&peer, &mut req);
						CustomMessageOutcome::BlockRequest {
							target: peer,
							request: req,
						}
					} else {
						self.send_request(&peer, GenericMessage::BlockRequest(req));
						CustomMessageOutcome::None
					}
				}
				Err(sync::BadPeer(id, repu)) => {
					self.behaviour.disconnect_peer(&id);
					self.peerset_handle.report_peer(id, repu);
					CustomMessageOutcome::None
				}
			}
		}
	}

	/// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] if it has failed.
	pub fn on_block_request_failed(
		&mut self,
		peer: &PeerId,
	) {
		self.peerset_handle.report_peer(peer.clone(), rep::TIMEOUT);
		self.behaviour.disconnect_peer(peer);
	}

	/// Perform time based maintenance.
	///
	/// > **Note**: This method normally doesn't have to be called except for testing purposes.
	pub fn tick(&mut self) {
		self.maintain_peers();
		self.report_metrics()
	}

	fn maintain_peers(&mut self) {
		let tick = Instant::now();
		let mut aborting = Vec::new();
		{
			for (who, peer) in self.context_data.peers.iter() {
				if peer.block_request.as_ref().map_or(false, |(t, _)| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
					log!(
						target: "sync",
						if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
						"Request timeout {}", who
					);
					aborting.push(who.clone());
				} else if peer.obsolete_requests.values().any(|t| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
					log!(
						target: "sync",
						if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
						"Obsolete timeout {}", who
					);
					aborting.push(who.clone());
				}
			}
			for (who, _) in self.handshaking_peers.iter()
				.filter(|(_, handshaking)| (tick - handshaking.timestamp).as_secs() > REQUEST_TIMEOUT_SEC)
			{
				log!(
					target: "sync",
					if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
					"Handshake timeout {}", who
				);
				aborting.push(who.clone());
			}
		}

		for p in aborting {
			self.behaviour.disconnect_peer(&p);
			self.peerset_handle.report_peer(p, rep::TIMEOUT);
		}
	}

	/// Called by peer to report status
	fn on_status_message(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
		trace!(target: "sync", "New peer {} {:?}", who, status);
		let _protocol_version = {
			if self.context_data.peers.contains_key(&who) {
				log!(
					target: "sync",
					if self.important_peers.contains(&who) { Level::Warn } else { Level::Debug },
					"Unexpected status packet from {}", who
				);
				self.peerset_handle.report_peer(who, rep::UNEXPECTED_STATUS);
				return CustomMessageOutcome::None;
			}
			if status.genesis_hash != self.genesis_hash {
				log!(
					target: "sync",
					if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
					"Peer is on different chain (our genesis: {} theirs: {})",
					self.genesis_hash, status.genesis_hash
				);
				self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
				self.behaviour.disconnect_peer(&who);

				if self.boot_node_ids.contains(&who) {
					error!(
						target: "sync",
						"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
						who,
						self.genesis_hash,
						status.genesis_hash,
					);
				}

				return CustomMessageOutcome::None;
			}
			if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
				log!(
					target: "sync",
					if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
					"Peer {:?} using unsupported protocol version {}", who, status.version
				);
				self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL);
				self.behaviour.disconnect_peer(&who);
				return CustomMessageOutcome::None;
			}

			if self.config.roles.is_light() {
				// we're not interested in light peers
				if status.roles.is_light() {
					debug!(target: "sync", "Peer {} is unable to serve light requests", who);
					self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
					self.behaviour.disconnect_peer(&who);
					return CustomMessageOutcome::None;
				}

				// we don't interested in peers that are far behind us
				let self_best_block = self
					.context_data
					.chain
					.info()
					.best_number;
				let blocks_difference = self_best_block
					.checked_sub(&status.best_number)
					.unwrap_or_else(Zero::zero)
					.saturated_into::<u64>();
				if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
					debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
					self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
					self.behaviour.disconnect_peer(&who);
					return CustomMessageOutcome::None;
				}
			}

			let info = match self.handshaking_peers.remove(&who) {
				Some(_handshaking) => {
					PeerInfo {
						protocol_version: status.version,
						roles: status.roles,
						best_hash: status.best_hash,
						best_number: status.best_number
					}
				},
				None => {
					error!(target: "sync", "Received status from previously unconnected node {}", who);
					return CustomMessageOutcome::None;
				},
			};

			let peer = Peer {
				info,
				block_request: None,
				known_extrinsics: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_EXTRINSICS)
					.expect("Constant is nonzero")),
				known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
					.expect("Constant is nonzero")),
				next_request_id: 0,
				obsolete_requests: HashMap::new(),
			};
			self.context_data.peers.insert(who.clone(), peer);

			debug!(target: "sync", "Connected {}", who);
			status.version
		};

		let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
		self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
		if info.roles.is_full() {
			match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
				Ok(None) => (),
				Ok(Some(mut req)) => {
					if self.use_new_block_requests_protocol {
						self.update_peer_request(&who, &mut req);
						self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
							target: who.clone(),
							request: req,
						});
					} else {
						self.send_request(&who, GenericMessage::BlockRequest(req))
					}
				},
				Err(sync::BadPeer(id, repu)) => {
					self.behaviour.disconnect_peer(&id);
					self.peerset_handle.report_peer(id, repu)
				}
			}
		}

		// Notify all the notification protocols as open.
		CustomMessageOutcome::NotificationStreamOpened {
			remote: who,
			protocols: self.protocol_name_by_engine.keys().cloned().collect(),
			roles: info.roles,
		}
	}

	/// Send a notification to the given peer we're connected to.
	///
	/// Doesn't do anything if we don't have a notifications substream for that protocol with that
	/// peer.
	pub fn write_notification(
		&mut self,
		target: PeerId,
		engine_id: ConsensusEngineId,
		message: impl Into<Vec<u8>>,
	) {
		if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) {
			let message = message.into();
			let fallback = GenericMessage::<(), (), (), ()>::Consensus(ConsensusMessage {
				engine_id,
				data: message.clone(),
			}).encode();
			self.behaviour.write_notification(&target, protocol_name.clone(), message, fallback);
		} else {
			error!(
				target: "sub-libp2p",
				"Sending a notification with a protocol that wasn't registered: {:?}",
				engine_id
			);
		}
	}

	/// Registers a new notifications protocol.
	///
	/// While registering a protocol while we already have open connections is discouraged, we
	/// nonetheless handle it by notifying that we opened channels with everyone. This function
	/// returns a list of substreams to open as a result.
	pub fn register_notifications_protocol<'a>(
		&'a mut self,
		engine_id: ConsensusEngineId,
		protocol_name: impl Into<Cow<'static, [u8]>>,
		handshake_message: Vec<u8>,
	) -> impl ExactSizeIterator<Item = (&'a PeerId, Roles)> + 'a {
		let protocol_name = protocol_name.into();
		if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
			error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
		} else {
			self.behaviour.register_notif_protocol(protocol_name.clone(), handshake_message);
			self.legacy_equiv_by_name.insert(protocol_name, Fallback::Consensus(engine_id));
		}

		self.context_data.peers.iter()
			.map(|(peer_id, peer)| (peer_id, peer.info.roles))
	}

	/// Called when peer sends us new extrinsics
	fn on_extrinsics(
		&mut self,
		who: PeerId,
		extrinsics: message::Transactions<B::Extrinsic>
	) {
		// sending extrinsic to light node is considered a bad behavior
		if !self.config.roles.is_full() {
			trace!(target: "sync", "Peer {} is trying to send extrinsic to the light node", who);
			self.behaviour.disconnect_peer(&who);
			self.peerset_handle.report_peer(who, rep::UNEXPECTED_EXTRINSICS);
			return;
		}

		// Accept extrinsics only when fully synced
		if self.sync.status().state != SyncState::Idle {
			trace!(target: "sync", "{} Ignoring extrinsics while syncing", who);
			return;
		}
		trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who);
		if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
			for t in extrinsics {
				if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
					debug!(
						target: "sync",
						"Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit",
						MAX_PENDING_TRANSACTIONS,
					);
					break;
				}

				let hash = self.transaction_pool.hash_of(&t);
				peer.known_extrinsics.insert(hash);

				self.peerset_handle.report_peer(who.clone(), rep::ANY_EXTRINSIC);

				self.pending_transactions.push(PendingTransaction {
					peer_id: who.clone(),
					validation: self.transaction_pool.import(t),
				});
			}
		}
	}

	fn on_handle_extrinsic_import(&mut self, who: PeerId, import: TransactionImport) {
		match import {
			TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_EXTRINSIC_REFUND),
			TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_EXTRINSIC),
			TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_EXTRINSIC),
			TransactionImport::None => {},
		}
	}

	/// Propagate one extrinsic.
	pub fn propagate_extrinsic(
		&mut self,
		hash: &H,
	) {
		debug!(target: "sync", "Propagating extrinsic [{:?}]", hash);
		// Accept transactions only when fully synced
		if self.sync.status().state != SyncState::Idle {
			return;
		}
		if let Some(extrinsic) = self.transaction_pool.transaction(hash) {
			let propagated_to = self.do_propagate_extrinsics(&[(hash.clone(), extrinsic)]);
			self.transaction_pool.on_broadcasted(propagated_to);
		}
	}

	fn do_propagate_extrinsics(
		&mut self,
		extrinsics: &[(H, B::Extrinsic)],
	) -> HashMap<H, Vec<String>> {
		let mut propagated_to = HashMap::new();
		for (who, peer) in self.context_data.peers.iter_mut() {
			// never send extrinsics to the light node
			if !peer.info.roles.is_full() {
				continue;
			}

			let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics
				.iter()
				.filter(|&(ref hash, _)| peer.known_extrinsics.insert(hash.clone()))
				.cloned()
				.unzip();

			if !to_send.is_empty() {
				for hash in hashes {
					propagated_to
						.entry(hash)
						.or_insert_with(Vec::new)
						.push(who.to_base58());
				}
				trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who);
				let encoded = to_send.encode();
				send_message::<B> (
					&mut self.behaviour,
					&mut self.context_data.stats,
					&who,
					Some((self.transactions_protocol.clone(), encoded)),
					GenericMessage::Transactions(to_send)
				)
			}
		}

		propagated_to
	}

	/// Call when we must propagate ready extrinsics to peers.
	pub fn propagate_extrinsics(&mut self) {
		debug!(target: "sync", "Propagating extrinsics");
		// Accept transactions only when fully synced
		if self.sync.status().state != SyncState::Idle {
			return;
		}
		let extrinsics = self.transaction_pool.transactions();
		let propagated_to = self.do_propagate_extrinsics(&extrinsics);
		self.transaction_pool.on_broadcasted(propagated_to);
	}

	/// Make sure an important block is propagated to peers.
	///
	/// In chain-based consensus, we often need to make sure non-best forks are
	/// at least temporarily synced.
	pub fn announce_block(&mut self, hash: B::Hash, data: Vec<u8>) {
		let header = match self.context_data.chain.header(BlockId::Hash(hash)) {
			Ok(Some(header)) => header,
			Ok(None) => {
				warn!("Trying to announce unknown block: {}", hash);
				return;
			}
			Err(e) => {
				warn!("Error reading block header {}: {:?}", hash, e);
				return;
			}
		};

		// don't announce genesis block since it will be ignored
		if header.number().is_zero() {
			return;
		}

		let is_best = self.context_data.chain.info().best_hash == hash;
		debug!(target: "sync", "Reannouncing block {:?}", hash);
		self.send_announcement(&header, data, is_best, true)
	}

	fn send_announcement(&mut self, header: &B::Header, data: Vec<u8>, is_best: bool, force: bool) {
		let hash = header.hash();

		for (who, ref mut peer) in self.context_data.peers.iter_mut() {
			trace!(target: "sync", "Announcing block {:?} to {}", hash, who);
			let inserted = peer.known_blocks.insert(hash);
			if inserted || force {
				let message = message::BlockAnnounce {
					header: header.clone(),
					state: if peer.info.protocol_version >= 4  {
						if is_best {
							Some(message::BlockState::Best)
						} else {
							Some(message::BlockState::Normal)
						}
					} else  {
						None
					},
					data: if peer.info.protocol_version >= 4 {
						Some(data.clone())
					} else {
						None
					},
				};

				let encoded = message.encode();

				send_message::<B> (
					&mut self.behaviour,
					&mut self.context_data.stats,
					&who,
					Some((self.block_announces_protocol.clone(), encoded)),
					Message::<B>::BlockAnnounce(message),
				)
			}
		}
	}

	/// Send Status message
	fn send_status(&mut self, who: PeerId) {
		let info = self.context_data.chain.info();
		let status = message::generic::Status {
			version: CURRENT_VERSION,
			min_supported_version: MIN_VERSION,
			genesis_hash: info.genesis_hash,
			roles: self.config.roles,
			best_number: info.best_number,
			best_hash: info.best_hash,
			chain_status: Vec::new(), // TODO: find a way to make this backwards-compatible
		};

		self.send_message(&who, None, GenericMessage::Status(status))
	}

	fn on_block_announce(
		&mut self,
		who: PeerId,
		announce: BlockAnnounce<B::Header>,
	) -> CustomMessageOutcome<B> {
		let hash = announce.header.hash();
		let number = *announce.header.number();

		if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) {
			peer.known_blocks.insert(hash.clone());
		}

		let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) {
			message::BlockState::Best => true,
			message::BlockState::Normal => false,
		};

		match self.sync.on_block_announce(&who, hash, &announce, is_their_best) {
			sync::OnBlockAnnounce::Nothing => {
				// `on_block_announce` returns `OnBlockAnnounce::ImportHeader`
				// when we have all data required to import the block
				// in the BlockAnnounce message. This is only when:
				// 1) we're on light client;
				// AND
				// 2) parent block is already imported and not pruned.
				if is_their_best {
					return CustomMessageOutcome::PeerNewBest(who, number);
				} else {
					return CustomMessageOutcome::None;
				}
			}
			sync::OnBlockAnnounce::ImportHeader => () // We proceed with the import.
		}

		// to import header from announced block let's construct response to request that normally would have
		// been sent over network (but it is not in our case)
		let blocks_to_import = self.sync.on_block_data(
			&who,
			None,
			message::generic::BlockResponse {
				id: 0,
				blocks: vec![
					message::generic::BlockData {
						hash: hash,
						header: Some(announce.header),
						body: None,
						receipt: None,
						message_queue: None,
						justification: None,
					},
				],
			},
		);

		if is_their_best {
			self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who, number));
		}

		match blocks_to_import {
			Ok(sync::OnBlockData::Import(origin, blocks)) => {
				CustomMessageOutcome::BlockImport(origin, blocks)
			},
			Ok(sync::OnBlockData::Request(peer, mut req)) => {
				if self.use_new_block_requests_protocol {
					self.update_peer_request(&peer, &mut req);
					CustomMessageOutcome::BlockRequest {
						target: peer,
						request: req,
					}
				} else {
					self.send_request(&peer, GenericMessage::BlockRequest(req));
					CustomMessageOutcome::None
				}
			}
			Err(sync::BadPeer(id, repu)) => {
				self.behaviour.disconnect_peer(&id);
				self.peerset_handle.report_peer(id, repu);
				CustomMessageOutcome::None
			}
		}
	}

	/// Call this when a block has been finalized. The sync layer may have some additional
	/// requesting to perform.
	pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) {
		self.sync.on_block_finalized(&hash, *header.number())
	}

	fn on_remote_call_request(
		&mut self,
		who: PeerId,
		request: message::RemoteCallRequest<B::Hash>,
	) {
		trace!(target: "sync", "Remote call request {} from {} ({} at {})",
			request.id,
			who,
			request.method,
			request.block
		);
		let proof = match self.context_data.chain.execution_proof(
			&BlockId::Hash(request.block),
			&request.method,
			&request.data,
		) {
			Ok((_, proof)) => proof,
			Err(error) => {
				trace!(target: "sync", "Remote call request {} from {} ({} at {}) failed with: {}",
					request.id,
					who,
					request.method,
					request.block,
					error
				);
				self.peerset_handle.report_peer(who.clone(), rep::RPC_FAILED);
				StorageProof::empty()
			}
		};

		self.send_message(
			&who,
			None,
			GenericMessage::RemoteCallResponse(message::RemoteCallResponse {
				id: request.id,
				proof,
			}),
		);
	}

	/// Request a justification for the given block.
	///
	/// Uses `protocol` to queue a new justification request and tries to dispatch all pending
	/// requests.
	pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		self.sync.request_justification(&hash, number)
	}

	/// Request syncing for the given block from given set of peers.
	/// Uses `protocol` to queue a new block download request and tries to dispatch all pending
	/// requests.
	pub fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
		self.sync.set_sync_fork_request(peers, hash, number)
	}

	/// A batch of blocks have been processed, with or without errors.
	/// Call this when a batch of blocks have been processed by the importqueue, with or without
	/// errors.
	pub fn on_blocks_processed(
		&mut self,
		imported: usize,
		count: usize,
		results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
	) {
		let new_best = results.iter().rev().find_map(|r| match r {
			(Ok(BlockImportResult::ImportedUnknown(n, aux, _)), hash) if aux.is_new_best => Some((*n, hash.clone())),
			_ => None,
		});
		if let Some((best_num, best_hash)) = new_best {
			self.sync.update_chain_info(&best_hash, best_num);
			self.behaviour.set_notif_protocol_handshake(
				&self.block_announces_protocol,
				BlockAnnouncesHandshake::build(&self.config, &self.context_data.chain).encode()
			);
		}
		let results = self.sync.on_blocks_processed(
			imported,
			count,
			results,
		);
		for result in results {
			match result {
				Ok((id, mut req)) => {
					if self.use_new_block_requests_protocol {
						update_peer_request(&mut self.context_data.peers, &id, &mut req);
						self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
							target: id,
							request: req,
						});
					} else {
						let msg = GenericMessage::BlockRequest(req);
						send_request(
							&mut self.behaviour,
							&mut self.context_data.stats,
							&mut self.context_data.peers,
							&id,
							msg
						)
					}
				}
				Err(sync::BadPeer(id, repu)) => {
					self.behaviour.disconnect_peer(&id);
					self.peerset_handle.report_peer(id, repu)
				}
			}
		}
	}

	/// Call this when a justification has been processed by the import queue, with or without
	/// errors.
	pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
		self.sync.on_justification_import(hash, number, success)
	}

	/// Request a finality proof for the given block.
	///
	/// Queues a new finality proof request and tries to dispatch all pending requests.
	pub fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
		self.sync.request_finality_proof(&hash, number)
	}

	/// Notify the protocol that we have learned about the existence of nodes.
	///
	/// Can be called multiple times with the same `PeerId`s.
	pub fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
		self.behaviour.add_discovered_nodes(peer_ids)
	}

	pub fn finality_proof_import_result(
		&mut self,
		request_block: (B::Hash, NumberFor<B>),
		finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
	) {
		self.sync.on_finality_proof_import(request_block, finalization_result)
	}

	fn on_remote_read_request(
		&mut self,
		who: PeerId,
		request: message::RemoteReadRequest<B::Hash>,
	) {
		if request.keys.is_empty() {
			debug!(target: "sync", "Invalid remote read request sent by {}", who);
			self.behaviour.disconnect_peer(&who);
			self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
			return;
		}

		let keys_str = || match request.keys.len() {
			1 => HexDisplay::from(&request.keys[0]).to_string(),
			_ => format!(
				"{}..{}",
				HexDisplay::from(&request.keys[0]),
				HexDisplay::from(&request.keys[request.keys.len() - 1]),
			),
		};

		trace!(target: "sync", "Remote read request {} from {} ({} at {})",
			request.id, who, keys_str(), request.block);
		let proof = match self.context_data.chain.read_proof(
			&BlockId::Hash(request.block),
			&mut request.keys.iter().map(AsRef::as_ref)
		) {
			Ok(proof) => proof,
			Err(error) => {
				trace!(target: "sync", "Remote read request {} from {} ({} at {}) failed with: {}",
					request.id,
					who,
					keys_str(),
					request.block,
					error
				);
				StorageProof::empty()
			}
		};
		self.send_message(
			&who,
			None,
			GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
				id: request.id,
				proof,
			}),
		);
	}

	fn on_remote_read_child_request(
		&mut self,
		who: PeerId,
		request: message::RemoteReadChildRequest<B::Hash>,
	) {
		if request.keys.is_empty() {
			debug!(target: "sync", "Invalid remote child read request sent by {}", who);
			self.behaviour.disconnect_peer(&who);
			self.peerset_handle.report_peer(who, rep::BAD_MESSAGE);
			return;
		}

		let keys_str = || match request.keys.len() {
			1 => HexDisplay::from(&request.keys[0]).to_string(),
			_ => format!(
				"{}..{}",
				HexDisplay::from(&request.keys[0]),
				HexDisplay::from(&request.keys[request.keys.len() - 1]),
			),
		};

		trace!(target: "sync", "Remote read child request {} from {} ({} {} at {})",
			request.id, who, HexDisplay::from(&request.storage_key), keys_str(), request.block);
		let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key);
		let child_info = match ChildType::from_prefixed_key(prefixed_key) {
			Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)),
			None => Err("Invalid child storage key".into()),
		};
		let proof = match child_info.and_then(|child_info| self.context_data.chain.read_child_proof(
			&BlockId::Hash(request.block),
			&child_info,
			&mut request.keys.iter().map(AsRef::as_ref),
		)) {
			Ok(proof) => proof,
			Err(error) => {
				trace!(target: "sync", "Remote read child request {} from {} ({} {} at {}) failed with: {}",
					request.id,
					who,
					HexDisplay::from(&request.storage_key),
					keys_str(),
					request.block,
					error
				);
				StorageProof::empty()
			}
		};
		self.send_message(
			&who,
			None,
			GenericMessage::RemoteReadResponse(message::RemoteReadResponse {
				id: request.id,
				proof,
			}),
		);
	}

	fn on_remote_header_request(
		&mut self,
		who: PeerId,
		request: message::RemoteHeaderRequest<NumberFor<B>>,
	) {
		trace!(target: "sync", "Remote header proof request {} from {} ({})",
			request.id, who, request.block);
		let (header, proof) = match self.context_data.chain.header_proof(&BlockId::Number(request.block)) {
			Ok((header, proof)) => (Some(header), proof),
			Err(error) => {
				trace!(target: "sync", "Remote header proof request {} from {} ({}) failed with: {}",
					request.id,
					who,
					request.block,
					error
				);
				(Default::default(), StorageProof::empty())
			}
		};
		self.send_message(
			&who,
			None,
			GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse {
				id: request.id,
				header,
				proof,
			}),
		);
	}

	fn on_remote_changes_request(
		&mut self,
		who: PeerId,
		request: message::RemoteChangesRequest<B::Hash>,
	) {
		trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{})",
			request.id,
			who,
			if let Some(sk) = request.storage_key.as_ref() {
				format!("{} : {}", HexDisplay::from(sk), HexDisplay::from(&request.key))
			} else {
				HexDisplay::from(&request.key).to_string()
			},
			request.first,
			request.last
		);
		let key = StorageKey(request.key);
		let prefixed_key =  request.storage_key.as_ref()
			.map(|storage_key| PrefixedStorageKey::new_ref(storage_key));
		let (first, last, min, max) = (request.first, request.last, request.min, request.max);
		let proof = match self.context_data.chain.key_changes_proof(
			first,
			last,
			min,
			max,
			prefixed_key,
			&key,
		) {
			Ok(proof) => proof,
			Err(error) => {
				trace!(target: "sync", "Remote changes proof request {} from {} for key {} ({}..{}) failed with: {}",
					request.id,
					who,
					if let Some(sk) = request.storage_key.as_ref() {
						format!("{} : {}", HexDisplay::from(sk), HexDisplay::from(&key.0))
					} else {
						HexDisplay::from(&key.0).to_string()
					},
					request.first,
					request.last,
					error
				);
				ChangesProof::<B::Header> {
					max_block: Zero::zero(),
					proof: vec![],
					roots: BTreeMap::new(),
					roots_proof: StorageProof::empty(),
				}
			}
		};
		self.send_message(
			&who,
			None,
			GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse {
				id: request.id,
				max: proof.max_block,
				proof: proof.proof,
				roots: proof.roots.into_iter().collect(),
				roots_proof: proof.roots_proof,
			}),
		);
	}

	fn on_finality_proof_request(
		&mut self,
		who: PeerId,
		request: message::FinalityProofRequest<B::Hash>,
	) {
		trace!(target: "sync", "Finality proof request from {} for {}", who, request.block);
		let finality_proof = self.finality_proof_provider.as_ref()
			.ok_or_else(|| String::from("Finality provider is not configured"))
			.and_then(|provider|
				provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string())
			);
		let finality_proof = match finality_proof {
			Ok(finality_proof) => finality_proof,
			Err(error) => {
				trace!(target: "sync", "Finality proof request from {} for {} failed with: {}",
					who,
					request.block,
					error
				);
				None
			},
		};
		self.send_message(
			&who,
			None,
			GenericMessage::FinalityProofResponse(message::FinalityProofResponse {
				id: 0,
				block: request.block,
				proof: finality_proof,
			}),
		);
	}

	/// Must be called after a [`CustomMessageOutcome::FinalityProofRequest`] has been emitted,
	/// to notify of the response having arrived.
	pub fn on_finality_proof_response(
		&mut self,
		who: PeerId,
		response: message::FinalityProofResponse<B::Hash>,
	) -> CustomMessageOutcome<B> {
		trace!(target: "sync", "Finality proof response from {} for {}", who, response.block);
		match self.sync.on_block_finality_proof(who, response) {
			Ok(sync::OnBlockFinalityProof::Nothing) => CustomMessageOutcome::None,
			Ok(sync::OnBlockFinalityProof::Import { peer, hash, number, proof }) =>
				CustomMessageOutcome::FinalityProofImport(peer, hash, number, proof),
			Err(sync::BadPeer(id, repu)) => {
				self.behaviour.disconnect_peer(&id);
				self.peerset_handle.report_peer(id, repu);
				CustomMessageOutcome::None
			}
		}
	}

	fn format_stats(&self) -> String {
		let mut out = String::new();
		for (id, stats) in &self.context_data.stats {
			let _ = writeln!(
				&mut out,
				"{}: In: {} bytes ({}), Out: {} bytes ({})",
				id,
				stats.bytes_in,
				stats.count_in,
				stats.bytes_out,
				stats.count_out,
			);
		}
		out
	}

	fn report_metrics(&self) {
		use std::convert::TryInto;

		if let Some(metrics) = &self.metrics {
			let mut obsolete_requests: u64 = 0;
			for peer in self.context_data.peers.values() {
				let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX);
				obsolete_requests = obsolete_requests.saturating_add(n);
			}
			metrics.obsolete_requests.set(obsolete_requests);

			let n = self.handshaking_peers.len().try_into().unwrap_or(std::u64::MAX);
			metrics.handshaking_peers.set(n);

			let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
			metrics.peers.set(n);

			let m = self.sync.metrics();

			metrics.fork_targets.set(m.fork_targets.into());
			metrics.queued_blocks.set(m.queued_blocks.into());

			metrics.justifications.with_label_values(&["pending"])
				.set(m.justifications.pending_requests.into());
			metrics.justifications.with_label_values(&["active"])
				.set(m.justifications.active_requests.into());
			metrics.justifications.with_label_values(&["failed"])
				.set(m.justifications.failed_requests.into());
			metrics.justifications.with_label_values(&["importing"])
				.set(m.justifications.importing_requests.into());

			metrics.finality_proofs.with_label_values(&["pending"])
				.set(m.finality_proofs.pending_requests.into());
			metrics.finality_proofs.with_label_values(&["active"])
				.set(m.finality_proofs.active_requests.into());
			metrics.finality_proofs.with_label_values(&["failed"])
				.set(m.finality_proofs.failed_requests.into());
			metrics.finality_proofs.with_label_values(&["importing"])
				.set(m.finality_proofs.importing_requests.into());
		}
	}
}

/// Outcome of an incoming custom message.
#[derive(Debug)]
#[must_use]
pub enum CustomMessageOutcome<B: BlockT> {
	BlockImport(BlockOrigin, Vec<IncomingBlock<B>>),
	JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
	FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
	/// Notification protocols have been opened with a remote.
	NotificationStreamOpened { remote: PeerId, protocols: Vec<ConsensusEngineId>, roles: Roles },
	/// Notification protocols have been closed with a remote.
	NotificationStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> },
	/// Messages have been received on one or more notifications protocols.
	NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> },
	/// A new block request must be emitted.
	/// You must later call either [`Protocol::on_block_response`] or
	/// [`Protocol::on_block_request_failed`].
	/// Each peer can only have one active request. If a request already exists for this peer, it
	/// must be silently discarded.
	/// It is the responsibility of the handler to ensure that a timeout exists.
	BlockRequest { target: PeerId, request: message::BlockRequest<B> },
	/// A new finality proof request must be emitted.
	/// Once you have the response, you must call `Protocol::on_finality_proof_response`.
	/// It is the responsibility of the handler to ensure that a timeout exists.
	/// If the request times out, or the peer responds in an invalid way, the peer has to be
	/// disconnect. This will inform the state machine that the request it has emitted is stale.
	FinalityProofRequest { target: PeerId, block_hash: B::Hash, request: Vec<u8> },
	/// Peer has a reported a new head of chain.
	PeerNewBest(PeerId, NumberFor<B>),
	None,
}

fn send_request<B: BlockT, H: ExHashT>(
	behaviour: &mut GenericProto,
	stats: &mut HashMap<&'static str, PacketStats>,
	peers: &mut HashMap<PeerId, Peer<B, H>>,
	who: &PeerId,
	mut message: Message<B>,
) {
	if let GenericMessage::BlockRequest(ref mut r) = message {
		if let Some(ref mut peer) = peers.get_mut(who) {
			r.id = peer.next_request_id;
			peer.next_request_id += 1;
			if let Some((timestamp, request)) = peer.block_request.take() {
				trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
				peer.obsolete_requests.insert(request.id, timestamp);
			}
			peer.block_request = Some((Instant::now(), r.clone()));
		}
	}
	send_message::<B>(behaviour, stats, who, None, message)
}

fn update_peer_request<B: BlockT, H: ExHashT>(
	peers: &mut HashMap<PeerId, Peer<B, H>>,
	who: &PeerId,
	request: &mut message::BlockRequest<B>,
) {
	if let Some(ref mut peer) = peers.get_mut(who) {
		request.id = peer.next_request_id;
		peer.next_request_id += 1;
		if let Some((timestamp, request)) = peer.block_request.take() {
			trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
			peer.obsolete_requests.insert(request.id, timestamp);
		}
		peer.block_request = Some((Instant::now(), request.clone()));
	}
}

fn send_message<B: BlockT>(
	behaviour: &mut GenericProto,
	stats: &mut HashMap<&'static str, PacketStats>,
	who: &PeerId,
	message: Option<(Cow<'static, [u8]>, Vec<u8>)>,
	legacy_message: Message<B>,
) {
	let encoded = legacy_message.encode();
	let mut stats = stats.entry(legacy_message.id()).or_default();
	stats.bytes_out += encoded.len() as u64;
	stats.count_out += 1;
	if let Some((proto, msg)) = message {
		behaviour.write_notification(who, proto, msg, encoded);
	} else {
		behaviour.send_packet(who, encoded);
	}
}

impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
	type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler;
	type OutEvent = CustomMessageOutcome<B>;

	fn new_handler(&mut self) -> Self::ProtocolsHandler {
		self.behaviour.new_handler()
	}

	fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
		self.behaviour.addresses_of_peer(peer_id)
	}

	fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
		self.behaviour.inject_connection_established(peer_id, conn, endpoint)
	}

	fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
		self.behaviour.inject_connection_closed(peer_id, conn, endpoint)
	}

	fn inject_connected(&mut self, peer_id: &PeerId) {
		self.behaviour.inject_connected(peer_id)
	}

	fn inject_disconnected(&mut self, peer_id: &PeerId) {
		self.behaviour.inject_disconnected(peer_id)
	}

	fn inject_event(
		&mut self,
		peer_id: PeerId,
		connection: ConnectionId,
		event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
	) {
		self.behaviour.inject_event(peer_id, connection, event)
	}

	fn poll(
		&mut self,
		cx: &mut std::task::Context,
		params: &mut impl PollParameters,
	) -> Poll<
		NetworkBehaviourAction<
			<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
			Self::OutEvent
		>
	> {
		if let Some(message) = self.pending_messages.pop_front() {
			return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
		}

		while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
			self.tick();
		}

		while let Poll::Ready(Some(())) = self.propagate_timeout.poll_next_unpin(cx) {
			self.propagate_extrinsics();
		}

		for (id, mut r) in self.sync.block_requests() {
			if self.use_new_block_requests_protocol {
				update_peer_request(&mut self.context_data.peers, &id, &mut r);
				let event = CustomMessageOutcome::BlockRequest {
					target: id.clone(),
					request: r,
				};
				self.pending_messages.push_back(event);
			} else {
				send_request(
					&mut self.behaviour,
					&mut self.context_data.stats,
					&mut self.context_data.peers,
					&id,
					GenericMessage::BlockRequest(r),
				)
			}
		}
		for (id, mut r) in self.sync.justification_requests() {
			if self.use_new_block_requests_protocol {
				update_peer_request(&mut self.context_data.peers, &id, &mut r);
				let event = CustomMessageOutcome::BlockRequest {
					target: id,
					request: r,
				};
				self.pending_messages.push_back(event);
			} else {
				send_request(
					&mut self.behaviour,
					&mut self.context_data.stats,
					&mut self.context_data.peers,
					&id,
					GenericMessage::BlockRequest(r),
				)
			}
		}
		for (id, r) in self.sync.finality_proof_requests() {
			if self.use_new_block_requests_protocol {
				let event = CustomMessageOutcome::FinalityProofRequest {
					target: id,
					block_hash: r.block,
					request: r.request,
				};
				self.pending_messages.push_back(event);
			} else {
				send_request(
					&mut self.behaviour,
					&mut self.context_data.stats,
					&mut self.context_data.peers,
					&id,
					GenericMessage::FinalityProofRequest(r),
				)
			}
		}
		if let Poll::Ready(Some((peer_id, result))) = self.pending_transactions.poll_next_unpin(cx) {
			self.on_handle_extrinsic_import(peer_id, result);
		}
		if let Some(message) = self.pending_messages.pop_front() {
			return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
		}

		let event = match self.behaviour.poll(cx, params) {
			Poll::Pending => return Poll::Pending,
			Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev,
			Poll::Ready(NetworkBehaviourAction::DialAddress { address }) =>
				return Poll::Ready(NetworkBehaviourAction::DialAddress { address }),
			Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) =>
				return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }),
			Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) =>
				return Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }),
			Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
				return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
		};

		let outcome = match event {
			GenericProtoOut::CustomProtocolOpen { peer_id, .. } => {
				self.on_peer_connected(peer_id);
				CustomMessageOutcome::None
			}
			GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
				self.on_peer_disconnected(peer_id)
			},
			GenericProtoOut::LegacyMessage { peer_id, message } =>
				self.on_custom_message(peer_id, message),
			GenericProtoOut::Notification { peer_id, protocol_name, message } =>
				match self.legacy_equiv_by_name.get(&protocol_name) {
					Some(Fallback::Consensus(engine_id)) => {
						CustomMessageOutcome::NotificationsReceived {
							remote: peer_id,
							messages: vec![(*engine_id, message.freeze())],
						}
					}
					Some(Fallback::Transactions) => {
						if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
							self.on_extrinsics(peer_id, m);
						} else {
							warn!(target: "sub-libp2p", "Failed to decode transactions list");
						}
						CustomMessageOutcome::None
					}
					Some(Fallback::BlockAnnounce) => {
						if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) {
							let outcome = self.on_block_announce(peer_id.clone(), announce);
							self.update_peer_info(&peer_id);
							outcome
						} else {
							warn!(target: "sub-libp2p", "Failed to decode block announce");
							CustomMessageOutcome::None
						}
					}
					None => {
						error!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
						CustomMessageOutcome::None
					}
				}
			GenericProtoOut::Clogged { peer_id, messages } => {
				debug!(target: "sync", "{} clogging messages:", messages.len());
				for msg in messages.into_iter().take(5) {
					let message: Option<Message<B>> = Decode::decode(&mut &msg[..]).ok();
					debug!(target: "sync", "{:?}", message);
					self.on_clogged_peer(peer_id.clone(), message);
				}
				CustomMessageOutcome::None
			}
		};

		if let CustomMessageOutcome::None = outcome {
			Poll::Pending
		} else {
			Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
		}
	}

	fn inject_addr_reach_failure(
		&mut self,
		peer_id: Option<&PeerId>,
		addr: &Multiaddr,
		error: &dyn std::error::Error
	) {
		self.behaviour.inject_addr_reach_failure(peer_id, addr, error)
	}

	fn inject_dial_failure(&mut self, peer_id: &PeerId) {
		self.behaviour.inject_dial_failure(peer_id)
	}

	fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
		self.behaviour.inject_new_listen_addr(addr)
	}

	fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
		self.behaviour.inject_expired_listen_addr(addr)
	}

	fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
		self.behaviour.inject_new_external_addr(addr)
	}

	fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
		self.behaviour.inject_listener_error(id, err);
	}

	fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) {
		self.behaviour.inject_listener_closed(id, reason);
	}
}

impl<B: BlockT, H: ExHashT> Drop for Protocol<B, H> {
	fn drop(&mut self) {
		debug!(target: "sync", "Network stats:\n{}", self.format_stats());
	}
}

#[cfg(test)]
mod tests {
	use crate::PeerId;
	use crate::config::EmptyTransactionPool;
	use super::{CustomMessageOutcome, Protocol, ProtocolConfig};

	use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
	use std::sync::Arc;
	use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt};
	use substrate_test_runtime_client::runtime::{Block, Hash};

	#[test]
	fn no_handshake_no_notif_closed() {
		let client = Arc::new(TestClientBuilder::with_default_backend().build_with_longest_chain().0);

		let (mut protocol, _) = Protocol::<Block, Hash>::new(
			ProtocolConfig::default(),
			PeerId::random(),
			client.clone(),
			Arc::new(EmptyTransactionPool),
			None,
			None,
			From::from(&b"test"[..]),
			sc_peerset::PeersetConfig {
				in_peers: 10,
				out_peers: 10,
				bootnodes: Vec::new(),
				reserved_only: false,
				priority_groups: Vec::new(),
			},
			Box::new(DefaultBlockAnnounceValidator::new(client.clone())),
			None,
			Default::default(),
			true,
			None,
		).unwrap();

		let dummy_peer_id = PeerId::random();
		let _ = protocol.on_peer_connected(dummy_peer_id.clone());
		match protocol.on_peer_disconnected(dummy_peer_id) {
			CustomMessageOutcome::None => {},
			_ => panic!()
		};
	}
}