Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Menu
Open sidebar
ddorgan
polkadot
Commits
e97a1715
Unverified
Commit
e97a1715
authored
Dec 10, 2019
by
Ashley
Browse files
Rewrite some functions as async
parent
970e4851
Changes
9
Hide whitespace changes
Inline
Side-by-side
collator/src/lib.rs
View file @
e97a1715
...
@@ -48,6 +48,7 @@ use std::collections::HashSet;
...
@@ -48,6 +48,7 @@ use std::collections::HashSet;
use
std
::
fmt
;
use
std
::
fmt
;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
Arc
;
use
std
::
time
::
Duration
;
use
std
::
time
::
Duration
;
use
std
::
pin
::
Pin
;
use
futures
::{
future
,
Future
,
Stream
,
FutureExt
,
TryFutureExt
,
StreamExt
,
task
::
Spawn
};
use
futures
::{
future
,
Future
,
Stream
,
FutureExt
,
TryFutureExt
,
StreamExt
,
task
::
Spawn
};
use
log
::{
warn
,
error
};
use
log
::{
warn
,
error
};
...
@@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
...
@@ -242,20 +243,26 @@ impl<P: 'static, E: 'static, SP: 'static> RelayChainContext for ApiContext<P, E,
SP
:
Spawn
+
Clone
+
Send
+
Sync
SP
:
Spawn
+
Clone
+
Send
+
Sync
{
{
type
Error
=
String
;
type
Error
=
String
;
type
FutureEgress
=
Box
<
dyn
Future
<
Output
=
Result
<
ConsolidatedIngress
,
String
>>
+
Unpin
+
Send
>
;
type
FutureEgress
=
Pin
<
Box
<
dyn
Future
<
Output
=
Result
<
ConsolidatedIngress
,
String
>>
+
Send
>
>
;
fn
unrouted_egress
(
&
self
,
_id
:
ParaId
)
->
Self
::
FutureEgress
{
fn
unrouted_egress
(
&
self
,
_id
:
ParaId
)
->
Self
::
FutureEgress
{
// TODO: https://github.com/paritytech/polkadot/issues/253
let
network
=
self
.network
.clone
();
//
let
parent_hash
=
self
.parent_hash
;
// Fetch ingress and accumulate all unrounted egress
let
authorities
=
self
.validators
.clone
();
let
_session
=
self
.network
.instantiate_leaf_work
(
LeafWorkParams
{
local_session_key
:
None
,
async
move
{
parent_hash
:
self
.parent_hash
,
// TODO: https://github.com/paritytech/polkadot/issues/253
authorities
:
self
.validators
.clone
(),
//
})
// Fetch ingress and accumulate all unrounted egress
.map_err
(|
e
|
format!
(
"unable to instantiate validation session: {:?}"
,
e
));
let
_session
=
network
.instantiate_leaf_work
(
LeafWorkParams
{
local_session_key
:
None
,
Box
::
new
(
future
::
ok
(
ConsolidatedIngress
(
Vec
::
new
())))
parent_hash
,
authorities
,
})
.map_err
(|
e
|
format!
(
"unable to instantiate validation session: {:?}"
,
e
));
Ok
(
ConsolidatedIngress
(
Vec
::
new
()))
}
.boxed
()
}
}
}
}
...
@@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where
...
@@ -425,7 +432,7 @@ impl<P, E> Worker for CollationNode<P, E> where
);
);
let
exit
=
inner_exit_2
.clone
();
let
exit
=
inner_exit_2
.clone
();
tokio
::
spawn
(
future
::
select
(
res
,
exit
)
.map
(
drop
));
tokio
::
spawn
(
future
::
select
(
res
.boxed
()
,
exit
)
.map
(
drop
));
})
})
});
});
...
...
network/src/lib.rs
View file @
e97a1715
...
@@ -28,7 +28,6 @@ pub mod gossip;
...
@@ -28,7 +28,6 @@ pub mod gossip;
use
codec
::{
Decode
,
Encode
};
use
codec
::{
Decode
,
Encode
};
use
futures
::
channel
::{
oneshot
,
mpsc
};
use
futures
::
channel
::{
oneshot
,
mpsc
};
use
futures
::
prelude
::
*
;
use
futures
::
prelude
::
*
;
use
futures
::
future
::
Either
;
use
polkadot_primitives
::{
Block
,
Hash
,
Header
};
use
polkadot_primitives
::{
Block
,
Hash
,
Header
};
use
polkadot_primitives
::
parachain
::{
use
polkadot_primitives
::
parachain
::{
Id
as
ParaId
,
CollatorId
,
CandidateReceipt
,
Collation
,
PoVBlock
,
Id
as
ParaId
,
CollatorId
,
CandidateReceipt
,
Collation
,
PoVBlock
,
...
@@ -837,25 +836,6 @@ impl PolkadotProtocol {
...
@@ -837,25 +836,6 @@ impl PolkadotProtocol {
debug!
(
target
:
"p_net"
,
"Importing local collation on relay parent {:?} and parachain {:?}"
,
debug!
(
target
:
"p_net"
,
"Importing local collation on relay parent {:?} and parachain {:?}"
,
relay_parent
,
collation
.info.parachain_index
);
relay_parent
,
collation
.info.parachain_index
);
let
res
=
match
self
.availability_store
{
Some
(
ref
availability_store
)
=>
{
let
availability_store_cloned
=
availability_store
.clone
();
let
collation_cloned
=
collation
.clone
();
Either
::
Left
((
async
move
{
let
_
=
availability_store_cloned
.make_available
(
av_store
::
Data
{
relay_parent
,
parachain_id
:
collation_cloned
.info.parachain_index
,
block_data
:
collation_cloned
.pov.block_data
.clone
(),
outgoing_queues
:
Some
(
outgoing_targeted
.clone
()
.into
()),
})
.await
;
}
)
.boxed
()
)
}
None
=>
Either
::
Right
(
futures
::
future
::
ready
(())),
};
for
(
primary
,
cloned_collation
)
in
self
.local_collations
.add_collation
(
relay_parent
,
targets
,
collation
.clone
())
{
for
(
primary
,
cloned_collation
)
in
self
.local_collations
.add_collation
(
relay_parent
,
targets
,
collation
.clone
())
{
match
self
.validators
.get
(
&
primary
)
{
match
self
.validators
.get
(
&
primary
)
{
Some
(
who
)
=>
{
Some
(
who
)
=>
{
...
@@ -871,7 +851,19 @@ impl PolkadotProtocol {
...
@@ -871,7 +851,19 @@ impl PolkadotProtocol {
}
}
}
}
res
let
availability_store
=
self
.availability_store
.clone
();
let
collation_cloned
=
collation
.clone
();
async
move
{
if
let
Some
(
availability_store
)
=
availability_store
{
let
_
=
availability_store
.make_available
(
av_store
::
Data
{
relay_parent
,
parachain_id
:
collation_cloned
.info.parachain_index
,
block_data
:
collation_cloned
.pov.block_data
.clone
(),
outgoing_queues
:
Some
(
outgoing_targeted
.clone
()
.into
()),
})
.await
;
}
}
}
}
/// Give the network protocol a handle to an availability store, used for
/// Give the network protocol a handle to an availability store, used for
...
...
network/src/router.rs
View file @
e97a1715
...
@@ -41,8 +41,9 @@ use log::{debug, trace};
...
@@ -41,8 +41,9 @@ use log::{debug, trace};
use
std
::
collections
::{
HashMap
,
HashSet
};
use
std
::
collections
::{
HashMap
,
HashSet
};
use
std
::
io
;
use
std
::
io
;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
Arc
;
use
std
::
pin
::
Pin
;
use
crate
::
validation
::{
self
,
LeafWorkDataFetcher
,
Executor
};
use
crate
::
validation
::{
LeafWorkDataFetcher
,
Executor
};
use
crate
::
NetworkService
;
use
crate
::
NetworkService
;
/// Compute the gossip topic for attestations on the given parent hash.
/// Compute the gossip topic for attestations on the given parent hash.
...
@@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
...
@@ -232,7 +233,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
E
:
Future
<
Output
=
()
>
+
Clone
+
Send
+
'static
,
E
:
Future
<
Output
=
()
>
+
Clone
+
Send
+
'static
,
{
{
type
Error
=
io
::
Error
;
type
Error
=
io
::
Error
;
type
FetchValidationProof
=
validation
::
PoVReceiver
;
type
FetchValidationProof
=
Pin
<
Box
<
dyn
Future
<
Output
=
Result
<
PoVBlock
,
io
::
Error
>>
+
Send
>>
;
// We have fetched from a collator and here the receipt should have been already formed.
// We have fetched from a collator and here the receipt should have been already formed.
fn
local_collation
(
fn
local_collation
(
...
...
network/src/tests/validation.rs
View file @
e97a1715
...
@@ -41,7 +41,7 @@ use std::collections::HashMap;
...
@@ -41,7 +41,7 @@ use std::collections::HashMap;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
Arc
;
use
std
::
pin
::
Pin
;
use
std
::
pin
::
Pin
;
use
std
::
task
::{
Poll
,
Context
};
use
std
::
task
::{
Poll
,
Context
};
use
futures
::{
prelude
::
*
,
channel
::
mpsc
};
use
futures
::{
prelude
::
*
,
channel
::
mpsc
,
future
::{
select
,
Either
}
};
use
codec
::
Encode
;
use
codec
::
Encode
;
use
super
::{
TestContext
,
TestChainContext
};
use
super
::{
TestContext
,
TestChainContext
};
...
@@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification {
...
@@ -66,77 +66,48 @@ fn clone_gossip(n: &TopicNotification) -> TopicNotification {
}
}
}
}
struct
GossipRouter
{
async
fn
gossip_router
(
incoming_messages
:
mpsc
::
UnboundedReceiver
<
(
Hash
,
TopicNotification
)
>
,
mut
incoming_messages
:
mpsc
::
UnboundedReceiver
<
(
Hash
,
TopicNotification
)
>
,
incoming_streams
:
mpsc
::
UnboundedReceiver
<
(
Hash
,
mpsc
::
UnboundedSender
<
TopicNotification
>
)
>
,
mut
incoming_streams
:
mpsc
::
UnboundedReceiver
<
(
Hash
,
mpsc
::
UnboundedSender
<
TopicNotification
>
)
>
outgoing
:
Vec
<
(
Hash
,
mpsc
::
UnboundedSender
<
TopicNotification
>
)
>
,
)
{
messages
:
Vec
<
(
Hash
,
TopicNotification
)
>
,
let
mut
outgoing
:
Vec
<
(
Hash
,
mpsc
::
UnboundedSender
<
TopicNotification
>
)
>
=
Vec
::
new
();
}
let
mut
messages
=
Vec
::
new
();
impl
GossipRouter
{
loop
{
fn
add_message
(
&
mut
self
,
topic
:
Hash
,
message
:
TopicNotification
)
{
match
select
(
incoming_messages
.next
(),
incoming_streams
.next
())
.await
{
self
.outgoing
.retain
(|
&
(
ref
o_topic
,
ref
sender
)|
{
Either
::
Left
((
Some
((
topic
,
message
)),
_
))
=>
{
o_topic
!=
&
topic
||
sender
.unbounded_send
(
clone_gossip
(
&
message
))
.is_ok
()
outgoing
.retain
(|
&
(
ref
o_topic
,
ref
sender
)|
{
});
o_topic
!=
&
topic
||
sender
.unbounded_send
(
clone_gossip
(
&
message
))
.is_ok
()
self
.messages
.push
((
topic
,
message
));
});
}
messages
.push
((
topic
,
message
));
},
fn
add_outgoing
(
&
mut
self
,
topic
:
Hash
,
sender
:
mpsc
::
UnboundedSender
<
TopicNotification
>
)
{
Either
::
Right
((
Some
((
topic
,
sender
)),
_
))
=>
{
for
message
in
self
.messages
.iter
()
for
message
in
messages
.iter
()
.filter
(|
&&
(
ref
t
,
_
)|
t
==
&
topic
)
.filter
(|
&&
(
ref
t
,
_
)|
t
==
&
topic
)
.map
(|
&
(
_
,
ref
msg
)|
clone_gossip
(
msg
))
.map
(|
&
(
_
,
ref
msg
)|
clone_gossip
(
msg
))
{
{
if
let
Err
(
_
)
=
sender
.unbounded_send
(
message
)
{
return
}
if
let
Err
(
_
)
=
sender
.unbounded_send
(
message
)
{
return
}
}
}
self
.outgoing
.push
((
topic
,
sender
));
outgoing
.push
((
topic
,
sender
));
}
},
}
Either
::
Left
((
None
,
_
))
|
Either
::
Right
((
None
,
_
))
=>
panic!
(
"ended early."
)
impl
Future
for
GossipRouter
{
type
Output
=
();
fn
poll
(
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
)
->
Poll
<
Self
::
Output
>
{
let
this
=
Pin
::
into_inner
(
self
);
loop
{
match
Pin
::
new
(
&
mut
this
.incoming_messages
)
.poll_next
(
cx
)
{
Poll
::
Ready
(
Some
((
topic
,
message
)))
=>
this
.add_message
(
topic
,
message
),
Poll
::
Ready
(
None
)
=>
panic!
(
"ended early."
),
Poll
::
Pending
=>
break
,
}
}
loop
{
match
Pin
::
new
(
&
mut
this
.incoming_streams
)
.poll_next
(
cx
)
{
Poll
::
Ready
(
Some
((
topic
,
sender
)))
=>
this
.add_outgoing
(
topic
,
sender
),
Poll
::
Ready
(
None
)
=>
panic!
(
"ended early."
),
Poll
::
Pending
=>
break
,
}
}
}
Poll
::
Pending
}
}
}
}
#[derive(Clone)]
#[derive(Clone)]
struct
GossipHandle
{
struct
GossipHandle
{
send_message
:
mpsc
::
UnboundedSender
<
(
Hash
,
TopicNotification
)
>
,
send_message
:
mpsc
::
UnboundedSender
<
(
Hash
,
TopicNotification
)
>
,
send_listener
:
mpsc
::
UnboundedSender
<
(
Hash
,
mpsc
::
UnboundedSender
<
TopicNotification
>
)
>
,
send_listener
:
mpsc
::
UnboundedSender
<
(
Hash
,
mpsc
::
UnboundedSender
<
TopicNotification
>
)
>
,
}
}
fn
make_gossip
()
->
(
GossipRouter
,
GossipHandle
)
{
fn
make_gossip
()
->
(
impl
Future
<
Output
=
()
>
,
GossipHandle
)
{
let
(
message_tx
,
message_rx
)
=
mpsc
::
unbounded
();
let
(
message_tx
,
message_rx
)
=
mpsc
::
unbounded
();
let
(
listener_tx
,
listener_rx
)
=
mpsc
::
unbounded
();
let
(
listener_tx
,
listener_rx
)
=
mpsc
::
unbounded
();
(
(
GossipRouter
{
gossip_router
(
message_rx
,
listener_rx
),
incoming_messages
:
message_rx
,
incoming_streams
:
listener_rx
,
outgoing
:
Vec
::
new
(),
messages
:
Vec
::
new
(),
},
GossipHandle
{
send_message
:
message_tx
,
send_listener
:
listener_tx
},
GossipHandle
{
send_message
:
message_tx
,
send_listener
:
listener_tx
},
)
)
}
}
...
@@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork<
...
@@ -344,7 +315,7 @@ type TestValidationNetwork = crate::validation::ValidationNetwork<
>
;
>
;
struct
Built
{
struct
Built
{
gossip
:
GossipRouter
,
gossip
:
Pin
<
Box
<
dyn
Future
<
Output
=
()
>>>
,
api_handle
:
Arc
<
Mutex
<
ApiData
>>
,
api_handle
:
Arc
<
Mutex
<
ApiData
>>
,
networks
:
Vec
<
TestValidationNetwork
>
,
networks
:
Vec
<
TestValidationNetwork
>
,
}
}
...
@@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
...
@@ -377,7 +348,7 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built {
let
networks
:
Vec
<
_
>
=
networks
.collect
();
let
networks
:
Vec
<
_
>
=
networks
.collect
();
Built
{
Built
{
gossip
:
gossip_router
,
gossip
:
gossip_router
.boxed
()
,
api_handle
,
api_handle
,
networks
,
networks
,
}
}
...
...
network/src/validation.rs
View file @
e97a1715
...
@@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{
...
@@ -33,14 +33,13 @@ use polkadot_primitives::parachain::{
use
futures
::
prelude
::
*
;
use
futures
::
prelude
::
*
;
use
futures
::
task
::
SpawnExt
;
use
futures
::
task
::
SpawnExt
;
pub
use
futures
::
task
::
Spawn
as
Executor
;
pub
use
futures
::
task
::
Spawn
as
Executor
;
use
futures
::
channel
::
oneshot
::{
self
,
Receiver
}
;
use
futures
::
channel
::
oneshot
;
use
futures
::
future
::{
ready
,
select
};
use
futures
::
future
::{
ready
,
select
};
use
std
::
collections
::
hash_map
::{
HashMap
,
Entry
};
use
std
::
collections
::
hash_map
::{
HashMap
,
Entry
};
use
std
::
io
;
use
std
::
io
;
use
std
::
sync
::
Arc
;
use
std
::
sync
::
Arc
;
use
std
::
pin
::
Pin
;
use
std
::
pin
::
Pin
;
use
std
::
task
::{
Poll
,
Context
};
use
arrayvec
::
ArrayVec
;
use
arrayvec
::
ArrayVec
;
use
parking_lot
::
Mutex
;
use
parking_lot
::
Mutex
;
...
@@ -242,47 +241,30 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
...
@@ -242,47 +241,30 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
#[derive(Clone,
Copy,
Debug,
PartialEq,
Eq)]
#[derive(Clone,
Copy,
Debug,
PartialEq,
Eq)]
pub
struct
NetworkDown
;
pub
struct
NetworkDown
;
/// A future that resolves when a collation is received.
pub
struct
AwaitingCollation
{
outer
:
oneshot
::
Receiver
<
oneshot
::
Receiver
<
Collation
>>
,
inner
:
Option
<
oneshot
::
Receiver
<
Collation
>>
}
impl
Future
for
AwaitingCollation
{
type
Output
=
Result
<
Collation
,
NetworkDown
>
;
fn
poll
(
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
)
->
Poll
<
Self
::
Output
>
{
let
this
=
Pin
::
into_inner
(
self
);
if
let
Some
(
ref
mut
inner
)
=
this
.inner
{
return
Pin
::
new
(
inner
)
.poll
(
cx
)
.map_err
(|
_
|
NetworkDown
)
}
match
Pin
::
new
(
&
mut
this
.outer
)
.poll
(
cx
)
{
Poll
::
Ready
(
Ok
(
inner
))
=>
{
this
.inner
=
Some
(
inner
);
Pin
::
new
(
this
)
.poll
(
cx
)
},
Poll
::
Ready
(
Err
(
_
))
=>
Poll
::
Ready
(
Err
(
NetworkDown
)),
Poll
::
Pending
=>
Poll
::
Pending
,
}
}
}
impl
<
P
,
E
:
Clone
,
N
,
T
:
Clone
>
Collators
for
ValidationNetwork
<
P
,
E
,
N
,
T
>
where
impl
<
P
,
E
:
Clone
,
N
,
T
:
Clone
>
Collators
for
ValidationNetwork
<
P
,
E
,
N
,
T
>
where
P
:
ProvideRuntimeApi
+
Send
+
Sync
+
'static
,
P
:
ProvideRuntimeApi
+
Send
+
Sync
+
'static
,
P
::
Api
:
ParachainHost
<
Block
>
,
P
::
Api
:
ParachainHost
<
Block
>
,
N
:
NetworkService
,
N
:
NetworkService
,
{
{
type
Error
=
NetworkDown
;
type
Error
=
NetworkDown
;
type
Collation
=
AwaitingCollation
;
type
Collation
=
Pin
<
Box
<
dyn
Future
<
Output
=
Result
<
Collation
,
NetworkDown
>>
+
Send
>>
;
fn
collate
(
&
self
,
parachain
:
ParaId
,
relay_parent
:
Hash
)
->
Self
::
Collation
{
fn
collate
(
&
self
,
parachain
:
ParaId
,
relay_parent
:
Hash
)
->
Self
::
Collation
{
let
(
tx
,
rx
)
=
oneshot
::
channel
();
let
(
tx
,
rx
)
=
oneshot
::
channel
();
self
.network
.with_spec
(
move
|
spec
,
_
|
{
let
network
=
self
.network
.clone
();
let
collation
=
spec
.await_collation
(
relay_parent
,
parachain
);
let
_
=
tx
.send
(
collation
);
// A future that resolves when a collation is received.
});
async
move
{
AwaitingCollation
{
outer
:
rx
,
inner
:
None
}
network
.with_spec
(
move
|
spec
,
_
|
{
let
collation
=
spec
.await_collation
(
relay_parent
,
parachain
);
let
_
=
tx
.send
(
collation
);
});
rx
.await
.map_err
(|
_
|
NetworkDown
)
?
.await
.map_err
(|
_
|
NetworkDown
)
}
.boxed
()
}
}
...
@@ -348,27 +330,6 @@ impl Knowledge {
...
@@ -348,27 +330,6 @@ impl Knowledge {
}
}
}
}
/// receiver for incoming data.
#[derive(Clone)]
pub
struct
IncomingReceiver
{
inner
:
future
::
Shared
<
Receiver
<
Incoming
>>
}
impl
Future
for
IncomingReceiver
{
type
Output
=
Result
<
Incoming
,
io
::
Error
>
;
fn
poll
(
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
)
->
Poll
<
Self
::
Output
>
{
match
Pin
::
new
(
&
mut
Pin
::
into_inner
(
self
)
.inner
)
.poll
(
cx
)
{
Poll
::
Ready
(
Ok
(
i
))
=>
Poll
::
Ready
(
Ok
(
Incoming
::
clone
(
&
i
))),
Poll
::
Ready
(
Err
(
_
))
=>
Poll
::
Ready
(
Err
(
io
::
Error
::
new
(
io
::
ErrorKind
::
Other
,
"Sending end of channel hung up"
,
))),
Poll
::
Pending
=>
Poll
::
Pending
,
}
}
}
/// A current validation leaf-work instance
/// A current validation leaf-work instance
#[derive(Clone)]
#[derive(Clone)]
pub
(
crate
)
struct
LiveValidationLeaf
{
pub
(
crate
)
struct
LiveValidationLeaf
{
...
@@ -564,36 +525,6 @@ impl LiveValidationLeaves {
...
@@ -564,36 +525,6 @@ impl LiveValidationLeaves {
}
}
}
}
/// Receiver for block data.
pub
struct
PoVReceiver
{
outer
:
Receiver
<
Receiver
<
PoVBlock
>>
,
inner
:
Option
<
Receiver
<
PoVBlock
>>
}
impl
Future
for
PoVReceiver
{
type
Output
=
Result
<
PoVBlock
,
io
::
Error
>
;
fn
poll
(
self
:
Pin
<&
mut
Self
>
,
cx
:
&
mut
Context
)
->
Poll
<
Self
::
Output
>
{
let
this
=
Pin
::
into_inner
(
self
);
let
map_err
=
|
_
|
io
::
Error
::
new
(
io
::
ErrorKind
::
Other
,
"Sending end of channel hung up"
,
);
if
let
Some
(
ref
mut
inner
)
=
this
.inner
{
return
Pin
::
new
(
inner
)
.poll
(
cx
)
.map_err
(
map_err
);
}
match
Pin
::
new
(
&
mut
this
.outer
)
.poll
(
cx
)
.map_err
(
map_err
)
?
{
Poll
::
Ready
(
inner
)
=>
{
this
.inner
=
Some
(
inner
);
Pin
::
new
(
this
)
.poll
(
cx
)
}
Poll
::
Pending
=>
Poll
::
Pending
,
}
}
}
/// Can fetch data for a given validation leaf-work instance.
/// Can fetch data for a given validation leaf-work instance.
pub
struct
LeafWorkDataFetcher
<
P
,
E
,
N
:
NetworkService
,
T
>
{
pub
struct
LeafWorkDataFetcher
<
P
,
E
,
N
:
NetworkService
,
T
>
{
network
:
Arc
<
N
>
,
network
:
Arc
<
N
>
,
...
@@ -658,9 +589,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
...
@@ -658,9 +589,14 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
E
:
Future
<
Output
=
()
>
+
Clone
+
Send
+
'static
,
E
:
Future
<
Output
=
()
>
+
Clone
+
Send
+
'static
,
{
{
/// Fetch PoV block for the given candidate receipt.
/// Fetch PoV block for the given candidate receipt.
pub
fn
fetch_pov_block
(
&
self
,
candidate
:
&
CandidateReceipt
)
->
PoVReceiver
{
pub
fn
fetch_pov_block
(
&
self
,
candidate
:
&
CandidateReceipt
)
->
Pin
<
Box
<
dyn
Future
<
Output
=
Result
<
PoVBlock
,
io
::
Error
>>
+
Send
>>
{
let
parachain
=
candidate
.parachain_index
;
let
parachain
=
candidate
.parachain_index
;
let
parent_hash
=
self
.parent_hash
;
let
parent_hash
=
self
.parent_hash
;
let
network
=
self
.network
.clone
();
let
candidate
=
candidate
.clone
();
let
(
tx
,
rx
)
=
oneshot
::
channel
();
let
canon_roots
=
self
.api
.runtime_api
()
.ingress
(
let
canon_roots
=
self
.api
.runtime_api
()
.ingress
(
&
BlockId
::
hash
(
parent_hash
),
&
BlockId
::
hash
(
parent_hash
),
...
@@ -676,15 +612,24 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
...
@@ -676,15 +612,24 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
)
)
);
);
let
candidate
=
candidate
.clone
();
async
move
{
let
(
tx
,
rx
)
=
oneshot
::
channel
();
network
.with_spec
(
move
|
spec
,
ctx
|
{
self
.network
.with_spec
(
move
|
spec
,
ctx
|
{
if
let
Ok
(
Some
(
canon_roots
))
=
canon_roots
{
if
let
Ok
(
Some
(
canon_roots
))
=
canon_roots
{
let
inner_rx
=
spec
.fetch_pov_block
(
ctx
,
&
candidate
,
parent_hash
,
canon_roots
);
let
inner_rx
=
spec
.fetch_pov_block
(
ctx
,
&
candidate
,
parent_hash
,
canon_roots
);
let
_
=
tx
.send
(
inner_rx
);
let
_
=
tx
.send
(
inner_rx
);
}
}
});
});
PoVReceiver
{
outer
:
rx
,
inner
:
None
}
let
map_err
=
|
_
|
io
::
Error
::
new
(
io
::
ErrorKind
::
Other
,
"Sending end of channel hung up"
,
);
rx
.await
.map_err
(
map_err
)
?
.await
.map_err
(
map_err
)
}
.boxed
()
}
}
}
}
...
...
validation/src/collation.rs
View file @
e97a1715
...
@@ -32,8 +32,6 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag
...
@@ -32,8 +32,6 @@ use parachain::{wasm_executor::{self, ExternalitiesError, ExecutionMode}, Messag
use
trie
::
TrieConfiguration
;
use
trie
::
TrieConfiguration
;
use
futures
::
prelude
::
*
;
use
futures
::
prelude
::
*
;
use
log
::
debug
;
use
log
::
debug
;
use
std
::
task
::{
Poll
,
Context
};
use
std
::
pin
::
Pin
;
/// Encapsulates connections to collators and allows collation on any parachain.
/// Encapsulates connections to collators and allows collation on any parachain.
///
///
...
@@ -58,94 +56,41 @@ pub trait Collators: Clone {
...
@@ -58,94 +56,41 @@ pub trait Collators: Clone {
}
}
/// A future which resolves when a collation is available.
/// A future which resolves when a collation is available.
///
pub
async
fn
collation_fetch
<
C
:
Collators
,
P
>
(
/// This future is fused.
pub
struct
CollationFetch
<
C
:
Collators
,
P
>
{
parachain
:
ParaId
,
parachain
:
ParaId
,
relay_parent_hash
:
Hash
,
relay_parent_hash
:
Hash
,
relay_parent
:
BlockId
,
collators
:
C
,
collators
:
C
,
live_fetch
:
Option
<
C
::
Collation
>
,
client
:
Arc
<
P
>
,
client
:
Arc
<
P
>
,
max_block_data_size
:
Option
<
u64
>
,
max_block_data_size
:
Option
<
u64
>
,
}
)
->
Result
<
(
Collation
,
OutgoingMessages
,
Balance
),
C
::
Error
>
impl
<
C
:
Collators
,
P
>
CollationFetch
<
C
,
P
>
{
/// Create a new collation fetcher for the given chain.
pub
fn
new
(
parachain
:
ParaId
,
relay_parent_hash
:
Hash
,
collators
:
C
,
client
:
Arc
<
P
>
,