diff --git a/tuic-client/src/connection/handle_stream.rs b/tuic-client/src/connection/handle_stream.rs new file mode 100644 index 0000000..10da01c --- /dev/null +++ b/tuic-client/src/connection/handle_stream.rs @@ -0,0 +1,114 @@ +use super::Connection; +use crate::{utils::UdpRelayMode, Error}; +use bytes::Bytes; +use quinn::{RecvStream, SendStream, VarInt}; +use register_count::Register; +use std::sync::atomic::Ordering; +use tuic_quinn::Task; + +impl Connection { + pub(super) async fn accept_uni_stream(&self) -> Result<(RecvStream, Register), Error> { + let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); + + if self.remote_uni_stream_cnt.count() as u32 == max { + self.max_concurrent_uni_streams + .store(max * 2, Ordering::Relaxed); + + self.conn + .set_max_concurrent_uni_streams(VarInt::from(max * 2)); + } + + let recv = self.conn.accept_uni().await?; + let reg = self.remote_uni_stream_cnt.reg(); + Ok((recv, reg)) + } + + pub(super) async fn accept_bi_stream( + &self, + ) -> Result<(SendStream, RecvStream, Register), Error> { + let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); + + if self.remote_bi_stream_cnt.count() as u32 == max { + self.max_concurrent_bi_streams + .store(max * 2, Ordering::Relaxed); + + self.conn + .set_max_concurrent_bi_streams(VarInt::from(max * 2)); + } + + let (send, recv) = self.conn.accept_bi().await?; + let reg = self.remote_bi_stream_cnt.reg(); + Ok((send, recv, reg)) + } + + pub(super) async fn accept_datagram(&self) -> Result { + Ok(self.conn.read_datagram().await?) + } + + pub(super) async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { + log::debug!("[relay] incoming unidirectional stream"); + + let res = match self.model.accept_uni_stream(recv).await { + Err(err) => Err(Error::Model(err)), + Ok(Task::Packet(pkt)) => match self.udp_relay_mode { + UdpRelayMode::Quic => { + log::info!( + "[relay] [packet] [{assoc_id:#06x}] [from-quic] [{pkt_id:#06x}] {frag_id}/{frag_total}", + assoc_id = pkt.assoc_id(), + pkt_id = pkt.pkt_id(), + frag_id = pkt.frag_id(), + frag_total = pkt.frag_total(), + ); + Self::handle_packet(pkt).await; + Ok(()) + } + UdpRelayMode::Native => Err(Error::WrongPacketSource), + }, + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + log::warn!("[relay] incoming unidirectional stream error: {err}"); + } + } + + pub(super) async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) { + log::debug!("[relay] incoming bidirectional stream"); + + let res = match self.model.accept_bi_stream(send, recv).await { + Err(err) => Err::<(), _>(Error::Model(err)), + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + log::warn!("[relay] incoming bidirectional stream error: {err}"); + } + } + + pub(super) async fn handle_datagram(self, dg: Bytes) { + log::debug!("[relay] incoming datagram"); + + let res = match self.model.accept_datagram(dg) { + Err(err) => Err(Error::Model(err)), + Ok(Task::Packet(pkt)) => match self.udp_relay_mode { + UdpRelayMode::Native => { + log::info!( + "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {frag_id}/{frag_total}", + assoc_id = pkt.assoc_id(), + pkt_id = pkt.pkt_id(), + frag_id = pkt.frag_id(), + frag_total = pkt.frag_total(), + ); + Self::handle_packet(pkt).await; + Ok(()) + } + UdpRelayMode::Quic => Err(Error::WrongPacketSource), + }, + _ => unreachable!(), // already filtered in `tuic_quinn` + }; + + if let Err(err) = res { + log::warn!("[relay] incoming datagram error: {err}"); + } + } +} diff --git a/tuic-client/src/connection/handle_task.rs b/tuic-client/src/connection/handle_task.rs new file mode 100644 index 0000000..44e423d --- /dev/null +++ b/tuic-client/src/connection/handle_task.rs @@ -0,0 +1,131 @@ +use super::Connection; +use crate::{socks5::UDP_SESSIONS as SOCKS5_UDP_SESSIONS, utils::UdpRelayMode, Error}; +use bytes::Bytes; +use socks5_proto::Address as Socks5Address; +use std::time::Duration; +use tokio::time; +use tuic::Address; +use tuic_quinn::{Connect, Packet}; + +impl Connection { + pub(super) async fn authenticate(self) { + log::debug!("[relay] [authenticate] sending authentication"); + + match self + .model + .authenticate(self.uuid, self.password.clone()) + .await + { + Ok(()) => log::info!("[relay] [authenticate] {uuid}", uuid = self.uuid), + Err(err) => log::warn!("[relay] [authenticate] authentication sending error: {err}"), + } + } + + pub async fn connect(&self, addr: Address) -> Result { + let addr_display = addr.to_string(); + log::info!("[relay] [connect] {addr_display}"); + + match self.model.connect(addr).await { + Ok(conn) => Ok(conn), + Err(err) => { + log::warn!("[relay] [connect] failed initializing relay to {addr_display}: {err}"); + Err(Error::Model(err)) + } + } + } + + pub async fn packet(&self, pkt: Bytes, addr: Address, assoc_id: u16) -> Result<(), Error> { + let addr_display = addr.to_string(); + + match self.udp_relay_mode { + UdpRelayMode::Native => { + log::info!("[relay] [packet] [{assoc_id:#06x}] [to-native] {addr_display}"); + match self.model.packet_native(pkt, addr, assoc_id) { + Ok(()) => Ok(()), + Err(err) => { + log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-native] failed relaying packet to {addr_display}: {err}"); + Err(Error::Model(err)) + } + } + } + UdpRelayMode::Quic => { + log::info!("[relay] [packet] [{assoc_id:#06x}] [to-quic] {addr_display}"); + match self.model.packet_quic(pkt, addr, assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-quic] failed relaying packet to {addr_display}: {err}"); + Err(Error::Model(err)) + } + } + } + } + } + + pub async fn dissociate(&self, assoc_id: u16) -> Result<(), Error> { + log::info!("[relay] [dissociate] [{assoc_id:#06x}]"); + match self.model.dissociate(assoc_id).await { + Ok(()) => Ok(()), + Err(err) => { + log::warn!("[relay] [dissociate] [{assoc_id:#06x}] failed dissociating: {err}"); + Err(Error::Model(err)) + } + } + } + + pub(super) async fn heartbeat(self, heartbeat: Duration) { + loop { + time::sleep(heartbeat).await; + + if self.is_closed() { + break; + } + + if self.model.task_connect_count() + self.model.task_associate_count() == 0 { + continue; + } + + match self.model.heartbeat().await { + Ok(()) => log::debug!("[relay] [heartbeat]"), + Err(err) => log::warn!("[relay] [heartbeat] heartbeat sending error: {err}"), + } + } + } + + pub(super) async fn handle_packet(pkt: Packet) { + let assoc_id = pkt.assoc_id(); + let pkt_id = pkt.pkt_id(); + + match pkt.accept().await { + Ok(Some((pkt, addr, _))) => { + log::info!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {addr}"); + + let addr = match addr { + Address::None => unreachable!(), + Address::DomainAddress(domain, port) => { + Socks5Address::DomainAddress(domain, port) + } + Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr), + }; + + let session = SOCKS5_UDP_SESSIONS + .get() + .unwrap() + .lock() + .get(&assoc_id) + .cloned(); + + if let Some(session) = session { + if let Err(err) = session.send(pkt, addr).await { + log::warn!( + "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}", + ); + } + } else { + log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session"); + } + } + Ok(None) => {} + Err(err) => log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"), + } + } +} diff --git a/tuic-client/src/connection.rs b/tuic-client/src/connection/mod.rs similarity index 51% rename from tuic-client/src/connection.rs rename to tuic-client/src/connection/mod.rs index 85bf474..35f0280 100644 --- a/tuic-client/src/connection.rs +++ b/tuic-client/src/connection/mod.rs @@ -1,57 +1,54 @@ use crate::{ config::Relay, - socks5::UDP_SESSIONS as SOCKS5_UDP_SESSIONS, utils::{self, CongestionControl, ServerAddr, UdpRelayMode}, Error, }; -use bytes::Bytes; use crossbeam_utils::atomic::AtomicCell; use once_cell::sync::OnceCell; use parking_lot::Mutex; use quinn::{ congestion::{BbrConfig, CubicConfig, NewRenoConfig}, ClientConfig, Connection as QuinnConnection, Endpoint as QuinnEndpoint, EndpointConfig, - RecvStream, SendStream, TokioRuntime, TransportConfig, VarInt, + TokioRuntime, TransportConfig, VarInt, }; -use register_count::{Counter, Register}; +use register_count::Counter; use rustls::{version, ClientConfig as RustlsClientConfig}; -use socks5_proto::Address as Socks5Address; use std::{ net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, + sync::{atomic::AtomicU32, Arc}, time::Duration, }; use tokio::{ sync::{Mutex as AsyncMutex, OnceCell as AsyncOnceCell}, time, }; -use tuic::Address; -use tuic_quinn::{side, Connect, Connection as Model, Packet, Task}; +use tuic_quinn::{side, Connection as Model}; use uuid::Uuid; +mod handle_stream; +mod handle_task; + static ENDPOINT: OnceCell> = OnceCell::new(); static CONNECTION: AsyncOnceCell> = AsyncOnceCell::const_new(); static TIMEOUT: AtomicCell = AtomicCell::new(Duration::from_secs(0)); -pub const ERROR_CODE: VarInt = VarInt::from_u32(0); +pub(crate) const ERROR_CODE: VarInt = VarInt::from_u32(0); const DEFAULT_CONCURRENT_STREAMS: u32 = 32; -pub struct Endpoint { - ep: QuinnEndpoint, - server: ServerAddr, +#[derive(Clone)] +pub struct Connection { + conn: QuinnConnection, + model: Model, uuid: Uuid, password: Arc<[u8]>, udp_relay_mode: UdpRelayMode, - zero_rtt_handshake: bool, - heartbeat: Duration, - gc_interval: Duration, - gc_lifetime: Duration, + remote_uni_stream_cnt: Counter, + remote_bi_stream_cnt: Counter, + max_concurrent_uni_streams: Arc, + max_concurrent_bi_streams: Arc, } -impl Endpoint { +impl Connection { pub fn set_config(cfg: Relay) -> Result<(), Error> { let certs = utils::load_certs(cfg.certificates, cfg.disable_native_certs)?; @@ -107,7 +104,7 @@ impl Endpoint { ep.set_default_client_config(config); - let ep = Self { + let ep = Endpoint { ep, server: ServerAddr::new(cfg.server.0, cfg.server.1, cfg.ip), uuid: cfg.uuid, @@ -129,79 +126,6 @@ impl Endpoint { Ok(()) } - async fn connect(&mut self) -> Result { - let mut last_err = None; - - for addr in self.server.resolve().await? { - let connect_to = async { - let match_ipv4 = - addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4()); - let match_ipv6 = - addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6()); - - if !match_ipv4 && !match_ipv6 { - let bind_addr = if addr.is_ipv4() { - SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) - } else { - SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)) - }; - - self.ep - .rebind(UdpSocket::bind(bind_addr).map_err(|err| { - Error::Socket("failed to create endpoint UDP socket", err) - })?) - .map_err(|err| { - Error::Socket("failed to rebind endpoint UDP socket", err) - })?; - } - - let conn = self.ep.connect(addr, self.server.server_name())?; - let conn = if self.zero_rtt_handshake { - match conn.into_0rtt() { - Ok((conn, _)) => conn, - Err(conn) => conn.await?, - } - } else { - conn.await? - }; - - Ok(conn) - }; - - match connect_to.await { - Ok(conn) => { - return Ok(Connection::new( - conn, - self.udp_relay_mode, - self.uuid, - self.password.clone(), - self.heartbeat, - self.gc_interval, - self.gc_lifetime, - )); - } - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or(Error::DnsResolve)) - } -} - -#[derive(Clone)] -pub struct Connection { - conn: QuinnConnection, - model: Model, - uuid: Uuid, - password: Arc<[u8]>, - udp_relay_mode: UdpRelayMode, - remote_uni_stream_cnt: Counter, - remote_bi_stream_cnt: Counter, - max_concurrent_uni_streams: Arc, - max_concurrent_bi_streams: Arc, -} - -impl Connection { pub async fn get() -> Result { let try_init_conn = async { ENDPOINT @@ -288,237 +212,10 @@ impl Connection { log::warn!("[relay] connection error: {err}"); } - pub async fn connect(&self, addr: Address) -> Result { - let addr_display = addr.to_string(); - log::info!("[relay] [connect] {addr_display}"); - - match self.model.connect(addr).await { - Ok(conn) => Ok(conn), - Err(err) => { - log::warn!("[relay] [connect] failed initializing relay to {addr_display}: {err}"); - Err(Error::Model(err)) - } - } - } - - pub async fn packet(&self, pkt: Bytes, addr: Address, assoc_id: u16) -> Result<(), Error> { - let addr_display = addr.to_string(); - - match self.udp_relay_mode { - UdpRelayMode::Native => { - log::info!("[relay] [packet] [{assoc_id:#06x}] [to-native] {addr_display}"); - match self.model.packet_native(pkt, addr, assoc_id) { - Ok(()) => Ok(()), - Err(err) => { - log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-native] failed relaying packet to {addr_display}: {err}"); - Err(Error::Model(err)) - } - } - } - UdpRelayMode::Quic => { - log::info!("[relay] [packet] [{assoc_id:#06x}] [to-quic] {addr_display}"); - match self.model.packet_quic(pkt, addr, assoc_id).await { - Ok(()) => Ok(()), - Err(err) => { - log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-quic] failed relaying packet to {addr_display}: {err}"); - Err(Error::Model(err)) - } - } - } - } - } - - pub async fn dissociate(&self, assoc_id: u16) -> Result<(), Error> { - log::info!("[relay] [dissociate] [{assoc_id:#06x}]"); - match self.model.dissociate(assoc_id).await { - Ok(()) => Ok(()), - Err(err) => { - log::warn!("[relay] [dissociate] [{assoc_id:#06x}] failed dissociating: {err}"); - Err(Error::Model(err)) - } - } - } - fn is_closed(&self) -> bool { self.conn.close_reason().is_some() } - async fn accept_uni_stream(&self) -> Result<(RecvStream, Register), Error> { - let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed); - - if self.remote_uni_stream_cnt.count() as u32 == max { - self.max_concurrent_uni_streams - .store(max * 2, Ordering::Relaxed); - - self.conn - .set_max_concurrent_uni_streams(VarInt::from(max * 2)); - } - - let recv = self.conn.accept_uni().await?; - let reg = self.remote_uni_stream_cnt.reg(); - Ok((recv, reg)) - } - - async fn accept_bi_stream(&self) -> Result<(SendStream, RecvStream, Register), Error> { - let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed); - - if self.remote_bi_stream_cnt.count() as u32 == max { - self.max_concurrent_bi_streams - .store(max * 2, Ordering::Relaxed); - - self.conn - .set_max_concurrent_bi_streams(VarInt::from(max * 2)); - } - - let (send, recv) = self.conn.accept_bi().await?; - let reg = self.remote_bi_stream_cnt.reg(); - Ok((send, recv, reg)) - } - - async fn accept_datagram(&self) -> Result { - Ok(self.conn.read_datagram().await?) - } - - async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { - log::debug!("[relay] incoming unidirectional stream"); - - let res = match self.model.accept_uni_stream(recv).await { - Err(err) => Err(Error::Model(err)), - Ok(Task::Packet(pkt)) => match self.udp_relay_mode { - UdpRelayMode::Quic => { - log::info!( - "[relay] [packet] [{assoc_id:#06x}] [from-quic] [{pkt_id:#06x}] {frag_id}/{frag_total}", - assoc_id = pkt.assoc_id(), - pkt_id = pkt.pkt_id(), - frag_id = pkt.frag_id(), - frag_total = pkt.frag_total(), - ); - Self::handle_packet(pkt).await; - Ok(()) - } - UdpRelayMode::Native => Err(Error::WrongPacketSource), - }, - _ => unreachable!(), // already filtered in `tuic_quinn` - }; - - match res { - Ok(()) => {} - Err(err) => log::warn!("[relay] incoming unidirectional stream error: {err}"), - } - } - - async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) { - log::debug!("[relay] incoming bidirectional stream"); - - let res = match self.model.accept_bi_stream(send, recv).await { - Err(err) => Err(Error::Model(err)), - _ => unreachable!(), // already filtered in `tuic_quinn` - }; - - match res { - Ok(()) => {} - Err(err) => log::warn!("[relay] incoming bidirectional stream error: {err}"), - } - } - - async fn handle_datagram(self, dg: Bytes) { - log::debug!("[relay] incoming datagram"); - - let res = match self.model.accept_datagram(dg) { - Err(err) => Err(Error::Model(err)), - Ok(Task::Packet(pkt)) => match self.udp_relay_mode { - UdpRelayMode::Native => { - log::info!( - "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {frag_id}/{frag_total}", - assoc_id = pkt.assoc_id(), - pkt_id = pkt.pkt_id(), - frag_id = pkt.frag_id(), - frag_total = pkt.frag_total(), - ); - Self::handle_packet(pkt).await; - Ok(()) - } - UdpRelayMode::Quic => Err(Error::WrongPacketSource), - }, - _ => unreachable!(), // already filtered in `tuic_quinn` - }; - - match res { - Ok(()) => {} - Err(err) => log::warn!("[relay] incoming datagram error: {err}"), - } - } - - async fn authenticate(self) { - log::debug!("[relay] [authenticate] sending authentication"); - - match self - .model - .authenticate(self.uuid, self.password.clone()) - .await - { - Ok(()) => log::info!("[relay] [authenticate] {uuid}", uuid = self.uuid), - Err(err) => log::warn!("[relay] [authenticate] authentication sending error: {err}"), - } - } - - async fn heartbeat(self, heartbeat: Duration) { - loop { - time::sleep(heartbeat).await; - - if self.is_closed() { - break; - } - - if self.model.task_connect_count() + self.model.task_associate_count() == 0 { - continue; - } - - match self.model.heartbeat().await { - Ok(()) => log::debug!("[relay] [heartbeat]"), - Err(err) => log::warn!("[relay] [heartbeat] heartbeat sending error: {err}"), - } - } - } - - async fn handle_packet(pkt: Packet) { - let assoc_id = pkt.assoc_id(); - let pkt_id = pkt.pkt_id(); - - match pkt.accept().await { - Ok(Some((pkt, addr, _))) => { - log::info!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {addr}"); - - let addr = match addr { - Address::None => unreachable!(), - Address::DomainAddress(domain, port) => { - Socks5Address::DomainAddress(domain, port) - } - Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr), - }; - - let session = SOCKS5_UDP_SESSIONS - .get() - .unwrap() - .lock() - .get(&assoc_id) - .cloned(); - - if let Some(session) = session { - if let Err(err) = session.send(pkt, addr).await { - log::warn!( - "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}", - ); - } - } else { - log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session"); - } - } - Ok(None) => {} - Err(err) => log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"), - } - } - async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) { loop { time::sleep(gc_interval).await; @@ -532,3 +229,75 @@ impl Connection { } } } + +struct Endpoint { + ep: QuinnEndpoint, + server: ServerAddr, + uuid: Uuid, + password: Arc<[u8]>, + udp_relay_mode: UdpRelayMode, + zero_rtt_handshake: bool, + heartbeat: Duration, + gc_interval: Duration, + gc_lifetime: Duration, +} + +impl Endpoint { + async fn connect(&mut self) -> Result { + let mut last_err = None; + + for addr in self.server.resolve().await? { + let connect_to = async { + let match_ipv4 = + addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4()); + let match_ipv6 = + addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6()); + + if !match_ipv4 && !match_ipv6 { + let bind_addr = if addr.is_ipv4() { + SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)) + } else { + SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0)) + }; + + self.ep + .rebind(UdpSocket::bind(bind_addr).map_err(|err| { + Error::Socket("failed to create endpoint UDP socket", err) + })?) + .map_err(|err| { + Error::Socket("failed to rebind endpoint UDP socket", err) + })?; + } + + let conn = self.ep.connect(addr, self.server.server_name())?; + let conn = if self.zero_rtt_handshake { + match conn.into_0rtt() { + Ok((conn, _)) => conn, + Err(conn) => conn.await?, + } + } else { + conn.await? + }; + + Ok(conn) + }; + + match connect_to.await { + Ok(conn) => { + return Ok(Connection::new( + conn, + self.udp_relay_mode, + self.uuid, + self.password.clone(), + self.heartbeat, + self.gc_interval, + self.gc_lifetime, + )); + } + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or(Error::DnsResolve)) + } +} diff --git a/tuic-client/src/error.rs b/tuic-client/src/error.rs new file mode 100644 index 0000000..ddbaad5 --- /dev/null +++ b/tuic-client/src/error.rs @@ -0,0 +1,31 @@ +use quinn::{ConnectError, ConnectionError}; +use rustls::Error as RustlsError; +use std::io::Error as IoError; +use thiserror::Error; +use tuic_quinn::Error as ModelError; + +#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + Io(#[from] IoError), + #[error(transparent)] + Connect(#[from] ConnectError), + #[error(transparent)] + Connection(#[from] ConnectionError), + #[error(transparent)] + Model(#[from] ModelError), + #[error("load native certificates error: {0}")] + LoadNativeCerts(IoError), + #[error(transparent)] + Rustls(#[from] RustlsError), + #[error("{0}: {1}")] + Socket(&'static str, IoError), + #[error("timeout establishing connection")] + Timeout, + #[error("cannot resolve the server name")] + DnsResolve, + #[error("received packet from an unexpected source")] + WrongPacketSource, + #[error("invalid socks5 authentication")] + InvalidSocks5Auth, +} diff --git a/tuic-client/src/lib.rs b/tuic-client/src/lib.rs new file mode 100644 index 0000000..736e783 --- /dev/null +++ b/tuic-client/src/lib.rs @@ -0,0 +1,13 @@ +mod config; +mod connection; +mod error; +mod socks5; +mod utils; + +pub use crate::{ + config::{Config, ConfigError}, + connection::Connection, + error::Error, + socks5::Server as Socks5Server, + utils::{CongestionControl, ServerAddr, UdpRelayMode}, +}; diff --git a/tuic-client/src/main.rs b/tuic-client/src/main.rs index 730c924..b31a0c2 100644 --- a/tuic-client/src/main.rs +++ b/tuic-client/src/main.rs @@ -1,19 +1,6 @@ -use self::{ - config::{Config, ConfigError}, - connection::Endpoint, - socks5::Server as Socks5Server, -}; use env_logger::Builder as LoggerBuilder; -use quinn::{ConnectError, ConnectionError}; -use rustls::Error as RustlsError; -use std::{env, io::Error as IoError, process}; -use thiserror::Error; -use tuic_quinn::Error as ModelError; - -mod config; -mod connection; -mod socks5; -mod utils; +use std::{env, process}; +use tuic_client::{Config, ConfigError, Connection, Socks5Server}; #[tokio::main] async fn main() { @@ -35,7 +22,7 @@ async fn main() { .format_target(false) .init(); - match Endpoint::set_config(cfg.relay) { + match Connection::set_config(cfg.relay) { Ok(()) => {} Err(err) => { eprintln!("{err}"); @@ -53,29 +40,3 @@ async fn main() { Socks5Server::start().await; } - -#[derive(Debug, Error)] -pub enum Error { - #[error(transparent)] - Io(#[from] IoError), - #[error(transparent)] - Connect(#[from] ConnectError), - #[error(transparent)] - Connection(#[from] ConnectionError), - #[error(transparent)] - Model(#[from] ModelError), - #[error("load native certificates error: {0}")] - LoadNativeCerts(IoError), - #[error(transparent)] - Rustls(#[from] RustlsError), - #[error("{0}: {1}")] - Socket(&'static str, IoError), - #[error("timeout establishing connection")] - Timeout, - #[error("cannot resolve the server name")] - DnsResolve, - #[error("received packet from an unexpected source")] - WrongPacketSource, - #[error("invalid socks5 authentication")] - InvalidSocks5Auth, -} diff --git a/tuic-client/src/socks5.rs b/tuic-client/src/socks5.rs deleted file mode 100644 index 1b41e76..0000000 --- a/tuic-client/src/socks5.rs +++ /dev/null @@ -1,460 +0,0 @@ -use crate::{ - config::Local, - connection::{Connection as TuicConnection, ERROR_CODE}, - Error, -}; -use bytes::Bytes; -use once_cell::sync::OnceCell; -use parking_lot::Mutex; -use socket2::{Domain, Protocol, SockAddr, Socket, Type}; -use socks5_proto::{Address, Reply}; -use socks5_server::{ - auth::{NoAuth, Password}, - connection::{associate, bind, connect}, - Associate, AssociatedUdpSocket, Auth, Bind, Connect, Connection, Server as Socks5Server, -}; -use std::{ - collections::HashMap, - io::{Error as IoError, ErrorKind}, - net::{IpAddr, SocketAddr, TcpListener as StdTcpListener, UdpSocket as StdUdpSocket}, - sync::{ - atomic::{AtomicU16, Ordering}, - Arc, - }, -}; -use tokio::{ - io::{self, AsyncWriteExt}, - net::{TcpListener, UdpSocket}, -}; -use tokio_util::compat::FuturesAsyncReadCompatExt; -use tuic::Address as TuicAddress; - -static SERVER: OnceCell = OnceCell::new(); -pub static UDP_SESSIONS: OnceCell>> = OnceCell::new(); - -pub struct Server { - inner: Socks5Server, - dual_stack: Option, - max_pkt_size: usize, - next_assoc_id: AtomicU16, -} - -impl Server { - pub fn set_config(cfg: Local) -> Result<(), Error> { - SERVER - .set(Self::new( - cfg.server, - cfg.dual_stack, - cfg.max_packet_size, - cfg.username.map(|s| s.into_bytes()), - cfg.password.map(|s| s.into_bytes()), - )?) - .map_err(|_| "failed initializing socks5 server") - .unwrap(); - - UDP_SESSIONS - .set(Mutex::new(HashMap::new())) - .map_err(|_| "failed initializing socks5 UDP session pool") - .unwrap(); - - Ok(()) - } - - fn new( - addr: SocketAddr, - dual_stack: Option, - max_pkt_size: usize, - username: Option>, - password: Option>, - ) -> Result { - let socket = { - let domain = match addr { - SocketAddr::V4(_) => Domain::IPV4, - SocketAddr::V6(_) => Domain::IPV6, - }; - - let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) - .map_err(|err| Error::Socket("failed to create socks5 server socket", err))?; - - if let Some(dual_stack) = dual_stack { - socket.set_only_v6(!dual_stack).map_err(|err| { - Error::Socket("socks5 server dual-stack socket setting error", err) - })?; - } - - socket.set_reuse_address(true).map_err(|err| { - Error::Socket("failed to set socks5 server socket to reuse_address", err) - })?; - - socket.set_nonblocking(true).map_err(|err| { - Error::Socket("failed setting socks5 server socket as non-blocking", err) - })?; - - socket - .bind(&SockAddr::from(addr)) - .map_err(|err| Error::Socket("failed to bind socks5 server socket", err))?; - - socket - .listen(i32::MAX) - .map_err(|err| Error::Socket("failed to listen on socks5 server socket", err))?; - - TcpListener::from_std(StdTcpListener::from(socket)) - .map_err(|err| Error::Socket("failed to create socks5 server socket", err))? - }; - - let auth: Arc = match (username, password) { - (Some(username), Some(password)) => Arc::new(Password::new(username, password)), - (None, None) => Arc::new(NoAuth), - _ => return Err(Error::InvalidSocks5Auth), - }; - - Ok(Self { - inner: Socks5Server::new(socket, auth), - dual_stack, - max_pkt_size, - next_assoc_id: AtomicU16::new(0), - }) - } - - pub async fn start() { - let server = SERVER.get().unwrap(); - - log::warn!( - "[socks5] server started, listening on {}", - server.inner.local_addr().unwrap() - ); - - loop { - match server.inner.accept().await { - Ok((conn, addr)) => { - log::debug!("[socks5] [{addr}] connection established"); - - tokio::spawn(async move { - match conn.handshake().await { - Ok(Connection::Associate(associate, _)) => { - let assoc_id = server.next_assoc_id.fetch_add(1, Ordering::Relaxed); - log::info!("[socks5] [{addr}] [associate] [{assoc_id:#06x}]"); - Self::handle_associate( - associate, - assoc_id, - server.dual_stack, - server.max_pkt_size, - ) - .await; - } - Ok(Connection::Bind(bind, _)) => { - log::info!("[socks5] [{addr}] [bind]"); - Self::handle_bind(bind).await; - } - Ok(Connection::Connect(connect, target_addr)) => { - log::info!("[socks5] [{addr}] [connect] {target_addr}"); - Self::handle_connect(connect, target_addr).await; - } - Err(err) => log::warn!("[socks5] [{addr}] handshake error: {err}"), - }; - - log::debug!("[socks5] [{addr}] connection closed"); - }); - } - Err(err) => log::warn!("[socks5] failed to establish connection: {err}"), - } - } - } - - async fn handle_associate( - assoc: Associate, - assoc_id: u16, - dual_stack: Option, - max_pkt_size: usize, - ) { - let peer_addr = assoc.peer_addr().unwrap(); - let local_ip = assoc.local_addr().unwrap().ip(); - - match UdpSession::new(assoc_id, peer_addr, local_ip, dual_stack, max_pkt_size) { - Ok(session) => { - let local_addr = session.local_addr().unwrap(); - log::debug!( - "[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] bound to {local_addr}" - ); - - let mut assoc = match assoc - .reply(Reply::Succeeded, Address::SocketAddress(local_addr)) - .await - { - Ok(assoc) => assoc, - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}"); - return; - } - }; - - UDP_SESSIONS - .get() - .unwrap() - .lock() - .insert(assoc_id, session.clone()); - - let handle_local_incoming_pkt = async move { - loop { - let (pkt, target_addr) = match session.recv().await { - Ok(res) => res, - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed to receive UDP packet: {err}"); - continue; - } - }; - - let forward = async move { - let target_addr = match target_addr { - Address::DomainAddress(domain, port) => { - TuicAddress::DomainAddress(domain, port) - } - Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr), - }; - - match TuicConnection::get().await { - Ok(conn) => conn.packet(pkt, target_addr, assoc_id).await, - Err(err) => Err(err), - } - }; - - tokio::spawn(async move { - match forward.await { - Ok(()) => {} - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed relaying UDP packet: {err}"); - } - } - }); - } - }; - - match tokio::select! { - res = assoc.wait_until_closed() => res, - _ = handle_local_incoming_pkt => unreachable!(), - } { - Ok(()) => {} - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] associate connection error: {err}") - } - } - - log::debug!( - "[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] stopped associating" - ); - - UDP_SESSIONS - .get() - .unwrap() - .lock() - .remove(&assoc_id) - .unwrap(); - - let res = match TuicConnection::get().await { - Ok(conn) => conn.dissociate(assoc_id).await, - Err(err) => Err(err), - }; - - match res { - Ok(()) => {} - Err(err) => log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed stoping UDP relaying session: {err}"), - } - } - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed setting up UDP associate session: {err}"); - - match assoc - .reply(Reply::GeneralFailure, Address::unspecified()) - .await - { - Ok(mut assoc) => { - let _ = assoc.shutdown().await; - } - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}") - } - } - } - } - } - - async fn handle_bind(bind: Bind) { - let peer_addr = bind.peer_addr().unwrap(); - log::warn!("[socks5] [{peer_addr}] [bind] command not supported"); - - match bind - .reply(Reply::CommandNotSupported, Address::unspecified()) - .await - { - Ok(mut bind) => { - let _ = bind.shutdown().await; - } - Err(err) => log::warn!("[socks5] [{peer_addr}] [bind] command reply error: {err}"), - } - } - - async fn handle_connect(conn: Connect, addr: Address) { - let peer_addr = conn.peer_addr().unwrap(); - let target_addr = match addr { - Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port), - Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr), - }; - - let relay = match TuicConnection::get().await { - Ok(conn) => conn.connect(target_addr.clone()).await, - Err(err) => Err(err), - }; - - match relay { - Ok(relay) => { - let mut relay = relay.compat(); - - match conn.reply(Reply::Succeeded, Address::unspecified()).await { - Ok(mut conn) => match io::copy_bidirectional(&mut conn, &mut relay).await { - Ok(_) => {} - Err(err) => { - let _ = conn.shutdown().await; - let _ = relay.get_mut().reset(ERROR_CODE); - log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] TCP stream relaying error: {err}"); - } - }, - Err(err) => { - let _ = relay.shutdown().await; - log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}"); - } - } - } - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] unable to relay TCP stream: {err}"); - - match conn - .reply(Reply::GeneralFailure, Address::unspecified()) - .await - { - Ok(mut conn) => { - let _ = conn.shutdown().await; - } - Err(err) => { - log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}") - } - } - } - } - } -} - -#[derive(Clone)] -pub struct UdpSession { - socket: Arc, - assoc_id: u16, - ctrl_addr: SocketAddr, -} - -impl UdpSession { - fn new( - assoc_id: u16, - ctrl_addr: SocketAddr, - local_ip: IpAddr, - dual_stack: Option, - max_pkt_size: usize, - ) -> Result { - let domain = match local_ip { - IpAddr::V4(_) => Domain::IPV4, - IpAddr::V6(_) => Domain::IPV6, - }; - - let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| { - Error::Socket("failed to create socks5 server UDP associate socket", err) - })?; - - if let Some(dual_stack) = dual_stack { - socket.set_only_v6(!dual_stack).map_err(|err| { - Error::Socket( - "socks5 server UDP associate dual-stack socket setting error", - err, - ) - })?; - } - - socket.set_nonblocking(true).map_err(|err| { - Error::Socket( - "failed setting socks5 server UDP associate socket as non-blocking", - err, - ) - })?; - - socket - .bind(&SockAddr::from(SocketAddr::from((local_ip, 0)))) - .map_err(|err| { - Error::Socket("failed to bind socks5 server UDP associate socket", err) - })?; - - let socket = UdpSocket::from_std(StdUdpSocket::from(socket)).map_err(|err| { - Error::Socket("failed to create socks5 server UDP associate socket", err) - })?; - - Ok(Self { - socket: Arc::new(AssociatedUdpSocket::from((socket, max_pkt_size))), - assoc_id, - ctrl_addr, - }) - } - - pub async fn send(&self, pkt: Bytes, src_addr: Address) -> Result<(), Error> { - let src_addr_display = src_addr.to_string(); - - log::debug!( - "[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr}", - ctrl_addr = self.ctrl_addr, - assoc_id = self.assoc_id, - dst_addr = self.socket.peer_addr().unwrap(), - ); - - if let Err(err) = self.socket.send(pkt, 0, src_addr).await { - log::warn!( - "[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr} error: {err}", - ctrl_addr = self.ctrl_addr, - assoc_id = self.assoc_id, - dst_addr = self.socket.peer_addr().unwrap(), - ); - - return Err(Error::Io(err)); - } - - Ok(()) - } - - pub async fn recv(&self) -> Result<(Bytes, Address), Error> { - let (pkt, frag, dst_addr, src_addr) = self.socket.recv_from().await?; - - if let Ok(connected_addr) = self.socket.peer_addr() { - if src_addr != connected_addr { - Err(IoError::new( - ErrorKind::Other, - format!("invalid source address: {src_addr}"), - ))?; - } - } else { - self.socket.connect(src_addr).await?; - } - - if frag != 0 { - Err(IoError::new( - ErrorKind::Other, - "fragmented packet is not supported", - ))?; - } - - log::debug!( - "[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] receive packet from {src_addr} to {dst_addr}", - ctrl_addr = self.ctrl_addr, - assoc_id = self.assoc_id - ); - - Ok((pkt, dst_addr)) - } - - fn local_addr(&self) -> Result { - self.socket.local_addr() - } -} diff --git a/tuic-client/src/socks5/handle_task.rs b/tuic-client/src/socks5/handle_task.rs new file mode 100644 index 0000000..be376ff --- /dev/null +++ b/tuic-client/src/socks5/handle_task.rs @@ -0,0 +1,193 @@ +use super::{udp_session::UdpSession, Server, UDP_SESSIONS}; +use crate::connection::{Connection as TuicConnection, ERROR_CODE}; +use socks5_proto::{Address, Reply}; +use socks5_server::{ + connection::{associate, bind, connect}, + Associate, Bind, Connect, +}; +use tokio::io::{self, AsyncWriteExt}; +use tokio_util::compat::FuturesAsyncReadCompatExt; +use tuic::Address as TuicAddress; + +impl Server { + pub(super) async fn handle_associate( + assoc: Associate, + assoc_id: u16, + dual_stack: Option, + max_pkt_size: usize, + ) { + let peer_addr = assoc.peer_addr().unwrap(); + let local_ip = assoc.local_addr().unwrap().ip(); + + match UdpSession::new(assoc_id, peer_addr, local_ip, dual_stack, max_pkt_size) { + Ok(session) => { + let local_addr = session.local_addr().unwrap(); + log::debug!( + "[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] bound to {local_addr}" + ); + + let mut assoc = match assoc + .reply(Reply::Succeeded, Address::SocketAddress(local_addr)) + .await + { + Ok(assoc) => assoc, + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}"); + return; + } + }; + + UDP_SESSIONS + .get() + .unwrap() + .lock() + .insert(assoc_id, session.clone()); + + let handle_local_incoming_pkt = async move { + loop { + let (pkt, target_addr) = match session.recv().await { + Ok(res) => res, + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed to receive UDP packet: {err}"); + continue; + } + }; + + let forward = async move { + let target_addr = match target_addr { + Address::DomainAddress(domain, port) => { + TuicAddress::DomainAddress(domain, port) + } + Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr), + }; + + match TuicConnection::get().await { + Ok(conn) => conn.packet(pkt, target_addr, assoc_id).await, + Err(err) => Err(err), + } + }; + + tokio::spawn(async move { + match forward.await { + Ok(()) => {} + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed relaying UDP packet: {err}"); + } + } + }); + } + }; + + match tokio::select! { + res = assoc.wait_until_closed() => res, + _ = handle_local_incoming_pkt => unreachable!(), + } { + Ok(()) => {} + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] associate connection error: {err}") + } + } + + log::debug!( + "[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] stopped associating" + ); + + UDP_SESSIONS + .get() + .unwrap() + .lock() + .remove(&assoc_id) + .unwrap(); + + let res = match TuicConnection::get().await { + Ok(conn) => conn.dissociate(assoc_id).await, + Err(err) => Err(err), + }; + + match res { + Ok(()) => {} + Err(err) => log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed stoping UDP relaying session: {err}"), + } + } + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed setting up UDP associate session: {err}"); + + match assoc + .reply(Reply::GeneralFailure, Address::unspecified()) + .await + { + Ok(mut assoc) => { + let _ = assoc.shutdown().await; + } + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}") + } + } + } + } + } + + pub(super) async fn handle_bind(bind: Bind) { + let peer_addr = bind.peer_addr().unwrap(); + log::warn!("[socks5] [{peer_addr}] [bind] command not supported"); + + match bind + .reply(Reply::CommandNotSupported, Address::unspecified()) + .await + { + Ok(mut bind) => { + let _ = bind.shutdown().await; + } + Err(err) => log::warn!("[socks5] [{peer_addr}] [bind] command reply error: {err}"), + } + } + + pub(super) async fn handle_connect(conn: Connect, addr: Address) { + let peer_addr = conn.peer_addr().unwrap(); + let target_addr = match addr { + Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port), + Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr), + }; + + let relay = match TuicConnection::get().await { + Ok(conn) => conn.connect(target_addr.clone()).await, + Err(err) => Err(err), + }; + + match relay { + Ok(relay) => { + let mut relay = relay.compat(); + + match conn.reply(Reply::Succeeded, Address::unspecified()).await { + Ok(mut conn) => match io::copy_bidirectional(&mut conn, &mut relay).await { + Ok(_) => {} + Err(err) => { + let _ = conn.shutdown().await; + let _ = relay.get_mut().reset(ERROR_CODE); + log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] TCP stream relaying error: {err}"); + } + }, + Err(err) => { + let _ = relay.shutdown().await; + log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}"); + } + } + } + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] unable to relay TCP stream: {err}"); + + match conn + .reply(Reply::GeneralFailure, Address::unspecified()) + .await + { + Ok(mut conn) => { + let _ = conn.shutdown().await; + } + Err(err) => { + log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}") + } + } + } + } + } +} diff --git a/tuic-client/src/socks5/mod.rs b/tuic-client/src/socks5/mod.rs new file mode 100644 index 0000000..cc17e3f --- /dev/null +++ b/tuic-client/src/socks5/mod.rs @@ -0,0 +1,154 @@ +use crate::{config::Local, Error}; +use once_cell::sync::OnceCell; +use parking_lot::Mutex; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; +use socks5_server::{ + auth::{NoAuth, Password}, + Auth, Connection, Server as Socks5Server, +}; +use std::{ + collections::HashMap, + net::{SocketAddr, TcpListener as StdTcpListener}, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, + }, +}; +use tokio::net::TcpListener; + +mod handle_task; +mod udp_session; + +pub(crate) use self::udp_session::UDP_SESSIONS; + +static SERVER: OnceCell = OnceCell::new(); + +pub struct Server { + inner: Socks5Server, + dual_stack: Option, + max_pkt_size: usize, + next_assoc_id: AtomicU16, +} + +impl Server { + pub fn set_config(cfg: Local) -> Result<(), Error> { + SERVER + .set(Self::new( + cfg.server, + cfg.dual_stack, + cfg.max_packet_size, + cfg.username.map(|s| s.into_bytes()), + cfg.password.map(|s| s.into_bytes()), + )?) + .map_err(|_| "failed initializing socks5 server") + .unwrap(); + + UDP_SESSIONS + .set(Mutex::new(HashMap::new())) + .map_err(|_| "failed initializing socks5 UDP session pool") + .unwrap(); + + Ok(()) + } + + fn new( + addr: SocketAddr, + dual_stack: Option, + max_pkt_size: usize, + username: Option>, + password: Option>, + ) -> Result { + let socket = { + let domain = match addr { + SocketAddr::V4(_) => Domain::IPV4, + SocketAddr::V6(_) => Domain::IPV6, + }; + + let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) + .map_err(|err| Error::Socket("failed to create socks5 server socket", err))?; + + if let Some(dual_stack) = dual_stack { + socket.set_only_v6(!dual_stack).map_err(|err| { + Error::Socket("socks5 server dual-stack socket setting error", err) + })?; + } + + socket.set_reuse_address(true).map_err(|err| { + Error::Socket("failed to set socks5 server socket to reuse_address", err) + })?; + + socket.set_nonblocking(true).map_err(|err| { + Error::Socket("failed setting socks5 server socket as non-blocking", err) + })?; + + socket + .bind(&SockAddr::from(addr)) + .map_err(|err| Error::Socket("failed to bind socks5 server socket", err))?; + + socket + .listen(i32::MAX) + .map_err(|err| Error::Socket("failed to listen on socks5 server socket", err))?; + + TcpListener::from_std(StdTcpListener::from(socket)) + .map_err(|err| Error::Socket("failed to create socks5 server socket", err))? + }; + + let auth: Arc = match (username, password) { + (Some(username), Some(password)) => Arc::new(Password::new(username, password)), + (None, None) => Arc::new(NoAuth), + _ => return Err(Error::InvalidSocks5Auth), + }; + + Ok(Self { + inner: Socks5Server::new(socket, auth), + dual_stack, + max_pkt_size, + next_assoc_id: AtomicU16::new(0), + }) + } + + pub async fn start() { + let server = SERVER.get().unwrap(); + + log::warn!( + "[socks5] server started, listening on {}", + server.inner.local_addr().unwrap() + ); + + loop { + match server.inner.accept().await { + Ok((conn, addr)) => { + log::debug!("[socks5] [{addr}] connection established"); + + tokio::spawn(async move { + match conn.handshake().await { + Ok(Connection::Associate(associate, _)) => { + let assoc_id = server.next_assoc_id.fetch_add(1, Ordering::Relaxed); + log::info!("[socks5] [{addr}] [associate] [{assoc_id:#06x}]"); + Self::handle_associate( + associate, + assoc_id, + server.dual_stack, + server.max_pkt_size, + ) + .await; + } + Ok(Connection::Bind(bind, _)) => { + log::info!("[socks5] [{addr}] [bind]"); + Self::handle_bind(bind).await; + } + Ok(Connection::Connect(connect, target_addr)) => { + log::info!("[socks5] [{addr}] [connect] {target_addr}"); + Self::handle_connect(connect, target_addr).await; + } + Err(err) => log::warn!("[socks5] [{addr}] handshake error: {err}"), + }; + + log::debug!("[socks5] [{addr}] connection closed"); + }); + } + Err(err) => log::warn!("[socks5] failed to establish connection: {err}"), + } + } + } +} diff --git a/tuic-client/src/socks5/udp_session.rs b/tuic-client/src/socks5/udp_session.rs new file mode 100644 index 0000000..bf18fed --- /dev/null +++ b/tuic-client/src/socks5/udp_session.rs @@ -0,0 +1,132 @@ +use crate::Error; +use bytes::Bytes; +use once_cell::sync::OnceCell; +use parking_lot::Mutex; +use socket2::{Domain, Protocol, SockAddr, Socket, Type}; +use socks5_proto::Address; +use socks5_server::AssociatedUdpSocket; +use std::{ + collections::HashMap, + io::{Error as IoError, ErrorKind}, + net::{IpAddr, SocketAddr, UdpSocket as StdUdpSocket}, + sync::Arc, +}; +use tokio::net::UdpSocket; + +pub(crate) static UDP_SESSIONS: OnceCell>> = OnceCell::new(); + +#[derive(Clone)] +pub(crate) struct UdpSession { + socket: Arc, + assoc_id: u16, + ctrl_addr: SocketAddr, +} + +impl UdpSession { + pub(super) fn new( + assoc_id: u16, + ctrl_addr: SocketAddr, + local_ip: IpAddr, + dual_stack: Option, + max_pkt_size: usize, + ) -> Result { + let domain = match local_ip { + IpAddr::V4(_) => Domain::IPV4, + IpAddr::V6(_) => Domain::IPV6, + }; + + let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| { + Error::Socket("failed to create socks5 server UDP associate socket", err) + })?; + + if let Some(dual_stack) = dual_stack { + socket.set_only_v6(!dual_stack).map_err(|err| { + Error::Socket( + "socks5 server UDP associate dual-stack socket setting error", + err, + ) + })?; + } + + socket.set_nonblocking(true).map_err(|err| { + Error::Socket( + "failed setting socks5 server UDP associate socket as non-blocking", + err, + ) + })?; + + socket + .bind(&SockAddr::from(SocketAddr::from((local_ip, 0)))) + .map_err(|err| { + Error::Socket("failed to bind socks5 server UDP associate socket", err) + })?; + + let socket = UdpSocket::from_std(StdUdpSocket::from(socket)).map_err(|err| { + Error::Socket("failed to create socks5 server UDP associate socket", err) + })?; + + Ok(Self { + socket: Arc::new(AssociatedUdpSocket::from((socket, max_pkt_size))), + assoc_id, + ctrl_addr, + }) + } + + pub(crate) async fn send(&self, pkt: Bytes, src_addr: Address) -> Result<(), Error> { + let src_addr_display = src_addr.to_string(); + + log::debug!( + "[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr}", + ctrl_addr = self.ctrl_addr, + assoc_id = self.assoc_id, + dst_addr = self.socket.peer_addr().unwrap(), + ); + + if let Err(err) = self.socket.send(pkt, 0, src_addr).await { + log::warn!( + "[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr} error: {err}", + ctrl_addr = self.ctrl_addr, + assoc_id = self.assoc_id, + dst_addr = self.socket.peer_addr().unwrap(), + ); + + return Err(Error::Io(err)); + } + + Ok(()) + } + + pub(crate) async fn recv(&self) -> Result<(Bytes, Address), Error> { + let (pkt, frag, dst_addr, src_addr) = self.socket.recv_from().await?; + + if let Ok(connected_addr) = self.socket.peer_addr() { + if src_addr != connected_addr { + Err(IoError::new( + ErrorKind::Other, + format!("invalid source address: {src_addr}"), + ))?; + } + } else { + self.socket.connect(src_addr).await?; + } + + if frag != 0 { + Err(IoError::new( + ErrorKind::Other, + "fragmented packet is not supported", + ))?; + } + + log::debug!( + "[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] receive packet from {src_addr} to {dst_addr}", + ctrl_addr = self.ctrl_addr, + assoc_id = self.assoc_id + ); + + Ok((pkt, dst_addr)) + } + + pub(super) fn local_addr(&self) -> Result { + self.socket.local_addr() + } +} diff --git a/tuic-client/src/utils.rs b/tuic-client/src/utils.rs index 5292a5e..25b4d64 100644 --- a/tuic-client/src/utils.rs +++ b/tuic-client/src/utils.rs @@ -10,7 +10,10 @@ use std::{ }; use tokio::net; -pub fn load_certs(paths: Vec, disable_native: bool) -> Result { +pub(crate) fn load_certs( + paths: Vec, + disable_native: bool, +) -> Result { let mut certs = RootCertStore::empty(); for path in &paths {