Unverified Commit f98966ac authored by Ashley's avatar Ashley
Browse files

Add async blocks back in

parent 7fa88af0
...@@ -79,7 +79,7 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); ...@@ -79,7 +79,7 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30);
pub trait Network: Send + Sync { pub trait Network: Send + Sync {
/// Convert the given `CollatorId` to a `PeerId`. /// Convert the given `CollatorId` to a `PeerId`.
fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send>; Box<dyn Future<Output=Option<PeerId>> + Send>;
/// Create a `Stream` of checked statements for the given `relay_parent`. /// Create a `Stream` of checked statements for the given `relay_parent`.
/// ///
...@@ -95,7 +95,7 @@ impl<P, E, SP> Network for ValidationNetwork<P, E, PolkadotNetworkService, SP> w ...@@ -95,7 +95,7 @@ impl<P, E, SP> Network for ValidationNetwork<P, E, PolkadotNetworkService, SP> w
SP: 'static + Spawn + Clone + Send + Sync, SP: 'static + Spawn + Clone + Send + Sync,
{ {
fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
Box<dyn Future<Output=Option<PeerId>> + Unpin + Send> Box<dyn Future<Output=Option<PeerId>> + Send>
{ {
Box::new(Self::collator_id_to_peer_id(self, collator_id)) Box::new(Self::collator_id_to_peer_id(self, collator_id))
} }
......
...@@ -175,7 +175,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -175,7 +175,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion"); trace!(target: "validation", "driving statement work to completion");
let work = select(work, self.fetcher.exit().clone()) let work = select(work.boxed(), self.fetcher.exit().clone())
.map(drop); .map(drop);
let _ = self.fetcher.executor().spawn(work); let _ = self.fetcher.executor().spawn(work);
} }
...@@ -193,35 +193,35 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w ...@@ -193,35 +193,35 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
let knowledge = self.fetcher.knowledge().clone(); let knowledge = self.fetcher.knowledge().clone();
let attestation_topic = self.attestation_topic; let attestation_topic = self.attestation_topic;
let parent_hash = self.parent_hash(); let parent_hash = self.parent_hash();
let api = self.fetcher.api().clone();
producer.prime(self.fetcher.api().clone()) async move {
.validate() match producer.prime(api).validate().await {
.boxed() Ok(validated) => {
.map_ok(move |validated| { // store the data before broadcasting statements, so other peers can fetch.
// store the data before broadcasting statements, so other peers can fetch. knowledge.lock().note_candidate(
knowledge.lock().note_candidate(
candidate_hash, candidate_hash,
Some(validated.0.pov_block().clone()), Some(validated.0.pov_block().clone()),
validated.0.outgoing_messages().cloned(), validated.0.outgoing_messages().cloned(),
); );
// propagate the statement. // propagate the statement.
// consider something more targeted than gossip in the future. // consider something more targeted than gossip in the future.
let statement = GossipStatement::new( let statement = GossipStatement::new(
parent_hash, parent_hash,
match table.import_validated(validated.0) { match table.import_validated(validated.0) {
None => return, None => return,
Some(s) => s, Some(s) => s,
} }
); );
network.gossip_message(attestation_topic, statement.into()); network.gossip_message(attestation_topic, statement.into());
}) },
.map(|res| { Err(err) => {
if let Err(e) = res { debug!(target: "p_net", "Failed to produce statements: {:?}", err);
debug!(target: "p_net", "Failed to produce statements: {:?}", e);
} }
}) }
}
} }
} }
......
...@@ -150,7 +150,7 @@ impl NetworkService for TestNetwork { ...@@ -150,7 +150,7 @@ impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx)); let _ = self.gossip.send_listener.unbounded_send((topic, tx));
GossipMessageStream::new(Box::new(rx)) GossipMessageStream::new(rx.boxed())
} }
fn gossip_message(&self, topic: Hash, message: GossipMessage) { fn gossip_message(&self, topic: Hash, message: GossipMessage) {
...@@ -419,8 +419,8 @@ impl av_store::ProvideGossipMessages for DummyGossipMessages { ...@@ -419,8 +419,8 @@ impl av_store::ProvideGossipMessages for DummyGossipMessages {
fn gossip_messages_for( fn gossip_messages_for(
&self, &self,
_topic: Hash _topic: Hash
) -> Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin> { ) -> Pin<Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send>> {
Box::new(stream::empty()) stream::empty().boxed()
} }
fn gossip_erasure_chunk( fn gossip_erasure_chunk(
......
...@@ -161,11 +161,15 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService { ...@@ -161,11 +161,15 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl Future<Output=Option<PeerId>> + Send impl Future<Output=Option<PeerId>> + Send
{ {
let (send, recv) = oneshot::channel(); let network = self.network.clone();
self.network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned()); async move {
}); let (send, recv) = oneshot::channel();
recv.map(|res| res.unwrap_or(None)) network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
recv.await.ok().and_then(|opt| opt)
}
} }
/// Create a `Stream` of checked statements for the given `relay_parent`. /// Create a `Stream` of checked statements for the given `relay_parent`.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment