Newer
Older
on_exit: exit.clone()
}));
let (chain, state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend.as_ref(), on_demand.as_ref()) {
// Light clients
let chain = sc_rpc::chain::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
let state = sc_rpc::state::new_light(
client.clone(),
subscriptions.clone(),
remote_backend.clone(),
on_demand.clone()
);
(chain, state)
} else {
// Full nodes
let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone());
let state = sc_rpc::state::new_full(client.clone(), subscriptions.clone());
let author = sc_rpc::author::Author::new(
client.clone(),
transaction_pool.clone(),
subscriptions,
keystore.clone(),
);
let system = system::System::new(system_info, system_rpc_tx.clone());
sc_rpc_server::rpc_handler((
state::StateApi::to_delegate(state),
chain::ChainApi::to_delegate(chain),
author::AuthorApi::to_delegate(author),
system::SystemApi::to_delegate(system),
rpc_extensions.clone(),
))
};
// Build one set of RPC handlers by calling `gen_handler` directly, then hand `gen_handler`
// itself to `start_rpc_servers`; the `?` returns early if starting the servers fails.
let rpc_handlers = gen_handler();
let rpc = start_rpc_servers(&config, gen_handler)?;
// Queue the network future on the spawn channel. The result of `unbounded_send` is
// discarded via `let _`.
// NOTE(review): as written, the argument list of `build_network_future(` is never closed
// before `.select(...)` - this statement is one `)` short. Lines appear to be missing
// between `has_bootnodes,` and `.select`; confirm against the upstream file.
let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future(
config.roles,
network_mut,
client.clone(),
network_status_sinks.clone(),
system_rpc_rx,
has_bootnodes,
// Race against a clone of the `exit` signal, then map whatever outcome came first to
// `Ok(())`.
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()))));
// Shared list of senders; each one is sent `()` whenever a telemetry `Connected` event is
// handled (see the `retain` call further down).
let telemetry_connection_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<()>>>> = Default::default();
// Telemetry
// Only set up when `config.telemetry_endpoints` is `Some`; otherwise `telemetry` is `None`.
let telemetry = config.telemetry_endpoints.clone().map(|endpoints| {
// Owned copies of everything the `move` closure below reports on each connection.
let is_authority = config.roles.is_authority();
let network_id = network.local_peer_id().to_base58();
let name = config.name.clone();
let impl_name = config.impl_name.to_owned();
let version = version.clone();
let chain_name = config.chain_spec.name().to_owned();
let telemetry_connection_sinks_ = telemetry_connection_sinks.clone();
let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig {
endpoints,
// `take()` moves the external transport out of `config`.
// NOTE(review): presumably leaves `None` behind (Option-style `take`) - confirm the field type.
wasm_external_transport: config.telemetry_external_transport.take(),
});
// Milliseconds elapsed since the Unix epoch, or 0 if `elapsed()` returns an error.
let startup_time = SystemTime::UNIX_EPOCH.elapsed()
.map(|dur| dur.as_millis())
.unwrap_or(0);
// Wrap each event in `Ok` and apply `.compat()` before consuming the stream with `for_each`.
let future = telemetry.clone()
.map(|ev| Ok::<_, ()>(ev))
.compat()
.for_each(move |event| {
// Safe-guard in case we add more events in the future: this `let` pattern only names
// the `Connected` variant.
let sc_telemetry::TelemetryEvent::Connected = event;
telemetry!(SUBSTRATE_INFO; "system.connected";
"name" => name.clone(),
"implementation" => impl_name.clone(),
"version" => version.clone(),
"config" => "",
"chain" => chain_name.clone(),
"authority" => is_authority,
"startup_time" => startup_time,
"network_id" => network_id.clone()
);
// Notify every registered sink and keep only those whose send succeeded.
telemetry_connection_sinks_.lock().retain(|sink| {
sink.unbounded_send(()).is_ok()
});
Ok(())
});
// Queue the event-handling future, raced against a clone of the `exit` signal; the result
// of `unbounded_send` is discarded.
let _ = to_spawn_tx.unbounded_send(Box::new(future
.select(exit.clone().map(Ok).compat())
.then(|_| Ok(()))));
// The telemetry handle is the closure's value, so the outer `telemetry` is `Some(handle)`.
telemetry
});
// Grafana data source
//
// When a port is configured, run the Grafana data-source server until either the server
// future completes or the `exit` signal fires, whichever happens first.
if let Some(port) = config.grafana_port {
	let future = select(
		grafana_data_source::run_server(port).boxed(),
		// Second future of the race: `select` needs two futures, and the `Either::Right`
		// arm below handles this one resolving first.
		exit.clone(),
	).map(|either| match either {
		// The server finished first: keep success, drop the error details.
		Either::Left((result, _)) => result.map_err(|_| ()),
		// Exit fired first: report a clean stop.
		Either::Right(_) => Ok(())
	}).compat();
	// Hand the future to the task spawner; the result of the send is discarded.
	let _ = to_spawn_tx.unbounded_send(Box::new(future));
}
// Instrumentation
//
// Install a profiling subscriber as the global `tracing` default, but only when tracing
// targets were configured. A failure to install is logged and otherwise ignored.
if let Some(targets) = config.tracing_targets.as_ref() {
	let profiler = sc_tracing::ProfilingSubscriber::new(config.tracing_receiver, targets);
	if let Err(err) = tracing::subscriber::set_global_default(profiler) {
		error!(target: "tracing", "Unable to set global default subscriber {}", err);
	}
}
// Assemble the service from the components built above and return it.
Ok(Service {
	client,
	network,
	network_status_sinks,
	select_chain,
	transaction_pool,
	exit,
	signal: Some(signal),
	essential_failed_tx,
	essential_failed_rx,
	to_spawn_tx,
	to_spawn_rx,
	to_poll: Vec::new(),
	// Handlers obtained from the direct `gen_handler()` call.
	rpc_handlers,
	// NOTE(review): the `_`-prefixed fields are presumably held only so the values are not
	// dropped while the service lives - confirm against the `Service` definition.
	_rpc: rpc,
	_telemetry: telemetry,
	_offchain_workers: offchain_workers,
	// Last use of the binding, so it is moved rather than cloned.
	_telemetry_on_connect_sinks: telemetry_connection_sinks,
	keystore,
	marker: PhantomData::<TBl>,
})