Unverified Commit ffedeab4 authored by Andronik Ordian, committed by GitHub
Browse files

overseer: send_msg should not return an error (#1995)



* send_message should not return an error

* Apply suggestions from code review
Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* s/send_logging_error/send_and_log_error
Co-authored-by: Peter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
parent 7ac03537
Pipeline #114748 passed with stages
in 29 minutes and 31 seconds
......@@ -95,10 +95,7 @@ impl CollationGenerationSubsystem {
},
msg = receiver.next().fuse() => {
if let Some(msg) = msg {
if let Err(err) = ctx.send_message(msg).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to forward message to overseer");
break;
}
ctx.send_message(msg).await;
}
},
}
......
......@@ -690,7 +690,7 @@ where
RuntimeApiRequest::CandidateEvents(tx),
));
ctx.send_message(msg.into()).await?;
ctx.send_message(msg.into()).await;
Ok(rx.await??)
}
......@@ -858,7 +858,7 @@ where
{
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await?;
ctx.send_message(AllMessages::ChainApi(ChainApiMessage::BlockNumber(block_hash, tx))).await;
Ok(rx.await??.map(|number| number).unwrap_or_default())
}
......
......@@ -171,7 +171,7 @@ async fn runtime_api_request<T>(
relay_parent,
request,
))
).await?;
).await;
receiver.await.map_err(Into::into)
}
......
......@@ -143,13 +143,13 @@ where
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.wait_for_activation(parent_header_hash, sender).await?;
overseer.wait_for_activation(parent_header_hash, sender).await;
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)??;
let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent_header_hash, sender),
)).await?;
)).await;
let mut timeout = futures_timer::Delay::new(PROPOSE_TIMEOUT).fuse();
......
......@@ -56,62 +56,40 @@ const LOG_TARGET: &'static str = "availability_distribution";
#[derive(Debug, Error)]
enum Error {
#[error("Sending PendingAvailability query failed")]
QueryPendingAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain PendingAvailability failed")]
QueryPendingAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain PendingAvailability failed")]
QueryPendingAvailability(#[source] RuntimeApiError),
#[error("Sending StoreChunk query failed")]
StoreChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain StoreChunk failed")]
StoreChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryChunk query failed")]
QueryChunkSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryChunk failed")]
QueryChunkResponseChannel(#[source] oneshot::Canceled),
#[error("Sending QueryAncestors query failed")]
QueryAncestorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryAncestors failed")]
QueryAncestorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryAncestors failed")]
QueryAncestors(#[source] ChainApiError),
#[error("Sending QuerySession query failed")]
QuerySessionSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QuerySession failed")]
QuerySessionResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QuerySession failed")]
QuerySession(#[source] RuntimeApiError),
#[error("Sending QueryValidators query failed")]
QueryValidatorsSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain QueryValidators failed")]
QueryValidatorsResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain QueryValidators failed")]
QueryValidators(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
AvailabilityCoresSendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
AvailabilityCoresResponseChannel(#[source] oneshot::Canceled),
#[error("RuntimeAPI to obtain AvailabilityCores failed")]
AvailabilityCores(#[source] RuntimeApiError),
#[error("Sending AvailabilityCores query failed")]
QueryAvailabilitySendQuery(#[source] SubsystemError),
#[error("Response channel to obtain AvailabilityCores failed")]
QueryAvailabilityResponseChannel(#[source] oneshot::Canceled),
#[error("Sending out a peer report message")]
ReportPeerMessageSend(#[source] SubsystemError),
#[error("Sending a gossip message")]
TrackedGossipMessage(#[source] SubsystemError),
#[error("Receive channel closed")]
IncomingMessageChannel(#[source] SubsystemError),
}
......@@ -290,7 +268,7 @@ impl ProtocolState {
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn remove_relay_parent(&mut self, relay_parent: &Hash) -> Result<()> {
fn remove_relay_parent(&mut self, relay_parent: &Hash) {
// we might be ancestor of some other relay_parent
if let Some(ref mut descendants) = self.ancestry.get_mut(relay_parent) {
// if we were the last user, and it is
......@@ -324,7 +302,6 @@ impl ProtocolState {
}
}
}
Ok(())
}
}
......@@ -351,7 +328,7 @@ where
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view, metrics).await?;
handle_peer_view_change(ctx, state, peerid, view, metrics).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(ctx, keystore, state, view, metrics).await?;
......@@ -472,14 +449,14 @@ where
};
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message)
.await?;
.await;
}
}
// cleanup the removed relay parents and their states
let removed = old_view.difference(&view).collect::<Vec<_>>();
for removed in removed {
state.remove_relay_parent(&removed)?;
state.remove_relay_parent(&removed);
}
Ok(())
}
......@@ -491,7 +468,7 @@ async fn send_tracked_gossip_message_to_peers<Context>(
metrics: &Metrics,
peers: Vec<PeerId>,
message: AvailabilityGossipMessage,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
......@@ -506,7 +483,7 @@ async fn send_tracked_gossip_messages_to_peer<Context>(
metrics: &Metrics,
peer: PeerId,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
......@@ -521,12 +498,12 @@ async fn send_tracked_gossip_messages_to_peers<Context>(
metrics: &Metrics,
peers: Vec<PeerId>,
message_iter: impl IntoIterator<Item = AvailabilityGossipMessage>,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
if peers.is_empty() {
return Ok(());
return;
}
for message in message_iter {
for peer in peers.iter() {
......@@ -553,13 +530,10 @@ where
protocol_v1::ValidationProtocol::AvailabilityDistribution(wire_message),
),
))
.await
.map_err(|e| Error::TrackedGossipMessage(e))?;
.await;
metrics.on_chunk_distributed();
}
Ok(())
}
// Send the difference between two views which were not sent
......@@ -571,7 +545,7 @@ async fn handle_peer_view_change<Context>(
origin: PeerId,
view: View,
metrics: &Metrics,
) -> Result<()>
)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
......@@ -616,9 +590,8 @@ where
.collect::<HashSet<_>>();
send_tracked_gossip_messages_to_peer(ctx, per_candidate, metrics, origin.clone(), messages)
.await?;
.await;
}
Ok(())
}
/// Obtain the first key which has a signing key.
......@@ -662,7 +635,8 @@ where
let live_candidate = if let Some(live_candidate) = live_candidates.get(&message.candidate_hash) {
live_candidate
} else {
return modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
modify_reputation(ctx, origin, COST_NOT_A_LIVE_CANDIDATE).await;
return Ok(());
};
// check the merkle proof
......@@ -674,12 +648,14 @@ where
) {
hash
} else {
return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(());
};
let erasure_chunk_hash = BlakeTwo256::hash(&message.erasure_chunk.chunk);
if anticipated_hash != erasure_chunk_hash {
return modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
modify_reputation(ctx, origin, COST_MERKLE_PROOF_INVALID).await;
return Ok(());
}
// an internal unique identifier of this message
......@@ -695,7 +671,8 @@ where
.entry(origin.clone())
.or_default();
if received_set.contains(&message_id) {
return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return Ok(());
} else {
received_set.insert(message_id.clone());
}
......@@ -707,9 +684,9 @@ where
.insert(message_id.1, message.clone())
.is_some()
{
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
} else {
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await?;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await;
// save the chunk for our index
if let Some(validator_index) = per_candidate.validator_index {
......@@ -762,7 +739,8 @@ where
.collect::<Vec<_>>();
// gossip that message to interested peers
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await
send_tracked_gossip_message_to_peers(ctx, per_candidate, metrics, peers, message).await;
Ok(())
}
/// The bitfield distribution subsystem.
......@@ -947,8 +925,7 @@ where
relay_parent,
RuntimeApiRequest::AvailabilityCores(tx),
)))
.await
.map_err(|e| Error::AvailabilityCoresSendQuery(e))?;
.await;
let all_para_ids: Vec<_> = rx
.await
......@@ -970,7 +947,7 @@ where
/// Modify the reputation of a peer based on its behavior.
#[tracing::instrument(level = "trace", skip(ctx), fields(subsystem = LOG_TARGET))]
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep)
where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
......@@ -982,9 +959,7 @@ where
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
))
.await
.map_err(|e| Error::ReportPeerMessageSend(e))
)).await;
}
/// Query the proof of validity for a particular candidate hash.
......@@ -996,9 +971,8 @@ where
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryDataAvailability(candidate_hash, tx),
))
.await
.map_err(|e| Error::QueryAvailabilitySendQuery(e))?;
)).await;
rx.await
.map_err(|e| Error::QueryAvailabilityResponseChannel(e))
}
......@@ -1015,9 +989,8 @@ where
let (tx, rx) = oneshot::channel();
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx),
))
.await
.map_err(|e| Error::QueryChunkSendQuery(e))?;
)).await;
rx.await.map_err(|e| Error::QueryChunkResponseChannel(e))
}
......@@ -1033,17 +1006,15 @@ where
Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
{
let (tx, rx) = oneshot::channel();
ctx.send_message(
AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await
.map_err(|e| Error::StoreChunkSendQuery(e))?;
ctx.send_message(AllMessages::AvailabilityStore(
AvailabilityStoreMessage::StoreChunk {
candidate_hash,
relay_parent,
validator_index,
chunk: erasure_chunk,
tx,
}
)).await;
rx.await.map_err(|e| Error::StoreChunkResponseChannel(e))
}
......@@ -1062,9 +1033,7 @@ where
ctx.send_message(AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidatePendingAvailability(para, tx),
)))
.await
.map_err(|e| Error::QueryPendingAvailabilitySendQuery(e))?;
))).await;
rx.await
.map_err(|e| Error::QueryPendingAvailabilityResponseChannel(e))?
......@@ -1087,8 +1056,7 @@ where
));
ctx.send_message(query_validators)
.await
.map_err(|e| Error::QueryValidatorsSendQuery(e))?;
.await;
rx.await
.map_err(|e| Error::QueryValidatorsResponseChannel(e))?
.map_err(|e| Error::QueryValidators(e))
......@@ -1112,8 +1080,7 @@ where
});
ctx.send_message(query_ancestors)
.await
.map_err(|e| Error::QueryAncestorsSendQuery(e))?;
.await;
rx.await
.map_err(|e| Error::QueryAncestorsResponseChannel(e))?
.map_err(|e| Error::QueryAncestors(e))
......@@ -1135,8 +1102,7 @@ where
));
ctx.send_message(query_session_idx_for_child)
.await
.map_err(|e| Error::QuerySessionSendQuery(e))?;
.await;
rx.await
.map_err(|e| Error::QuerySessionResponseChannel(e))?
.map_err(|e| Error::QuerySession(e))
......
......@@ -163,24 +163,20 @@ impl BitfieldDistribution {
msg: BitfieldDistributionMessage::DistributeBitfield(hash, signed_availability),
} => {
tracing::trace!(target: LOG_TARGET, "Processing DistributeBitfield");
if let Err(err) = handle_bitfield_distribution(
handle_bitfield_distribution(
&mut ctx,
&mut state,
&self.metrics,
hash,
signed_availability,
).await {
tracing::warn!(target: LOG_TARGET, err = ?err, "Failed to reply to `DistributeBitfield` message");
}
).await;
}
FromOverseer::Communication {
msg: BitfieldDistributionMessage::NetworkBridgeUpdateV1(event),
} => {
tracing::trace!(target: LOG_TARGET, "Processing NetworkMessage");
// a network message was received
if let Err(e) = handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "Failed to handle incoming network messages");
}
handle_network_msg(&mut ctx, &mut state, &self.metrics, event).await;
}
FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated })) => {
let _timer = self.metrics.time_active_leaves_update();
......@@ -234,7 +230,7 @@ async fn modify_reputation<Context>(
ctx: &mut Context,
peer: PeerId,
rep: ReputationChange,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
......@@ -255,7 +251,7 @@ async fn handle_bitfield_distribution<Context>(
metrics: &Metrics,
relay_parent: Hash,
signed_availability: SignedAvailabilityBitfield,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
......@@ -272,12 +268,12 @@ where
"Not supposed to work on relay parent related data",
);
return Ok(());
return;
};
let validator_set = &job_data.validator_set;
if validator_set.is_empty() {
tracing::trace!(target: LOG_TARGET, relay_parent = %relay_parent, "validator set is empty");
return Ok(());
return;
}
let validator_index = signed_availability.validator_index() as usize;
......@@ -285,7 +281,7 @@ where
validator.clone()
} else {
tracing::trace!(target: LOG_TARGET, "Could not find a validator for index {}", validator_index);
return Ok(());
return;
};
let peer_views = &mut state.peer_views;
......@@ -294,11 +290,9 @@ where
signed_availability,
};
relay_message(ctx, job_data, peer_views, validator, msg).await?;
relay_message(ctx, job_data, peer_views, validator, msg).await;
metrics.on_own_bitfield_gossipped();
Ok(())
}
/// Distribute a given valid and signature checked bitfield message.
......@@ -311,7 +305,7 @@ async fn relay_message<Context>(
peer_views: &mut HashMap<PeerId, View>,
validator: ValidatorId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
......@@ -325,7 +319,7 @@ where
),
),
))
.await?;
.await;
let message_sent_to_peer = &mut (job_data.message_sent_to_peer);
......@@ -361,9 +355,8 @@ where
message.into_validation_protocol(),
),
))
.await?;
.await;
}
Ok(())
}
/// Handle an incoming message from a peer.
......@@ -374,13 +367,14 @@ async fn process_incoming_peer_message<Context>(
metrics: &Metrics,
origin: PeerId,
message: BitfieldGossipMessage,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
// we don't care about this, not part of our view.
if !state.view.contains(&message.relay_parent) {
return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
return;
}
// Ignore anything the overseer did not tell this subsystem to work on.
......@@ -388,7 +382,8 @@ where
let job_data: &mut _ = if let Some(ref mut job_data) = job_data {
job_data
} else {
return modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
modify_reputation(ctx, origin, COST_NOT_IN_VIEW).await;
return;
};
let validator_set = &job_data.validator_set;
......@@ -398,7 +393,8 @@ where
relay_parent = %message.relay_parent,
"Validator set is empty",
);
return modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
modify_reputation(ctx, origin, COST_MISSING_PEER_SESSION_KEY).await;
return;
}
// Use the (untrusted) validator index provided by the signed payload
......@@ -408,7 +404,8 @@ where
let validator = if let Some(validator) = validator_set.get(validator_index) {
validator.clone()
} else {
return modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
modify_reputation(ctx, origin, COST_VALIDATOR_INDEX_INVALID).await;
return;
};
// Check if the peer already sent us a message for the validator denoted in the message earlier.
......@@ -422,7 +419,8 @@ where
if !received_set.contains(&validator) {
received_set.insert(validator.clone());
} else {
return modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
modify_reputation(ctx, origin, COST_PEER_DUPLICATE_MESSAGE).await;
return;
};
if message
......@@ -440,12 +438,12 @@ where
validator_index,
"already received a message for validator",
);
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await?;
return Ok(());
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE).await;
return;
}
one_per_validator.insert(validator.clone(), message.clone());
relay_message(ctx, job_data, &mut state.peer_views, validator, message).await?;
relay_message(ctx, job_data, &mut state.peer_views, validator, message).await;
modify_reputation(ctx, origin, BENEFIT_VALID_MESSAGE_FIRST).await
} else {
......@@ -461,7 +459,7 @@ async fn handle_network_msg<Context>(
state: &mut ProtocolState,
metrics: &Metrics,
bridge_message: NetworkBridgeEvent<protocol_v1::BitfieldDistributionMessage>,
) -> SubsystemResult<()>
)
where
Context: SubsystemContext<Message = BitfieldDistributionMessage>,
{
......@@ -477,10 +475,10 @@ where
state.peer_views.remove(&peerid);
}
NetworkBridgeEvent::PeerViewChange(peerid, view) => {
handle_peer_view_change(ctx, state, peerid, view).await?;
handle_peer_view_change(ctx, state, peerid, view).await;
}
NetworkBridgeEvent::OurViewChange(view) => {
handle_our_view_change(state, view)?;
handle_our_view_change(state, view);
}
NetworkBridgeEvent::PeerMessage(remote, message) => {
match message {
......@@ -490,17 +488,16 @@ where
relay_parent,
signed_availability: bitfield,
};
process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await?;
process_incoming_peer_message(ctx, state, metrics, remote, gossiped_bitfield).await;