From 009898f3097595e9e4f9bb5625b227264ecf93b1 Mon Sep 17 00:00:00 2001
From: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Date: Sat, 18 May 2019 02:05:00 +0200
Subject: [PATCH] feat(light client): fetch block body from remote (#2527)

* feat(on_demand): block body request

* fix(light block req): no justific + one block

* fix(bad rebase)

* feat(protocol): add messages for `remote_body`

* fix(on demand body): remove needless `take()`

* fix(network): remove messages for `on_demand_body`

* fix(grumbles): use `hash` in `remote_body_requests`

As long as we can't compute `ordered_trie_root(body)` just compare that request.header.hash() == response.header.hash()

* fix(grumbles): `hdr.ext_root == trie_root(body)`

* fix(grumbles): propogate `Err` in `fn body()`

* fix(grumbles): Vec<Block::Extrinsic>

* fix(grumbles): util_fn for `not_impl` in tests

* fix(on remote body): tests `fetch` and `on_demand`

* docs(resolve todos)
---
 substrate/core/client/src/light/blockchain.rs |  18 +-
 substrate/core/client/src/light/fetcher.rs    | 132 ++++++++++++--
 substrate/core/network/src/message.rs         |   2 +-
 substrate/core/network/src/on_demand.rs       | 172 +++++++++++++++++-
 substrate/core/network/src/protocol.rs        |  23 ++-
 substrate/core/test-client/src/lib.rs         |   8 +
 6 files changed, 328 insertions(+), 27 deletions(-)

diff --git a/substrate/core/client/src/light/blockchain.rs b/substrate/core/client/src/light/blockchain.rs
index c38d50303ec..d7fcd442324 100644
--- a/substrate/core/client/src/light/blockchain.rs
+++ b/substrate/core/client/src/light/blockchain.rs
@@ -30,7 +30,7 @@ use crate::blockchain::{Backend as BlockchainBackend, BlockStatus, Cache as Bloc
 	HeaderBackend as BlockchainHeaderBackend, Info as BlockchainInfo, ProvideCache};
 use crate::cht;
 use crate::error::{Error as ClientError, Result as ClientResult};
-use crate::light::fetcher::{Fetcher, RemoteHeaderRequest};
+use crate::light::fetcher::{Fetcher, RemoteBodyRequest, RemoteHeaderRequest};
 
 /// Light client blockchain storage.
 pub trait Storage<Block: BlockT>: AuxStore + BlockchainHeaderBackend<Block> {
@@ -144,9 +144,19 @@ impl<S, F, Block> BlockchainHeaderBackend<Block> for Blockchain<S, F> where Bloc
 }
 
 impl<S, F, Block> BlockchainBackend<Block> for Blockchain<S, F> where Block: BlockT, S: Storage<Block>, F: Fetcher<Block> {
-	fn body(&self, _id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
-		// TODO: #1445 fetch from remote node
-		Ok(None)
+	fn body(&self, id: BlockId<Block>) -> ClientResult<Option<Vec<Block::Extrinsic>>> {
+		let header = match self.header(id)? {
+			Some(header) => header,
+			None => return Ok(None),
+		};
+
+		self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)?
+			.remote_body(RemoteBodyRequest {
+				header,
+				retry_count: None,
+			})
+			.into_future().wait()
+			.map(Some)
 	}
 
 	fn justification(&self, _id: BlockId<Block>) -> ClientResult<Option<Justification>> {
diff --git a/substrate/core/client/src/light/fetcher.rs b/substrate/core/client/src/light/fetcher.rs
index 1e6f0842fb1..ff6d3c5c457 100644
--- a/substrate/core/client/src/light/fetcher.rs
+++ b/substrate/core/client/src/light/fetcher.rs
@@ -22,8 +22,9 @@ use std::marker::PhantomData;
 use futures::IntoFuture;
 
 use hash_db::{HashDB, Hasher};
+use parity_codec::Encode;
 use primitives::{ChangesTrieConfiguration, convert_hash};
-use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberFor};
+use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, Hash, HashFor, NumberFor};
 use state_machine::{CodeExecutor, ChangesTrieRootsStorage, ChangesTrieAnchorBlockId,
 	TrieBackend, read_proof_check, key_changes_proof_check,
 	create_proof_check_backend_storage, read_child_proof_check};
@@ -124,17 +125,28 @@ pub struct ChangesProof<Header: HeaderT> {
 	pub roots_proof: Vec<Vec<u8>>,
 }
 
+/// Remote block body request
+#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)]
+pub struct RemoteBodyRequest<Header: HeaderT> {
+	/// Header of the requested block body
+	pub header: Header,
+	/// Number of times to retry request. None means that default RETRY_COUNT is used.
+	pub retry_count: Option<usize>,
+}
+
 /// Light client data fetcher. Implementations of this trait must check if remote data
 /// is correct (see FetchedDataChecker) and return already checked data.
 pub trait Fetcher<Block: BlockT>: Send + Sync {
 	/// Remote header future.
-	type RemoteHeaderResult: IntoFuture<Item=Block::Header, Error=ClientError>;
+	type RemoteHeaderResult: IntoFuture<Item = Block::Header, Error = ClientError>;
 	/// Remote storage read future.
-	type RemoteReadResult: IntoFuture<Item=Option<Vec<u8>>, Error=ClientError>;
+	type RemoteReadResult: IntoFuture<Item = Option<Vec<u8>>, Error = ClientError>;
 	/// Remote call result future.
-	type RemoteCallResult: IntoFuture<Item=Vec<u8>, Error=ClientError>;
+	type RemoteCallResult: IntoFuture<Item = Vec<u8>, Error = ClientError>;
 	/// Remote changes result future.
-	type RemoteChangesResult: IntoFuture<Item=Vec<(NumberFor<Block>, u32)>, Error=ClientError>;
+	type RemoteChangesResult: IntoFuture<Item = Vec<(NumberFor<Block>, u32)>, Error = ClientError>;
+	/// Remote block body result future.
+	type RemoteBodyResult: IntoFuture<Item = Vec<Block::Extrinsic>, Error = ClientError>;
 
 	/// Fetch remote header.
 	fn remote_header(&self, request: RemoteHeaderRequest<Block::Header>) -> Self::RemoteHeaderResult;
@@ -153,6 +165,8 @@ pub trait Fetcher<Block: BlockT>: Send + Sync {
 	/// Fetch remote changes ((block number, extrinsic index)) where given key has been changed
 	/// at a given blocks range.
 	fn remote_changes(&self, request: RemoteChangesRequest<Block::Header>) -> Self::RemoteChangesResult;
+	/// Fetch remote block body
+	fn remote_body(&self, request: RemoteBodyRequest<Block::Header>) -> Self::RemoteBodyResult;
 }
 
 /// Light client remote data checker.
@@ -191,6 +205,12 @@ pub trait FetchChecker<Block: BlockT>: Send + Sync {
 		request: &RemoteChangesRequest<Block::Header>,
 		proof: ChangesProof<Block::Header>
 	) -> ClientResult<Vec<(NumberFor<Block>, u32)>>;
+	/// Check remote body proof.
+	fn check_body_proof(
+		&self,
+		request: &RemoteBodyRequest<Block::Header>,
+		body: Vec<Block::Extrinsic>
+	) -> ClientResult<Vec<Block::Extrinsic>>;
 }
 
 /// Remote data checker.
@@ -396,6 +416,25 @@ impl<E, Block, H, S, F> FetchChecker<Block> for LightDataChecker<E, H, Block, S,
 	) -> ClientResult<Vec<(NumberFor<Block>, u32)>> {
 		self.check_changes_proof_with_cht_size(request, remote_proof, cht::SIZE)
 	}
+
+	fn check_body_proof(
+		&self,
+		request: &RemoteBodyRequest<Block::Header>,
+		body: Vec<Block::Extrinsic>
+	) -> ClientResult<Vec<Block::Extrinsic>> {
+
+		// TODO: #2621
+		let	extrinsics_root = HashFor::<Block>::ordered_trie_root(body.iter().map(Encode::encode));
+		if *request.header.extrinsics_root() == extrinsics_root {
+			Ok(body)
+		} else {
+			Err(format!("RemoteBodyRequest: invalid extrinsics root expected: {} but got {}",
+				*request.header.extrinsics_root(),
+				extrinsics_root,
+			).into())
+		}
+
+	}
 }
 
 /// A view of BTreeMap<Number, Hash> as a changes trie roots storage.
@@ -438,7 +477,7 @@ pub mod tests {
 	use crate::error::Error as ClientError;
 	use test_client::{
 		self, TestClient, blockchain::HeaderBackend, AccountKeyring,
-		runtime::{self, Hash, Block, Header}
+		runtime::{self, Hash, Block, Header, Extrinsic}
 	};
 	use consensus::BlockOrigin;
 
@@ -446,7 +485,7 @@ pub mod tests {
 	use crate::light::fetcher::{Fetcher, FetchChecker, LightDataChecker,
 		RemoteCallRequest, RemoteHeaderRequest};
 	use crate::light::blockchain::tests::{DummyStorage, DummyBlockchain};
-	use primitives::{blake2_256, Blake2Hasher};
+	use primitives::{blake2_256, Blake2Hasher, H256};
 	use primitives::storage::{StorageKey, well_known_keys};
 	use runtime_primitives::generic::BlockId;
 	use state_machine::Backend;
@@ -454,22 +493,30 @@ pub mod tests {
 
 	pub type OkCallFetcher = Mutex<Vec<u8>>;
 
+	fn not_implemented_in_tests<T, E>() -> FutureResult<T, E>
+	where
+		E: std::convert::From<&'static str>,
+	{
+		err("Not implemented on test node".into())
+	}
+
 	impl Fetcher<Block> for OkCallFetcher {
 		type RemoteHeaderResult = FutureResult<Header, ClientError>;
 		type RemoteReadResult = FutureResult<Option<Vec<u8>>, ClientError>;
 		type RemoteCallResult = FutureResult<Vec<u8>, ClientError>;
 		type RemoteChangesResult = FutureResult<Vec<(NumberFor<Block>, u32)>, ClientError>;
+		type RemoteBodyResult = FutureResult<Vec<Extrinsic>, ClientError>;
 
 		fn remote_header(&self, _request: RemoteHeaderRequest<Header>) -> Self::RemoteHeaderResult {
-			err("Not implemented on test node".into())
+			not_implemented_in_tests()
 		}
 
 		fn remote_read(&self, _request: RemoteReadRequest<Header>) -> Self::RemoteReadResult {
-			err("Not implemented on test node".into())
+			not_implemented_in_tests()
 		}
 
 		fn remote_read_child(&self, _request: RemoteReadChildRequest<Header>) -> Self::RemoteReadResult {
-			err("Not implemented on test node".into())
+			not_implemented_in_tests()
 		}
 
 		fn remote_call(&self, _request: RemoteCallRequest<Header>) -> Self::RemoteCallResult {
@@ -477,11 +524,21 @@ pub mod tests {
 		}
 
 		fn remote_changes(&self, _request: RemoteChangesRequest<Header>) -> Self::RemoteChangesResult {
-			err("Not implemented on test node".into())
+			not_implemented_in_tests()
+		}
+
+		fn remote_body(&self, _request: RemoteBodyRequest<Header>) -> Self::RemoteBodyResult {
+			not_implemented_in_tests()
 		}
 	}
 
-	type TestChecker = LightDataChecker<executor::NativeExecutor<test_client::LocalExecutor>, Blake2Hasher, Block, DummyStorage, OkCallFetcher>;
+	type TestChecker = LightDataChecker<
+		executor::NativeExecutor<test_client::LocalExecutor>,
+		Blake2Hasher,
+		Block,
+		DummyStorage,
+		OkCallFetcher,
+	>;
 
 	fn prepare_for_read_proof_check() -> (TestChecker, Header, Vec<Vec<u8>>, u32) {
 		// prepare remote client
@@ -537,6 +594,14 @@ pub mod tests {
 		(local_checker, local_cht_root, remote_block_header, remote_header_proof)
 	}
 
+	fn header_with_computed_extrinsics_root(extrinsics: Vec<Extrinsic>) -> Header {
+		let extrinsics_root =
+			trie::ordered_trie_root::<Blake2Hasher, _, _>(extrinsics.iter().map(Encode::encode));
+
+		// only care about `extrinsics_root`
+		Header::new(0, extrinsics_root, H256::zero(), H256::zero(), Default::default())
+	}
+
 	#[test]
 	fn storage_read_proof_is_generated_and_checked() {
 		let (local_checker, remote_block_header, remote_read_proof, authorities_len) = prepare_for_read_proof_check();
@@ -776,4 +841,47 @@ pub mod tests {
 		);
 		assert!(local_checker.check_changes_tries_proof(4, &remote_proof.roots, vec![]).is_err());
 	}
+
+	#[test]
+	fn check_body_proof_faulty() {
+		let header = header_with_computed_extrinsics_root(
+			vec![Extrinsic::IncludeData(vec![1, 2, 3, 4])]
+		);
+		let block = Block::new(header.clone(), Vec::new());
+
+		let local_checker = TestChecker::new(
+			Arc::new(DummyBlockchain::new(DummyStorage::new())),
+			test_client::LocalExecutor::new(None)
+		);
+
+		let body_request = RemoteBodyRequest {
+			header: header.clone(),
+			retry_count: None,
+		};
+
+		assert!(
+			local_checker.check_body_proof(&body_request, block.extrinsics).is_err(),
+			"vec![1, 2, 3, 4] != vec![]"
+		);
+	}
+
+	#[test]
+	fn check_body_proof_of_same_data_should_succeed() {
+		let extrinsics = vec![Extrinsic::IncludeData(vec![1, 2, 3, 4, 5, 6, 7, 8, 255])];
+
+		let header = header_with_computed_extrinsics_root(extrinsics.clone());
+		let block = Block::new(header.clone(), extrinsics);
+
+		let local_checker = TestChecker::new(
+			Arc::new(DummyBlockchain::new(DummyStorage::new())),
+			test_client::LocalExecutor::new(None)
+		);
+
+		let body_request = RemoteBodyRequest {
+			header: header.clone(),
+			retry_count: None,
+		};
+
+		assert!(local_checker.check_body_proof(&body_request, block.extrinsics).is_ok());
+	}
 }
diff --git a/substrate/core/network/src/message.rs b/substrate/core/network/src/message.rs
index 31667033890..6a38c106b73 100644
--- a/substrate/core/network/src/message.rs
+++ b/substrate/core/network/src/message.rs
@@ -205,7 +205,7 @@ pub mod generic {
 		FinalityProofRequest(FinalityProofRequest<Hash>),
 		/// Finality proof reponse.
 		FinalityProofResponse(FinalityProofResponse<Hash>),
-		/// Chain-specific message
+		/// Chain-specific message.
 		#[codec(index = "255")]
 		ChainSpecific(Vec<u8>),
 	}
diff --git a/substrate/core/network/src/on_demand.rs b/substrate/core/network/src/on_demand.rs
index aea762eefcc..ec4c178232f 100644
--- a/substrate/core/network/src/on_demand.rs
+++ b/substrate/core/network/src/on_demand.rs
@@ -22,13 +22,12 @@ use std::time::{Instant, Duration};
 use log::{trace, info};
 use futures::{Async, Future, Poll};
 use futures::sync::oneshot::{channel, Receiver, Sender as OneShotSender};
-use linked_hash_map::LinkedHashMap;
-use linked_hash_map::Entry;
+use linked_hash_map::{Entry, LinkedHashMap};
 use parking_lot::Mutex;
 use client::error::Error as ClientError;
 use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
 	RemoteCallRequest, RemoteReadRequest, RemoteChangesRequest, ChangesProof,
-	RemoteReadChildRequest};
+	RemoteReadChildRequest, RemoteBodyRequest};
 use crate::message;
 use network_libp2p::PeerId;
 use crate::config::Roles;
@@ -76,6 +75,16 @@ pub trait OnDemandService<Block: BlockT>: Send + Sync {
 		peer: PeerId,
 		response: message::RemoteChangesResponse<NumberFor<Block>, Block::Hash>
 	);
+
+	/// When body response is received from remote node.
+	fn on_remote_body_response(
+		&self,
+		peer: PeerId,
+		response: message::BlockResponse<Block>
+	);
+
+	/// Check whether a block response is an `on_demand` response
+	fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool;
 }
 
 /// Trait used by the `OnDemand` service to communicate messages back to the network.
@@ -139,6 +148,7 @@ struct Request<Block: BlockT> {
 }
 
 enum RequestData<Block: BlockT> {
+	RemoteBody(RemoteBodyRequest<Block::Header>, OneShotSender<Result<Vec<Block::Extrinsic>, ClientError>>),
 	RemoteHeader(RemoteHeaderRequest<Block::Header>, OneShotSender<Result<Block::Header, ClientError>>),
 	RemoteRead(RemoteReadRequest<Block::Header>, OneShotSender<Result<Option<Vec<u8>>, ClientError>>),
 	RemoteReadChild(
@@ -391,6 +401,41 @@ impl<B> OnDemandService<B> for OnDemand<B> where
 			data => Accept::Unexpected(data),
 		})
 	}
+
+	fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse<B>) {
+		self.accept_response("body", peer, response.id, |request| match request.data {
+			RequestData::RemoteBody(request, sender) => {
+				let mut bodies: Vec<_> = response
+					.blocks
+					.into_iter()
+					.filter_map(|b| b.body)
+					.collect();
+
+				// Number of bodies are hardcoded to 1 for valid `RemoteBodyResponses`
+				if bodies.len() != 1 {
+					return Accept::CheckFailed(
+						"RemoteBodyResponse: invalid number of blocks".into(),
+						RequestData::RemoteBody(request, sender),
+					)
+				}
+				let body = bodies.remove(0);
+
+				match self.checker.check_body_proof(&request, body) {
+					Ok(body) => {
+						let _ = sender.send(Ok(body));
+						Accept::Ok
+					}
+					Err(error) => Accept::CheckFailed(error, RequestData::RemoteBody(request, sender)),
+				}
+			}
+			other => Accept::Unexpected(other),
+		})
+	}
+
+	fn is_on_demand_response(&self, peer: &PeerId, request_id: message::RequestId) -> bool {
+		let core = self.core.lock();
+		core.is_pending_request(&peer, request_id)
+	}
 }
 
 impl<B> Fetcher<B> for OnDemand<B> where
@@ -401,6 +446,7 @@ impl<B> Fetcher<B> for OnDemand<B> where
 	type RemoteReadResult = RemoteResponse<Option<Vec<u8>>>;
 	type RemoteCallResult = RemoteResponse<Vec<u8>>;
 	type RemoteChangesResult = RemoteResponse<Vec<(NumberFor<B>, u32)>>;
+	type RemoteBodyResult = RemoteResponse<Vec<B::Extrinsic>>;
 
 	fn remote_header(&self, request: RemoteHeaderRequest<B::Header>) -> Self::RemoteHeaderResult {
 		let (sender, receiver) = channel();
@@ -440,12 +486,22 @@ impl<B> Fetcher<B> for OnDemand<B> where
 		self.schedule_request(request.retry_count.clone(), RequestData::RemoteChanges(request, sender),
 			RemoteResponse { receiver })
 	}
+
+	fn remote_body(&self, request: RemoteBodyRequest<B::Header>) -> Self::RemoteBodyResult {
+		let (sender, receiver) = channel();
+		self.schedule_request(request.retry_count.clone(), RequestData::RemoteBody(request, sender),
+			RemoteResponse { receiver })
+	}
 }
 
 impl<B> OnDemandCore<B> where
 	B: BlockT,
 	B::Header: HeaderT,
 {
+	fn is_pending_request(&self, peer: &PeerId, request_id: message::RequestId) -> bool {
+		self.active_peers.get(&peer).map_or(false, |r| r.id == request_id)
+	}
+
 	pub fn add_peer(&mut self, peer: PeerId, best_number: NumberFor<B>) {
 		self.idle_peers.push_back(peer.clone());
 		self.best_blocks.insert(peer, best_number);
@@ -570,6 +626,7 @@ impl<Block: BlockT> Request<Block> {
 			RequestData::RemoteReadChild(ref data, _) => *data.header.number(),
 			RequestData::RemoteCall(ref data, _) => *data.header.number(),
 			RequestData::RemoteChanges(ref data, _) => data.max_block.0,
+			RequestData::RemoteBody(ref data, _) => *data.header.number(),
 		}
 	}
 
@@ -610,6 +667,16 @@ impl<Block: BlockT> Request<Block> {
 					max: data.max_block.1.clone(),
 					key: data.key.clone(),
 				}),
+			RequestData::RemoteBody(ref data, _) => {
+				message::generic::Message::BlockRequest(message::BlockRequest::<Block> {
+					id: self.id,
+					fields: message::BlockAttributes::BODY,
+					from: message::FromBlock::Hash(data.header.hash()),
+					to: None,
+					direction: message::Direction::Ascending,
+					max: Some(1),
+				})
+			}
 		}
 	}
 }
@@ -623,6 +690,7 @@ impl<Block: BlockT> RequestData<Block> {
 			RequestData::RemoteRead(_, sender) => { let _ = sender.send(Err(error)); },
 			RequestData::RemoteReadChild(_, sender) => { let _ = sender.send(Err(error)); },
 			RequestData::RemoteChanges(_, sender) => { let _ = sender.send(Err(error)); },
+			RequestData::RemoteBody(_, sender) => { let _ = sender.send(Err(error)); },
 		}
 	}
 }
@@ -637,12 +705,12 @@ pub mod tests {
 	use client::{error::{Error as ClientError, Result as ClientResult}};
 	use client::light::fetcher::{Fetcher, FetchChecker, RemoteHeaderRequest,
 		ChangesProof,	RemoteCallRequest, RemoteReadRequest,
-		RemoteReadChildRequest, RemoteChangesRequest};
+		RemoteReadChildRequest, RemoteChangesRequest, RemoteBodyRequest};
 	use crate::config::Roles;
 	use crate::message;
 	use network_libp2p::PeerId;
 	use super::{REQUEST_TIMEOUT, OnDemand, OnDemandNetwork, OnDemandService};
-	use test_client::runtime::{changes_trie_config, Block, Header};
+	use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header};
 
 	pub struct DummyExecutor;
 	struct DummyFetchChecker { ok: bool }
@@ -685,12 +753,27 @@ pub mod tests {
 			}
 		}
 
-		fn check_changes_proof(&self, _: &RemoteChangesRequest<Header>, _: ChangesProof<Header>) -> ClientResult<Vec<(NumberFor<Block>, u32)>> {
+		fn check_changes_proof(
+			&self,
+			_: &RemoteChangesRequest<Header>,
+			_: ChangesProof<Header>
+		) -> ClientResult<Vec<(NumberFor<Block>, u32)>> {
 			match self.ok {
 				true => Ok(vec![(100, 2)]),
 				false => Err(ClientError::Backend("Test error".into())),
 			}
 		}
+
+		fn check_body_proof(
+			&self,
+			_: &RemoteBodyRequest<Header>,
+			body: Vec<Extrinsic>
+		) -> ClientResult<Vec<Extrinsic>> {
+			match self.ok {
+				true => Ok(body),
+				false => Err(ClientError::Backend("Test error".into())),
+			}
+		}
 	}
 
 	fn dummy(ok: bool) -> (Arc<DummyExecutor>, Arc<OnDemand<Block>>) {
@@ -1166,4 +1249,81 @@ pub mod tests {
 		assert!(on_demand.core.lock().idle_peers.iter().cloned().collect::<Vec<_>>().is_empty());
 		assert_eq!(on_demand.core.lock().pending_requests.len(), 1);
 	}
+
+	#[test]
+	fn remote_body_with_one_block_body_should_succeed() {
+		let (_x, on_demand) = dummy(true);
+		let network_interface = Arc::new(DummyNetwork::default());
+		let peer1 = PeerId::random();
+		on_demand.set_network_interface(Box::new(network_interface.clone()));
+
+		let header = dummy_header();
+		on_demand.on_connect(peer1.clone(), Roles::FULL, 250);
+
+		on_demand.remote_body(RemoteBodyRequest {
+			header: header.clone(),
+			retry_count: None,
+		});
+
+		assert!(on_demand.core.lock().pending_requests.is_empty());
+		assert_eq!(on_demand.core.lock().active_peers.len(), 1);
+
+		let block = message::BlockData::<Block> {
+			hash: primitives::H256::random(),
+			header: None,
+			body: Some(Vec::new()),
+			message_queue: None,
+			receipt: None,
+			justification: None,
+		};
+
+		let response = message::generic::BlockResponse {
+			id: 0,
+			blocks: vec![block],
+		};
+
+		on_demand.on_remote_body_response(peer1.clone(), response);
+
+		assert!(on_demand.core.lock().active_peers.is_empty());
+		assert_eq!(on_demand.core.lock().idle_peers.len(), 1);
+	}
+
+	#[test]
+	fn remote_body_with_three_bodies_should_fail() {
+		let (_x, on_demand) = dummy(true);
+		let network_interface = Arc::new(DummyNetwork::default());
+		let peer1 = PeerId::random();
+		on_demand.set_network_interface(Box::new(network_interface.clone()));
+
+		let header = dummy_header();
+		on_demand.on_connect(peer1.clone(), Roles::FULL, 250);
+
+		on_demand.remote_body(RemoteBodyRequest {
+			header: header.clone(),
+			retry_count: None,
+		});
+
+		assert!(on_demand.core.lock().pending_requests.is_empty());
+		assert_eq!(on_demand.core.lock().active_peers.len(), 1);
+
+		let response = {
+			let blocks: Vec<_> = (0..3).map(|_| message::BlockData::<Block> {
+				hash: primitives::H256::random(),
+				header: None,
+				body: Some(Vec::new()),
+				message_queue: None,
+				receipt: None,
+				justification: None,
+			}).collect();
+
+			message::generic::BlockResponse {
+				id: 0,
+				blocks,
+			}
+		};
+
+		on_demand.on_remote_body_response(peer1.clone(), response);
+		assert!(on_demand.core.lock().active_peers.is_empty());
+		assert!(on_demand.core.lock().idle_peers.is_empty(), "peer should be disconnected after bad response");
+	}
 }
diff --git a/substrate/core/network/src/protocol.rs b/substrate/core/network/src/protocol.rs
index 33e376aab33..685b447df6e 100644
--- a/substrate/core/network/src/protocol.rs
+++ b/substrate/core/network/src/protocol.rs
@@ -324,6 +324,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Future for Protocol<B,
 }
 
 impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
+	fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool {
+		self.on_demand.as_ref().map_or(false, |od| od.is_on_demand_response(&who, response_id))
+	}
+
 	fn handle_response(
 		&mut self,
 		who: PeerId,
@@ -365,10 +369,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 			GenericMessage::Status(s) => self.on_status_message(who, s),
 			GenericMessage::BlockRequest(r) => self.on_block_request(who, r),
 			GenericMessage::BlockResponse(r) => {
-				if let Some(request) = self.handle_response(who.clone(), &r) {
-					let outcome = self.on_block_response(who.clone(), request, r);
-					self.update_peer_info(&who);
-					return outcome
+				// Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter.
+				if self.is_on_demand_response(&who, r.id) {
+					self.on_remote_body_response(who, r);
+				} else {
+					if let Some(request) = self.handle_response(who.clone(), &r) {
+						let outcome = self.on_block_response(who.clone(), request, r);
+						self.update_peer_info(&who);
+						return outcome
+					}
 				}
 			},
 			GenericMessage::BlockAnnounce(announce) => {
@@ -1202,6 +1211,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
 			CustomMessageOutcome::None
 		}
 	}
+
+	fn on_remote_body_response(&self, peer: PeerId, response: message::BlockResponse<B>) {
+		self.on_demand
+			.as_ref()
+			.map(|od| od.on_remote_body_response(peer, response));
+	}
 }
 
 /// Outcome of an incoming custom message.
diff --git a/substrate/core/test-client/src/lib.rs b/substrate/core/test-client/src/lib.rs
index dfbb1fbcec4..770b3de323f 100644
--- a/substrate/core/test-client/src/lib.rs
+++ b/substrate/core/test-client/src/lib.rs
@@ -288,6 +288,7 @@ impl<Block: BlockT> client::light::fetcher::Fetcher<Block> for LightFetcher {
 	type RemoteReadResult = FutureResult<Option<Vec<u8>>, client::error::Error>;
 	type RemoteCallResult = FutureResult<Vec<u8>, client::error::Error>;
 	type RemoteChangesResult = FutureResult<Vec<(NumberFor<Block>, u32)>, client::error::Error>;
+	type RemoteBodyResult = FutureResult<Vec<Block::Extrinsic>, client::error::Error>;
 
 	fn remote_header(
 		&self,
@@ -323,4 +324,11 @@ impl<Block: BlockT> client::light::fetcher::Fetcher<Block> for LightFetcher {
 	) -> Self::RemoteChangesResult {
 		unimplemented!("not (yet) used in tests")
 	}
+
+	fn remote_body(
+		&self,
+		_request: client::light::fetcher::RemoteBodyRequest<Block::Header>,
+	) -> Self::RemoteBodyResult {
+		unimplemented!("not (yet) used in tests")
+	}
 }
-- 
GitLab