Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
polkadot
Commits
4d13f3d1
Commit
4d13f3d1
authored
Aug 06, 2018
by
asynchronous rob
Committed by
Benjamin Kampmann
Aug 06, 2018
Browse files
Availability/Extrinsic store (#465)
parent
96da2bf7
Changes
15
Hide whitespace changes
Inline
Side-by-side
availability-store/Cargo.toml
0 → 100644
View file @
4d13f3d1
[package]
name
=
"polkadot-availability-store"
description
=
"Persistent database for parachain data"
version
=
"0.1.0"
authors
=
[
"Parity Technologies <admin@parity.io>"
]
[dependencies]
polkadot-primitives
=
{
path
=
"../primitives"
}
parking_lot
=
"0.4"
log
=
"0.3"
substrate-codec
=
{
path
=
"../../substrate/codec"
}
substrate-primitives
=
{
path
=
"../../substrate/primitives"
}
kvdb
=
{
git
=
"https://github.com/paritytech/parity.git"
}
kvdb-rocksdb
=
{
git
=
"https://github.com/paritytech/parity.git"
}
kvdb-memorydb
=
{
git
=
"https://github.com/paritytech/parity.git"
}
availability-store/src/lib.rs
0 → 100644
View file @
4d13f3d1
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.
// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
//! Persistent database for parachain data.
extern
crate
polkadot_primitives
;
extern
crate
parking_lot
;
extern
crate
substrate_codec
as
codec
;
extern
crate
substrate_primitives
;
extern
crate
kvdb
;
extern
crate
kvdb_rocksdb
;
extern
crate
kvdb_memorydb
;
#[macro_use]
extern
crate
log
;
use
codec
::{
Encode
,
Decode
};
use
kvdb
::{
KeyValueDB
,
DBTransaction
};
use
kvdb_rocksdb
::{
Database
,
DatabaseConfig
};
use
polkadot_primitives
::
Hash
;
use
polkadot_primitives
::
parachain
::{
Id
as
ParaId
,
BlockData
,
Extrinsic
};
use
std
::
collections
::
HashSet
;
use
std
::
path
::
PathBuf
;
use
std
::
sync
::
Arc
;
use
std
::
io
;
mod
columns
{
pub
const
DATA
:
Option
<
u32
>
=
Some
(
0
);
pub
const
META
:
Option
<
u32
>
=
Some
(
1
);
pub
const
NUM_COLUMNS
:
u32
=
2
;
}
/// Configuration for the availability store.
pub
struct
Config
{
/// Cache size in bytes. If `None` default is used.
pub
cache_size
:
Option
<
usize
>
,
/// Path to the database.
pub
path
:
PathBuf
,
}
/// Some data to keep available.
pub
struct
Data
{
/// The relay chain parent hash this should be localized to.
pub
relay_parent
:
Hash
,
/// The parachain index for this candidate.
pub
parachain_id
:
ParaId
,
/// Unique candidate receipt hash.
pub
candidate_hash
:
Hash
,
/// Block data.
pub
block_data
:
BlockData
,
/// Extrinsic data.
pub
extrinsic
:
Option
<
Extrinsic
>
,
}
fn
extract_io_err
(
err
:
::
kvdb
::
Error
)
->
io
::
Error
{
match
err
{
::
kvdb
::
Error
(::
kvdb
::
ErrorKind
::
Io
(
io_err
),
_
)
=>
io_err
,
::
kvdb
::
Error
(::
kvdb
::
ErrorKind
::
Msg
(
msg
),
_
)
=>
io
::
Error
::
new
(
io
::
ErrorKind
::
Other
,
msg
,
),
x
=>
io
::
Error
::
new
(
io
::
ErrorKind
::
Other
,
format!
(
"Unexpected error variant: {:?}"
,
x
),
// only necessary because of nonexaustive match.
)
}
}
fn
block_data_key
(
relay_parent
:
&
Hash
,
candidate_hash
:
&
Hash
)
->
Vec
<
u8
>
{
(
relay_parent
,
candidate_hash
,
0i8
)
.encode
()
}
fn
extrinsic_key
(
relay_parent
:
&
Hash
,
candidate_hash
:
&
Hash
)
->
Vec
<
u8
>
{
(
relay_parent
,
candidate_hash
,
1i8
)
.encode
()
}
/// Handle to the availability store.
#[derive(Clone)]
pub
struct
Store
{
inner
:
Arc
<
KeyValueDB
>
,
}
impl
Store
{
/// Create a new `Store` with given config on disk.
pub
fn
new
(
config
:
Config
)
->
io
::
Result
<
Self
>
{
let
mut
db_config
=
DatabaseConfig
::
with_columns
(
Some
(
columns
::
NUM_COLUMNS
));
db_config
.memory_budget
=
config
.cache_size
;
db_config
.wal
=
true
;
let
path
=
config
.path
.to_str
()
.ok_or_else
(||
io
::
Error
::
new
(
io
::
ErrorKind
::
Other
,
format!
(
"Bad database path: {:?}"
,
config
.path
),
))
?
;
let
db
=
Database
::
open
(
&
db_config
,
&
path
)
.map_err
(
extract_io_err
)
?
;
Ok
(
Store
{
inner
:
Arc
::
new
(
db
),
})
}
/// Create a new `Store` in-memory. Useful for tests.
pub
fn
new_in_memory
()
->
Self
{
Store
{
inner
:
Arc
::
new
(::
kvdb_memorydb
::
create
(::
columns
::
NUM_COLUMNS
)),
}
}
/// Make some data available provisionally.
pub
fn
make_available
(
&
self
,
data
:
Data
)
->
io
::
Result
<
()
>
{
let
mut
tx
=
DBTransaction
::
new
();
// note the meta key.
let
mut
v
=
match
self
.inner
.get
(
columns
::
META
,
&*
data
.relay_parent
)
{
Ok
(
Some
(
raw
))
=>
Vec
::
decode
(
&
mut
&
raw
[
..
])
.expect
(
"all stored data serialized correctly; qed"
),
Ok
(
None
)
=>
Vec
::
new
(),
Err
(
e
)
=>
{
warn!
(
target
:
"availability"
,
"Error reading from availability store: {:?}"
,
e
);
Vec
::
new
()
}
};
v
.push
(
data
.candidate_hash
);
tx
.put_vec
(
columns
::
META
,
&
data
.relay_parent
[
..
],
v
.encode
());
tx
.put_vec
(
columns
::
DATA
,
block_data_key
(
&
data
.relay_parent
,
&
data
.candidate_hash
)
.as_slice
(),
data
.block_data
.encode
()
);
if
let
Some
(
_extrinsic
)
=
data
.extrinsic
{
tx
.put_vec
(
columns
::
DATA
,
extrinsic_key
(
&
data
.relay_parent
,
&
data
.candidate_hash
)
.as_slice
(),
vec!
[],
);
}
self
.inner
.write
(
tx
)
.map_err
(
extract_io_err
)
}
/// Note that a set of candidates have been included in a finalized block with given hash and parent hash.
pub
fn
candidates_finalized
(
&
self
,
parent
:
Hash
,
finalized_candidates
:
HashSet
<
Hash
>
)
->
io
::
Result
<
()
>
{
let
mut
tx
=
DBTransaction
::
new
();
let
v
=
match
self
.inner
.get
(
columns
::
META
,
&
parent
[
..
])
{
Ok
(
Some
(
raw
))
=>
Vec
::
decode
(
&
mut
&
raw
[
..
])
.expect
(
"all stored data serialized correctly; qed"
),
Ok
(
None
)
=>
Vec
::
new
(),
Err
(
e
)
=>
{
warn!
(
target
:
"availability"
,
"Error reading from availability store: {:?}"
,
e
);
Vec
::
new
()
}
};
tx
.delete
(
columns
::
META
,
&
parent
[
..
]);
for
candidate_hash
in
v
{
if
!
finalized_candidates
.contains
(
&
candidate_hash
)
{
tx
.delete
(
columns
::
DATA
,
block_data_key
(
&
parent
,
&
candidate_hash
)
.as_slice
());
tx
.delete
(
columns
::
DATA
,
extrinsic_key
(
&
parent
,
&
candidate_hash
)
.as_slice
());
}
}
self
.inner
.write
(
tx
)
.map_err
(
extract_io_err
)
}
/// Query block data.
pub
fn
block_data
(
&
self
,
relay_parent
:
Hash
,
candidate_hash
:
Hash
)
->
Option
<
BlockData
>
{
let
encoded_key
=
block_data_key
(
&
relay_parent
,
&
candidate_hash
);
match
self
.inner
.get
(
columns
::
DATA
,
&
encoded_key
[
..
])
{
Ok
(
Some
(
raw
))
=>
Some
(
BlockData
::
decode
(
&
mut
&
raw
[
..
])
.expect
(
"all stored data serialized correctly; qed"
)
),
Ok
(
None
)
=>
None
,
Err
(
e
)
=>
{
warn!
(
target
:
"availability"
,
"Error reading from availability store: {:?}"
,
e
);
None
}
}
}
/// Query extrinsic data.
pub
fn
extrinsic
(
&
self
,
relay_parent
:
Hash
,
candidate_hash
:
Hash
)
->
Option
<
Extrinsic
>
{
let
encoded_key
=
extrinsic_key
(
&
relay_parent
,
&
candidate_hash
);
match
self
.inner
.get
(
columns
::
DATA
,
&
encoded_key
[
..
])
{
Ok
(
Some
(
_raw
))
=>
Some
(
Extrinsic
),
Ok
(
None
)
=>
None
,
Err
(
e
)
=>
{
warn!
(
target
:
"availability"
,
"Error reading from availability store: {:?}"
,
e
);
None
}
}
}
}
#[cfg(test)]
mod
tests
{
use
super
::
*
;
#[test]
fn
finalization_removes_unneeded
()
{
let
relay_parent
=
[
1
;
32
]
.into
();
let
para_id_1
=
5
.into
();
let
para_id_2
=
6
.into
();
let
candidate_1
=
[
2
;
32
]
.into
();
let
candidate_2
=
[
3
;
32
]
.into
();
let
block_data_1
=
BlockData
(
vec!
[
1
,
2
,
3
]);
let
block_data_2
=
BlockData
(
vec!
[
4
,
5
,
6
]);
let
store
=
Store
::
new_in_memory
();
store
.make_available
(
Data
{
relay_parent
,
parachain_id
:
para_id_1
,
candidate_hash
:
candidate_1
,
block_data
:
block_data_1
.clone
(),
extrinsic
:
Some
(
Extrinsic
),
})
.unwrap
();
store
.make_available
(
Data
{
relay_parent
,
parachain_id
:
para_id_2
,
candidate_hash
:
candidate_2
,
block_data
:
block_data_2
.clone
(),
extrinsic
:
Some
(
Extrinsic
),
})
.unwrap
();
assert_eq!
(
store
.block_data
(
relay_parent
,
candidate_1
)
.unwrap
(),
block_data_1
);
assert_eq!
(
store
.block_data
(
relay_parent
,
candidate_2
)
.unwrap
(),
block_data_2
);
assert!
(
store
.extrinsic
(
relay_parent
,
candidate_1
)
.is_some
());
assert!
(
store
.extrinsic
(
relay_parent
,
candidate_2
)
.is_some
());
store
.candidates_finalized
(
relay_parent
,
[
candidate_1
]
.iter
()
.cloned
()
.collect
())
.unwrap
();
assert_eq!
(
store
.block_data
(
relay_parent
,
candidate_1
)
.unwrap
(),
block_data_1
);
assert!
(
store
.block_data
(
relay_parent
,
candidate_2
)
.is_none
());
assert!
(
store
.extrinsic
(
relay_parent
,
candidate_1
)
.is_some
());
assert!
(
store
.extrinsic
(
relay_parent
,
candidate_2
)
.is_none
());
}
}
consensus/Cargo.toml
View file @
4d13f3d1
...
...
@@ -13,6 +13,7 @@ log = "0.3"
exit-future
=
"0.1"
rhododendron
=
"0.2"
polkadot-api
=
{
path
=
"../api"
}
polkadot-availability-store
=
{
path
=
"../availability-store"
}
polkadot-parachain
=
{
path
=
"../parachain"
}
polkadot-primitives
=
{
path
=
"../primitives"
}
polkadot-runtime
=
{
path
=
"../runtime"
}
...
...
consensus/src/collation.rs
View file @
4d13f3d1
...
...
@@ -73,6 +73,11 @@ impl<C: Collators, P: PolkadotApi> CollationFetch<C, P> {
live_fetch
:
None
,
}
}
/// Access the underlying relay parent hash.
pub
fn
relay_parent
(
&
self
)
->
Hash
{
self
.relay_parent_hash
}
}
impl
<
C
:
Collators
,
P
:
PolkadotApi
>
Future
for
CollationFetch
<
C
,
P
>
{
...
...
consensus/src/lib.rs
View file @
4d13f3d1
...
...
@@ -32,6 +32,7 @@
extern
crate
ed25519
;
extern
crate
parking_lot
;
extern
crate
polkadot_api
;
extern
crate
polkadot_availability_store
as
extrinsic_store
;
extern
crate
polkadot_statement_table
as
table
;
extern
crate
polkadot_parachain
as
parachain
;
extern
crate
polkadot_transaction_pool
as
transaction_pool
;
...
...
@@ -66,6 +67,7 @@ use std::sync::Arc;
use
std
::
time
::{
Duration
,
Instant
};
use
codec
::{
Decode
,
Encode
};
use
extrinsic_store
::
Store
as
ExtrinsicStore
;
use
polkadot_api
::
PolkadotApi
;
use
polkadot_primitives
::{
Hash
,
Block
,
BlockId
,
BlockNumber
,
Header
,
Timestamp
,
SessionKey
};
use
polkadot_primitives
::
parachain
::{
Id
as
ParaId
,
Chain
,
DutyRoster
,
BlockData
,
Extrinsic
as
ParachainExtrinsic
,
CandidateReceipt
,
CandidateSignature
};
...
...
@@ -236,6 +238,8 @@ pub struct ProposerFactory<C, N, P> {
pub
handle
:
TaskExecutor
,
/// The duration after which parachain-empty blocks will be allowed.
pub
parachain_empty_duration
:
Duration
,
/// Store for extrinsic data.
pub
extrinsic_store
:
ExtrinsicStore
,
}
impl
<
C
,
N
,
P
>
bft
::
Environment
<
Block
>
for
ProposerFactory
<
C
,
N
,
P
>
...
...
@@ -279,7 +283,7 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
debug!
(
target
:
"consensus"
,
"Active parachains: {:?}"
,
active_parachains
);
let
n_parachains
=
active_parachains
.len
();
let
table
=
Arc
::
new
(
SharedTable
::
new
(
group_info
,
sign_with
.clone
(),
parent_hash
));
let
table
=
Arc
::
new
(
SharedTable
::
new
(
group_info
,
sign_with
.clone
(),
parent_hash
,
self
.extrinsic_store
.clone
()
));
let
(
router
,
input
,
output
)
=
self
.network
.communication_for
(
authorities
,
table
.clone
(),
...
...
@@ -309,6 +313,7 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
router
.clone
(),
&
self
.handle
,
collation_work
,
self
.extrinsic_store
.clone
(),
);
let
proposer
=
Proposer
{
...
...
@@ -334,19 +339,42 @@ fn dispatch_collation_work<R, C, P>(
router
:
R
,
handle
:
&
TaskExecutor
,
work
:
Option
<
CollationFetch
<
C
,
P
>>
,
extrinsic_store
:
ExtrinsicStore
,
)
->
exit_future
::
Signal
where
C
:
Collators
+
Send
+
'static
,
P
:
PolkadotApi
+
Send
+
Sync
+
'static
,
<
C
::
Collation
as
IntoFuture
>
::
Future
:
Send
+
'static
,
R
:
TableRouter
+
Send
+
'static
,
{
use
extrinsic_store
::
Data
;
let
(
signal
,
exit
)
=
exit_future
::
signal
();
let
work
=
match
work
{
Some
(
w
)
=>
w
,
None
=>
return
signal
,
};
let
relay_parent
=
work
.relay_parent
();
let
handled_work
=
work
.then
(
move
|
result
|
match
result
{
Ok
(
Some
((
collation
,
extrinsic
)))
=>
{
router
.local_candidate
(
collation
.receipt
,
collation
.block_data
,
extrinsic
);
Ok
((
collation
,
extrinsic
))
=>
{
let
res
=
extrinsic_store
.make_available
(
Data
{
relay_parent
,
parachain_id
:
collation
.receipt.parachain_index
,
candidate_hash
:
collation
.receipt
.hash
(),
block_data
:
collation
.block_data
.clone
(),
extrinsic
:
Some
(
extrinsic
.clone
()),
});
match
res
{
Ok
(())
=>
router
.local_candidate
(
collation
.receipt
,
collation
.block_data
,
extrinsic
),
Err
(
e
)
=>
warn!
(
target
:
"consensus"
,
"Failed to make collation data available: {:?}"
,
e
),
}
Ok
(())
}
Ok
(
None
)
=>
Ok
(()),
Err
(
_e
)
=>
{
warn!
(
target
:
"consensus"
,
"Failed to collate candidate"
);
Ok
(())
...
...
consensus/src/service.rs
View file @
4d13f3d1
...
...
@@ -28,12 +28,13 @@ use std::time::{Duration, Instant};
use
std
::
sync
::
Arc
;
use
bft
::{
self
,
BftService
};
use
client
::{
BlockchainEvents
,
ChainHead
};
use
client
::{
BlockchainEvents
,
ChainHead
,
BlockBody
};
use
ed25519
;
use
futures
::
prelude
::
*
;
use
polkadot_api
::
LocalPolkadotApi
;
use
polkadot_primitives
::{
Block
,
Header
};
use
transaction_pool
::
TransactionPool
;
use
extrinsic_store
::
Store
as
ExtrinsicStore
;
use
tokio
::
executor
::
current_thread
::
TaskExecutor
as
LocalThreadHandle
;
use
tokio
::
runtime
::
TaskExecutor
as
ThreadPoolHandle
;
...
...
@@ -89,6 +90,56 @@ fn start_bft<F, C>(
}
}
// creates a task to prune redundant entries in availability store upon block finalization
//
// NOTE: this will need to be changed to finality notification rather than
// block import notifications when the consensus switches to non-instant finality.
fn
prune_unneeded_availability
<
C
>
(
client
:
Arc
<
C
>
,
extrinsic_store
:
ExtrinsicStore
)
->
impl
Future
<
Item
=
(),
Error
=
()
>
+
Send
where
C
:
Send
+
Sync
+
BlockchainEvents
<
Block
>
+
BlockBody
<
Block
>
+
'static
{
use
codec
::{
Encode
,
Decode
};
use
polkadot_primitives
::
BlockId
;
use
polkadot_runtime
::
CheckedBlock
;
enum
NotifyError
{
NoBody
(::
client
::
error
::
Error
),
UnexpectedFormat
,
ExtrinsicsWrong
,
}
impl
NotifyError
{
fn
log
(
&
self
,
hash
:
&
::
polkadot_primitives
::
Hash
)
{
match
*
self
{
NotifyError
::
NoBody
(
ref
err
)
=>
warn!
(
"Failed to fetch block body for imported block {:?}: {:?}"
,
hash
,
err
),
NotifyError
::
UnexpectedFormat
=>
warn!
(
"Consensus outdated: Block {:?} has unexpected body format"
,
hash
),
NotifyError
::
ExtrinsicsWrong
=>
warn!
(
"Consensus outdated: Failed to fetch block body for imported block {:?}"
,
hash
),
}
}
}
client
.import_notification_stream
()
.for_each
(
move
|
notification
|
{
let
checked_block
=
client
.block_body
(
&
BlockId
::
hash
(
notification
.hash
))
.map_err
(
NotifyError
::
NoBody
)
.map
(|
b
|
::
polkadot_runtime
::
Block
::
decode
(
&
mut
b
.encode
()
.as_slice
()))
.and_then
(|
maybe_block
|
maybe_block
.ok_or
(
NotifyError
::
UnexpectedFormat
))
.and_then
(|
block
|
CheckedBlock
::
new
(
block
)
.map_err
(|
_
|
NotifyError
::
ExtrinsicsWrong
));
match
checked_block
{
Ok
(
block
)
=>
{
let
candidate_hashes
=
block
.parachain_heads
()
.iter
()
.map
(|
c
|
c
.hash
())
.collect
();
if
let
Err
(
e
)
=
extrinsic_store
.candidates_finalized
(
notification
.header.parent_hash
,
candidate_hashes
)
{
warn!
(
target
:
"consensus"
,
"Failed to prune unneeded available data: {:?}"
,
e
);
}
}
Err
(
e
)
=>
e
.log
(
&
notification
.hash
)
}
Ok
(())
})
}
/// Consensus service. Starts working when created.
pub
struct
Service
{
thread
:
Option
<
thread
::
JoinHandle
<
()
>>
,
...
...
@@ -105,10 +156,11 @@ impl Service {
thread_pool
:
ThreadPoolHandle
,
parachain_empty_duration
:
Duration
,
key
:
ed25519
::
Pair
,
extrinsic_store
:
ExtrinsicStore
,
)
->
Service
where
A
:
LocalPolkadotApi
+
Send
+
Sync
+
'static
,
C
:
BlockchainEvents
<
Block
>
+
ChainHead
<
Block
>
+
bft
::
BlockImport
<
Block
>
+
bft
::
Authorities
<
Block
>
+
Send
+
Sync
+
'static
,
C
:
BlockchainEvents
<
Block
>
+
ChainHead
<
Block
>
+
BlockBody
<
Block
>
+
bft
::
BlockImport
<
Block
>
+
bft
::
Authorities
<
Block
>
+
Send
+
Sync
+
'static
,
N
:
Network
+
Collators
+
Send
+
'static
,
N
::
TableRouter
:
Send
+
'static
,
<
N
::
Collation
as
IntoFuture
>
::
Future
:
Send
+
'static
,
...
...
@@ -124,7 +176,8 @@ impl Service {
collators
:
network
.clone
(),
network
,
parachain_empty_duration
,
handle
:
thread_pool
,
handle
:
thread_pool
.clone
(),
extrinsic_store
:
extrinsic_store
.clone
(),
};
let
bft_service
=
Arc
::
new
(
BftService
::
new
(
client
.clone
(),
key
,
factory
));
...
...
@@ -172,6 +225,14 @@ impl Service {
runtime
.spawn
(
notifications
);
runtime
.spawn
(
timed
);
let
prune_available
=
prune_unneeded_availability
(
client
,
extrinsic_store
)
.select
(
exit
.clone
())
.then
(|
_
|
Ok
(()));
// spawn this on the tokio executor since it's fine on a thread pool.
thread_pool
.spawn
(
prune_available
);
if
let
Err
(
e
)
=
runtime
.block_on
(
exit
)
{
debug!
(
"BFT event loop error {:?}"
,
e
);
}
...
...
consensus/src/shared_table/mod.rs
View file @
4d13f3d1
...
...
@@ -20,6 +20,7 @@
use
std
::
collections
::{
HashMap
,
HashSet
};
use
std
::
sync
::
Arc
;
use
extrinsic_store
::{
Data
,
Store
as
ExtrinsicStore
};
use
table
::{
self
,
Table
,
Context
as
TableContextTrait
};
use
polkadot_primitives
::{
Hash
,
SessionKey
};
use
polkadot_primitives
::
parachain
::{
Id
as
ParaId
,
BlockData
,
Collation
,
Extrinsic
,
CandidateReceipt
};
...
...
@@ -82,6 +83,7 @@ struct SharedTableInner {
checked_validity
:
HashSet
<
Hash
>
,
checked_availability
:
HashSet
<
Hash
>
,
trackers
:
Vec
<
IncludabilitySender
>
,
extrinsic_store
:
ExtrinsicStore
,
}
impl
SharedTableInner
{
...
...
@@ -153,6 +155,8 @@ impl SharedTableInner {
work
.map
(|
work
|
StatementProducer
{
produced_statements
:
Default
::
default
(),
extrinsic_store
:
self
.extrinsic_store
.clone
(),
relay_parent
:
context
.parent_hash
.clone
(),
work
})
}
...
...
@@ -186,6 +190,8 @@ pub struct ProducedStatements {
pub
struct
StatementProducer
<
D
:
Future
,
E
:
Future
>
{
produced_statements
:
ProducedStatements
,
work
:
Work
<
D
,
E
>
,
relay_parent
:
Hash
,
extrinsic_store
:
ExtrinsicStore
,
}
impl
<
D
:
Future
,
E
:
Future
>
StatementProducer
<
D
,
E
>
{
...
...
@@ -221,25 +227,32 @@ impl<D, E, C, Err> Future for PrimedStatementProducer<D, E, C>
D
:
Future
<
Item
=
BlockData
,
Error
=
Err
>
,
E
:
Future
<
Item
=
Extrinsic
,
Error
=
Err
>
,
C
:
FnMut
(
Collation
)
->
Option
<
bool
>
,
Err
:
From
<
::
std
::
io
::
Error
>
,
{
type
Item
=
ProducedStatements
;
type
Error
=
Err
;
fn
poll
(
&
mut
self
)
->
Poll
<
ProducedStatements
,
Err
>
{
let
work
=
&
mut
self
.inner.work
;
let
candidate
=
&
work
.candidate_receipt
;
let
statements
=
&
mut
self
.inner.produced_statements
;
let
mut
candidate_hash
=
None
;
let
mut
candidate_hash
=
move
||
candidate_hash
.get_or_insert_with
(||
candidate
.hash
())
.clone
();
if
let
Async
::
Ready
(
block_data
)
=
work
.fetch_block_data
.poll
()
?
{
self
.inner.produced_
statements.block_data
=
Some
(
block_data
.clone
());
statements
.block_data
=
Some
(
block_data
.clone
());
if
work
.evaluate
{
let
is_good
=
(
self
.check_candidate
)(
Collation
{
block_data
,
receipt
:
work
.candidate_receipt
.clone
(),
});
let
hash
=
work
.
candidate_
receipt
.
hash
();
let
hash
=
candidate_hash
();
debug!
(
target
:
"consensus"
,
"Making validity statement about candidate {}: is_good? {:?}"
,
hash
,
is_good
);
self
.inner.produced_
statements.validity
=
match
is_good
{
statements
.validity
=
match
is_good
{
Some
(
true
)
=>
Some
(
GenericStatement
::
Valid
(
hash
)),
Some
(
false
)
=>
Some
(
GenericStatement
::
Invalid
(
hash
)),
None
=>
None
,
...
...
@@ -251,12 +264,11 @@ impl<D, E, C, Err> Future for PrimedStatementProducer<D, E, C>
if
let
Async
::
Ready
(
Some
(
extrinsic
))
=
work
.fetch_extrinsic
.poll
()
?
{
if
work
.ensure_available
{
let
hash
=
work
.
candidate_
receipt
.
hash
();
let
hash
=
candidate_hash
();
debug!
(
target
:
"consensus"
,
"Claiming candidate {} available."
,
hash
);
// TODO: actually wait for block data and then ensure availability.
self
.inner.produced_statements.extrinsic
=
Some
(
extrinsic
);
self
.inner.produced_statements.availability
=
statements
.extrinsic
=
Some
(
extrinsic
);
statements
.availability
=
Some
(
GenericStatement
::
Available
(
hash
));
work
.ensure_available
=
false
;
...
...
@@ -269,7 +281,18 @@ impl<D, E, C, Err> Future for PrimedStatementProducer<D, E, C>
};
if
done
{
Ok
(
Async
::
Ready
(::
std
::
mem
::
replace
(
&
mut
self
.inner.produced_statements
,
Default
::
default
())))
// commit claimed-available data to disk before returning statements from the future.
if
let
(
&
Some
(
ref
block
),
extrinsic
)
=
(
&
statements
.block_data
,
&
statements
.extrinsic
)
{
self
.inner.extrinsic_store
.make_available
(
Data
{
relay_parent
:
self
.inner.relay_parent
,
parachain_id
:
work
.candidate_receipt.parachain_index
,
candidate_hash
:
candidate_hash
(),
block_data
:
block
.clone
(),
extrinsic
:
extrinsic
.clone
(),
})
?
;
}
Ok
(
Async
::
Ready
(::
std
::
mem
::
replace
(
statements
,
Default
::
default
())))
}
else
{
Ok
(
Async
::
NotReady
)
}
...
...
@@ -296,7 +319,12 @@ impl SharedTable {
///
/// Provide the key to sign with, and the parent hash of the relay chain
/// block being built.
pub
fn
new
(
groups
:
HashMap
<
ParaId
,
GroupInfo
>
,
key
:
Arc
<
::
ed25519
::
Pair
>
,
parent_hash
:
Hash
)
->
Self
{
pub
fn
new
(
groups
:
HashMap
<
ParaId
,
GroupInfo
>
,
key
:
Arc
<
::
ed25519
::
Pair
>
,
parent_hash
:
Hash
,
extrinsic_store
:
ExtrinsicStore
,
)
->
Self
{
SharedTable
{