Skip to content
builder.rs 37.4 KiB
Newer Older
			// The state cache size is optional in the client info; report zero when it is absent.
			let used_state_cache_size = info.used_state_cache_size.unwrap_or(0);

			// Sample the CPU and memory usage of this process. Both values fall back
			// to zero when the PID is unknown or the process entry fails to refresh.
			let (cpu_usage, memory) = match self_pid {
				Some(pid) if sys.refresh_process(pid) => {
					let this_process = sys.get_process(pid)
						.expect("Above refresh_process succeeds, this should be Some(), qed");
					(this_process.cpu_usage(), this_process.memory())
				},
				_ => (0.0, 0),
			};

			telemetry!(
				SUBSTRATE_INFO;
				"system.interval";
				"peers" => num_peers,
				"height" => best_number,
				"best" => ?best_hash,
				"txcount" => txpool_status.ready,
				"cpu" => cpu_usage,
				"memory" => memory,
				"finalized_height" => finalized_number,
				"finalized_hash" => ?info.chain.finalized_hash,
				"bandwidth_download" => bandwidth_download,
				"bandwidth_upload" => bandwidth_upload,
				"used_state_cache_size" => used_state_cache_size,
			);
Ashley's avatar
Ashley committed
			record_metrics!(
				"peers".to_owned() => num_peers,
				"height".to_owned() => best_number,
				"txcount".to_owned() => txpool_status.ready,
				"cpu".to_owned() => cpu_usage,
				"memory".to_owned() => memory,
				"finalized_height".to_owned() => finalized_number,
				"bandwidth_download".to_owned() => bandwidth_download,
				"bandwidth_upload".to_owned() => bandwidth_upload,
				"used_state_cache_size".to_owned() => used_state_cache_size
Ashley's avatar
Ashley committed
			);

			Ok(())
		}).select(exit.clone()).then(|_| Ok(()));
		let _ = to_spawn_tx.unbounded_send(Box::new(tel_task));

		// Periodically send the network state to the telemetry.
		//
		// A fresh unbounded channel is registered in `network_status_sinks` together
		// with a 30-second duration (presumably the reporting interval — confirm
		// against the sink implementation). Only the `NetworkState` half of each
		// received pair is used; the `NetworkStatus` half is ignored.
		let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>();
		network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
		let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
			telemetry!(
				SUBSTRATE_INFO;
				"system.network_state";
				"state" => network_state,
			);
			Ok(())
		// Race the stream against `exit` and discard whichever result comes out.
		}).select(exit.clone()).then(|_| Ok(()));
		// Hand the task to the spawner; a failed send is deliberately ignored.
		let _ = to_spawn_tx.unbounded_send(Box::new(tel_task_2));

		// RPC
		let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded();
		// Builds a complete RPC handler from scratch on every call. It is invoked
		// once directly below and is also passed to `start_rpc_servers`.
		let gen_handler = || {
			use rpc::{chain, state, author, system};

			let system_info = rpc::system::SystemInfo {
				chain_name: config.chain_spec.name().into(),
				impl_name: config.impl_name.into(),
				impl_version: config.impl_version.into(),
				properties: config.chain_spec.properties().clone(),
			};

			let subscriptions = rpc::Subscriptions::new(Arc::new(SpawnTaskHandle {
				sender: to_spawn_tx.clone(),
				on_exit: exit.clone()
			}));

			// The light-client implementations are used only when both a remote
			// backend and an on-demand service are present; any other combination
			// gets the full-node implementations.
			let (chain_api, state_api) = match (remote_backend.as_ref(), on_demand.as_ref()) {
				// Light clients
				(Some(remote_backend), Some(on_demand)) => {
					let light_chain = rpc::chain::new_light(
						client.clone(),
						subscriptions.clone(),
						remote_backend.clone(),
						on_demand.clone()
					);
					let light_state = rpc::state::new_light(
						client.clone(),
						subscriptions.clone(),
						remote_backend.clone(),
						on_demand.clone()
					);
					(light_chain, light_state)
				},
				// Full nodes
				_ => {
					let full_chain = rpc::chain::new_full(client.clone(), subscriptions.clone());
					let full_state = rpc::state::new_full(client.clone(), subscriptions.clone());
					(full_chain, full_state)
				},
			};

			// `subscriptions` is moved here, so this has to come after the chain
			// and state APIs, which only take clones of it.
			let author_api = rpc::author::Author::new(
				client.clone(),
				transaction_pool.clone(),
				subscriptions,
				keystore.clone(),
			);
			let system_api = system::System::new(system_info, system_rpc_tx.clone());

			rpc_servers::rpc_handler((
				state::StateApi::to_delegate(state_api),
				chain::ChainApi::to_delegate(chain_api),
				author::AuthorApi::to_delegate(author_api),
				system::SystemApi::to_delegate(system_api),
				rpc_extensions.clone(),
			))
		};
		let rpc_handlers = gen_handler();
		let rpc = start_rpc_servers(&config, gen_handler)?;


		let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
			config.roles,
			network_mut,
			client.clone(),
			network_status_sinks.clone(),
			system_rpc_rx,
			has_bootnodes,
			dht_event_tx,
			.map_err(|_| ())
			.select(exit.clone())
			.then(|_| Ok(()))));

		let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();

		// Telemetry
		let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
			let is_authority = config.roles.is_authority();
			let network_id = network.local_peer_id().to_base58();
			let name = config.name.clone();
			let impl_name = config.impl_name.to_owned();
			let version = version.clone();
			let chain_name = config.chain_spec.name().to_owned();
			let telemetry_connection_sinks_ = telemetry_connection_sinks.clone();
			let telemetry = tel::init_telemetry(tel::TelemetryConfig {
				endpoints,
				wasm_external_transport: config.telemetry_external_transport.take(),
			});
			let startup_time = SystemTime::UNIX_EPOCH.elapsed()
				.map(|dur| dur.as_millis())
				.unwrap_or(0);
			let future = telemetry.clone()
				.map(|ev| Ok::<_, ()>(ev))
				.compat()
				.for_each(move |event| {
					// Safe-guard in case we add more events in the future.
					let tel::TelemetryEvent::Connected = event;

					telemetry!(SUBSTRATE_INFO; "system.connected";
						"name" => name.clone(),
						"implementation" => impl_name.clone(),
						"version" => version.clone(),
						"config" => "",
						"chain" => chain_name.clone(),
						"authority" => is_authority,
						"startup_time" => startup_time,
						"network_id" => network_id.clone()
					);

					telemetry_connection_sinks_.lock().retain(|sink| {
						sink.unbounded_send(()).is_ok()
					});
					Ok(())
				});
			let _ = to_spawn_tx.unbounded_send(Box::new(future
				.select(exit.clone())
				.then(|_| Ok(()))));
			telemetry
		});

Ashley's avatar
Ashley committed
		// Grafana data source
		if let Some(port) = config.grafana_port {
			// Run the server until it finishes on its own or `exit` completes first.
			let server = grafana_data_source::run_server(port).boxed();
			let on_exit = exit.clone().compat();
			let future = select(server, on_exit)
				.map(|outcome| match outcome {
					// The server finished first: keep success, discard the error value.
					Either::Left((result, _)) => result.map_err(|_| ()),
					// `exit` completed first.
					Either::Right(_) => Ok(()),
				})
				.compat();

			let _ = to_spawn_tx.unbounded_send(Box::new(future));
		}
		// Instrumentation
		if let Some(tracing_targets) = config.tracing_targets.as_ref() {
			let profiler = substrate_tracing::ProfilingSubscriber::new(
				config.tracing_receiver, tracing_targets
			);
			// Failing to install the global subscriber is logged but does not abort the build.
			if let Err(e) = tracing::subscriber::set_global_default(profiler) {
				error!(target: "tracing", "Unable to set global default subscriber {}", e);
			}
		}

		// Assemble the service from the components built above.
		Ok(Service {
			client,
			network,
			network_status_sinks,
			select_chain,
			transaction_pool,
			exit,
			signal: Some(signal),
			essential_failed_tx,
			essential_failed_rx,
			to_spawn_tx,
			to_spawn_rx,
			// Starts out empty.
			to_poll: Vec::new(),
			rpc_handlers,
			// NOTE(review): the underscore-prefixed fields are presumably stored only
			// to keep these values alive for the service's lifetime — confirm against
			// the `Service` definition.
			_rpc: rpc,
			_telemetry: telemetry,
			_offchain_workers: offchain_workers,
			_telemetry_on_connect_sinks: telemetry_connection_sinks.clone(),
			keystore,
			marker: PhantomData::<TBl>,
		})