reorganizing modules
This commit is contained in:
parent
a5ead464f9
commit
2b6cbe7287
114
tuic-client/src/connection/handle_stream.rs
Normal file
114
tuic-client/src/connection/handle_stream.rs
Normal file
@ -0,0 +1,114 @@
|
|||||||
|
use super::Connection;
|
||||||
|
use crate::{utils::UdpRelayMode, Error};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use quinn::{RecvStream, SendStream, VarInt};
|
||||||
|
use register_count::Register;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
use tuic_quinn::Task;
|
||||||
|
|
||||||
|
impl Connection {
|
||||||
|
pub(super) async fn accept_uni_stream(&self) -> Result<(RecvStream, Register), Error> {
|
||||||
|
let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed);
|
||||||
|
|
||||||
|
if self.remote_uni_stream_cnt.count() as u32 == max {
|
||||||
|
self.max_concurrent_uni_streams
|
||||||
|
.store(max * 2, Ordering::Relaxed);
|
||||||
|
|
||||||
|
self.conn
|
||||||
|
.set_max_concurrent_uni_streams(VarInt::from(max * 2));
|
||||||
|
}
|
||||||
|
|
||||||
|
let recv = self.conn.accept_uni().await?;
|
||||||
|
let reg = self.remote_uni_stream_cnt.reg();
|
||||||
|
Ok((recv, reg))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn accept_bi_stream(
|
||||||
|
&self,
|
||||||
|
) -> Result<(SendStream, RecvStream, Register), Error> {
|
||||||
|
let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed);
|
||||||
|
|
||||||
|
if self.remote_bi_stream_cnt.count() as u32 == max {
|
||||||
|
self.max_concurrent_bi_streams
|
||||||
|
.store(max * 2, Ordering::Relaxed);
|
||||||
|
|
||||||
|
self.conn
|
||||||
|
.set_max_concurrent_bi_streams(VarInt::from(max * 2));
|
||||||
|
}
|
||||||
|
|
||||||
|
let (send, recv) = self.conn.accept_bi().await?;
|
||||||
|
let reg = self.remote_bi_stream_cnt.reg();
|
||||||
|
Ok((send, recv, reg))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn accept_datagram(&self) -> Result<Bytes, Error> {
|
||||||
|
Ok(self.conn.read_datagram().await?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
|
||||||
|
log::debug!("[relay] incoming unidirectional stream");
|
||||||
|
|
||||||
|
let res = match self.model.accept_uni_stream(recv).await {
|
||||||
|
Err(err) => Err(Error::Model(err)),
|
||||||
|
Ok(Task::Packet(pkt)) => match self.udp_relay_mode {
|
||||||
|
UdpRelayMode::Quic => {
|
||||||
|
log::info!(
|
||||||
|
"[relay] [packet] [{assoc_id:#06x}] [from-quic] [{pkt_id:#06x}] {frag_id}/{frag_total}",
|
||||||
|
assoc_id = pkt.assoc_id(),
|
||||||
|
pkt_id = pkt.pkt_id(),
|
||||||
|
frag_id = pkt.frag_id(),
|
||||||
|
frag_total = pkt.frag_total(),
|
||||||
|
);
|
||||||
|
Self::handle_packet(pkt).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
UdpRelayMode::Native => Err(Error::WrongPacketSource),
|
||||||
|
},
|
||||||
|
_ => unreachable!(), // already filtered in `tuic_quinn`
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = res {
|
||||||
|
log::warn!("[relay] incoming unidirectional stream error: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) {
|
||||||
|
log::debug!("[relay] incoming bidirectional stream");
|
||||||
|
|
||||||
|
let res = match self.model.accept_bi_stream(send, recv).await {
|
||||||
|
Err(err) => Err::<(), _>(Error::Model(err)),
|
||||||
|
_ => unreachable!(), // already filtered in `tuic_quinn`
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = res {
|
||||||
|
log::warn!("[relay] incoming bidirectional stream error: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn handle_datagram(self, dg: Bytes) {
|
||||||
|
log::debug!("[relay] incoming datagram");
|
||||||
|
|
||||||
|
let res = match self.model.accept_datagram(dg) {
|
||||||
|
Err(err) => Err(Error::Model(err)),
|
||||||
|
Ok(Task::Packet(pkt)) => match self.udp_relay_mode {
|
||||||
|
UdpRelayMode::Native => {
|
||||||
|
log::info!(
|
||||||
|
"[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {frag_id}/{frag_total}",
|
||||||
|
assoc_id = pkt.assoc_id(),
|
||||||
|
pkt_id = pkt.pkt_id(),
|
||||||
|
frag_id = pkt.frag_id(),
|
||||||
|
frag_total = pkt.frag_total(),
|
||||||
|
);
|
||||||
|
Self::handle_packet(pkt).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
UdpRelayMode::Quic => Err(Error::WrongPacketSource),
|
||||||
|
},
|
||||||
|
_ => unreachable!(), // already filtered in `tuic_quinn`
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(err) = res {
|
||||||
|
log::warn!("[relay] incoming datagram error: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
131
tuic-client/src/connection/handle_task.rs
Normal file
131
tuic-client/src/connection/handle_task.rs
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
use super::Connection;
|
||||||
|
use crate::{socks5::UDP_SESSIONS as SOCKS5_UDP_SESSIONS, utils::UdpRelayMode, Error};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use socks5_proto::Address as Socks5Address;
|
||||||
|
use std::time::Duration;
|
||||||
|
use tokio::time;
|
||||||
|
use tuic::Address;
|
||||||
|
use tuic_quinn::{Connect, Packet};
|
||||||
|
|
||||||
|
impl Connection {
|
||||||
|
pub(super) async fn authenticate(self) {
|
||||||
|
log::debug!("[relay] [authenticate] sending authentication");
|
||||||
|
|
||||||
|
match self
|
||||||
|
.model
|
||||||
|
.authenticate(self.uuid, self.password.clone())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(()) => log::info!("[relay] [authenticate] {uuid}", uuid = self.uuid),
|
||||||
|
Err(err) => log::warn!("[relay] [authenticate] authentication sending error: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn connect(&self, addr: Address) -> Result<Connect, Error> {
|
||||||
|
let addr_display = addr.to_string();
|
||||||
|
log::info!("[relay] [connect] {addr_display}");
|
||||||
|
|
||||||
|
match self.model.connect(addr).await {
|
||||||
|
Ok(conn) => Ok(conn),
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[relay] [connect] failed initializing relay to {addr_display}: {err}");
|
||||||
|
Err(Error::Model(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn packet(&self, pkt: Bytes, addr: Address, assoc_id: u16) -> Result<(), Error> {
|
||||||
|
let addr_display = addr.to_string();
|
||||||
|
|
||||||
|
match self.udp_relay_mode {
|
||||||
|
UdpRelayMode::Native => {
|
||||||
|
log::info!("[relay] [packet] [{assoc_id:#06x}] [to-native] {addr_display}");
|
||||||
|
match self.model.packet_native(pkt, addr, assoc_id) {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-native] failed relaying packet to {addr_display}: {err}");
|
||||||
|
Err(Error::Model(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UdpRelayMode::Quic => {
|
||||||
|
log::info!("[relay] [packet] [{assoc_id:#06x}] [to-quic] {addr_display}");
|
||||||
|
match self.model.packet_quic(pkt, addr, assoc_id).await {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-quic] failed relaying packet to {addr_display}: {err}");
|
||||||
|
Err(Error::Model(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn dissociate(&self, assoc_id: u16) -> Result<(), Error> {
|
||||||
|
log::info!("[relay] [dissociate] [{assoc_id:#06x}]");
|
||||||
|
match self.model.dissociate(assoc_id).await {
|
||||||
|
Ok(()) => Ok(()),
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[relay] [dissociate] [{assoc_id:#06x}] failed dissociating: {err}");
|
||||||
|
Err(Error::Model(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn heartbeat(self, heartbeat: Duration) {
|
||||||
|
loop {
|
||||||
|
time::sleep(heartbeat).await;
|
||||||
|
|
||||||
|
if self.is_closed() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.model.task_connect_count() + self.model.task_associate_count() == 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.model.heartbeat().await {
|
||||||
|
Ok(()) => log::debug!("[relay] [heartbeat]"),
|
||||||
|
Err(err) => log::warn!("[relay] [heartbeat] heartbeat sending error: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn handle_packet(pkt: Packet) {
|
||||||
|
let assoc_id = pkt.assoc_id();
|
||||||
|
let pkt_id = pkt.pkt_id();
|
||||||
|
|
||||||
|
match pkt.accept().await {
|
||||||
|
Ok(Some((pkt, addr, _))) => {
|
||||||
|
log::info!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {addr}");
|
||||||
|
|
||||||
|
let addr = match addr {
|
||||||
|
Address::None => unreachable!(),
|
||||||
|
Address::DomainAddress(domain, port) => {
|
||||||
|
Socks5Address::DomainAddress(domain, port)
|
||||||
|
}
|
||||||
|
Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr),
|
||||||
|
};
|
||||||
|
|
||||||
|
let session = SOCKS5_UDP_SESSIONS
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.lock()
|
||||||
|
.get(&assoc_id)
|
||||||
|
.cloned();
|
||||||
|
|
||||||
|
if let Some(session) = session {
|
||||||
|
if let Err(err) = session.send(pkt, addr).await {
|
||||||
|
log::warn!(
|
||||||
|
"[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => {}
|
||||||
|
Err(err) => log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,57 +1,54 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
config::Relay,
|
config::Relay,
|
||||||
socks5::UDP_SESSIONS as SOCKS5_UDP_SESSIONS,
|
|
||||||
utils::{self, CongestionControl, ServerAddr, UdpRelayMode},
|
utils::{self, CongestionControl, ServerAddr, UdpRelayMode},
|
||||||
Error,
|
Error,
|
||||||
};
|
};
|
||||||
use bytes::Bytes;
|
|
||||||
use crossbeam_utils::atomic::AtomicCell;
|
use crossbeam_utils::atomic::AtomicCell;
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use quinn::{
|
use quinn::{
|
||||||
congestion::{BbrConfig, CubicConfig, NewRenoConfig},
|
congestion::{BbrConfig, CubicConfig, NewRenoConfig},
|
||||||
ClientConfig, Connection as QuinnConnection, Endpoint as QuinnEndpoint, EndpointConfig,
|
ClientConfig, Connection as QuinnConnection, Endpoint as QuinnEndpoint, EndpointConfig,
|
||||||
RecvStream, SendStream, TokioRuntime, TransportConfig, VarInt,
|
TokioRuntime, TransportConfig, VarInt,
|
||||||
};
|
};
|
||||||
use register_count::{Counter, Register};
|
use register_count::Counter;
|
||||||
use rustls::{version, ClientConfig as RustlsClientConfig};
|
use rustls::{version, ClientConfig as RustlsClientConfig};
|
||||||
use socks5_proto::Address as Socks5Address;
|
|
||||||
use std::{
|
use std::{
|
||||||
net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
|
net::{Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
|
||||||
sync::{
|
sync::{atomic::AtomicU32, Arc},
|
||||||
atomic::{AtomicU32, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{Mutex as AsyncMutex, OnceCell as AsyncOnceCell},
|
sync::{Mutex as AsyncMutex, OnceCell as AsyncOnceCell},
|
||||||
time,
|
time,
|
||||||
};
|
};
|
||||||
use tuic::Address;
|
use tuic_quinn::{side, Connection as Model};
|
||||||
use tuic_quinn::{side, Connect, Connection as Model, Packet, Task};
|
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
mod handle_stream;
|
||||||
|
mod handle_task;
|
||||||
|
|
||||||
static ENDPOINT: OnceCell<Mutex<Endpoint>> = OnceCell::new();
|
static ENDPOINT: OnceCell<Mutex<Endpoint>> = OnceCell::new();
|
||||||
static CONNECTION: AsyncOnceCell<AsyncMutex<Connection>> = AsyncOnceCell::const_new();
|
static CONNECTION: AsyncOnceCell<AsyncMutex<Connection>> = AsyncOnceCell::const_new();
|
||||||
static TIMEOUT: AtomicCell<Duration> = AtomicCell::new(Duration::from_secs(0));
|
static TIMEOUT: AtomicCell<Duration> = AtomicCell::new(Duration::from_secs(0));
|
||||||
|
|
||||||
pub const ERROR_CODE: VarInt = VarInt::from_u32(0);
|
pub(crate) const ERROR_CODE: VarInt = VarInt::from_u32(0);
|
||||||
const DEFAULT_CONCURRENT_STREAMS: u32 = 32;
|
const DEFAULT_CONCURRENT_STREAMS: u32 = 32;
|
||||||
|
|
||||||
pub struct Endpoint {
|
#[derive(Clone)]
|
||||||
ep: QuinnEndpoint,
|
pub struct Connection {
|
||||||
server: ServerAddr,
|
conn: QuinnConnection,
|
||||||
|
model: Model<side::Client>,
|
||||||
uuid: Uuid,
|
uuid: Uuid,
|
||||||
password: Arc<[u8]>,
|
password: Arc<[u8]>,
|
||||||
udp_relay_mode: UdpRelayMode,
|
udp_relay_mode: UdpRelayMode,
|
||||||
zero_rtt_handshake: bool,
|
remote_uni_stream_cnt: Counter,
|
||||||
heartbeat: Duration,
|
remote_bi_stream_cnt: Counter,
|
||||||
gc_interval: Duration,
|
max_concurrent_uni_streams: Arc<AtomicU32>,
|
||||||
gc_lifetime: Duration,
|
max_concurrent_bi_streams: Arc<AtomicU32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Endpoint {
|
impl Connection {
|
||||||
pub fn set_config(cfg: Relay) -> Result<(), Error> {
|
pub fn set_config(cfg: Relay) -> Result<(), Error> {
|
||||||
let certs = utils::load_certs(cfg.certificates, cfg.disable_native_certs)?;
|
let certs = utils::load_certs(cfg.certificates, cfg.disable_native_certs)?;
|
||||||
|
|
||||||
@ -107,7 +104,7 @@ impl Endpoint {
|
|||||||
|
|
||||||
ep.set_default_client_config(config);
|
ep.set_default_client_config(config);
|
||||||
|
|
||||||
let ep = Self {
|
let ep = Endpoint {
|
||||||
ep,
|
ep,
|
||||||
server: ServerAddr::new(cfg.server.0, cfg.server.1, cfg.ip),
|
server: ServerAddr::new(cfg.server.0, cfg.server.1, cfg.ip),
|
||||||
uuid: cfg.uuid,
|
uuid: cfg.uuid,
|
||||||
@ -129,79 +126,6 @@ impl Endpoint {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect(&mut self) -> Result<Connection, Error> {
|
|
||||||
let mut last_err = None;
|
|
||||||
|
|
||||||
for addr in self.server.resolve().await? {
|
|
||||||
let connect_to = async {
|
|
||||||
let match_ipv4 =
|
|
||||||
addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4());
|
|
||||||
let match_ipv6 =
|
|
||||||
addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6());
|
|
||||||
|
|
||||||
if !match_ipv4 && !match_ipv6 {
|
|
||||||
let bind_addr = if addr.is_ipv4() {
|
|
||||||
SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
|
|
||||||
} else {
|
|
||||||
SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
|
|
||||||
};
|
|
||||||
|
|
||||||
self.ep
|
|
||||||
.rebind(UdpSocket::bind(bind_addr).map_err(|err| {
|
|
||||||
Error::Socket("failed to create endpoint UDP socket", err)
|
|
||||||
})?)
|
|
||||||
.map_err(|err| {
|
|
||||||
Error::Socket("failed to rebind endpoint UDP socket", err)
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
|
|
||||||
let conn = self.ep.connect(addr, self.server.server_name())?;
|
|
||||||
let conn = if self.zero_rtt_handshake {
|
|
||||||
match conn.into_0rtt() {
|
|
||||||
Ok((conn, _)) => conn,
|
|
||||||
Err(conn) => conn.await?,
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
conn.await?
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(conn)
|
|
||||||
};
|
|
||||||
|
|
||||||
match connect_to.await {
|
|
||||||
Ok(conn) => {
|
|
||||||
return Ok(Connection::new(
|
|
||||||
conn,
|
|
||||||
self.udp_relay_mode,
|
|
||||||
self.uuid,
|
|
||||||
self.password.clone(),
|
|
||||||
self.heartbeat,
|
|
||||||
self.gc_interval,
|
|
||||||
self.gc_lifetime,
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Err(err) => last_err = Some(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(last_err.unwrap_or(Error::DnsResolve))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Connection {
|
|
||||||
conn: QuinnConnection,
|
|
||||||
model: Model<side::Client>,
|
|
||||||
uuid: Uuid,
|
|
||||||
password: Arc<[u8]>,
|
|
||||||
udp_relay_mode: UdpRelayMode,
|
|
||||||
remote_uni_stream_cnt: Counter,
|
|
||||||
remote_bi_stream_cnt: Counter,
|
|
||||||
max_concurrent_uni_streams: Arc<AtomicU32>,
|
|
||||||
max_concurrent_bi_streams: Arc<AtomicU32>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Connection {
|
|
||||||
pub async fn get() -> Result<Connection, Error> {
|
pub async fn get() -> Result<Connection, Error> {
|
||||||
let try_init_conn = async {
|
let try_init_conn = async {
|
||||||
ENDPOINT
|
ENDPOINT
|
||||||
@ -288,237 +212,10 @@ impl Connection {
|
|||||||
log::warn!("[relay] connection error: {err}");
|
log::warn!("[relay] connection error: {err}");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn connect(&self, addr: Address) -> Result<Connect, Error> {
|
|
||||||
let addr_display = addr.to_string();
|
|
||||||
log::info!("[relay] [connect] {addr_display}");
|
|
||||||
|
|
||||||
match self.model.connect(addr).await {
|
|
||||||
Ok(conn) => Ok(conn),
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[relay] [connect] failed initializing relay to {addr_display}: {err}");
|
|
||||||
Err(Error::Model(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn packet(&self, pkt: Bytes, addr: Address, assoc_id: u16) -> Result<(), Error> {
|
|
||||||
let addr_display = addr.to_string();
|
|
||||||
|
|
||||||
match self.udp_relay_mode {
|
|
||||||
UdpRelayMode::Native => {
|
|
||||||
log::info!("[relay] [packet] [{assoc_id:#06x}] [to-native] {addr_display}");
|
|
||||||
match self.model.packet_native(pkt, addr, assoc_id) {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-native] failed relaying packet to {addr_display}: {err}");
|
|
||||||
Err(Error::Model(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
UdpRelayMode::Quic => {
|
|
||||||
log::info!("[relay] [packet] [{assoc_id:#06x}] [to-quic] {addr_display}");
|
|
||||||
match self.model.packet_quic(pkt, addr, assoc_id).await {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[relay] [packet] [{assoc_id:#06x}] [to-quic] failed relaying packet to {addr_display}: {err}");
|
|
||||||
Err(Error::Model(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn dissociate(&self, assoc_id: u16) -> Result<(), Error> {
|
|
||||||
log::info!("[relay] [dissociate] [{assoc_id:#06x}]");
|
|
||||||
match self.model.dissociate(assoc_id).await {
|
|
||||||
Ok(()) => Ok(()),
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[relay] [dissociate] [{assoc_id:#06x}] failed dissociating: {err}");
|
|
||||||
Err(Error::Model(err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_closed(&self) -> bool {
|
fn is_closed(&self) -> bool {
|
||||||
self.conn.close_reason().is_some()
|
self.conn.close_reason().is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn accept_uni_stream(&self) -> Result<(RecvStream, Register), Error> {
|
|
||||||
let max = self.max_concurrent_uni_streams.load(Ordering::Relaxed);
|
|
||||||
|
|
||||||
if self.remote_uni_stream_cnt.count() as u32 == max {
|
|
||||||
self.max_concurrent_uni_streams
|
|
||||||
.store(max * 2, Ordering::Relaxed);
|
|
||||||
|
|
||||||
self.conn
|
|
||||||
.set_max_concurrent_uni_streams(VarInt::from(max * 2));
|
|
||||||
}
|
|
||||||
|
|
||||||
let recv = self.conn.accept_uni().await?;
|
|
||||||
let reg = self.remote_uni_stream_cnt.reg();
|
|
||||||
Ok((recv, reg))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn accept_bi_stream(&self) -> Result<(SendStream, RecvStream, Register), Error> {
|
|
||||||
let max = self.max_concurrent_bi_streams.load(Ordering::Relaxed);
|
|
||||||
|
|
||||||
if self.remote_bi_stream_cnt.count() as u32 == max {
|
|
||||||
self.max_concurrent_bi_streams
|
|
||||||
.store(max * 2, Ordering::Relaxed);
|
|
||||||
|
|
||||||
self.conn
|
|
||||||
.set_max_concurrent_bi_streams(VarInt::from(max * 2));
|
|
||||||
}
|
|
||||||
|
|
||||||
let (send, recv) = self.conn.accept_bi().await?;
|
|
||||||
let reg = self.remote_bi_stream_cnt.reg();
|
|
||||||
Ok((send, recv, reg))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn accept_datagram(&self) -> Result<Bytes, Error> {
|
|
||||||
Ok(self.conn.read_datagram().await?)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
|
|
||||||
log::debug!("[relay] incoming unidirectional stream");
|
|
||||||
|
|
||||||
let res = match self.model.accept_uni_stream(recv).await {
|
|
||||||
Err(err) => Err(Error::Model(err)),
|
|
||||||
Ok(Task::Packet(pkt)) => match self.udp_relay_mode {
|
|
||||||
UdpRelayMode::Quic => {
|
|
||||||
log::info!(
|
|
||||||
"[relay] [packet] [{assoc_id:#06x}] [from-quic] [{pkt_id:#06x}] {frag_id}/{frag_total}",
|
|
||||||
assoc_id = pkt.assoc_id(),
|
|
||||||
pkt_id = pkt.pkt_id(),
|
|
||||||
frag_id = pkt.frag_id(),
|
|
||||||
frag_total = pkt.frag_total(),
|
|
||||||
);
|
|
||||||
Self::handle_packet(pkt).await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
UdpRelayMode::Native => Err(Error::WrongPacketSource),
|
|
||||||
},
|
|
||||||
_ => unreachable!(), // already filtered in `tuic_quinn`
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!("[relay] incoming unidirectional stream error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) {
|
|
||||||
log::debug!("[relay] incoming bidirectional stream");
|
|
||||||
|
|
||||||
let res = match self.model.accept_bi_stream(send, recv).await {
|
|
||||||
Err(err) => Err(Error::Model(err)),
|
|
||||||
_ => unreachable!(), // already filtered in `tuic_quinn`
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!("[relay] incoming bidirectional stream error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_datagram(self, dg: Bytes) {
|
|
||||||
log::debug!("[relay] incoming datagram");
|
|
||||||
|
|
||||||
let res = match self.model.accept_datagram(dg) {
|
|
||||||
Err(err) => Err(Error::Model(err)),
|
|
||||||
Ok(Task::Packet(pkt)) => match self.udp_relay_mode {
|
|
||||||
UdpRelayMode::Native => {
|
|
||||||
log::info!(
|
|
||||||
"[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {frag_id}/{frag_total}",
|
|
||||||
assoc_id = pkt.assoc_id(),
|
|
||||||
pkt_id = pkt.pkt_id(),
|
|
||||||
frag_id = pkt.frag_id(),
|
|
||||||
frag_total = pkt.frag_total(),
|
|
||||||
);
|
|
||||||
Self::handle_packet(pkt).await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
UdpRelayMode::Quic => Err(Error::WrongPacketSource),
|
|
||||||
},
|
|
||||||
_ => unreachable!(), // already filtered in `tuic_quinn`
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!("[relay] incoming datagram error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn authenticate(self) {
|
|
||||||
log::debug!("[relay] [authenticate] sending authentication");
|
|
||||||
|
|
||||||
match self
|
|
||||||
.model
|
|
||||||
.authenticate(self.uuid, self.password.clone())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(()) => log::info!("[relay] [authenticate] {uuid}", uuid = self.uuid),
|
|
||||||
Err(err) => log::warn!("[relay] [authenticate] authentication sending error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn heartbeat(self, heartbeat: Duration) {
|
|
||||||
loop {
|
|
||||||
time::sleep(heartbeat).await;
|
|
||||||
|
|
||||||
if self.is_closed() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if self.model.task_connect_count() + self.model.task_associate_count() == 0 {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.model.heartbeat().await {
|
|
||||||
Ok(()) => log::debug!("[relay] [heartbeat]"),
|
|
||||||
Err(err) => log::warn!("[relay] [heartbeat] heartbeat sending error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_packet(pkt: Packet) {
|
|
||||||
let assoc_id = pkt.assoc_id();
|
|
||||||
let pkt_id = pkt.pkt_id();
|
|
||||||
|
|
||||||
match pkt.accept().await {
|
|
||||||
Ok(Some((pkt, addr, _))) => {
|
|
||||||
log::info!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {addr}");
|
|
||||||
|
|
||||||
let addr = match addr {
|
|
||||||
Address::None => unreachable!(),
|
|
||||||
Address::DomainAddress(domain, port) => {
|
|
||||||
Socks5Address::DomainAddress(domain, port)
|
|
||||||
}
|
|
||||||
Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr),
|
|
||||||
};
|
|
||||||
|
|
||||||
let session = SOCKS5_UDP_SESSIONS
|
|
||||||
.get()
|
|
||||||
.unwrap()
|
|
||||||
.lock()
|
|
||||||
.get(&assoc_id)
|
|
||||||
.cloned();
|
|
||||||
|
|
||||||
if let Some(session) = session {
|
|
||||||
if let Err(err) = session.send(pkt, addr).await {
|
|
||||||
log::warn!(
|
|
||||||
"[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(None) => {}
|
|
||||||
Err(err) => log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) {
|
async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) {
|
||||||
loop {
|
loop {
|
||||||
time::sleep(gc_interval).await;
|
time::sleep(gc_interval).await;
|
||||||
@ -532,3 +229,75 @@ impl Connection {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct Endpoint {
|
||||||
|
ep: QuinnEndpoint,
|
||||||
|
server: ServerAddr,
|
||||||
|
uuid: Uuid,
|
||||||
|
password: Arc<[u8]>,
|
||||||
|
udp_relay_mode: UdpRelayMode,
|
||||||
|
zero_rtt_handshake: bool,
|
||||||
|
heartbeat: Duration,
|
||||||
|
gc_interval: Duration,
|
||||||
|
gc_lifetime: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Endpoint {
|
||||||
|
async fn connect(&mut self) -> Result<Connection, Error> {
|
||||||
|
let mut last_err = None;
|
||||||
|
|
||||||
|
for addr in self.server.resolve().await? {
|
||||||
|
let connect_to = async {
|
||||||
|
let match_ipv4 =
|
||||||
|
addr.is_ipv4() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv4());
|
||||||
|
let match_ipv6 =
|
||||||
|
addr.is_ipv6() && self.ep.local_addr().map_or(false, |addr| addr.is_ipv6());
|
||||||
|
|
||||||
|
if !match_ipv4 && !match_ipv6 {
|
||||||
|
let bind_addr = if addr.is_ipv4() {
|
||||||
|
SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))
|
||||||
|
} else {
|
||||||
|
SocketAddr::from((Ipv6Addr::UNSPECIFIED, 0))
|
||||||
|
};
|
||||||
|
|
||||||
|
self.ep
|
||||||
|
.rebind(UdpSocket::bind(bind_addr).map_err(|err| {
|
||||||
|
Error::Socket("failed to create endpoint UDP socket", err)
|
||||||
|
})?)
|
||||||
|
.map_err(|err| {
|
||||||
|
Error::Socket("failed to rebind endpoint UDP socket", err)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let conn = self.ep.connect(addr, self.server.server_name())?;
|
||||||
|
let conn = if self.zero_rtt_handshake {
|
||||||
|
match conn.into_0rtt() {
|
||||||
|
Ok((conn, _)) => conn,
|
||||||
|
Err(conn) => conn.await?,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
conn.await?
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(conn)
|
||||||
|
};
|
||||||
|
|
||||||
|
match connect_to.await {
|
||||||
|
Ok(conn) => {
|
||||||
|
return Ok(Connection::new(
|
||||||
|
conn,
|
||||||
|
self.udp_relay_mode,
|
||||||
|
self.uuid,
|
||||||
|
self.password.clone(),
|
||||||
|
self.heartbeat,
|
||||||
|
self.gc_interval,
|
||||||
|
self.gc_lifetime,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(err) => last_err = Some(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(last_err.unwrap_or(Error::DnsResolve))
|
||||||
|
}
|
||||||
|
}
|
31
tuic-client/src/error.rs
Normal file
31
tuic-client/src/error.rs
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
use quinn::{ConnectError, ConnectionError};
|
||||||
|
use rustls::Error as RustlsError;
|
||||||
|
use std::io::Error as IoError;
|
||||||
|
use thiserror::Error;
|
||||||
|
use tuic_quinn::Error as ModelError;
|
||||||
|
|
||||||
|
#[derive(Debug, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error(transparent)]
|
||||||
|
Io(#[from] IoError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Connect(#[from] ConnectError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Connection(#[from] ConnectionError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Model(#[from] ModelError),
|
||||||
|
#[error("load native certificates error: {0}")]
|
||||||
|
LoadNativeCerts(IoError),
|
||||||
|
#[error(transparent)]
|
||||||
|
Rustls(#[from] RustlsError),
|
||||||
|
#[error("{0}: {1}")]
|
||||||
|
Socket(&'static str, IoError),
|
||||||
|
#[error("timeout establishing connection")]
|
||||||
|
Timeout,
|
||||||
|
#[error("cannot resolve the server name")]
|
||||||
|
DnsResolve,
|
||||||
|
#[error("received packet from an unexpected source")]
|
||||||
|
WrongPacketSource,
|
||||||
|
#[error("invalid socks5 authentication")]
|
||||||
|
InvalidSocks5Auth,
|
||||||
|
}
|
13
tuic-client/src/lib.rs
Normal file
13
tuic-client/src/lib.rs
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
mod config;
|
||||||
|
mod connection;
|
||||||
|
mod error;
|
||||||
|
mod socks5;
|
||||||
|
mod utils;
|
||||||
|
|
||||||
|
pub use crate::{
|
||||||
|
config::{Config, ConfigError},
|
||||||
|
connection::Connection,
|
||||||
|
error::Error,
|
||||||
|
socks5::Server as Socks5Server,
|
||||||
|
utils::{CongestionControl, ServerAddr, UdpRelayMode},
|
||||||
|
};
|
@ -1,19 +1,6 @@
|
|||||||
use self::{
|
|
||||||
config::{Config, ConfigError},
|
|
||||||
connection::Endpoint,
|
|
||||||
socks5::Server as Socks5Server,
|
|
||||||
};
|
|
||||||
use env_logger::Builder as LoggerBuilder;
|
use env_logger::Builder as LoggerBuilder;
|
||||||
use quinn::{ConnectError, ConnectionError};
|
use std::{env, process};
|
||||||
use rustls::Error as RustlsError;
|
use tuic_client::{Config, ConfigError, Connection, Socks5Server};
|
||||||
use std::{env, io::Error as IoError, process};
|
|
||||||
use thiserror::Error;
|
|
||||||
use tuic_quinn::Error as ModelError;
|
|
||||||
|
|
||||||
mod config;
|
|
||||||
mod connection;
|
|
||||||
mod socks5;
|
|
||||||
mod utils;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
@ -35,7 +22,7 @@ async fn main() {
|
|||||||
.format_target(false)
|
.format_target(false)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
match Endpoint::set_config(cfg.relay) {
|
match Connection::set_config(cfg.relay) {
|
||||||
Ok(()) => {}
|
Ok(()) => {}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
eprintln!("{err}");
|
eprintln!("{err}");
|
||||||
@ -53,29 +40,3 @@ async fn main() {
|
|||||||
|
|
||||||
Socks5Server::start().await;
|
Socks5Server::start().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum Error {
|
|
||||||
#[error(transparent)]
|
|
||||||
Io(#[from] IoError),
|
|
||||||
#[error(transparent)]
|
|
||||||
Connect(#[from] ConnectError),
|
|
||||||
#[error(transparent)]
|
|
||||||
Connection(#[from] ConnectionError),
|
|
||||||
#[error(transparent)]
|
|
||||||
Model(#[from] ModelError),
|
|
||||||
#[error("load native certificates error: {0}")]
|
|
||||||
LoadNativeCerts(IoError),
|
|
||||||
#[error(transparent)]
|
|
||||||
Rustls(#[from] RustlsError),
|
|
||||||
#[error("{0}: {1}")]
|
|
||||||
Socket(&'static str, IoError),
|
|
||||||
#[error("timeout establishing connection")]
|
|
||||||
Timeout,
|
|
||||||
#[error("cannot resolve the server name")]
|
|
||||||
DnsResolve,
|
|
||||||
#[error("received packet from an unexpected source")]
|
|
||||||
WrongPacketSource,
|
|
||||||
#[error("invalid socks5 authentication")]
|
|
||||||
InvalidSocks5Auth,
|
|
||||||
}
|
|
||||||
|
@ -1,460 +0,0 @@
|
|||||||
use crate::{
|
|
||||||
config::Local,
|
|
||||||
connection::{Connection as TuicConnection, ERROR_CODE},
|
|
||||||
Error,
|
|
||||||
};
|
|
||||||
use bytes::Bytes;
|
|
||||||
use once_cell::sync::OnceCell;
|
|
||||||
use parking_lot::Mutex;
|
|
||||||
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
|
|
||||||
use socks5_proto::{Address, Reply};
|
|
||||||
use socks5_server::{
|
|
||||||
auth::{NoAuth, Password},
|
|
||||||
connection::{associate, bind, connect},
|
|
||||||
Associate, AssociatedUdpSocket, Auth, Bind, Connect, Connection, Server as Socks5Server,
|
|
||||||
};
|
|
||||||
use std::{
|
|
||||||
collections::HashMap,
|
|
||||||
io::{Error as IoError, ErrorKind},
|
|
||||||
net::{IpAddr, SocketAddr, TcpListener as StdTcpListener, UdpSocket as StdUdpSocket},
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicU16, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use tokio::{
|
|
||||||
io::{self, AsyncWriteExt},
|
|
||||||
net::{TcpListener, UdpSocket},
|
|
||||||
};
|
|
||||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
|
||||||
use tuic::Address as TuicAddress;
|
|
||||||
|
|
||||||
static SERVER: OnceCell<Server> = OnceCell::new();
|
|
||||||
pub static UDP_SESSIONS: OnceCell<Mutex<HashMap<u16, UdpSession>>> = OnceCell::new();
|
|
||||||
|
|
||||||
pub struct Server {
|
|
||||||
inner: Socks5Server,
|
|
||||||
dual_stack: Option<bool>,
|
|
||||||
max_pkt_size: usize,
|
|
||||||
next_assoc_id: AtomicU16,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Server {
|
|
||||||
pub fn set_config(cfg: Local) -> Result<(), Error> {
|
|
||||||
SERVER
|
|
||||||
.set(Self::new(
|
|
||||||
cfg.server,
|
|
||||||
cfg.dual_stack,
|
|
||||||
cfg.max_packet_size,
|
|
||||||
cfg.username.map(|s| s.into_bytes()),
|
|
||||||
cfg.password.map(|s| s.into_bytes()),
|
|
||||||
)?)
|
|
||||||
.map_err(|_| "failed initializing socks5 server")
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
UDP_SESSIONS
|
|
||||||
.set(Mutex::new(HashMap::new()))
|
|
||||||
.map_err(|_| "failed initializing socks5 UDP session pool")
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new(
|
|
||||||
addr: SocketAddr,
|
|
||||||
dual_stack: Option<bool>,
|
|
||||||
max_pkt_size: usize,
|
|
||||||
username: Option<Vec<u8>>,
|
|
||||||
password: Option<Vec<u8>>,
|
|
||||||
) -> Result<Self, Error> {
|
|
||||||
let socket = {
|
|
||||||
let domain = match addr {
|
|
||||||
SocketAddr::V4(_) => Domain::IPV4,
|
|
||||||
SocketAddr::V6(_) => Domain::IPV6,
|
|
||||||
};
|
|
||||||
|
|
||||||
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
|
|
||||||
.map_err(|err| Error::Socket("failed to create socks5 server socket", err))?;
|
|
||||||
|
|
||||||
if let Some(dual_stack) = dual_stack {
|
|
||||||
socket.set_only_v6(!dual_stack).map_err(|err| {
|
|
||||||
Error::Socket("socks5 server dual-stack socket setting error", err)
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.set_reuse_address(true).map_err(|err| {
|
|
||||||
Error::Socket("failed to set socks5 server socket to reuse_address", err)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
socket.set_nonblocking(true).map_err(|err| {
|
|
||||||
Error::Socket("failed setting socks5 server socket as non-blocking", err)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
socket
|
|
||||||
.bind(&SockAddr::from(addr))
|
|
||||||
.map_err(|err| Error::Socket("failed to bind socks5 server socket", err))?;
|
|
||||||
|
|
||||||
socket
|
|
||||||
.listen(i32::MAX)
|
|
||||||
.map_err(|err| Error::Socket("failed to listen on socks5 server socket", err))?;
|
|
||||||
|
|
||||||
TcpListener::from_std(StdTcpListener::from(socket))
|
|
||||||
.map_err(|err| Error::Socket("failed to create socks5 server socket", err))?
|
|
||||||
};
|
|
||||||
|
|
||||||
let auth: Arc<dyn Auth + Send + Sync> = match (username, password) {
|
|
||||||
(Some(username), Some(password)) => Arc::new(Password::new(username, password)),
|
|
||||||
(None, None) => Arc::new(NoAuth),
|
|
||||||
_ => return Err(Error::InvalidSocks5Auth),
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
inner: Socks5Server::new(socket, auth),
|
|
||||||
dual_stack,
|
|
||||||
max_pkt_size,
|
|
||||||
next_assoc_id: AtomicU16::new(0),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start() {
|
|
||||||
let server = SERVER.get().unwrap();
|
|
||||||
|
|
||||||
log::warn!(
|
|
||||||
"[socks5] server started, listening on {}",
|
|
||||||
server.inner.local_addr().unwrap()
|
|
||||||
);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
match server.inner.accept().await {
|
|
||||||
Ok((conn, addr)) => {
|
|
||||||
log::debug!("[socks5] [{addr}] connection established");
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
match conn.handshake().await {
|
|
||||||
Ok(Connection::Associate(associate, _)) => {
|
|
||||||
let assoc_id = server.next_assoc_id.fetch_add(1, Ordering::Relaxed);
|
|
||||||
log::info!("[socks5] [{addr}] [associate] [{assoc_id:#06x}]");
|
|
||||||
Self::handle_associate(
|
|
||||||
associate,
|
|
||||||
assoc_id,
|
|
||||||
server.dual_stack,
|
|
||||||
server.max_pkt_size,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
Ok(Connection::Bind(bind, _)) => {
|
|
||||||
log::info!("[socks5] [{addr}] [bind]");
|
|
||||||
Self::handle_bind(bind).await;
|
|
||||||
}
|
|
||||||
Ok(Connection::Connect(connect, target_addr)) => {
|
|
||||||
log::info!("[socks5] [{addr}] [connect] {target_addr}");
|
|
||||||
Self::handle_connect(connect, target_addr).await;
|
|
||||||
}
|
|
||||||
Err(err) => log::warn!("[socks5] [{addr}] handshake error: {err}"),
|
|
||||||
};
|
|
||||||
|
|
||||||
log::debug!("[socks5] [{addr}] connection closed");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(err) => log::warn!("[socks5] failed to establish connection: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_associate(
|
|
||||||
assoc: Associate<associate::NeedReply>,
|
|
||||||
assoc_id: u16,
|
|
||||||
dual_stack: Option<bool>,
|
|
||||||
max_pkt_size: usize,
|
|
||||||
) {
|
|
||||||
let peer_addr = assoc.peer_addr().unwrap();
|
|
||||||
let local_ip = assoc.local_addr().unwrap().ip();
|
|
||||||
|
|
||||||
match UdpSession::new(assoc_id, peer_addr, local_ip, dual_stack, max_pkt_size) {
|
|
||||||
Ok(session) => {
|
|
||||||
let local_addr = session.local_addr().unwrap();
|
|
||||||
log::debug!(
|
|
||||||
"[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] bound to {local_addr}"
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut assoc = match assoc
|
|
||||||
.reply(Reply::Succeeded, Address::SocketAddress(local_addr))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(assoc) => assoc,
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
UDP_SESSIONS
|
|
||||||
.get()
|
|
||||||
.unwrap()
|
|
||||||
.lock()
|
|
||||||
.insert(assoc_id, session.clone());
|
|
||||||
|
|
||||||
let handle_local_incoming_pkt = async move {
|
|
||||||
loop {
|
|
||||||
let (pkt, target_addr) = match session.recv().await {
|
|
||||||
Ok(res) => res,
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed to receive UDP packet: {err}");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let forward = async move {
|
|
||||||
let target_addr = match target_addr {
|
|
||||||
Address::DomainAddress(domain, port) => {
|
|
||||||
TuicAddress::DomainAddress(domain, port)
|
|
||||||
}
|
|
||||||
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
|
|
||||||
};
|
|
||||||
|
|
||||||
match TuicConnection::get().await {
|
|
||||||
Ok(conn) => conn.packet(pkt, target_addr, assoc_id).await,
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
match forward.await {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed relaying UDP packet: {err}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match tokio::select! {
|
|
||||||
res = assoc.wait_until_closed() => res,
|
|
||||||
_ = handle_local_incoming_pkt => unreachable!(),
|
|
||||||
} {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] associate connection error: {err}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] stopped associating"
|
|
||||||
);
|
|
||||||
|
|
||||||
UDP_SESSIONS
|
|
||||||
.get()
|
|
||||||
.unwrap()
|
|
||||||
.lock()
|
|
||||||
.remove(&assoc_id)
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let res = match TuicConnection::get().await {
|
|
||||||
Ok(conn) => conn.dissociate(assoc_id).await,
|
|
||||||
Err(err) => Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(err) => log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed stoping UDP relaying session: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed setting up UDP associate session: {err}");
|
|
||||||
|
|
||||||
match assoc
|
|
||||||
.reply(Reply::GeneralFailure, Address::unspecified())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(mut assoc) => {
|
|
||||||
let _ = assoc.shutdown().await;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_bind(bind: Bind<bind::NeedFirstReply>) {
|
|
||||||
let peer_addr = bind.peer_addr().unwrap();
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [bind] command not supported");
|
|
||||||
|
|
||||||
match bind
|
|
||||||
.reply(Reply::CommandNotSupported, Address::unspecified())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(mut bind) => {
|
|
||||||
let _ = bind.shutdown().await;
|
|
||||||
}
|
|
||||||
Err(err) => log::warn!("[socks5] [{peer_addr}] [bind] command reply error: {err}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_connect(conn: Connect<connect::NeedReply>, addr: Address) {
|
|
||||||
let peer_addr = conn.peer_addr().unwrap();
|
|
||||||
let target_addr = match addr {
|
|
||||||
Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port),
|
|
||||||
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
|
|
||||||
};
|
|
||||||
|
|
||||||
let relay = match TuicConnection::get().await {
|
|
||||||
Ok(conn) => conn.connect(target_addr.clone()).await,
|
|
||||||
Err(err) => Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
match relay {
|
|
||||||
Ok(relay) => {
|
|
||||||
let mut relay = relay.compat();
|
|
||||||
|
|
||||||
match conn.reply(Reply::Succeeded, Address::unspecified()).await {
|
|
||||||
Ok(mut conn) => match io::copy_bidirectional(&mut conn, &mut relay).await {
|
|
||||||
Ok(_) => {}
|
|
||||||
Err(err) => {
|
|
||||||
let _ = conn.shutdown().await;
|
|
||||||
let _ = relay.get_mut().reset(ERROR_CODE);
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] TCP stream relaying error: {err}");
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
let _ = relay.shutdown().await;
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] unable to relay TCP stream: {err}");
|
|
||||||
|
|
||||||
match conn
|
|
||||||
.reply(Reply::GeneralFailure, Address::unspecified())
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(mut conn) => {
|
|
||||||
let _ = conn.shutdown().await;
|
|
||||||
}
|
|
||||||
Err(err) => {
|
|
||||||
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct UdpSession {
|
|
||||||
socket: Arc<AssociatedUdpSocket>,
|
|
||||||
assoc_id: u16,
|
|
||||||
ctrl_addr: SocketAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UdpSession {
|
|
||||||
fn new(
|
|
||||||
assoc_id: u16,
|
|
||||||
ctrl_addr: SocketAddr,
|
|
||||||
local_ip: IpAddr,
|
|
||||||
dual_stack: Option<bool>,
|
|
||||||
max_pkt_size: usize,
|
|
||||||
) -> Result<Self, Error> {
|
|
||||||
let domain = match local_ip {
|
|
||||||
IpAddr::V4(_) => Domain::IPV4,
|
|
||||||
IpAddr::V6(_) => Domain::IPV6,
|
|
||||||
};
|
|
||||||
|
|
||||||
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| {
|
|
||||||
Error::Socket("failed to create socks5 server UDP associate socket", err)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if let Some(dual_stack) = dual_stack {
|
|
||||||
socket.set_only_v6(!dual_stack).map_err(|err| {
|
|
||||||
Error::Socket(
|
|
||||||
"socks5 server UDP associate dual-stack socket setting error",
|
|
||||||
err,
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.set_nonblocking(true).map_err(|err| {
|
|
||||||
Error::Socket(
|
|
||||||
"failed setting socks5 server UDP associate socket as non-blocking",
|
|
||||||
err,
|
|
||||||
)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
socket
|
|
||||||
.bind(&SockAddr::from(SocketAddr::from((local_ip, 0))))
|
|
||||||
.map_err(|err| {
|
|
||||||
Error::Socket("failed to bind socks5 server UDP associate socket", err)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let socket = UdpSocket::from_std(StdUdpSocket::from(socket)).map_err(|err| {
|
|
||||||
Error::Socket("failed to create socks5 server UDP associate socket", err)
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(Self {
|
|
||||||
socket: Arc::new(AssociatedUdpSocket::from((socket, max_pkt_size))),
|
|
||||||
assoc_id,
|
|
||||||
ctrl_addr,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send(&self, pkt: Bytes, src_addr: Address) -> Result<(), Error> {
|
|
||||||
let src_addr_display = src_addr.to_string();
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr}",
|
|
||||||
ctrl_addr = self.ctrl_addr,
|
|
||||||
assoc_id = self.assoc_id,
|
|
||||||
dst_addr = self.socket.peer_addr().unwrap(),
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Err(err) = self.socket.send(pkt, 0, src_addr).await {
|
|
||||||
log::warn!(
|
|
||||||
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr} error: {err}",
|
|
||||||
ctrl_addr = self.ctrl_addr,
|
|
||||||
assoc_id = self.assoc_id,
|
|
||||||
dst_addr = self.socket.peer_addr().unwrap(),
|
|
||||||
);
|
|
||||||
|
|
||||||
return Err(Error::Io(err));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn recv(&self) -> Result<(Bytes, Address), Error> {
|
|
||||||
let (pkt, frag, dst_addr, src_addr) = self.socket.recv_from().await?;
|
|
||||||
|
|
||||||
if let Ok(connected_addr) = self.socket.peer_addr() {
|
|
||||||
if src_addr != connected_addr {
|
|
||||||
Err(IoError::new(
|
|
||||||
ErrorKind::Other,
|
|
||||||
format!("invalid source address: {src_addr}"),
|
|
||||||
))?;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.socket.connect(src_addr).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if frag != 0 {
|
|
||||||
Err(IoError::new(
|
|
||||||
ErrorKind::Other,
|
|
||||||
"fragmented packet is not supported",
|
|
||||||
))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] receive packet from {src_addr} to {dst_addr}",
|
|
||||||
ctrl_addr = self.ctrl_addr,
|
|
||||||
assoc_id = self.assoc_id
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok((pkt, dst_addr))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn local_addr(&self) -> Result<SocketAddr, IoError> {
|
|
||||||
self.socket.local_addr()
|
|
||||||
}
|
|
||||||
}
|
|
193
tuic-client/src/socks5/handle_task.rs
Normal file
193
tuic-client/src/socks5/handle_task.rs
Normal file
@ -0,0 +1,193 @@
|
|||||||
|
use super::{udp_session::UdpSession, Server, UDP_SESSIONS};
|
||||||
|
use crate::connection::{Connection as TuicConnection, ERROR_CODE};
|
||||||
|
use socks5_proto::{Address, Reply};
|
||||||
|
use socks5_server::{
|
||||||
|
connection::{associate, bind, connect},
|
||||||
|
Associate, Bind, Connect,
|
||||||
|
};
|
||||||
|
use tokio::io::{self, AsyncWriteExt};
|
||||||
|
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||||
|
use tuic::Address as TuicAddress;
|
||||||
|
|
||||||
|
impl Server {
|
||||||
|
pub(super) async fn handle_associate(
|
||||||
|
assoc: Associate<associate::NeedReply>,
|
||||||
|
assoc_id: u16,
|
||||||
|
dual_stack: Option<bool>,
|
||||||
|
max_pkt_size: usize,
|
||||||
|
) {
|
||||||
|
let peer_addr = assoc.peer_addr().unwrap();
|
||||||
|
let local_ip = assoc.local_addr().unwrap().ip();
|
||||||
|
|
||||||
|
match UdpSession::new(assoc_id, peer_addr, local_ip, dual_stack, max_pkt_size) {
|
||||||
|
Ok(session) => {
|
||||||
|
let local_addr = session.local_addr().unwrap();
|
||||||
|
log::debug!(
|
||||||
|
"[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] bound to {local_addr}"
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut assoc = match assoc
|
||||||
|
.reply(Reply::Succeeded, Address::SocketAddress(local_addr))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(assoc) => assoc,
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
UDP_SESSIONS
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.lock()
|
||||||
|
.insert(assoc_id, session.clone());
|
||||||
|
|
||||||
|
let handle_local_incoming_pkt = async move {
|
||||||
|
loop {
|
||||||
|
let (pkt, target_addr) = match session.recv().await {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed to receive UDP packet: {err}");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let forward = async move {
|
||||||
|
let target_addr = match target_addr {
|
||||||
|
Address::DomainAddress(domain, port) => {
|
||||||
|
TuicAddress::DomainAddress(domain, port)
|
||||||
|
}
|
||||||
|
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
|
||||||
|
};
|
||||||
|
|
||||||
|
match TuicConnection::get().await {
|
||||||
|
Ok(conn) => conn.packet(pkt, target_addr, assoc_id).await,
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
match forward.await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed relaying UDP packet: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match tokio::select! {
|
||||||
|
res = assoc.wait_until_closed() => res,
|
||||||
|
_ = handle_local_incoming_pkt => unreachable!(),
|
||||||
|
} {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] associate connection error: {err}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
"[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] stopped associating"
|
||||||
|
);
|
||||||
|
|
||||||
|
UDP_SESSIONS
|
||||||
|
.get()
|
||||||
|
.unwrap()
|
||||||
|
.lock()
|
||||||
|
.remove(&assoc_id)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let res = match TuicConnection::get().await {
|
||||||
|
Ok(conn) => conn.dissociate(assoc_id).await,
|
||||||
|
Err(err) => Err(err),
|
||||||
|
};
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(err) => log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed stoping UDP relaying session: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed setting up UDP associate session: {err}");
|
||||||
|
|
||||||
|
match assoc
|
||||||
|
.reply(Reply::GeneralFailure, Address::unspecified())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(mut assoc) => {
|
||||||
|
let _ = assoc.shutdown().await;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn handle_bind(bind: Bind<bind::NeedFirstReply>) {
|
||||||
|
let peer_addr = bind.peer_addr().unwrap();
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [bind] command not supported");
|
||||||
|
|
||||||
|
match bind
|
||||||
|
.reply(Reply::CommandNotSupported, Address::unspecified())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(mut bind) => {
|
||||||
|
let _ = bind.shutdown().await;
|
||||||
|
}
|
||||||
|
Err(err) => log::warn!("[socks5] [{peer_addr}] [bind] command reply error: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn handle_connect(conn: Connect<connect::NeedReply>, addr: Address) {
|
||||||
|
let peer_addr = conn.peer_addr().unwrap();
|
||||||
|
let target_addr = match addr {
|
||||||
|
Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port),
|
||||||
|
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
|
||||||
|
};
|
||||||
|
|
||||||
|
let relay = match TuicConnection::get().await {
|
||||||
|
Ok(conn) => conn.connect(target_addr.clone()).await,
|
||||||
|
Err(err) => Err(err),
|
||||||
|
};
|
||||||
|
|
||||||
|
match relay {
|
||||||
|
Ok(relay) => {
|
||||||
|
let mut relay = relay.compat();
|
||||||
|
|
||||||
|
match conn.reply(Reply::Succeeded, Address::unspecified()).await {
|
||||||
|
Ok(mut conn) => match io::copy_bidirectional(&mut conn, &mut relay).await {
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(err) => {
|
||||||
|
let _ = conn.shutdown().await;
|
||||||
|
let _ = relay.get_mut().reset(ERROR_CODE);
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] TCP stream relaying error: {err}");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
let _ = relay.shutdown().await;
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] unable to relay TCP stream: {err}");
|
||||||
|
|
||||||
|
match conn
|
||||||
|
.reply(Reply::GeneralFailure, Address::unspecified())
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(mut conn) => {
|
||||||
|
let _ = conn.shutdown().await;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
154
tuic-client/src/socks5/mod.rs
Normal file
154
tuic-client/src/socks5/mod.rs
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
use crate::{config::Local, Error};
|
||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
|
||||||
|
use socks5_server::{
|
||||||
|
auth::{NoAuth, Password},
|
||||||
|
Auth, Connection, Server as Socks5Server,
|
||||||
|
};
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
net::{SocketAddr, TcpListener as StdTcpListener},
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU16, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
|
mod handle_task;
|
||||||
|
mod udp_session;
|
||||||
|
|
||||||
|
pub(crate) use self::udp_session::UDP_SESSIONS;
|
||||||
|
|
||||||
|
static SERVER: OnceCell<Server> = OnceCell::new();
|
||||||
|
|
||||||
|
pub struct Server {
|
||||||
|
inner: Socks5Server,
|
||||||
|
dual_stack: Option<bool>,
|
||||||
|
max_pkt_size: usize,
|
||||||
|
next_assoc_id: AtomicU16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Server {
|
||||||
|
pub fn set_config(cfg: Local) -> Result<(), Error> {
|
||||||
|
SERVER
|
||||||
|
.set(Self::new(
|
||||||
|
cfg.server,
|
||||||
|
cfg.dual_stack,
|
||||||
|
cfg.max_packet_size,
|
||||||
|
cfg.username.map(|s| s.into_bytes()),
|
||||||
|
cfg.password.map(|s| s.into_bytes()),
|
||||||
|
)?)
|
||||||
|
.map_err(|_| "failed initializing socks5 server")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
UDP_SESSIONS
|
||||||
|
.set(Mutex::new(HashMap::new()))
|
||||||
|
.map_err(|_| "failed initializing socks5 UDP session pool")
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn new(
|
||||||
|
addr: SocketAddr,
|
||||||
|
dual_stack: Option<bool>,
|
||||||
|
max_pkt_size: usize,
|
||||||
|
username: Option<Vec<u8>>,
|
||||||
|
password: Option<Vec<u8>>,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
let socket = {
|
||||||
|
let domain = match addr {
|
||||||
|
SocketAddr::V4(_) => Domain::IPV4,
|
||||||
|
SocketAddr::V6(_) => Domain::IPV6,
|
||||||
|
};
|
||||||
|
|
||||||
|
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
|
||||||
|
.map_err(|err| Error::Socket("failed to create socks5 server socket", err))?;
|
||||||
|
|
||||||
|
if let Some(dual_stack) = dual_stack {
|
||||||
|
socket.set_only_v6(!dual_stack).map_err(|err| {
|
||||||
|
Error::Socket("socks5 server dual-stack socket setting error", err)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.set_reuse_address(true).map_err(|err| {
|
||||||
|
Error::Socket("failed to set socks5 server socket to reuse_address", err)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
socket.set_nonblocking(true).map_err(|err| {
|
||||||
|
Error::Socket("failed setting socks5 server socket as non-blocking", err)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
socket
|
||||||
|
.bind(&SockAddr::from(addr))
|
||||||
|
.map_err(|err| Error::Socket("failed to bind socks5 server socket", err))?;
|
||||||
|
|
||||||
|
socket
|
||||||
|
.listen(i32::MAX)
|
||||||
|
.map_err(|err| Error::Socket("failed to listen on socks5 server socket", err))?;
|
||||||
|
|
||||||
|
TcpListener::from_std(StdTcpListener::from(socket))
|
||||||
|
.map_err(|err| Error::Socket("failed to create socks5 server socket", err))?
|
||||||
|
};
|
||||||
|
|
||||||
|
let auth: Arc<dyn Auth + Send + Sync> = match (username, password) {
|
||||||
|
(Some(username), Some(password)) => Arc::new(Password::new(username, password)),
|
||||||
|
(None, None) => Arc::new(NoAuth),
|
||||||
|
_ => return Err(Error::InvalidSocks5Auth),
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
inner: Socks5Server::new(socket, auth),
|
||||||
|
dual_stack,
|
||||||
|
max_pkt_size,
|
||||||
|
next_assoc_id: AtomicU16::new(0),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn start() {
|
||||||
|
let server = SERVER.get().unwrap();
|
||||||
|
|
||||||
|
log::warn!(
|
||||||
|
"[socks5] server started, listening on {}",
|
||||||
|
server.inner.local_addr().unwrap()
|
||||||
|
);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match server.inner.accept().await {
|
||||||
|
Ok((conn, addr)) => {
|
||||||
|
log::debug!("[socks5] [{addr}] connection established");
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
match conn.handshake().await {
|
||||||
|
Ok(Connection::Associate(associate, _)) => {
|
||||||
|
let assoc_id = server.next_assoc_id.fetch_add(1, Ordering::Relaxed);
|
||||||
|
log::info!("[socks5] [{addr}] [associate] [{assoc_id:#06x}]");
|
||||||
|
Self::handle_associate(
|
||||||
|
associate,
|
||||||
|
assoc_id,
|
||||||
|
server.dual_stack,
|
||||||
|
server.max_pkt_size,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Ok(Connection::Bind(bind, _)) => {
|
||||||
|
log::info!("[socks5] [{addr}] [bind]");
|
||||||
|
Self::handle_bind(bind).await;
|
||||||
|
}
|
||||||
|
Ok(Connection::Connect(connect, target_addr)) => {
|
||||||
|
log::info!("[socks5] [{addr}] [connect] {target_addr}");
|
||||||
|
Self::handle_connect(connect, target_addr).await;
|
||||||
|
}
|
||||||
|
Err(err) => log::warn!("[socks5] [{addr}] handshake error: {err}"),
|
||||||
|
};
|
||||||
|
|
||||||
|
log::debug!("[socks5] [{addr}] connection closed");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(err) => log::warn!("[socks5] failed to establish connection: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
132
tuic-client/src/socks5/udp_session.rs
Normal file
132
tuic-client/src/socks5/udp_session.rs
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
use crate::Error;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use once_cell::sync::OnceCell;
|
||||||
|
use parking_lot::Mutex;
|
||||||
|
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
|
||||||
|
use socks5_proto::Address;
|
||||||
|
use socks5_server::AssociatedUdpSocket;
|
||||||
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
io::{Error as IoError, ErrorKind},
|
||||||
|
net::{IpAddr, SocketAddr, UdpSocket as StdUdpSocket},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
use tokio::net::UdpSocket;
|
||||||
|
|
||||||
|
pub(crate) static UDP_SESSIONS: OnceCell<Mutex<HashMap<u16, UdpSession>>> = OnceCell::new();
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct UdpSession {
|
||||||
|
socket: Arc<AssociatedUdpSocket>,
|
||||||
|
assoc_id: u16,
|
||||||
|
ctrl_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UdpSession {
|
||||||
|
pub(super) fn new(
|
||||||
|
assoc_id: u16,
|
||||||
|
ctrl_addr: SocketAddr,
|
||||||
|
local_ip: IpAddr,
|
||||||
|
dual_stack: Option<bool>,
|
||||||
|
max_pkt_size: usize,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
let domain = match local_ip {
|
||||||
|
IpAddr::V4(_) => Domain::IPV4,
|
||||||
|
IpAddr::V6(_) => Domain::IPV6,
|
||||||
|
};
|
||||||
|
|
||||||
|
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| {
|
||||||
|
Error::Socket("failed to create socks5 server UDP associate socket", err)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if let Some(dual_stack) = dual_stack {
|
||||||
|
socket.set_only_v6(!dual_stack).map_err(|err| {
|
||||||
|
Error::Socket(
|
||||||
|
"socks5 server UDP associate dual-stack socket setting error",
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
socket.set_nonblocking(true).map_err(|err| {
|
||||||
|
Error::Socket(
|
||||||
|
"failed setting socks5 server UDP associate socket as non-blocking",
|
||||||
|
err,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
socket
|
||||||
|
.bind(&SockAddr::from(SocketAddr::from((local_ip, 0))))
|
||||||
|
.map_err(|err| {
|
||||||
|
Error::Socket("failed to bind socks5 server UDP associate socket", err)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let socket = UdpSocket::from_std(StdUdpSocket::from(socket)).map_err(|err| {
|
||||||
|
Error::Socket("failed to create socks5 server UDP associate socket", err)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
socket: Arc::new(AssociatedUdpSocket::from((socket, max_pkt_size))),
|
||||||
|
assoc_id,
|
||||||
|
ctrl_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn send(&self, pkt: Bytes, src_addr: Address) -> Result<(), Error> {
|
||||||
|
let src_addr_display = src_addr.to_string();
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr}",
|
||||||
|
ctrl_addr = self.ctrl_addr,
|
||||||
|
assoc_id = self.assoc_id,
|
||||||
|
dst_addr = self.socket.peer_addr().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Err(err) = self.socket.send(pkt, 0, src_addr).await {
|
||||||
|
log::warn!(
|
||||||
|
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr} error: {err}",
|
||||||
|
ctrl_addr = self.ctrl_addr,
|
||||||
|
assoc_id = self.assoc_id,
|
||||||
|
dst_addr = self.socket.peer_addr().unwrap(),
|
||||||
|
);
|
||||||
|
|
||||||
|
return Err(Error::Io(err));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn recv(&self) -> Result<(Bytes, Address), Error> {
|
||||||
|
let (pkt, frag, dst_addr, src_addr) = self.socket.recv_from().await?;
|
||||||
|
|
||||||
|
if let Ok(connected_addr) = self.socket.peer_addr() {
|
||||||
|
if src_addr != connected_addr {
|
||||||
|
Err(IoError::new(
|
||||||
|
ErrorKind::Other,
|
||||||
|
format!("invalid source address: {src_addr}"),
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.socket.connect(src_addr).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if frag != 0 {
|
||||||
|
Err(IoError::new(
|
||||||
|
ErrorKind::Other,
|
||||||
|
"fragmented packet is not supported",
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
log::debug!(
|
||||||
|
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] receive packet from {src_addr} to {dst_addr}",
|
||||||
|
ctrl_addr = self.ctrl_addr,
|
||||||
|
assoc_id = self.assoc_id
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok((pkt, dst_addr))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn local_addr(&self) -> Result<SocketAddr, IoError> {
|
||||||
|
self.socket.local_addr()
|
||||||
|
}
|
||||||
|
}
|
@ -10,7 +10,10 @@ use std::{
|
|||||||
};
|
};
|
||||||
use tokio::net;
|
use tokio::net;
|
||||||
|
|
||||||
pub fn load_certs(paths: Vec<PathBuf>, disable_native: bool) -> Result<RootCertStore, Error> {
|
pub(crate) fn load_certs(
|
||||||
|
paths: Vec<PathBuf>,
|
||||||
|
disable_native: bool,
|
||||||
|
) -> Result<RootCertStore, Error> {
|
||||||
let mut certs = RootCertStore::empty();
|
let mut certs = RootCertStore::empty();
|
||||||
|
|
||||||
for path in &paths {
|
for path in &paths {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user