Unverified Commit 5ce2b380 authored by asynchronous rob, committed by GitHub
Browse files

small improvements for parachains consensus (#2040)



* introduce a waiting period before selecting candidates and bitfields

* add network_bridge=debug tracing for rep

* change to 2.5s timeout in proposer

* pass timeout to proposer

* move timeout back to provisioner

* grumbles

* Update node/core/provisioner/src/lib.rs

* Fix nitpicks

* Fix bug

Co-authored-by: Bastian Köcher <bkchr@users.noreply.github.com>
Co-authored-by: Bastian Köcher <git@kchr.de>
parent 6ef668ae
Pipeline #115570 passed with stages
in 30 minutes and 13 seconds
......@@ -37,7 +37,7 @@ use prometheus_endpoint::Registry as PrometheusRegistry;
use std::{fmt, pin::Pin, sync::Arc, time};
/// How long proposal can take before we give up and err out
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_secs(2);
const PROPOSE_TIMEOUT: core::time::Duration = core::time::Duration::from_millis(2500);
/// Custom Proposer factory for Polkadot
pub struct ProposerFactory<TxPool, Backend, Client> {
......
......@@ -13,8 +13,8 @@ thiserror = "1.0.22"
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
futures-timer = "3.0.2"
[dev-dependencies]
sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures-timer = "3.0.2"
......@@ -38,9 +38,45 @@ use polkadot_primitives::v1::{
};
use std::{pin::Pin, collections::BTreeMap};
use thiserror::Error;
use futures_timer::Delay;
/// How long to wait before proposing.
const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_millis(2000);
const LOG_TARGET: &str = "provisioner";
/// Tracks whether the pre-propose waiting period has elapsed.
enum InherentAfter {
	Ready,
	Wait(Delay),
}

impl InherentAfter {
	/// Begin a fresh waiting period of `PRE_PROPOSE_TIMEOUT`, measured from now.
	fn new_from_now() -> Self {
		Self::Wait(Delay::new(PRE_PROPOSE_TIMEOUT))
	}

	/// Returns `true` once the waiting period has elapsed.
	fn is_ready(&self) -> bool {
		matches!(*self, Self::Ready)
	}

	/// Resolves once the waiting period elapses, marking `self` as ready.
	///
	/// After it has resolved once, this future intentionally never resolves
	/// again: a surrounding `select!` polls it in a loop, and resolving
	/// immediately every time would turn that loop into a busy loop.
	async fn ready(&mut self) {
		if let Self::Wait(ref mut delay) = *self {
			delay.await;
			*self = Self::Ready;
		} else {
			// Already fired once — park this branch forever.
			futures::pending!()
		}
	}
}
struct ProvisioningJob {
relay_parent: Hash,
sender: mpsc::Sender<FromJobCommand>,
......@@ -49,6 +85,8 @@ struct ProvisioningJob {
backed_candidates: Vec<BackedCandidate>,
signed_bitfields: Vec<SignedAvailabilityBitfield>,
metrics: Metrics,
inherent_after: InherentAfter,
awaiting_inherent: Vec<oneshot::Sender<ProvisionerInherentData>>
}
#[derive(Debug, Error)]
......@@ -92,7 +130,12 @@ impl JobTrait for ProvisioningJob {
sender: mpsc::Sender<FromJobCommand>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
async move {
let job = ProvisioningJob::new(relay_parent, metrics, sender, receiver);
let job = ProvisioningJob::new(
relay_parent,
metrics,
sender,
receiver,
);
// it isn't necessary to break run_loop into its own function,
// but it's convenient to separate the concerns in this way
......@@ -117,6 +160,8 @@ impl ProvisioningJob {
backed_candidates: Vec::new(),
signed_bitfields: Vec::new(),
metrics,
inherent_after: InherentAfter::new_from_now(),
awaiting_inherent: Vec::new(),
}
}
......@@ -126,70 +171,89 @@ impl ProvisioningJob {
};
loop {
match self.receiver.next().await {
Some(RequestInherentData(_, return_sender)) => {
let _timer = self.metrics.time_request_inherent_data();
if let Err(err) = send_inherent_data(
self.relay_parent,
&self.signed_bitfields,
&self.backed_candidates,
return_sender,
self.sender.clone(),
)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
self.metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(Ok(()));
}
}
Some(RequestBlockAuthorshipData(_, sender)) => {
self.provisionable_data_channels.push(sender)
}
Some(ProvisionableData(_, data)) => {
let _timer = self.metrics.time_provisionable_data();
let mut bad_indices = Vec::new();
for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() {
match channel.send(data.clone()).await {
Ok(_) => {}
Err(_) => bad_indices.push(idx),
futures::select! {
msg = self.receiver.next().fuse() => match msg {
Some(RequestInherentData(_, return_sender)) => {
let _timer = self.metrics.time_request_inherent_data();
if self.inherent_after.is_ready() {
self.send_inherent_data(vec![return_sender]).await;
} else {
self.awaiting_inherent.push(return_sender);
}
}
self.note_provisionable_data(data);
// clean up our list of channels by removing the bad indices
// start by reversing it for efficient pop
bad_indices.reverse();
// Vec::retain would be nicer here, but it doesn't provide
// an easy API for retaining by index, so we re-collect instead.
self.provisionable_data_channels = self
.provisionable_data_channels
.into_iter()
.enumerate()
.filter(|(idx, _)| {
if bad_indices.is_empty() {
return true;
}
let tail = bad_indices[bad_indices.len() - 1];
let retain = *idx != tail;
if *idx >= tail {
let _ = bad_indices.pop();
Some(RequestBlockAuthorshipData(_, sender)) => {
self.provisionable_data_channels.push(sender)
}
Some(ProvisionableData(_, data)) => {
let _timer = self.metrics.time_provisionable_data();
let mut bad_indices = Vec::new();
for (idx, channel) in self.provisionable_data_channels.iter_mut().enumerate() {
match channel.send(data.clone()).await {
Ok(_) => {}
Err(_) => bad_indices.push(idx),
}
retain
})
.map(|(_, item)| item)
.collect();
}
self.note_provisionable_data(data);
// clean up our list of channels by removing the bad indices
// start by reversing it for efficient pop
bad_indices.reverse();
// Vec::retain would be nicer here, but it doesn't provide
// an easy API for retaining by index, so we re-collect instead.
self.provisionable_data_channels = self
.provisionable_data_channels
.into_iter()
.enumerate()
.filter(|(idx, _)| {
if bad_indices.is_empty() {
return true;
}
let tail = bad_indices[bad_indices.len() - 1];
let retain = *idx != tail;
if *idx >= tail {
let _ = bad_indices.pop();
}
retain
})
.map(|(_, item)| item)
.collect();
}
None => break,
},
_ = self.inherent_after.ready().fuse() => {
let return_senders = std::mem::take(&mut self.awaiting_inherent);
if !return_senders.is_empty() {
self.send_inherent_data(return_senders).await;
}
}
None => break,
}
}
Ok(())
}
async fn send_inherent_data(
&mut self,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
) {
if let Err(err) = send_inherent_data(
self.relay_parent,
&self.signed_bitfields,
&self.backed_candidates,
return_senders,
&mut self.sender,
)
.await
{
tracing::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
self.metrics.on_inherent_data_request(Err(()));
} else {
self.metrics.on_inherent_data_request(Ok(()));
}
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn note_provisionable_data(&mut self, provisionable_data: ProvisionableData) {
match provisionable_data {
......@@ -223,15 +287,15 @@ type CoreAvailability = BitVec<bitvec::order::Lsb0, u8>;
/// When we're choosing bitfields to include, the rule should be simple:
/// maximize availability. So basically, include all bitfields. And then
/// choose a coherent set of candidates along with that.
#[tracing::instrument(level = "trace", skip(return_sender, from_job), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(return_senders, from_job), fields(subsystem = LOG_TARGET))]
async fn send_inherent_data(
relay_parent: Hash,
bitfields: &[SignedAvailabilityBitfield],
candidates: &[BackedCandidate],
return_sender: oneshot::Sender<ProvisionerInherentData>,
mut from_job: mpsc::Sender<FromJobCommand>,
return_senders: Vec<oneshot::Sender<ProvisionerInherentData>>,
from_job: &mut mpsc::Sender<FromJobCommand>,
) -> Result<(), Error> {
let availability_cores = request_availability_cores(relay_parent, &mut from_job)
let availability_cores = request_availability_cores(relay_parent, from_job)
.await?
.await??;
......@@ -241,13 +305,15 @@ async fn send_inherent_data(
&bitfields,
candidates,
relay_parent,
&mut from_job,
from_job,
)
.await?;
return_sender
.send((bitfields, candidates))
.map_err(|_data| Error::InherentDataReturnChannel)?;
let res = (bitfields, candidates);
for return_sender in return_senders {
return_sender.send(res.clone()).map_err(|_data| Error::InherentDataReturnChannel)?;
}
Ok(())
}
......
......@@ -243,7 +243,7 @@ mod tests {
sp_api::mock_impl_runtime_apis! {
impl ParachainHost<Block> for MockRuntimeApi {
type Error = String;
type Error = sp_api::ApiError;
fn validators(&self) -> Vec<ValidatorId> {
self.validators.clone()
......
......@@ -136,6 +136,7 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
sc_network::NetworkService::event_stream(self, "polkadot-network-bridge").boxed()
}
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn action_sink<'a>(&'a mut self)
-> Pin<Box<dyn Sink<NetworkAction, Error = SubsystemError> + Send + 'a>>
{
......@@ -153,10 +154,13 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
fn start_send(self: Pin<&mut Self>, action: NetworkAction) -> SubsystemResult<()> {
match action {
NetworkAction::ReputationChange(peer, cost_benefit) => self.0.report_peer(
peer,
cost_benefit,
),
NetworkAction::ReputationChange(peer, cost_benefit) => {
tracing::debug!("reputation: {:?} for {}", cost_benefit, peer);
self.0.report_peer(
peer,
cost_benefit,
)
}
NetworkAction::WriteNotification(peer, peer_set, message) => {
match peer_set {
PeerSet::Validation => self.0.write_notification(
......
......@@ -21,6 +21,8 @@ use sp_keyring::Sr25519Keyring;
#[substrate_test_utils::test]
async fn ensure_test_service_build_blocks(task_executor: TaskExecutor) {
sc_cli::init_logger("", Default::default(), None).expect("Sets up logger");
let mut alice = run_validator_node(
task_executor.clone(),
Sr25519Keyring::Alice,
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment