// Abort-on-drop job handle map and helpers for sending requests to the `RuntimeApi`.
use futures::{
channel::{mpsc, oneshot},
future::AbortHandle,
prelude::*,
task::SpawnError,
};
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage, RuntimeApiRequest, SchedulerRoster,
};
use polkadot_primitives::{
parachain::{
GlobalValidationSchedule, HeadData, Id as ParaId, LocalValidationData, SigningContext,
ValidatorId,
},
Hash,
};
use std::{
collections::HashMap,
ops::{Deref, DerefMut},
};
/// A map from `Hash` to `AbortHandle` that aborts every contained handle when dropped.
///
/// The wrapped `HashMap` is reachable through `Deref`/`DerefMut`, so handles can be
/// inserted, looked up, and removed with the ordinary map API.
#[derive(Debug, Default)]
pub struct JobCanceler(HashMap<Hash, AbortHandle>);
impl Drop for JobCanceler {
    /// Calls `abort()` on every handle still stored in the map.
    fn drop(&mut self) {
        self.0.values().for_each(|handle| handle.abort());
    }
}
// JobCanceler is a smart pointer wrapping the contained hashmap;
// it only cares about the wrapped data insofar as it's necessary
// to implement proper Drop behavior. Therefore, it's appropriate
// to impl Deref and DerefMut.
impl Deref for JobCanceler {
type Target = HashMap<Hash, AbortHandle>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for JobCanceler {
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// Errors produced by the helpers in this file.
///
/// Every variant wraps one underlying error type and is marked `#[from]` for the
/// `derive_more::From` derive, which is what allows `?` to convert those errors
/// into this type (as `request_from_runtime` does for a failed send).
#[derive(Debug, derive_more::From)]
pub enum Error {
/// Wraps a `oneshot::Canceled` error.
#[from]
Oneshot(oneshot::Canceled),
/// Wraps an `mpsc::SendError`, e.g. from sending a message over an `mpsc::Sender`.
#[from]
Mpsc(mpsc::SendError),
/// Wraps a `SpawnError`.
#[from]
Spawn(SpawnError),
}
/// Send a request to the `RuntimeApi` and return the channel its response arrives on.
///
/// A fresh oneshot channel is created and its sending half is handed to
/// `request_builder`, which produces the `RuntimeApiRequest`. That request is wrapped,
/// together with `parent`, in `AllMessages::RuntimeApi(RuntimeApiMessage::Request(..))`
/// and sent over `sender`. The receiving half is returned without being awaited.
///
/// # Errors
///
/// Returns an `Error` if sending the message over `sender` fails.
pub async fn request_from_runtime<RequestBuilder, Response>(
    parent: Hash,
    sender: &mut mpsc::Sender<AllMessages>,
    request_builder: RequestBuilder,
) -> Result<oneshot::Receiver<Response>, Error>
where
    RequestBuilder: FnOnce(oneshot::Sender<Response>) -> RuntimeApiRequest,
{
    let (response_tx, response_rx) = oneshot::channel();

    let request = request_builder(response_tx);
    let message = AllMessages::RuntimeApi(RuntimeApiMessage::Request(parent, request));
    sender.send(message).await?;

    Ok(response_rx)
}
/// Ask the `RuntimeApi` for a `GlobalValidationSchedule`.
///
/// Sends the request over `s` with `parent` attached and returns the receiver on
/// which the response will arrive; the receiver itself is not awaited here.
pub async fn request_global_validation_schedule(
    parent: Hash,
    s: &mut mpsc::Sender<AllMessages>,
) -> Result<oneshot::Receiver<GlobalValidationSchedule>, Error> {
    // The variant constructor itself serves as the request builder.
    request_from_runtime(parent, s, RuntimeApiRequest::GlobalValidationSchedule).await
}
/// Ask the `RuntimeApi` for the `LocalValidationData` of `para_id`.
///
/// Sends the request over `s` with `parent` attached and returns the receiver on
/// which the optional response will arrive; the receiver itself is not awaited here.
pub async fn request_local_validation_data(
    parent: Hash,
    para_id: ParaId,
    s: &mut mpsc::Sender<AllMessages>,
) -> Result<oneshot::Receiver<Option<LocalValidationData>>, Error> {
    let build_request =
        |response_tx| RuntimeApiRequest::LocalValidationData(para_id, response_tx);
    request_from_runtime(parent, s, build_request).await
}
/// Ask the `RuntimeApi` for a validator set.
///
/// Sends the request over `s` with `parent` attached and returns the receiver on
/// which the `Vec<ValidatorId>` will arrive; the receiver itself is not awaited here.
pub async fn request_validators(
    parent: Hash,
    s: &mut mpsc::Sender<AllMessages>,
) -> Result<oneshot::Receiver<Vec<ValidatorId>>, Error> {
    // The variant constructor itself serves as the request builder.
    request_from_runtime(parent, s, RuntimeApiRequest::Validators).await
}
/// Ask the `RuntimeApi` for the scheduler roster.
///
/// Sends the request over `s` with `parent` attached and returns the receiver on
/// which the `SchedulerRoster` will arrive; the receiver itself is not awaited here.
pub async fn request_validator_groups(
    parent: Hash,
    s: &mut mpsc::Sender<AllMessages>,
) -> Result<oneshot::Receiver<SchedulerRoster>, Error> {
    // The variant constructor itself serves as the request builder.
    request_from_runtime(parent, s, RuntimeApiRequest::ValidatorGroups).await
}
/// Ask the `RuntimeApi` for a `SigningContext`.
///
/// Sends the request over `s` with `parent` attached and returns the receiver on
/// which the `SigningContext` will arrive; the receiver itself is not awaited here.
pub async fn request_signing_context(
    parent: Hash,
    s: &mut mpsc::Sender<AllMessages>,
) -> Result<oneshot::Receiver<SigningContext>, Error> {
    // The variant constructor itself serves as the request builder.
    request_from_runtime(parent, s, RuntimeApiRequest::SigningContext).await
}
/// Request `HeadData` for some `ParaId` from `RuntimeApi`.
pub async fn request_head_data(
parent: Hash,
s: &mut mpsc::Sender<AllMessages>,
id: ParaId,
) -> Result<oneshot::Receiver<HeadData>, Error> {
request_from_runtime(parent, s, |tx| RuntimeApiRequest::HeadData(id, tx)).await