Unverified Commit fa1fea18 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

`ValidationNetwork` expose more functionality (#301)

* Expose `collator_id_to_peer_id`

* `ValidationNetwork` expose `checked_statements`

* Style nit
parent 5d4cce00
Pipeline #41371 passed with stages
in 8 minutes and 16 seconds
...@@ -216,6 +216,11 @@ impl CollatorPool { ...@@ -216,6 +216,11 @@ impl CollatorPool {
let now = Instant::now(); let now = Instant::now();
self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now)); self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
} }
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
self.collators.get(collator_id).map(|ids| &ids.1)
}
} }
#[cfg(test)] #[cfg(test)]
......
...@@ -460,6 +460,11 @@ impl PolkadotProtocol { ...@@ -460,6 +460,11 @@ impl PolkadotProtocol {
} }
} }
} }
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
self.collators.collator_id_to_peer_id(collator_id)
}
} }
impl Specialization<Block> for PolkadotProtocol { impl Specialization<Block> for PolkadotProtocol {
......
...@@ -28,8 +28,8 @@ use polkadot_validation::{ ...@@ -28,8 +28,8 @@ use polkadot_validation::{
SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
}; };
use polkadot_primitives::{Block, Hash}; use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost, use polkadot_primitives::parachain::{
ValidatorIndex, Collation, PoVBlock, Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock,
}; };
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement}; use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};
...@@ -51,6 +51,23 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { ...@@ -51,6 +51,23 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
BlakeTwo256::hash(&v[..]) BlakeTwo256::hash(&v[..])
} }
/// Create a `Stream` of checked statements.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
impl Stream<Item=SignedStatement, Error=()> {
// spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
network.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
})
}
/// Table routing implementation. /// Table routing implementation.
pub struct Router<P, E, N: NetworkService, T> { pub struct Router<P, E, N: NetworkService, T> {
table: Arc<SharedTable>, table: Arc<SharedTable>,
...@@ -76,21 +93,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> { ...@@ -76,21 +93,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
} }
} }
/// Return a future of checked messages. These should be imported into the router /// Return a `Stream` of checked messages. These should be imported into the router
/// with `import_statement`. /// with `import_statement`.
/// ///
/// The returned stream will not terminate, so it is required to make sure that the stream is /// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory /// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely. /// infinitely.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> { pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
// spin up a task in the background that processes all incoming statements checked_statements(&**self.network(), self.attestation_topic)
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
self.network().gossip_messages_for(self.attestation_topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
})
} }
fn parent_hash(&self) -> Hash { fn parent_hash(&self) -> Hash {
...@@ -107,7 +117,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> { ...@@ -107,7 +117,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
Router { Router {
table: self.table.clone(), table: self.table.clone(),
fetcher: self.fetcher.clone(), fetcher: self.fetcher.clone(),
attestation_topic: self.attestation_topic.clone(), attestation_topic: self.attestation_topic,
deferred_statements: self.deferred_statements.clone(), deferred_statements: self.deferred_statements.clone(),
message_validator: self.message_validator.clone(), message_validator: self.message_validator.clone(),
} }
...@@ -177,7 +187,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -177,7 +187,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
let table = self.table.clone(); let table = self.table.clone();
let network = self.network().clone(); let network = self.network().clone();
let knowledge = self.fetcher.knowledge().clone(); let knowledge = self.fetcher.knowledge().clone();
let attestation_topic = self.attestation_topic.clone(); let attestation_topic = self.attestation_topic;
let parent_hash = self.parent_hash(); let parent_hash = self.parent_hash();
producer.prime(self.fetcher.api().clone()) producer.prime(self.fetcher.api().clone())
...@@ -232,7 +242,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh ...@@ -232,7 +242,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> { impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
fn drop(&mut self) { fn drop(&mut self) {
let parent_hash = self.parent_hash().clone(); let parent_hash = self.parent_hash();
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); }); self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
} }
} }
......
...@@ -25,7 +25,9 @@ use substrate_network::{PeerId, Context as NetContext}; ...@@ -25,7 +25,9 @@ use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{ use substrate_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage, self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
}; };
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement}; use polkadot_validation::{
Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement,
};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey}; use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{ use polkadot_primitives::parachain::{
Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId, Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId,
...@@ -286,6 +288,26 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where ...@@ -286,6 +288,26 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
rx rx
} }
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl Future<Item=Option<PeerId>, Error=()> + Send
{
let (send, recv) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
recv.map_err(|_| ())
}
/// Create a `Stream` of checked statements for the given `relay_parent`.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement, Error=()> {
crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent))
}
} }
/// A long-lived network which can create parachain statement routing processes on demand. /// A long-lived network which can create parachain statement routing processes on demand.
...@@ -305,7 +327,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where ...@@ -305,7 +327,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
table: Arc<SharedTable>, table: Arc<SharedTable>,
authorities: &[ValidatorId], authorities: &[ValidatorId],
) -> Self::BuildTableRouter { ) -> Self::BuildTableRouter {
let parent_hash = table.consensus_parent_hash().clone(); let parent_hash = *table.consensus_parent_hash();
let local_session_key = table.session_key(); let local_session_key = table.session_key();
let build_fetcher = self.instantiate_session(SessionParams { let build_fetcher = self.instantiate_session(SessionParams {
...@@ -343,7 +365,7 @@ pub struct NetworkDown; ...@@ -343,7 +365,7 @@ pub struct NetworkDown;
/// A future that resolves when a collation is received. /// A future that resolves when a collation is received.
pub struct AwaitingCollation { pub struct AwaitingCollation {
outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>, outer: futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
inner: Option<::futures::sync::oneshot::Receiver<Collation>> inner: Option<::futures::sync::oneshot::Receiver<Collation>>
} }
...@@ -576,7 +598,7 @@ impl LiveValidationSessions { ...@@ -576,7 +598,7 @@ impl LiveValidationSessions {
&mut self, &mut self,
params: SessionParams, params: SessionParams,
) -> (ValidationSession, Option<ValidatorId>) { ) -> (ValidationSession, Option<ValidatorId>) {
let parent_hash = params.parent_hash.clone(); let parent_hash = params.parent_hash;
let key = params.local_session_key.clone(); let key = params.local_session_key.clone();
let recent = &mut self.recent; let recent = &mut self.recent;
...@@ -703,7 +725,7 @@ pub struct SessionDataFetcher<P, E, N: NetworkService, T> { ...@@ -703,7 +725,7 @@ pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> { impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> {
/// Get the parent hash. /// Get the parent hash.
pub(crate) fn parent_hash(&self) -> Hash { pub(crate) fn parent_hash(&self) -> Hash {
self.parent_hash.clone() self.parent_hash
} }
/// Get the shared knowledge. /// Get the shared knowledge.
...@@ -738,7 +760,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E ...@@ -738,7 +760,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
network: self.network.clone(), network: self.network.clone(),
api: self.api.clone(), api: self.api.clone(),
task_executor: self.task_executor.clone(), task_executor: self.task_executor.clone(),
parent_hash: self.parent_hash.clone(), parent_hash: self.parent_hash,
knowledge: self.knowledge.clone(), knowledge: self.knowledge.clone(),
exit: self.exit.clone(), exit: self.exit.clone(),
message_validator: self.message_validator.clone(), message_validator: self.message_validator.clone(),
...@@ -754,7 +776,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where ...@@ -754,7 +776,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
{ {
/// Fetch PoV block for the given candidate receipt. /// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver { pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
let parachain = candidate.parachain_index.clone(); let parachain = candidate.parachain_index;
let parent_hash = self.parent_hash; let parent_hash = self.parent_hash;
let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain) let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment