// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! The provisioner is responsible for assembling a relay chain block //! from a set of available parachain candidates of its choice. use bitvec::vec::BitVec; use futures::{ channel::{mpsc, oneshot}, prelude::*, }; use polkadot_node_subsystem::{ errors::ChainApiError, messages::{ AllMessages, ChainApiMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, RuntimeApiMessage, }, util::{self, request_availability_cores, JobTrait, ToJobTrait}, }; use polkadot_primitives::v1::{ BackedCandidate, BlockNumber, CoreState, Hash, SignedAvailabilityBitfield, }; use std::{ collections::{HashMap, HashSet}, convert::TryFrom, pin::Pin, }; pub struct ProvisioningJob { relay_parent: Hash, sender: mpsc::Sender, receiver: mpsc::Receiver, provisionable_data_channels: Vec>, backed_candidates: Vec, signed_bitfields: Vec, } pub enum ToJob { Provisioner(ProvisionerMessage), Stop, } impl ToJobTrait for ToJob { const STOP: Self = Self::Stop; fn relay_parent(&self) -> Option { match self { Self::Provisioner(pm) => pm.relay_parent(), Self::Stop => None, } } } impl TryFrom for ToJob { type Error = (); fn try_from(msg: AllMessages) -> Result { match msg { AllMessages::Provisioner(pm) => Ok(Self::Provisioner(pm)), _ => Err(()), } } } impl From for ToJob { fn from(pm: ProvisionerMessage) -> Self { Self::Provisioner(pm) } } pub enum FromJob { ChainApi(ChainApiMessage), Runtime(RuntimeApiMessage), } impl From for AllMessages { fn from(from_job: FromJob) -> AllMessages { match from_job { FromJob::ChainApi(cam) => AllMessages::ChainApi(cam), FromJob::Runtime(ram) => AllMessages::RuntimeApi(ram), } } } impl TryFrom for FromJob { type Error = (); fn try_from(msg: AllMessages) -> Result { match msg { AllMessages::ChainApi(chain) => Ok(FromJob::ChainApi(chain)), AllMessages::RuntimeApi(runtime) => Ok(FromJob::Runtime(runtime)), _ => Err(()), } } } #[derive(Debug, derive_more::From)] pub enum Error { #[from] Sending(mpsc::SendError), #[from] Util(util::Error), #[from] OneshotRecv(oneshot::Canceled), #[from] ChainApi(ChainApiError), OneshotSend, } impl JobTrait for ProvisioningJob { type ToJob = ToJob; type FromJob = FromJob; type Error = Error; type RunArgs = (); const NAME: &'static str = "ProvisioningJob"; /// Run a job for the parent block indicated // // this function is in charge of creating and executing the job's main loop fn run( relay_parent: Hash, _run_args: Self::RunArgs, receiver: mpsc::Receiver, sender: mpsc::Sender, ) -> Pin> + Send>> { async move { let job = ProvisioningJob::new(relay_parent, sender, receiver); // it isn't necessary to break run_loop into its own function, // but it's convenient to separate the concerns in this way job.run_loop().await } .boxed() } } impl ProvisioningJob { pub fn new( relay_parent: Hash, sender: mpsc::Sender, receiver: mpsc::Receiver, ) -> Self { Self { relay_parent, sender, receiver, provisionable_data_channels: Vec::new(), backed_candidates: Vec::new(), signed_bitfields: Vec::new(), } } async fn run_loop(mut self) -> Result<(), Error> { while let Some(msg) = self.receiver.next().await { use ProvisionerMessage::{ ProvisionableData, RequestBlockAuthorshipData, RequestInherentData, }; match msg { ToJob::Provisioner(RequestInherentData(_, sender)) => { // Note the cloning here: we have to clone the vectors of signed bitfields and backed candidates // so that we can respond to more than a single request for inherent data; we can't just move them. // It would be legal, however, to set up `from_job` as `&mut _` instead of a clone. We clone it instead // of borrowing it so that this async function doesn't depend at all on the lifetime of `&self`. send_inherent_data( self.relay_parent, self.signed_bitfields.clone(), self.backed_candidates.clone(), sender, self.sender.clone(), ) .await? } ToJob::Provisioner(RequestBlockAuthorshipData(_, sender)) => { self.provisionable_data_channels.push(sender) } ToJob::Provisioner(ProvisionableData(data)) => { let mut bad_indices = Vec::new(); for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() { match channel.send(data.clone()).await { Ok(_) => {} Err(_) => bad_indices.push(idx), } } self.note_provisionable_data(data); // clean up our list of channels by removing the bad indices // start by reversing it for efficient pop bad_indices.reverse(); // Vec::retain would be nicer here, but it doesn't provide // an easy API for retaining by index, so we re-collect instead. self.provisionable_data_channels = self .provisionable_data_channels .into_iter() .enumerate() .filter(|(idx, _)| { if bad_indices.is_empty() { return true; } let tail = bad_indices[bad_indices.len() - 1]; let retain = *idx != tail; if *idx >= tail { bad_indices.pop(); } retain }) .map(|(_, item)| item) .collect(); } ToJob::Stop => break, } } Ok(()) } fn note_provisionable_data(&mut self, provisionable_data: ProvisionableData) { match provisionable_data { ProvisionableData::Bitfield(_, signed_bitfield) => { self.signed_bitfields.push(signed_bitfield) } ProvisionableData::BackedCandidate(backed_candidate) => { self.backed_candidates.push(backed_candidate) } _ => {} } } } // The provisioner is the subsystem best suited to choosing which specific // backed candidates and availability bitfields should be assembled into the // block. To engage this functionality, a // `ProvisionerMessage::RequestInherentData` is sent; the response is a set of // non-conflicting candidates and the appropriate bitfields. Non-conflicting // means that there are never two distinct parachain candidates included for // the same parachain and that new parachain candidates cannot be included // until the previous one either gets declared available or expired. // // The main complication here is going to be around handling // occupied-core-assumptions. We might have candidates that are only // includable when some bitfields are included. And we might have candidates // that are not includable when certain bitfields are included. // // When we're choosing bitfields to include, the rule should be simple: // maximize availability. So basically, include all bitfields. And then // choose a coherent set of candidates along with that. async fn send_inherent_data( relay_parent: Hash, signed_bitfields: Vec, mut backed_candidates: Vec, return_sender: oneshot::Sender, mut from_job: mpsc::Sender, ) -> Result<(), Error> { let availability_cores = match request_availability_cores(relay_parent, &mut from_job) .await? .await? { Ok(cores) => cores, Err(runtime_err) => { // Don't take down the node on runtime API errors. log::warn!(target: "provisioner", "Encountered a runtime API error: {:?}", runtime_err); return Ok(()); } }; // availability cores indexed by their para_id for ease of use let cores_by_id: HashMap<_, _> = availability_cores .iter() .enumerate() .filter_map(|(idx, core_state)| Some((core_state.para_id()?, (idx, core_state)))) .collect(); // utility let mut parachains_represented = HashSet::new(); // ideally, we wouldn't calculate this unconditionally, but only if we have to use it in the // `if let Some(ref scheduled) = occupied.next_up_on_time_out {` case. In all other cases, it's // irrelevant. However, that's challenging: that whole thing happens within a `.retain` closure, // which is _not_ async. // We can leave the optimization for another day. let block_number = match get_block_number_under_construction(relay_parent, &mut from_job).await { Ok(n) => n, Err(err) => { log::warn!(target: "Provisioner", "failed to get number of block under construction: {:?}", err); 0 } }; // coherent candidates fulfill these conditions: // // - only one per parachain // - any of: // - this para is assigned to a `Scheduled` core // - this para is assigned to an `Occupied` core, and any of: // - it is `next_up_on_available` and the bitfields we are including, merged with // the `availability` vec, form 2/3+ of validators // - it is `next_up_on_time_out` and the bitfields we are including, merged with // the `availability_ vec, for <2/3 of validators, and `time_out_at` is // the block we are building. // // postcondition: they are sorted by core index backed_candidates.retain(|candidate| { // only allow the first candidate per parachain let para_id = candidate.candidate.descriptor.para_id; if !parachains_represented.insert(para_id) { return false; } let (core_idx, core) = match cores_by_id.get(¶_id) { Some(core) => core, None => return false, }; match core { CoreState::Free => false, CoreState::Scheduled(_) => true, CoreState::Occupied(occupied) => { if let Some(ref scheduled) = occupied.next_up_on_available { return scheduled.para_id == para_id && merged_bitfields_are_gte_two_thirds( *core_idx, &signed_bitfields, &occupied.availability, ); } if let Some(ref scheduled) = occupied.next_up_on_time_out { return scheduled.para_id == para_id && occupied.time_out_at == block_number && !merged_bitfields_are_gte_two_thirds( *core_idx, &signed_bitfields, &occupied.availability, ); } false } } }); // ensure the postcondition holds true: sorted by core index // note: unstable sort is still deterministic becuase we know (by means of `parachains_represented`) that // no two backed candidates remain, both of which are assigned to the same core. backed_candidates.sort_unstable_by_key(|candidate| { let para_id = candidate.candidate.descriptor.para_id; let (core_idx, _) = cores_by_id.get(¶_id).expect("paras not assigned to a core have already been eliminated from the backed_candidates list; qed"); core_idx }); // type ProvisionerInherentData = (Vec, Vec); return_sender .send((signed_bitfields, backed_candidates)) .map_err(|_| Error::OneshotSend)?; Ok(()) } // produces a block number 1 higher than that of the relay parent // in the event of an invalid `relay_parent`, returns `Ok(0)` async fn get_block_number_under_construction( relay_parent: Hash, sender: &mut mpsc::Sender, ) -> Result { let (tx, rx) = oneshot::channel(); sender .send(FromJob::ChainApi(ChainApiMessage::BlockNumber( relay_parent, tx, ))) .await .map_err(|_| Error::OneshotSend)?; match rx.await? { Ok(Some(n)) => Ok(n + 1), Ok(None) => Ok(0), Err(err) => Err(err.into()), } } // The instructions state: // // > we can only include the candidate if the bitfields we are including _and_ the availability vec of the OccupiedCore // // The natural implementation takes advantage of the fact that the availability bitfield for a given core is the transpose // of a set of signed availability bitfields. It goes like this: // // - organize the incoming bitfields by validator index // - construct a transverse slice along `core_idx` // - bitwise-or it with the availability slice // - count the 1 bits, compare to the total length fn merged_bitfields_are_gte_two_thirds( core_idx: usize, bitfields: &[SignedAvailabilityBitfield], availability: &BitVec, ) -> bool { let mut transverse = availability.clone(); let transverse_len = transverse.len(); for bitfield in bitfields { let validator_idx = bitfield.validator_index() as usize; match transverse.get_mut(validator_idx) { None => { // in principle, this function might return a `Result` so that we can more clearly express this error condition // however, in practice, that would just push off an error-handling routine which would look a whole lot like this one. // simpler to just handle the error internally here. log::warn!(target: "provisioner", "attempted to set a transverse bit at idx {} which is greater than bitfield size {}", validator_idx, transverse_len); return false; } Some(mut bit_mut) => *bit_mut |= bitfield.payload().0[core_idx], } } 3 * transverse.count_ones() >= 2 * transverse.len() }