Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
parity
Mirrored projects
polkadot
Commits
96da2bf7
Commit
96da2bf7
authored
Aug 02, 2018
by
asynchronous rob
Committed by
Arkadiy Paronyan
Aug 02, 2018
Browse files
force BFT delay in consensus service, not in proposer logic (#477)
* move forced delay to consensus service * fiddle with logging
parent
4711e1e0
Changes
2
Hide whitespace changes
Inline
Side-by-side
consensus/src/lib.rs
View file @
96da2bf7
...
...
@@ -258,8 +258,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
)
->
Result
<
(
Self
::
Proposer
,
Self
::
Input
,
Self
::
Output
),
Error
>
{
use
runtime_primitives
::
traits
::{
Hash
as
HashT
,
BlakeTwo256
};
const
DELAY_UNTIL
:
Duration
=
Duration
::
from_millis
(
5000
);
let
parent_hash
=
parent_header
.hash
()
.into
();
let
id
=
BlockId
::
hash
(
parent_hash
);
...
...
@@ -295,9 +293,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
self
.parachain_empty_duration
.clone
(),
);
debug!
(
target
:
"bft"
,
"Initialising consensus proposer. Refusing to evaluate for {:?} from now."
,
DELAY_UNTIL
);
let
validation_para
=
match
local_duty
.validation
{
Chain
::
Relay
=>
None
,
Chain
::
Parachain
(
id
)
=>
Some
(
id
),
...
...
@@ -320,7 +315,6 @@ impl<C, N, P> bft::Environment<Block> for ProposerFactory<C, N, P>
client
:
self
.client
.clone
(),
dynamic_inclusion
,
local_key
:
sign_with
,
minimum_delay
:
now
+
DELAY_UNTIL
,
parent_hash
,
parent_id
:
id
,
parent_number
:
parent_header
.number
,
...
...
@@ -375,7 +369,6 @@ pub struct Proposer<C: PolkadotApi> {
client
:
Arc
<
C
>
,
dynamic_inclusion
:
DynamicInclusion
,
local_key
:
Arc
<
ed25519
::
Pair
>
,
minimum_delay
:
Instant
,
parent_hash
:
Hash
,
parent_id
:
BlockId
,
parent_number
:
BlockNumber
,
...
...
@@ -406,17 +399,10 @@ impl<C> bft::Proposer<Block> for Proposer<C>
initial_included
,
)
.unwrap_or_else
(||
now
+
Duration
::
from_millis
(
1
));
let
minimum_delay
=
if
self
.minimum_delay
>
now
+
ATTEMPT_PROPOSE_EVERY
{
Some
(
Delay
::
new
(
self
.minimum_delay
))
}
else
{
None
};
let
timing
=
ProposalTiming
{
attempt_propose
:
Interval
::
new
(
now
+
ATTEMPT_PROPOSE_EVERY
,
ATTEMPT_PROPOSE_EVERY
),
enough_candidates
:
Delay
::
new
(
enough_candidates
),
dynamic_inclusion
:
self
.dynamic_inclusion
.clone
(),
minimum_delay
,
last_included
:
initial_included
,
};
...
...
@@ -489,11 +475,7 @@ impl<C> bft::Proposer<Block> for Proposer<C>
// delay casting vote until able according to minimum block time,
// timestamp delay, and count delay.
// construct a future from the maximum of the two durations.
let
max_delay
=
[
timestamp_delay
,
count_delay
,
Some
(
self
.minimum_delay
)]
.iter
()
.cloned
()
.max
()
.expect
(
"iterator not empty; thus max returns `Some`; qed"
);
let
max_delay
=
::
std
::
cmp
::
max
(
timestamp_delay
,
count_delay
);
let
temporary_delay
=
match
max_delay
{
Some
(
duration
)
=>
future
::
Either
::
A
(
...
...
@@ -615,7 +597,6 @@ struct ProposalTiming {
attempt_propose
:
Interval
,
dynamic_inclusion
:
DynamicInclusion
,
enough_candidates
:
Delay
,
minimum_delay
:
Option
<
Delay
>
,
last_included
:
usize
,
}
...
...
@@ -632,12 +613,6 @@ impl ProposalTiming {
x
.expect
(
"timer still alive; intervals never end; qed"
);
}
if
let
Some
(
ref
mut
min
)
=
self
.minimum_delay
{
try_ready!
(
min
.poll
()
.map_err
(
ErrorKind
::
Timer
));
}
self
.minimum_delay
=
None
;
// after this point, the future must have completed.
if
included
==
self
.last_included
{
return
self
.enough_candidates
.poll
()
.map_err
(
ErrorKind
::
Timer
);
}
...
...
consensus/src/service.rs
View file @
96da2bf7
...
...
@@ -38,7 +38,7 @@ use transaction_pool::TransactionPool;
use
tokio
::
executor
::
current_thread
::
TaskExecutor
as
LocalThreadHandle
;
use
tokio
::
runtime
::
TaskExecutor
as
ThreadPoolHandle
;
use
tokio
::
runtime
::
current_thread
::
Runtime
as
LocalRuntime
;
use
tokio
::
timer
::
Interval
;
use
tokio
::
timer
::
{
Delay
,
Interval
}
;
use
super
::{
Network
,
Collators
,
ProposerFactory
};
use
error
;
...
...
@@ -49,8 +49,8 @@ const TIMER_INTERVAL_MS: u64 = 500;
// spin up an instance of BFT agreement on the current thread's executor.
// panics if there is no current thread executor.
fn
start_bft
<
F
,
C
>
(
header
:
&
Header
,
bft_service
:
&
BftService
<
Block
,
F
,
C
>
,
header
:
Header
,
bft_service
:
Arc
<
BftService
<
Block
,
F
,
C
>
>
,
)
where
F
:
bft
::
Environment
<
Block
>
+
'static
,
C
:
bft
::
BlockImport
<
Block
>
+
bft
::
Authorities
<
Block
>
+
'static
,
...
...
@@ -58,14 +58,35 @@ fn start_bft<F, C>(
<
F
::
Proposer
as
bft
::
Proposer
<
Block
>>
::
Error
:
::
std
::
fmt
::
Display
+
Into
<
error
::
Error
>
,
<
F
as
bft
::
Environment
<
Block
>>
::
Error
:
::
std
::
fmt
::
Display
{
const
DELAY_UNTIL
:
Duration
=
Duration
::
from_millis
(
5000
);
let
mut
handle
=
LocalThreadHandle
::
current
();
match
bft_service
.build_upon
(
&
header
)
{
Ok
(
Some
(
bft
))
=>
if
let
Err
(
e
)
=
handle
.spawn_local
(
Box
::
new
(
bft
))
{
debug!
(
target
:
"bft"
,
"Couldn't initialize BFT agreement: {:?}"
,
e
);
},
Ok
(
None
)
=>
{},
Err
(
e
)
=>
warn!
(
target
:
"bft"
,
"BFT agreement error: {}"
,
e
),
}
let
work
=
Delay
::
new
(
Instant
::
now
()
+
DELAY_UNTIL
)
.then
(
move
|
res
|
{
if
let
Err
(
e
)
=
res
{
warn!
(
target
:
"bft"
,
"Failed to force delay of consensus: {:?}"
,
e
);
}
match
bft_service
.build_upon
(
&
header
)
{
Ok
(
maybe_bft_work
)
=>
{
if
maybe_bft_work
.is_some
()
{
debug!
(
target
:
"bft"
,
"Starting agreement. After forced delay for {:?}"
,
DELAY_UNTIL
);
}
maybe_bft_work
}
Err
(
e
)
=>
{
warn!
(
target
:
"bft"
,
"BFT agreement error: {}"
,
e
);
None
}
}
})
.map
(|
_
|
());
if
let
Err
(
e
)
=
handle
.spawn_local
(
Box
::
new
(
work
))
{
debug!
(
target
:
"bft"
,
"Couldn't initialize BFT agreement: {:?}"
,
e
);
}
}
/// Consensus service. Starts working when created.
...
...
@@ -113,7 +134,7 @@ impl Service {
client
.import_notification_stream
()
.for_each
(
move
|
notification
|
{
if
notification
.is_new_best
{
start_bft
(
&
notification
.header
,
&*
bft_service
);
start_bft
(
notification
.header
,
bft_service
.clone
()
);
}
Ok
(())
})
...
...
@@ -139,9 +160,9 @@ impl Service {
interval
.map_err
(|
e
|
debug!
(
"Timer error: {:?}"
,
e
))
.for_each
(
move
|
_
|
{
if
let
Ok
(
best_block
)
=
c
.best_block_header
()
{
let
hash
=
best_block
.hash
();
if
hash
==
prev_best
{
if
hash
==
prev_best
&&
s
.live_agreement
()
!=
Some
(
hash
)
{
debug!
(
"Starting consensus round after a timeout"
);
start_bft
(
&
best_block
,
&*
s
);
start_bft
(
best_block
,
s
.clone
()
);
}
prev_best
=
hash
;
}
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment