1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
use std::sync::Arc; use parking_lot::Mutex; use bytes::Bytes; use message::{Command, Error}; use p2p::Context; use net::{PeerContext, PeerStats}; use protocol::{Protocol, PingProtocol, SyncProtocol, AddrProtocol, SeednodeProtocol}; use util::PeerInfo; pub trait SessionFactory { fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session; } pub struct SeednodeSessionFactory; impl SessionFactory for SeednodeSessionFactory { fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session { let peer_context = Arc::new(PeerContext::new(context, info, synchronous)); let ping = PingProtocol::new(peer_context.clone()).boxed(); let addr = AddrProtocol::new(peer_context.clone(), true).boxed(); let seed = SeednodeProtocol::new(peer_context.clone()).boxed(); Session::new(peer_context, vec![ping, addr, seed]) } } pub struct NormalSessionFactory; impl SessionFactory for NormalSessionFactory { fn new_session(context: Arc<Context>, info: PeerInfo, synchronous: bool) -> Session { let peer_context = Arc::new(PeerContext::new(context, info, synchronous)); let ping = PingProtocol::new(peer_context.clone()).boxed(); let addr = AddrProtocol::new(peer_context.clone(), false).boxed(); let sync = SyncProtocol::new(peer_context.clone()).boxed(); Session::new(peer_context, vec![ping, addr, sync]) } } pub struct Session { peer_context: Arc<PeerContext>, protocols: Mutex<Vec<Box<Protocol>>>, } impl Session { pub fn new(peer_context: Arc<PeerContext>, protocols: Vec<Box<Protocol>>) -> Self { Session { peer_context: peer_context, protocols: Mutex::new(protocols), } } pub fn initialize(&self) { for protocol in self.protocols.lock().iter_mut() { protocol.initialize(); } } pub fn maintain(&self) { for protocol in self.protocols.lock().iter_mut() { protocol.maintain(); } } pub fn on_message(&self, command: Command, payload: Bytes) -> Result<(), Error> { self.stats().lock().report_recv(command.clone(), payload.len()); self.protocols.lock() .iter_mut() .map(|protocol| { protocol.on_message(&command, &payload) }) .collect::<Result<Vec<_>, Error>>() .map(|_| ()) } pub fn on_close(&self) { for protocol in self.protocols.lock().iter_mut() { protocol.on_close(); } } pub fn stats(&self) -> &Mutex<PeerStats> { self.peer_context.stats() } }