1
0

adding log support

This commit is contained in:
EAimTY 2023-03-04 19:45:55 +09:00
parent 089af00d4a
commit 525053a3cc
3 changed files with 118 additions and 34 deletions

View File

@ -106,7 +106,7 @@ mod default {
} }
pub fn auth_timeout() -> Duration { pub fn auth_timeout() -> Duration {
Duration::from_secs(3) Duration::from_secs(10)
} }
pub fn max_idle_time() -> Duration { pub fn max_idle_time() -> Duration {

View File

@ -29,7 +29,11 @@ async fn main() {
} }
}; };
LoggerBuilder::new().filter_level(cfg.log_level).init(); LoggerBuilder::new()
.filter_level(cfg.log_level)
.format_module_path(false)
.format_target(false)
.init();
match Server::init(cfg) { match Server::init(cfg) {
Ok(server) => server.start().await, Ok(server) => server.start().await,

View File

@ -8,8 +8,8 @@ use crossbeam_utils::atomic::AtomicCell;
use parking_lot::Mutex; use parking_lot::Mutex;
use quinn::{ use quinn::{
congestion::{BbrConfig, CubicConfig, NewRenoConfig}, congestion::{BbrConfig, CubicConfig, NewRenoConfig},
Connecting, Connection as QuinnConnection, Endpoint, EndpointConfig, IdleTimeout, RecvStream, Connecting, Connection as QuinnConnection, ConnectionError, Endpoint, EndpointConfig,
SendStream, ServerConfig, TokioRuntime, TransportConfig, VarInt, IdleTimeout, RecvStream, SendStream, ServerConfig, TokioRuntime, TransportConfig, VarInt,
}; };
use register_count::{Counter, Register}; use register_count::{Counter, Register};
use rustls::{version, ServerConfig as RustlsServerConfig}; use rustls::{version, ServerConfig as RustlsServerConfig};
@ -135,8 +135,15 @@ impl Server {
} }
pub async fn start(&self) { pub async fn start(&self) {
log::warn!(
"server started, listening on {}",
self.ep.local_addr().unwrap()
);
loop { loop {
let conn = self.ep.accept().await.unwrap(); let Some(conn) = self.ep.accept().await else {
return;
};
tokio::spawn(Connection::handle( tokio::spawn(Connection::handle(
conn, conn,
@ -180,16 +187,21 @@ impl Connection {
gc_interval: Duration, gc_interval: Duration,
gc_lifetime: Duration, gc_lifetime: Duration,
) { ) {
match Self::init( let addr = conn.remote_address();
let conn = Self::init(
conn, conn,
users, users,
udp_relay_ipv6, udp_relay_ipv6,
zero_rtt_handshake, zero_rtt_handshake,
max_external_pkt_size, max_external_pkt_size,
) )
.await .await;
{
match conn {
Ok(conn) => { Ok(conn) => {
log::info!("[{addr}] connection established");
tokio::spawn(conn.clone().handle_auth_timeout(auth_timeout)); tokio::spawn(conn.clone().handle_auth_timeout(auth_timeout));
tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime)); tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime));
@ -200,11 +212,17 @@ impl Connection {
match conn.accept().await { match conn.accept().await {
Ok(()) => {} Ok(()) => {}
Err(err) => eprintln!("{err}"), Err(err) if err.is_locally_closed() => {}
Err(err) if err.is_timeout_closed() => {
log::debug!("[{addr}] connection timeout")
}
Err(err) => log::warn!("[{addr}] {err}"),
} }
} }
} }
Err(err) => eprintln!("{err}"), Err(err) if err.is_locally_closed() => unreachable!(),
Err(err) if err.is_timeout_closed() => log::debug!("[{addr}] connection timeout"),
Err(err) => log::warn!("[{addr}] {err}"),
} }
} }
@ -219,7 +237,7 @@ impl Connection {
match conn.into_0rtt() { match conn.into_0rtt() {
Ok((conn, _)) => conn, Ok((conn, _)) => conn,
Err(conn) => { Err(conn) => {
eprintln!("0-RTT handshake failed, fallback to 1-RTT handshake"); log::info!("0-RTT handshake failed, fallback to 1-RTT handshake");
conn.await? conn.await?
} }
} }
@ -257,6 +275,9 @@ impl Connection {
} }
async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
let addr = self.inner.remote_address();
log::debug!("[{addr}] incoming unidirectional stream");
let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed);
if self.remote_uni_stream_cnt.count() == max { if self.remote_uni_stream_cnt.count() == max {
@ -299,27 +320,44 @@ impl Connection {
} }
match pre_process(&self, recv).await { match pre_process(&self, recv).await {
Ok(Task::Authenticate(_)) => {} Ok(Task::Authenticate(auth)) => log::info!("[{addr}] authenticated as {}", auth.uuid()),
Ok(Task::Packet(pkt)) => { Ok(Task::Packet(pkt)) => {
let assoc_id = pkt.assoc_id();
let pkt_id = pkt.pkt_id();
let frag_id = pkt.frag_id();
let frag_total = pkt.frag_total();
log::info!(
"[{addr}] [packet-from-quic] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}]"
);
self.set_udp_relay_mode(UdpRelayMode::Quic); self.set_udp_relay_mode(UdpRelayMode::Quic);
match self.handle_packet(pkt).await { match self.handle_packet(pkt).await {
Ok(()) => {} Ok(()) => {}
Err(err) => eprintln!("{err}"), Err(err) => log::warn!(
"[{addr}] [packet-from-quic] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}] {err}"
),
} }
} }
Ok(Task::Dissociate(assoc_id)) => match self.handle_dissociate(assoc_id).await { Ok(Task::Dissociate(assoc_id)) => {
log::info!("[{addr}] [dissociate] [{assoc_id}]");
match self.handle_dissociate(assoc_id).await {
Ok(()) => {} Ok(()) => {}
Err(err) => eprintln!("{err}"), Err(err) => log::warn!("[{addr}] [dissociate] [{assoc_id}] {err}"),
}, }
}
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(err) => { Err(err) => {
eprintln!("{err}"); log::warn!("[{addr}] handle unidirection stream error: {err}");
self.close(); self.close();
} }
} }
} }
async fn handle_bi_stream(self, (send, recv): (SendStream, RecvStream), _reg: Register) { async fn handle_bi_stream(self, (send, recv): (SendStream, RecvStream), _reg: Register) {
let addr = self.inner.remote_address();
log::debug!("[{addr}] incoming bidirectional stream");
let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed);
if self.remote_bi_stream_cnt.count() == max { if self.remote_bi_stream_cnt.count() == max {
@ -346,19 +384,27 @@ impl Connection {
} }
match pre_process(&self, send, recv).await { match pre_process(&self, send, recv).await {
Ok(Task::Connect(conn)) => match self.handle_connect(conn).await { Ok(Task::Connect(conn)) => {
let target_addr = conn.addr().to_string();
log::info!("[{addr}] [connect] [{target_addr}]");
match self.handle_connect(conn).await {
Ok(()) => {} Ok(()) => {}
Err(err) => eprintln!("{err}"), Err(err) => log::warn!("[{addr}] [connect] [{target_addr}] {err}"),
}, }
}
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(err) => { Err(err) => {
eprintln!("{err}"); log::warn!("[{addr}] handle bidirection stream error: {err}");
self.close(); self.close();
} }
} }
} }
async fn handle_datagram(self, dg: Bytes) { async fn handle_datagram(self, dg: Bytes) {
let addr = self.inner.remote_address();
log::debug!("[{addr}] incoming datagram");
async fn pre_process(conn: &Connection, dg: Bytes) -> Result<Task, Error> { async fn pre_process(conn: &Connection, dg: Bytes) -> Result<Task, Error> {
let task = conn.model.accept_datagram(dg)?; let task = conn.model.accept_datagram(dg)?;
@ -378,16 +424,26 @@ impl Connection {
match pre_process(&self, dg).await { match pre_process(&self, dg).await {
Ok(Task::Packet(pkt)) => { Ok(Task::Packet(pkt)) => {
let assoc_id = pkt.assoc_id();
let pkt_id = pkt.pkt_id();
let frag_id = pkt.frag_id();
let frag_total = pkt.frag_total();
log::info!(
"[{addr}] [packet-from-native] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}]"
);
self.set_udp_relay_mode(UdpRelayMode::Native); self.set_udp_relay_mode(UdpRelayMode::Native);
match self.handle_packet(pkt).await { match self.handle_packet(pkt).await {
Ok(()) => {} Ok(()) => {}
Err(err) => eprintln!("{err}"), Err(err) => log::warn!(
"[{addr}] [packet-from-native] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}] {err}"
),
} }
} }
Ok(Task::Heartbeat) => {} Ok(Task::Heartbeat) => log::info!("[{addr}] [heartbeat]"),
Ok(_) => unreachable!(), Ok(_) => unreachable!(),
Err(err) => { Err(err) => {
eprintln!("{err}"); log::warn!("[{addr}] handle datagram error: {err}");
self.close(); self.close();
} }
} }
@ -468,6 +524,8 @@ impl Connection {
time::sleep(timeout).await; time::sleep(timeout).await;
if !self.is_authed() { if !self.is_authed() {
let addr = self.inner.remote_address();
log::warn!("[{addr}] authentication timeout");
self.close(); self.close();
} }
} }
@ -570,20 +628,32 @@ impl UdpSession {
socket_v6: Option<Arc<UdpSocket>>, socket_v6: Option<Arc<UdpSocket>>,
cancel: Receiver<()>, cancel: Receiver<()>,
) { ) {
async fn send_pkt(conn: Connection, pkt: Bytes, addr: SocketAddr, assoc_id: u16) { async fn send_pkt(conn: Connection, pkt: Bytes, target_addr: SocketAddr, assoc_id: u16) {
let addr = Address::SocketAddress(addr); let addr = conn.inner.remote_address();
let target_addr_tuic = Address::SocketAddress(target_addr);
let res = match conn.get_udp_relay_mode() { let res = match conn.get_udp_relay_mode() {
Some(UdpRelayMode::Native) => conn.model.packet_native(pkt, addr, assoc_id), Some(UdpRelayMode::Native) => {
Some(UdpRelayMode::Quic) => conn.model.packet_quic(pkt, addr, assoc_id).await, log::info!("[{addr}] [packet-to-native] [{assoc_id}] [{target_addr_tuic}]");
conn.model.packet_native(pkt, target_addr_tuic, assoc_id)
}
Some(UdpRelayMode::Quic) => {
log::info!("[{addr}] [packet-to-quic] [{assoc_id}] [{target_addr_tuic}]");
conn.model
.packet_quic(pkt, target_addr_tuic, assoc_id)
.await
}
None => unreachable!(), None => unreachable!(),
}; };
if let Err(err) = res { if let Err(err) = res {
eprintln!("{err}"); let target_addr_tuic = Address::SocketAddress(target_addr);
log::warn!("[{addr}] [packet-to-quic] [{assoc_id}] [{target_addr_tuic}] {err}");
} }
} }
let addr = conn.inner.remote_address();
tokio::select! { tokio::select! {
_ = cancel => {} _ = cancel => {}
() = async { () = async {
@ -593,10 +663,10 @@ impl UdpSession {
socket_v6.as_deref(), socket_v6.as_deref(),
conn.max_external_pkt_size, conn.max_external_pkt_size,
).await { ).await {
Ok((pkt, addr)) => { Ok((pkt, target_addr)) => {
tokio::spawn(send_pkt(conn.clone(), pkt, addr, assoc_id)); tokio::spawn(send_pkt(conn.clone(), pkt, target_addr, assoc_id));
} }
Err(err) => eprintln!("{err}"), Err(err) => log::warn!("[{addr}] [packet-to-*] [{assoc_id}] {err}"),
} }
} }
} => unreachable!(), } => unreachable!(),
@ -674,3 +744,13 @@ impl Future for IsAuthed {
} }
} }
} }
impl Error {
fn is_locally_closed(&self) -> bool {
matches!(self, Self::Connection(ConnectionError::LocallyClosed))
}
fn is_timeout_closed(&self) -> bool {
matches!(self, Self::Connection(ConnectionError::TimedOut))
}
}