From d4dbc306e87cd3e2cbe0b492d61c0cd475e8d4dc Mon Sep 17 00:00:00 2001 From: Arkadiy Paronyan <arkady.paronyan@gmail.com> Date: Sat, 29 Sep 2018 16:53:47 +0200 Subject: [PATCH] Fixed consensus message garbage collection & DB initialization (#841) * Consensus cleanup * Fixed DB initialization issue * Spacing --- substrate/core/client/src/client.rs | 2 +- .../core/network/src/consensus_gossip.rs | 4 +- substrate/core/service/test/src/lib.rs | 83 ++++++++++++------- substrate/node/network/src/lib.rs | 1 + 4 files changed, 57 insertions(+), 33 deletions(-) diff --git a/substrate/core/client/src/client.rs b/substrate/core/client/src/client.rs index 985439461c2..0e2ca942590 100644 --- a/substrate/core/client/src/client.rs +++ b/substrate/core/client/src/client.rs @@ -243,7 +243,7 @@ impl<B, E, Block> Client<B, E, Block> where } // changes trie configuration should never change => we can read it in advance - let changes_trie_config = backend.state_at(BlockId::Number(Zero::zero()))? + let changes_trie_config = backend.state_at(BlockId::Number(backend.blockchain().info()?.best_number))? .storage(well_known_keys::CHANGES_TRIE_CONFIG) .map_err(|e| error::Error::from_state(Box::new(e)))? .and_then(|c| Decode::decode(&mut &*c)); diff --git a/substrate/core/network/src/consensus_gossip.rs b/substrate/core/network/src/consensus_gossip.rs index 449aab26ee8..7e685f85427 100644 --- a/substrate/core/network/src/consensus_gossip.rs +++ b/substrate/core/network/src/consensus_gossip.rs @@ -216,9 +216,7 @@ impl<B: BlockT> ConsensusGossip<B> where B::Header: HeaderT<Number=u64> { false } }); - if self.messages.len() != before { - trace!(target:"gossip", "Cleaned up {} stale messages", before - self.messages.len()); - } + trace!(target:"gossip", "Cleaned up {} stale messages, {} left", before - self.messages.len(), self.messages.len()); for (_, ref mut peer) in self.peers.iter_mut() { peer.known_messages.retain(|h| hashes.contains(h)); } diff --git a/substrate/core/service/test/src/lib.rs b/substrate/core/service/test/src/lib.rs index 607d68e4fb6..6ec3720deb6 100644 --- a/substrate/core/service/test/src/lib.rs +++ b/substrate/core/service/test/src/lib.rs @@ -52,9 +52,12 @@ use sr_primitives::traits::As; struct TestNet<F: ServiceFactory> { runtime: Runtime, - authority_nodes: Arc<Vec<(u32, F::FullService)>>, - full_nodes: Arc<Vec<(u32, F::FullService)>>, - _light_nodes: Arc<Vec<(u32, F::LightService)>>, + authority_nodes: Vec<(u32, Arc<F::FullService>)>, + full_nodes: Vec<(u32, Arc<F::FullService>)>, + _light_nodes: Vec<(u32, Arc<F::LightService>)>, + chain_spec: FactoryChainSpec<F>, + base_port: u16, + nodes: usize, } impl<F: ServiceFactory> TestNet<F> { @@ -101,7 +104,7 @@ fn node_config<F: ServiceFactory> ( public_addresses: vec![], boot_nodes: vec![], use_secret: Some(blake2_256(node_private_key_string(index).as_bytes())), - min_peers: 25, + min_peers: 50, max_peers: 500, reserved_nodes: vec![], non_reserved_mode: NonReservedPeerMode::Accept, @@ -135,28 +138,42 @@ impl<F: ServiceFactory> TestNet<F> { ::env_logger::init().ok(); ::fdlimit::raise_fd_limit(); let runtime = Runtime::new().expect("Error creating tokio runtime"); - let authority_nodes = authorities.iter().enumerate().map(|(index, key)| (index as u32, - F::new_full(node_config::<F>(index as u32, &spec, Roles::AUTHORITY, Some(key.clone()), base_port, &temp), runtime.executor()) - .expect("Error creating test node service")) - ).collect(); - - let authorities = authorities.len() as u32; - let full_nodes = (authorities..full + authorities).map(|index| (index, - F::new_full(node_config::<F>(index, &spec, Roles::FULL, None, base_port, &temp), runtime.executor()) - .expect("Error creating test node service")) - ).collect(); - - let light_nodes = (full + authorities..full + authorities + light).map(|index| (index, - F::new_light(node_config::<F>(index, &spec, Roles::LIGHT, None, base_port, &temp), runtime.executor()) - .expect("Error creating test node service")) - ).collect(); - - TestNet { + let mut net = TestNet { runtime, - authority_nodes: Arc::new(authority_nodes), - full_nodes: Arc::new(full_nodes), - _light_nodes: Arc::new(light_nodes), - } + authority_nodes: Default::default(), + full_nodes: Default::default(), + _light_nodes: Default::default(), + chain_spec: spec.clone(), + base_port, + nodes: 0, + }; + net.insert_nodes(temp, full, light, authorities); + net + } + + fn insert_nodes(&mut self, temp: &TempDir, full: u32, light: u32, authorities: Vec<String>) { + let mut nodes = self.nodes; + let base_port = self.base_port; + let spec = self.chain_spec.clone(); + let executor = self.runtime.executor(); + self.authority_nodes.extend(authorities.iter().enumerate().map(|(index, key)| ((index + nodes) as u32, + Arc::new(F::new_full(node_config::<F>(index as u32, &spec, Roles::AUTHORITY, Some(key.clone()), base_port, &temp), executor.clone()) + .expect("Error creating test node service"))) + )); + nodes += authorities.len(); + + self.full_nodes.extend((nodes..nodes + full as usize).map(|index| (index as u32, + Arc::new(F::new_full(node_config::<F>(index as u32, &spec, Roles::FULL, None, base_port, &temp), executor.clone()) + .expect("Error creating test node service"))) + )); + nodes += full as usize; + + self._light_nodes.extend((nodes..nodes + light as usize).map(|index| (index as u32, + Arc::new(F::new_light(node_config::<F>(index as u32, &spec, Roles::LIGHT, None, base_port, &temp), executor.clone()) + .expect("Error creating test node service"))) + )); + nodes += light as usize; + self.nodes = nodes; } } @@ -229,11 +246,11 @@ pub fn consensus<F>(spec: FactoryChainSpec<F>, authorities: Vec<String>) where F: ServiceFactory, { - const NUM_NODES: u32 = 10; + const NUM_NODES: u32 = 20; const NUM_BLOCKS: u64 = 200; - info!("Checking consensus"); let temp = TempDir::new("substrate-conensus-test").expect("Error creating test dir"); - let mut network = TestNet::<F>::new(&temp, spec.clone(), NUM_NODES, 0, authorities, 30600); + let mut network = TestNet::<F>::new(&temp, spec.clone(), NUM_NODES / 2, 0, authorities, 30600); + info!("Checking consensus"); let first_address = network.authority_nodes[0].1.network().node_id().unwrap(); for (_, service) in network.full_nodes.iter() { service.network().add_reserved_peer(first_address.clone()).expect("Error adding reserved peer"); @@ -242,6 +259,14 @@ where service.network().add_reserved_peer(first_address.clone()).expect("Error adding reserved peer"); } network.run_until_all_full(|_index, service| { - service.client().info().unwrap().chain.finalized_number >= As::sa(NUM_BLOCKS) + service.client().info().unwrap().chain.finalized_number >= As::sa(NUM_BLOCKS / 2) }); + info!("Adding more peers"); + network.insert_nodes(&temp, NUM_NODES / 2, 0, vec![]); + for (_, service) in network.full_nodes.iter() { + service.network().add_reserved_peer(first_address.clone()).expect("Error adding reserved peer"); + } + network.run_until_all_full(|_index, service| + service.client().info().unwrap().chain.finalized_number >= As::sa(NUM_BLOCKS) + ); } diff --git a/substrate/node/network/src/lib.rs b/substrate/node/network/src/lib.rs index 4dad6e3f78c..7e07054fcb5 100644 --- a/substrate/node/network/src/lib.rs +++ b/substrate/node/network/src/lib.rs @@ -105,6 +105,7 @@ impl Specialization<Block> for Protocol { } fn maintain_peers(&mut self, _ctx: &mut Context<Block>) { + self.consensus_gossip.collect_garbage(None); } fn on_block_imported(&mut self, _ctx: &mut Context<Block>, _hash: Hash, _header: &Header) { -- GitLab