diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 25ff39ea8db9cd4f0021c2d8905fa0fadad01e6d..bbf67fcf18e1eceb7a4bfa417c24987da46f14d1 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -190,12 +190,16 @@ async fn determine_new_blocks( Ok(ancestry) } +// Sessions unavailable in state to cache. +#[derive(Debug)] +struct SessionsUnavailable; + async fn load_all_sessions( ctx: &mut impl SubsystemContext, block_hash: Hash, start: SessionIndex, end_inclusive: SessionIndex, -) -> SubsystemResult<Option<Vec<SessionInfo>>> { +) -> Result<Vec<SessionInfo>, SessionsUnavailable> { let mut v = Vec::new(); for i in start..=end_inclusive { let (tx, rx)= oneshot::channel(); @@ -214,22 +218,17 @@ async fn load_all_sessions( block_hash, ); - return Ok(None); + return Err(SessionsUnavailable); } - Ok(Err(e)) => return Err(SubsystemError::with_origin("approval-voting", e)), - Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)), + Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable), }; v.push(session_info); } - Ok(Some(v)) + Ok(v) } -// Sessions unavailable in state to cache. -#[derive(Debug)] -struct SessionsUnavailable; - // When inspecting a new import notification, updates the session info cache to match // the session of the imported block. // @@ -242,7 +241,7 @@ async fn cache_session_info_for_head( session_window: &mut RollingSessionWindow, block_hash: Hash, block_header: &Header, -) -> SubsystemResult<Result<(), SessionsUnavailable>> { +) -> Result<(), SessionsUnavailable> { let session_index = { let (s_tx, s_rx) = oneshot::channel(); @@ -254,9 +253,9 @@ async fn cache_session_info_for_head( RuntimeApiRequest::SessionIndexForChild(s_tx), ).into()).await; - match s_rx.await? { - Ok(s) => s, - Err(e) => return Err(SubsystemError::with_origin("approval-voting", e)), + match s_rx.await { + Ok(Ok(s)) => s, + Ok(Err(_)) | Err(_) => return Err(SessionsUnavailable), } }; @@ -271,17 +270,17 @@ async fn cache_session_info_for_head( window_start, session_index, ); - match load_all_sessions(ctx, block_hash, window_start, session_index).await? { - None => { + match load_all_sessions(ctx, block_hash, window_start, session_index).await { + Err(SessionsUnavailable) => { tracing::warn!( target: LOG_TARGET, "Could not load sessions {}..={} from block {:?} in session {}", window_start, session_index, block_hash, session_index, ); - return Ok(Err(SessionsUnavailable)); + return Err(SessionsUnavailable); }, - Some(s) => { + Ok(s) => { session_window.earliest_session = Some(window_start); session_window.session_info = s; } @@ -291,7 +290,7 @@ async fn cache_session_info_for_head( let latest = session_window.latest_session().expect("latest always exists if earliest does; qed"); // Either cached or ancient. - if session_index <= latest { return Ok(Ok(())) } + if session_index <= latest { return Ok(()) } let old_window_end = latest; @@ -311,17 +310,17 @@ async fn cache_session_info_for_head( latest + 1 }; - match load_all_sessions(ctx, block_hash, fresh_start, session_index).await? { - None => { + match load_all_sessions(ctx, block_hash, fresh_start, session_index).await { + Err(SessionsUnavailable) => { tracing::warn!( target: LOG_TARGET, "Could not load sessions {}..={} from block {:?} in session {}", latest + 1, session_index, block_hash, session_index, ); - return Ok(Err(SessionsUnavailable)); + return Err(SessionsUnavailable); } - Some(s) => { + Ok(s) => { let outdated = std::cmp::min(overlap_start as usize, session_window.session_info.len()); session_window.session_info.drain(..outdated); session_window.session_info.extend(s); @@ -335,7 +334,7 @@ async fn cache_session_info_for_head( } } - Ok(Ok(())) + Ok(()) } struct ImportedBlockInfo { @@ -539,7 +538,13 @@ pub(crate) async fn handle_new_head( match h_rx.await? { Err(e) => { - return Err(SubsystemError::with_origin("approval-voting", e)); + tracing::debug!( + target: LOG_TARGET, + "Chain API subsystem temporarily unreachable {}", + e, + ); + + return Ok(Vec::new()); } Ok(None) => { tracing::warn!(target: LOG_TARGET, "Missing header for new head {}", head); @@ -555,7 +560,7 @@ pub(crate) async fn handle_new_head( &mut state.session_window, head, &header, - ).await? + ).await { tracing::warn!( target: LOG_TARGET, @@ -582,13 +587,35 @@ pub(crate) async fn handle_new_head( let mut imported_candidates = Vec::with_capacity(new_blocks.len()); // `determine_new_blocks` gives us a vec in backwards order. we want to move forwards. - for (block_hash, block_header) in new_blocks.into_iter().rev() { - let env = ImportedBlockInfoEnv { - session_window: &state.session_window, - assignment_criteria: &*state.assignment_criteria, - keystore: &state.keystore, - }; + let imported_blocks_and_info = { + let mut imported_blocks_and_info = Vec::with_capacity(new_blocks.len()); + for (block_hash, block_header) in new_blocks.into_iter().rev() { + let env = ImportedBlockInfoEnv { + session_window: &state.session_window, + assignment_criteria: &*state.assignment_criteria, + keystore: &state.keystore, + }; + + match imported_block_info(ctx, env, block_hash, &block_header).await? { + Some(i) => imported_blocks_and_info.push((block_hash, block_header, i)), + None => { + // Such errors are likely spurious, but this prevents us from getting gaps + // in the approval-db. + tracing::warn!( + target: LOG_TARGET, + "Unable to gather info about imported block {:?}. Skipping chain.", + (block_hash, block_header.number), + ); + + return Ok(Vec::new()); + }, + }; + } + + imported_blocks_and_info + }; + for (block_hash, block_header, imported_block_info) in imported_blocks_and_info { let ImportedBlockInfo { included_candidates, session_index, @@ -596,10 +623,7 @@ pub(crate) async fn handle_new_head( n_validators, relay_vrf_story, slot, - } = match imported_block_info(ctx, env, block_hash, &block_header).await? { - Some(i) => i, - None => continue, - }; + } = imported_block_info; let session_info = state.session_window.session_info(session_index) .expect("imported_block_info requires session to be available; qed"); @@ -1772,7 +1796,7 @@ mod tests { &mut window, hash, &header, - ).await.unwrap().unwrap(); + ).await.unwrap(); assert_eq!(window.earliest_session, Some(expected_start_session)); assert_eq!( @@ -1953,7 +1977,7 @@ mod tests { &mut window, hash, &header, - ).await.unwrap(); + ).await; assert_matches!(res, Err(SessionsUnavailable)); }) @@ -2020,7 +2044,7 @@ mod tests { &mut window, hash, &header, - ).await.unwrap().unwrap(); + ).await.unwrap(); assert_eq!(window.earliest_session, Some(session)); assert_eq!( diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index d120c738288b1284c71dc46b26a4480d374d32c5..1183a141b87cc9d229a3fd0091a0ed2a2540fb4d 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -665,10 +665,10 @@ async fn handle_approved_ancestor( ctx.send_message(ChainApiMessage::BlockNumber(target, tx).into()).await; - match rx.await? { - Ok(Some(n)) => n, - Ok(None) => return Ok(None), - Err(_) => return Ok(None), + match rx.await { + Ok(Ok(Some(n))) => n, + Ok(Ok(None)) => return Ok(None), + Ok(Err(_)) | Err(_) => return Ok(None), } }; @@ -689,9 +689,9 @@ async fn handle_approved_ancestor( response_channel: tx, }.into()).await; - match rx.await? { - Ok(a) => a, - Err(_) => return Ok(None), + match rx.await { + Ok(Ok(a)) => a, + Err(_) | Ok(Err(_)) => return Ok(None), } } else { Vec::new() @@ -1406,11 +1406,18 @@ async fn launch_approval( ChainApiMessage::BlockNumber(candidate.descriptor.relay_parent, context_num_tx).into() ).await; - let in_context_number = match context_num_rx.await? - .map_err(|e| SubsystemError::with_origin("chain-api", e))? - { - Some(n) => n, - None => return Ok(()), + let in_context_number = match context_num_rx.await { + Ok(Ok(Some(n))) => n, + Ok(Ok(None)) | Ok(Err(_)) | Err(_) => { + tracing::warn!( + target: LOG_TARGET, + "Could not launch approval work for candidate {:?}: Number of block {} unknown", + (candidate_hash, candidate.descriptor.para_id), + candidate.descriptor.relay_parent, + ); + + return Ok(()); + } }; ctx.send_message(