Unverified Commit fff63cb5 authored by Robert Klotzner's avatar Robert Klotzner Committed by GitHub
Browse files

Infrastructure improvements (#2897)

* Factor out runtime module into utils.

* Add maybe_authority information to `PeerConnected` event.

We already gather this information in authority discovery, so we might
as well share it with others.

This opens up an easy path to trigger validators differently from normal
nodes, e.g. for prioritization. This change has become more important
now, that we just connect to all validators and therefore just have a
long peer list without any information about those nodes.

* Test fix.
parent 3e1951c2
Pipeline #134946 failed with stages
in 19 minutes and 31 seconds
......@@ -6096,6 +6096,7 @@ dependencies = [
"futures 0.3.13",
"futures-timer 3.0.2",
"log",
"lru",
"metered-channel",
"parity-scale-codec",
"parking_lot 0.11.1",
......
......@@ -592,7 +592,7 @@ impl metrics::Metrics for Metrics {
#[cfg(test)]
mod tests {
use super::*;
use super::*;
use polkadot_node_subsystem_test_helpers as test_helpers;
use polkadot_primitives::v1::{HeadData, UpwardMessage};
use sp_core::testing::TaskExecutor;
......
......@@ -191,7 +191,7 @@ impl State {
event: NetworkBridgeEvent<protocol_v1::ApprovalDistributionMessage>,
) {
match event {
NetworkBridgeEvent::PeerConnected(peer_id, role) => {
NetworkBridgeEvent::PeerConnected(peer_id, role, _) => {
// insert a blank view if none already present
tracing::trace!(
target: LOG_TARGET,
......
......@@ -117,7 +117,7 @@ async fn setup_peer_with_view(
overseer_send(
virtual_overseer,
ApprovalDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_id.clone(), ObservedRole::Full, None)
)
).await;
overseer_send(
......
......@@ -18,12 +18,15 @@
//! Error handling related code and Error/Result definitions.
use polkadot_node_network_protocol::request_response::request::RequestError;
use polkadot_primitives::v1::SessionIndex;
use thiserror::Error;
use futures::channel::oneshot;
use polkadot_node_subsystem_util::Error as UtilError;
use polkadot_primitives::v1::SessionIndex;
use polkadot_node_subsystem_util::{
runtime,
Error as UtilError,
};
use polkadot_subsystem::{errors::RuntimeApiError, SubsystemError};
use crate::LOG_TARGET;
......@@ -74,10 +77,6 @@ pub enum Error {
#[error("Runtime API error")]
RuntimeRequest(RuntimeApiError),
/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Fetching PoV failed with `RequestError`.
#[error("FetchPoV request error")]
FetchPoV(#[source] RequestError),
......@@ -92,10 +91,24 @@ pub enum Error {
/// No validator with the index could be found in current session.
#[error("Given validator index could not be found")]
InvalidValidatorIndex,
/// We tried fetching a session info which was not available.
#[error("There was no session with the given index")]
NoSuchSession(SessionIndex),
/// Errors coming from runtime::Runtime.
#[error("Error while accessing runtime information")]
Runtime(#[source] runtime::Error),
}
pub type Result<T> = std::result::Result<T, Error>;
impl From<runtime::Error> for Error {
fn from(err: runtime::Error) -> Self {
Self::Runtime(err)
}
}
impl From<SubsystemError> for Error {
fn from(err: SubsystemError) -> Self {
Self::IncomingMessageChannel(err)
......
......@@ -28,9 +28,7 @@ mod error;
pub use error::Error;
use error::{Result, log_error};
/// Runtime requests.
mod runtime;
use runtime::Runtime;
use polkadot_node_subsystem_util::runtime::Runtime;
/// `Requester` taking care of requesting chunks for candidates pending availability.
mod requester;
......
......@@ -33,8 +33,9 @@ use polkadot_subsystem::{
ActiveLeavesUpdate, SubsystemContext, ActivatedLeaf,
messages::{AllMessages, NetworkBridgeMessage, IfDisconnected}
};
use polkadot_node_subsystem_util::runtime::{Runtime, ValidatorInfo};
use crate::{error::{Error, log_error}, runtime::{Runtime, ValidatorInfo}};
use crate::error::{Error, log_error};
/// Number of sessions we want to keep in the LRU.
const NUM_SESSIONS: usize = 2;
......@@ -274,7 +275,7 @@ mod tests {
let (mut context, mut virtual_overseer) =
test_helpers::make_subsystem_context::<AvailabilityDistributionMessage, TaskExecutor>(pool.clone());
let keystore = make_ferdie_keystore();
let mut runtime = crate::runtime::Runtime::new(keystore);
let mut runtime = polkadot_node_subsystem_util::runtime::Runtime::new(keystore);
let (tx, rx) = oneshot::channel();
let testee = async {
......
......@@ -508,7 +508,7 @@ where
let _timer = metrics.time_handle_network_msg();
match bridge_message {
NetworkBridgeEvent::PeerConnected(peerid, role) => {
NetworkBridgeEvent::PeerConnected(peerid, role, _) => {
tracing::trace!(
target: LOG_TARGET,
?peerid,
......@@ -1335,7 +1335,7 @@ mod test {
&mut ctx,
&mut state,
&Default::default(),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
));
// make peer b interested
......
......@@ -36,7 +36,7 @@ use polkadot_subsystem::messages::{
NetworkBridgeMessage, AllMessages,
CollatorProtocolMessage, NetworkBridgeEvent,
};
use polkadot_primitives::v1::{Hash, BlockNumber};
use polkadot_primitives::v1::{Hash, BlockNumber, AuthorityDiscoveryId};
use polkadot_node_network_protocol::{
PeerId, peer_set::PeerSet, View, v1 as protocol_v1, OurView, UnifiedReputationChange as Rep,
ObservedRole,
......@@ -316,7 +316,7 @@ impl From<SubsystemError> for UnexpectedAbort {
// notifications to be passed through to the validator discovery worker.
enum ValidatorDiscoveryNotification {
PeerConnected(PeerId, PeerSet),
PeerConnected(PeerId, PeerSet, Option<AuthorityDiscoveryId>),
PeerDisconnected(PeerId, PeerSet),
}
......@@ -540,11 +540,11 @@ where
},
notification = validator_discovery_notifications.next().fuse() => match notification {
None => return Ok(()),
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set)) => {
Some(ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_auth)) => {
validator_discovery.on_peer_connected(
peer.clone(),
peer_set,
&mut authority_discovery_service,
maybe_auth,
).await;
}
Some(ValidatorDiscoveryNotification::PeerDisconnected(peer, peer_set)) => {
......@@ -555,9 +555,10 @@ where
}
}
async fn handle_network_messages(
async fn handle_network_messages<AD: validator_discovery::AuthorityDiscovery>(
mut sender: impl SubsystemSender,
mut network_service: impl Network,
mut authority_discovery_service: AD,
mut request_multiplexer: RequestMultiplexer,
mut validator_discovery_notifications: mpsc::Sender<ValidatorDiscoveryNotification>,
metrics: Metrics,
......@@ -607,10 +608,14 @@ async fn handle_network_messages(
shared.local_view.clone().unwrap_or(View::default())
};
let maybe_authority =
authority_discovery_service
.get_authority_id_by_peer_id(peer).await;
// Failure here means that the other side of the network bridge
// has concluded and this future will be dropped in due course.
let _ = validator_discovery_notifications.send(
ValidatorDiscoveryNotification::PeerConnected(peer.clone(), peer_set)
ValidatorDiscoveryNotification::PeerConnected(peer, peer_set, maybe_authority.clone())
).await;
......@@ -618,7 +623,7 @@ async fn handle_network_messages(
PeerSet::Validation => {
dispatch_validation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::default(),
......@@ -640,7 +645,7 @@ async fn handle_network_messages(
PeerSet::Collation => {
dispatch_collation_events_to_all(
vec![
NetworkBridgeEvent::PeerConnected(peer.clone(), role),
NetworkBridgeEvent::PeerConnected(peer.clone(), role, maybe_authority),
NetworkBridgeEvent::PeerViewChange(
peer.clone(),
View::default(),
......@@ -858,6 +863,7 @@ where
let (remote, network_event_handler) = handle_network_messages(
ctx.sender().clone(),
network_service.clone(),
authority_discovery_service.clone(),
request_multiplexer,
validation_worker_tx,
metrics.clone(),
......@@ -1191,6 +1197,7 @@ mod tests {
_req_configs: Vec<RequestResponseConfig>,
}
#[derive(Clone)]
struct TestAuthorityDiscovery;
// The test's view of the network. This receives updates from the subsystem in the form
......@@ -1796,7 +1803,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -1848,7 +1855,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -1920,7 +1927,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -1932,7 +1939,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2004,7 +2011,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2016,7 +2023,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2099,7 +2106,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2111,7 +2118,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2259,7 +2266,7 @@ mod tests {
// bridge will inform about all connected peers.
{
assert_sends_validation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......@@ -2271,7 +2278,7 @@ mod tests {
{
assert_sends_collation_event_to_all(
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full),
NetworkBridgeEvent::PeerConnected(peer.clone(), ObservedRole::Full, None),
&mut virtual_overseer,
).await;
......
......@@ -34,7 +34,7 @@ const LOG_TARGET: &str = "parachain::validator-discovery";
/// An abstraction over the authority discovery service.
#[async_trait]
pub trait AuthorityDiscovery: Send + 'static {
pub trait AuthorityDiscovery: Send + Clone + 'static {
/// Get the addresses for the given [`AuthorityId`] from the local address cache.
async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option<Vec<Multiaddr>>;
/// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache.
......@@ -307,16 +307,15 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
}
/// Should be called when a peer connected.
#[tracing::instrument(level = "trace", skip(self, authority_discovery_service), fields(subsystem = LOG_TARGET))]
#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
pub async fn on_peer_connected(
&mut self,
peer_id: PeerId,
peer_set: PeerSet,
authority_discovery_service: &mut AD,
maybe_authority: Option<AuthorityDiscoveryId>,
) {
let state = &mut self.state[peer_set];
// check if it's an authority we've been waiting for
let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await;
if let Some(authority) = maybe_authority {
for request in state.non_revoked_discovery_requests.iter_mut() {
let _ = request.on_authority_connected(&authority, &peer_id);
......@@ -359,7 +358,7 @@ mod tests {
peers_set: HashSet<Multiaddr>,
}
#[derive(Default)]
#[derive(Default, Clone)]
struct TestAuthorityDiscovery {
by_authority_id: HashMap<AuthorityDiscoveryId, Multiaddr>,
by_peer_id: HashMap<PeerId, AuthorityDiscoveryId>,
......@@ -469,7 +468,8 @@ mod tests {
let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()];
let (sender, mut receiver) = mpsc::channel(2);
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let _ = service.on_request(
req1,
......@@ -509,12 +509,14 @@ mod tests {
).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let reply1 = receiver.next().await.unwrap();
assert_eq!(reply1.0, authority_ids[0]);
assert_eq!(reply1.1, peer_ids[0]);
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let reply2 = receiver.next().await.unwrap();
assert_eq!(reply2.0, authority_ids[1]);
assert_eq!(reply2.1, peer_ids[1]);
......@@ -534,8 +536,10 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone()],
......@@ -580,8 +584,10 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, &mut ads).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[0]).await;
service.on_peer_connected(peer_ids[0].clone(), PeerSet::Validation, maybe_authority).await;
let maybe_authority = ads.get_authority_id_by_peer_id(peer_ids[1]).await;
service.on_peer_connected(peer_ids[1].clone(), PeerSet::Validation, maybe_authority).await;
let (ns, ads) = service.on_request(
vec![authority_ids[0].clone(), authority_ids[2].clone()],
......@@ -645,7 +651,8 @@ mod tests {
futures::executor::block_on(async move {
let (sender, mut receiver) = mpsc::channel(1);
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, &mut ads).await;
let maybe_authority = ads.get_authority_id_by_peer_id(validator_peer_id).await;
service.on_peer_connected(validator_peer_id.clone(), PeerSet::Validation, maybe_authority).await;
let address = known_multiaddr()[0].clone().with(Protocol::P2p(validator_peer_id.clone().into()));
ads.by_peer_id.insert(validator_peer_id.clone(), validator_id.clone());
......
......@@ -791,7 +791,7 @@ async fn handle_network_msg(
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(peer_id, observed_role) => {
PeerConnected(peer_id, observed_role, _) => {
// If it is possible that a disconnected validator would attempt a reconnect
// it should be handled here.
tracing::trace!(
......@@ -1343,6 +1343,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer.clone(),
polkadot_node_network_protocol::ObservedRole::Authority,
None,
),
),
).await;
......
......@@ -873,7 +873,7 @@ where
use NetworkBridgeEvent::*;
match bridge_message {
PeerConnected(peer_id, _role) => {
PeerConnected(peer_id, _role, _) => {
state.peer_data.entry(peer_id).or_default();
state.metrics.note_collator_peer_count(state.peer_data.len());
},
......@@ -1469,6 +1469,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1543,6 +1544,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1553,6 +1555,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_c,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1636,6 +1639,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1704,6 +1708,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1714,6 +1719,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_c,
ObservedRole::Full,
None,
),
)
).await;
......@@ -1918,6 +1924,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2012,6 +2019,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2153,6 +2161,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2199,6 +2208,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......@@ -2270,6 +2280,7 @@ mod tests {
NetworkBridgeEvent::PeerConnected(
peer_b.clone(),
ObservedRole::Full,
None,
)
)
).await;
......
......@@ -154,7 +154,7 @@ impl Protocol {
name: p_name,
max_request_size: 1_000,
// Available data size is dominated code size.
// + 1000 to account for protocol overhead (should be way less).
// + 1000 to account for protocol overhead (should be way less).
max_response_size: MAX_CODE_SIZE as u64 + 1000,
// We need statement fetching to be fast and will try our best at the responding
// side to answer requests within that timeout, assuming a bandwidth of 500Mbit/s
......@@ -199,7 +199,7 @@ impl Protocol {
// waisting precious time.
let available_bandwidth = 7 * MIN_BANDWIDTH_BYTES / 10;
let size = u64::saturating_sub(
STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64),
STATEMENTS_TIMEOUT.as_millis() as u64 * available_bandwidth / (1000 * MAX_CODE_SIZE as u64),
MAX_PARALLEL_STATEMENT_REQUESTS as u64
);
debug_assert!(
......
......@@ -1437,7 +1437,7 @@ async fn handle_network_update(
metrics: &Metrics,
) {
match update {
NetworkBridgeEvent::PeerConnected(peer, role) => {
NetworkBridgeEvent::PeerConnected(peer, role, _) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
......@@ -2667,13 +2667,13 @@ mod tests {
// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None)
)
}).await;
......@@ -2835,23 +2835,23 @@ mod tests {
// notify of peers and view
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_a.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_b.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_c.clone(), ObservedRole::Full, None)
)
}).await;
handle.send(FromOverseer::Communication {
msg: StatementDistributionMessage::NetworkBridgeUpdateV1(
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full)
NetworkBridgeEvent::PeerConnected(peer_bad.clone(), ObservedRole::Full, None)
)
}).await;
......
......@@ -16,6 +16,7 @@ rand = "0.8.3"
streamunordered = "0.5.1"
thiserror = "1.0.23"