Commit a3956386 authored by parity-processbot's avatar parity-processbot
Browse files

Merge remote-tracking branch 'origin/master' into tka-companion-8625

parents 0e4f3ac8 fff63cb5
......@@ -6083,6 +6083,7 @@ dependencies = [
"futures 0.3.13",
"futures-timer 3.0.2",
"log",
"lru",
"metered-channel",
"parity-scale-codec",
"parking_lot 0.11.1",
......
......@@ -592,7 +592,7 @@ impl metrics::Metrics for Metrics {
#[cfg(test)]
mod tests {
use super::*;
use super::*;
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_primitives::v1::{HeadData, UpwardMessage};
use sp_core::testing::TaskExecutor;
......
......@@ -191,7 +191,7 @@ impl State {
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, role) => {
NetworkBridgeEvent::PeerConnected(peer_id, role, _) => {
// insert a blank view if none already present
tracing::trace!(
target: LOG_TARGET,
......
......@@ -117,7 +117,7 @@ async fn setup_peer_with_view(
overseer_send(
virtual_overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full, None)
)
).await;
overseer_send(
......
......@@ -18,12 +18,15 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use polkadot_primitives::v1::SessionIndex;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_node_subsystem_util::{
runtime,
Error as UtilError,
};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use crate::LOG_TARGET;
......@@ -74,10 +77,6 @@ pub enum Error {
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),
/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
......@@ -92,10 +91,24 @@ pub enum Error {
/// No validator with the index could be found in current session.
#[error("Given validator index could not be found")]
InvalidValidatorIndex,
/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[source] runtime::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<runtime::Error> for Error {
fn from(err: runtime::Error) -> Self {
Self::Runtime(err)
}
}
impl From<SubsystemError> for Error {
fn from(err: SubsystemError) -> Self {
Self::IncomingMessageChannel(err)
......
......@@ -28,9 +28,7 @@ mod error;
pub use error::Error;
use error::{Result, log_error};
/// Runtime requests.
mod runtime;
use runtime::Runtime;
use polkadot_node_subsystem_util::runtime::Runtime;
/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
......
......@@ -33,8 +33,9 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
};
use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo};
use crate::{error::{Error, log_error}, runtime::{Runtime, ValidatorInfo}};
use crate::error::{Error, log_error};
/// Number of sessions we want to keep in the LRU.
const NUM_SESSIONS: usize = 2;
......@@ -274,7 +275,7 @@ mod tests {
let (mut context, mut virtual_overseer) =
test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
let keystore = make_ferdie_keystore();
let mut runtime = crate::runtime::Runtime::new(keystore);
let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore);
let (tx, rx) = oneshot::channel();
let testee = async {
......
......@@ -508,7 +508,7 @@ where
let _timer = metrics.time_handle_network_msg();
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, role) => {
NetworkBridgeEvent::PeerConnected(peerid, role, _) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
......@@ -1335,7 +1335,7 @@ mod test {
&mut ctx,
&mut state,
&Default::default(),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
));
// make peer b interested
......
......@@ -36,7 +36,7 @@ use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
CollatorProtocolMessage, NetworkBridgeEvent,
};
use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId};
use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
ObservedRole,
......@@ -316,7 +316,7 @@ impl From<SubsystemError> for UnexpectedAbort {
// notifications to be passed through to the validator discovery worker.
enum ValidatorDiscoveryNotification {
PeerConnected(PeerId, PeerSet),
PeerConnected(PeerId, PeerSet, Option<AuthorityDiscoveryId>),
PeerDisconnected(PeerId, PeerSet),
}
......@@ -540,11 +540,11 @@ where
},
notification = validator_discovery_notifications.next().fuse() => match notification {
None => return Ok(()),
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set)) => {
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => {
validator_discovery.on_peer_connected(
peer.clone(),
peer_set,
&mut authority_discovery_service,
maybe_auth,
).await;
}
Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => {
......@@ -555,9 +555,10 @@ where
}
}
async fn handle_network_messages(
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut sender: impl SubsystemSender,
mut network_service: impl Network,
mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer,
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
metrics: Metrics,
......@@ -607,10 +608,14 @@ async fn handle_network_messages(
shared.local_view.clone().unwrap_or(View::default())
};
let maybe_authority =
authority_discovery_service
.get_authority_id_by_peer_id(peer).await;
// Failure here means that the other side of the network bridge
// has concluded and this future will be dropped in due course.
let _ = validator_discovery_notifications.send(
ValidatorDiscoveryNotification::PeerConnected(peer.clone(), peer_set)
ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone())
).await;
......@@ -618,7 +623,7 @@ async fn handle_network_messages(
PeerSet::Validation => {
dispatch_validation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::default(),
......@@ -640,7 +645,7 @@ async fn handle_network_messages(
PeerSet::Collation => {
dispatch_collation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::default(),
......@@ -858,6 +863,7 @@ where
let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
authority_discovery_service.clone(),
request_multiplexer,
validation_worker_tx,
metrics.clone(),
......@@ -1191,6 +1197,7 @@ mod tests {
_req_configs: Vec<RequestResponseConfig>,
}
#[derive(Clone)]
struct TestAuthorityDiscovery;
// The test's view of the network. This receives updates from the subsystem in the form
......@@ -1796,7 +1803,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -1848,7 +1855,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -1920,7 +1927,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -1932,7 +1939,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2004,7 +2011,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2016,7 +2023,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2099,7 +2106,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2111,7 +2118,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2259,7 +2266,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2271,7 +2278,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......
......@@ -34,7 +34,7 @@ const LOG_TARGET: &str = "parachain::validator-discovery";
/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + 'static {
pub trait AuthorityDiscovery: Send + Clone + 'static {
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;
/// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
......@@ -307,16 +307,15 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
/// Should be called when a peer connected.
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn on_peer_connected(
&mut self,
peer_id: PeerId,
peer_set: PeerSet,
authority_discovery_service: &mut AD,
maybe_authority: Option<AuthorityDiscoveryId>,
) {
let state = &mut self.state[peer_set];
// check if it's an authority we've been waiting for
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
if let Some(authority) = maybe_authority {
for request in state.non_revoked_discovery_requests.iter_mut() {
let _ = request.on_authority_connected(&authority, &peer_id);
......@@ -359,7 +358,7 @@ mod tests {
peers_set: HashSet<Multiaddr>,
}
#[derive(Default)]
#[derive(Default, Clone)]
struct TestAuthorityDiscovery {
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
......@@ -469,7 +468,8 @@ mod tests {
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
let (sender, mut receiver) = mpsc::channel(2);
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let _ = service.on_request(
req1,
......@@ -509,12 +509,14 @@ mod tests {
).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let reply1 = receiver.next().await.unwrap();
assert_eq!(reply1.0, authority_ids[0]);
assert_eq!(reply1.1, peer_ids[0]);
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let reply2 = receiver.next().await.unwrap();
assert_eq!(reply2.0, authority_ids[1]);
assert_eq!(reply2.1, peer_ids[1]);
......@@ -534,8 +536,10 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone()],
......@@ -580,8 +584,10 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[2].clone()],
......@@ -645,7 +651,8 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await;
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await;
let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into()));
ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone());
......
......@@ -791,7 +791,7 @@ async fn handle_network_msg(
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(peer_id, observed_role) => {
PeerConnected(peer_id, observed_role, _) => {
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
tracing::trace!(
......@@ -1343,6 +1343,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer.clone(),
polkadot_node_network_protocol::ObservedRole::Authority,
None,
),
),
).await;
......
......@@ -873,7 +873,7 @@ where
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(peer_id, _role) => {
PeerConnected(peer_id, _role, _) => {
state.peer_data.entry(peer_id).or_default();
state.metrics.note_collator_peer_count(state.peer_data.len());
},
......@@ -1469,6 +1469,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1543,6 +1544,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1553,6 +1555,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_c,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1636,6 +1639,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1704,6 +1708,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1714,6 +1719,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_c,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1918,6 +1924,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2012,6 +2019,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2153,6 +2161,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2199,6 +2208,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2270,6 +2280,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......
......@@ -154,7 +154,7 @@ impl Protocol {
name: p_name,
max_request_size: 1_000,
// Available data size is dominated code size.
// + 1000 to account for protocol overhead (should be way less).
// + 1000 to account for protocol overhead (should be way less).
max_response_size: MAX_CODE_SIZE as u64 + 1000,
// We need statement fetching to be fast and will try our best at the responding
// side to answer requests within that timeout, assuming a bandwidth of 500Mbit/s
......@@ -199,7 +199,7 @@ impl Protocol {
// waisting precious time.
let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10;
let size = u64::saturating_sub(
STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64),
STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64),
MAX_PARALLEL_STATEMENT_REQUESTS as u64
);
debug_assert!(
......
......@@ -1437,7 +1437,7 @@ async fn handle_network_update(
metrics: &Metrics,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, role) => {
NetworkBridgeEvent::PeerConnected(peer, role, _) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
......@@ -2667,13 +2667,13 @@ mod tests {
// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None)
)
}).await;
......@@ -2835,23 +2835,23 @@ mod tests {
// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None)
)
}).await;
......
......@@ -16,6 +16,7 @@ rand = "0.8.3"
streamunordered = "0.5.1"
thiserror = "1.0.23"
tracing = "0.1.25"
lru = "0.6.5"
polkadot-node-primitives = { path = "../primitives" }