Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
polkadot
Commits
ee895c25
Unverified
Commit
ee895c25
authored
Oct 30, 2021
by
Squirrel
Browse files
Merge branch 'master' into giles-remove-light-client
parents
6909f0b8
d3a9b5be
Pipeline
#164022
passed with stages
in 39 minutes and 4 seconds
Changes
40
Pipelines
1
Expand all
Hide whitespace changes
Inline
Side-by-side
.gitlab-ci.yml
View file @
ee895c25
...
...
@@ -197,7 +197,7 @@ test-build-linux-stable:
script
:
-
./scripts/gitlab/test_linux_stable.sh
# we're using the bin built here, instead of having a parallel `build-linux-release`
-
time cargo build --release --verbose --bin polkadot
-
time cargo build --release --verbose --bin polkadot
--features "disputes"
-
sccache -s
# pack artifacts
-
mkdir -p ./artifacts
...
...
Cargo.lock
View file @
ee895c25
This diff is collapsed.
Click to expand it.
node/core/approval-voting/src/lib.rs
View file @
ee895c25
...
...
@@ -1326,6 +1326,9 @@ async fn handle_approved_ancestor(
let
next_wakeup
=
wakeups
.wakeup_for
(
block_hash
,
candidate_hash
);
let
approved
=
triggered
&&
{
a_entry
.local_statements
()
.1
.is_some
()
};
tracing
::
debug!
(
target
:
LOG_TARGET
,
?
candidate_hash
,
...
...
@@ -1334,6 +1337,7 @@ async fn handle_approved_ancestor(
?
next_wakeup
,
status
=
%
status
(),
triggered
,
approved
,
"assigned."
);
},
...
...
node/core/av-store/src/lib.rs
View file @
ee895c25
...
...
@@ -162,9 +162,9 @@ fn query_inner<D: Decode>(
Ok
(
Some
(
res
))
},
Ok
(
None
)
=>
Ok
(
None
),
Err
(
e
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
err
=
?
e
,
"Error reading from the availability store"
);
Err
(
e
.into
())
Err
(
e
rr
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
?
err
,
"Error reading from the availability store"
);
Err
(
e
rr
.into
())
},
}
}
...
...
@@ -365,6 +365,20 @@ pub enum Error {
CustomDatabase
,
}
impl
Error
{
/// Determine if the error is irrecoverable
/// or notifying the user via means of logging
/// is sufficient.
fn
is_fatal
(
&
self
)
->
bool
{
match
self
{
Self
::
Io
(
_
)
=>
true
,
Self
::
Oneshot
(
_
)
=>
true
,
Self
::
CustomDatabase
=>
true
,
_
=>
false
,
}
}
}
impl
Error
{
fn
trace
(
&
self
)
{
match
self
{
...
...
@@ -524,8 +538,7 @@ where
match
res
{
Err
(
e
)
=>
{
e
.trace
();
if
let
Error
::
Subsystem
(
SubsystemError
::
Context
(
_
))
=
e
{
if
e
.is_fatal
()
{
break
}
},
...
...
@@ -840,8 +853,18 @@ where
let
(
tx
,
rx
)
=
oneshot
::
channel
();
ctx
.send_message
(
ChainApiMessage
::
FinalizedBlockHash
(
batch_num
,
tx
))
.await
;
match
rx
.await
??
{
None
=>
{
match
rx
.await
?
{
Err
(
err
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
batch_num
,
?
err
,
"Failed to retrieve finalized block number."
,
);
break
},
Ok
(
None
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
"Availability store was informed that block #{} is finalized,
\
...
...
@@ -851,7 +874,7 @@ where
break
},
Some
(
h
)
=>
h
,
Ok
(
Some
(
h
)
)
=>
h
,
}
};
...
...
@@ -1093,7 +1116,7 @@ fn process_message(
},
Err
(
e
)
=>
{
let
_
=
tx
.send
(
Err
(()));
return
Err
(
e
)
return
Err
(
e
.into
()
)
},
}
},
...
...
node/core/chain-selection/src/lib.rs
View file @
ee895c25
...
...
@@ -348,14 +348,11 @@ async fn run<Context, B>(
B
:
Backend
,
{
loop
{
let
res
=
run_
iteration
(
&
mut
ctx
,
&
mut
backend
,
&
stagnant_check_interval
,
&*
clock
)
.await
;
let
res
=
run_
until_error
(
&
mut
ctx
,
&
mut
backend
,
&
stagnant_check_interval
,
&*
clock
)
.await
;
match
res
{
Err
(
e
)
=>
{
e
.trace
();
if
let
Error
::
Subsystem
(
SubsystemError
::
Context
(
_
))
=
e
{
break
}
break
},
Ok
(())
=>
{
tracing
::
info!
(
target
:
LOG_TARGET
,
"received `Conclude` signal, exiting"
);
...
...
@@ -370,7 +367,7 @@ async fn run<Context, B>(
//
// A return value of `Ok` indicates that an exit should be made, while non-fatal errors
// lead to another call to this function.
async
fn
run_
iteration
<
Context
,
B
>
(
async
fn
run_
until_error
<
Context
,
B
>
(
ctx
:
&
mut
Context
,
backend
:
&
mut
B
,
stagnant_check_interval
:
&
StagnantCheckInterval
,
...
...
@@ -440,21 +437,36 @@ async fn fetch_finalized(
ctx
:
&
mut
impl
SubsystemContext
,
)
->
Result
<
Option
<
(
Hash
,
BlockNumber
)
>
,
Error
>
{
let
(
number_tx
,
number_rx
)
=
oneshot
::
channel
();
let
(
hash_tx
,
hash_rx
)
=
oneshot
::
channel
();
ctx
.send_message
(
ChainApiMessage
::
FinalizedBlockNumber
(
number_tx
))
.await
;
let
number
=
number_rx
.await
??
;
let
number
=
match
number_rx
.await
?
{
Ok
(
number
)
=>
number
,
Err
(
err
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
?
err
,
"Fetching finalized number failed"
);
return
Ok
(
None
)
},
};
let
(
hash_tx
,
hash_rx
)
=
oneshot
::
channel
();
ctx
.send_message
(
ChainApiMessage
::
FinalizedBlockHash
(
number
,
hash_tx
))
.await
;
match
hash_rx
.await
??
{
None
=>
{
match
hash_rx
.await
?
{
Err
(
err
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
number
,
?
err
,
"Fetching finalized block number failed"
);
Ok
(
None
)
},
Ok
(
None
)
=>
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
number
,
"Missing hash for finalized block number"
);
return
Ok
(
None
)
Ok
(
None
)
},
Some
(
h
)
=>
Ok
(
Some
((
h
,
number
))),
Ok
(
Some
(
h
)
)
=>
Ok
(
Some
((
h
,
number
))),
}
}
...
...
@@ -462,10 +474,13 @@ async fn fetch_header(
ctx
:
&
mut
impl
SubsystemContext
,
hash
:
Hash
,
)
->
Result
<
Option
<
Header
>
,
Error
>
{
let
(
h_
tx
,
h_
rx
)
=
oneshot
::
channel
();
ctx
.send_message
(
ChainApiMessage
::
BlockHeader
(
hash
,
h_
tx
))
.await
;
let
(
tx
,
rx
)
=
oneshot
::
channel
();
ctx
.send_message
(
ChainApiMessage
::
BlockHeader
(
hash
,
tx
))
.await
;
h_rx
.await
?
.map_err
(
Into
::
into
)
Ok
(
rx
.await
?
.unwrap_or_else
(|
err
|
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
?
hash
,
?
err
,
"Missing hash for finalized block number"
);
None
}))
}
async
fn
fetch_block_weight
(
...
...
@@ -475,7 +490,12 @@ async fn fetch_block_weight(
let
(
tx
,
rx
)
=
oneshot
::
channel
();
ctx
.send_message
(
ChainApiMessage
::
BlockWeight
(
hash
,
tx
))
.await
;
rx
.await
?
.map_err
(
Into
::
into
)
let
res
=
rx
.await
?
;
Ok
(
res
.unwrap_or_else
(|
err
|
{
tracing
::
warn!
(
target
:
LOG_TARGET
,
?
hash
,
?
err
,
"Missing hash for finalized block number"
);
None
}))
}
// Handle a new active leaf.
...
...
@@ -590,7 +610,7 @@ fn extract_reversion_logs(header: &Header) -> Vec<BlockNumber> {
logs
}
// Handle a finalized block event.
//
/
Handle a finalized block event.
fn
handle_finalized_block
(
backend
:
&
mut
impl
Backend
,
finalized_hash
:
Hash
,
...
...
node/core/pvf/Cargo.toml
View file @
ee895c25
...
...
@@ -15,7 +15,7 @@ async-process = "1.1.0"
assert_matches
=
"1.4.0"
futures
=
"0.3.17"
futures-timer
=
"3.0.2"
libc
=
"0.2.10
4
"
libc
=
"0.2.10
5
"
slotmap
=
"1.0"
tracing
=
"0.1.29"
pin-project
=
"1.0.8"
...
...
node/core/pvf/src/artifacts.rs
View file @
ee895c25
...
...
@@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use
crate
::
error
::
PrepareError
;
use
always_assert
::
always
;
use
async_std
::
path
::{
Path
,
PathBuf
};
use
parity_scale_codec
::{
Decode
,
Encode
};
...
...
@@ -23,30 +24,19 @@ use std::{
time
::{
Duration
,
SystemTime
},
};
/// A final product of preparation process. Contains either a ready to run compiled artifact or
/// a description what went wrong.
/// A wrapper for the compiled PVF code.
#[derive(Encode,
Decode)]
pub
enum
Artifact
{
/// During the prevalidation stage of preparation an issue was found with the PVF.
PrevalidationErr
(
String
),
/// Compilation failed for the given PVF.
PreparationErr
(
String
),
/// This state indicates that the process assigned to prepare the artifact wasn't responsible
/// or were killed. This state is reported by the validation host (not by the worker).
DidntMakeIt
,
/// The PVF passed all the checks and is ready for execution.
Compiled
{
compiled_artifact
:
Vec
<
u8
>
},
}
pub
struct
CompiledArtifact
(
Vec
<
u8
>
);
impl
Artifact
{
/// Serializes this struct into a byte buffer.
pub
fn
serialize
(
&
self
)
->
Vec
<
u8
>
{
self
.encode
()
impl
CompiledArtifact
{
pub
fn
new
(
code
:
Vec
<
u8
>
)
->
Self
{
Self
(
code
)
}
}
/// Deserialize the given byte buffer to an a
rtifact
.
pub
fn
deserialize
(
mut
bytes
:
&
[
u8
])
->
Result
<
Self
,
String
>
{
Artifact
::
decode
(
&
mut
bytes
)
.map_err
(|
e
|
format!
(
"{:?}"
,
e
)
)
impl
AsRef
<
[
u8
]
>
for
CompiledA
rtifact
{
fn
as_ref
(
&
self
)
->
&
[
u8
]
{
self
.0
.as_slice
(
)
}
}
...
...
@@ -117,6 +107,9 @@ pub enum ArtifactState {
},
/// A task to prepare this artifact is scheduled.
Preparing
,
/// The code couldn't be compiled due to an error. Such artifacts
/// never reach the executor and stay in the host's memory.
FailedToProcess
(
PrepareError
),
}
/// A container of all known artifact ids and their states.
...
...
node/core/pvf/src/error.rs
View file @
ee895c25
...
...
@@ -14,6 +14,20 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use
parity_scale_codec
::{
Decode
,
Encode
};
/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug,
Clone,
Encode,
Decode)]
pub
enum
PrepareError
{
/// During the prevalidation stage of preparation an issue was found with the PVF.
Prevalidation
(
String
),
/// Compilation failed for the given PVF.
Preparation
(
String
),
/// This state indicates that the process assigned to prepare the artifact wasn't responsible
/// or were killed. This state is reported by the validation host (not by the worker).
DidNotMakeIt
,
}
/// A error raised during validation of the candidate.
#[derive(Debug,
Clone)]
pub
enum
ValidationError
{
...
...
@@ -54,3 +68,14 @@ pub enum InvalidCandidate {
/// PVF execution (compilation is not included) took more time than was allotted.
HardTimeout
,
}
impl
From
<
PrepareError
>
for
ValidationError
{
fn
from
(
error
:
PrepareError
)
->
Self
{
let
error_str
=
match
error
{
PrepareError
::
Prevalidation
(
err
)
=>
err
,
PrepareError
::
Preparation
(
err
)
=>
err
,
PrepareError
::
DidNotMakeIt
=>
"preparation timeout"
.to_owned
(),
};
ValidationError
::
InvalidCandidate
(
InvalidCandidate
::
WorkerReportedError
(
error_str
))
}
}
node/core/pvf/src/execute/worker.rs
View file @
ee895c25
...
...
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use
crate
::{
artifacts
::{
Artifact
,
Artifact
PathId
},
artifacts
::{
Artifact
PathId
,
Compiled
Artifact
},
executor_intf
::
TaskExecutor
,
worker_common
::{
bytes_to_path
,
framed_recv
,
framed_send
,
path_to_bytes
,
spawn_with_program_path
,
...
...
@@ -49,8 +49,8 @@ pub enum Outcome {
/// PVF execution completed successfully and the result is returned. The worker is ready for
/// another job.
Ok
{
result_descriptor
:
ValidationResult
,
duration_ms
:
u64
,
idle_worker
:
IdleWorker
},
/// The candidate validation failed. It may be for example because the
preparation process
///
produced an error or the wasm execution triggered a trap
.
/// The candidate validation failed. It may be for example because the
wasm execution triggered a trap.
///
Errors related to the preparation process are not expected to be encountered by the execution workers
.
InvalidCandidate
{
err
:
String
,
idle_worker
:
IdleWorker
},
/// An internal error happened during the validation. Such an error is most likely related to
/// some transient glitch.
...
...
@@ -216,18 +216,12 @@ async fn validate_using_artifact(
Ok
(
b
)
=>
b
,
};
let
artifact
=
match
Artifact
::
de
serialize
(
&
artifact_bytes
)
{
let
artifact
=
match
Compiled
Artifact
::
de
code
(
&
mut
artifact_bytes
.as_slice
()
)
{
Err
(
e
)
=>
return
Response
::
InternalError
(
format!
(
"artifact deserialization: {:?}"
,
e
)),
Ok
(
a
)
=>
a
,
};
let
compiled_artifact
=
match
&
artifact
{
Artifact
::
PrevalidationErr
(
msg
)
=>
return
Response
::
format_invalid
(
"prevalidation"
,
msg
),
Artifact
::
PreparationErr
(
msg
)
=>
return
Response
::
format_invalid
(
"preparation"
,
msg
),
Artifact
::
DidntMakeIt
=>
return
Response
::
format_invalid
(
"preparation timeout"
,
""
),
Artifact
::
Compiled
{
compiled_artifact
}
=>
compiled_artifact
,
};
let
compiled_artifact
=
artifact
.as_ref
();
let
validation_started_at
=
Instant
::
now
();
let
descriptor_bytes
=
match
unsafe
{
...
...
node/core/pvf/src/host.rs
View file @
ee895c25
...
...
@@ -344,8 +344,7 @@ async fn run(
.await
);
},
from_prepare_queue
=
from_prepare_queue_rx
.next
()
=>
{
let
prepare
::
FromQueue
::
Prepared
(
artifact_id
)
=
break_if_fatal!
(
from_prepare_queue
.ok_or
(
Fatal
));
let
from_queue
=
break_if_fatal!
(
from_prepare_queue
.ok_or
(
Fatal
));
// Note that preparation always succeeds.
//
...
...
@@ -361,7 +360,7 @@ async fn run(
&
mut
artifacts
,
&
mut
to_execute_queue_tx
,
&
mut
awaiting_prepare
,
artifact_id
,
from_queue
,
)
.await
);
},
}
...
...
@@ -439,6 +438,9 @@ async fn handle_execute_pvf(
awaiting_prepare
.add
(
artifact_id
,
execution_timeout
,
params
,
result_tx
);
},
ArtifactState
::
FailedToProcess
(
error
)
=>
{
let
_
=
result_tx
.send
(
Err
(
ValidationError
::
from
(
error
.clone
())));
},
}
}
else
{
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
...
...
@@ -470,6 +472,7 @@ async fn handle_heads_up(
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
},
ArtifactState
::
FailedToProcess
(
_
)
=>
{},
}
}
else
{
// The artifact is unknown: register it and put a background job into the prepare queue.
...
...
@@ -491,8 +494,10 @@ async fn handle_prepare_done(
artifacts
:
&
mut
Artifacts
,
execute_queue
:
&
mut
mpsc
::
Sender
<
execute
::
ToQueue
>
,
awaiting_prepare
:
&
mut
AwaitingPrepare
,
artifact_id
:
ArtifactId
,
from_queue
:
prepare
::
FromQueue
,
)
->
Result
<
(),
Fatal
>
{
let
prepare
::
FromQueue
{
artifact_id
,
result
}
=
from_queue
;
// Make some sanity checks and extract the current state.
let
state
=
match
artifacts
.artifact_state_mut
(
&
artifact_id
)
{
None
=>
{
...
...
@@ -513,6 +518,12 @@ async fn handle_prepare_done(
never!
(
"the artifact is already prepared: {:?}"
,
artifact_id
);
return
Ok
(())
},
Some
(
ArtifactState
::
FailedToProcess
(
_
))
=>
{
// The reasoning is similar to the above, the artifact cannot be
// processed at this point.
never!
(
"the artifact is already processed unsuccessfully: {:?}"
,
artifact_id
);
return
Ok
(())
},
Some
(
state
@
ArtifactState
::
Preparing
)
=>
state
,
};
...
...
@@ -526,6 +537,12 @@ async fn handle_prepare_done(
continue
}
// Don't send failed artifacts to the execution's queue.
if
let
Err
(
ref
error
)
=
result
{
let
_
=
result_tx
.send
(
Err
(
ValidationError
::
from
(
error
.clone
())));
continue
}
send_execute
(
execute_queue
,
execute
::
ToQueue
::
Enqueue
{
...
...
@@ -538,8 +555,10 @@ async fn handle_prepare_done(
.await
?
;
}
// Now consider the artifact prepared.
*
state
=
ArtifactState
::
Prepared
{
last_time_needed
:
SystemTime
::
now
()
};
*
state
=
match
result
{
Ok
(())
=>
ArtifactState
::
Prepared
{
last_time_needed
:
SystemTime
::
now
()
},
Err
(
error
)
=>
ArtifactState
::
FailedToProcess
(
error
.clone
()),
};
Ok
(())
}
...
...
@@ -937,7 +956,7 @@ mod tests {
);
test
.from_prepare_queue_tx
.send
(
prepare
::
FromQueue
::
Prepared
(
artifact_id
(
1
))
)
.send
(
prepare
::
FromQueue
{
artifact_id
:
artifact_id
(
1
),
result
:
Ok
(())
}
)
.await
.unwrap
();
let
result_tx_pvf_1_1
=
assert_matches!
(
...
...
@@ -950,7 +969,7 @@ mod tests {
);
test
.from_prepare_queue_tx
.send
(
prepare
::
FromQueue
::
Prepared
(
artifact_id
(
2
))
)
.send
(
prepare
::
FromQueue
{
artifact_id
:
artifact_id
(
2
),
result
:
Ok
(())
}
)
.await
.unwrap
();
let
result_tx_pvf_2
=
assert_matches!
(
...
...
@@ -1005,7 +1024,7 @@ mod tests {
);
test
.from_prepare_queue_tx
.send
(
prepare
::
FromQueue
::
Prepared
(
artifact_id
(
1
))
)
.send
(
prepare
::
FromQueue
{
artifact_id
:
artifact_id
(
1
),
result
:
Ok
(())
}
)
.await
.unwrap
();
...
...
node/core/pvf/src/prepare/pool.rs
View file @
ee895c25
...
...
@@ -16,6 +16,7 @@
use
super
::
worker
::{
self
,
Outcome
};
use
crate
::{
error
::
PrepareError
,
metrics
::
Metrics
,
worker_common
::{
IdleWorker
,
WorkerHandle
},
LOG_TARGET
,
...
...
@@ -78,9 +79,16 @@ pub enum FromPool {
/// The given worker was just spawned and is ready to be used.
Spawned
(
Worker
),
/// The given worker either succeeded or failed the given job. Under any circumstances the
/// artifact file has been written. The `bool` says whether the worker ripped.
Concluded
(
Worker
,
bool
),
/// The given worker either succeeded or failed the given job.
Concluded
{
/// A key for retrieving the worker data from the pool.
worker
:
Worker
,
/// Indicates whether the worker process was killed.
rip
:
bool
,
/// [`Ok`] indicates that compiled artifact is successfully stored on disk.
/// Otherwise, an [error](PrepareError) is supplied.
result
:
Result
<
(),
PrepareError
>
,
},
/// The given worker ceased to exist.
Rip
(
Worker
),
...
...
@@ -295,7 +303,7 @@ fn handle_mux(
},
PoolEvent
::
StartWork
(
worker
,
outcome
)
=>
{
match
outcome
{
Outcome
::
Concluded
(
idle
)
=>
{
Outcome
::
Concluded
{
worker
:
idle
,
result
}
=>
{
let
data
=
match
spawned
.get_mut
(
worker
)
{
None
=>
{
// Perhaps the worker was killed meanwhile and the result is no longer
...
...
@@ -310,7 +318,7 @@ fn handle_mux(
let
old
=
data
.idle
.replace
(
idle
);
assert_matches!
(
old
,
None
,
"attempt to overwrite an idle worker"
);
reply
(
from_pool
,
FromPool
::
Concluded
(
worker
,
false
)
)
?
;
reply
(
from_pool
,
FromPool
::
Concluded
{
worker
,
rip
:
false
,
result
}
)
?
;
Ok
(())
},
...
...
@@ -321,9 +329,16 @@ fn handle_mux(
Ok
(())
},
Outcome
::
Did
n
tMakeIt
=>
{
Outcome
::
Did
No
tMakeIt
=>
{
if
attempt_retire
(
metrics
,
spawned
,
worker
)
{
reply
(
from_pool
,
FromPool
::
Concluded
(
worker
,
true
))
?
;
reply
(
from_pool
,
FromPool
::
Concluded
{
worker
,
rip
:
true
,
result
:
Err
(
PrepareError
::
DidNotMakeIt
),
},
)
?
;
}
Ok
(())
...
...
node/core/pvf/src/prepare/queue.rs
View file @
ee895c25
...
...
@@ -17,7 +17,9 @@
//! A queue that handles requests for PVF preparation.
use
super
::
pool
::{
self
,
Worker
};
use
crate
::{
artifacts
::
ArtifactId
,
metrics
::
Metrics
,
Priority
,
Pvf
,
LOG_TARGET
};
use
crate
::{
artifacts
::
ArtifactId
,
error
::
PrepareError
,
metrics
::
Metrics
,
Priority
,
Pvf
,
LOG_TARGET
,
};
use
always_assert
::{
always
,
never
};
use
async_std
::
path
::
PathBuf
;
use
futures
::{
channel
::
mpsc
,
stream
::
StreamExt
as
_
,
Future
,
SinkExt
};
...
...
@@ -29,7 +31,7 @@ pub enum ToQueue {
/// This schedules preparation of the given PVF.
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue
::Prepared
`] response. In case there is a need to bump the priority, use
/// [`FromQueue`] response. In case there is a need to bump the priority, use
/// [`ToQueue::Amend`].
Enqueue
{
priority
:
Priority
,
pvf
:
Pvf
},
/// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop.
...
...
@@ -37,9 +39,13 @@ pub enum ToQueue {
}
/// A response from queue.
#[derive(Debug,
PartialEq,
Eq)]
pub
enum
FromQueue
{
Prepared
(
ArtifactId
),
#[derive(Debug)]
pub
struct
FromQueue
{
/// Identifier of an artifact.
pub
(
crate
)
artifact_id
:
ArtifactId
,
/// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact
/// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied.
pub
(
crate
)
result
:
Result
<
(),
PrepareError
>
,
}
#[derive(Default)]
...
...
@@ -299,7 +305,8 @@ async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Resul
use
pool
::
FromPool
::
*
;
match
from_pool
{
Spawned
(
worker
)
=>
handle_worker_spawned
(
queue
,
worker
)
.await
?
,
Concluded
(
worker
,
rip
)
=>
handle_worker_concluded
(
queue
,
worker
,
rip
)
.await
?
,
Concluded
{
worker
,
rip
,
result
}
=>
handle_worker_concluded
(
queue
,
worker
,
rip
,
result
)
.await
?
,
Rip
(
worker
)
=>
handle_worker_rip
(
queue
,
worker
)
.await
?
,
}
Ok
(())
...
...
@@ -320,6 +327,7 @@ async fn handle_worker_concluded(
queue
:
&
mut
Queue
,
worker
:
Worker
,
rip
:
bool
,
result
:
Result
<
(),
PrepareError
>
,
)
->
Result
<
(),
Fatal
>
{
queue
.metrics
.prepare_concluded
();
...
...
@@ -377,7 +385,7 @@ async fn handle_worker_concluded(
"prepare worker concluded"
,
);
reply
(
&
mut
queue
.from_queue_tx
,
FromQueue
::
Prepared
(
artifact_id
)
)
?
;
reply
(
&
mut
queue
.from_queue_tx
,
FromQueue
{
artifact_id
,
result
}
)
?
;
// Figure out what to do with the worker.
if
rip
{
...
...
@@ -641,12 +649,9 @@ mod tests {
let
w
=
test
.workers
.insert
(());
test
.send_from_pool
(
pool
::
FromPool
::
Spawned
(
w
));
test
.send_from_pool
(
pool
::
FromPool
::
Concluded
(
w
,
false
)
);
test
.send_from_pool
(
pool
::
FromPool
::
Concluded
{
worker
:
w
,
rip
:
false
,
result
:
Ok
(())
}
);
assert_eq!
(
test
.poll_and_recv_from_queue
()
.await
,
FromQueue
::
Prepared
(
pvf
(
1
)
.as_artifact_id
())
);
assert_eq!
(
test
.poll_and_recv_from_queue
()
.await
.artifact_id
,
pvf
(
1
)
.as_artifact_id
());