lib.rs 6.61 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Copyright 2021 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! This subsystem is responsible for keeping track of session changes
//! and issuing a connection request to the validators relevant to
//! the gossiping subsystems on every new session.

21
22
23
// Unit tests for this subsystem; compiled only for test builds.
#[cfg(test)]
mod tests;

24
use std::time::{Duration, Instant};
25
use futures::{channel::oneshot, FutureExt as _};
26
27
use polkadot_node_subsystem::{
	messages::{
28
		AllMessages, GossipSupportMessage, NetworkBridgeMessage,
29
30
31
32
	},
	ActiveLeavesUpdate, FromOverseer, OverseerSignal,
	Subsystem, SpawnedSubsystem, SubsystemContext,
};
33
use polkadot_node_subsystem_util as util;
34
use polkadot_primitives::v1::{
35
	Hash, SessionIndex, AuthorityDiscoveryId,
36
};
37
use polkadot_node_network_protocol::peer_set::PeerSet;
38
39
use sp_keystore::{CryptoStore, SyncCryptoStorePtr};
use sp_application_crypto::{Public, AppKey};
40

41
/// Target used for every log line emitted by this subsystem.
const LOG_TARGET: &str = "parachain::gossip-support";

/// Minimum time that must pass after a connection request was judged a
/// failure (too many authorities unresolved) before the same session is
/// requested again.
const BACKOFF_DURATION: Duration = Duration::from_millis(5_000);
45
46

/// The Gossip Support subsystem.
47
pub struct GossipSupport {
48
49
	keystore: SyncCryptoStorePtr,
}
50
51
52
53

#[derive(Default)]
struct State {
	last_session_index: Option<SessionIndex>,
54
55
56
57
	// Some(timestamp) if we failed to resolve
	// at least a third of authorities the last time.
	// `None` otherwise.
	last_failure: Option<Instant>,
58
59
}

60
impl GossipSupport {
61
	/// Create a new instance of the [`GossipSupport`] subsystem.
62
	pub fn new(keystore: SyncCryptoStorePtr) -> Self {
63
64
65
		Self {
			keystore,
		}
66
67
	}

68
	async fn run<Context>(self, ctx: Context)
69
70
71
72
	where
		Context: SubsystemContext<Message = GossipSupportMessage>,
	{
		let mut state = State::default();
73
74
75
76
77
78
79
		self.run_inner(ctx, &mut state).await;
	}

	async fn run_inner<Context>(self, mut ctx: Context, state: &mut State)
	where
		Context: SubsystemContext<Message = GossipSupportMessage>,
	{
80
		let Self { keystore } = self;
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
		loop {
			let message = match ctx.recv().await {
				Ok(message) => message,
				Err(e) => {
					tracing::debug!(
						target: LOG_TARGET,
						err = ?e,
						"Failed to receive a message from Overseer, exiting"
					);
					return;
				},
			};
			match message {
				FromOverseer::Communication { .. } => {},
				FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
					activated,
					..
				})) => {
					tracing::trace!(target: LOG_TARGET, "active leaves signal");

101
					let leaves = activated.into_iter().map(|a| a.hash);
102
					if let Err(e) = state.handle_active_leaves(&mut ctx, &keystore, leaves).await {
Bastian Köcher's avatar
Bastian Köcher committed
103
						tracing::debug!(target: LOG_TARGET, error = ?e);
104
105
106
107
108
109
110
111
112
113
114
					}
				}
				FromOverseer::Signal(OverseerSignal::BlockFinalized(_hash, _number)) => {},
				FromOverseer::Signal(OverseerSignal::Conclude) => {
					return;
				}
			}
		}
	}
}

115
116
async fn determine_relevant_authorities(
	ctx: &mut impl SubsystemContext,
117
	relay_parent: Hash,
118
) -> Result<Vec<AuthorityDiscoveryId>, util::Error> {
119
	let authorities = util::request_authorities(relay_parent, ctx.sender()).await.await??;
120
121
122
123
124
	tracing::debug!(
		target: LOG_TARGET,
		authority_count = ?authorities.len(),
		"Determined relevant authorities"
	);
125
	Ok(authorities)
126
127
}

128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
/// Check that the keystore holds the authority-discovery key of at least one
/// entry in `authorities`.
///
/// Keys are probed one at a time, in order, stopping at the first match.
///
/// # Errors
///
/// Returns `util::Error::NotAValidator` when none of the keys is present,
/// which includes the case of an empty `authorities` slice.
async fn ensure_i_am_an_authority(
	keystore: &SyncCryptoStorePtr,
	authorities: &[AuthorityDiscoveryId],
) -> Result<(), util::Error> {
	for authority in authorities.iter() {
		let wanted = [(authority.to_raw_vec(), AuthorityDiscoveryId::ID)];
		if CryptoStore::has_keys(&**keystore, &wanted).await {
			return Ok(());
		}
	}

	Err(util::Error::NotAValidator)
}

143
144
145
146
147
/// A helper function for making a `ConnectToValidators` request.
///
/// Sends the request to the network bridge for `validator_ids` on `peer_set`.
/// It returns the receiving end of a oneshot channel through which the bridge
/// reports a count of failures (used by the caller as the number of
/// authorities that could not be resolved).
pub async fn connect_to_authorities(
	ctx: &mut impl SubsystemContext,
	validator_ids: Vec<AuthorityDiscoveryId>,
	peer_set: PeerSet,
) -> oneshot::Receiver<usize> {
	let (failed_tx, failed_rx) = oneshot::channel();

	let request = NetworkBridgeMessage::ConnectToValidators {
		validator_ids,
		peer_set,
		failed: failed_tx,
	};
	ctx.send_message(AllMessages::NetworkBridge(request)).await;

	failed_rx
}
159

160
161
162
163
impl State {
	/// 1. Determine if the current session index has changed.
	/// 2. If it has, determine relevant validators
	///    and issue a connection request.
164
	async fn handle_active_leaves(
165
166
		&mut self,
		ctx: &mut impl SubsystemContext,
167
		keystore: &SyncCryptoStorePtr,
168
		leaves: impl Iterator<Item = Hash>,
169
	) -> Result<(), util::Error> {
170
		for leaf in leaves {
171
			let current_index = util::request_session_index_for_child(leaf, ctx.sender()).await.await??;
172
173
			let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
			let force_request = since_failure >= BACKOFF_DURATION;
174
			let maybe_new_session = match self.last_session_index {
175
				Some(i) if current_index <= i && !force_request => None,
176
177
178
179
				_ => Some((current_index, leaf)),
			};

			if let Some((new_session, relay_parent)) = maybe_new_session {
180
181
182
183
184
185
				tracing::debug!(
					target: LOG_TARGET,
					%new_session,
					%force_request,
					"New session detected",
				);
186
				let authorities = determine_relevant_authorities(ctx, relay_parent).await?;
187
				ensure_i_am_an_authority(keystore, &authorities).await?;
188
189
				let num = authorities.len();
				tracing::debug!(target: LOG_TARGET, %num, "Issuing a connection request");
190

191
				let failures = connect_to_authorities(
192
					ctx,
193
					authorities,
194
					PeerSet::Validation,
195
				).await;
196

197
198
199
200
				// we await for the request to be processed
				// this is fine, it should take much less time than one session
				let failures = failures.await.unwrap_or(num);

201
				self.last_session_index = Some(new_session);
202
203
204
205
206
207
208
				// issue another request for the same session
				// if at least a third of the authorities were not resolved
				self.last_failure = if failures >= num / 3 {
					Some(Instant::now())
				} else {
					None
				}
209
210
211
212
213
214
215
			}
		}

		Ok(())
	}
}

216
impl<Context> Subsystem<Context> for GossipSupport
217
where
218
	Context: SubsystemContext<Message = GossipSupportMessage> + Sync + Send,
219
{
220
	fn start(self, ctx: Context) -> SpawnedSubsystem {
221
222
223
224
225
226
227
228
229
230
		let future = self.run(ctx)
			.map(|_| Ok(()))
			.boxed();

		SpawnedSubsystem {
			name: "gossip-support-subsystem",
			future,
		}
	}
}