Unverified Commit 2e8b05ed authored by Ashley's avatar Ashley
Browse files

Box pin changes

parent 08bfdf7f
......@@ -38,7 +38,7 @@ use client::{
BlockchainEvents, BlockBody,
};
use sp_api::ApiExt;
use std::pin::Pin;
use log::warn;
use std::sync::Arc;
......@@ -107,7 +107,7 @@ pub trait ProvideGossipMessages {
fn gossip_messages_for(
&self,
topic: Hash,
) -> Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin>;
) -> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>;
/// Gossip an erasure chunk message.
fn gossip_erasure_chunk(
......
......@@ -37,8 +37,7 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures::channel::{mpsc, oneshot};
use futures::{prelude::*, future::select};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::SpawnExt};
use keystore::KeyStorePtr;
use tokio::runtime::{Handle, Runtime as LocalRuntime};
......
......@@ -149,8 +149,12 @@ pub fn run<W>(worker: W, version: cli::VersionInfo) -> error::Result<()> where
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config|
Ok(service::new_chain_ops(config)?), load_spec),
cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
#[cfg(not(target_os = "unknown"))]
service::run_validation_worker(&args.mem_id)?;
if cfg!(feature = "browser") {
error!("Cannot run validation worker in browser");
} else {
#[cfg(not(feature = "browser"))]
service::run_validation_worker(&args.mem_id)?;
}
Ok(())
}
}
......
......@@ -115,7 +115,7 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
fn gossip_messages_for(&self, topic: Hash)
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
{
Box::new(self.0.gossip_messages_for(topic)
self.0.gossip_messages_for(topic)
.filter_map(|(msg, _)| async move {
match msg {
GossipMessage::ErasureChunk(chunk) => {
......@@ -125,7 +125,6 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
}
})
.boxed()
)
}
fn gossip_erasure_chunk(
......@@ -167,7 +166,7 @@ impl NetworkService for PolkadotNetworkService {
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(Box::new(topic_stream))
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
......@@ -215,7 +214,7 @@ pub struct GossipMessageStream {
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: Box<dyn Stream<Item = TopicNotification> + Unpin + Send>) -> Self {
pub fn new(topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>) -> Self {
Self {
topic_stream,
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment