Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
P
polkadot-sdk
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
0
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
parity
Mirrored projects
polkadot-sdk
Commits
35ba22cd
Commit
35ba22cd
authored
6 years ago
by
asynchronous rob
Committed by
Gav Wood
6 years ago
Browse files
Options
Downloads
Patches
Plain Diff
refactor aura slot timers a bit, add additional guards (#1303)
parent
831fd994
No related merge requests found
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
substrate/core/consensus/aura/src/lib.rs
+112
-128
112 additions, 128 deletions
substrate/core/consensus/aura/src/lib.rs
substrate/core/consensus/aura/src/slots.rs
+122
-0
122 additions, 0 deletions
substrate/core/consensus/aura/src/slots.rs
with
234 additions
and
128 deletions
substrate/core/consensus/aura/src/lib.rs
+
112
−
128
View file @
35ba22cd
...
...
@@ -38,11 +38,12 @@ extern crate substrate_consensus_common as consensus_common;
extern
crate
tokio
;
extern
crate
sr_version
as
runtime_version
;
extern
crate
substrate_network
as
network
;
extern
crate
futures
;
extern
crate
parking_lot
;
#[macro_use]
extern
crate
log
;
#[macro_use]
extern
crate
futures
;
#[cfg(test)]
extern
crate
substrate_keyring
as
keyring
;
...
...
@@ -53,10 +54,10 @@ extern crate substrate_test_client as test_client;
#[cfg(test)]
extern
crate
env_logger
;
pub
use
aura_primitives
::
*
;
mod
slots
;
use
std
::
sync
::
Arc
;
use
std
::
time
::
{
Duration
,
Instant
}
;
use
std
::
time
::
Duration
;
use
codec
::
Encode
;
use
consensus_common
::{
Authorities
,
BlockImport
,
Environment
,
Proposer
};
...
...
@@ -69,10 +70,11 @@ use network::import_queue::{Verifier, BasicQueue};
use
primitives
::{
AuthorityId
,
ed25519
};
use
futures
::{
Stream
,
Future
,
IntoFuture
,
future
::{
self
,
Either
}};
use
tokio
::
timer
::
{
Delay
,
Timeout
}
;
use
tokio
::
timer
::
Timeout
;
use
api
::
AuraApi
;
use
slots
::
Slots
;
pub
use
aura_primitives
::
AuraConsensusData
;
pub
use
aura_primitives
::
*
;
pub
use
consensus_common
::
SyncOracle
;
/// A handle to the network. This is generally implemented by providing some
...
...
@@ -221,135 +223,117 @@ pub fn start_aura<B, C, E, I, SO, Error>(
let
sync_oracle
=
sync_oracle
.clone
();
let
SlotDuration
(
slot_duration
)
=
slot_duration
;
fn
time_until_next
(
now
:
Duration
,
slot_duration
:
u64
)
->
Duration
{
let
remaining_full_secs
=
slot_duration
-
(
now
.as_secs
()
%
slot_duration
)
-
1
;
let
remaining_nanos
=
1_000_000_000
-
now
.subsec_nanos
();
Duration
::
new
(
remaining_full_secs
,
remaining_nanos
)
};
// rather than use a timer interval, we schedule our waits ourselves
Slots
::
new
(
slot_duration
)
.map_err
(|
e
|
debug!
(
target
:
"aura"
,
"Faulty timer: {:?}"
,
e
))
.for_each
(
move
|
slot_info
|
{
let
client
=
client
.clone
();
let
pair
=
pair
.clone
();
let
block_import
=
block_import
.clone
();
let
env
=
env
.clone
();
let
sync_oracle
=
sync_oracle
.clone
();
let
public_key
=
pair
.public
();
// only propose when we are not syncing.
if
sync_oracle
.is_major_syncing
()
{
debug!
(
target
:
"aura"
,
"Skipping proposal slot due to sync."
);
return
Either
::
B
(
future
::
ok
(()));
}
let
(
timestamp
,
slot_num
)
=
(
slot_info
.timestamp
,
slot_info
.number
);
let
chain_head
=
match
client
.best_block_header
()
{
Ok
(
x
)
=>
x
,
Err
(
e
)
=>
{
warn!
(
target
:
"aura"
,
"Unable to author block in slot {}.
\
no best block header: {:?}"
,
slot_num
,
e
);
return
Either
::
B
(
future
::
ok
(()))
}
};
// rather than use an interval, we schedule our waits ourselves
future
::
loop_fn
((),
move
|()|
{
let
next_slot_start
=
duration_now
()
.map
(|
now
|
Instant
::
now
()
+
time_until_next
(
now
,
slot_duration
))
.unwrap_or_else
(||
Instant
::
now
());
let
client
=
client
.clone
();
let
pair
=
pair
.clone
();
let
block_import
=
block_import
.clone
();
let
env
=
env
.clone
();
let
sync_oracle
=
sync_oracle
.clone
();
let
public_key
=
pair
.public
();
Delay
::
new
(
next_slot_start
)
.map_err
(|
e
|
debug!
(
target
:
"aura"
,
"Faulty timer: {:?}"
,
e
))
.and_then
(
move
|
_
|
{
// only propose when we are not syncing.
if
sync_oracle
.is_major_syncing
()
{
debug!
(
target
:
"aura"
,
"Skipping proposal slot due to sync."
);
let
authorities
=
match
client
.authorities
(
&
BlockId
::
Hash
(
chain_head
.hash
()))
{
Ok
(
authorities
)
=>
authorities
,
Err
(
e
)
=>
{
warn!
(
"Unable to fetch authorities at
\
block {:?}: {:?}"
,
chain_head
.hash
(),
e
);
return
Either
::
B
(
future
::
ok
(()));
}
};
let
pair
=
pair
.clone
();
let
(
timestamp
,
slot_num
)
=
match
timestamp_and_slot_now
(
slot_duration
)
{
Some
(
n
)
=>
n
,
None
=>
return
Either
::
B
(
future
::
err
(())),
};
let
chain_head
=
match
client
.best_block_header
()
{
Ok
(
x
)
=>
x
,
Err
(
e
)
=>
{
warn!
(
target
:
"aura"
,
"Unable to author block in slot {}.
\
no best block header: {:?}"
,
slot_num
,
e
);
return
Either
::
B
(
future
::
ok
(()))
}
};
let
authorities
=
match
client
.authorities
(
&
BlockId
::
Hash
(
chain_head
.hash
()))
{
Ok
(
authorities
)
=>
authorities
,
Err
(
e
)
=>
{
warn!
(
"Unable to fetch authorities at
\
block {:?}: {:?}"
,
chain_head
.hash
(),
e
);
return
Either
::
B
(
future
::
ok
(()));
}
};
let
proposal_work
=
match
slot_author
(
slot_num
,
&
authorities
)
{
None
=>
return
Either
::
B
(
future
::
ok
(())),
Some
(
author
)
=>
if
author
.0
==
public_key
.0
{
debug!
(
target
:
"aura"
,
"Starting authorship at slot {}; timestamp = {}"
,
slot_num
,
timestamp
);
// we are the slot author. make a block and sign it.
let
proposer
=
match
env
.init
(
&
chain_head
,
&
authorities
,
pair
.clone
())
{
Ok
(
p
)
=>
p
,
Err
(
e
)
=>
{
warn!
(
"Unable to author block in slot {:?}: {:?}"
,
slot_num
,
e
);
return
Either
::
B
(
future
::
ok
(()))
}
};
let
consensus_data
=
AuraConsensusData
{
timestamp
,
slot
:
slot_num
,
slot_duration
,
};
// deadline our production to approx. the end of the
// slot
Timeout
::
new
(
proposer
.propose
(
consensus_data
)
.into_future
(),
time_until_next
(
Duration
::
from_secs
(
timestamp
),
slot_duration
),
)
}
else
{
return
Either
::
B
(
future
::
ok
(()));
}
};
let
block_import
=
block_import
.clone
();
Either
::
A
(
proposal_work
.map
(
move
|
b
|
{
// minor hack since we don't have access to the timestamp
// that is actually set by the proposer.
let
slot_after_building
=
slot_now
(
slot_duration
);
if
slot_after_building
!=
Some
(
slot_num
)
{
info!
(
"Discarding proposal for slot {}; block production took too long"
,
slot_num
);
return
let
proposal_work
=
match
slot_author
(
slot_num
,
&
authorities
)
{
None
=>
return
Either
::
B
(
future
::
ok
(())),
Some
(
author
)
=>
if
author
.0
==
public_key
.0
{
debug!
(
target
:
"aura"
,
"Starting authorship at slot {}; timestamp = {}"
,
slot_num
,
timestamp
);
// we are the slot author. make a block and sign it.
let
proposer
=
match
env
.init
(
&
chain_head
,
&
authorities
,
pair
.clone
())
{
Ok
(
p
)
=>
p
,
Err
(
e
)
=>
{
warn!
(
"Unable to author block in slot {:?}: {:?}"
,
slot_num
,
e
);
return
Either
::
B
(
future
::
ok
(()))
}
};
let
consensus_data
=
AuraConsensusData
{
timestamp
,
slot
:
slot_num
,
slot_duration
,
};
// deadline our production to approx. the end of the
// slot
Timeout
::
new
(
proposer
.propose
(
consensus_data
)
.into_future
(),
slot_info
.remaining_duration
(),
)
}
else
{
return
Either
::
B
(
future
::
ok
(()));
}
};
let
(
header
,
body
)
=
b
.deconstruct
();
let
pre_hash
=
header
.hash
();
let
parent_hash
=
header
.parent_hash
()
.clone
();
// sign the pre-sealed hash of the block and then
// add it to a digest item.
let
to_sign
=
(
slot_num
,
pre_hash
)
.encode
();
let
signature
=
pair
.sign
(
&
to_sign
[
..
]);
let
item
=
<
DigestItemFor
<
B
>
as
CompatibleDigestItem
>
::
aura_seal
(
slot_num
,
signature
,
);
let
import_block
=
ImportBlock
{
origin
:
BlockOrigin
::
Own
,
header
,
justification
:
None
,
post_digests
:
vec!
[
item
],
body
:
Some
(
body
),
finalized
:
false
,
auxiliary
:
Vec
::
new
(),
};
if
let
Err
(
e
)
=
block_import
.import_block
(
import_block
,
None
)
{
warn!
(
target
:
"aura"
,
"Error with block built on {:?}: {:?}"
,
parent_hash
,
e
);
}
})
.map_err
(|
e
|
warn!
(
"Failed to construct block: {:?}"
,
e
))
)
})
.map
(|
_
|
future
::
Loop
::
Continue
(()))
})
let
block_import
=
block_import
.clone
();
Either
::
A
(
proposal_work
.map
(
move
|
b
|
{
// minor hack since we don't have access to the timestamp
// that is actually set by the proposer.
let
slot_after_building
=
slot_now
(
slot_duration
);
if
slot_after_building
!=
Some
(
slot_num
)
{
info!
(
"Discarding proposal for slot {}; block production took too long"
,
slot_num
);
return
}
let
(
header
,
body
)
=
b
.deconstruct
();
let
pre_hash
=
header
.hash
();
let
parent_hash
=
header
.parent_hash
()
.clone
();
// sign the pre-sealed hash of the block and then
// add it to a digest item.
let
to_sign
=
(
slot_num
,
pre_hash
)
.encode
();
let
signature
=
pair
.sign
(
&
to_sign
[
..
]);
let
item
=
<
DigestItemFor
<
B
>
as
CompatibleDigestItem
>
::
aura_seal
(
slot_num
,
signature
,
);
let
import_block
=
ImportBlock
{
origin
:
BlockOrigin
::
Own
,
header
,
justification
:
None
,
post_digests
:
vec!
[
item
],
body
:
Some
(
body
),
finalized
:
false
,
auxiliary
:
Vec
::
new
(),
};
if
let
Err
(
e
)
=
block_import
.import_block
(
import_block
,
None
)
{
warn!
(
target
:
"aura"
,
"Error with block built on {:?}: {:?}"
,
parent_hash
,
e
);
}
})
.map_err
(|
e
|
warn!
(
"Failed to construct block: {:?}"
,
e
))
)
})
};
let
work
=
future
::
loop_fn
((),
move
|()|
{
...
...
This diff is collapsed.
Click to expand it.
substrate/core/consensus/aura/src/slots.rs
0 → 100644
+
122
−
0
View file @
35ba22cd
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Substrate.
// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
//! Utility stream for yielding slots in a loop.
//!
//! This is used instead of `tokio_timer::Interval` because it was unreliable.
use
std
::
time
::{
Instant
,
Duration
};
use
tokio
::
timer
::
Delay
;
use
futures
::
prelude
::
*
;
/// Returns the duration until the next slot, based on current duration since
pub
(
crate
)
fn
time_until_next
(
now
:
Duration
,
slot_duration
:
u64
)
->
Duration
{
let
remaining_full_secs
=
slot_duration
-
(
now
.as_secs
()
%
slot_duration
)
-
1
;
let
remaining_nanos
=
1_000_000_000
-
now
.subsec_nanos
();
Duration
::
new
(
remaining_full_secs
,
remaining_nanos
)
}
/// Information about a slot.
#[derive(Debug,
Clone)]
pub
(
crate
)
struct
SlotInfo
{
/// The slot number.
pub
(
crate
)
number
:
u64
,
/// Current timestamp.
pub
(
crate
)
timestamp
:
u64
,
/// The instant at which the slot ends.
pub
(
crate
)
ends_at
:
Instant
,
}
impl
SlotInfo
{
/// Yields the remaining duration in the slot.
pub
(
crate
)
fn
remaining_duration
(
&
self
)
->
Duration
{
let
now
=
Instant
::
now
();
if
now
<
self
.ends_at
{
self
.ends_at
.duration_since
(
now
)
}
else
{
Duration
::
from_secs
(
0
)
}
}
}
/// A stream that returns every time there is a new slot.
pub
(
crate
)
struct
Slots
{
last_slot
:
u64
,
slot_duration
:
u64
,
inner_delay
:
Option
<
Delay
>
,
}
impl
Slots
{
/// Create a new `slots` stream.
pub
(
crate
)
fn
new
(
slot_duration
:
u64
)
->
Self
{
Slots
{
last_slot
:
0
,
slot_duration
,
inner_delay
:
None
,
}
}
}
impl
Stream
for
Slots
{
type
Item
=
SlotInfo
;
type
Error
=
tokio
::
timer
::
Error
;
fn
poll
(
&
mut
self
)
->
Poll
<
Option
<
SlotInfo
>
,
Self
::
Error
>
{
let
slot_duration
=
self
.slot_duration
;
self
.inner_delay
=
match
self
.inner_delay
.take
()
{
None
=>
{
// schedule wait.
let
wait_until
=
match
::
duration_now
()
{
None
=>
return
Ok
(
Async
::
Ready
(
None
)),
Some
(
now
)
=>
Instant
::
now
()
+
time_until_next
(
now
,
slot_duration
),
};
Some
(
Delay
::
new
(
wait_until
))
}
Some
(
d
)
=>
Some
(
d
),
};
if
let
Some
(
ref
mut
inner_delay
)
=
self
.inner_delay
{
try_ready!
(
inner_delay
.poll
());
}
// timeout has fired.
let
(
timestamp
,
slot_num
)
=
match
::
timestamp_and_slot_now
(
slot_duration
)
{
None
=>
return
Ok
(
Async
::
Ready
(
None
)),
Some
(
x
)
=>
x
,
};
// reschedule delay for next slot.
let
ends_at
=
Instant
::
now
()
+
time_until_next
(
Duration
::
from_secs
(
timestamp
),
slot_duration
);
self
.inner_delay
=
Some
(
Delay
::
new
(
ends_at
));
// never yield the same slot twice.
if
slot_num
>
self
.last_slot
{
self
.last_slot
=
slot_num
;
Ok
(
Async
::
Ready
(
Some
(
SlotInfo
{
number
:
slot_num
,
timestamp
,
ends_at
,
})))
}
else
{
// re-poll until we get a new slot.
self
.poll
()
}
}
}
This diff is collapsed.
Click to expand it.
Preview
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment