1
0

logging more detailed info at server side

This commit is contained in:
EAimTY 2023-05-29 23:51:47 +09:00
parent eb228e554e
commit fc4f33e06c
6 changed files with 155 additions and 60 deletions

View File

@ -1,6 +1,7 @@
use crossbeam_utils::atomic::AtomicCell; use crossbeam_utils::atomic::AtomicCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{
fmt::{Display, Formatter, Result as FmtResult},
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
@ -49,3 +50,13 @@ impl Future for Authenticated {
} }
} }
} }
impl Display for Authenticated {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
if let Some(uuid) = self.get() {
write!(f, "{uuid}")
} else {
write!(f, "unauthenticated")
}
}
}

View File

@ -9,8 +9,12 @@ use tuic_quinn::Task;
impl Connection { impl Connection {
pub(crate) async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { pub(crate) async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
let addr = self.inner.remote_address(); log::debug!(
log::debug!("[{addr}] incoming unidirectional stream"); "[{id:#016x}] [{addr}] [{user}] incoming unidirectional stream",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed);
@ -54,7 +58,12 @@ impl Connection {
Ok(Task::Dissociate(assoc_id)) => self.handle_dissociate(assoc_id).await, Ok(Task::Dissociate(assoc_id)) => self.handle_dissociate(assoc_id).await,
Ok(_) => unreachable!(), // already filtered in `tuic_quinn` Ok(_) => unreachable!(), // already filtered in `tuic_quinn`
Err(err) => { Err(err) => {
log::warn!("[{addr}] handle unidirection stream error: {err}"); log::warn!(
"[{id:#016x}] [{addr}] [{user}] handling incoming unidirectional stream error: {err}",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
self.close(); self.close();
} }
} }
@ -65,8 +74,12 @@ impl Connection {
(send, recv): (SendStream, RecvStream), (send, recv): (SendStream, RecvStream),
_reg: Register, _reg: Register,
) { ) {
let addr = self.inner.remote_address(); log::debug!(
log::debug!("[{addr}] incoming bidirectional stream"); "[{id:#016x}] [{addr}] [{user}] incoming bidirectional stream",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed);
@ -98,15 +111,24 @@ impl Connection {
Ok(Task::Connect(conn)) => self.handle_connect(conn).await, Ok(Task::Connect(conn)) => self.handle_connect(conn).await,
Ok(_) => unreachable!(), // already filtered in `tuic_quinn` Ok(_) => unreachable!(), // already filtered in `tuic_quinn`
Err(err) => { Err(err) => {
log::warn!("[{addr}] handle bidirection stream error: {err}"); log::warn!(
"[{id:#016x}] [{addr}] [{user}] handling incoming bidirectional stream error: {err}",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
self.close(); self.close();
} }
} }
} }
pub(crate) async fn handle_datagram(self, dg: Bytes) { pub(crate) async fn handle_datagram(self, dg: Bytes) {
let addr = self.inner.remote_address(); log::debug!(
log::debug!("[{addr}] incoming datagram"); "[{id:#016x}] [{addr}] [{user}] incoming datagram",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
let pre_process = async { let pre_process = async {
let task = self.model.accept_datagram(dg)?; let task = self.model.accept_datagram(dg)?;
@ -130,7 +152,12 @@ impl Connection {
Ok(Task::Heartbeat) => self.handle_heartbeat().await, Ok(Task::Heartbeat) => self.handle_heartbeat().await,
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(err) => { Err(err) => {
log::warn!("[{addr}] handle datagram error: {err}"); log::warn!(
"[{id:#016x}] [{addr}] [{user}] handling incoming datagram error: {err}",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
self.close(); self.close();
} }
} }

View File

@ -17,9 +17,10 @@ use tuic_quinn::{Authenticate, Connect, Packet};
impl Connection { impl Connection {
pub(super) async fn handle_authenticate(&self, auth: Authenticate) { pub(super) async fn handle_authenticate(&self, auth: Authenticate) {
log::info!( log::info!(
"[{addr}] [{uuid}] [authenticate] authenticated as {auth_uuid}", "[{id:#016x}] [{addr}] [{user}] [authenticate] {auth_uuid}",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
auth_uuid = auth.uuid(), auth_uuid = auth.uuid(),
); );
} }
@ -28,9 +29,10 @@ impl Connection {
let target_addr = conn.addr().to_string(); let target_addr = conn.addr().to_string();
log::info!( log::info!(
"[{addr}] [{uuid}] [connect] {target_addr}", "[{id:#016x}] [{addr}] [{user}] [connect] {target_addr}",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
); );
let process = async { let process = async {
@ -69,9 +71,10 @@ impl Connection {
match process.await { match process.await {
Ok(()) => {} Ok(()) => {}
Err(err) => log::warn!( Err(err) => log::warn!(
"[{addr}] [{uuid}] [connect] relaying connection to {target_addr} error: {err}", "[{id:#016x}] [{addr}] [{user}] [connect] {target_addr}: {err}",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
), ),
} }
} }
@ -83,17 +86,36 @@ impl Connection {
let frag_total = pkt.frag_total(); let frag_total = pkt.frag_total();
log::info!( log::info!(
"[{addr}] [{uuid}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}", "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
); );
self.udp_relay_mode.store(Some(mode)); self.udp_relay_mode.store(Some(mode));
let (pkt, addr, assoc_id) = match pkt.accept().await {
Ok(None) => return,
Ok(Some(res)) => res,
Err(err) => {
log::warn!(
"[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}: {err}",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
return;
}
};
let process = async { let process = async {
let Some((pkt, addr, assoc_id)) = pkt.accept().await? else { log::info!(
return Ok(()); "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {src_addr}",
}; id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
src_addr = addr,
);
let session = match self.udp_sessions.lock().entry(assoc_id) { let session = match self.udp_sessions.lock().entry(assoc_id) {
Entry::Occupied(entry) => entry.get().clone(), Entry::Occupied(entry) => entry.get().clone(),
@ -116,21 +138,23 @@ impl Connection {
session.send(pkt, socket_addr).await session.send(pkt, socket_addr).await
}; };
match process.await { if let Err(err) = process.await {
Ok(()) => {} log::warn!(
Err(err) => log::warn!( "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {src_addr}: {err}",
"[{addr}] [{uuid}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] error handling fragment {frag_id}/{frag_total}: {err}", id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
), src_addr = addr,
);
} }
} }
pub(super) async fn handle_dissociate(&self, assoc_id: u16) { pub(super) async fn handle_dissociate(&self, assoc_id: u16) {
log::info!( log::info!(
"[{addr}] [{uuid}] [dissociate] [{assoc_id:#06x}]", "[{id:#016x}] [{addr}] [{user}] [dissociate] [{assoc_id:#06x}]",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
); );
if let Some(session) = self.udp_sessions.lock().remove(&assoc_id) { if let Some(session) = self.udp_sessions.lock().remove(&assoc_id) {
@ -140,39 +164,37 @@ impl Connection {
pub(super) async fn handle_heartbeat(&self) { pub(super) async fn handle_heartbeat(&self) {
log::info!( log::info!(
"[{addr}] [{uuid}] [heartbeat]", "[{id:#016x}] [{addr}] [{user}] [heartbeat]",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
uuid = self.auth.get().unwrap(), user = self.auth,
); );
} }
pub(super) async fn send_packet(self, pkt: Bytes, addr: Address, assoc_id: u16) { pub(super) async fn relay_packet(self, pkt: Bytes, addr: Address, assoc_id: u16) {
let addr_display = addr.to_string(); let addr_display = addr.to_string();
let res = match self.udp_relay_mode.load() { log::info!(
Some(UdpRelayMode::Native) => { "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [to-{mode}] {target_addr}",
log::info!( id = self.id(),
"[{addr}] [packet-to-native] [{assoc_id}] [{target_addr}]", addr = self.inner.remote_address(),
addr = self.inner.remote_address(), user = self.auth,
target_addr = addr_display, mode = self.udp_relay_mode.load().unwrap(),
); target_addr = addr_display,
self.model.packet_native(pkt, addr, assoc_id) );
}
Some(UdpRelayMode::Quic) => { let res = match self.udp_relay_mode.load().unwrap() {
log::info!( UdpRelayMode::Native => self.model.packet_native(pkt, addr, assoc_id),
"[{addr}] [packet-to-quic] [{assoc_id}] [{target_addr}]", UdpRelayMode::Quic => self.model.packet_quic(pkt, addr, assoc_id).await,
addr = self.inner.remote_address(),
target_addr = addr_display,
);
self.model.packet_quic(pkt, addr, assoc_id).await
}
None => unreachable!(),
}; };
if let Err(err) = res { if let Err(err) = res {
log::warn!( log::warn!(
"[{addr}] [packet-to-native] [{assoc_id}] [{target_addr}] {err}", "[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] [to-{mode}] {target_addr}: {err}",
id = self.id(),
addr = self.inner.remote_address(), addr = self.inner.remote_address(),
user = self.auth,
mode = self.udp_relay_mode.load().unwrap(),
target_addr = addr_display, target_addr = addr_display,
); );
} }

View File

@ -74,7 +74,11 @@ impl Connection {
match init.await { match init.await {
Ok(conn) => { Ok(conn) => {
log::info!("[{addr}] connection established"); log::info!(
"[{id:#016x}] [{addr}] [{user}] connection established",
id = conn.id(),
user = conn.auth,
);
tokio::spawn(conn.clone().timeout_authenticate(auth_timeout)); tokio::spawn(conn.clone().timeout_authenticate(auth_timeout));
tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime)); tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime));
@ -101,14 +105,27 @@ impl Connection {
Ok(()) => {} Ok(()) => {}
Err(err) if err.is_locally_closed() => {} Err(err) if err.is_locally_closed() => {}
Err(err) if err.is_timeout_closed() => { Err(err) if err.is_timeout_closed() => {
log::debug!("[{addr}] connection timeout") log::debug!(
"[{id:#016x}] [{addr}] [{user}] connection timeout",
id = conn.id(),
user = conn.auth,
);
} }
Err(err) => log::warn!("[{addr}] connection error: {err}"), Err(err) => log::warn!(
"[{id:#016x}] [{addr}] [{user}] connection error: {err}",
id = conn.id(),
user = conn.auth,
),
} }
} }
} }
Err(err) if err.is_locally_closed() || err.is_timeout_closed() => unreachable!(), Err(err) if err.is_locally_closed() || err.is_timeout_closed() => unreachable!(),
Err(err) => log::warn!("[{addr}] connection establishing error: {err}"), Err(err) => {
log::warn!(
"[{id:#016x}] [{addr}] [unauthenticated] connection establishing error: {err}",
id = usize::MAX,
)
}
} }
} }
@ -155,8 +172,11 @@ impl Connection {
time::sleep(timeout).await; time::sleep(timeout).await;
if self.auth.get().is_none() { if self.auth.get().is_none() {
let addr = self.inner.remote_address(); log::warn!(
log::warn!("[{addr}] [authenticate] timeout"); "[{id:#016x}] [{addr}] [unauthenticated] [authenticate] timeout",
id = self.id(),
addr = self.inner.remote_address(),
);
self.close(); self.close();
} }
} }
@ -169,10 +189,20 @@ impl Connection {
break; break;
} }
log::debug!(
"[{id:#016x}] [{addr}] [{user}] packet fragment garbage collecting event",
id = self.id(),
addr = self.inner.remote_address(),
user = self.auth,
);
self.model.collect_garbage(gc_lifetime); self.model.collect_garbage(gc_lifetime);
} }
} }
fn id(&self) -> usize {
self.inner.stable_id()
}
fn is_closed(&self) -> bool { fn is_closed(&self) -> bool {
self.inner.close_reason().is_some() self.inner.close_reason().is_some()
} }

View File

@ -97,12 +97,17 @@ impl UdpSession {
let (pkt, addr) = match session_listening.recv().await { let (pkt, addr) = match session_listening.recv().await {
Ok(res) => res, Ok(res) => res,
Err(err) => { Err(err) => {
log::warn!("{err}"); // TODO log::warn!(
"[{id:#016x}] [{addr}] [{user}] [packet] [{assoc_id:#06x}] outbound listening error: {err}",
id = session_listening.0.conn.id(),
addr = session_listening.0.conn.inner.remote_address(),
user = session_listening.0.conn.auth,
);
continue; continue;
} }
}; };
tokio::spawn(session_listening.0.conn.clone().send_packet( tokio::spawn(session_listening.0.conn.clone().relay_packet(
pkt, pkt,
Address::SocketAddress(addr), Address::SocketAddress(addr),
session_listening.0.assoc_id, session_listening.0.assoc_id,

View File

@ -49,8 +49,8 @@ impl Server {
let mut tp_cfg = TransportConfig::default(); let mut tp_cfg = TransportConfig::default();
tp_cfg tp_cfg
.max_concurrent_bidi_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS as u32)) .max_concurrent_bidi_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS))
.max_concurrent_uni_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS as u32)) .max_concurrent_uni_streams(VarInt::from(DEFAULT_CONCURRENT_STREAMS))
.send_window(cfg.send_window) .send_window(cfg.send_window)
.stream_receive_window(VarInt::from_u32(cfg.receive_window)) .stream_receive_window(VarInt::from_u32(cfg.receive_window))
.max_idle_timeout(Some( .max_idle_timeout(Some(