From 62193b259697f98fd5052f1a5ff78583b33e0072 Mon Sep 17 00:00:00 2001 From: EAimTY Date: Sat, 27 May 2023 18:24:31 +0900 Subject: [PATCH] less confusing connection initialization --- tuic-client/src/connection.rs | 114 ++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 54 deletions(-) diff --git a/tuic-client/src/connection.rs b/tuic-client/src/connection.rs index 14e758f..5307cc2 100644 --- a/tuic-client/src/connection.rs +++ b/tuic-client/src/connection.rs @@ -131,11 +131,8 @@ impl Endpoint { ep: &mut QuinnEndpoint, addr: SocketAddr, server_name: &str, - uuid: Uuid, - password: Arc<[u8]>, - udp_relay_mode: UdpRelayMode, zero_rtt_handshake: bool, - ) -> Result { + ) -> Result { let match_ipv4 = addr.is_ipv4() && ep.local_addr().map_or(false, |addr| addr.is_ipv4()); let match_ipv6 = addr.is_ipv6() && ep.local_addr().map_or(false, |addr| addr.is_ipv6()); @@ -155,6 +152,7 @@ impl Endpoint { } let conn = ep.connect(addr, server_name)?; + let conn = if zero_rtt_handshake { match conn.into_0rtt() { Ok((conn, _)) => conn, @@ -169,7 +167,7 @@ impl Endpoint { conn.await? }; - Ok(Connection::new(conn, udp_relay_mode, uuid, password)) + Ok(conn) } let mut last_err = None; @@ -179,9 +177,6 @@ impl Endpoint { &mut self.ep, addr, self.server.server_name(), - self.uuid, - self.password.clone(), - self.udp_relay_mode, self.zero_rtt_handshake, ) .await; @@ -189,12 +184,16 @@ impl Endpoint { match res { Ok(conn) => { log::info!("[connection] connection established"); - tokio::spawn(conn.clone().init( + + return Ok(Connection::new( + conn, + self.udp_relay_mode, + self.uuid, + self.password.clone(), self.heartbeat, self.gc_interval, self.gc_lifetime, )); - return Ok(conn); } Err(err) => last_err = Some(err), } @@ -218,25 +217,6 @@ pub struct Connection { } impl Connection { - fn new( - conn: QuinnConnection, - udp_relay_mode: UdpRelayMode, - uuid: Uuid, - password: Arc<[u8]>, - ) -> Self { - Self { - conn: conn.clone(), - model: Model::::new(conn), - uuid, - password, - udp_relay_mode, - remote_uni_stream_cnt: Counter::new(), - remote_bi_stream_cnt: Counter::new(), - max_concurrent_uni_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)), - max_concurrent_bi_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)), - } - } - pub async fn get() -> Result { let try_init_conn = async { ENDPOINT @@ -270,6 +250,57 @@ impl Connection { Ok(conn) } + fn new( + conn: QuinnConnection, + udp_relay_mode: UdpRelayMode, + uuid: Uuid, + password: Arc<[u8]>, + heartbeat: Duration, + gc_interval: Duration, + gc_lifetime: Duration, + ) -> Self { + let conn = Self { + conn: conn.clone(), + model: Model::::new(conn), + uuid, + password, + udp_relay_mode, + remote_uni_stream_cnt: Counter::new(), + remote_bi_stream_cnt: Counter::new(), + max_concurrent_uni_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)), + max_concurrent_bi_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)), + }; + + tokio::spawn(conn.clone().init(heartbeat, gc_interval, gc_lifetime)); + + conn + } + + async fn init(self, heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration) { + tokio::spawn(self.clone().authenticate()); + tokio::spawn(self.clone().heartbeat(heartbeat)); + tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime)); + + let err = loop { + tokio::select! { + res = self.accept_uni_stream() => match res { + Ok((recv, reg)) => tokio::spawn(self.clone().handle_uni_stream(recv, reg)), + Err(err) => break err, + }, + res = self.accept_bi_stream() => match res { + Ok((send, recv, reg)) => tokio::spawn(self.clone().handle_bi_stream(send, recv, reg)), + Err(err) => break err, + }, + res = self.accept_datagram() => match res { + Ok(dg) => tokio::spawn(self.clone().handle_datagram(dg)), + Err(err) => break err, + }, + }; + }; + + log::error!("[connection] {err}"); + } + pub async fn connect(&self, addr: Address) -> Result { Ok(self.model.connect(addr).await?) } @@ -445,29 +476,4 @@ impl Connection { self.model.collect_garbage(gc_lifetime); } } - - async fn init(self, heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration) { - tokio::spawn(self.clone().authenticate()); - tokio::spawn(self.clone().heartbeat(heartbeat)); - tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime)); - - let err = loop { - tokio::select! { - res = self.accept_uni_stream() => match res { - Ok((recv, reg)) => tokio::spawn(self.clone().handle_uni_stream(recv, reg)), - Err(err) => break err, - }, - res = self.accept_bi_stream() => match res { - Ok((send, recv, reg)) => tokio::spawn(self.clone().handle_bi_stream(send, recv, reg)), - Err(err) => break err, - }, - res = self.accept_datagram() => match res { - Ok(dg) => tokio::spawn(self.clone().handle_datagram(dg)), - Err(err) => break err, - }, - }; - }; - - log::error!("[connection] {err}"); - } }