From 525053a3cc13de7aac497be776858eb6f740b40e Mon Sep 17 00:00:00 2001 From: EAimTY Date: Sat, 4 Mar 2023 19:45:55 +0900 Subject: [PATCH] adding log support --- tuic-server/src/config.rs | 2 +- tuic-server/src/main.rs | 6 +- tuic-server/src/server.rs | 144 +++++++++++++++++++++++++++++--------- 3 files changed, 118 insertions(+), 34 deletions(-) diff --git a/tuic-server/src/config.rs b/tuic-server/src/config.rs index e409ffe..de01b6c 100644 --- a/tuic-server/src/config.rs +++ b/tuic-server/src/config.rs @@ -106,7 +106,7 @@ mod default { } pub fn auth_timeout() -> Duration { - Duration::from_secs(3) + Duration::from_secs(10) } pub fn max_idle_time() -> Duration { diff --git a/tuic-server/src/main.rs b/tuic-server/src/main.rs index 9e689a3..1b8d039 100644 --- a/tuic-server/src/main.rs +++ b/tuic-server/src/main.rs @@ -29,7 +29,11 @@ async fn main() { } }; - LoggerBuilder::new().filter_level(cfg.log_level).init(); + LoggerBuilder::new() + .filter_level(cfg.log_level) + .format_module_path(false) + .format_target(false) + .init(); match Server::init(cfg) { Ok(server) => server.start().await, diff --git a/tuic-server/src/server.rs b/tuic-server/src/server.rs index 1f35e56..b6c3204 100644 --- a/tuic-server/src/server.rs +++ b/tuic-server/src/server.rs @@ -8,8 +8,8 @@ use crossbeam_utils::atomic::AtomicCell; use parking_lot::Mutex; use quinn::{ congestion::{BbrConfig, CubicConfig, NewRenoConfig}, - Connecting, Connection as QuinnConnection, Endpoint, EndpointConfig, IdleTimeout, RecvStream, - SendStream, ServerConfig, TokioRuntime, TransportConfig, VarInt, + Connecting, Connection as QuinnConnection, ConnectionError, Endpoint, EndpointConfig, + IdleTimeout, RecvStream, SendStream, ServerConfig, TokioRuntime, TransportConfig, VarInt, }; use register_count::{Counter, Register}; use rustls::{version, ServerConfig as RustlsServerConfig}; @@ -135,8 +135,15 @@ impl Server { } pub async fn start(&self) { + log::warn!( + "server started, listening on {}", + self.ep.local_addr().unwrap() + ); + loop { - let conn = self.ep.accept().await.unwrap(); + let Some(conn) = self.ep.accept().await else { + return; + }; tokio::spawn(Connection::handle( conn, @@ -180,16 +187,21 @@ impl Connection { gc_interval: Duration, gc_lifetime: Duration, ) { - match Self::init( + let addr = conn.remote_address(); + + let conn = Self::init( conn, users, udp_relay_ipv6, zero_rtt_handshake, max_external_pkt_size, ) - .await - { + .await; + + match conn { Ok(conn) => { + log::info!("[{addr}] connection established"); + tokio::spawn(conn.clone().handle_auth_timeout(auth_timeout)); tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime)); @@ -200,11 +212,17 @@ impl Connection { match conn.accept().await { Ok(()) => {} - Err(err) => eprintln!("{err}"), + Err(err) if err.is_locally_closed() => {} + Err(err) if err.is_timeout_closed() => { + log::debug!("[{addr}] connection timeout") + } + Err(err) => log::warn!("[{addr}] {err}"), } } } - Err(err) => eprintln!("{err}"), + Err(err) if err.is_locally_closed() => unreachable!(), + Err(err) if err.is_timeout_closed() => log::debug!("[{addr}] connection timeout"), + Err(err) => log::warn!("[{addr}] {err}"), } } @@ -219,7 +237,7 @@ impl Connection { match conn.into_0rtt() { Ok((conn, _)) => conn, Err(conn) => { - eprintln!("0-RTT handshake failed, fallback to 1-RTT handshake"); + log::info!("0-RTT handshake failed, fallback to 1-RTT handshake"); conn.await? } } @@ -257,6 +275,9 @@ impl Connection { } async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { + let addr = self.inner.remote_address(); + log::debug!("[{addr}] incoming unidirectional stream"); + let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); if self.remote_uni_stream_cnt.count() == max { @@ -299,27 +320,44 @@ impl Connection { } match pre_process(&self, recv).await { - Ok(Task::Authenticate(_)) => {} + Ok(Task::Authenticate(auth)) => log::info!("[{addr}] authenticated as {}", auth.uuid()), Ok(Task::Packet(pkt)) => { + let assoc_id = pkt.assoc_id(); + let pkt_id = pkt.pkt_id(); + let frag_id = pkt.frag_id(); + let frag_total = pkt.frag_total(); + log::info!( + "[{addr}] [packet-from-quic] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}]" + ); + self.set_udp_relay_mode(UdpRelayMode::Quic); match self.handle_packet(pkt).await { Ok(()) => {} - Err(err) => eprintln!("{err}"), + Err(err) => log::warn!( + "[{addr}] [packet-from-quic] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}] {err}" + ), + } + } + Ok(Task::Dissociate(assoc_id)) => { + log::info!("[{addr}] [dissociate] [{assoc_id}]"); + + match self.handle_dissociate(assoc_id).await { + Ok(()) => {} + Err(err) => log::warn!("[{addr}] [dissociate] [{assoc_id}] {err}"), } } - Ok(Task::Dissociate(assoc_id)) => match self.handle_dissociate(assoc_id).await { - Ok(()) => {} - Err(err) => eprintln!("{err}"), - }, Ok(_) => unreachable!(), Err(err) => { - eprintln!("{err}"); + log::warn!("[{addr}] handle unidirection stream error: {err}"); self.close(); } } } async fn handle_bi_stream(self, (send, recv): (SendStream, RecvStream), _reg: Register) { + let addr = self.inner.remote_address(); + log::debug!("[{addr}] incoming bidirectional stream"); + let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); if self.remote_bi_stream_cnt.count() == max { @@ -346,19 +384,27 @@ impl Connection { } match pre_process(&self, send, recv).await { - Ok(Task::Connect(conn)) => match self.handle_connect(conn).await { - Ok(()) => {} - Err(err) => eprintln!("{err}"), - }, + Ok(Task::Connect(conn)) => { + let target_addr = conn.addr().to_string(); + log::info!("[{addr}] [connect] [{target_addr}]"); + + match self.handle_connect(conn).await { + Ok(()) => {} + Err(err) => log::warn!("[{addr}] [connect] [{target_addr}] {err}"), + } + } Ok(_) => unreachable!(), Err(err) => { - eprintln!("{err}"); + log::warn!("[{addr}] handle bidirection stream error: {err}"); self.close(); } } } async fn handle_datagram(self, dg: Bytes) { + let addr = self.inner.remote_address(); + log::debug!("[{addr}] incoming datagram"); + async fn pre_process(conn: &Connection, dg: Bytes) -> Result { let task = conn.model.accept_datagram(dg)?; @@ -378,16 +424,26 @@ impl Connection { match pre_process(&self, dg).await { Ok(Task::Packet(pkt)) => { + let assoc_id = pkt.assoc_id(); + let pkt_id = pkt.pkt_id(); + let frag_id = pkt.frag_id(); + let frag_total = pkt.frag_total(); + log::info!( + "[{addr}] [packet-from-native] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}]" + ); + self.set_udp_relay_mode(UdpRelayMode::Native); match self.handle_packet(pkt).await { Ok(()) => {} - Err(err) => eprintln!("{err}"), + Err(err) => log::warn!( + "[{addr}] [packet-from-native] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}] {err}" + ), } } - Ok(Task::Heartbeat) => {} + Ok(Task::Heartbeat) => log::info!("[{addr}] [heartbeat]"), Ok(_) => unreachable!(), Err(err) => { - eprintln!("{err}"); + log::warn!("[{addr}] handle datagram error: {err}"); self.close(); } } @@ -468,6 +524,8 @@ impl Connection { time::sleep(timeout).await; if !self.is_authed() { + let addr = self.inner.remote_address(); + log::warn!("[{addr}] authentication timeout"); self.close(); } } @@ -570,20 +628,32 @@ impl UdpSession { socket_v6: Option>, cancel: Receiver<()>, ) { - async fn send_pkt(conn: Connection, pkt: Bytes, addr: SocketAddr, assoc_id: u16) { - let addr = Address::SocketAddress(addr); + async fn send_pkt(conn: Connection, pkt: Bytes, target_addr: SocketAddr, assoc_id: u16) { + let addr = conn.inner.remote_address(); + let target_addr_tuic = Address::SocketAddress(target_addr); let res = match conn.get_udp_relay_mode() { - Some(UdpRelayMode::Native) => conn.model.packet_native(pkt, addr, assoc_id), - Some(UdpRelayMode::Quic) => conn.model.packet_quic(pkt, addr, assoc_id).await, + Some(UdpRelayMode::Native) => { + log::info!("[{addr}] [packet-to-native] [{assoc_id}] [{target_addr_tuic}]"); + conn.model.packet_native(pkt, target_addr_tuic, assoc_id) + } + Some(UdpRelayMode::Quic) => { + log::info!("[{addr}] [packet-to-quic] [{assoc_id}] [{target_addr_tuic}]"); + conn.model + .packet_quic(pkt, target_addr_tuic, assoc_id) + .await + } None => unreachable!(), }; if let Err(err) = res { - eprintln!("{err}"); + let target_addr_tuic = Address::SocketAddress(target_addr); + log::warn!("[{addr}] [packet-to-quic] [{assoc_id}] [{target_addr_tuic}] {err}"); } } + let addr = conn.inner.remote_address(); + tokio::select! { _ = cancel => {} () = async { @@ -593,10 +663,10 @@ impl UdpSession { socket_v6.as_deref(), conn.max_external_pkt_size, ).await { - Ok((pkt, addr)) => { - tokio::spawn(send_pkt(conn.clone(), pkt, addr, assoc_id)); + Ok((pkt, target_addr)) => { + tokio::spawn(send_pkt(conn.clone(), pkt, target_addr, assoc_id)); } - Err(err) => eprintln!("{err}"), + Err(err) => log::warn!("[{addr}] [packet-to-*] [{assoc_id}] {err}"), } } } => unreachable!(), @@ -674,3 +744,13 @@ impl Future for IsAuthed { } } } + +impl Error { + fn is_locally_closed(&self) -> bool { + matches!(self, Self::Connection(ConnectionError::LocallyClosed)) + } + + fn is_timeout_closed(&self) -> bool { + matches!(self, Self::Connection(ConnectionError::TimedOut)) + } +}