lib.rs 9.19 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Utilities for testing subsystems.

19
20
#![warn(missing_docs)]

21
use polkadot_node_subsystem::messages::AllMessages;
22
23
24
25
use polkadot_node_subsystem::{
	FromOverseer, SubsystemContext, SubsystemError, SubsystemResult, Subsystem,
	SpawnedSubsystem, OverseerSignal,
};
26
use polkadot_node_subsystem_util::TimeoutExt;
27
28
29

use futures::channel::mpsc;
use futures::poll;
30
use futures::prelude::*;
31
use parking_lot::Mutex;
32
use sp_core::{testing::TaskExecutor, traits::SpawnNamed};
33
34
35
36
37

use std::convert::Infallible;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
38
use std::time::Duration;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

enum SinkState<T> {
	Empty {
		read_waker: Option<Waker>,
	},
	Item {
		item: T,
		ready_waker: Option<Waker>,
		flush_waker: Option<Waker>,
	},
}

/// The sink half of a single-item sink that does not resolve until the item has been read.
pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>);

/// The stream half of a single-item sink.
pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>);

impl<T> Sink<T> for SingleItemSink<T> {
	type Error = Infallible;

60
	fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
61
62
63
		let mut state = self.0.lock();
		match *state {
			SinkState::Empty { .. } => Poll::Ready(Ok(())),
64
65
66
67
			SinkState::Item {
				ref mut ready_waker,
				..
			} => {
68
69
70
71
72
73
				*ready_waker = Some(cx.waker().clone());
				Poll::Pending
			}
		}
	}

74
	fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Infallible> {
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
		let mut state = self.0.lock();

		match *state {
			SinkState::Empty { ref mut read_waker } => {
				if let Some(waker) = read_waker.take() {
					waker.wake();
				}
			}
			_ => panic!("start_send called outside of empty sink state ensured by poll_ready"),
		}

		*state = SinkState::Item {
			item,
			ready_waker: None,
			flush_waker: None,
		};

		Ok(())
	}

95
	fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
96
97
98
		let mut state = self.0.lock();
		match *state {
			SinkState::Empty { .. } => Poll::Ready(Ok(())),
99
100
101
102
			SinkState::Item {
				ref mut flush_waker,
				..
			} => {
103
104
105
106
107
108
				*flush_waker = Some(cx.waker().clone());
				Poll::Pending
			}
		}
	}

109
	fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Infallible>> {
110
111
112
113
114
115
116
117
118
119
120
121
122
123
		self.poll_flush(cx)
	}
}

impl<T> Stream for SingleItemStream<T> {
	type Item = T;

	fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
		let mut state = self.0.lock();

		let read_waker = Some(cx.waker().clone());

		match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) {
			SinkState::Empty { .. } => Poll::Pending,
124
125
126
127
128
			SinkState::Item {
				item,
				ready_waker,
				flush_waker,
			} => {
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
				if let Some(waker) = ready_waker {
					waker.wake();
				}

				if let Some(waker) = flush_waker {
					waker.wake();
				}

				Poll::Ready(Some(item))
			}
		}
	}
}

/// Create a single-item Sink/Stream pair.
///
/// The sink's send methods resolve at the point which the stream reads the item,
/// not when the item is buffered.
pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
	let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None }));
149
	(SingleItemSink(inner.clone()), SingleItemStream(inner))
150
151
152
153
154
155
156
157
158
159
}

/// A test subsystem context.
pub struct TestSubsystemContext<M, S> {
	tx: mpsc::UnboundedSender<AllMessages>,
	rx: SingleItemStream<FromOverseer<M>>,
	spawn: S,
}

#[async_trait::async_trait]
160
161
162
impl<M: Send + 'static, S: SpawnNamed + Send + 'static> SubsystemContext
	for TestSubsystemContext<M, S>
{
163
164
165
166
167
168
169
170
171
172
173
	type Message = M;

	async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> {
		match poll!(self.rx.next()) {
			Poll::Ready(Some(msg)) => Ok(Some(msg)),
			Poll::Ready(None) => Err(()),
			Poll::Pending => Ok(None),
		}
	}

	async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> {
174
175
		self.rx.next().await
			.ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))
176
177
	}

178
179
180
181
182
	async fn spawn(
		&mut self,
		name: &'static str,
		s: Pin<Box<dyn Future<Output = ()> + Send>>,
	) -> SubsystemResult<()> {
183
184
		self.spawn.spawn(name, s);
		Ok(())
185
186
	}

187
188
189
190
191
192
193
	async fn spawn_blocking(&mut self, name: &'static str, s: Pin<Box<dyn Future<Output = ()> + Send>>)
		-> SubsystemResult<()>
	{
		self.spawn.spawn_blocking(name, s);
		Ok(())
	}

194
	async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
195
196
197
198
		self.tx
			.send(msg)
			.await
			.expect("test overseer no longer live");
199
200
201
202
		Ok(())
	}

	async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()>
203
204
205
	where
		T: IntoIterator<Item = AllMessages> + Send,
		T::IntoIter: Send,
206
207
	{
		let mut iter = stream::iter(msgs.into_iter().map(Ok));
208
209
210
211
		self.tx
			.send_all(&mut iter)
			.await
			.expect("test overseer no longer live");
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226

		Ok(())
	}
}

/// A handle for interacting with the subsystem context.
pub struct TestSubsystemContextHandle<M> {
	tx: SingleItemSink<FromOverseer<M>>,
	rx: mpsc::UnboundedReceiver<AllMessages>,
}

impl<M> TestSubsystemContextHandle<M> {
	/// Send a message or signal to the subsystem. This resolves at the point in time where the
	/// subsystem has _read_ the message.
	pub async fn send(&mut self, from_overseer: FromOverseer<M>) {
227
228
229
230
		self.tx
			.send(from_overseer)
			.await
			.expect("Test subsystem no longer live");
231
232
233
234
	}

	/// Receive the next message from the subsystem.
	pub async fn recv(&mut self) -> AllMessages {
235
236
237
238
239
240
		self.try_recv().await.expect("Test subsystem no longer live")
	}

	/// Receive the next message from the subsystem, or `None` if the channel has been closed.
	pub async fn try_recv(&mut self) -> Option<AllMessages> {
		self.rx.next().await
241
242
243
244
	}
}

/// Make a test subsystem context.
245
246
247
pub fn make_subsystem_context<M, S>(
	spawn: S,
) -> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>) {
248
249
250
251
252
253
254
255
256
257
258
	let (overseer_tx, overseer_rx) = single_item_sink();
	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();

	(
		TestSubsystemContext {
			tx: all_messages_tx,
			rx: overseer_rx,
			spawn,
		},
		TestSubsystemContextHandle {
			tx: overseer_tx,
259
			rx: all_messages_rx,
260
261
		},
	)
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
}

/// Test a subsystem, mocking the overseer
///
/// Pass in two async closures: one mocks the overseer, the other runs the test from the perspective of a subsystem.
///
/// Times out in two seconds.
pub fn subsystem_test_harness<M, OverseerFactory, Overseer, TestFactory, Test>(
	overseer_factory: OverseerFactory,
	test_factory: TestFactory,
) where
	OverseerFactory: FnOnce(TestSubsystemContextHandle<M>) -> Overseer,
	Overseer: Future<Output = ()>,
	TestFactory: FnOnce(TestSubsystemContext<M, TaskExecutor>) -> Test,
	Test: Future<Output = ()>,
{
	let pool = TaskExecutor::new();
	let (context, handle) = make_subsystem_context(pool);
	let overseer = overseer_factory(handle);
	let test = test_factory(context);

283
	futures::pin_mut!(overseer, test);
284
285

	futures::executor::block_on(async move {
286
287
288
289
		future::join(overseer, test)
			.timeout(Duration::from_secs(2))
			.await
			.expect("test timed out instead of completing")
290
291
	});
}
292
293
294
295
296
297
298
299
300
301
302
303
304
305

/// A forward subsystem that implements [`Subsystem`].
///
/// It forwards all communication from the overseer to the internal message
/// channel.
///
/// This subsystem is useful for testing functionality that interacts with the overseer.
pub struct ForwardSubsystem<Msg>(pub mpsc::Sender<Msg>);

impl<C: SubsystemContext<Message = Msg>, Msg: Send + 'static> Subsystem<C> for ForwardSubsystem<Msg> {
	fn start(mut self, mut ctx: C) -> SpawnedSubsystem {
		let future = Box::pin(async move {
			loop {
				match ctx.recv().await {
306
					Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
307
308
309
					Ok(FromOverseer::Communication { msg }) => {
						let _ = self.0.send(msg).await;
					},
310
					Err(_) => return Ok(()),
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
					_ => (),
				}
			}
		});

		SpawnedSubsystem {
			name: "forward-subsystem",
			future,
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use polkadot_overseer::{Overseer, AllSubsystems};
	use futures::executor::block_on;
	use polkadot_node_subsystem::messages::CandidateSelectionMessage;

	#[test]
	fn forward_subsystem_works() {
		let spawner = sp_core::testing::TaskExecutor::new();
		let (tx, rx) = mpsc::channel(2);
		let all_subsystems = AllSubsystems::<()>::dummy().replace_candidate_selection(ForwardSubsystem(tx));
		let (overseer, mut handler) = Overseer::new(
			Vec::new(),
			all_subsystems,
			None,
			spawner.clone(),
		).unwrap();

		spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());

		block_on(handler.send_msg(CandidateSelectionMessage::Invalid(Default::default(), Default::default()))).unwrap();
		assert!(matches!(block_on(rx.into_future()).0.unwrap(), CandidateSelectionMessage::Invalid(_, _)));
	}
}