Unverified Commit f0c517eb authored by Ashley's avatar Ashley
Browse files

Merge branch 'ashley-futures-updates' into ashley-futures-update

parents 938f411a 60e72111
......@@ -38,7 +38,7 @@ use client::{
BlockchainEvents, BlockBody,
};
use sp_api::ApiExt;
use std::pin::Pin;
use log::warn;
use std::sync::Arc;
......@@ -107,7 +107,7 @@ pub trait ProvideGossipMessages {
fn gossip_messages_for(
&self,
topic: Hash,
) -> Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin>;
) -> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>;
/// Gossip an erasure chunk message.
fn gossip_erasure_chunk(
......
......@@ -37,8 +37,7 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, StreamExt, future::select, task::SpawnExt};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::SpawnExt};
use keystore::KeyStorePtr;
use tokio::runtime::{Handle, Runtime as LocalRuntime};
......
......@@ -149,9 +149,13 @@ pub fn run<W>(worker: W, version: cli::VersionInfo) -> error::Result<()> where
cli::ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config|
Ok(service::new_chain_ops(config)?), load_spec),
cli::ParseAndPrepare::CustomCommand(PolkadotSubCommands::ValidationWorker(args)) => {
#[cfg(not(target_os = "unknown"))]
service::run_validation_worker(&args.mem_id)?;
Ok(())
if cfg!(feature = "browser") {
Err(error::Error::Input("Cannot run validation worker in browser".into()))
} else {
#[cfg(not(feature = "browser"))]
service::run_validation_worker(&args.mem_id)?;
Ok(())
}
}
}
}
......
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// This file is part of Polkadot.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
......
......@@ -28,7 +28,6 @@ pub mod gossip;
use codec::{Decode, Encode};
use futures::channel::{oneshot, mpsc};
use futures::prelude::*;
use futures::future::Either;
use polkadot_primitives::{Block, Hash, Header};
use polkadot_primitives::parachain::{
Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock,
......@@ -113,9 +112,9 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
where T: NetworkService
{
fn gossip_messages_for(&self, topic: Hash)
-> Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Unpin + Send>
-> Pin<Box<dyn Stream<Item = (Hash, Hash, ErasureChunk)> + Send>>
{
Box::new(self.0.gossip_messages_for(topic)
self.0.gossip_messages_for(topic)
.filter_map(|(msg, _)| async move {
match msg {
GossipMessage::ErasureChunk(chunk) => {
......@@ -125,7 +124,6 @@ impl<T> av_store::ProvideGossipMessages for AvailabilityNetworkShim<T>
}
})
.boxed()
)
}
fn gossip_erasure_chunk(
......@@ -167,7 +165,7 @@ impl NetworkService for PolkadotNetworkService {
Err(_) => mpsc::unbounded().1, // return empty channel.
};
GossipMessageStream::new(Box::new(topic_stream))
GossipMessageStream::new(topic_stream.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
......@@ -210,12 +208,12 @@ impl GossipService for consensus_gossip::ConsensusGossip<Block> {
/// A stream of gossip messages and an optional sender for a topic.
pub struct GossipMessageStream {
topic_stream: Box<dyn Stream<Item = TopicNotification> + Unpin + Send>,
topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>,
}
impl GossipMessageStream {
/// Create a new instance with the given topic stream.
pub fn new(topic_stream: Box<dyn Stream<Item = TopicNotification> + Unpin + Send>) -> Self {
pub fn new(topic_stream: Pin<Box<dyn Stream<Item = TopicNotification> + Send>>) -> Self {
Self {
topic_stream,
}
......@@ -827,34 +825,26 @@ impl PolkadotProtocol {
/// This should be called by a collator intending to get the locally-collated
/// block into the hands of validators.
/// It also places the outgoing message and block data in the local availability store.
pub fn add_local_collation(
pub async fn add_local_collation(
&mut self,
ctx: &mut dyn Context<Block>,
relay_parent: Hash,
targets: HashSet<ValidatorId>,
collation: Collation,
outgoing_targeted: OutgoingMessages,
) -> impl futures::future::Future<Output = ()> {
) {
debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}",
relay_parent, collation.info.parachain_index);
let res = match self.availability_store {
Some(ref availability_store) => {
let availability_store_cloned = availability_store.clone();
let collation_cloned = collation.clone();
Either::Left((async move {
let _ = availability_store_cloned.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
}
)
.boxed()
)
}
None => Either::Right(futures::future::ready(())),
if let Some(ref availability_store) = self.availability_store {
let availability_store_cloned = availability_store.clone();
let collation_cloned = collation.clone();
let _ = availability_store_cloned.make_available(av_store::Data {
relay_parent,
parachain_id: collation_cloned.info.parachain_index,
block_data: collation_cloned.pov.block_data.clone(),
outgoing_queues: Some(outgoing_targeted.clone().into()),
}).await;
};
for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) {
......@@ -871,8 +861,6 @@ impl PolkadotProtocol {
warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary),
}
}
res
}
/// Give the network protocol a handle to an availability store, used for
......
......@@ -175,7 +175,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let work = select(work, self.fetcher.exit().clone())
let work = select(work.boxed(), self.fetcher.exit().clone())
.map(drop);
let _ = self.fetcher.executor().spawn(work);
}
......@@ -193,35 +193,35 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
let knowledge = self.fetcher.knowledge().clone();
let attestation_topic = self.attestation_topic;
let parent_hash = self.parent_hash();
producer.prime(self.fetcher.api().clone())
.validate()
.boxed()
.map_ok(move |validated| {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Some(validated.0.pov_block().clone()),
validated.0.outgoing_messages().cloned(),
);
// propagate the statement.
// consider something more targeted than gossip in the future.
let statement = GossipStatement::new(
parent_hash,
match table.import_validated(validated.0) {
None => return,
Some(s) => s,
}
);
network.gossip_message(attestation_topic, statement.into());
})
.map(|res| {
if let Err(e) = res {
debug!(target: "p_net", "Failed to produce statements: {:?}", e);
let api = self.fetcher.api().clone();
async move {
match producer.prime(api).validate().await {
Ok(validated) => {
// store the data before broadcasting statements, so other peers can fetch.
knowledge.lock().note_candidate(
candidate_hash,
Some(validated.0.pov_block().clone()),
validated.0.outgoing_messages().cloned(),
);
// propagate the statement.
// consider something more targeted than gossip in the future.
let statement = GossipStatement::new(
parent_hash,
match table.import_validated(validated.0) {
None => return,
Some(s) => s,
}
);
network.gossip_message(attestation_topic, statement.into());
}
Err(err) => {
debug!(target: "p_net", "Failed to produce statements: {:?}", err);
}
})
}
}
}
}
......
......@@ -150,7 +150,7 @@ impl NetworkService for TestNetwork {
fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream {
let (tx, rx) = mpsc::unbounded();
let _ = self.gossip.send_listener.unbounded_send((topic, tx));
GossipMessageStream::new(Box::new(rx))
GossipMessageStream::new(rx.boxed())
}
fn gossip_message(&self, topic: Hash, message: GossipMessage) {
......@@ -419,8 +419,8 @@ impl av_store::ProvideGossipMessages for DummyGossipMessages {
fn gossip_messages_for(
&self,
_topic: Hash
) -> Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send + Unpin> {
Box::new(stream::empty())
) -> Pin<Box<dyn futures::Stream<Item = (Hash, Hash, ErasureChunk)> + Send>> {
stream::empty().boxed()
}
fn gossip_erasure_chunk(
......
......@@ -158,14 +158,13 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl Future<Output=Option<PeerId>> + Send
{
pub async fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> Option<PeerId> {
let (send, recv) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
recv.map(|res| res.unwrap_or(None))
recv.map(|res| res.unwrap_or(None)).await
}
/// Create a `Stream` of checked statements for the given `relay_parent`.
......
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// This file is part of Polkadot.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
......
// Copyright 2017-2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// This file is part of Polkadot.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
......
......@@ -126,7 +126,7 @@ type WinnersData<T> =
// This module's storage items.
decl_storage! {
trait Store for Module<T: Trait> as Slots {
/// The number of auctions that been started so far.
/// The number of auctions that have been started so far.
pub AuctionCounter get(auction_counter): AuctionIndex;
/// Ordered list of all `ParaId` values that are managed by this module. This includes
......
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// This file is part of Polkadot.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
......
// Copyright 2019 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// This file is part of Polkadot.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment