// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Polkadot-specific base networking protocol.
//!
//! This is implemented using the `sc-network` APIs for futures-based
//! notifications protocols. In some cases, we emulate request/response on top
//! of the notifications machinery, which is slightly less efficient but not
//! meaningfully so.
//!
//! We handle events from `sc-network` in a thin wrapper that forwards to a
//! background worker, which also handles commands from other parts of the node.
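//!
//! In outline: each `sc-network` `Event` is translated into a `ServiceToWorkerMsg` and
//! forwarded over an `mpsc` channel to the worker task, which owns all protocol state;
//! the public `Service` handle feeds commands into the same channel.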
use arrayvec::ArrayVec;
use codec::{Decode, Encode};
use futures::channel::{mpsc, oneshot};
use futures::future::Either;
use futures::prelude::*;
use futures::task::{Spawn, SpawnExt, Context, Poll};
use futures::stream::{FuturesUnordered, StreamFuture};
use log::{debug, trace};
use polkadot_primitives::{
Hash, Block,
parachain::{
PoVBlock, ValidatorId, ValidatorIndex, Collation, AbridgedCandidateReceipt,
ErasureChunk, ParachainHost, Id as ParaId, CollatorId,
},
};
use polkadot_validation::{
SharedTable, TableRouter, Network as ParachainNetwork, Validated, GenericStatement, Collators,
SignedStatement,
};
use sc_network::{ObservedRole, Event, PeerId};
use sp_api::ProvideRuntimeApi;
use sp_runtime::ConsensusEngineId;
use std::collections::{hash_map::{Entry, HashMap}, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use super::{cost, benefit, PolkadotNetworkService};
use crate::legacy::collator_pool::Role as CollatorRole;
use crate::legacy::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator};
/// The current protocol version.
pub const VERSION: u32 = 1;
/// The minimum supported protocol version.
pub const MIN_SUPPORTED_VERSION: u32 = 1;
/// The engine ID of the polkadot network protocol.
pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2";
/// The name of the polkadot network protocol.
pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/1";
pub use crate::legacy::gossip::ChainContext;
// Messages from the service API or network adapter.
enum ServiceToWorkerMsg {
// basic peer messages.
PeerConnected(PeerId, ObservedRole),
PeerMessage(PeerId, Vec<bytes::Bytes>),
PeerDisconnected(PeerId),
// service messages.
BuildConsensusNetworking(mpsc::Receiver<ServiceToWorkerMsg>, Arc<SharedTable>, Vec<ValidatorId>),
SubmitValidatedCollation(
AbridgedCandidateReceipt,
PoVBlock,
(ValidatorIndex, Vec<ErasureChunk>),
),
FetchPoVBlock(
AbridgedCandidateReceipt,
oneshot::Sender<PoVBlock>,
),
FetchErasureChunk(
Hash, // candidate-hash.
u32, // validator index.
oneshot::Sender<ErasureChunk>,
),
DistributeErasureChunk(
Hash, // candidate-hash,
ErasureChunk,
),
AwaitCollation(
Hash, // relay-parent,
ParaId,
oneshot::Sender<Collation>,
),
NoteBadCollator(
CollatorId,
),
RegisterAvailabilityStore(
av_store::Store,
),
OurCollation(
HashSet<ValidatorId>,
Collation,
),
ListenCheckedStatements(
Hash, // relay-parent,
oneshot::Sender<Pin<Box<dyn Stream<Item = SignedStatement> + Send>>>,
),
/// Used in tests to ensure that all other messages sent from the same
/// thread have been flushed. Also executes arbitrary logic with the protocol
/// handler.
#[cfg(test)]
Synchronize(Box<dyn FnOnce(&mut ProtocolHandler) + Send>),
}
/// Messages from a background task to the main worker task.
enum BackgroundToWorkerMsg {
// Spawn a given future.
Spawn(future::BoxFuture<'static, ()>),
}
/// Operations that a handle to an underlying network service should provide.
pub trait NetworkServiceOps: Send + Sync {
/// Report the peer as having a particular positive or negative value.
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange);
/// Write a notification to a given peer.
fn write_notification(
&self,
peer: PeerId,
engine_id: ConsensusEngineId,
notification: Vec<u8>,
);
}
impl NetworkServiceOps for PolkadotNetworkService {
fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) {
PolkadotNetworkService::report_peer(self, peer, value);
}
fn write_notification(
&self,
peer: PeerId,
engine_id: ConsensusEngineId,
notification: Vec<u8>,
) {
PolkadotNetworkService::write_notification(self, peer, engine_id, notification);
}
}
/// Operations that a handle to a gossip network should provide.
trait GossipOps: Clone + Send + crate::legacy::GossipService + 'static {
fn new_local_leaf(
&self,
validation_data: crate::legacy::gossip::MessageValidationData,
) -> crate::legacy::gossip::NewLeafActions;
/// Register an availability store in the gossip service to evaluate incoming
/// messages with.
fn register_availability_store(
&self,
store: av_store::Store,
);
}
impl GossipOps for RegisteredMessageValidator {
fn new_local_leaf(
&self,
validation_data: crate::legacy::gossip::MessageValidationData,
) -> crate::legacy::gossip::NewLeafActions {
RegisteredMessageValidator::new_local_leaf(
self,
validation_data,
)
}
fn register_availability_store(
&self,
store: av_store::Store,
) {
RegisteredMessageValidator::register_availability_store(self, store);
}
}
/// An async handle to the network service.
pub struct Service<N = PolkadotNetworkService> {
sender: mpsc::Sender<ServiceToWorkerMsg>,
network_service: Arc<N>,
}
impl<N> Clone for Service<N> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
network_service: self.network_service.clone(),
}
}
}
/// Registers the protocol.
///
/// You are very strongly encouraged to call this method very early on. Any connection
/// already open will retain only the protocols that were registered at that time, and
/// will not pick up any registered later.
pub fn start<C, Api, SP>(
service: Arc<PolkadotNetworkService>,
config: Config,
chain_context: C,
api: Arc<Api>,
executor: SP,
) -> Result<Service<PolkadotNetworkService>, futures::task::SpawnError> where
C: ChainContext + 'static,
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
SP: Spawn + Clone + Send + 'static,
{
const SERVICE_TO_WORKER_BUF: usize = 256;
let mut event_stream = service.event_stream("polkadot-network");
service.register_notifications_protocol(POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME);
let (mut worker_sender, worker_receiver) = mpsc::channel(SERVICE_TO_WORKER_BUF);
let gossip_validator = crate::legacy::gossip::register_validator(
service.clone(),
chain_context,
&executor,
);
executor.spawn(worker_loop(
config,
service.clone(),
gossip_validator,
api,
worker_receiver,
executor.clone(),
))?;
let polkadot_service = Service {
sender: worker_sender.clone(),
network_service: service.clone(),
};
executor.spawn(async move {
while let Some(event) = event_stream.next().await {
let res = match event {
Event::Dht(_) => continue,
Event::NotificationStreamOpened {
remote,
engine_id,
role,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }
worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, role)).await
},
Event::NotificationStreamClosed {
remote,
engine_id,
} => {
if engine_id != POLKADOT_ENGINE_ID { continue }
worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await
},
Event::NotificationsReceived {
remote,
messages,
} => {
let our_notifications = messages.into_iter()
.filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID {
Some(message)
} else {
None
})
.collect();
worker_sender.send(
ServiceToWorkerMsg::PeerMessage(remote, our_notifications)
).await
}
};
if let Err(e) = res {
// full is impossible here, as we've `await`ed the value being sent.
if e.is_disconnected() {
break
}
}
}
})?;
Ok(polkadot_service)
}
/// The Polkadot protocol status message.
#[derive(Debug, Encode, Decode, PartialEq)]
pub struct Status {
version: u32, // protocol version.
collating_for: Option<(CollatorId, ParaId)>,
}
/// Polkadot-specific messages from peer to peer.
#[derive(Debug, Encode, Decode, PartialEq)]
pub enum Message {
/// Exchange status with a peer. This should be the first message sent.
#[codec(index = "0")]
Status(Status),
/// Inform a peer of their role as a collator. May only be sent after
/// a `ValidatorId` message.
#[codec(index = "1")]
CollatorRole(CollatorRole),
/// Send a collation.
#[codec(index = "2")]
Collation(Hash, Collation),
/// Inform a peer of a new validator public key.
#[codec(index = "3")]
ValidatorId(ValidatorId),
}
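// NOTE: the explicit `codec` indices pin the wire format: the first byte of an encoded
// `Message` is the variant index (e.g. `Message::Status(..).encode()` begins with `0`),
// so variants must not be reordered or renumbered.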
// ensures collator-protocol messages are sent in correct order.
// session key must be sent before collator role.
enum CollatorState {
Fresh,
RolePending(CollatorRole),
Primed(Option<CollatorRole>),
}
impl CollatorState {
fn send_key<F: FnMut(Message)>(&mut self, key: ValidatorId, mut f: F) {
f(Message::ValidatorId(key));
match self {
CollatorState::RolePending(role) => {
f(Message::CollatorRole(*role));
*self = CollatorState::Primed(Some(*role));
},
CollatorState::Fresh => {
*self = CollatorState::Primed(None);
},
CollatorState::Primed(_) => {},
}
}
fn set_role<F: FnMut(Message)>(&mut self, role: CollatorRole, mut f: F) {
if let CollatorState::Primed(ref mut r) = *self {
f(Message::CollatorRole(role));
*r = Some(role);
} else {
*self = CollatorState::RolePending(role);
}
}
}
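// Illustrative sequence (with hypothetical `key` and `send` bindings): a role set
// before the session key is known is buffered and flushed together with the key:
//
//     let mut state = CollatorState::Fresh;
//     state.set_role(CollatorRole::Primary, |_| {}); // nothing sent; role is pending.
//     state.send_key(key, |msg| send(msg)); // sends `ValidatorId`, then `CollatorRole`.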
enum ProtocolState {
Fresh,
Ready(Status, CollatorState),
}
struct PeerData {
claimed_validator: bool,
protocol_state: ProtocolState,
session_keys: RecentValidatorIds,
}
impl PeerData {
fn ready_and_collating_for(&self) -> Option<(CollatorId, ParaId)> {
match self.protocol_state {
ProtocolState::Ready(ref status, _) => status.collating_for.clone(),
_ => None,
}
}
fn collator_state_mut(&mut self) -> Option<&mut CollatorState> {
match self.protocol_state {
ProtocolState::Ready(_, ref mut c_state) => Some(c_state),
_ => None,
}
}
fn should_send_key(&self) -> bool {
self.claimed_validator || self.ready_and_collating_for().is_some()
}
}
struct ConsensusNetworkingInstance {
statement_table: Arc<SharedTable>,
relay_parent: Hash,
attestation_topic: Hash,
_drop_signal: exit_future::Signal,
}
/// A utility future that resolves when the receiving end of a channel has hung up.
///
/// This is an `.await`-friendly interface around `poll_canceled`.
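///
/// For example, the fetch handlers later in this module race
/// `AwaitCanceled { inner: &mut sender }` against a pending fetch, so the work is
/// dropped as soon as the requesting side goes away.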
// TODO: remove in favor of https://github.com/rust-lang/futures-rs/pull/2092/
// once published.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct AwaitCanceled<'a, T> {
inner: &'a mut oneshot::Sender<T>,
}
impl<T> Future for AwaitCanceled<'_, T> {
type Output = ();
fn poll(
mut self: Pin<&mut Self>,
cx: &mut futures::task::Context<'_>,
) -> futures::task::Poll<()> {
self.inner.poll_canceled(cx)
}
}
/// Protocol configuration.
#[derive(Default)]
pub struct Config {
/// Which collator-id to use when collating, and on which parachain.
/// `None` if not collating.
pub collating_for: Option<(CollatorId, ParaId)>,
}
// 3 is chosen because sessions change infrequently and usually
// only the last 2 (current session and "last" session) are relevant.
// the extra is an error boundary.
const RECENT_SESSIONS: usize = 3;
/// Result when inserting recent session key.
#[derive(PartialEq, Eq)]
pub(crate) enum InsertedRecentKey {
/// Key was already known.
AlreadyKnown,
/// Key was new and pushed out optional old item.
New(Option<ValidatorId>),
}
/// Wrapper for managing recent session keys.
#[derive(Default)]
struct RecentValidatorIds {
inner: ArrayVec<[ValidatorId; RECENT_SESSIONS]>,
}
impl RecentValidatorIds {
/// Insert a new session key. This returns one to be pushed out if the
/// set is full.
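///
/// For example, with `RECENT_SESSIONS == 3`, inserting a fourth distinct key returns
/// `InsertedRecentKey::New(Some(oldest))`, while re-inserting an already-known key
/// returns `InsertedRecentKey::AlreadyKnown`.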
fn insert(&mut self, key: ValidatorId) -> InsertedRecentKey {
if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown }
let old = if self.inner.len() == RECENT_SESSIONS {
Some(self.inner.remove(0))
} else {
None
};
self.inner.push(key);
InsertedRecentKey::New(old)
}
/// As a slice. Most recent is last.
fn as_slice(&self) -> &[ValidatorId] {
&*self.inner
}
/// Returns the last inserted session key.
fn latest(&self) -> Option<&ValidatorId> {
self.inner.last()
}
}
struct ProtocolHandler {
service: Arc<dyn NetworkServiceOps>,
peers: HashMap<PeerId, PeerData>,
// reverse mapping from validator-ID to PeerID. Multiple peers can represent
// the same validator because of sentry nodes.
connected_validators: HashMap<ValidatorId, HashSet<PeerId>>,
consensus_instances: HashMap<Hash, ConsensusNetworkingInstance>,
collators: crate::legacy::collator_pool::CollatorPool,
local_collations: crate::legacy::local_collations::LocalCollations<Collation>,
config: Config,
local_keys: RecentValidatorIds,
}
impl ProtocolHandler {
fn new(
service: Arc<dyn NetworkServiceOps>,
config: Config,
) -> Self {
ProtocolHandler {
service,
peers: HashMap::new(),
connected_validators: HashMap::new(),
consensus_instances: HashMap::new(),
collators: Default::default(),
local_collations: Default::default(),
local_keys: Default::default(),
config,
}
}
fn on_connect(&mut self, peer: PeerId, role: ObservedRole) {
let claimed_validator = matches!(role, ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority);
self.peers.insert(peer.clone(), PeerData {
claimed_validator,
protocol_state: ProtocolState::Fresh,
session_keys: Default::default(),
});
let status = Message::Status(Status {
version: VERSION,
collating_for: self.config.collating_for.clone(),
}).encode();
self.service.write_notification(peer, POLKADOT_ENGINE_ID, status);
}
fn on_disconnect(&mut self, peer: PeerId) {
let mut new_primary = None;
if let Some(data) = self.peers.remove(&peer) {
if let Some((collator_id, _)) = data.ready_and_collating_for() {
if self.collators.collator_id_to_peer_id(&collator_id) == Some(&peer) {
new_primary = self.collators.on_disconnect(collator_id);
}
}
// clean up stated validator IDs.
for validator_id in data.session_keys.as_slice().iter().cloned() {
self.validator_representative_removed(validator_id, &peer);
}
}
let service = &self.service;
let peers = &mut self.peers;
if let Some(new_primary) = new_primary {
let new_primary_peer_id = match self.collators.collator_id_to_peer_id(&new_primary) {
None => return,
Some(p) => p.clone(),
};
if let Some(c_state) = peers.get_mut(&new_primary_peer_id)
.and_then(|p| p.collator_state_mut())
{
c_state.set_role(
CollatorRole::Primary,
|msg| service.write_notification(
new_primary_peer_id.clone(),
POLKADOT_ENGINE_ID,
msg.encode(),
),
);
}
}
}
fn on_raw_messages(&mut self, remote: PeerId, messages: Vec<bytes::Bytes>) {
for raw_message in messages {
match Message::decode(&mut raw_message.as_ref()) {
Ok(message) => {
self.service.report_peer(remote.clone(), benefit::VALID_FORMAT);
match message {
Message::Status(status) => {
self.on_status(remote.clone(), status);
}
Message::CollatorRole(role) => {
self.on_collator_role(remote.clone(), role)
}
Message::Collation(relay_parent, collation) => {
self.on_remote_collation(remote.clone(), relay_parent, collation);
}
Message::ValidatorId(session_key) => {
self.on_validator_id(remote.clone(), session_key)
}
}
},
Err(_) => self.service.report_peer(remote.clone(), cost::INVALID_FORMAT),
}
}
}
fn on_status(&mut self, remote: PeerId, status: Status) {
let peer = match self.peers.get_mut(&remote) {
None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return }
Some(p) => p,
};
match peer.protocol_state {
ProtocolState::Fresh => {
peer.protocol_state = ProtocolState::Ready(status, CollatorState::Fresh);
if let Some((collator_id, para_id)) = peer.ready_and_collating_for() {
let collator_attached = self.collators
.collator_id_to_peer_id(&collator_id)
.map_or(false, |id| id != &remote);
// we only care about the first connection from this collator.
if !collator_attached {
let role = self.collators
.on_new_collator(collator_id, para_id, remote.clone());
let service = &self.service;
let send_key = peer.should_send_key();
if let Some(c_state) = peer.collator_state_mut() {
if send_key {
if let Some(key) = self.local_keys.latest() {
c_state.send_key(key.clone(), |msg| service.write_notification(
remote.clone(),
POLKADOT_ENGINE_ID,
msg.encode(),
));
}
}
c_state.set_role(role, |msg| service.write_notification(
remote.clone(),
POLKADOT_ENGINE_ID,
msg.encode(),
));
}
}
}
}
ProtocolState::Ready(_, _) => {
self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE);
}
}
}
fn on_remote_collation(&mut self, remote: PeerId, relay_parent: Hash, collation: Collation) {
let peer = match self.peers.get_mut(&remote) {
None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return }
Some(p) => p,
};
let (collator_id, para_id) = match peer.ready_and_collating_for() {
None => {
self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE);
return
}
Some(x) => x,
};
let collation_para = collation.info.parachain_index;
let collated_acc = collation.info.collator.clone();
let structurally_valid = para_id == collation_para && collator_id == collated_acc;
if structurally_valid && collation.info.check_signature().is_ok() {
debug!(target: "p_net", "Received collation for parachain {:?} from peer {}",
para_id, remote);
if self.collators.collator_id_to_peer_id(&collator_id) == Some(&remote) {
self.collators.on_collation(collator_id, relay_parent, collation);
self.service.report_peer(remote, benefit::GOOD_COLLATION);
}
} else {
self.service.report_peer(remote, cost::INVALID_FORMAT);
}
}
fn on_collator_role(&mut self, remote: PeerId, role: CollatorRole) {
let collations_to_send;
{
let peer = match self.peers.get_mut(&remote) {
None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return }
Some(p) => p,
};
match peer.protocol_state {
ProtocolState::Fresh => {
self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE);
return;
}
ProtocolState::Ready(_, _) => {
let last_key = match peer.session_keys.as_slice().last() {
None => {
self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE);
return;
}
Some(k) => k,
};
collations_to_send = self.local_collations
.note_validator_role(last_key.clone(), role);
}
}
}
send_peer_collations(&*self.service, remote, collations_to_send);
}
fn on_validator_id(&mut self, remote: PeerId, key: ValidatorId) {
let mut collations_to_send = Vec::new();
let mut invalidated_key = None;
{
let peer = match self.peers.get_mut(&remote) {
None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return }
Some(p) => p,
};
match peer.protocol_state {
ProtocolState::Fresh => {
self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE);
return
}
ProtocolState::Ready(_, _) => {
if let InsertedRecentKey::New(Some(last)) = peer.session_keys.insert(key.clone()) {
collations_to_send = self.local_collations.fresh_key(&last, &key);
invalidated_key = Some(last);
}
}
}
}
if let Some(invalidated) = invalidated_key {
self.validator_representative_removed(invalidated, &remote);
}
self.connected_validators.entry(key).or_insert_with(HashSet::new).insert(remote.clone());
send_peer_collations(&*self.service, remote, collations_to_send);
}
// call when the given peer no longer represents the given validator key.
//
// this can occur when the peer advertises a new key, invalidating an old one,
// or when the peer disconnects.
fn validator_representative_removed(&mut self, validator_id: ValidatorId, peer_id: &PeerId) {
if let Entry::Occupied(mut entry) = self.connected_validators.entry(validator_id) {
entry.get_mut().remove(peer_id);
if entry.get().is_empty() {
let _ = entry.remove_entry();
}
}
}
fn await_collation(
&mut self,
relay_parent: Hash,
para_id: ParaId,
sender: oneshot::Sender<Collation>,
) {
self.collators.await_collation(relay_parent, para_id, sender);
}
fn collect_garbage(&mut self) {
self.collators.collect_garbage(None);
self.local_collations.collect_garbage(None);
}
fn note_bad_collator(&mut self, who: CollatorId) {
if let Some(peer) = self.collators.collator_id_to_peer_id(&who) {
self.service.report_peer(peer.clone(), cost::BAD_COLLATION);
}
}
// distribute a new session key to any relevant peers.
fn distribute_new_session_key(&mut self, key: ValidatorId) {
let service = &self.service;
for (peer_id, peer) in self.peers.iter_mut() {
if !peer.should_send_key() { continue }
if let Some(c_state) = peer.collator_state_mut() {
c_state.send_key(key.clone(), |msg| service.write_notification(
peer_id.clone(),
POLKADOT_ENGINE_ID,
msg.encode(),
));
}
}
}
// distribute our (as a collator node) collation to peers.
fn distribute_our_collation(&mut self, targets: HashSet<ValidatorId>, collation: Collation) {
let relay_parent = collation.info.relay_parent;
let distribution = self.local_collations.add_collation(relay_parent, targets, collation);
for (validator, collation) in distribution {
let validator_representatives = self.connected_validators.get(&validator)
.into_iter().flat_map(|reps| reps);
for remote in validator_representatives {
send_peer_collations(
&*self.service,
remote.clone(),
std::iter::once((relay_parent, collation.clone())),
);
}
}
}
fn drop_consensus_networking(&mut self, relay_parent: &Hash) {
// this triggers an abort of the background task: removing the instance drops its
// `_drop_signal`, which fires the `exit` future given to the statement-import loop.
self.consensus_instances.remove(relay_parent);
}
}

fn send_peer_collations(
service: &dyn NetworkServiceOps,
remote: PeerId,
collations: impl IntoIterator<Item=(Hash, Collation)>,
) {
for (relay_parent, collation) in collations {
service.write_notification(
remote.clone(),
POLKADOT_ENGINE_ID,
Message::Collation(relay_parent, collation).encode(),
);
}
}
/// Receives messages associated to a certain consensus networking instance.
struct ConsensusNetworkingReceiver {
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
/// The relay parent of this consensus network.
relay_parent: Hash,
}
impl Stream for ConsensusNetworkingReceiver {
type Item = ServiceToWorkerMsg;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.receiver).poll_next(cx)
}
}
struct Worker<Api, Sp, Gossip> {
protocol_handler: ProtocolHandler,
api: Arc<Api>,
executor: Sp,
gossip_handle: Gossip,
background_to_main_sender: mpsc::Sender<BackgroundToWorkerMsg>,
background_receiver: mpsc::Receiver<BackgroundToWorkerMsg>,
service_receiver: mpsc::Receiver<ServiceToWorkerMsg>,
consensus_networking_receivers: FuturesUnordered<StreamFuture<ConsensusNetworkingReceiver>>,
}
impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
Api: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Api::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
Sp: Spawn + Clone,
Gossip: GossipOps,
{
// spawns a background task to spawn consensus networking.
fn build_consensus_networking(
&mut self,
receiver: mpsc::Receiver<ServiceToWorkerMsg>,
table: Arc<SharedTable>,
authorities: Vec<ValidatorId>,
) {
// glue: let gossip know about our new local leaf.
let (signal, exit) = exit_future::signal();
let key = table.session_key();
if let Some(key) = key {
if let InsertedRecentKey::New(_) = self.protocol_handler.local_keys.insert(key.clone()) {
self.protocol_handler.distribute_new_session_key(key);
}
}
let signing_context = table.signing_context().clone();
let relay_parent = signing_context.parent_hash.clone();
let new_leaf_actions = self.gossip_handle.new_local_leaf(
crate::legacy::gossip::MessageValidationData { authorities, signing_context },
);
new_leaf_actions.perform(&self.gossip_handle);
self.protocol_handler.consensus_instances.insert(
relay_parent.clone(),
ConsensusNetworkingInstance {
statement_table: table.clone(),
relay_parent: relay_parent.clone(),
attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent.clone()),
_drop_signal: signal,
},
);
let relay_parent = table.signing_context().parent_hash;
self.consensus_networking_receivers.push(ConsensusNetworkingReceiver { receiver, relay_parent }.into_future());
// glue the incoming messages, shared table, and validation
// work together.
let _ = self.executor.spawn(statement_import_loop(
relay_parent,
table,
self.api.clone(),
self.gossip_handle.clone(),
self.background_to_main_sender.clone(),
exit,
));
}
fn handle_service_message(&mut self, message: ServiceToWorkerMsg) {
match message {
ServiceToWorkerMsg::PeerConnected(remote, role) => {
self.protocol_handler.on_connect(remote, role);
}
ServiceToWorkerMsg::PeerDisconnected(remote) => {
self.protocol_handler.on_disconnect(remote);
}
ServiceToWorkerMsg::PeerMessage(remote, messages) => {
self.protocol_handler.on_raw_messages(remote, messages)
}
ServiceToWorkerMsg::BuildConsensusNetworking(receiver, table, authorities) => {
self.build_consensus_networking(receiver, table, authorities);
}
ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => {
let relay_parent = receipt.relay_parent;
let instance = match self.protocol_handler.consensus_instances.get(&relay_parent) {
None => return,
Some(instance) => instance,
};
distribute_validated_collation(
instance,
receipt,
pov_block,
chunks,
&self.gossip_handle,
);
}
ServiceToWorkerMsg::FetchPoVBlock(candidate, mut sender) => {
// The gossip system checks that the correct pov-block data is present
// before placing in the pool, so we can safely check by candidate hash.
let get_msg = fetch_pov_from_gossip(&candidate, &self.gossip_handle);
let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((pov_block, _)) = res {
let _ = sender.send(pov_block);
}
});
}
ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
// for every erasure-root, relay-parent pair, there should only be one
// valid chunk with the given index.
//
// so we only care about the first item of the filtered stream.
let get_msg = self.gossip_handle.gossip_messages_for(topic)
.filter_map(move |(msg, _)| {
future::ready(match msg {
GossipMessage::ErasureChunk(chunk) =>
if chunk.chunk.index == validator_index {
Some(chunk.chunk)
} else {
None
},
_ => None,
})
})
.into_future()
.map(|(item, _)| item.expect(
"gossip message streams do not conclude early; qed"
));
let _ = self.executor.spawn(async move {
let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await;
if let Either::Left((chunk, _)) = res {
let _ = sender.send(chunk);
}
});
}
ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => {
let topic = crate::erasure_coding_topic(&candidate_hash);
self.gossip_handle.gossip_message(
topic,
GossipMessage::ErasureChunk(ErasureChunkMessage {
chunk: erasure_chunk,
candidate_hash,
})
);
}
ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => {
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
self.protocol_handler.await_collation(relay_parent, para_id, sender)
}
ServiceToWorkerMsg::NoteBadCollator(collator) => {
self.protocol_handler.note_bad_collator(collator);
}
ServiceToWorkerMsg::RegisterAvailabilityStore(store) => {
self.gossip_handle.register_availability_store(store);
}
ServiceToWorkerMsg::OurCollation(targets, collation) => {
self.protocol_handler.distribute_our_collation(targets, collation);
}
ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, sender) => {
let topic = crate::legacy::gossip::attestation_topic(relay_parent);
let checked_messages = self.gossip_handle.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)),
_ => future::ready(None),
})
.boxed();