// Copyright 2019 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // Polkadot is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . //! Tests and helpers for validation networking. #![allow(unused)] use crate::gossip::GossipMessage; use substrate_network::Context as NetContext; use substrate_network::consensus_gossip::TopicNotification; use substrate_primitives::{NativeOrEncoded, ExecutionContext}; use substrate_keyring::Sr25519Keyring; use crate::{GossipService, PolkadotProtocol, NetworkService, GossipMessageStream}; use polkadot_validation::{SharedTable, Network}; use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId}; use polkadot_primitives::parachain::{ Id as ParaId, Chain, DutyRoster, ParachainHost, TargetedMessage, ValidatorId, StructuredUnroutedIngress, BlockIngressRoots, Status, FeeSchedule, HeadData, Retriable, CollatorId }; use parking_lot::Mutex; use sp_blockchain::Result as ClientResult; use sr_api::{Core, RuntimeVersion, StorageProof, ApiExt}; use sr_primitives::traits::{ApiRef, ProvideRuntimeApi}; use std::collections::HashMap; use std::sync::Arc; use std::pin::Pin; use std::task::{Poll, Context}; use futures::{prelude::*, channel::mpsc}; use codec::Encode; use super::{TestContext, TestChainContext}; type TaskExecutor = Arc; #[derive(Clone, Copy)] struct NeverExit; impl Future for NeverExit { type Output = (); fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll { Poll::Pending } } fn clone_gossip(n: &TopicNotification) -> TopicNotification { TopicNotification { message: n.message.clone(), sender: n.sender.clone(), } } struct GossipRouter { incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)>, outgoing: Vec<(Hash, mpsc::UnboundedSender)>, messages: Vec<(Hash, TopicNotification)>, } impl GossipRouter { fn add_message(&mut self, topic: Hash, message: TopicNotification) { self.outgoing.retain(|&(ref o_topic, ref sender)| { o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() }); self.messages.push((topic, message)); } fn add_outgoing(&mut self, topic: Hash, sender: mpsc::UnboundedSender) { for message in self.messages.iter() .filter(|&&(ref t, _)| t == &topic) .map(|&(_, ref msg)| clone_gossip(msg)) { if let Err(_) = sender.unbounded_send(message) { return } } self.outgoing.push((topic, sender)); } } impl Future for GossipRouter { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = Pin::into_inner(self); loop { match Pin::new(&mut this.incoming_messages).poll_next(cx) { Poll::Ready(Some((topic, message))) => this.add_message(topic, message), Poll::Ready(None) => panic!("ended early."), Poll::Pending => break, } } loop { match Pin::new(&mut this.incoming_streams).poll_next(cx) { Poll::Ready(Some((topic, sender))) => this.add_outgoing(topic, sender), Poll::Ready(None) => panic!("ended early."), Poll::Pending => break, } } Poll::Pending } } #[derive(Clone)] struct GossipHandle { send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>, send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender)>, } fn make_gossip() -> (GossipRouter, GossipHandle) { let (message_tx, message_rx) = mpsc::unbounded(); let (listener_tx, listener_rx) = mpsc::unbounded(); ( GossipRouter { incoming_messages: message_rx, incoming_streams: listener_rx, outgoing: Vec::new(), messages: Vec::new(), }, GossipHandle { send_message: message_tx, send_listener: listener_tx }, ) } struct TestNetwork { proto: Arc>, gossip: GossipHandle, } impl NetworkService for TestNetwork { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let (tx, rx) = mpsc::unbounded(); let _ = self.gossip.send_listener.unbounded_send((topic, tx)); GossipMessageStream::new(Box::new(rx)) } fn gossip_message(&self, topic: Hash, message: GossipMessage) { let notification = TopicNotification { message: message.encode(), sender: None }; let _ = self.gossip.send_message.unbounded_send((topic, notification)); } fn with_gossip(&self, with: F) where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext) { unimplemented!() } fn with_spec(&self, with: F) where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext) { let mut context = TestContext::default(); let res = with(&mut *self.proto.lock(), &mut context); // TODO: send context to worker for message routing. // https://github.com/paritytech/polkadot/issues/215 res } } #[derive(Default)] struct ApiData { validators: Vec, duties: Vec, active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>, ingress: HashMap, } #[derive(Default, Clone)] struct TestApi { data: Arc>, } struct RuntimeApi { data: Arc>, } impl ProvideRuntimeApi for TestApi { type Api = RuntimeApi; fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { RuntimeApi { data: self.data.clone() }.into() } } impl Core for RuntimeApi { fn Core_version_runtime_api_impl( &self, _: &BlockId, _: ExecutionContext, _: Option<()>, _: Vec, ) -> ClientResult> { unimplemented!("Not required for testing!") } fn Core_execute_block_runtime_api_impl( &self, _: &BlockId, _: ExecutionContext, _: Option<(Block)>, _: Vec, ) -> ClientResult> { unimplemented!("Not required for testing!") } fn Core_initialize_block_runtime_api_impl( &self, _: &BlockId, _: ExecutionContext, _: Option<&Header>, _: Vec, ) -> ClientResult> { unimplemented!("Not required for testing!") } } impl ApiExt for RuntimeApi { type Error = sp_blockchain::Error; fn map_api_result Result, R, E>( &self, _: F ) -> Result { unimplemented!("Not required for testing!") } fn runtime_version_at(&self, _: &BlockId) -> ClientResult { unimplemented!("Not required for testing!") } fn record_proof(&mut self) { } fn extract_proof(&mut self) -> Option { None } } impl ParachainHost for RuntimeApi { fn ParachainHost_validators_runtime_api_impl( &self, _at: &BlockId, _: ExecutionContext, _: Option<()>, _: Vec, ) -> ClientResult>> { Ok(NativeOrEncoded::Native(self.data.lock().validators.clone())) } fn ParachainHost_duty_roster_runtime_api_impl( &self, _at: &BlockId, _: ExecutionContext, _: Option<()>, _: Vec, ) -> ClientResult> { Ok(NativeOrEncoded::Native(DutyRoster { validator_duty: self.data.lock().duties.clone(), })) } fn ParachainHost_active_parachains_runtime_api_impl( &self, _at: &BlockId, _: ExecutionContext, _: Option<()>, _: Vec, ) -> ClientResult)>>> { Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone())) } fn ParachainHost_parachain_status_runtime_api_impl( &self, _at: &BlockId, _: ExecutionContext, _: Option, _: Vec, ) -> ClientResult>> { Ok(NativeOrEncoded::Native(Some(Status { head_data: HeadData(Vec::new()), balance: 0, fee_schedule: FeeSchedule { base: 0, per_byte: 0, } }))) } fn ParachainHost_parachain_code_runtime_api_impl( &self, _at: &BlockId, _: ExecutionContext, _: Option, _: Vec, ) -> ClientResult>>> { Ok(NativeOrEncoded::Native(Some(Vec::new()))) } fn ParachainHost_ingress_runtime_api_impl( &self, _at: &BlockId, _: ExecutionContext, id: Option<(ParaId, Option)>, _: Vec, ) -> ClientResult>> { let (id, _) = id.unwrap(); Ok(NativeOrEncoded::Native(self.data.lock().ingress.get(&id).cloned())) } } type TestValidationNetwork = crate::validation::ValidationNetwork< TestApi, NeverExit, TestNetwork, TaskExecutor, >; struct Built { gossip: GossipRouter, api_handle: Arc>, networks: Vec, } fn build_network(n: usize, executor: TaskExecutor) -> Built { let (gossip_router, gossip_handle) = make_gossip(); let api_handle = Arc::new(Mutex::new(Default::default())); let runtime_api = Arc::new(TestApi { data: api_handle.clone() }); let networks = (0..n).map(|_| { let net = Arc::new(TestNetwork { proto: Arc::new(Mutex::new(PolkadotProtocol::new(None))), gossip: gossip_handle.clone(), }); let message_val = crate::gossip::RegisteredMessageValidator::new_test( TestChainContext::default(), Box::new(|_, _| {}), ); TestValidationNetwork::new( net, NeverExit, message_val, runtime_api.clone(), executor.clone(), ) }); let networks: Vec<_> = networks.collect(); Built { gossip: gossip_router, api_handle, networks, } } #[derive(Default)] struct IngressBuilder { egress: HashMap<(ParaId, ParaId), Vec>>, } impl IngressBuilder { fn add_messages(&mut self, source: ParaId, messages: &[TargetedMessage]) { for message in messages { let target = message.target; self.egress.entry((source, target)).or_insert_with(Vec::new).push(message.data.clone()); } } fn build(self) -> HashMap { let mut map = HashMap::new(); for ((source, target), messages) in self.egress { map.entry(target).or_insert_with(Vec::new) .push((source, polkadot_validation::message_queue_root(&messages))); } for roots in map.values_mut() { roots.sort_by_key(|&(para_id, _)| para_id); } map.into_iter().map(|(k, v)| (k, BlockIngressRoots(v))).collect() } } fn make_table(data: &ApiData, local_key: &Sr25519Keyring, parent_hash: Hash) -> Arc { use av_store::Store; use substrate_primitives::crypto::Pair; let sr_pair = local_key.pair(); let local_key = polkadot_primitives::parachain::ValidatorPair::from(local_key.pair()); let store = Store::new_in_memory(); let (group_info, _) = ::polkadot_validation::make_group_info( DutyRoster { validator_duty: data.duties.clone() }, &data.validators, // only possible as long as parachain crypto === aura crypto Some(sr_pair.public().into()), ).unwrap(); Arc::new(SharedTable::new( data.validators.clone(), group_info, Some(Arc::new(local_key)), parent_hash, store, None, )) }