Skip to content
Snippets Groups Projects
Commit 0b606613 authored by Pierre Krieger's avatar Pierre Krieger Committed by Bastian Köcher
Browse files

ServerToWorkerMsg -> ServiceToWorkerMsg (#4519)

parent 4a9697db
No related merge requests found
......@@ -114,7 +114,7 @@ pub struct NetworkService<B: BlockT + 'static, S: NetworkSpecialization<B>, H: E
/// nodes it should be connected to or not.
peerset: PeersetHandle,
/// Channel that sends messages to the actual worker.
to_worker: mpsc::UnboundedSender<ServerToWorkerMsg<B, S>>,
to_worker: mpsc::UnboundedSender<ServiceToWorkerMsg<B, S>>,
/// Marker to pin the `H` generic. Serves no purpose except to not break backwards
/// compatibility.
_marker: PhantomData<H>,
......@@ -434,7 +434,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
/// The protocol must have been registered with `register_notifications_protocol`.
///
pub fn write_notification(&self, target: PeerId, engine_id: ConsensusEngineId, message: Vec<u8>) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::WriteNotification {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::WriteNotification {
target,
engine_id,
message,
......@@ -449,7 +449,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
pub fn event_stream(&self) -> impl Stream<Item = Event, Error = ()> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = mpsc::unbounded();
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::EventStream(tx));
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
......@@ -466,7 +466,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
&self,
engine_id: ConsensusEngineId,
) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::RegisterNotifProtocol {
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::RegisterNotifProtocol {
engine_id,
});
}
......@@ -476,7 +476,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
/// The latest transactions will be fetched from the `TransactionPool` that was passed at
/// initialization as part of the configuration.
pub fn trigger_repropagate(&self) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::PropagateExtrinsics);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::PropagateExtrinsics);
}
/// Make sure an important block is propagated to peers.
......@@ -484,7 +484,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
/// In chain-based consensus, we often need to make sure non-best forks are
/// at least temporarily synced. This function forces such an announcement.
pub fn announce_block(&self, hash: B::Hash, data: Vec<u8>) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash, data));
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::AnnounceBlock(hash, data));
}
/// Report a given peer as either beneficial (+) or costly (-) according to the
......@@ -497,7 +497,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
///
/// This triggers the same effects as if the connection had closed itself spontaneously.
pub fn disconnect_peer(&self, who: PeerId) {
let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::DisconnectPeer(who));
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::DisconnectPeer(who));
}
/// Request a justification for the given block from the network.
......@@ -507,7 +507,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
pub fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::RequestJustification(hash.clone(), number));
.unbounded_send(ServiceToWorkerMsg::RequestJustification(hash.clone(), number));
}
/// Execute a closure with the chain-specific network specialization.
......@@ -516,7 +516,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
{
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::ExecuteWithSpec(Box::new(f)));
.unbounded_send(ServiceToWorkerMsg::ExecuteWithSpec(Box::new(f)));
}
/// Are we in the process of downloading the chain?
......@@ -531,7 +531,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
pub fn get_value(&self, key: &record::Key) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::GetValue(key.clone()));
.unbounded_send(ServiceToWorkerMsg::GetValue(key.clone()));
}
/// Start putting a value in the DHT.
......@@ -541,7 +541,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
pub fn put_value(&self, key: record::Key, value: Vec<u8>) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::PutValue(key, value));
.unbounded_send(ServiceToWorkerMsg::PutValue(key, value));
}
/// Connect to unreserved peers and allow unreserved peers to connect.
......@@ -566,7 +566,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
self.peerset.add_reserved_peer(peer_id.clone());
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::AddKnownAddress(peer_id, addr));
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
Ok(())
}
......@@ -579,7 +579,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
pub fn set_sync_fork_request(&self, peers: Vec<PeerId>, hash: B::Hash, number: NumberFor<B>) {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::SyncFork(peers, hash, number));
.unbounded_send(ServiceToWorkerMsg::SyncFork(peers, hash, number));
}
/// Modify a peerset priority group.
......@@ -594,7 +594,7 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> NetworkServic
for (peer_id, addr) in peers.into_iter() {
let _ = self
.to_worker
.unbounded_send(ServerToWorkerMsg::AddKnownAddress(peer_id, addr));
.unbounded_send(ServiceToWorkerMsg::AddKnownAddress(peer_id, addr));
}
Ok(())
......@@ -659,7 +659,7 @@ impl<B, S, H> NetworkStateInfo for NetworkService<B, S, H>
/// Messages sent from the `NetworkService` to the `NetworkWorker`.
///
/// Each entry corresponds to a method of `NetworkService`.
enum ServerToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
enum ServiceToWorkerMsg<B: BlockT, S: NetworkSpecialization<B>> {
PropagateExtrinsics,
RequestJustification(B::Hash, NumberFor<B>),
AnnounceBlock(B::Hash, Vec<u8>),
......@@ -698,7 +698,7 @@ pub struct NetworkWorker<B: BlockT + 'static, S: NetworkSpecialization<B>, H: Ex
/// The import queue that was passed as initialization.
import_queue: Box<dyn ImportQueue<B>>,
/// Messages from the `NetworkService` and that must be processed.
from_worker: mpsc::UnboundedReceiver<ServerToWorkerMsg<B, S>>,
from_worker: mpsc::UnboundedReceiver<ServiceToWorkerMsg<B, S>>,
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<mpsc::UnboundedReceiver<RequestData<B>>>,
/// Senders for events that happen on the network.
......@@ -734,36 +734,36 @@ impl<B: BlockT + 'static, S: NetworkSpecialization<B>, H: ExHashT> Future for Ne
};
match msg {
ServerToWorkerMsg::ExecuteWithSpec(task) => {
ServiceToWorkerMsg::ExecuteWithSpec(task) => {
let protocol = self.network_service.user_protocol_mut();
let (mut context, spec) = protocol.specialization_lock();
task(spec, &mut context);
},
ServerToWorkerMsg::AnnounceBlock(hash, data) =>
ServiceToWorkerMsg::AnnounceBlock(hash, data) =>
self.network_service.user_protocol_mut().announce_block(hash, data),
ServerToWorkerMsg::RequestJustification(hash, number) =>
ServiceToWorkerMsg::RequestJustification(hash, number) =>
self.network_service.user_protocol_mut().request_justification(&hash, number),
ServerToWorkerMsg::PropagateExtrinsics =>
ServiceToWorkerMsg::PropagateExtrinsics =>
self.network_service.user_protocol_mut().propagate_extrinsics(),
ServerToWorkerMsg::GetValue(key) =>
ServiceToWorkerMsg::GetValue(key) =>
self.network_service.get_value(&key),
ServerToWorkerMsg::PutValue(key, value) =>
ServiceToWorkerMsg::PutValue(key, value) =>
self.network_service.put_value(key, value),
ServerToWorkerMsg::AddKnownAddress(peer_id, addr) =>
ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
self.network_service.add_known_address(peer_id, addr),
ServerToWorkerMsg::SyncFork(peer_ids, hash, number) =>
ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) =>
self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
ServerToWorkerMsg::EventStream(sender) =>
ServiceToWorkerMsg::EventStream(sender) =>
self.event_streams.push(sender),
ServerToWorkerMsg::WriteNotification { message, engine_id, target } =>
ServiceToWorkerMsg::WriteNotification { message, engine_id, target } =>
self.network_service.user_protocol_mut().write_notification(target, engine_id, message),
ServerToWorkerMsg::RegisterNotifProtocol { engine_id } => {
ServiceToWorkerMsg::RegisterNotifProtocol { engine_id } => {
let events = self.network_service.user_protocol_mut().register_notifications_protocol(engine_id);
for event in events {
self.event_streams.retain(|sender| sender.unbounded_send(event.clone()).is_ok());
}
},
ServerToWorkerMsg::DisconnectPeer(who) =>
ServiceToWorkerMsg::DisconnectPeer(who) =>
self.network_service.user_protocol_mut().disconnect_peer(&who),
}
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment