better error handling mechanism on server
This commit is contained in:
parent
f5326259bd
commit
6c61c70c06
@ -38,9 +38,10 @@ use tokio::{
|
|||||||
};
|
};
|
||||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||||
use tuic::Address;
|
use tuic::Address;
|
||||||
use tuic_quinn::{side, Connect, Connection as Model, Packet, Task};
|
use tuic_quinn::{side, Authenticate, Connect, Connection as Model, Packet, Task};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
const ERROR_CODE: VarInt = VarInt::from_u32(0);
|
||||||
const DEFAULT_CONCURRENT_STREAMS: usize = 32;
|
const DEFAULT_CONCURRENT_STREAMS: usize = 32;
|
||||||
|
|
||||||
pub struct Server {
|
pub struct Server {
|
||||||
@ -227,7 +228,7 @@ impl Connection {
|
|||||||
Ok(conn) => {
|
Ok(conn) => {
|
||||||
log::info!("[{addr}] connection established");
|
log::info!("[{addr}] connection established");
|
||||||
|
|
||||||
tokio::spawn(conn.clone().handle_auth_timeout(auth_timeout));
|
tokio::spawn(conn.clone().timeout_authenticate(auth_timeout));
|
||||||
tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime));
|
tokio::spawn(conn.clone().collect_garbage(gc_interval, gc_lifetime));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
@ -287,6 +288,31 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn authenticate(&self, auth: &Authenticate) -> Result<(), Error> {
|
||||||
|
if self.auth.get().is_some() {
|
||||||
|
Err(Error::DuplicatedAuth)
|
||||||
|
} else if self
|
||||||
|
.users
|
||||||
|
.get(&auth.uuid())
|
||||||
|
.map_or(false, |password| auth.validate(password))
|
||||||
|
{
|
||||||
|
self.auth.set(auth.uuid());
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(Error::AuthFailed(auth.uuid()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn timeout_authenticate(self, timeout: Duration) {
|
||||||
|
time::sleep(timeout).await;
|
||||||
|
|
||||||
|
if self.auth.get().is_none() {
|
||||||
|
let addr = self.inner.remote_address();
|
||||||
|
log::warn!("[{addr}] [authenticate] timeout");
|
||||||
|
self.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
|
async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
|
||||||
let addr = self.inner.remote_address();
|
let addr = self.inner.remote_address();
|
||||||
log::debug!("[{addr}] incoming unidirectional stream");
|
log::debug!("[{addr}] incoming unidirectional stream");
|
||||||
@ -310,17 +336,7 @@ impl Connection {
|
|||||||
.map_err(|_| Error::TaskNegotiationTimeout)??;
|
.map_err(|_| Error::TaskNegotiationTimeout)??;
|
||||||
|
|
||||||
if let Task::Authenticate(auth) = &task {
|
if let Task::Authenticate(auth) = &task {
|
||||||
if self.auth.get().is_some() {
|
self.authenticate(auth)?;
|
||||||
return Err(Error::DuplicatedAuth);
|
|
||||||
} else if self
|
|
||||||
.users
|
|
||||||
.get(&auth.uuid())
|
|
||||||
.map_or(false, |password| auth.validate(password))
|
|
||||||
{
|
|
||||||
self.auth.set(auth.uuid());
|
|
||||||
} else {
|
|
||||||
return Err(Error::AuthFailed(auth.uuid()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
@ -338,35 +354,10 @@ impl Connection {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match pre_process.await {
|
match pre_process.await {
|
||||||
Ok(Task::Authenticate(auth)) => {
|
Ok(Task::Authenticate(auth)) => self.handle_authenticate(auth).await,
|
||||||
log::info!("[{addr}] [{uuid}] [authenticate]", uuid = auth.uuid())
|
Ok(Task::Packet(pkt)) => self.handle_packet(pkt, UdpRelayMode::Quic).await,
|
||||||
}
|
Ok(Task::Dissociate(assoc_id)) => self.handle_dissociate(assoc_id).await,
|
||||||
Ok(Task::Packet(pkt)) => {
|
Ok(_) => unreachable!(), // already filtered in `tuic_quinn`
|
||||||
let assoc_id = pkt.assoc_id();
|
|
||||||
let pkt_id = pkt.pkt_id();
|
|
||||||
let frag_id = pkt.frag_id();
|
|
||||||
let frag_total = pkt.frag_total();
|
|
||||||
log::info!(
|
|
||||||
"[{addr}] [packet-from-quic] [{assoc_id}] [{pkt_id}] [{frag_id}/{frag_total}]"
|
|
||||||
);
|
|
||||||
|
|
||||||
self.set_udp_relay_mode(UdpRelayMode::Quic);
|
|
||||||
match self.handle_packet(pkt).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!(
|
|
||||||
"[{addr}] [packet-from-quic] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}] {err}"
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Task::Dissociate(assoc_id)) => {
|
|
||||||
log::info!("[{addr}] [dissociate] [{assoc_id}]");
|
|
||||||
|
|
||||||
match self.handle_dissociate(assoc_id).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!("[{addr}] [dissociate] [{assoc_id}] {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(_) => unreachable!(),
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::warn!("[{addr}] handle unidirection stream error: {err}");
|
log::warn!("[{addr}] handle unidirection stream error: {err}");
|
||||||
self.close();
|
self.close();
|
||||||
@ -405,16 +396,8 @@ impl Connection {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match pre_process.await {
|
match pre_process.await {
|
||||||
Ok(Task::Connect(conn)) => {
|
Ok(Task::Connect(conn)) => self.handle_connect(conn).await,
|
||||||
let target_addr = conn.addr().to_string();
|
Ok(_) => unreachable!(), // already filtered in `tuic_quinn`
|
||||||
log::info!("[{addr}] [connect] [{target_addr}]");
|
|
||||||
|
|
||||||
match self.handle_connect(conn).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!("[{addr}] [connect] [{target_addr}] {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(_) => unreachable!(),
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::warn!("[{addr}] handle bidirection stream error: {err}");
|
log::warn!("[{addr}] handle bidirection stream error: {err}");
|
||||||
self.close();
|
self.close();
|
||||||
@ -444,24 +427,8 @@ impl Connection {
|
|||||||
};
|
};
|
||||||
|
|
||||||
match pre_process.await {
|
match pre_process.await {
|
||||||
Ok(Task::Packet(pkt)) => {
|
Ok(Task::Packet(pkt)) => self.handle_packet(pkt, UdpRelayMode::Native).await,
|
||||||
let assoc_id = pkt.assoc_id();
|
Ok(Task::Heartbeat) => self.handle_heartbeat().await,
|
||||||
let pkt_id = pkt.pkt_id();
|
|
||||||
let frag_id = pkt.frag_id();
|
|
||||||
let frag_total = pkt.frag_total();
|
|
||||||
log::info!(
|
|
||||||
"[{addr}] [packet-from-native] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}]"
|
|
||||||
);
|
|
||||||
|
|
||||||
self.set_udp_relay_mode(UdpRelayMode::Native);
|
|
||||||
match self.handle_packet(pkt).await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!(
|
|
||||||
"[{addr}] [packet-from-native] [{assoc_id}] [{pkt_id}] [{frag_id}:{frag_total}] {err}"
|
|
||||||
),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Task::Heartbeat) => log::info!("[{addr}] [heartbeat]"),
|
|
||||||
Ok(_) => unreachable!(),
|
Ok(_) => unreachable!(),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
log::warn!("[{addr}] handle datagram error: {err}");
|
log::warn!("[{addr}] handle datagram error: {err}");
|
||||||
@ -470,86 +437,142 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_connect(&self, conn: Connect) -> Result<(), Error> {
|
async fn handle_authenticate(&self, auth: Authenticate) {
|
||||||
let mut stream = None;
|
log::info!(
|
||||||
let mut last_err = None;
|
"[{addr}] [{uuid}] [authenticate] authenticated as {auth_uuid}",
|
||||||
|
addr = self.inner.remote_address(),
|
||||||
|
uuid = self.auth.get().unwrap(),
|
||||||
|
auth_uuid = auth.uuid(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
match resolve_dns(conn.addr()).await {
|
async fn handle_connect(&self, conn: Connect) {
|
||||||
Ok(addrs) => {
|
let target_addr = conn.addr().to_string();
|
||||||
for addr in addrs {
|
|
||||||
match TcpStream::connect(addr).await {
|
log::info!(
|
||||||
Ok(s) => {
|
"[{addr}] [{uuid}] [connect] {target_addr}",
|
||||||
stream = Some(s);
|
addr = self.inner.remote_address(),
|
||||||
break;
|
uuid = self.auth.get().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let process = async {
|
||||||
|
let mut stream = None;
|
||||||
|
let mut last_err = None;
|
||||||
|
|
||||||
|
match resolve_dns(conn.addr()).await {
|
||||||
|
Ok(addrs) => {
|
||||||
|
for addr in addrs {
|
||||||
|
match TcpStream::connect(addr).await {
|
||||||
|
Ok(s) => {
|
||||||
|
stream = Some(s);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(err) => last_err = Some(err),
|
||||||
}
|
}
|
||||||
Err(err) => last_err = Some(err),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Err(err) => last_err = Some(err),
|
||||||
}
|
}
|
||||||
Err(err) => last_err = Some(err),
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(mut stream) = stream {
|
if let Some(mut stream) = stream {
|
||||||
let mut conn = conn.compat();
|
let mut conn = conn.compat();
|
||||||
let res = io::copy_bidirectional(&mut conn, &mut stream).await;
|
let res = io::copy_bidirectional(&mut conn, &mut stream).await;
|
||||||
let _ = conn.get_mut().reset(VarInt::from_u32(0));
|
let _ = conn.get_mut().reset(ERROR_CODE);
|
||||||
let _ = stream.shutdown().await;
|
let _ = stream.shutdown().await;
|
||||||
res?;
|
res?;
|
||||||
|
Ok::<_, Error>(())
|
||||||
|
} else {
|
||||||
|
let _ = conn.compat().shutdown().await;
|
||||||
|
Err(last_err
|
||||||
|
.unwrap_or_else(|| IoError::new(ErrorKind::NotFound, "no address resolved")))?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match process.await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(err) => log::warn!(
|
||||||
|
"[{addr}] [{uuid}] [connect] relaying connection to {target_addr} error: {err}",
|
||||||
|
addr = self.inner.remote_address(),
|
||||||
|
uuid = self.auth.get().unwrap(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_packet(&self, pkt: Packet, mode: UdpRelayMode) {
|
||||||
|
let assoc_id = pkt.assoc_id();
|
||||||
|
let pkt_id = pkt.pkt_id();
|
||||||
|
let frag_id = pkt.frag_id();
|
||||||
|
let frag_total = pkt.frag_total();
|
||||||
|
|
||||||
|
log::info!(
|
||||||
|
"[{addr}] [{uuid}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] {frag_id}/{frag_total}",
|
||||||
|
addr = self.inner.remote_address(),
|
||||||
|
uuid = self.auth.get().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
|
self.set_udp_relay_mode(mode);
|
||||||
|
|
||||||
|
let process = async {
|
||||||
|
let Some((pkt, addr, assoc_id)) = pkt.accept().await? else {
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
|
||||||
|
let (socket_v4, socket_v6) = match self.udp_sessions.lock().await.entry(assoc_id) {
|
||||||
|
Entry::Occupied(mut entry) => {
|
||||||
|
let session = entry.get_mut();
|
||||||
|
(session.socket_v4.clone(), session.socket_v6.clone())
|
||||||
|
}
|
||||||
|
Entry::Vacant(entry) => {
|
||||||
|
let session = entry.insert(
|
||||||
|
UdpSession::new(assoc_id, self.clone(), self.udp_relay_ipv6).await?,
|
||||||
|
);
|
||||||
|
|
||||||
|
(session.socket_v4.clone(), session.socket_v6.clone())
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let Some(socket_addr) = resolve_dns(&addr).await?.next() else {
|
||||||
|
return Err(Error::from(IoError::new(ErrorKind::NotFound, "no address resolved")));
|
||||||
|
};
|
||||||
|
|
||||||
|
let socket = match socket_addr {
|
||||||
|
SocketAddr::V4(_) => socket_v4,
|
||||||
|
SocketAddr::V6(_) => {
|
||||||
|
socket_v6.ok_or_else(|| Error::UdpRelayIpv6Disabled(addr, socket_addr))?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
socket.send_to(&pkt, socket_addr).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
};
|
||||||
let _ = conn.compat().shutdown().await;
|
|
||||||
Err(last_err
|
match process.await {
|
||||||
.unwrap_or_else(|| IoError::new(ErrorKind::NotFound, "no address resolved")))?
|
Ok(()) => {}
|
||||||
|
Err(err) => log::warn!(
|
||||||
|
"[{addr}] [{uuid}] [packet] [{assoc_id:#06x}] [from-{mode}] [{pkt_id:#06x}] error handling fragment {frag_id}/{frag_total}: {err}",
|
||||||
|
addr = self.inner.remote_address(),
|
||||||
|
uuid = self.auth.get().unwrap(),
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_packet(&self, pkt: Packet) -> Result<(), Error> {
|
async fn handle_dissociate(&self, assoc_id: u16) {
|
||||||
let Some((pkt, addr, assoc_id)) = pkt.accept().await? else {
|
log::info!(
|
||||||
return Ok(());
|
"[{addr}] [{uuid}] [dissociate] [{assoc_id:#06x}]",
|
||||||
};
|
addr = self.inner.remote_address(),
|
||||||
|
uuid = self.auth.get().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
let (socket_v4, socket_v6) = match self.udp_sessions.lock().await.entry(assoc_id) {
|
|
||||||
Entry::Occupied(mut entry) => {
|
|
||||||
let session = entry.get_mut();
|
|
||||||
(session.socket_v4.clone(), session.socket_v6.clone())
|
|
||||||
}
|
|
||||||
Entry::Vacant(entry) => {
|
|
||||||
let session = entry
|
|
||||||
.insert(UdpSession::new(assoc_id, self.clone(), self.udp_relay_ipv6).await?);
|
|
||||||
|
|
||||||
(session.socket_v4.clone(), session.socket_v6.clone())
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let Some(socket_addr) = resolve_dns(&addr).await?.next() else {
|
|
||||||
return Err(Error::from(IoError::new(ErrorKind::NotFound, "no address resolved")));
|
|
||||||
};
|
|
||||||
|
|
||||||
let socket = match socket_addr {
|
|
||||||
SocketAddr::V4(_) => socket_v4,
|
|
||||||
SocketAddr::V6(_) => {
|
|
||||||
socket_v6.ok_or_else(|| Error::UdpRelayIpv6Disabled(addr, socket_addr))?
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
socket.send_to(&pkt, socket_addr).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_dissociate(&self, assoc_id: u16) -> Result<(), Error> {
|
|
||||||
self.udp_sessions.lock().await.remove(&assoc_id);
|
self.udp_sessions.lock().await.remove(&assoc_id);
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_auth_timeout(self, timeout: Duration) {
|
async fn handle_heartbeat(&self) {
|
||||||
time::sleep(timeout).await;
|
log::info!(
|
||||||
|
"[{addr}] [{uuid}] [heartbeat]",
|
||||||
if self.auth.get().is_none() {
|
addr = self.inner.remote_address(),
|
||||||
let addr = self.inner.remote_address();
|
uuid = self.auth.get().unwrap(),
|
||||||
log::warn!("[{addr}] authentication timeout");
|
);
|
||||||
self.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) {
|
async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) {
|
||||||
@ -577,7 +600,7 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn close(&self) {
|
fn close(&self) {
|
||||||
self.inner.close(VarInt::from_u32(0), b"");
|
self.inner.close(ERROR_CODE, &[]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
use rustls::{Certificate, PrivateKey};
|
use rustls::{Certificate, PrivateKey};
|
||||||
use rustls_pemfile::Item;
|
use rustls_pemfile::Item;
|
||||||
use std::{
|
use std::{
|
||||||
|
fmt::{Display, Formatter, Result as FmtResult},
|
||||||
fs::{self, File},
|
fs::{self, File},
|
||||||
io::{BufReader, Error as IoError},
|
io::{BufReader, Error as IoError},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
@ -46,6 +47,15 @@ pub enum UdpRelayMode {
|
|||||||
Quic,
|
Quic,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Display for UdpRelayMode {
|
||||||
|
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
|
||||||
|
match self {
|
||||||
|
Self::Native => write!(f, "native"),
|
||||||
|
Self::Quic => write!(f, "quic"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub enum CongestionControl {
|
pub enum CongestionControl {
|
||||||
Cubic,
|
Cubic,
|
||||||
NewReno,
|
NewReno,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user