Unverified Commit d9b4fc45 authored by Ashley's avatar Ashley Committed by GitHub
Browse files

Strip out old XCMP primitives (#823)

* WIP

* WIp

* Mostly get tests to compile

* Fix adder collator

* Remove more stuff

* Revert some changes to av store

* Fix av store tests

* Nitpicks

* Restore some things

* Small changes

* Remvoe unused error variants
parent 96f5dc51
Pipeline #79028 passed with stages
in 19 minutes and 52 seconds
......@@ -28,8 +28,7 @@ use keystore::KeyStorePtr;
use polkadot_primitives::{
Hash, Block,
parachain::{
Id as ParaId, BlockData, CandidateReceipt, Message, AvailableMessages, ErasureChunk,
ParachainHost,
Id as ParaId, BlockData, CandidateReceipt, ErasureChunk, ParachainHost
},
};
use sp_runtime::traits::{BlakeTwo256, Hash as HashT, HasherFor};
......@@ -126,10 +125,6 @@ pub struct Data {
pub parachain_id: ParaId,
/// Block data.
pub block_data: BlockData,
/// Outgoing message queues from execution of the block, if any.
///
/// The tuple pairs the message queue root and the queue data.
pub outgoing_queues: Option<AvailableMessages>,
}
/// Handle to the availability store.
......@@ -384,9 +379,4 @@ impl Store {
{
self.inner.block_data_by_candidate(relay_parent, candidate_hash)
}
/// Query message queue data by message queue root hash.
pub fn queue_by_root(&self, queue_root: &Hash) -> Option<Vec<Message>> {
self.inner.queue_by_root(queue_root)
}
}
......@@ -21,9 +21,7 @@ use codec::{Encode, Decode};
use polkadot_erasure_coding::{self as erasure};
use polkadot_primitives::{
Hash,
parachain::{
BlockData, CandidateReceipt, Message, ErasureChunk
},
parachain::{BlockData, CandidateReceipt, ErasureChunk},
};
use log::{trace, warn};
......@@ -130,18 +128,6 @@ impl Store {
data.block_data.encode()
);
if let Some(outgoing_queues) = data.outgoing_queues {
// This is kept forever and not pruned.
for (root, messages) in outgoing_queues.0 {
tx.put_vec(
columns::DATA,
root.as_ref(),
messages.encode(),
);
}
}
self.inner.write(tx)
}
......@@ -287,14 +273,13 @@ impl Store {
columns::DATA,
&block_data_key(&relay_parent, &receipt.block_data_hash)
) {
if let Ok((block_data, outgoing_queues)) = erasure::reconstruct(
if let Ok(block_data) = erasure::reconstruct(
n_validators as usize,
v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize))) {
self.make_available(Data {
relay_parent: *relay_parent,
parachain_id: receipt.parachain_index,
block_data,
outgoing_queues,
})?;
}
}
......@@ -387,11 +372,6 @@ impl Store {
})
}
/// Query message queue data by message queue root hash.
pub fn queue_by_root(&self, queue_root: &Hash) -> Option<Vec<Message>> {
self.query_inner(columns::DATA, queue_root.as_ref())
}
fn block_hash_to_candidate_hash(&self, block_hash: Hash) -> Option<Hash> {
self.query_inner(columns::META, &block_to_candidate_key(&block_hash))
}
......@@ -414,8 +394,8 @@ impl Store {
#[cfg(test)]
mod tests {
use super::*;
use polkadot_erasure_coding::{self as erasure};
use polkadot_primitives::parachain::{Id as ParaId, AvailableMessages};
use polkadot_erasure_coding as erasure;
use polkadot_primitives::parachain::Id as ParaId;
#[test]
fn finalization_removes_unneeded() {
......@@ -444,14 +424,12 @@ mod tests {
relay_parent,
parachain_id: para_id_1,
block_data: block_data_1.clone(),
outgoing_queues: None,
}).unwrap();
store.make_available(Data {
relay_parent,
parachain_id: para_id_2,
block_data: block_data_2.clone(),
outgoing_queues: None,
}).unwrap();
let candidate_1 = CandidateReceipt {
......@@ -460,7 +438,6 @@ mod tests {
signature: Default::default(),
head_data: Default::default(),
parent_head: Default::default(),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: block_data_1.hash(),
upward_messages: Vec::new(),
......@@ -473,7 +450,6 @@ mod tests {
signature: Default::default(),
head_data: Default::default(),
parent_head: Default::default(),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: block_data_2.hash(),
upward_messages: Vec::new(),
......@@ -516,34 +492,12 @@ mod tests {
let para_id = 5.into();
let block_data = BlockData(vec![1, 2, 3]);
let message_queue_root_1 = [0x42; 32].into();
let message_queue_root_2 = [0x43; 32].into();
let message_a = Message(vec![1, 2, 3, 4]);
let message_b = Message(vec![4, 5, 6, 7]);
let outgoing_queues = AvailableMessages(vec![
(message_queue_root_1, vec![message_a.clone()]),
(message_queue_root_2, vec![message_b.clone()]),
]);
let store = Store::new_in_memory();
store.make_available(Data {
relay_parent,
parachain_id: para_id,
block_data: block_data.clone(),
outgoing_queues: Some(outgoing_queues),
block_data,
}).unwrap();
assert_eq!(
store.queue_by_root(&message_queue_root_1),
Some(vec![message_a]),
);
assert_eq!(
store.queue_by_root(&message_queue_root_2),
Some(vec![message_b]),
);
}
#[test]
......@@ -554,21 +508,10 @@ mod tests {
let block_data_hash = block_data.hash();
let n_validators = 5;
let message_queue_root_1 = [0x42; 32].into();
let message_queue_root_2 = [0x43; 32].into();
let message_a = Message(vec![1, 2, 3, 4]);
let message_b = Message(vec![5, 6, 7, 8]);
let outgoing_queues = Some(AvailableMessages(vec![
(message_queue_root_1, vec![message_a.clone()]),
(message_queue_root_2, vec![message_b.clone()]),
]));
let erasure_chunks = erasure::obtain_chunks(
n_validators,
&block_data,
outgoing_queues.as_ref()).unwrap();
).unwrap();
let branches = erasure::branches(erasure_chunks.as_ref());
......@@ -578,7 +521,6 @@ mod tests {
signature: Default::default(),
head_data: Default::default(),
parent_head: Default::default(),
egress_queue_roots: Vec::new(),
fees: 0,
block_data_hash: block_data.hash(),
upward_messages: Vec::new(),
......
......@@ -35,7 +35,7 @@ use consensus_common::{
use polkadot_primitives::{Block, BlockId, Hash};
use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
ValidatorPair, ErasureChunk, PoVBlock,
};
use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}};
use keystore::KeyStorePtr;
......@@ -90,7 +90,7 @@ pub(crate) struct ParachainBlocks {
/// The relay parent of the block these parachain blocks belong to.
pub relay_parent: Hash,
/// The blocks themselves.
pub blocks: Vec<(CandidateReceipt, Option<(BlockData, AvailableMessages)>)>,
pub blocks: Vec<(CandidateReceipt, Option<PoVBlock>)>,
/// A sender to signal the result asynchronously.
pub result: oneshot::Sender<Result<(), Error>>,
}
......@@ -367,7 +367,7 @@ where
runtime_handle: &Handle,
sender: &mut mpsc::UnboundedSender<WorkerMsg>,
relay_parent: Hash,
blocks: Vec<(CandidateReceipt, Option<(BlockData, AvailableMessages)>)>,
blocks: Vec<(CandidateReceipt, Option<PoVBlock>)>,
) -> Result<(), Error> {
let hashes: Vec<_> = blocks.iter().map(|(c, _)| c.hash()).collect();
......@@ -375,7 +375,7 @@ where
for (candidate, block) in blocks.into_iter() {
let _ = self.availability_store.add_candidate(&candidate);
if let Some((_block, _msgs)) = block {
if let Some(_block) = block {
// Should we be breaking block into chunks here and gossiping it and so on?
}
......
......@@ -48,7 +48,6 @@ use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn};
use log::warn;
......@@ -57,18 +56,17 @@ use sp_core::{Pair, Blake2Hasher};
use polkadot_primitives::{
BlockId, Hash, Block,
parachain::{
self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId,
OutgoingMessages, PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair,
self, BlockData, DutyRoster, HeadData, Id as ParaId,
PoVBlock, Status as ParachainStatus, ValidatorId, CollatorPair,
}
};
use polkadot_cli::{
ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama,
service::{self, Roles, SelectChain}
};
use polkadot_network::legacy::validation::{LeafWorkParams, ValidationNetwork};
use polkadot_network::legacy::validation::ValidationNetwork;
pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration};
pub use polkadot_network::legacy::validation::Incoming;
pub use polkadot_validation::SignedStatement;
pub use polkadot_primitives::parachain::CollatorId;
pub use sc_network::PeerId;
......@@ -111,14 +109,14 @@ pub struct InvalidHead;
/// Collation errors.
#[derive(Debug)]
pub enum Error<R> {
pub enum Error {
/// Error on the relay-chain side of things.
Polkadot(R),
Polkadot(String),
/// Error on the collator side of things.
Collator(InvalidHead),
}
impl<R: fmt::Display> fmt::Display for Error<R> {
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Error::Polkadot(ref err) => write!(f, "Polkadot node error: {}", err),
......@@ -162,65 +160,41 @@ pub trait BuildParachainContext {
/// This can be implemented through an externally attached service or a stub.
/// This is expected to be a lightweight, shared type like an Arc.
pub trait ParachainContext: Clone {
type ProduceCandidate: Future<Output = Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>>;
type ProduceCandidate: Future<Output = Result<(BlockData, HeadData), InvalidHead>>;
/// Produce a candidate, given the relay parent hash, the latest ingress queue information
/// and the last parachain head.
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
fn produce_candidate(
&mut self,
relay_parent: Hash,
status: ParachainStatus,
ingress: I,
) -> Self::ProduceCandidate;
}
/// Relay chain context needed to collate.
/// This encapsulates a network and local database which may store
/// some of the input.
pub trait RelayChainContext {
type Error: std::fmt::Debug;
/// Future that resolves to the un-routed egress queues of a parachain.
/// The first item is the oldest.
type FutureEgress: Future<Output = Result<ConsolidatedIngress, Self::Error>>;
/// Get un-routed egress queues from a parachain to the local parachain.
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress;
}
/// Produce a candidate for the parachain, with given contexts, parent head, and signing key.
pub async fn collate<R, P>(
pub async fn collate<P>(
relay_parent: Hash,
local_id: ParaId,
parachain_status: ParachainStatus,
relay_context: R,
mut para_context: P,
key: Arc<CollatorPair>,
)
-> Result<(parachain::Collation, OutgoingMessages), Error<R::Error>>
-> Result<parachain::Collation, Error>
where
R: RelayChainContext,
P: ParachainContext,
P::ProduceCandidate: Send,
{
let ingress = relay_context.unrouted_egress(local_id).await.map_err(Error::Polkadot)?;
let (block_data, head_data, mut outgoing) = para_context.produce_candidate(
let (block_data, head_data) = para_context.produce_candidate(
relay_parent,
parachain_status,
ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg)))
).map_err(Error::Collator).await?;
let block_data_hash = block_data.hash();
let signature = key.sign(block_data_hash.as_ref());
let egress_queue_roots =
polkadot_validation::egress_roots(&mut outgoing.outgoing_messages);
let info = parachain::CollationInfo {
parachain_index: local_id,
collator: key.public(),
signature,
egress_queue_roots,
head_data,
block_data_hash,
upward_messages: Vec::new(),
......@@ -230,47 +204,10 @@ pub async fn collate<R, P>(
info,
pov: PoVBlock {
block_data,
ingress,
},
};
Ok((collation, outgoing))
}
/// Polkadot-api context.
struct ApiContext<P, SP> {
network: Arc<ValidationNetwork<P, SP>>,
parent_hash: Hash,
validators: Vec<ValidatorId>,
}
impl<P: 'static, SP: 'static> RelayChainContext for ApiContext<P, SP> where
P: ProvideRuntimeApi<Block> + Send + Sync,
P::Api: ParachainHost<Block>,
SP: Spawn + Clone + Send + Sync
{
type Error = String;
type FutureEgress = Pin<Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Send>>;
fn unrouted_egress(&self, _id: ParaId) -> Self::FutureEgress {
let network = self.network.clone();
let parent_hash = self.parent_hash;
let authorities = self.validators.clone();
async move {
// TODO: https://github.com/paritytech/polkadot/issues/253
//
// Fetch ingress and accumulate all unrounted egress
let _session = network.instantiate_leaf_work(LeafWorkParams {
local_session_key: None,
parent_hash,
authorities,
})
.map_err(|e| format!("unable to instantiate validation session: {:?}", e));
Ok(ConsolidatedIngress(Vec::new()))
}.boxed()
}
Ok(collation)
}
/// Run the collator node using the given `service`.
......@@ -378,7 +315,6 @@ fn run_collator_node<S, P, Extrinsic>(
let client = client.clone();
let key = key.clone();
let parachain_context = parachain_context.clone();
let validation_network = validation_network.clone();
let work = future::lazy(move |_| {
let api = client.runtime_api();
......@@ -395,27 +331,19 @@ fn run_collator_node<S, P, Extrinsic>(
try_fr!(api.duty_roster(&id)),
);
let context = ApiContext {
network: validation_network,
parent_hash: relay_parent,
validators,
};
let collation_work = collate(
relay_parent,
para_id,
status,
context,
parachain_context,
key,
).map_ok(move |(collation, outgoing)| {
).map_ok(move |collation| {
network.with_spec(move |spec, ctx| {
let res = spec.add_local_collation(
ctx,
relay_parent,
targets,
collation,
outgoing,
);
tokio::spawn(res.boxed());
......@@ -514,104 +442,26 @@ pub fn run_collator<P>(
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use polkadot_primitives::parachain::{TargetedMessage, FeeSchedule};
use polkadot_primitives::parachain::FeeSchedule;
use keyring::Sr25519Keyring;
use super::*;
#[derive(Default, Clone)]
struct DummyRelayChainContext {
ingress: HashMap<ParaId, ConsolidatedIngress>
}
impl RelayChainContext for DummyRelayChainContext {
type Error = ();
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress,()>> + Unpin>;
fn unrouted_egress(&self, para_id: ParaId) -> Self::FutureEgress {
match self.ingress.get(&para_id) {
Some(ingress) => Box::new(future::ok(ingress.clone())),
None => Box::new(future::pending()),
}
}
}
#[derive(Clone)]
struct DummyParachainContext;
impl ParachainContext for DummyParachainContext {
type ProduceCandidate = future::Ready<Result<(BlockData, HeadData, OutgoingMessages), InvalidHead>>;
type ProduceCandidate = future::Ready<Result<(BlockData, HeadData), InvalidHead>>;
fn produce_candidate<I: IntoIterator<Item=(ParaId, Message)>>(
fn produce_candidate(
&mut self,
_relay_parent: Hash,
_status: ParachainStatus,
ingress: I,
) -> Self::ProduceCandidate {
// send messages right back.
future::ok((
BlockData(vec![1, 2, 3, 4, 5,]),
HeadData(vec![9, 9, 9]),
OutgoingMessages {
outgoing_messages: ingress.into_iter().map(|(id, msg)| TargetedMessage {
target: id,
data: msg.0,
}).collect(),
}
))
}
}
#[test]
fn collates_correct_queue_roots() {
let mut context = DummyRelayChainContext::default();
let id = ParaId::from(100);
let a = ParaId::from(123);
let b = ParaId::from(456);
let messages_from_a = vec![
Message(vec![1, 1, 1]),
Message(b"helloworld".to_vec()),
];
let messages_from_b = vec![
Message(b"dogglesworth".to_vec()),
Message(b"buy_1_chili_con_carne_here_is_my_cash".to_vec()),
];
let root_a = ::polkadot_validation::message_queue_root(
messages_from_a.iter().map(|msg| &msg.0)
);
let root_b = ::polkadot_validation::message_queue_root(
messages_from_b.iter().map(|msg| &msg.0)
);
context.ingress.insert(id, ConsolidatedIngress(vec![
(b, messages_from_b),
(a, messages_from_a),
]));
let future = collate(
Default::default(),
id,
ParachainStatus {
head_data: HeadData(vec![5]),
balance: 10,
fee_schedule: FeeSchedule {
base: 0,
per_byte: 1,
},
},
context.clone(),
DummyParachainContext,
Arc::new(Sr25519Keyring::Alice.pair().into()),
);
let collation = futures::executor::block_on(future).unwrap().0;
// ascending order by root.
assert_eq!(collation.info.egress_queue_roots, vec![(a, root_a), (b, root_b)]);
}
}
......@@ -27,7 +27,7 @@
use codec::{Encode, Decode};
use reed_solomon::galois_16::{self, ReedSolomon};
use primitives::{Hash as H256, BlakeTwo256, HashT};
use primitives::parachain::{BlockData, AvailableMessages};
use primitives::parachain::BlockData;
use sp_core::Blake2Hasher;
use trie::{EMPTY_PREFIX, MemoryDB, Trie, TrieMut, trie_types::{TrieDBMut, TrieDB}};
......@@ -125,11 +125,11 @@ fn code_params(n_validators: usize) -> Result<CodeParams, Error> {
/// Obtain erasure-coded chunks, one for each validator.
///
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, outgoing: Option<&AvailableMessages>)
pub fn obtain_chunks(n_validators: usize, block_data: &BlockData)
-> Result<Vec<Vec<u8>>, Error>
{
let params = code_params(n_validators)?;
let encoded = (block_data, outgoing).encode();
let encoded = block_data.encode();
if encoded.is_empty() {
return Err(Error::BadPayload);
......@@ -151,7 +151,7 @@ pub fn obtain_chunks(n_validators: usize, block_data: &BlockData, outgoing: Opti
///
/// Works only up to 65536 validators, and `n_validators` must be non-zero.
pub fn reconstruct<'a, I: 'a>(n_validators: usize, chunks: I)
-> Result<(BlockData, Option<AvailableMessages>), Error>
-> Result<BlockData, Error>
where I: IntoIterator<Item=(&'a [u8], usize)>
{
let params = code_params(n_validators)?;
......@@ -402,11 +402,9 @@ mod tests {
#[test]
fn round_trip_block_data() {
let block_data = BlockData((0..255).collect());
let ex = Some(AvailableMessages(Vec::new()));
let chunks = obtain_chunks(
10,
&block_data,
ex.as_ref(),
).unwrap();
assert_eq!(chunks.len(), 10);
......@@ -422,18 +420,16 @@ mod tests {
].iter().cloned(),
).unwrap();
assert_eq!(reconstructed, (block_data, ex));
assert_eq!(reconstructed, block_data);
}
#[test]
fn construct_valid_branches() {
let block_data = BlockData(vec![2; 256]);
let ex = Some(AvailableMessages(Vec::new()));
let chunks = obtain_chunks(
10,
&block_data,
ex.as_ref(),
).unwrap();
assert_eq!(chunks.len(), 10);
......
......@@ -236,15 +236,12 @@ impl CollatorPool {
mod tests {
use super::*;
use sp_core::crypto::UncheckedInto;
use polkadot_primitives::parachain::{
CandidateReceipt, BlockData, PoVBlock, HeadData, ConsolidatedIngress,
};
use polkadot_primitives::parachain::{CandidateReceipt, BlockData, PoVBlock, HeadData};
use futures::executor::block_on;