Skip to content
Snippets Groups Projects
Commit 9ab9134e authored by Pierre Krieger's avatar Pierre Krieger Committed by GitHub
Browse files

Remove request ID from the new protocol (#5049)

parent b19ea347
No related merge requests found
......@@ -111,7 +111,7 @@ impl Config {
let mut v = Vec::new();
v.extend_from_slice(b"/");
v.extend_from_slice(id.as_bytes());
v.extend_from_slice(b"/sync/1");
v.extend_from_slice(b"/sync/2");
self.protocol = v.into();
self
}
......@@ -146,8 +146,7 @@ where
, request: &api::v1::BlockRequest
) -> Result<api::v1::BlockResponse, Error>
{
log::trace!("block request {} from peer {}: from block {:?} to block {:?}, max blocks {:?}",
request.id,
log::trace!("block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
peer,
request.from_block,
request.to_block,
......@@ -242,7 +241,7 @@ where
}
}
Ok(api::v1::BlockResponse { id: request.id, blocks })
Ok(api::v1::BlockResponse { blocks })
}
}
......@@ -274,10 +273,10 @@ where
fn inject_node_event(&mut self, peer: PeerId, Request(request, mut stream): Request<NegotiatedSubstream>) {
match self.on_block_request(&peer, &request) {
Ok(res) => {
log::trace!("enqueueing block response {} for peer {} with {} blocks", res.id, peer, res.blocks.len());
log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len());
let mut data = Vec::with_capacity(res.encoded_len());
if let Err(e) = res.encode(&mut data) {
log::debug!("error encoding block response {} for peer {}: {}", res.id, peer, e)
log::debug!("error encoding block response for peer {}: {}", peer, e)
} else {
let future = async move {
if let Err(e) = write_one(&mut stream, data).await {
......@@ -287,7 +286,7 @@ where
self.outgoing.push(future.boxed())
}
}
Err(e) => log::debug!("error handling block request {} from peer {}: {}", request.id, peer, e)
Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e)
}
}
......
......@@ -127,7 +127,7 @@ impl Config {
let mut v = Vec::new();
v.extend_from_slice(b"/");
v.extend_from_slice(id.as_bytes());
v.extend_from_slice(b"/light/1");
v.extend_from_slice(b"/light/2");
self.protocol = v.into();
self
}
......@@ -350,7 +350,7 @@ where
, response: api::v1::light::Response
) -> Result<Reply<B>, Error>
{
log::trace!("response {} from {}", response.id, peer);
log::trace!("response from {}", peer);
use api::v1::light::response::Response;
match response.response {
Some(Response::RemoteCallResponse(response)) =>
......@@ -419,12 +419,10 @@ where
fn on_remote_call_request
( &mut self
, peer: &PeerId
, request_id: u64
, request: &api::v1::light::RemoteCallRequest
) -> Result<api::v1::light::Response, Error>
{
log::trace!("remote call request {} from {} ({} at {:?})",
request_id,
log::trace!("remote call request from {} ({} at {:?})",
peer,
request.method,
request.block);
......@@ -434,8 +432,7 @@ where
let proof = match self.chain.execution_proof(&block, &request.method, &request.data) {
Ok((_, proof)) => proof,
Err(e) => {
log::trace!("remote call request {} from {} ({} at {:?}) failed with: {}",
request_id,
log::trace!("remote call request from {} ({} at {:?}) failed with: {}",
peer,
request.method,
request.block,
......@@ -449,13 +446,12 @@ where
api::v1::light::response::Response::RemoteCallResponse(r)
};
Ok(api::v1::light::Response { id: request_id, response: Some(response) })
Ok(api::v1::light::Response { response: Some(response) })
}
fn on_remote_read_request
( &mut self
, peer: &PeerId
, request_id: u64
, request: &api::v1::light::RemoteReadRequest
) -> Result<api::v1::light::Response, Error>
{
......@@ -464,8 +460,7 @@ where
return Err(Error::BadRequest("remote read request without keys"))
}
log::trace!("remote read request {} from {} ({} at {:?})",
request_id,
log::trace!("remote read request from {} ({} at {:?})",
peer,
fmt_keys(request.keys.first(), request.keys.last()),
request.block);
......@@ -475,8 +470,7 @@ where
let proof = match self.chain.read_proof(&block, &request.keys) {
Ok(proof) => proof,
Err(error) => {
log::trace!("remote read request {} from {} ({} at {:?}) failed with: {}",
request_id,
log::trace!("remote read request from {} ({} at {:?}) failed with: {}",
peer,
fmt_keys(request.keys.first(), request.keys.last()),
request.block,
......@@ -490,13 +484,12 @@ where
api::v1::light::response::Response::RemoteReadResponse(r)
};
Ok(api::v1::light::Response { id: request_id, response: Some(response) })
Ok(api::v1::light::Response { response: Some(response) })
}
fn on_remote_read_child_request
( &mut self
, peer: &PeerId
, request_id: u64
, request: &api::v1::light::RemoteReadChildRequest
) -> Result<api::v1::light::Response, Error>
{
......@@ -505,8 +498,7 @@ where
return Err(Error::BadRequest("remove read child request without keys"))
}
log::trace!("remote read child request {} from {} ({} {} at {:?})",
request_id,
log::trace!("remote read child request from {} ({} {} at {:?})",
peer,
request.storage_key.to_hex::<String>(),
fmt_keys(request.keys.first(), request.keys.last()),
......@@ -519,8 +511,7 @@ where
match self.chain.read_child_proof(&block, &request.storage_key, info, &request.keys) {
Ok(proof) => proof,
Err(error) => {
log::trace!("remote read child request {} from {} ({} {} at {:?}) failed with: {}",
request_id,
log::trace!("remote read child request from {} ({} {} at {:?}) failed with: {}",
peer,
request.storage_key.to_hex::<String>(),
fmt_keys(request.keys.first(), request.keys.last()),
......@@ -530,8 +521,7 @@ where
}
}
} else {
log::trace!("remote read child request {} from {} ({} {} at {:?}) failed with: {}",
request_id,
log::trace!("remote read child request from {} ({} {} at {:?}) failed with: {}",
peer,
request.storage_key.to_hex::<String>(),
fmt_keys(request.keys.first(), request.keys.last()),
......@@ -546,25 +536,23 @@ where
api::v1::light::response::Response::RemoteReadResponse(r)
};
Ok(api::v1::light::Response { id: request_id, response: Some(response) })
Ok(api::v1::light::Response { response: Some(response) })
}
fn on_remote_header_request
( &mut self
, peer: &PeerId
, request_id: u64
, request: &api::v1::light::RemoteHeaderRequest
) -> Result<api::v1::light::Response, Error>
{
log::trace!("remote header proof request {} from {} ({:?})", request_id, peer, request.block);
log::trace!("remote header proof request from {} ({:?})", peer, request.block);
let block = Decode::decode(&mut request.block.as_ref())?;
let (header, proof) = match self.chain.header_proof(block) {
Ok((header, proof)) => (header.encode(), proof),
Err(error) => {
log::trace!("remote header proof request {} from {} ({:?}) failed with: {}",
request_id,
log::trace!("remote header proof request from {} ({:?}) failed with: {}",
peer,
request.block,
error);
......@@ -577,18 +565,16 @@ where
api::v1::light::response::Response::RemoteHeaderResponse(r)
};
Ok(api::v1::light::Response { id: request_id, response: Some(response) })
Ok(api::v1::light::Response { response: Some(response) })
}
fn on_remote_changes_request
( &mut self
, peer: &PeerId
, request_id: u64
, request: &api::v1::light::RemoteChangesRequest
) -> Result<api::v1::light::Response, Error>
{
log::trace!("remote changes proof request {} from {} for key {} ({:?}..{:?})",
request_id,
log::trace!("remote changes proof request from {} for key {} ({:?}..{:?})",
peer,
if !request.storage_key.is_empty() {
format!("{} : {}", request.storage_key.to_hex::<String>(), request.key.to_hex::<String>())
......@@ -613,8 +599,7 @@ where
let proof = match self.chain.key_changes_proof(first, last, min, max, storage_key.as_ref(), &key) {
Ok(proof) => proof,
Err(error) => {
log::trace!("remote changes proof request {} from {} for key {} ({:?}..{:?}) failed with: {}",
request_id,
log::trace!("remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}",
peer,
if let Some(sk) = storage_key {
format!("{} : {}", sk.0.to_hex::<String>(), key.0.to_hex::<String>())
......@@ -646,7 +631,7 @@ where
api::v1::light::response::Response::RemoteChangesResponse(r)
};
Ok(api::v1::light::Response { id: request_id, response: Some(response) })
Ok(api::v1::light::Response { response: Some(response) })
}
}
......@@ -697,29 +682,29 @@ where
match event {
// An incoming request from remote has been received.
Event::Request(request, mut stream) => {
log::trace!("incoming request {} from {}", peer, request.id);
log::trace!("incoming request from {}", peer);
let result = match &request.request {
Some(api::v1::light::request::Request::RemoteCallRequest(r)) =>
self.on_remote_call_request(&peer, request.id, r),
self.on_remote_call_request(&peer, r),
Some(api::v1::light::request::Request::RemoteReadRequest(r)) =>
self.on_remote_read_request(&peer, request.id, r),
self.on_remote_read_request(&peer, r),
Some(api::v1::light::request::Request::RemoteHeaderRequest(r)) =>
self.on_remote_header_request(&peer, request.id, r),
self.on_remote_header_request(&peer, r),
Some(api::v1::light::request::Request::RemoteReadChildRequest(r)) =>
self.on_remote_read_child_request(&peer, request.id, r),
self.on_remote_read_child_request(&peer, r),
Some(api::v1::light::request::Request::RemoteChangesRequest(r)) =>
self.on_remote_changes_request(&peer, request.id, r),
self.on_remote_changes_request(&peer, r),
None => {
log::debug!("ignoring request {} without request data from peer {}", request.id, peer);
log::debug!("ignoring request without request data from peer {}", peer);
return
}
};
match result {
Ok(response) => {
log::trace!("enqueueing response {} for peer {}", response.id, peer);
log::trace!("enqueueing response for peer {}", peer);
let mut data = Vec::new();
if let Err(e) = response.encode(&mut data) {
log::debug!("error encoding response {} for peer {}: {}", response.id, peer, e)
log::debug!("error encoding response for peer {}: {}", peer, e)
} else {
let future = async move {
if let Err(e) = write_one(&mut stream, data).await {
......@@ -733,16 +718,15 @@ where
self.remove_peer(&peer);
self.peerset.report_peer(peer, ReputationChange::new(-(1 << 12), "bad request"))
}
Err(e) => log::debug!("error handling request {} from peer {}: {}", request.id, peer, e)
Err(e) => log::debug!("error handling request from peer {}: {}", peer, e)
}
}
// A response to one of our own requests has been received.
Event::Response(response) => {
let id = response.id;
Event::Response(id, response) => {
if let Some(request) = self.outstanding.remove(&id) {
// We first just check if the response originates from the expected peer.
if request.peer != peer {
log::debug!("was expecting response {} from {} instead of {}", id, request.peer, peer);
log::debug!("was expecting response from {} instead of {}", request.peer, peer);
self.outstanding.insert(id, request);
self.remove_peer(&peer);
self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer"));
......@@ -836,16 +820,17 @@ where
}
};
if let Some(peer) = available_peer {
let id = self.next_request_id();
let rq = serialize_request(id, &request.request);
let rq = serialize_request(&request.request);
let mut buf = Vec::with_capacity(rq.encoded_len());
if let Err(e) = rq.encode(&mut buf) {
log::debug!("failed to serialize request {}: {}", id, e);
log::debug!("failed to serialize request: {}", e);
send_reply(Err(ClientError::RemoteFetchFailed), request.request)
} else {
let id = self.next_request_id();
log::trace!("sending request {} to peer {}", id, peer);
let protocol = OutboundProtocol {
request: buf,
request_id: id,
max_data_size: self.config.max_data_size,
protocol: self.config.protocol.clone(),
};
......@@ -918,7 +903,7 @@ fn retries<B: Block>(request: &Request<B>) -> usize {
rc.unwrap_or(0)
}
fn serialize_request<B: Block>(id: u64, request: &Request<B>) -> api::v1::light::Request {
fn serialize_request<B: Block>(request: &Request<B>) -> api::v1::light::Request {
let request = match request {
Request::Header { request, .. } => {
let r = api::v1::light::RemoteHeaderRequest { block: request.block.encode() };
......@@ -962,7 +947,7 @@ fn serialize_request<B: Block>(id: u64, request: &Request<B>) -> api::v1::light:
}
};
api::v1::light::Request { id, request: Some(request) }
api::v1::light::Request { request: Some(request) }
}
fn send_reply<B: Block>(result: Result<Reply<B>, ClientError>, request: Request<B>) {
......@@ -1004,7 +989,7 @@ pub enum Event<T> {
/// Incoming request from remote and substream to use for the response.
Request(api::v1::light::Request, T),
/// Incoming response from remote.
Response(api::v1::light::Response),
Response(u64, api::v1::light::Response),
}
/// Substream upgrade protocol.
......@@ -1054,6 +1039,8 @@ where
pub struct OutboundProtocol {
/// The serialized protobuf request.
request: Vec<u8>,
/// Local identifier for the request. Used to associate it with a response.
request_id: u64,
/// The max. request length in bytes.
max_data_size: usize,
/// The protocol to use for upgrade negotiation.
......@@ -1082,7 +1069,7 @@ where
write_one(&mut s, &self.request).await?;
let vec = read_one(&mut s, self.max_data_size).await?;
api::v1::light::Response::decode(&vec[..])
.map(Event::Response)
.map(|r| Event::Response(self.request_id, r))
.map_err(|e| {
ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
})
......@@ -1308,53 +1295,6 @@ mod tests {
assert_eq!(0, behaviour.outstanding.len());
}
#[test]
fn disconnects_from_peer_on_response_with_wrong_id() {
let peer = PeerId::random();
let pset = peerset();
let mut behaviour = make_behaviour(true, pset.1, make_config());
behaviour.inject_connected(peer.clone(), empty_dialer());
assert_eq!(1, behaviour.peers.len());
let chan = oneshot::channel();
let request = fetcher::RemoteCallRequest {
block: Default::default(),
header: dummy_header(),
method: "test".into(),
call_data: vec![],
retry_count: Some(1),
};
behaviour.request(Request::Call { request, sender: chan.0 }).unwrap();
assert_eq!(1, behaviour.pending_requests.len());
assert_eq!(0, behaviour.outstanding.len());
poll(&mut behaviour); // Make progress
assert_eq!(0, behaviour.pending_requests.len());
assert_eq!(1, behaviour.outstanding.len());
// Construct response with bogus ID
let response = {
let r = api::v1::light::RemoteCallResponse { proof: empty_proof() };
api::v1::light::Response {
id: 2365789,
response: Some(api::v1::light::response::Response::RemoteCallResponse(r)),
}
};
// Make sure our bogus ID is really not used.
assert!(!behaviour.outstanding.keys().any(|id| id == &response.id));
behaviour.inject_node_event(peer.clone(), Event::Response(response));
assert!(behaviour.peers.is_empty());
poll(&mut behaviour); // More progress
// The request should be back in the pending queue
assert_eq!(1, behaviour.pending_requests.len());
assert_eq!(0, behaviour.outstanding.len());
}
#[test]
fn disconnects_from_peer_on_incorrect_response() {
let peer = PeerId::random();
......@@ -1386,12 +1326,11 @@ mod tests {
let response = {
let r = api::v1::light::RemoteCallResponse { proof: empty_proof() };
api::v1::light::Response {
id: request_id,
response: Some(api::v1::light::response::Response::RemoteCallResponse(r)),
}
};
behaviour.inject_node_event(peer.clone(), Event::Response(response));
behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response));
assert!(behaviour.peers.is_empty());
poll(&mut behaviour); // More progress
......@@ -1416,12 +1355,11 @@ mod tests {
let response = {
let r = api::v1::light::RemoteCallResponse { proof: empty_proof() };
api::v1::light::Response {
id: 2347895932,
response: Some(api::v1::light::response::Response::RemoteCallResponse(r)),
}
};
behaviour.inject_node_event(peer.clone(), Event::Response(response));
behaviour.inject_node_event(peer.clone(), Event::Response(2347895932, response));
assert!(behaviour.peers.is_empty());
poll(&mut behaviour);
......@@ -1459,12 +1397,11 @@ mod tests {
let response = {
let r = api::v1::light::RemoteReadResponse { proof: empty_proof() }; // Not a RemoteCallResponse!
api::v1::light::Response {
id: request_id,
response: Some(api::v1::light::response::Response::RemoteReadResponse(r)),
}
};
behaviour.inject_node_event(peer.clone(), Event::Response(response));
behaviour.inject_node_event(peer.clone(), Event::Response(request_id, response));
assert!(behaviour.peers.is_empty());
poll(&mut behaviour); // More progress
......@@ -1513,11 +1450,10 @@ mod tests {
let response = {
let r = api::v1::light::RemoteCallResponse { proof: empty_proof() };
api::v1::light::Response {
id: request_id,
response: Some(api::v1::light::response::Response::RemoteCallResponse(r))
}
};
behaviour.inject_node_event(responding_peer, Event::Response(response.clone()));
behaviour.inject_node_event(responding_peer, Event::Response(request_id, response.clone()));
assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::SendEvent { .. }));
assert_matches!(chan.1.try_recv(), Ok(None))
}
......@@ -1527,11 +1463,10 @@ mod tests {
let response = {
let r = api::v1::light::RemoteCallResponse { proof: empty_proof() };
api::v1::light::Response {
id: request_id,
response: Some(api::v1::light::response::Response::RemoteCallResponse(r)),
}
};
behaviour.inject_node_event(responding_peer, Event::Response(response));
behaviour.inject_node_event(responding_peer, Event::Response(request_id, response));
assert_matches!(poll(&mut behaviour), Poll::Pending);
assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed))))
}
......@@ -1551,28 +1486,24 @@ mod tests {
proof: empty_proof()
};
api::v1::light::Response {
id: 1,
response: Some(api::v1::light::response::Response::RemoteHeaderResponse(r)),
}
}
Request::Read{..} => {
let r = api::v1::light::RemoteReadResponse { proof: empty_proof() };
api::v1::light::Response {
id: 1,
response: Some(api::v1::light::response::Response::RemoteReadResponse(r)),
}
}
Request::ReadChild{..} => {
let r = api::v1::light::RemoteReadResponse { proof: empty_proof() };
api::v1::light::Response {
id: 1,
response: Some(api::v1::light::response::Response::RemoteReadResponse(r)),
}
}
Request::Call{..} => {
let r = api::v1::light::RemoteCallResponse { proof: empty_proof() };
api::v1::light::Response {
id: 1,
response: Some(api::v1::light::response::Response::RemoteCallResponse(r)),
}
}
......@@ -1584,7 +1515,6 @@ mod tests {
roots_proof: empty_proof()
};
api::v1::light::Response {
id: 1,
response: Some(api::v1::light::response::Response::RemoteChangesResponse(r)),
}
}
......@@ -1599,7 +1529,7 @@ mod tests {
assert_eq!(1, behaviour.outstanding.len());
assert_eq!(1, *behaviour.outstanding.keys().next().unwrap());
behaviour.inject_node_event(peer.clone(), Event::Response(response));
behaviour.inject_node_event(peer.clone(), Event::Response(1, response));
poll(&mut behaviour);
......
......@@ -14,31 +14,27 @@ enum Direction {
// Request block data from a peer.
message BlockRequest {
// Unique request id.
uint64 id = 1;
// Bits of block data to request.
uint32 fields = 2;
uint32 fields = 1;
// Start from this block.
oneof from_block {
// Start with given hash.
bytes hash = 3;
bytes hash = 2;
// Start with given block number.
bytes number = 4;
bytes number = 3;
}
// End at this block. An implementation defined maximum is used when unspecified.
bytes to_block = 5; // optional
bytes to_block = 4; // optional
// Sequence direction.
Direction direction = 6;
Direction direction = 5;
// Maximum number of blocks to return. An implementation defined maximum is used when unspecified.
uint32 max_blocks = 7; // optional
uint32 max_blocks = 6; // optional
}
// Response to `BlockRequest`
message BlockResponse {
// Id of a request this response was made for.
uint64 id = 1;
// Block data for the requested sequence.
repeated BlockData blocks = 2;
repeated BlockData blocks = 1;
}
// Block data sent in the response.
......
......@@ -14,26 +14,22 @@ message Pair {
// Enumerate all possible light client request messages.
message Request {
// Unique request id.
uint64 id = 1;
oneof request {
RemoteCallRequest remote_call_request = 2;
RemoteReadRequest remote_read_request = 3;
RemoteHeaderRequest remote_header_request = 4;
RemoteReadChildRequest remote_read_child_request = 5;
RemoteChangesRequest remote_changes_request = 6;
RemoteCallRequest remote_call_request = 1;
RemoteReadRequest remote_read_request = 2;
RemoteHeaderRequest remote_header_request = 3;
RemoteReadChildRequest remote_read_child_request = 4;
RemoteChangesRequest remote_changes_request = 5;
}
}
// Enumerate all possible light client response messages.
message Response {
/// Id of a request this response was made for.
uint64 id = 1;
oneof response {
RemoteCallResponse remote_call_response = 2;
RemoteReadResponse remote_read_response = 3;
RemoteHeaderResponse remote_header_response = 4;
RemoteChangesResponse remote_changes_response = 6;
RemoteCallResponse remote_call_response = 1;
RemoteReadResponse remote_read_response = 2;
RemoteHeaderResponse remote_header_response = 3;
RemoteChangesResponse remote_changes_response = 4;
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment