From 188840dca11960e445fb9d7bbe478b0de4f1100c Mon Sep 17 00:00:00 2001
From: Niklas Adolfsson <niklasadolfsson1@gmail.com>
Date: Mon, 7 Feb 2022 11:21:56 +0100
Subject: [PATCH] staking miner: spawn separate task for each block (#4716)

* staking miner: use config for emergency solution

Fixes #4678

* bump jsonrpsee

* run `monitor_cmd_for` until the connection is closed

* new tokio task for submit_and_watch xt

* re-use header subscription

* update jsonrpsee + simplify code

* revert polkadot runtime changes

* fix grumbles

* Update utils/staking-miner/src/monitor.rs

* grumbles: fix logs + nits
---
 polkadot/Cargo.lock                           |   2 +-
 polkadot/utils/staking-miner/Cargo.toml       |  12 +-
 polkadot/utils/staking-miner/README.md        |   7 +
 polkadot/utils/staking-miner/src/main.rs      |   4 +-
 polkadot/utils/staking-miner/src/monitor.rs   | 298 +++++++++++++-----
 .../utils/staking-miner/src/rpc_helpers.rs    |   6 +-
 6 files changed, 232 insertions(+), 97 deletions(-)

diff --git a/polkadot/Cargo.lock b/polkadot/Cargo.lock
index 311a648fd0a..4b7f0e21770 100644
--- a/polkadot/Cargo.lock
+++ b/polkadot/Cargo.lock
@@ -10401,7 +10401,7 @@ dependencies = [
  "frame-election-provider-support",
  "frame-support",
  "frame-system",
- "jsonrpsee 0.4.1",
+ "jsonrpsee 0.8.0",
  "kusama-runtime",
  "log",
  "pallet-balances",
diff --git a/polkadot/utils/staking-miner/Cargo.toml b/polkadot/utils/staking-miner/Cargo.toml
index 21e08f84ba7..6a2700f66ad 100644
--- a/polkadot/utils/staking-miner/Cargo.toml
+++ b/polkadot/utils/staking-miner/Cargo.toml
@@ -5,16 +5,16 @@ authors = ["Parity Technologies <admin@parity.io>"]
 edition = "2018"
 
 [dependencies]
+clap = { version = "3.0", features = ["derive", "env"] }
 codec = { package = "parity-scale-codec", version = "2.0.0" }
-tokio = { version = "1.15", features = ["macros"] }
-log = "0.4.11"
 env_logger = "0.9.0"
-clap = { version = "3.0", features = ["derive", "env"] }
-jsonrpsee = { version = "0.4.1", default-features = false, features = ["ws-client"] }
-serde_json = "1.0"
-serde = "1.0.132"
+jsonrpsee = { version = "0.8", features = ["ws-client"] }
+log = "0.4.11"
 paste = "1.0.6"
+serde = "1.0.132"
+serde_json = "1.0"
 thiserror = "1.0.30"
+tokio = { version = "1.15", features = ["macros", "rt-multi-thread", "sync"] }
 
 remote-externalities = { git = "https://github.com/paritytech/substrate", branch = "master" }
 
diff --git a/polkadot/utils/staking-miner/README.md b/polkadot/utils/staking-miner/README.md
index 944f870d6df..6355395b1ab 100644
--- a/polkadot/utils/staking-miner/README.md
+++ b/polkadot/utils/staking-miner/README.md
@@ -59,3 +59,10 @@ docker run --rm -it \
     -e URI=wss://your-node:9944 \
     staking-miner dry-run
 ```
+
+### Test locally
+
+1. Modify `EPOCH_DURATION_IN_SLOTS` and `SessionsPerEra` to force an election
+   more often than once per day.
+2. $ polkadot --chain polkadot-dev --tmp --alice --execution Native -lruntime=debug --offchain-worker=Always --ws-port 9999
+3. $ staking-miner --uri ws://localhost:9999 --seed //Alice monitor phrag-mms
diff --git a/polkadot/utils/staking-miner/src/main.rs b/polkadot/utils/staking-miner/src/main.rs
index e38c81f602e..daf29d8c280 100644
--- a/polkadot/utils/staking-miner/src/main.rs
+++ b/polkadot/utils/staking-miner/src/main.rs
@@ -228,7 +228,7 @@ macro_rules! any_runtime_unit {
 #[derive(frame_support::DebugNoBound, thiserror::Error)]
 enum Error<T: EPM::Config> {
 	Io(#[from] std::io::Error),
-	JsonRpsee(#[from] jsonrpsee::types::Error),
+	JsonRpsee(#[from] jsonrpsee::core::Error),
 	RpcHelperError(#[from] rpc_helpers::RpcHelperError),
 	Codec(#[from] codec::Error),
 	Crypto(sp_core::crypto::SecretStringError),
@@ -602,7 +602,7 @@ async fn main() {
 
 	let outcome = any_runtime! {
 		match command.clone() {
-			Command::Monitor(c) => monitor_cmd(&client, shared, c, signer_account).await
+			Command::Monitor(c) => monitor_cmd(client, shared, c, signer_account).await
 				.map_err(|e| {
 					log::error!(target: LOG_TARGET, "Monitor error: {:?}", e);
 				}),
diff --git a/polkadot/utils/staking-miner/src/monitor.rs b/polkadot/utils/staking-miner/src/monitor.rs
index 19259098df5..8d71b242a41 100644
--- a/polkadot/utils/staking-miner/src/monitor.rs
+++ b/polkadot/utils/staking-miner/src/monitor.rs
@@ -19,13 +19,17 @@
 use crate::{prelude::*, rpc_helpers::*, signer::Signer, Error, MonitorConfig, SharedConfig};
 use codec::Encode;
 use jsonrpsee::{
+	core::{
+		client::{Subscription, SubscriptionClientT},
+		Error as RpcError,
+	},
 	rpc_params,
-	types::{traits::SubscriptionClient, Subscription},
 	ws_client::WsClient,
 };
-
 use sc_transaction_pool_api::TransactionStatus;
 use sp_core::storage::StorageKey;
+use std::sync::Arc;
+use tokio::sync::mpsc;
 
 /// Ensure that now is the signed phase.
 async fn ensure_signed_phase<T: EPM::Config, B: BlockT>(
@@ -64,14 +68,16 @@ async fn ensure_no_previous_solution<
 }
 
 macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
+
 	/// The monitor command.
 	pub(crate) async fn [<monitor_cmd_ $runtime>](
-		client: &WsClient,
+		client: WsClient,
 		shared: SharedConfig,
 		config: MonitorConfig,
 		signer: Signer,
 	) -> Result<(), Error<$crate::[<$runtime _runtime_exports>]::Runtime>> {
 		use $crate::[<$runtime _runtime_exports>]::*;
+		type StakingMinerError = Error<$crate::[<$runtime _runtime_exports>]::Runtime>;
 
 		let (sub, unsub) = if config.listen == "head" {
 			("chain_subscribeNewHeads", "chain_unsubscribeNewHeads")
@@ -79,66 +85,143 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
 			("chain_subscribeFinalizedHeads", "chain_unsubscribeFinalizedHeads")
 		};
 
+		let mut subscription: Subscription<Header> = client.subscribe(&sub, None, &unsub).await?;
+
+		let client = Arc::new(client);
+		let (tx, mut rx) = mpsc::unbounded_channel::<StakingMinerError>();
+
 		loop {
-			log::info!(target: LOG_TARGET, "subscribing to {:?} / {:?}", sub, unsub);
-			let mut subscription: Subscription<Header> = client
-				.subscribe(&sub, None, &unsub)
-				.await
-				.unwrap();
-
-			while let Some(now) = subscription.next().await? {
-				let hash = now.hash();
-				log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", now.number, hash);
-
-				// if the runtime version has changed, terminate.
-				crate::check_versions::<Runtime>(client).await?;
-
-				// we prefer doing this check before fetching anything into a remote-ext.
-				if ensure_signed_phase::<Runtime, Block>(client, hash).await.is_err() {
-					log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all.");
-					continue;
-				};
+			let at = tokio::select! {
+				maybe_rp = subscription.next() => {
+					match maybe_rp {
+						Some(Ok(r)) => r,
+						// Custom `jsonrpsee` message sent by the server if the subscription was closed on the server side.
+						Some(Err(RpcError::SubscriptionClosed(reason))) => {
+							log::warn!(target: LOG_TARGET, "subscription to {} terminated: {:?}. Retrying..", sub, reason);
+							subscription = client.subscribe(&sub, None, &unsub).await?;
+							continue;
+						}
+						Some(Err(e)) => {
+							log::error!(target: LOG_TARGET, "subscription failed to decode Header {:?}, this is bug please file an issue", e);
+							return Err(e.into());
+						}
+						// The subscription was dropped, should only happen if:
+						//	- the connection was closed.
+						//	- the subscription could not keep up with the server.
+						None => {
+							log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub);
+							subscription = client.subscribe(&sub, None, &unsub).await?;
+							continue
+						}
+					}
+				},
+				maybe_err = rx.recv() => {
+					match maybe_err {
+						Some(err) => return Err(err),
+						None => unreachable!("at least one sender kept in the main loop should always return Some; qed"),
+					}
+				}
+			};
+
+			// Spawn task and non-recoverable errors are sent back to the main task
+			// such as if the connection has been closed.
+			tokio::spawn(
+				send_and_watch_extrinsic(client.clone(), tx.clone(), at, signer.clone(), shared.clone(), config.clone())
+			);
+		}
+
+		/// Construct extrinsic at given block and watch it.
+		async fn send_and_watch_extrinsic(
+			client: Arc<WsClient>,
+			tx: mpsc::UnboundedSender<StakingMinerError>,
+			at: Header,
+			signer: Signer,
+			shared: SharedConfig,
+			config: MonitorConfig,
+		) {
+
+			let hash = at.hash();
+			log::trace!(target: LOG_TARGET, "new event at #{:?} ({:?})", at.number, hash);
 
-				// grab an externalities without staking, just the election snapshot.
-				let mut ext = crate::create_election_ext::<Runtime, Block>(
-					shared.uri.clone(),
-					Some(hash),
-					vec![],
-				).await?;
+			// if the runtime version has changed, terminate.
+			if let Err(err) = crate::check_versions::<Runtime>(&*client).await {
+				let _ = tx.send(err.into());
+				return;
+			}
+
+			// we prefer doing this check before fetching anything into a remote-ext.
+			if ensure_signed_phase::<Runtime, Block>(&*client, hash).await.is_err() {
+				log::debug!(target: LOG_TARGET, "phase closed, not interested in this block at all.");
+				return;
+			}
 
-				if ensure_no_previous_solution::<Runtime, Block>(&mut ext, &signer.account).await.is_err() {
-					log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping.");
-					continue;
+			// grab an externalities without staking, just the election snapshot.
+			let mut ext = match crate::create_election_ext::<Runtime, Block>(
+				shared.uri.clone(),
+				Some(hash),
+				vec![],
+			).await {
+				Ok(ext) => ext,
+				Err(err) => {
+					let _ = tx.send(err);
+					return;
 				}
+			};
+
+			if ensure_no_previous_solution::<Runtime, Block>(&mut ext, &signer.account).await.is_err() {
+				log::debug!(target: LOG_TARGET, "We already have a solution in this phase, skipping.");
+				return;
+			}
 
-				// mine a solution, and run feasibility check on it as well.
-				let (raw_solution, witness) = crate::mine_with::<Runtime>(&config.solver, &mut ext, true)?;
-				log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score);
-
-				let nonce = crate::get_account_info::<Runtime>(client, &signer.account, Some(hash))
-					.await?
-					.map(|i| i.nonce)
-					.expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST);
-				let tip = 0 as Balance;
-				let period = <Runtime as frame_system::Config>::BlockHashCount::get() / 2;
-				let current_block = now.number.saturating_sub(1);
-				let era = sp_runtime::generic::Era::mortal(period.into(), current_block.into());
-				log::trace!(
-					target: LOG_TARGET, "transaction mortality: {:?} -> {:?}",
-					era.birth(current_block.into()),
-					era.death(current_block.into()),
-				);
-				let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era));
-				let bytes = sp_core::Bytes(extrinsic.encode());
-
-				let mut tx_subscription: Subscription<
+			// mine a solution, and run feasibility check on it as well.
+			let (raw_solution, witness) = match crate::mine_with::<Runtime>(&config.solver, &mut ext, true) {
+				Ok(r) => r,
+				Err(err) => {
+					let _ = tx.send(err.into());
+					return;
+				}
+			};
+
+			log::info!(target: LOG_TARGET, "mined solution with {:?}", &raw_solution.score);
+
+			let nonce = match crate::get_account_info::<Runtime>(&*client, &signer.account, Some(hash)).await {
+				Ok(maybe_account) => {
+					let acc = maybe_account.expect(crate::signer::SIGNER_ACCOUNT_WILL_EXIST);
+					acc.nonce
+				}
+				Err(err) => {
+					let _ = tx.send(err);
+					return;
+				}
+			};
+
+			let tip = 0 as Balance;
+			let period = <Runtime as frame_system::Config>::BlockHashCount::get() / 2;
+			let current_block = at.number.saturating_sub(1);
+			let era = sp_runtime::generic::Era::mortal(period.into(), current_block.into());
+
+			log::trace!(
+				target: LOG_TARGET, "transaction mortality: {:?} -> {:?}",
+				era.birth(current_block.into()),
+				era.death(current_block.into()),
+			);
+
+			let extrinsic = ext.execute_with(|| create_uxt(raw_solution, witness, signer.clone(), nonce, tip, era));
+			let bytes = sp_core::Bytes(extrinsic.encode());
+
+			let mut tx_subscription: Subscription<
 					TransactionStatus<<Block as BlockT>::Hash, <Block as BlockT>::Hash>
-				> = match client
-					.subscribe(&"author_submitAndWatchExtrinsic", rpc_params! { bytes }, "author_unwatchExtrinsic")
-					.await
-				{
-					Ok(sub) => sub,
-					Err(why) => {
+			> = match client.subscribe(
+				"author_submitAndWatchExtrinsic",
+				rpc_params! { bytes },
+				"author_unwatchExtrinsic"
+			).await {
+				Ok(sub) => sub,
+				Err(RpcError::RestartNeeded(e)) => {
+					let _ = tx.send(RpcError::RestartNeeded(e).into());
+					return
+				},
+				Err(why) => {
 					// This usually happens when we've been busy with mining for a few blocks, and
 					// now we're receiving the subscriptions of blocks in which we were busy. In
 					// these blocks, we still don't have a solution, so we re-compute a new solution
@@ -146,38 +229,83 @@ macro_rules! monitor_cmd_for { ($runtime:tt) => { paste::paste! {
 					// error. NOTE: to improve this overall, and to be able to introduce an array of
 					// other fancy features, we should make this multi-threaded and do the
 					// computation outside of this callback.
-						log::warn!(target: LOG_TARGET, "failing to submit a transaction {:?}. continuing...", why);
-						continue
-					}
+					log::warn!(
+						target: LOG_TARGET,
+						"failing to submit a transaction {:?}. ignore block: {}",
+						why, at.number
+					);
+					return;
+				},
+			};
+
+			while let Some(rp) = tx_subscription.next().await {
+				let status_update = match rp {
+					Ok(r) => r,
+					// Custom `jsonrpsee` message sent by the server if the subscription was closed on the server side.
+					Err(RpcError::SubscriptionClosed(reason)) => {
+						log::warn!(
+							target: LOG_TARGET,
+							"tx subscription closed by the server: {:?}; skip block: {}",
+							reason, at.number
+						);
+						return;
+					},
+					Err(e) => {
+						log::error!(target: LOG_TARGET, "subscription failed to decode TransactionStatus {:?}, this is a bug please file an issue", e);
+						let _ = tx.send(e.into());
+						return;
+					},
 				};
 
-				while let Some(status_update) = tx_subscription.next().await? {
-					log::trace!(target: LOG_TARGET, "status update {:?}", status_update);
-					match status_update {
-						TransactionStatus::Ready | TransactionStatus::Broadcast(_) | TransactionStatus::Future => continue,
-						TransactionStatus::InBlock(hash) => {
-							log::info!(target: LOG_TARGET, "included at {:?}", hash);
-							let key = StorageKey(frame_support::storage::storage_prefix(b"System",b"Events").to_vec());
-							let events = get_storage::<Vec<frame_system::EventRecord<Event, <Block as BlockT>::Hash>>,
-							>(client, rpc_params!{ key, hash }).await?.unwrap_or_default();
-							log::info!(target: LOG_TARGET, "events at inclusion {:?}", events);
-						}
-						TransactionStatus::Retracted(hash) => {
-							log::info!(target: LOG_TARGET, "Retracted at {:?}", hash);
-						}
-						TransactionStatus::Finalized(hash) => {
-							log::info!(target: LOG_TARGET, "Finalized at {:?}", hash);
-							break
-						}
-						_ => {
-							log::warn!(target: LOG_TARGET, "Stopping listen due to other status {:?}", status_update);
-							break
-						}
-					}
+				log::trace!(target: LOG_TARGET, "status update {:?}", status_update);
+				match status_update {
+					TransactionStatus::Ready |
+					TransactionStatus::Broadcast(_) |
+					TransactionStatus::Future => continue,
+					TransactionStatus::InBlock(hash) => {
+						log::info!(target: LOG_TARGET, "included at {:?}", hash);
+						let key = StorageKey(
+							frame_support::storage::storage_prefix(b"System", b"Events").to_vec(),
+						);
+						let key2 = key.clone();
+
+						let events = match get_storage::<
+							Vec<frame_system::EventRecord<Event, <Block as BlockT>::Hash>>,
+						>(&*client, rpc_params! { key, hash })
+						.await {
+							Ok(rp) => rp.unwrap_or_default(),
+							Err(RpcHelperError::JsonRpsee(RpcError::RestartNeeded(e))) => {
+								let _ = tx.send(RpcError::RestartNeeded(e).into());
+								return;
+							}
+							// Decoding or other RPC error => just terminate the task.
+							Err(e) => {
+								log::warn!(target: LOG_TARGET, "get_storage [key: {:?}, hash: {:?}] failed: {:?}; skip block: {}",
+									key2, hash, e, at.number
+								);
+								return;
+							}
+						};
+
+						log::info!(target: LOG_TARGET, "events at inclusion {:?}", events);
+					},
+					TransactionStatus::Retracted(hash) => {
+						log::info!(target: LOG_TARGET, "Retracted at {:?}", hash);
+					},
+					TransactionStatus::Finalized(hash) => {
+						log::info!(target: LOG_TARGET, "Finalized at {:?}", hash);
+						break
+					},
+					_ => {
+						log::warn!(
+							target: LOG_TARGET,
+							"Stopping listen due to other status {:?}",
+							status_update
+						);
+						break
+					},
 				};
 			}
-
-			log::warn!(target: LOG_TARGET, "subscription to {} terminated. Retrying..", sub)
 		}
 	}
 }}}
diff --git a/polkadot/utils/staking-miner/src/rpc_helpers.rs b/polkadot/utils/staking-miner/src/rpc_helpers.rs
index 1277564ebd9..153ca0e65c0 100644
--- a/polkadot/utils/staking-miner/src/rpc_helpers.rs
+++ b/polkadot/utils/staking-miner/src/rpc_helpers.rs
@@ -17,12 +17,12 @@
 //! Helper method for RPC.
 
 use super::*;
-use jsonrpsee::types::traits::Client;
-pub(crate) use jsonrpsee::types::v2::ParamsSer;
+use jsonrpsee::core::client::ClientT;
+pub(crate) use jsonrpsee::types::ParamsSer;
 
 #[derive(frame_support::DebugNoBound, thiserror::Error)]
 pub(crate) enum RpcHelperError {
-	JsonRpsee(#[from] jsonrpsee::types::Error),
+	JsonRpsee(#[from] jsonrpsee::core::Error),
 	Codec(#[from] codec::Error),
 }
 
-- 
GitLab