Unverified Commit a5b223a0 authored by Bastian Köcher's avatar Bastian Köcher Committed by GitHub
Browse files

Make `communication_for` exit when we end a round (#313)

* Make `communication_for` exit when we end a round

* Fix compilation
parent d19d5b13
Pipeline #42390 failed with stages
in 33 seconds
...@@ -330,6 +330,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where ...@@ -330,6 +330,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
&self, &self,
table: Arc<SharedTable>, table: Arc<SharedTable>,
authorities: &[ValidatorId], authorities: &[ValidatorId],
exit: exit_future::Exit,
) -> Self::BuildTableRouter { ) -> Self::BuildTableRouter {
let parent_hash = *table.consensus_parent_hash(); let parent_hash = *table.consensus_parent_hash();
let local_session_key = table.session_key(); let local_session_key = table.session_key();
...@@ -354,7 +355,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where ...@@ -354,7 +355,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
let table_router_clone = table_router.clone(); let table_router_clone = table_router.clone();
let work = table_router.checked_statements() let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) }); .for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
executor.spawn(work); executor.spawn(work.select(exit).map(|_| ()).map_err(|_| ()));
table_router table_router
}); });
......
...@@ -22,9 +22,7 @@ use parity_codec::{Decode, HasCompact}; ...@@ -22,9 +22,7 @@ use parity_codec::{Decode, HasCompact};
use srml_support::{decl_storage, decl_module, fail, ensure}; use srml_support::{decl_storage, decl_module, fail, ensure};
use bitvec::{bitvec, BigEndian}; use bitvec::{bitvec, BigEndian};
use sr_primitives::traits::{ use sr_primitives::traits::{Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One};
Hash as HashT, BlakeTwo256, Member, CheckedConversion, Saturating, One, Zero,
};
use primitives::{Hash, Balance, parachain::{ use primitives::{Hash, Balance, parachain::{
self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion, self, Id as ParaId, Chain, DutyRoster, AttestedCandidate, Statement, AccountIdConversion,
ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots, ParachainDispatchOrigin, UpwardMessage, BlockIngressRoots,
...@@ -243,9 +241,11 @@ decl_storage! { ...@@ -243,9 +241,11 @@ decl_storage! {
config(parachains): Vec<(ParaId, Vec<u8>, Vec<u8>)>; config(parachains): Vec<(ParaId, Vec<u8>, Vec<u8>)>;
config(_phdata): PhantomData<T>; config(_phdata): PhantomData<T>;
build(|storage: &mut StorageOverlay, _: &mut ChildrenStorageOverlay, config: &GenesisConfig<T>| { build(|storage: &mut StorageOverlay, _: &mut ChildrenStorageOverlay, config: &GenesisConfig<T>| {
use sr_primitives::traits::Zero;
let mut p = config.parachains.clone(); let mut p = config.parachains.clone();
p.sort_unstable_by_key(|&(ref id, _, _)| id.clone()); p.sort_unstable_by_key(|&(ref id, _, _)| *id);
p.dedup_by_key(|&mut (ref id, _, _)| id.clone()); p.dedup_by_key(|&mut (ref id, _, _)| *id);
let only_ids: Vec<_> = p.iter().map(|&(ref id, _, _)| id).cloned().collect(); let only_ids: Vec<_> = p.iter().map(|&(ref id, _, _)| id).cloned().collect();
......
...@@ -147,6 +147,7 @@ pub trait Network { ...@@ -147,6 +147,7 @@ pub trait Network {
&self, &self,
table: Arc<SharedTable>, table: Arc<SharedTable>,
authorities: &[SessionKey], authorities: &[SessionKey],
exit: exit_future::Exit,
) -> Self::BuildTableRouter; ) -> Self::BuildTableRouter;
} }
...@@ -313,11 +314,14 @@ impl<C, N, P> ParachainValidation<C, N, P> where ...@@ -313,11 +314,14 @@ impl<C, N, P> ParachainValidation<C, N, P> where
let (group_info, local_duty) = make_group_info( let (group_info, local_duty) = make_group_info(
duty_roster, duty_roster,
&authorities, &authorities,
sign_with.public().into(), sign_with.public(),
)?; )?;
info!("Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}", info!(
parent_hash, local_duty.validation); "Starting parachain attestation session on top of parent {:?}. Local parachain duty is {:?}",
parent_hash,
local_duty.validation,
);
let active_parachains = self.client.runtime_api().active_parachains(&id)?; let active_parachains = self.client.runtime_api().active_parachains(&id)?;
...@@ -331,25 +335,23 @@ impl<C, N, P> ParachainValidation<C, N, P> where ...@@ -331,25 +335,23 @@ impl<C, N, P> ParachainValidation<C, N, P> where
self.extrinsic_store.clone(), self.extrinsic_store.clone(),
max_block_data_size, max_block_data_size,
)); ));
let (_drop_signal, exit) = exit_future::signal();
let router = self.network.communication_for( let router = self.network.communication_for(
table.clone(), table.clone(),
&authorities, &authorities,
exit.clone(),
); );
let drop_signal = match local_duty.validation { if let Chain::Parachain(id) = local_duty.validation {
Chain::Parachain(id) => Some(self.launch_work( self.launch_work(parent_hash, id, router, max_block_data_size, exit);
parent_hash, }
id,
router,
max_block_data_size,
)),
Chain::Relay => None,
};
let tracker = Arc::new(AttestationTracker { let tracker = Arc::new(AttestationTracker {
table, table,
started: Instant::now(), started: Instant::now(),
_drop_signal: drop_signal _drop_signal,
}); });
live_instances.insert(parent_hash, tracker.clone()); live_instances.insert(parent_hash, tracker.clone());
...@@ -369,10 +371,10 @@ impl<C, N, P> ParachainValidation<C, N, P> where ...@@ -369,10 +371,10 @@ impl<C, N, P> ParachainValidation<C, N, P> where
validation_para: ParaId, validation_para: ParaId,
build_router: N::BuildTableRouter, build_router: N::BuildTableRouter,
max_block_data_size: Option<u64>, max_block_data_size: Option<u64>,
) -> exit_future::Signal { exit: exit_future::Exit,
) {
use extrinsic_store::Data; use extrinsic_store::Data;
let (signal, exit) = exit_future::signal();
let (collators, client) = (self.collators.clone(), self.client.clone()); let (collators, client) = (self.collators.clone(), self.client.clone());
let extrinsic_store = self.extrinsic_store.clone(); let extrinsic_store = self.extrinsic_store.clone();
...@@ -428,16 +430,15 @@ impl<C, N, P> ParachainValidation<C, N, P> where ...@@ -428,16 +430,15 @@ impl<C, N, P> ParachainValidation<C, N, P> where
.then(|_| Ok(())); .then(|_| Ok(()));
// spawn onto thread pool. // spawn onto thread pool.
if let Err(_) = self.handle.execute(Box::new(cancellable_work)) { if self.handle.execute(Box::new(cancellable_work)).is_err() {
error!("Failed to spawn cancellable work task"); error!("Failed to spawn cancellable work task");
} }
signal
} }
} }
/// Parachain validation for a single block. /// Parachain validation for a single block.
struct AttestationTracker { struct AttestationTracker {
_drop_signal: Option<exit_future::Signal>, _drop_signal: exit_future::Signal,
table: Arc<SharedTable>, table: Arc<SharedTable>,
started: Instant, started: Instant,
} }
...@@ -544,7 +545,7 @@ impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N, ...@@ -544,7 +545,7 @@ impl<C, N, P, SC, TxApi> consensus::Environment<Block> for ProposerFactory<C, N,
parent_id, parent_id,
parent_number: parent_header.number, parent_number: parent_header.number,
transaction_pool: self.transaction_pool.clone(), transaction_pool: self.transaction_pool.clone(),
slot_duration: self.aura_slot_duration.clone(), slot_duration: self.aura_slot_duration,
}) })
} }
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment