Unverified commit b938a8ae authored by Arkadiy Paronyan, committed by GitHub

Additional logging for polkadot network protocols (#2684)



* Additional logging for polkadot network protocols

* Additional log

* Update node/network/bitfield-distribution/src/lib.rs

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>

* Update node/network/availability-distribution/src/responder.rs

* Added additional chunk info

* Added additional peer info

Co-authored-by: Robert Habermeier <rphmeier@gmail.com>
parent b125e1f9
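Every change below follows the same structured-logging pattern from the `tracing` crate. As a minimal, self-contained sketch of that pattern (assuming the `tracing` and `tracing-subscriber` crates; the `LOG_TARGET` value, the `PeerId` stand-in, and the function names here are illustrative, not taken from this commit):

    const LOG_TARGET: &str = "parachain::example"; // stand-in target

    #[derive(Debug)]
    struct PeerId(u64); // stand-in for the real network PeerId type

    fn on_peer_connected(peer_id: &PeerId, role: &str) {
        // `?peer_id` is shorthand for `peer_id = ?peer_id`: record the field
        // under its own name via its `Debug` impl. `%role` records via
        // `Display`, matching uses like `peer_id = %peer` in the diff.
        tracing::trace!(
            target: LOG_TARGET,
            ?peer_id,
            %role,
            "Peer connected",
        );
    }

    fn main() {
        // Events are only emitted once a subscriber is installed and the
        // level is enabled; the node binary normally does this at startup.
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::TRACE)
            .init();
        on_peer_connected(&PeerId(42), "Full");
    }

The `target:` field keys each event to its subsystem, so an operator can raise verbosity for a single protocol (for example with a `RUST_LOG`-style filter on the target) without flooding the rest of the log. In the diff below, removed lines are prefixed with `-`; unprefixed lines show the resulting code.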
@@ -150,11 +150,22 @@ impl State {
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
) {
match event {
-NetworkBridgeEvent::PeerConnected(peer_id, _role) => {
NetworkBridgeEvent::PeerConnected(peer_id, role) => {
// insert a blank view if none already present
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?role,
"Peer connected",
);
self.peer_views.entry(peer_id).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peer_id) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
"Peer disconnected",
);
self.peer_views.remove(&peer_id);
self.blocks.iter_mut().for_each(|(_hash, entry)| {
entry.known_by.remove(&peer_id);
@@ -164,6 +175,11 @@ impl State {
self.handle_peer_view_change(ctx, peer_id, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Own view change",
);
for head in view.iter() {
if !self.blocks.contains_key(head) {
self.pending_known.entry(*head).or_default();
@@ -329,6 +345,11 @@ impl State {
peer_id: PeerId,
view: View,
) {
tracing::trace!(
target: LOG_TARGET,
?view,
"Peer view change",
);
Self::unify_with_peer(&mut self.blocks, ctx, peer_id.clone(), view.clone()).await;
let finalized_number = view.finalized_number;
let old_view = self.peer_views.insert(peer_id.clone(), view);
@@ -469,7 +490,7 @@ impl State {
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
tracing::info!(
target: LOG_TARGET,
-peer = ?peer_id,
?peer_id,
"Got a bad assignment from peer",
);
return;
@@ -635,7 +656,7 @@ impl State {
modify_reputation(ctx, peer_id, COST_INVALID_MESSAGE).await;
tracing::info!(
target: LOG_TARGET,
-peer = ?peer_id,
?peer_id,
"Got a bad approval from peer",
);
return;
@@ -705,7 +726,7 @@ impl State {
if !peers.is_empty() {
tracing::trace!(
target: LOG_TARGET,
"Sending approval (block={}, index={})to {} peers",
"Sending approval (block={}, index={}) to {} peers",
block_hash,
candidate_index,
peers.len(),
@@ -881,7 +902,6 @@ impl ApprovalDistribution {
FromOverseer::Communication {
msg: ApprovalDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
tracing::debug!(target: LOG_TARGET, "Processing network message");
state.handle_network_msg(&mut ctx, &self.metrics, event).await;
}
FromOverseer::Communication {
......
@@ -101,6 +101,11 @@ impl Requester {
where
Context: SubsystemContext,
{
tracing::trace!(
target: LOG_TARGET,
?update,
"Update fetching heads"
);
let ActiveLeavesUpdate {
activated,
deactivated,
@@ -126,6 +131,11 @@ impl Requester {
Err(err) => return Ok(Some(err)),
Ok(cores) => cores,
};
tracing::trace!(
target: LOG_TARGET,
occupied_cores = ?cores,
"Query occupied core"
);
if let Some(err) = self.add_cores(ctx, leaf, cores).await? {
return Ok(Some(err));
}
......
@@ -74,6 +74,15 @@ where
let result = chunk.is_some();
tracing::trace!(
target: LOG_TARGET,
hash = ?req.payload.candidate_hash,
index = ?req.payload.index,
peer = ?req.peer,
has_data = ?chunk.is_some(),
"Serving chunk",
);
let response = match chunk {
None => v1::AvailabilityFetchingResponse::NoSuchChunk,
Some(chunk) => v1::AvailabilityFetchingResponse::Chunk(chunk.into()),
@@ -99,5 +108,14 @@ where
))
.await;
-rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
rx.await.map_err(|e| {
tracing::trace!(
target: LOG_TARGET,
?validator_index,
?candidate_hash,
error = ?e,
"Error retrieving chunk",
);
Error::QueryChunkResponseChannel(e)
})
}
@@ -228,6 +228,12 @@ impl RequestFromBackersPhase {
params: &InteractionParams,
to_state: &mut mpsc::Sender<FromInteraction>
) -> Result<bool, mpsc::SendError> {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Requesting from backers",
);
loop {
// Pop the next backer, and proceed to next phase if we're out.
let validator_index = match self.shuffled_backers.pop() {
@@ -252,8 +258,19 @@ impl RequestFromBackersPhase {
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
).await?;
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
"Received full data",
);
return Ok(true);
} else {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
validator = ?peer_id,
"Invalid data response",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_INVALID_AVAILABLE_DATA,
@@ -264,12 +281,14 @@ impl RequestFromBackersPhase {
tracing::debug!(
target: LOG_TARGET,
err = ?e,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
"A response channel was cancelled while waiting for full data",
);
}
None => {
tracing::debug!(
target: LOG_TARGET,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
"A full data request has timed out",
);
}
@@ -298,7 +317,13 @@ impl RequestChunksPhase {
while self.requesting_chunks.len() < N_PARALLEL {
if let Some(validator_index) = self.shuffling.pop() {
let (tx, rx) = oneshot::channel();
tracing::trace!(
target: LOG_TARGET,
validator = ?params.validator_authority_keys[validator_index.0 as usize],
?validator_index,
candidate_hash = ?params.candidate_hash,
"Requesting chunk",
);
to_state.send(FromInteraction::MakeChunkRequest(
params.validator_authority_keys[validator_index.0 as usize].clone(),
params.candidate_hash.clone(),
@@ -335,6 +360,13 @@ impl RequestChunksPhase {
// We need to check that the validator index matches the chunk index and
// not blindly trust the data from an untrusted peer.
if validator_index != chunk.index {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
chunk_index = ?chunk.index,
"Index mismatch",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
@@ -352,14 +384,32 @@ impl RequestChunksPhase {
let erasure_chunk_hash = BlakeTwo256::hash(&chunk.chunk);
if erasure_chunk_hash != anticipated_hash {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Merkle proof mismatch",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
)).await?;
} else {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Received valid Merkle proof",
);
self.received_chunks.insert(validator_index, chunk);
}
} else {
tracing::debug!(
target: LOG_TARGET,
validator = ?peer_id,
?validator_index,
"Invalid Merkle proof",
);
to_state.send(FromInteraction::ReportPeer(
peer_id.clone(),
COST_MERKLE_PROOF_INVALID,
@@ -397,6 +447,15 @@ impl RequestChunksPhase {
self.shuffling.len(),
params.threshold,
) {
tracing::debug!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
received = %self.received_chunks.len(),
requesting = %self.requesting_chunks.len(),
n_validators = %self.shuffling.len(),
"Data recovery is not possible",
);
to_state.send(FromInteraction::Concluded(
params.candidate_hash,
Err(RecoveryError::Unavailable),
@@ -419,18 +478,39 @@ impl RequestChunksPhase {
) {
Ok(data) => {
if reconstructed_data_matches_root(params.validators.len(), &params.erasure_root, &data) {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery complete",
);
FromInteraction::Concluded(params.candidate_hash.clone(), Ok(data))
} else {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
"Data recovery - root mismatch",
);
FromInteraction::Concluded(
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
}
}
-Err(_) => FromInteraction::Concluded(
-    params.candidate_hash.clone(),
-    Err(RecoveryError::Invalid),
-),
Err(err) => {
tracing::trace!(
target: LOG_TARGET,
candidate_hash = ?params.candidate_hash,
erasure_root = ?params.erasure_root,
?err,
"Data recovery error ",
);
FromInteraction::Concluded(
params.candidate_hash.clone(),
Err(RecoveryError::Invalid),
)
},
};
to_state.send(concluded).await?;
@@ -871,6 +951,11 @@ async fn handle_network_update(
protocol_v1::AvailabilityRecoveryMessage::Chunk(request_id, chunk) => {
match state.live_requests.remove(&request_id) {
None => {
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected chunk response",
);
// If there doesn't exist one, report the peer and return.
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
@@ -898,6 +983,11 @@ async fn handle_network_update(
}
}
Some(a) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected chunk response",
);
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
state.live_requests.insert(request_id, a);
@@ -940,6 +1030,11 @@ async fn handle_network_update(
match state.live_requests.remove(&request_id) {
None => {
// If there doesn't exist one, report the peer and return.
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected full data response",
);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
Some((peer_id, Awaited::FullData(awaited))) if peer_id == peer => {
@@ -965,6 +1060,11 @@ async fn handle_network_update(
Some(a) => {
// If the peer in the entry doesn't match the sending peer,
// reinstate the entry, report the peer, and return
tracing::debug!(
target: LOG_TARGET,
?peer,
"Received unexpected full data response",
);
state.live_requests.insert(request_id, a);
report_peer(ctx, peer, COST_UNEXPECTED_CHUNK).await;
}
@@ -1049,6 +1149,12 @@ async fn handle_validator_connected(
authority_id: AuthorityDiscoveryId,
peer_id: PeerId,
) -> error::Result<()> {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?authority_id,
"Validator connected",
);
if let Some(discovering) = state.discovering_validators.remove(&authority_id) {
for awaited in discovering {
issue_request(state, ctx, peer_id.clone(), awaited).await?;
......
@@ -169,7 +169,11 @@ impl BitfieldDistribution {
FromOverseer::Communication {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => {
tracing::trace!(target: LOG_TARGET, "Processing DistributeBitfield");
tracing::trace!(
target: LOG_TARGET,
?hash,
"Processing DistributeBitfield"
);
handle_bitfield_distribution(
&mut ctx,
&mut state,
@@ -235,7 +239,7 @@ async fn modify_reputation<Context>(
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
-tracing::trace!(target: LOG_TARGET, rep = ?rep, peer_id = %peer, "reputation change");
tracing::trace!(target: LOG_TARGET, ?rep, peer_id = %peer, "reputation change");
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
@@ -410,6 +414,7 @@ where
tracing::trace!(
target: LOG_TARGET,
relay_parent = %message.relay_parent,
?origin,
"Validator set is empty",
);
modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
@@ -438,6 +443,12 @@ where
if !received_set.contains(&validator) {
received_set.insert(validator.clone());
} else {
tracing::trace!(
target: LOG_TARGET,
validator_index,
?origin,
"Duplicate message",
);
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return;
};
@@ -485,24 +496,51 @@ where
let _timer = metrics.time_handle_network_msg();
match bridge_message {
-NetworkBridgeEvent::PeerConnected(peerid, _role) => {
NetworkBridgeEvent::PeerConnected(peerid, role) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
?role,
"Peer connected",
);
// insert if none already present
state.peer_views.entry(peerid).or_default();
}
NetworkBridgeEvent::PeerDisconnected(peerid) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
"Peer disconnected",
);
// get rid of superfluous data
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
?view,
"Peer view change",
);
handle_peer_view_change(ctx, state, peerid, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Our view change",
);
handle_our_view_change(state, view);
}
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
protocol_v1::BitfieldDistributionMessage::Bitfield(relay_parent, bitfield) => {
-tracing::trace!(target: LOG_TARGET, peer_id = %remote, "received bitfield gossip from peer");
tracing::trace!(
target: LOG_TARGET,
peer_id = %remote,
?relay_parent,
"received bitfield gossip from peer"
);
let gossiped_bitfield = BitfieldGossipMessage {
relay_parent,
signed_availability: bitfield,
@@ -601,6 +639,13 @@ where
};
let _span = job_data.span.child("gossip");
tracing::trace!(
target: LOG_TARGET,
?dest,
?validator,
relay_parent = ?message.relay_parent,
"Sending gossip message"
);
job_data.message_sent_to_peer
.entry(dest.clone())
......
@@ -320,6 +320,17 @@ async fn distribute_collation(
return Ok(());
}
tracing::debug!(
target: LOG_TARGET,
para_id = %id,
relay_parent = %relay_parent,
candidate_hash = ?receipt.hash(),
pov_hash = ?pov.hash(),
core = ?our_core,
?current_validators,
?next_validators,
"Accepted collation, connecting to validators."
);
// Issue a discovery request for the validators of the current group and the next group.
connect_to_validators(
ctx,
@@ -626,11 +637,20 @@ async fn send_collation(
pov: PoV,
) {
let pov = match CompressedPoV::compress(&pov) {
-Ok(pov) => pov,
Ok(compressed) => {
tracing::trace!(
target: LOG_TARGET,
size = %pov.block_data.0.len(),
compressed = %compressed.len(),
peer_id = ?request.peer,
"Sending collation."
);
compressed
},
Err(error) => {
tracing::error!(
target: LOG_TARGET,
-error = ?error,
?error,
"Failed to create `CompressedPov`",
);
return
@@ -659,12 +679,14 @@ async fn handle_incoming_peer_message(
Declare(_) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
"Declare message is not expected on the collator side of the protocol",
);
}
AdvertiseCollation(_, _) => {
tracing::warn!(
target: LOG_TARGET,
?origin,
"AdvertiseCollation message is not expected on the collator side of the protocol",
);
}
@@ -672,10 +694,17 @@ async fn handle_incoming_peer_message(
if !matches!(statement.payload(), Statement::Seconded(_)) {
tracing::warn!(
target: LOG_TARGET,
-statement = ?statement,
?statement,
?origin,
"Collation seconded message received with none-seconded statement.",
);
} else if let Some(sender) = state.collation_result_senders.remove(&statement.payload().candidate_hash()) {
tracing::trace!(
target: LOG_TARGET,
?statement,
?origin,
"received a `CollationSeconded`",
);
let _ = sender.send(statement);
}
}
@@ -744,18 +773,40 @@ async fn handle_network_msg(
use NetworkBridgeEvent::*;
match bridge_message {
-PeerConnected(_peer_id, _observed_role) => {
PeerConnected(peer_id, observed_role) => {
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?observed_role,
"Peer connected",
);
}
PeerViewChange(peer_id, view) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
?view,
"Peer view change",
);
handle_peer_view_change(ctx, state, peer_id, view).await;
}
PeerDisconnected(peer_id) => {
tracing::trace!(
target: LOG_TARGET,
?peer_id,
"Peer disconnected",
);
state.peer_views.remove(&peer_id);
state.declared_at.remove(&peer_id);
}
OurViewChange(view) => {
tracing::trace!(
target: LOG_TARGET,
?view,
"Own view change",
);
handle_our_view_change(state, view).await?;
}
PeerMessage(remote, msg) => {
@@ -1078,7 +1129,7 @@ mod tests {
overseer: &mut test_helpers::TestSubsystemContextHandle<CollatorProtocolMessage>,
msg: CollatorProtocolMessage,
) {
tracing::trace!(msg = ?msg, "sending message");
tracing::trace!(?msg, "sending message");
overseer
.send(FromOverseer::Communication { msg })
.timeout(TIMEOUT)
@@ -1093,7 +1144,7 @@ mod tests {
.await
.expect(&format!("{:?} is more than enough to receive messages", TIMEOUT));
tracing::trace!(msg = ?msg, "received message");
tracing::trace!(?msg, "received message");
msg
}
......
@@ -230,6 +230,13 @@ async fn notify_all_we_are_awaiting(