1
0

refactor socks5 udp session & better log info

This commit is contained in:
EAimTY 2023-05-29 18:24:16 +09:00
parent f21a0d43f8
commit e2a505aba7
3 changed files with 403 additions and 273 deletions

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
config::Relay, config::Relay,
socks5::Server as Socks5Server, socks5::UDP_SESSIONS as SOCKS5_UDP_SESSIONS,
utils::{self, CongestionControl, ServerAddr, UdpRelayMode}, utils::{self, CongestionControl, ServerAddr, UdpRelayMode},
Error, Error,
}; };
@ -29,7 +29,7 @@ use tokio::{
time, time,
}; };
use tuic::Address; use tuic::Address;
use tuic_quinn::{side, Connect, Connection as Model, Task}; use tuic_quinn::{side, Connect, Connection as Model, Packet, Task};
use uuid::Uuid; use uuid::Uuid;
static ENDPOINT: OnceCell<Mutex<Endpoint>> = OnceCell::new(); static ENDPOINT: OnceCell<Mutex<Endpoint>> = OnceCell::new();
@ -158,12 +158,7 @@ impl Endpoint {
let conn = if zero_rtt_handshake { let conn = if zero_rtt_handshake {
match conn.into_0rtt() { match conn.into_0rtt() {
Ok((conn, _)) => conn, Ok((conn, _)) => conn,
Err(conn) => { Err(conn) => conn.await?,
log::info!(
"[connection] 0-RTT handshake failed, fallback to 1-RTT handshake"
);
conn.await?
}
} }
} else { } else {
conn.await? conn.await?
@ -185,8 +180,6 @@ impl Endpoint {
match res { match res {
Ok(conn) => { Ok(conn) => {
log::info!("[connection] connection established");
return Ok(Connection::new( return Ok(Connection::new(
conn, conn,
self.udp_relay_mode, self.udp_relay_mode,
@ -279,6 +272,8 @@ impl Connection {
} }
async fn init(self, heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration) { async fn init(self, heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration) {
log::info!("[relay] connection established");
tokio::spawn(self.clone().authenticate()); tokio::spawn(self.clone().authenticate());
tokio::spawn(self.clone().heartbeat(heartbeat)); tokio::spawn(self.clone().heartbeat(heartbeat));
tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime)); tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime));
@ -300,7 +295,7 @@ impl Connection {
}; };
}; };
log::error!("[connection] {err}"); log::warn!("[relay] connection error: {err}");
} }
pub async fn connect(&self, addr: Address) -> Result<Connect, Error> { pub async fn connect(&self, addr: Address) -> Result<Connect, Error> {
@ -362,88 +357,85 @@ impl Connection {
} }
async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) { async fn handle_uni_stream(self, recv: RecvStream, _reg: Register) {
log::debug!("[connection] incoming unidirectional stream"); log::debug!("[relay] incoming unidirectional stream");
let res = match self.model.accept_uni_stream(recv).await { let res = match self.model.accept_uni_stream(recv).await {
Err(err) => Err(Error::from(err)), Err(err) => Err(Error::Model(err)),
Ok(Task::Packet(pkt)) => match self.udp_relay_mode { Ok(Task::Packet(pkt)) => match self.udp_relay_mode {
UdpRelayMode::Quic => match pkt.accept().await { UdpRelayMode::Quic => {
Ok(Some((pkt, addr, assoc_id))) => { log::debug!(
let addr = match addr { "[relay] [packet] [{assoc_id:#06x}] [from-quic] [{pkt_id:#06x}] {frag_id}/{frag_total}",
Address::None => unreachable!(), assoc_id = pkt.assoc_id(),
Address::DomainAddress(domain, port) => { pkt_id = pkt.pkt_id(),
Socks5Address::DomainAddress(domain, port) frag_id = pkt.frag_id(),
} frag_total = pkt.frag_total(),
Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr), );
}; Self::handle_packet(pkt).await;
Socks5Server::recv_pkt(pkt, addr, assoc_id).await; Ok(())
Ok(()) }
}
Ok(None) => Ok(()),
Err(err) => Err(Error::from(err)),
},
UdpRelayMode::Native => Err(Error::WrongPacketSource), UdpRelayMode::Native => Err(Error::WrongPacketSource),
}, },
_ => unreachable!(), _ => unreachable!(), // already filtered in `tuic_quinn`
}; };
match res { match res {
Ok(()) => {} Ok(()) => {}
Err(err) => log::error!("[connection] {err}"), Err(err) => log::warn!("[relay] incoming unidirectional stream error: {err}"),
} }
} }
async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) { async fn handle_bi_stream(self, send: SendStream, recv: RecvStream, _reg: Register) {
log::debug!("[connection] incoming bidirectional stream"); log::debug!("[relay] incoming bidirectional stream");
let res = match self.model.accept_bi_stream(send, recv).await { let res = match self.model.accept_bi_stream(send, recv).await {
Err(err) => Err(Error::from(err)), Err(err) => Err(Error::Model(err)),
_ => unreachable!(), _ => unreachable!(), // already filtered in `tuic_quinn`
}; };
match res { match res {
Ok(()) => {} Ok(()) => {}
Err(err) => log::error!("[connection] {err}"), Err(err) => log::warn!("[relay] incoming bidirectional stream error: {err}"),
} }
} }
async fn handle_datagram(self, dg: Bytes) { async fn handle_datagram(self, dg: Bytes) {
log::debug!("[connection] incoming datagram"); log::debug!("[relay] incoming datagram");
let res = match self.model.accept_datagram(dg) { let res = match self.model.accept_datagram(dg) {
Err(err) => Err(Error::from(err)), Err(err) => Err(Error::Model(err)),
Ok(Task::Packet(pkt)) => match self.udp_relay_mode { Ok(Task::Packet(pkt)) => match self.udp_relay_mode {
UdpRelayMode::Native => match pkt.accept().await { UdpRelayMode::Native => {
Ok(Some((pkt, addr, assoc_id))) => { log::debug!(
let addr = match addr { "[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {frag_id}/{frag_total}",
Address::None => unreachable!(), assoc_id = pkt.assoc_id(),
Address::DomainAddress(domain, port) => { pkt_id = pkt.pkt_id(),
Socks5Address::DomainAddress(domain, port) frag_id = pkt.frag_id(),
} frag_total = pkt.frag_total(),
Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr), );
}; Self::handle_packet(pkt).await;
Socks5Server::recv_pkt(pkt, addr, assoc_id).await; Ok(())
Ok(()) }
}
Ok(None) => Ok(()),
Err(err) => Err(Error::from(err)),
},
UdpRelayMode::Quic => Err(Error::WrongPacketSource), UdpRelayMode::Quic => Err(Error::WrongPacketSource),
}, },
_ => unreachable!(), _ => unreachable!(), // already filtered in `tuic_quinn`
}; };
match res { match res {
Ok(()) => {} Ok(()) => {}
Err(err) => log::error!("[connection] {err}"), Err(err) => log::warn!("[relay] incoming datagram error: {err}"),
} }
} }
async fn authenticate(self) { async fn authenticate(self) {
log::debug!("[relay] [authenticate] sending authentication");
match self match self
.model .model
.authenticate(self.uuid, self.password.clone()) .authenticate(self.uuid, self.password.clone())
.await .await
{ {
Ok(()) => log::info!("[connection] authentication sent"), Ok(()) => log::info!("[relay] [authenticate] {uuid}", uuid = self.uuid),
Err(err) => log::warn!("[connection] authentication failed: {err}"), Err(err) => log::warn!("[relay] [authenticate] authentication sending error: {err}"),
} }
} }
@ -460,12 +452,51 @@ impl Connection {
} }
match self.model.heartbeat().await { match self.model.heartbeat().await {
Ok(()) => log::info!("[connection] heartbeat"), Ok(()) => log::debug!("[relay] [heartbeat]"),
Err(err) => log::warn!("[connection] heartbeat error: {err}"), Err(err) => log::warn!("[relay] [heartbeat] heartbeat sending error: {err}"),
} }
} }
} }
async fn handle_packet(pkt: Packet) {
let assoc_id = pkt.assoc_id();
let pkt_id = pkt.pkt_id();
match pkt.accept().await {
Ok(Some((pkt, addr, _))) => {
log::info!(
"[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] {addr}",
);
let addr = match addr {
Address::None => unreachable!(),
Address::DomainAddress(domain, port) => {
Socks5Address::DomainAddress(domain, port)
}
Address::SocketAddress(addr) => Socks5Address::SocketAddress(addr),
};
if let Some(session) = SOCKS5_UDP_SESSIONS
.get()
.unwrap()
.lock()
.get(&assoc_id)
.cloned()
{
if let Err(err) = session.send(pkt, addr).await {
log::warn!(
"[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] failed sending packet to socks5 client: {err}",
);
}
} else {
log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] unable to find socks5 associate session");
}
}
Ok(None) => {}
Err(err) => log::warn!("[relay] [packet] [{assoc_id:#06x}] [from-native] [{pkt_id:#06x}] packet receiving error: {err}"),
}
}
async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) { async fn collect_garbage(self, gc_interval: Duration, gc_lifetime: Duration) {
loop { loop {
time::sleep(gc_interval).await; time::sleep(gc_interval).await;
@ -474,7 +505,7 @@ impl Connection {
break; break;
} }
log::debug!("[connection] packet garbage collection"); log::debug!("[relay] packet fragment garbage collecting event");
self.model.collect_garbage(gc_lifetime); self.model.collect_garbage(gc_lifetime);
} }
} }

View File

@ -43,7 +43,7 @@ async fn main() {
} }
} }
match Socks5Server::set_config(cfg.local).await { match Socks5Server::set_config(cfg.local) {
Ok(()) => {} Ok(()) => {}
Err(err) => { Err(err) => {
eprintln!("{err}"); eprintln!("{err}");

View File

@ -13,7 +13,7 @@ use socks5_server::{
use std::{ use std::{
collections::HashMap, collections::HashMap,
io::{Error as IoError, ErrorKind}, io::{Error as IoError, ErrorKind},
net::{SocketAddr, TcpListener as StdTcpListener, UdpSocket as StdUdpSocket}, net::{IpAddr, SocketAddr, TcpListener as StdTcpListener, UdpSocket as StdUdpSocket},
sync::{ sync::{
atomic::{AtomicU16, Ordering}, atomic::{AtomicU16, Ordering},
Arc, Arc,
@ -27,19 +27,45 @@ use tokio_util::compat::FuturesAsyncReadCompatExt;
use tuic::Address as TuicAddress; use tuic::Address as TuicAddress;
static SERVER: OnceCell<Server> = OnceCell::new(); static SERVER: OnceCell<Server> = OnceCell::new();
pub static UDP_SESSIONS: OnceCell<Mutex<HashMap<u16, UdpSession>>> = OnceCell::new();
pub struct Server { pub struct Server {
inner: Socks5Server, inner: Socks5Server,
dual_stack: Option<bool>, dual_stack: Option<bool>,
max_pkt_size: usize, max_pkt_size: usize,
next_assoc_id: AtomicU16, next_assoc_id: AtomicU16,
udp_sessions: Mutex<HashMap<u16, Arc<AssociatedUdpSocket>>>,
} }
impl Server { impl Server {
pub async fn set_config(cfg: Local) -> Result<(), Error> { pub fn set_config(cfg: Local) -> Result<(), Error> {
SERVER
.set(Self::new(
cfg.server,
cfg.dual_stack,
cfg.max_packet_size,
cfg.username.map(|s| s.into_bytes()),
cfg.password.map(|s| s.into_bytes()),
)?)
.map_err(|_| "failed initializing socks5 server")
.unwrap();
UDP_SESSIONS
.set(Mutex::new(HashMap::new()))
.map_err(|_| "failed initializing socks5 UDP session pool")
.unwrap();
Ok(())
}
fn new(
addr: SocketAddr,
dual_stack: Option<bool>,
max_pkt_size: usize,
username: Option<Vec<u8>>,
password: Option<Vec<u8>>,
) -> Result<Self, Error> {
let socket = { let socket = {
let domain = match cfg.server { let domain = match addr {
SocketAddr::V4(_) => Domain::IPV4, SocketAddr::V4(_) => Domain::IPV4,
SocketAddr::V6(_) => Domain::IPV6, SocketAddr::V6(_) => Domain::IPV6,
}; };
@ -47,7 +73,7 @@ impl Server {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP)) let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))
.map_err(|err| Error::Socket("failed to create socks5 server socket", err))?; .map_err(|err| Error::Socket("failed to create socks5 server socket", err))?;
if let Some(dual_stack) = cfg.dual_stack { if let Some(dual_stack) = dual_stack {
socket.set_only_v6(!dual_stack).map_err(|err| { socket.set_only_v6(!dual_stack).map_err(|err| {
Error::Socket("socks5 server dual-stack socket setting error", err) Error::Socket("socks5 server dual-stack socket setting error", err)
})?; })?;
@ -62,7 +88,7 @@ impl Server {
})?; })?;
socket socket
.bind(&SockAddr::from(cfg.server)) .bind(&SockAddr::from(addr))
.map_err(|err| Error::Socket("failed to bind socks5 server socket", err))?; .map_err(|err| Error::Socket("failed to bind socks5 server socket", err))?;
socket socket
@ -73,53 +99,58 @@ impl Server {
.map_err(|err| Error::Socket("failed to create socks5 server socket", err))? .map_err(|err| Error::Socket("failed to create socks5 server socket", err))?
}; };
let auth: Arc<dyn Auth + Send + Sync> = match (cfg.username, cfg.password) { let auth: Arc<dyn Auth + Send + Sync> = match (username, password) {
(Some(username), Some(password)) => { (Some(username), Some(password)) => Arc::new(Password::new(username, password)),
Arc::new(Password::new(username.into_bytes(), password.into_bytes()))
}
(None, None) => Arc::new(NoAuth), (None, None) => Arc::new(NoAuth),
_ => return Err(Error::InvalidSocks5Auth), _ => return Err(Error::InvalidSocks5Auth),
}; };
let server = Self { Ok(Self {
inner: Socks5Server::new(socket, auth), inner: Socks5Server::new(socket, auth),
dual_stack: cfg.dual_stack, dual_stack,
max_pkt_size: cfg.max_packet_size, max_pkt_size,
next_assoc_id: AtomicU16::new(0), next_assoc_id: AtomicU16::new(0),
udp_sessions: Mutex::new(HashMap::new()), })
};
SERVER
.set(server)
.map_err(|_| "socks5 server already initialized")
.unwrap();
Ok(())
} }
pub async fn start() { pub async fn start() {
log::warn!("[socks5] server started, listening on {}", Self::addr()); let server = SERVER.get().unwrap();
log::warn!(
"[socks5] server started, listening on {}",
server.inner.local_addr().unwrap()
);
loop { loop {
match SERVER.get().unwrap().inner.accept().await { match server.inner.accept().await {
Ok((conn, addr)) => { Ok((conn, addr)) => {
log::debug!("[socks5] [{addr}] connection established"); log::debug!("[socks5] [{addr}] connection established");
tokio::spawn(async move { tokio::spawn(async move {
let res = match conn.handshake().await { match conn.handshake().await {
Ok(Connection::Associate(associate, addr)) => { Ok(Connection::Associate(associate, _)) => {
Self::handle_associate(associate, addr).await let assoc_id = server.next_assoc_id.fetch_add(1, Ordering::Relaxed);
log::info!("[socks5] [{addr}] [associate] [{assoc_id:#06x}]");
Self::handle_associate(
associate,
assoc_id,
server.dual_stack,
server.max_pkt_size,
)
.await;
} }
Ok(Connection::Bind(bind, addr)) => Self::handle_bind(bind, addr).await, Ok(Connection::Bind(bind, _)) => {
Ok(Connection::Connect(connect, addr)) => { log::info!("[socks5] [{addr}] [bind]");
Self::handle_connect(connect, addr).await Self::handle_bind(bind).await;
} }
Err(err) => Err(Error::from(err)), Ok(Connection::Connect(connect, target_addr)) => {
log::info!("[socks5] [{addr}] [connect] [{target_addr}]");
Self::handle_connect(connect, target_addr).await;
}
Err(err) => log::warn!("[socks5] [{addr}] handshake error: {err}"),
}; };
match res { log::debug!("[socks5] [{addr}] connection closed");
Ok(()) => log::debug!("[socks5] [{addr}] connection closed"),
Err(err) => log::warn!("[socks5] [{addr}] {err}"),
}
}); });
} }
Err(err) => log::warn!("[socks5] failed to establish connection: {err}"), Err(err) => log::warn!("[socks5] failed to establish connection: {err}"),
@ -129,87 +160,145 @@ impl Server {
async fn handle_associate( async fn handle_associate(
assoc: Associate<associate::NeedReply>, assoc: Associate<associate::NeedReply>,
_addr: Address, assoc_id: u16,
) -> Result<(), Error> { dual_stack: Option<bool>,
async fn get_assoc_socket() -> Result<Arc<AssociatedUdpSocket>, Error> { max_pkt_size: usize,
let domain = match Server::addr() { ) {
SocketAddr::V4(_) => Domain::IPV4, let peer_addr = assoc.peer_addr().unwrap();
SocketAddr::V6(_) => Domain::IPV6, let local_ip = assoc.local_addr().unwrap().ip();
};
let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| { match UdpSession::new(assoc_id, peer_addr, local_ip, dual_stack, max_pkt_size) {
Error::Socket("failed to create socks5 server UDP associate socket", err) Ok(session) => {
})?; let local_addr = session.local_addr().unwrap();
log::debug!(
"[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] bound to {local_addr}"
);
if let Some(dual_stack) = Server::dual_stack() { let mut assoc = match assoc
socket.set_only_v6(!dual_stack).map_err(|err| { .reply(Reply::Succeeded, Address::SocketAddress(local_addr))
Error::Socket( .await
"socks5 server UDP associate dual-stack socket setting error", {
err, Ok(assoc) => assoc,
) Err(err) => {
})?; log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}");
} return;
}
};
socket.set_nonblocking(true).map_err(|err| { UDP_SESSIONS
Error::Socket( .get()
"failed setting socks5 server UDP associate socket as non-blocking", .unwrap()
err, .lock()
) .insert(assoc_id, session.clone());
})?;
socket let handle_local_incoming_pkt = async move {
.bind(&SockAddr::from(Server::addr())) loop {
.map_err(|err| { let (pkt, target_addr) = match session.recv().await {
Error::Socket("failed to bind socks5 server UDP associate socket", err) Ok(res) => res,
})?; Err(err) => {
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed to receive UDP packet: {err}");
continue;
}
};
let socket = UdpSocket::from_std(StdUdpSocket::from(socket)).map_err(|err| { let forward = async move {
Error::Socket("failed to create socks5 server UDP associate socket", err) let target_addr = match target_addr {
})?; Address::DomainAddress(domain, port) => {
TuicAddress::DomainAddress(domain, port)
}
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
};
Ok(Arc::new(AssociatedUdpSocket::from(( match TuicConnection::get().await {
socket, Ok(conn) => conn.packet(pkt, target_addr, assoc_id).await,
Server::max_pkt_size(), Err(err) => Err(err),
)))) }
} };
match get_assoc_socket().await { tokio::spawn(async move {
Ok(assoc_socket) => { match forward.await {
let assoc = assoc Ok(()) => {}
.reply( Err(err) => {
Reply::Succeeded, log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed relaying UDP packet: {err}");
Address::SocketAddress(assoc_socket.local_addr().unwrap()), }
) }
.await?; });
Self::send_pkt(assoc, assoc_socket).await }
};
match tokio::select! {
res = assoc.wait_until_closed() => res,
_ = handle_local_incoming_pkt => unreachable!(),
} {
Ok(()) => {}
Err(err) => {
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] associate connection error: {err}")
}
}
log::debug!(
"[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] stopped associating"
);
UDP_SESSIONS
.get()
.unwrap()
.lock()
.remove(&assoc_id)
.unwrap();
let res = match TuicConnection::get().await {
Ok(conn) => conn.dissociate(assoc_id).await,
Err(err) => Err(err),
};
match res {
Ok(()) => {}
Err(err) => log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed stoping UDP relaying session: {err}"),
}
} }
Err(err) => { Err(err) => {
log::warn!("[socks5] failed to create associated socket: {err}"); log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] failed setting up UDP associate session: {err}");
let mut assoc = assoc
match assoc
.reply(Reply::GeneralFailure, Address::unspecified()) .reply(Reply::GeneralFailure, Address::unspecified())
.await?; .await
let _ = assoc.shutdown().await; {
Ok(()) Ok(mut assoc) => {
let _ = assoc.shutdown().await;
}
Err(err) => {
log::warn!("[socks5] [{peer_addr}] [associate] [{assoc_id:#06x}] command reply error: {err}")
}
}
} }
} }
} }
async fn handle_bind(bind: Bind<bind::NeedFirstReply>, _addr: Address) -> Result<(), Error> { async fn handle_bind(bind: Bind<bind::NeedFirstReply>) {
let mut conn = bind let peer_addr = bind.peer_addr().unwrap();
log::warn!("[socks5] [{peer_addr}] [bind] command not supported");
match bind
.reply(Reply::CommandNotSupported, Address::unspecified()) .reply(Reply::CommandNotSupported, Address::unspecified())
.await?; .await
let _ = conn.shutdown().await; {
Ok(()) Ok(mut bind) => {
let _ = bind.shutdown().await;
}
Err(err) => log::warn!("[socks5] [{peer_addr}] [bind] command reply error: {err}"),
}
} }
async fn handle_connect(conn: Connect<connect::NeedReply>, addr: Address) -> Result<(), Error> { async fn handle_connect(conn: Connect<connect::NeedReply>, addr: Address) {
let peer_addr = conn.peer_addr().unwrap();
let target_addr = match addr { let target_addr = match addr {
Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port), Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port),
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr), Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
}; };
let relay = match TuicConnection::get().await { let relay = match TuicConnection::get().await {
Ok(conn) => conn.connect(target_addr).await, Ok(conn) => conn.connect(target_addr.clone()).await,
Err(err) => Err(err), Err(err) => Err(err),
}; };
@ -219,140 +308,150 @@ impl Server {
match conn.reply(Reply::Succeeded, Address::unspecified()).await { match conn.reply(Reply::Succeeded, Address::unspecified()).await {
Ok(mut conn) => match io::copy_bidirectional(&mut conn, &mut relay).await { Ok(mut conn) => match io::copy_bidirectional(&mut conn, &mut relay).await {
Ok(_) => Ok(()), Ok(_) => {}
Err(err) => { Err(err) => {
let _ = conn.shutdown().await; let _ = conn.shutdown().await;
let _ = relay.get_mut().reset(VarInt::from_u32(0)); let _ = relay.get_mut().reset(VarInt::from_u32(0));
Err(Error::from(err)) log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] TCP stream relaying error: {err}");
} }
}, },
Err(err) => { Err(err) => {
let _ = relay.shutdown().await; let _ = relay.shutdown().await;
Err(Error::from(err)) log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}");
} }
} }
} }
Err(relay_err) => { Err(err) => {
log::error!("[connection] {relay_err}"); log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] unable to relay TCP stream: {err}");
let mut conn = conn
match conn
.reply(Reply::GeneralFailure, Address::unspecified()) .reply(Reply::GeneralFailure, Address::unspecified())
.await?; .await
let _ = conn.shutdown().await; {
Ok(()) Ok(mut conn) => {
} let _ = conn.shutdown().await;
} }
} Err(err) => {
log::warn!("[socks5] [{peer_addr}] [connect] [{target_addr}] command reply error: {err}")
async fn send_pkt( }
mut assoc: Associate<associate::Ready>,
assoc_socket: Arc<AssociatedUdpSocket>,
) -> Result<(), Error> {
let assoc_id = SERVER
.get()
.unwrap()
.next_assoc_id
.fetch_add(1, Ordering::AcqRel);
SERVER
.get()
.unwrap()
.udp_sessions
.lock()
.insert(assoc_id, assoc_socket.clone());
let mut connected = None;
async fn accept_pkt(
assoc_socket: &AssociatedUdpSocket,
connected: &mut Option<SocketAddr>,
assoc_id: u16,
) -> Result<(), Error> {
let (pkt, frag, dst_addr, src_addr) = assoc_socket.recv_from().await?;
if let Some(connected) = connected {
if connected != &src_addr {
Err(IoError::new(
ErrorKind::Other,
format!("invalid source address: {src_addr}"),
))?;
} }
} else {
assoc_socket.connect(src_addr).await?;
*connected = Some(src_addr);
} }
if frag != 0 {
Err(IoError::new(
ErrorKind::Other,
"fragmented packet is not supported",
))?;
}
let target_addr = match dst_addr {
Address::DomainAddress(domain, port) => TuicAddress::DomainAddress(domain, port),
Address::SocketAddress(addr) => TuicAddress::SocketAddress(addr),
};
let res = match TuicConnection::get().await {
Ok(conn) => conn.packet(pkt, target_addr, assoc_id).await,
Err(err) => Err(err),
};
match res {
Ok(()) => {}
Err(err) => log::error!("[connection] {err}"),
}
Ok(())
} }
}
let res = tokio::select! { }
res = assoc.wait_until_closed() => res,
_ = async { loop { #[derive(Clone)]
if let Err(err) = accept_pkt(&assoc_socket, &mut connected, assoc_id).await { pub struct UdpSession {
log::warn!("[socks5] {err}"); socket: Arc<AssociatedUdpSocket>,
} assoc_id: u16,
}} => unreachable!(), ctrl_addr: SocketAddr,
}; }
let _ = assoc.shutdown().await; impl UdpSession {
SERVER.get().unwrap().udp_sessions.lock().remove(&assoc_id); fn new(
assoc_id: u16,
let dissoc_res = match TuicConnection::get().await { ctrl_addr: SocketAddr,
Ok(conn) => conn.dissociate(assoc_id).await, local_ip: IpAddr,
Err(err) => Err(err), dual_stack: Option<bool>,
}; max_pkt_size: usize,
) -> Result<Self, Error> {
match dissoc_res { let domain = match local_ip {
Ok(()) => {} IpAddr::V4(_) => Domain::IPV4,
Err(err) => log::error!("[connection] [dissociate] {err}"), IpAddr::V6(_) => Domain::IPV6,
} };
Ok(res?) let socket = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP)).map_err(|err| {
} Error::Socket("failed to create socks5 server UDP associate socket", err)
})?;
pub async fn recv_pkt(pkt: Bytes, addr: Address, assoc_id: u16) {
let assoc_socket = { if let Some(dual_stack) = dual_stack {
let sessions = SERVER.get().unwrap().udp_sessions.lock(); socket.set_only_v6(!dual_stack).map_err(|err| {
let Some(assoc_socket) = sessions.get(&assoc_id) else { unreachable!() }; Error::Socket(
assoc_socket.clone() "socks5 server UDP associate dual-stack socket setting error",
}; err,
)
match assoc_socket.send(pkt, 0, addr).await { })?;
Ok(_) => {} }
Err(err) => log::error!("[socks5] [send] {err}"),
} socket.set_nonblocking(true).map_err(|err| {
} Error::Socket(
"failed setting socks5 server UDP associate socket as non-blocking",
fn addr() -> SocketAddr { err,
SERVER.get().unwrap().inner.local_addr().unwrap() )
} })?;
fn dual_stack() -> Option<bool> { socket
SERVER.get().unwrap().dual_stack .bind(&SockAddr::from(SocketAddr::from((local_ip, 0))))
} .map_err(|err| {
Error::Socket("failed to bind socks5 server UDP associate socket", err)
fn max_pkt_size() -> usize { })?;
SERVER.get().unwrap().max_pkt_size
let socket = UdpSocket::from_std(StdUdpSocket::from(socket)).map_err(|err| {
Error::Socket("failed to create socks5 server UDP associate socket", err)
})?;
Ok(Self {
socket: Arc::new(AssociatedUdpSocket::from((socket, max_pkt_size))),
assoc_id,
ctrl_addr,
})
}
pub async fn send(&self, pkt: Bytes, src_addr: Address) -> Result<(), Error> {
let src_addr_display = src_addr.to_string();
log::debug!(
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr}",
ctrl_addr = self.ctrl_addr,
assoc_id = self.assoc_id,
dst_addr = self.socket.peer_addr().unwrap(),
);
if let Err(err) = self.socket.send(pkt, 0, src_addr).await {
log::warn!(
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] send packet from {src_addr_display} to {dst_addr} error: {err}",
ctrl_addr = self.ctrl_addr,
assoc_id = self.assoc_id,
dst_addr = self.socket.peer_addr().unwrap(),
);
return Err(Error::Io(err));
}
Ok(())
}
pub async fn recv(&self) -> Result<(Bytes, Address), Error> {
let (pkt, frag, dst_addr, src_addr) = self.socket.recv_from().await?;
if let Ok(connected_addr) = self.socket.peer_addr() {
if src_addr != connected_addr {
Err(IoError::new(
ErrorKind::Other,
format!("invalid source address: {src_addr}"),
))?;
}
} else {
self.socket.connect(src_addr).await?;
}
if frag != 0 {
Err(IoError::new(
ErrorKind::Other,
"fragmented packet is not supported",
))?;
}
log::debug!(
"[socks5] [{ctrl_addr}] [associate] [{assoc_id:#06x}] receive packet from {src_addr} to {dst_addr}",
ctrl_addr = self.ctrl_addr,
assoc_id = self.assoc_id
);
Ok((pkt, dst_addr))
}
fn local_addr(&self) -> Result<SocketAddr, IoError> {
self.socket.local_addr()
} }
} }