Unverified Commit b32dcc44 authored by Fedor Sakharov's avatar Fedor Sakharov Committed by GitHub
Browse files

Collator protocol subsystem (#1659)



* WIP

* The initial implementation of the collator side.

* Improve comments

* Multiple collation requests

* Add more tests and comments to validator side

* Add comments, remove dead code

* Apply suggestions from code review
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>

* Fix build after suggested changes

* Also connect to the next validator group

* Remove a Future impl and move TimeoutExt to util

* Minor nits

* Fix build

* Change FetchCollations back to FetchCollation

* Try this

* Final fixes

* Fix build
Co-authored-by: default avatarPeter Goodspeed-Niklaus <coriolinus@users.noreply.github.com>
parent 61dd388b
Pipeline #106175 passed with stages
in 20 minutes and 41 seconds
......@@ -883,7 +883,7 @@ dependencies = [
"log 0.4.11",
"regalloc",
"serde",
"smallvec 1.4.1",
"smallvec 1.4.2",
"target-lexicon",
"thiserror",
]
......@@ -921,7 +921,7 @@ checksum = "2ef419efb4f94ecc02e5d9fbcc910d2bb7f0040e2de570e63a454f883bc891d6"
dependencies = [
"cranelift-codegen",
"log 0.4.11",
"smallvec 1.4.1",
"smallvec 1.4.2",
"target-lexicon",
]
......@@ -1566,7 +1566,7 @@ dependencies = [
"parity-scale-codec",
"paste",
"serde",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-arithmetic",
"sp-core",
"sp-inherents",
......@@ -2665,7 +2665,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-api",
"sp-authority-discovery",
"sp-block-builder",
......@@ -2703,7 +2703,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0315ef2f688e33844400b31f11c263f2b3dc21d8b9355c6891c5f185fae43f9a"
dependencies = [
"parity-util-mem",
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
......@@ -2732,7 +2732,7 @@ dependencies = [
"parking_lot 0.10.2",
"regex",
"rocksdb",
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
......@@ -2828,7 +2828,7 @@ dependencies = [
"parity-multiaddr",
"parking_lot 0.10.2",
"pin-project",
"smallvec 1.4.1",
"smallvec 1.4.2",
"wasm-timer",
]
......@@ -2859,7 +2859,7 @@ dependencies = [
"ring",
"rw-stream-sink",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"thiserror",
"unsigned-varint 0.4.0",
"void",
......@@ -2912,7 +2912,7 @@ dependencies = [
"prost",
"prost-build",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
......@@ -2936,7 +2936,7 @@ dependencies = [
"prost-build",
"rand 0.7.3",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"unsigned-varint 0.4.0",
"wasm-timer",
]
......@@ -2953,7 +2953,7 @@ dependencies = [
"log 0.4.11",
"prost",
"prost-build",
"smallvec 1.4.1",
"smallvec 1.4.2",
"wasm-timer",
]
......@@ -2977,7 +2977,7 @@ dependencies = [
"prost-build",
"rand 0.7.3",
"sha2 0.8.2",
"smallvec 1.4.1",
"smallvec 1.4.2",
"uint",
"unsigned-varint 0.4.0",
"void",
......@@ -3001,7 +3001,7 @@ dependencies = [
"log 0.4.11",
"net2",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
"void",
"wasm-timer",
]
......@@ -3104,7 +3104,7 @@ dependencies = [
"log 0.4.11",
"lru 0.6.0",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
"wasm-timer",
]
......@@ -3148,7 +3148,7 @@ dependencies = [
"libp2p-core",
"log 0.4.11",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
"void",
"wasm-timer",
]
......@@ -3604,7 +3604,7 @@ dependencies = [
"futures 0.3.5",
"log 0.4.11",
"pin-project",
"smallvec 1.4.1",
"smallvec 1.4.2",
"unsigned-varint 0.4.0",
]
......@@ -4307,7 +4307,7 @@ dependencies = [
"pallet-transaction-payment-rpc-runtime-api",
"parity-scale-codec",
"serde",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-core",
"sp-io",
"sp-runtime",
......@@ -4486,7 +4486,7 @@ dependencies = [
"parity-util-mem-derive",
"parking_lot 0.10.2",
"primitive-types",
"smallvec 1.4.1",
"smallvec 1.4.2",
"winapi 0.3.9",
]
......@@ -4593,7 +4593,7 @@ dependencies = [
"cloudabi 0.0.3",
"libc",
"redox_syscall",
"smallvec 1.4.1",
"smallvec 1.4.2",
"winapi 0.3.9",
]
......@@ -4608,7 +4608,7 @@ dependencies = [
"instant",
"libc",
"redox_syscall",
"smallvec 1.4.1",
"smallvec 1.4.2",
"winapi 0.3.9",
]
......@@ -4751,6 +4751,7 @@ dependencies = [
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-network",
"smol 0.3.3",
......@@ -4776,9 +4777,10 @@ dependencies = [
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"sc-keystore",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-core",
"sp-keyring",
"streamunordered",
......@@ -4810,6 +4812,29 @@ dependencies = [
"wasm-bindgen-futures",
]
[[package]]
name = "polkadot-collator-protocol"
version = "0.1.0"
dependencies = [
"assert_matches",
"derive_more 0.99.9",
"env_logger",
"futures 0.3.5",
"futures-timer 3.0.2",
"log 0.4.11",
"parity-scale-codec",
"polkadot-network-bridge",
"polkadot-node-network-protocol",
"polkadot-node-subsystem",
"polkadot-node-subsystem-test-helpers",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"smallvec 1.4.2",
"smol-timeout",
"sp-core",
"sp-keyring",
]
[[package]]
name = "polkadot-core-primitives"
version = "0.7.30"
......@@ -5073,7 +5098,7 @@ dependencies = [
"polkadot-primitives",
"polkadot-statement-table",
"sc-network",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-core",
"substrate-prometheus-endpoint",
]
......@@ -5092,10 +5117,11 @@ dependencies = [
"pin-project",
"polkadot-node-primitives",
"polkadot-node-subsystem",
"polkadot-node-subsystem-util",
"polkadot-primitives",
"polkadot-statement-table",
"sc-network",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-core",
]
......@@ -5120,7 +5146,7 @@ dependencies = [
"polkadot-statement-table",
"sc-keystore",
"sc-network",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-core",
"streamunordered",
]
......@@ -5286,7 +5312,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-api",
"sp-authority-discovery",
"sp-block-builder",
......@@ -5588,7 +5614,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-api",
"sp-authority-discovery",
"sp-block-builder",
......@@ -6229,7 +6255,7 @@ version = "4.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a415a013dd7c5d4221382329a5a3482566da675737494935cbbbcdec04662f9d"
dependencies = [
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
......@@ -6260,7 +6286,7 @@ checksum = "b9ba8aaf5fe7cf307c6dbdaeed85478961d29e25e3bee5169e11b92fa9f027a8"
dependencies = [
"log 0.4.11",
"rustc-hash",
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
......@@ -6393,7 +6419,7 @@ dependencies = [
"polkadot-runtime-parachains",
"serde",
"serde_derive",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-api",
"sp-authority-discovery",
"sp-block-builder",
......@@ -7767,9 +7793,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.4.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3757cb9d89161a2f24e1cf78efa0c1fcff485d18e3f55e0aa3480824ddaa0f3f"
checksum = "fbee7696b84bbf3d89a1c2eccff0850e3047ed46bfcd2e92c29a2d074d57e252"
[[package]]
name = "smol"
......@@ -7808,6 +7834,16 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "smol-timeout"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "024818c1f00b80e8171ddcfcee33860134293aa3aced60c9cbd7a5a2d41db392"
dependencies = [
"pin-project",
"smol 0.1.18",
]
[[package]]
name = "snow"
version = "0.7.1"
......@@ -8334,7 +8370,7 @@ dependencies = [
"parity-scale-codec",
"parking_lot 0.10.2",
"rand 0.7.3",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-core",
"sp-externalities",
"sp-panic-handler",
......@@ -9312,7 +9348,7 @@ dependencies = [
"serde",
"serde_json",
"sharded-slab",
"smallvec 1.4.1",
"smallvec 1.4.2",
"thread_local",
"tracing-core",
"tracing-log",
......@@ -9335,7 +9371,7 @@ dependencies = [
"hashbrown 0.8.0",
"log 0.4.11",
"rustc-hex",
"smallvec 1.4.1",
"smallvec 1.4.2",
]
[[package]]
......@@ -9703,7 +9739,7 @@ dependencies = [
"log 0.4.11",
"region",
"rustc-demangle",
"smallvec 1.4.1",
"smallvec 1.4.2",
"target-lexicon",
"wasmparser 0.59.0",
"wasmtime-environ",
......@@ -9964,7 +10000,7 @@ dependencies = [
"serde",
"serde_derive",
"serde_json",
"smallvec 1.4.1",
"smallvec 1.4.2",
"sp-api",
"sp-authority-discovery",
"sp-block-builder",
......
......@@ -54,6 +54,7 @@ members = [
"node/network/statement-distribution",
"node/network/bitfield-distribution",
"node/network/availability-distribution",
"node/network/collator-protocol",
"node/overseer",
"node/primitives",
"node/service",
......
......@@ -212,7 +212,12 @@ impl CandidateSelectionJob {
) {
if self.seconded_candidate.is_none() {
let (candidate_receipt, pov) =
match get_collation(relay_parent, para_id, self.sender.clone()).await {
match get_collation(
relay_parent,
para_id,
collator_id.clone(),
self.sender.clone(),
).await {
Ok(response) => response,
Err(err) => {
log::warn!(
......@@ -296,12 +301,14 @@ impl CandidateSelectionJob {
async fn get_collation(
relay_parent: Hash,
para_id: ParaId,
collator_id: CollatorId,
mut sender: mpsc::Sender<FromJob>,
) -> Result<(CandidateReceipt, PoV), Error> {
let (tx, rx) = oneshot::channel();
sender
.send(FromJob::Collator(CollatorProtocolMessage::FetchCollation(
relay_parent,
collator_id,
para_id,
tx,
)))
......@@ -514,7 +521,7 @@ mod tests {
CandidateSelectionMessage::Collation(
relay_parent,
para_id,
collator_id_clone,
collator_id_clone.clone(),
),
))
.await
......@@ -525,11 +532,13 @@ mod tests {
match msg {
FromJob::Collator(CollatorProtocolMessage::FetchCollation(
got_relay_parent,
collator_id,
got_para_id,
return_sender,
)) => {
assert_eq!(got_relay_parent, relay_parent);
assert_eq!(got_para_id, para_id);
assert_eq!(collator_id, collator_id_clone);
return_sender
.send((candidate_receipt.clone(), pov.clone()))
......
......@@ -14,6 +14,7 @@ polkadot-erasure-coding = { path = "../../../erasure-coding" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
derive_more = "0.99.9"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
......
......@@ -22,7 +22,8 @@ use polkadot_primitives::v1::{
GroupRotationInfo, HeadData, PersistedValidationData, OccupiedCore,
PoV, ScheduledCore, ValidatorPair,
};
use polkadot_subsystem_testhelpers::{self as test_helpers, TimeoutExt};
use polkadot_subsystem_testhelpers::{self as test_helpers};
use polkadot_node_subsystem_util::TimeoutExt;
use polkadot_node_network_protocol::ObservedRole;
use futures::{executor, future, Future};
......
......@@ -13,6 +13,7 @@ codec = { package="parity-scale-codec", version = "1.3.4" }
node-primitives = { package = "polkadot-node-primitives", path = "../../primitives" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
polkadot-node-subsystem-util = { package = "polkadot-node-subsystem-util", path = "../../subsystem-util" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
......
......@@ -614,7 +614,8 @@ mod test {
use futures::executor;
use maplit::hashmap;
use polkadot_primitives::v1::{Signed, ValidatorPair, AvailabilityBitfield};
use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TimeoutExt};
use polkadot_node_subsystem_test_helpers::make_subsystem_context;
use polkadot_node_subsystem_util::TimeoutExt;
use sp_core::crypto::Pair;
use std::time::Duration;
use assert_matches::assert_matches;
......
[package]
name = "polkadot-collator-protocol"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
[dependencies]
futures = "0.3.5"
log = "0.4.11"
derive_more = "0.99.9"
codec = { package="parity-scale-codec", version = "1.3.4", features = ["std"] }
polkadot-primitives = { path = "../../../primitives" }
polkadot-network-bridge = { path = "../../network/bridge" }
polkadot-node-network-protocol = { path = "../../network/protocol" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" }
[dev-dependencies]
env_logger = "0.7.1"
assert_matches = "1.3.0"
smol-timeout = "0.1.0"
smallvec = "1.4.2"
futures-timer = "3.0.2"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] }
sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem-testhelpers = { package = "polkadot-node-subsystem-test-helpers", path = "../../subsystem-test-helpers" }
This diff is collapsed.
// Copyright 2020 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! The Collator Protocol allows collators and validators talk to each other.
//! This subsystem implements both sides of the collator protocol.
#![deny(missing_docs)]
use std::time::Duration;
use futures::{channel::oneshot, FutureExt};
use log::trace;
use polkadot_subsystem::{
Subsystem, SubsystemContext, SubsystemError, SpawnedSubsystem,
errors::RuntimeApiError,
metrics::{self, prometheus},
messages::{
AllMessages, CollatorProtocolMessage, NetworkBridgeMessage,
},
};
use polkadot_node_network_protocol::{
PeerId, ReputationChange as Rep,
};
use polkadot_primitives::v1::CollatorId;
use polkadot_node_subsystem_util as util;
mod collator_side;
mod validator_side;
const TARGET: &'static str = "colp";
const REQUEST_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, derive_more::From)]
enum Error {
#[from]
Subsystem(SubsystemError),
#[from]
Oneshot(oneshot::Canceled),
#[from]
RuntimeApi(RuntimeApiError),
#[from]
UtilError(util::Error),
}
type Result<T> = std::result::Result<T, Error>;
enum ProtocolSide {
Validator,
Collator(CollatorId),
}
/// The collator protocol subsystem.
pub struct CollatorProtocolSubsystem {
protocol_side: ProtocolSide,
}
impl CollatorProtocolSubsystem {
/// Start the collator protocol.
/// If `id` is `Some` this is a collator side of the protocol.
/// If `id` is `None` this is a validator side of the protocol.
pub fn new(id: Option<CollatorId>) -> Self {
let protocol_side = match id {
Some(id) => ProtocolSide::Collator(id),
None => ProtocolSide::Validator,
};
Self {
protocol_side,
}
}
async fn run<Context>(self, ctx: Context) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
match self.protocol_side {
ProtocolSide::Validator => validator_side::run(ctx, REQUEST_TIMEOUT).await,
ProtocolSide::Collator(id) => collator_side::run(ctx, id).await,
}
}
}
/// Collator protocol metrics.
#[derive(Default, Clone)]
pub struct Metrics;
impl metrics::Metrics for Metrics {
fn try_register(_registry: &prometheus::Registry)
-> std::result::Result<Self, prometheus::PrometheusError> {
Ok(Metrics)
}
}
impl<Context> Subsystem<Context> for CollatorProtocolSubsystem
where
Context: SubsystemContext<Message = CollatorProtocolMessage> + Sync + Send,
{
type Metrics = Metrics;
fn start(self, ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "collator-protocol-subsystem",
future: Box::pin(async move { self.run(ctx) }.map(|_| ())),
}
}
}
/// Modify the reputation of a peer based on its behavior.
async fn modify_reputation<Context>(ctx: &mut Context, peer: PeerId, rep: Rep) -> Result<()>
where
Context: SubsystemContext<Message = CollatorProtocolMessage>,
{
trace!(
target: TARGET,
"Reputation change of {:?} for peer {:?}", rep, peer,
);
ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::ReportPeer(peer, rep),
)).await?;
Ok(())
}
This diff is collapsed.
......@@ -16,6 +16,7 @@ parking_lot = "0.10.0"
pin-project = "0.4.23"
polkadot-node-primitives = { path = "../primitives" }
polkadot-node-subsystem = { path = "../subsystem" }
polkadot-node-subsystem-util = { path = "../subsystem-util" }
polkadot-primitives = { path = "../../primitives" }
polkadot-statement-table = { path = "../../statement-table" }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
......