diff --git a/substrate/client/network/transactions/src/lib.rs b/substrate/client/network/transactions/src/lib.rs index 5239a94ef23f39079f728639aeb1b270f5081243..4cc76507c6f167b732adf58db4b2959aa189ac74 100644 --- a/substrate/client/network/transactions/src/lib.rs +++ b/substrate/client/network/transactions/src/lib.rs @@ -172,11 +172,13 @@ impl TransactionsHandlerPrototype { let handler = TransactionsHandler { protocol_name: self.protocol_name, - propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), + propagate_timeout: (Box::pin(interval(PROPAGATE_TIMEOUT)) + as Pin<Box<dyn Stream<Item = ()> + Send>>) + .fuse(), pending_transactions: FuturesUnordered::new(), pending_transactions_peers: HashMap::new(), service, - event_stream, + event_stream: event_stream.fuse(), peers: HashMap::new(), transaction_pool, from_controller, @@ -229,7 +231,7 @@ pub struct TransactionsHandler< > { protocol_name: ProtocolName, /// Interval at which we call `propagate_transactions`. - propagate_timeout: Pin<Box<dyn Stream<Item = ()> + Send>>, + propagate_timeout: stream::Fuse<Pin<Box<dyn Stream<Item = ()> + Send>>>, /// Pending transactions verification tasks. pending_transactions: FuturesUnordered<PendingTransaction<H>>, /// As multiple peers can send us the same transaction, we group @@ -240,7 +242,7 @@ pub struct TransactionsHandler< /// Network service to use to send messages and manage peers. service: S, /// Stream of networking events. - event_stream: Pin<Box<dyn Stream<Item = Event> + Send>>, + event_stream: stream::Fuse<Pin<Box<dyn Stream<Item = Event> + Send>>>, // All connected peers peers: HashMap<PeerId, Peer<H>>, transaction_pool: Arc<dyn TransactionPool<H, B>>, @@ -268,7 +270,7 @@ where pub async fn run(mut self) { loop { futures::select! { - _ = self.propagate_timeout.next().fuse() => { + _ = self.propagate_timeout.next() => { self.propagate_transactions(); }, (tx_hash, result) = self.pending_transactions.select_next_some() => { @@ -278,7 +280,7 @@ where warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!"); } }, - network_event = self.event_stream.next().fuse() => { + network_event = self.event_stream.next() => { if let Some(network_event) = network_event { self.handle_network_event(network_event).await; } else { @@ -286,7 +288,7 @@ where return; } }, - message = self.from_controller.select_next_some().fuse() => { + message = self.from_controller.select_next_some() => { match message { ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash), ToHandler::PropagateTransactions => self.propagate_transactions(),