diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 63c96aec091ced74f9b8f5217931d8501fbb33ee..9a41d3066af75cf0637f85b33ab98497054c9527 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -216,6 +216,11 @@ impl CollatorPool { let now = Instant::now(); self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now)); } + + /// Convert the given `CollatorId` to a `PeerId`. + pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> { + self.collators.get(collator_id).map(|ids| &ids.1) + } } #[cfg(test)] diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 1606778e994e101a8dbb2a54a73d22a6a2c06f6f..5bf7635db0d68cfb85f4323ae4817a2fe6cb311f 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -460,6 +460,11 @@ impl PolkadotProtocol { } } } + + /// Convert the given `CollatorId` to a `PeerId`. + pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> { + self.collators.collator_id_to_peer_id(collator_id) + } } impl Specialization<Block> for PolkadotProtocol { diff --git a/polkadot/network/src/router.rs b/polkadot/network/src/router.rs index 925df7d2d4a1fbeb936d56246d88d07b5c888820..1e15524239a1c07959e5f818b0a2f81445a6f15f 100644 --- a/polkadot/network/src/router.rs +++ b/polkadot/network/src/router.rs @@ -28,8 +28,8 @@ use polkadot_validation::{ SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated }; use polkadot_primitives::{Block, Hash}; -use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost, - ValidatorIndex, Collation, PoVBlock, +use polkadot_primitives::parachain::{ + Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, }; use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement}; @@ -51,6 +51,23 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { BlakeTwo256::hash(&v[..]) } +/// Create a `Stream` of checked statements. +/// +/// The returned stream will not terminate, so it is required to make sure that the stream is +/// dropped when it is not required anymore. Otherwise, it will stick around in memory +/// infinitely. +pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) -> + impl Stream<Item=SignedStatement, Error=()> { + // spin up a task in the background that processes all incoming statements + // validation has been done already by the gossip validator. + // this will block internally until the gossip messages stream is obtained. + network.gossip_messages_for(topic) + .filter_map(|msg| match msg.0 { + GossipMessage::Statement(s) => Some(s.signed_statement), + _ => None + }) +} + /// Table routing implementation. pub struct Router<P, E, N: NetworkService, T> { table: Arc<SharedTable>, @@ -76,21 +93,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> { } } - /// Return a future of checked messages. These should be imported into the router + /// Return a `Stream` of checked messages. These should be imported into the router /// with `import_statement`. /// /// The returned stream will not terminate, so it is required to make sure that the stream is /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> { - // spin up a task in the background that processes all incoming statements - // validation has been done already by the gossip validator. - // this will block internally until the gossip messages stream is obtained. - self.network().gossip_messages_for(self.attestation_topic) - .filter_map(|msg| match msg.0 { - GossipMessage::Statement(s) => Some(s.signed_statement), - _ => None - }) + checked_statements(&**self.network(), self.attestation_topic) } fn parent_hash(&self) -> Hash { @@ -107,7 +117,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> { Router { table: self.table.clone(), fetcher: self.fetcher.clone(), - attestation_topic: self.attestation_topic.clone(), + attestation_topic: self.attestation_topic, deferred_statements: self.deferred_statements.clone(), message_validator: self.message_validator.clone(), } @@ -177,7 +187,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w let table = self.table.clone(); let network = self.network().clone(); let knowledge = self.fetcher.knowledge().clone(); - let attestation_topic = self.attestation_topic.clone(); + let attestation_topic = self.attestation_topic; let parent_hash = self.parent_hash(); producer.prime(self.fetcher.api().clone()) @@ -232,7 +242,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> { fn drop(&mut self) { - let parent_hash = self.parent_hash().clone(); + let parent_hash = self.parent_hash(); self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); }); } } diff --git a/polkadot/network/src/validation.rs b/polkadot/network/src/validation.rs index fabf58a3bd51bbfd1c102e1712399878c482d1c7..8150b34411244d875256c23517582a72d4ab5693 100644 --- a/polkadot/network/src/validation.rs +++ b/polkadot/network/src/validation.rs @@ -25,7 +25,9 @@ use substrate_network::{PeerId, Context as NetContext}; use substrate_network::consensus_gossip::{ self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage, }; -use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement}; +use polkadot_validation::{ + Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement, +}; use polkadot_primitives::{Block, BlockId, Hash, SessionKey}; use polkadot_primitives::parachain::{ Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, @@ -286,6 +288,26 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where rx } + + /// Convert the given `CollatorId` to a `PeerId`. + pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> + impl Future<Item=Option<PeerId>, Error=()> + Send + { + let (send, recv) = oneshot::channel(); + self.network.with_spec(move |spec, _| { + let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned()); + }); + recv.map_err(|_| ()) + } + + /// Create a `Stream` of checked statements for the given `relay_parent`. + /// + /// The returned stream will not terminate, so it is required to make sure that the stream is + /// dropped when it is not required anymore. Otherwise, it will stick around in memory + /// infinitely. + pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement, Error=()> { + crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent)) + } } /// A long-lived network which can create parachain statement routing processes on demand. @@ -305,7 +327,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where table: Arc<SharedTable>, authorities: &[ValidatorId], ) -> Self::BuildTableRouter { - let parent_hash = table.consensus_parent_hash().clone(); + let parent_hash = *table.consensus_parent_hash(); let local_session_key = table.session_key(); let build_fetcher = self.instantiate_session(SessionParams { @@ -343,7 +365,7 @@ pub struct NetworkDown; /// A future that resolves when a collation is received. pub struct AwaitingCollation { - outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>, + outer: futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>, inner: Option<::futures::sync::oneshot::Receiver<Collation>> } @@ -576,7 +598,7 @@ impl LiveValidationSessions { &mut self, params: SessionParams, ) -> (ValidationSession, Option<ValidatorId>) { - let parent_hash = params.parent_hash.clone(); + let parent_hash = params.parent_hash; let key = params.local_session_key.clone(); let recent = &mut self.recent; @@ -703,7 +725,7 @@ pub struct SessionDataFetcher<P, E, N: NetworkService, T> { impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> { /// Get the parent hash. pub(crate) fn parent_hash(&self) -> Hash { - self.parent_hash.clone() + self.parent_hash } /// Get the shared knowledge. @@ -738,7 +760,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E network: self.network.clone(), api: self.api.clone(), task_executor: self.task_executor.clone(), - parent_hash: self.parent_hash.clone(), + parent_hash: self.parent_hash, knowledge: self.knowledge.clone(), exit: self.exit.clone(), message_validator: self.message_validator.clone(), @@ -754,7 +776,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where { /// Fetch PoV block for the given candidate receipt. pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { - let parachain = candidate.parachain_index.clone(); + let parachain = candidate.parachain_index; let parent_hash = self.parent_hash; let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)