Newer
Older
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_subsystem_testhelpers::TestSubsystemContextHandle;
use smallvec::smallvec;
use futures::{FutureExt, channel::oneshot, SinkExt, channel::mpsc, StreamExt};
use futures_timer::Delay;
use sc_keystore::LocalKeystore;
use sp_application_crypto::AppKey;
use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
use sp_keyring::Sr25519Keyring;
use sp_core::{traits::SpawnNamed, testing::TaskExecutor};
use sc_network as network;
use sc_network::IfDisconnected;
use sc_network::config as netconfig;
use polkadot_subsystem::{ActiveLeavesUpdate, FromOverseer, OverseerSignal, ActivatedLeaf,
messages::{
AllMessages, AvailabilityDistributionMessage, AvailabilityStoreMessage, NetworkBridgeMessage,
RuntimeApiMessage, RuntimeApiRequest,
}
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
};
use polkadot_primitives::v1::{CandidateHash, CoreState, ErasureChunk, GroupIndex, Hash, Id
as ParaId, ScheduledCore, SessionInfo, ValidatorId,
ValidatorIndex
};
use polkadot_node_network_protocol::{jaeger,
request_response::{IncomingRequest, OutgoingRequest, Requests, v1}
};
use polkadot_subsystem_testhelpers as test_helpers;
use test_helpers::SingleItemSink;
use super::mock::{make_session_info, OccupiedCoreBuilder, };
use crate::LOG_TARGET;
pub struct TestHarness {
pub virtual_overseer: test_helpers::TestSubsystemContextHandle<AvailabilityDistributionMessage>,
pub pool: TaskExecutor,
}
/// Mock data used to simulate the environment of this subsystem.
///
/// The `Default` instance is set up so that the subsystem succeeds: it provides a couple of
/// valid occupied cores. Tune the data before calling `TestState::run` to model other
/// scenarios. E.g. if you make some chunk invalid, also remove it from `valid_chunks` and the
/// test will still pass.
#[derive(Clone)]
pub struct TestState {
    /// Simulated relay chain heads.
    pub relay_chain: Vec<Hash>,
    /// Chunks handed out when the availability store is queried, keyed by candidate hash and
    /// validator index.
    pub chunks: HashMap<(CandidateHash, ValidatorIndex), ErasureChunk>,
    /// All chunks that are valid and should be accepted.
    pub valid_chunks: HashSet<(CandidateHash, ValidatorIndex)>,
    /// Session info returned for every `SessionInfo` runtime request.
    pub session_info: SessionInfo,
    /// Cores per relay chain block.
    pub cores: HashMap<Hash, Vec<CoreState>>,
    /// Keystore made available to the test.
    pub keystore: SyncCryptoStorePtr,
}
impl Default for TestState {
fn default() -> Self {
let relay_chain: Vec<_> = (1u8..10).map(Hash::repeat_byte).collect();
let chain_a = ParaId::from(1);
let chain_b = ParaId::from(2);
let chain_ids = vec![chain_a, chain_b];
let keystore: SyncCryptoStorePtr = Arc::new(LocalKeystore::in_memory());
let session_info = make_session_info();
SyncCryptoStore::sr25519_generate_new(
&*keystore,
ValidatorId::ID,
Some(&Sr25519Keyring::Ferdie.to_seed()),
)
.expect("Insert key into keystore");
let (cores, chunks) = {
let mut cores = HashMap::new();
let mut chunks = HashMap::new();
cores.insert(relay_chain[0],
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
vec![
CoreState::Scheduled(ScheduledCore {
para_id: chain_ids[0],
collator: None,
}),
CoreState::Scheduled(ScheduledCore {
para_id: chain_ids[1],
collator: None,
}),
]
);
let heads = {
let mut advanced = relay_chain.iter();
advanced.next();
relay_chain.iter().zip(advanced)
};
for (relay_parent, relay_child) in heads {
let (p_cores, p_chunks): (Vec<_>, Vec<_>) = chain_ids.iter().enumerate()
.map(|(i, para_id)| {
let (core, chunk) = OccupiedCoreBuilder {
group_responsible: GroupIndex(i as _),
para_id: *para_id,
relay_parent: relay_parent.clone(),
}.build();
(CoreState::Occupied(core), chunk)
}
)
.unzip();
cores.insert(relay_child.clone(), p_cores);
// Skip chunks for our own group (won't get fetched):
let mut chunks_other_groups = p_chunks.into_iter();
chunks_other_groups.next();
for (validator_index, chunk) in chunks_other_groups {
chunks.insert((validator_index, chunk.index), chunk);
}
}
(cores, chunks)
};
Self {
relay_chain,
valid_chunks: chunks.clone().keys().map(Clone::clone).collect(),
chunks,
session_info,
cores,
keystore,
}
}
}
impl TestState {
/// Run, but fail after some timeout.
pub async fn run(self, harness: TestHarness) {
// Make sure test won't run forever.
let f = self.run_inner(harness.pool, harness.virtual_overseer).timeout(Duration::from_secs(10));
assert!(f.await.is_some(), "Test ran into timeout");
}
/// Run tests with the given mock values in `TestState`.
///
/// This will simply advance through the simulated chain and examines whether the subsystem
/// behaves as expected: It will succeed if all valid chunks of other backing groups get stored
/// and no other.
async fn run_inner(self, executor: TaskExecutor, virtual_overseer: TestSubsystemContextHandle<AvailabilityDistributionMessage>) {
// We skip genesis here (in reality ActiveLeavesUpdate can also skip a block:
let updates = {
let mut advanced = self.relay_chain.iter();
advanced.next();
self
.relay_chain.iter().zip(advanced)
.map(|(old, new)| ActiveLeavesUpdate {
activated: smallvec![ActivatedLeaf {
hash: new.clone(),
number: 1,
span: Arc::new(jaeger::Span::Disabled),
}],
deactivated: smallvec![old.clone()],
}).collect::<Vec<_>>()
};
// We should be storing all valid chunks during execution:
//
// Test will fail if this does not happen until timeout.
let mut remaining_stores = self.valid_chunks.len();
let TestSubsystemContextHandle { tx, mut rx } = virtual_overseer;
// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
// lock ;-)
let update_tx = tx.clone();
executor.spawn("Sending active leaves updates", async move {
for update in updates {
overseer_signal(
update_tx.clone(),
OverseerSignal::ActiveLeaves(update)
).await;
// We need to give the subsystem a little time to do its job, otherwise it will
// cancel jobs as obsolete:
Delay::new(Duration::from_millis(20)).await;
}
}.boxed()
);
while remaining_stores > 0
{
tracing::trace!(target: LOG_TARGET, remaining_stores, "Stores left to go");
let msg = overseer_recv(&mut rx).await;
match msg {
AllMessages::NetworkBridge(NetworkBridgeMessage::SendRequests(reqs, IfDisconnected::TryConnect)) => {
for req in reqs {
// Forward requests:
let in_req = to_incoming_req(&executor, req);
executor.spawn("Request forwarding",
overseer_send(
tx.clone(),
AvailabilityDistributionMessage::ChunkFetchingRequest(in_req)
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
).boxed()
);
}
}
AllMessages::AvailabilityStore(AvailabilityStoreMessage::QueryChunk(candidate_hash, validator_index, tx)) => {
let chunk = self.chunks.get(&(candidate_hash, validator_index));
tx.send(chunk.map(Clone::clone))
.expect("Receiver is expected to be alive");
}
AllMessages::AvailabilityStore(AvailabilityStoreMessage::StoreChunk{candidate_hash, chunk, tx, ..}) => {
assert!(
self.valid_chunks.contains(&(candidate_hash, chunk.index)),
"Only valid chunks should ever get stored."
);
tx.send(Ok(()))
.expect("Receiver is expected to be alive");
tracing::trace!(target: LOG_TARGET, "'Stored' fetched chunk.");
remaining_stores -= 1;
}
AllMessages::RuntimeApi(RuntimeApiMessage::Request(hash, req)) => {
match req {
RuntimeApiRequest::SessionIndexForChild(tx) => {
// Always session index 1 for now:
tx.send(Ok(1))
.expect("Receiver should still be alive");
}
RuntimeApiRequest::SessionInfo(_, tx) => {
tx.send(Ok(Some(self.session_info.clone())))
.expect("Receiver should be alive.");
}
RuntimeApiRequest::AvailabilityCores(tx) => {
tracing::trace!(target: LOG_TARGET, cores= ?self.cores[&hash], hash = ?hash, "Sending out cores for hash");
tx.send(Ok(self.cores[&hash].clone()))
.expect("Receiver should still be alive");
}
_ => {
panic!("Unexpected runtime request: {:?}", req);
}
}
}
_ => {
panic!("Unexpected message received: {:?}", msg);
}
}
}
}
}
/// Delivers `msg` to the subsystem as an overseer signal.
///
/// Panics with "Test subsystem no longer live" if the sink rejects the item.
async fn overseer_signal(
    mut tx: SingleItemSink<FromOverseer<AvailabilityDistributionMessage>>,
    msg: impl Into<OverseerSignal>,
) {
    let signal: OverseerSignal = msg.into();
    tracing::trace!(target: LOG_TARGET, msg = ?signal, "sending message");
    let outcome = tx.send(FromOverseer::Signal(signal)).await;
    outcome.expect("Test subsystem no longer live");
}
/// Delivers `msg` to the subsystem as a regular (non-signal) overseer communication.
///
/// Traces before and after sending; panics with "Test subsystem no longer live" if the sink
/// rejects the item.
async fn overseer_send(
    mut tx: SingleItemSink<FromOverseer<AvailabilityDistributionMessage>>,
    msg: impl Into<AvailabilityDistributionMessage>,
) {
    let message: AvailabilityDistributionMessage = msg.into();
    tracing::trace!(target: LOG_TARGET, msg = ?message, "sending message");
    let outcome = tx.send(FromOverseer::Communication { msg: message }).await;
    outcome.expect("Test subsystem no longer live");
    tracing::trace!(target: LOG_TARGET, "sent message");
}
/// Waits for the next message emitted by the subsystem.
///
/// Panics with "Test subsystem no longer live" once the channel has ended.
async fn overseer_recv(rx: &mut mpsc::UnboundedReceiver<AllMessages>) -> AllMessages {
    tracing::trace!(target: LOG_TARGET, "waiting for message ...");
    match rx.next().await {
        Some(received) => received,
        None => panic!("Test subsystem no longer live"),
    }
}
fn to_incoming_req(
executor: &TaskExecutor,
outgoing: Requests
) -> IncomingRequest<v1::ChunkFetchingRequest> {
match outgoing {
Requests::ChunkFetching(OutgoingRequest { payload, pending_response, .. }) => {
let (tx, rx): (oneshot::Sender<netconfig::OutgoingResponse>, oneshot::Receiver<_>)
= oneshot::channel();
executor.spawn("Message forwarding", async {
let response = rx.await;
let payload = response.expect("Unexpected canceled request").result;
pending_response.send(payload.map_err(|_| network::RequestFailure::Refused))
.expect("Sending response is expected to work");
}.boxed()
);
IncomingRequest::new(
// We don't really care:
network::PeerId::random(),
payload,
tx
)
}
_ => panic!("Unexpected request!"),