From fc4f33e06c139fa60959010f3541c6778dfa9453 Mon Sep 17 00:00:00 2001 From: EAimTY Date: Mon, 29 May 2023 23:51:47 +0900 Subject: [PATCH] logging more detailed info at server side --- tuic-server/src/connection/authenticated.rs | 11 +++ tuic-server/src/connection/handle_stream.rs | 45 +++++++-- tuic-server/src/connection/handle_task.rs | 104 ++++++++++++-------- tuic-server/src/connection/mod.rs | 42 ++++++-- tuic-server/src/connection/udp_session.rs | 9 +- tuic-server/src/server.rs | 4 +- 6 files changed, 155 insertions(+), 60 deletions(-) diff --git a/tuic-server/src/connection/authenticated.rs b/tuic-server/src/connection/authenticated.rs index 030d7d7..b38f4ad 100644 --- a/tuic-server/src/connection/authenticated.rs +++ b/tuic-server/src/connection/authenticated.rs @@ -1,6 +1,7 @@ use crossbeam_utils::atomic::AtomicCell; use parking_lot::Mutex; use std::{ + fmt::{Display, Formatter, Result as FmtResult}, future::Future, pin::Pin, sync::Arc, @@ -49,3 +50,13 @@ impl Future for Authenticated { } } } + +impl Display for Authenticated { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + if let Some(uuid) = self.get() { + write!(f, "{uuid}") + } else { + write!(f, "unauthenticated") + } + } +} diff --git a/tuic-server/src/connection/handle_stream.rs b/tuic-server/src/connection/handle_stream.rs index 514661f..bcbbfab 100644 --- a/tuic-server/src/connection/handle_stream.rs +++ b/tuic-server/src/connection/handle_stream.rs @@ -9,8 +9,12 @@ use tuic_quinn::Task; impl Connection { pub(crate) async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { - let addr = self.inner.remote_address(); - log::debug!("[{addr}] incoming unidirectional stream"); + log::debug!( + "[{id:#016x}] [{addr}] [{user}] incoming unidirectional stream", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); @@ -54,7 +58,12 @@ impl Connection { Ok(Task::Dissociate(assoc_id)) => self.handle_dissociate(assoc_id).await, Ok(_) => unreachable!(), // already filtered in `tuic_quinn` Err(err) => { - log::warn!("[{addr}] handle unidirection stream error: {err}"); + log::warn!( + "[{id:#016x}] [{addr}] [{user}] handling incoming unidirectional stream error: {err}", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); self.close(); } } @@ -65,8 +74,12 @@ impl Connection { (send, recv): (SendStream, RecvStream), _reg: Register, ) { - let addr = self.inner.remote_address(); - log::debug!("[{addr}] incoming bidirectional stream"); + log::debug!( + "[{id:#016x}] [{addr}] [{user}] incoming bidirectional stream", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); @@ -98,15 +111,24 @@ impl Connection { Ok(Task::Connect(conn)) => self.handle_connect(conn).await, Ok(_) => unreachable!(), // already filtered in `tuic_quinn` Err(err) => { - log::warn!("[{addr}] handle bidirection stream error: {err}"); + log::warn!( + "[{id:#016x}] [{addr}] [{user}] handling incoming bidirectional stream error: {err}", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); self.close(); } } } pub(crate) async fn handle_datagram(self, dg: Bytes) { - let addr = self.inner.remote_address(); - log::debug!("[{addr}] incoming datagram"); + log::debug!( + "[{id:#016x}] [{addr}] [{user}] incoming datagram", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); let pre_process = async { let task = self.model.accept_datagram(dg)?; @@ -130,7 +152,12 @@ impl Connection { Ok(Task::Heartbeat) => self.handle_heartbeat().await, Ok(_) => unreachable!(), Err(err) => { - log::warn!("[{addr}] handle datagram error: {err}"); + log::warn!( + "[{id:#016x}] [{addr}] [{user}] handling incoming datagram error: {err}", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); self.close(); } } diff --git a/tuic-server/src/connection/handle_task.rs b/tuic-server/src/connection/handle_task.rs index 2341c4c..e2779fa 100644 --- a/tuic-server/src/connection/handle_task.rs +++ b/tuic-server/src/connection/handle_task.rs @@ -17,9 +17,10 @@ use tuic_quinn::{Authenticate, Connect, Packet}; impl Connection { pub(super) async fn handle_authenticate(&self, auth: Authenticate) { log::info!( - "[{addr}] [{uuid}] [authenticate] authenticated as {auth_uuid}", + "[{id:#016x}] [{addr}] [{user}] [authenticate] {auth_uuid}", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), + user = self.auth, auth_uuid = auth.uuid(), ); } @@ -28,9 +29,10 @@ impl Connection { let target_addr = conn.addr().to_string(); log::info!( - "[{addr}] [{uuid}] [connect] {target_addr}", + "[{id:#016x}] [{addr}] [{user}] [connect] {target_addr}", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), + user = self.auth, ); let process = async { @@ -69,9 +71,10 @@ impl Connection { match process.await { Ok(()) => {} Err(err) => log::warn!( - "[{addr}] [{uuid}] [connect] relaying connection to {target_addr} error: {err}", + "[{id:#016x}] [{addr}] [{user}] [connect] {target_addr}: {err}", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), + user = self.auth, ), } } @@ -83,17 +86,36 @@ impl Connection { let frag_total = pkt.frag_total(); log::info!( - "[{addr}] [{uuid}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}", + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), + user = self.auth, ); self.udp_relay_mode.store(Some(mode)); + let (pkt, addr, assoc_id) = match pkt.accept().await { + Ok(None) => return, + Ok(Some(res)) => res, + Err(err) => { + log::warn!( + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}: {err}", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); + return; + } + }; + let process = async { - let Some((pkt, addr, assoc_id)) = pkt.accept().await? else { - return Ok(()); - }; + log::info!( + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {src_addr}", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + src_addr = addr, + ); let session = match self.udp_sessions.lock().entry(assoc_id) { Entry::Occupied(entry) => entry.get().clone(), @@ -116,21 +138,23 @@ impl Connection { session.send(pkt, socket_addr).await }; - match process.await { - Ok(()) => {} - Err(err) => log::warn!( - "[{addr}] [{uuid}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] error handling fragment {frag_id}/{frag_total}: {err}", + if let Err(err) = process.await { + log::warn!( + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {src_addr}: {err}", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), - ), + user = self.auth, + src_addr = addr, + ); } } pub(super) async fn handle_dissociate(&self, assoc_id: u16) { log::info!( - "[{addr}] [{uuid}] [dissociate] [{assoc_id:#06x}]", + "[{id:#016x}] [{addr}] [{user}] [dissociate] [{assoc_id:#06x}]", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), + user = self.auth, ); if let Some(session) = self.udp_sessions.lock().remove(&assoc_id) { @@ -140,39 +164,37 @@ impl Connection { pub(super) async fn handle_heartbeat(&self) { log::info!( - "[{addr}] [{uuid}] [heartbeat]", + "[{id:#016x}] [{addr}] [{user}] [heartbeat]", + id = self.id(), addr = self.inner.remote_address(), - uuid = self.auth.get().unwrap(), + user = self.auth, ); } - pub(super) async fn send_packet(self, pkt: Bytes, addr: Address, assoc_id: u16) { + pub(super) async fn relay_packet(self, pkt: Bytes, addr: Address, assoc_id: u16) { let addr_display = addr.to_string(); - let res = match self.udp_relay_mode.load() { - Some(UdpRelayMode::Native) => { - log::info!( - "[{addr}] [packet-to-native] [{assoc_id}] [{target_addr}]", - addr = self.inner.remote_address(), - target_addr = addr_display, - ); - self.model.packet_native(pkt, addr, assoc_id) - } - Some(UdpRelayMode::Quic) => { - log::info!( - "[{addr}] [packet-to-quic] [{assoc_id}] [{target_addr}]", - addr = self.inner.remote_address(), - target_addr = addr_display, - ); - self.model.packet_quic(pkt, addr, assoc_id).await - } - None => unreachable!(), + log::info!( + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [to-{mode}] {target_addr}", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + mode = self.udp_relay_mode.load().unwrap(), + target_addr = addr_display, + ); + + let res = match self.udp_relay_mode.load().unwrap() { + UdpRelayMode::Native => self.model.packet_native(pkt, addr, assoc_id), + UdpRelayMode::Quic => self.model.packet_quic(pkt, addr, assoc_id).await, }; if let Err(err) = res { log::warn!( - "[{addr}] [packet-to-native] [{assoc_id}] [{target_addr}] {err}", + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [to-{mode}] {target_addr}: {err}", + id = self.id(), addr = self.inner.remote_address(), + user = self.auth, + mode = self.udp_relay_mode.load().unwrap(), target_addr = addr_display, ); } diff --git a/tuic-server/src/connection/mod.rs b/tuic-server/src/connection/mod.rs index a1123b9..2846b7e 100644 --- a/tuic-server/src/connection/mod.rs +++ b/tuic-server/src/connection/mod.rs @@ -74,7 +74,11 @@ impl Connection { match init.await { Ok(conn) => { - log::info!("[{addr}] connection established"); + log::info!( + "[{id:#016x}] [{addr}] [{user}] connection established", + id = conn.id(), + user = conn.auth, + ); tokio::spawn(conn.clone().timeout_authenticate(auth_timeout)); tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime)); @@ -101,14 +105,27 @@ impl Connection { Ok(()) => {} Err(err) if err.is_locally_closed() => {} Err(err) if err.is_timeout_closed() => { - log::debug!("[{addr}] connection timeout") + log::debug!( + "[{id:#016x}] [{addr}] [{user}] connection timeout", + id = conn.id(), + user = conn.auth, + ); } - Err(err) => log::warn!("[{addr}] connection error: {err}"), + Err(err) => log::warn!( + "[{id:#016x}] [{addr}] [{user}] connection error: {err}", + id = conn.id(), + user = conn.auth, + ), } } } Err(err) if err.is_locally_closed() || err.is_timeout_closed() => unreachable!(), - Err(err) => log::warn!("[{addr}] connection establishing error: {err}"), + Err(err) => { + log::warn!( + "[{id:#016x}] [{addr}] [unauthenticated] connection establishing error: {err}", + id = usize::MAX, + ) + } } } @@ -155,8 +172,11 @@ impl Connection { time::sleep(timeout).await; if self.auth.get().is_none() { - let addr = self.inner.remote_address(); - log::warn!("[{addr}] [authenticate] timeout"); + log::warn!( + "[{id:#016x}] [{addr}] [unauthenticated] [authenticate] timeout", + id = self.id(), + addr = self.inner.remote_address(), + ); self.close(); } } @@ -169,10 +189,20 @@ impl Connection { break; } + log::debug!( + "[{id:#016x}] [{addr}] [{user}] packet fragment garbage collecting event", + id = self.id(), + addr = self.inner.remote_address(), + user = self.auth, + ); self.model.collect_garbage(gc_lifetime); } } + fn id(&self) -> usize { + self.inner.stable_id() + } + fn is_closed(&self) -> bool { self.inner.close_reason().is_some() } diff --git a/tuic-server/src/connection/udp_session.rs b/tuic-server/src/connection/udp_session.rs index fb52c6c..7f27109 100644 --- a/tuic-server/src/connection/udp_session.rs +++ b/tuic-server/src/connection/udp_session.rs @@ -97,12 +97,17 @@ impl UdpSession { let (pkt, addr) = match session_listening.recv().await { Ok(res) => res, Err(err) => { - log::warn!("{err}"); // TODO + log::warn!( + "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] outbound listening error: {err}", + id = session_listening.0.conn.id(), + addr = session_listening.0.conn.inner.remote_address(), + user = session_listening.0.conn.auth, + ); continue; } }; - tokio::spawn(session_listening.0.conn.clone().send_packet( + tokio::spawn(session_listening.0.conn.clone().relay_packet( pkt, Address::SocketAddress(addr), session_listening.0.assoc_id, diff --git a/tuic-server/src/server.rs b/tuic-server/src/server.rs index e9fee7a..c626ed1 100644 --- a/tuic-server/src/server.rs +++ b/tuic-server/src/server.rs @@ -49,8 +49,8 @@ impl Server { let mut tp_cfg = TransportConfig::default(); tp_cfg - .max_concurrent_bidi_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS as u32)) - .max_concurrent_uni_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS as u32)) + .max_concurrent_bidi_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS)) + .max_concurrent_uni_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS)) .send_window(cfg.send_window) .stream_receive_window(VarInt::from_u32(cfg.receive_window)) .max_idle_timeout(Some(