1
0

add config task_negotiation_timeout

This commit is contained in:
EAimTY 2023-05-27 20:18:37 +09:00
parent 27c378e5b6
commit 9f86717f93
4 changed files with 81 additions and 30 deletions

View File

@ -67,6 +67,10 @@ tuic-server -c PATH/TO/CONFIG
// Default: 3s // Default: 3s
"auth_timeout": "3s", "auth_timeout": "3s",
// Optional. Maximum duration server expects for task negotiation
// Default: 3s
"task_negotiation_timeout": "3s",
// Optional. How long the server should wait before closing an idle connection // Optional. How long the server should wait before closing an idle connection
// Default: 10s // Default: 10s
"max_idle_time": "10s", "max_idle_time": "10s",

View File

@ -55,6 +55,12 @@ pub struct Config {
)] )]
pub auth_timeout: Duration, pub auth_timeout: Duration,
#[serde(
default = "default::task_negotiation_timeout",
deserialize_with = "deserialize_duration"
)]
pub task_negotiation_timeout: Duration,
#[serde( #[serde(
default = "default::max_idle_time", default = "default::max_idle_time",
deserialize_with = "deserialize_duration" deserialize_with = "deserialize_duration"
@ -136,6 +142,10 @@ mod default {
Duration::from_secs(3) Duration::from_secs(3)
} }
pub fn task_negotiation_timeout() -> Duration {
Duration::from_secs(3)
}
pub fn max_idle_time() -> Duration { pub fn max_idle_time() -> Duration {
Duration::from_secs(10) Duration::from_secs(10)
} }

View File

@ -66,6 +66,8 @@ pub enum Error {
UnexpectedPacketSource, UnexpectedPacketSource,
#[error("{0}: {1}")] #[error("{0}: {1}")]
Socket(&'static str, IoError), Socket(&'static str, IoError),
#[error("task negotiation timed out")]
TaskNegotiationTimeout,
#[error("{0} resolved to {1} but IPv6 UDP relaying is disabled")] #[error("{0} resolved to {1} but IPv6 UDP relaying is disabled")]
UdpRelayIpv6Disabled(Address, SocketAddr), UdpRelayIpv6Disabled(Address, SocketAddr),
} }

View File

@ -49,6 +49,7 @@ pub struct Server {
udp_relay_ipv6: bool, udp_relay_ipv6: bool,
zero_rtt_handshake: bool, zero_rtt_handshake: bool,
auth_timeout: Duration, auth_timeout: Duration,
task_negotiation_timeout: Duration,
max_external_pkt_size: usize, max_external_pkt_size: usize,
gc_interval: Duration, gc_interval: Duration,
gc_lifetime: Duration, gc_lifetime: Duration,
@ -136,6 +137,7 @@ impl Server {
udp_relay_ipv6: cfg.udp_relay_ipv6, udp_relay_ipv6: cfg.udp_relay_ipv6,
zero_rtt_handshake: cfg.zero_rtt_handshake, zero_rtt_handshake: cfg.zero_rtt_handshake,
auth_timeout: cfg.auth_timeout, auth_timeout: cfg.auth_timeout,
task_negotiation_timeout: cfg.task_negotiation_timeout,
max_external_pkt_size: cfg.max_external_packet_size, max_external_pkt_size: cfg.max_external_packet_size,
gc_interval: cfg.gc_interval, gc_interval: cfg.gc_interval,
gc_lifetime: cfg.gc_lifetime, gc_lifetime: cfg.gc_lifetime,
@ -159,6 +161,7 @@ impl Server {
self.udp_relay_ipv6, self.udp_relay_ipv6,
self.zero_rtt_handshake, self.zero_rtt_handshake,
self.auth_timeout, self.auth_timeout,
self.task_negotiation_timeout,
self.max_external_pkt_size, self.max_external_pkt_size,
self.gc_interval, self.gc_interval,
self.gc_lifetime, self.gc_lifetime,
@ -174,6 +177,7 @@ struct Connection {
users: Arc<HashMap<Uuid, Vec<u8>>>, users: Arc<HashMap<Uuid, Vec<u8>>>,
udp_relay_ipv6: bool, udp_relay_ipv6: bool,
is_authed: IsAuthed, is_authed: IsAuthed,
task_negotiation_timeout: Duration,
udp_sessions: Arc<AsyncMutex<HashMap<u16, UdpSession>>>, udp_sessions: Arc<AsyncMutex<HashMap<u16, UdpSession>>>,
udp_relay_mode: Arc<AtomicCell<Option<UdpRelayMode>>>, udp_relay_mode: Arc<AtomicCell<Option<UdpRelayMode>>>,
max_external_pkt_size: usize, max_external_pkt_size: usize,
@ -191,22 +195,52 @@ impl Connection {
udp_relay_ipv6: bool, udp_relay_ipv6: bool,
zero_rtt_handshake: bool, zero_rtt_handshake: bool,
auth_timeout: Duration, auth_timeout: Duration,
task_negotiation_timeout: Duration,
max_external_pkt_size: usize, max_external_pkt_size: usize,
gc_interval: Duration, gc_interval: Duration,
gc_lifetime: Duration, gc_lifetime: Duration,
) { ) {
async fn init(
conn: Connecting,
users: Arc<HashMap<Uuid, Vec<u8>>>,
udp_relay_ipv6: bool,
zero_rtt_handshake: bool,
task_negotiation_timeout: Duration,
max_external_pkt_size: usize,
) -> Result<Connection, Error> {
let conn = if zero_rtt_handshake {
match conn.into_0rtt() {
Ok((conn, _)) => conn,
Err(conn) => {
log::info!("0-RTT handshake failed, fallback to 1-RTT handshake");
conn.await?
}
}
} else {
conn.await?
};
Ok(Connection::new(
conn,
users,
udp_relay_ipv6,
task_negotiation_timeout,
max_external_pkt_size,
))
}
let addr = conn.remote_address(); let addr = conn.remote_address();
let conn = Self::init( match init(
conn, conn,
users, users,
udp_relay_ipv6, udp_relay_ipv6,
zero_rtt_handshake, zero_rtt_handshake,
task_negotiation_timeout,
max_external_pkt_size, max_external_pkt_size,
) )
.await; .await
{
match conn {
Ok(conn) => { Ok(conn) => {
log::info!("[{addr}] connection established"); log::info!("[{addr}] connection established");
@ -234,31 +268,20 @@ impl Connection {
} }
} }
async fn init( fn new(
conn: Connecting, conn: QuinnConnection,
users: Arc<HashMap<Uuid, Vec<u8>>>, users: Arc<HashMap<Uuid, Vec<u8>>>,
udp_relay_ipv6: bool, udp_relay_ipv6: bool,
zero_rtt_handshake: bool, task_negotiation_timeout: Duration,
max_external_pkt_size: usize, max_external_pkt_size: usize,
) -> Result<Self, Error> { ) -> Self {
let conn = if zero_rtt_handshake { Self {
match conn.into_0rtt() {
Ok((conn, _)) => conn,
Err(conn) => {
log::info!("0-RTT handshake failed, fallback to 1-RTT handshake");
conn.await?
}
}
} else {
conn.await?
};
Ok(Self {
inner: conn.clone(), inner: conn.clone(),
model: Model::<side::Server>::new(conn), model: Model::<side::Server>::new(conn),
users, users,
udp_relay_ipv6, udp_relay_ipv6,
is_authed: IsAuthed::new(), is_authed: IsAuthed::new(),
task_negotiation_timeout,
udp_sessions: Arc::new(AsyncMutex::new(HashMap::new())), udp_sessions: Arc::new(AsyncMutex::new(HashMap::new())),
udp_relay_mode: Arc::new(AtomicCell::new(None)), udp_relay_mode: Arc::new(AtomicCell::new(None)),
max_external_pkt_size, max_external_pkt_size,
@ -266,7 +289,7 @@ impl Connection {
remote_bi_stream_cnt: Counter::new(), remote_bi_stream_cnt: Counter::new(),
max_concurrent_uni_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)), max_concurrent_uni_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)),
max_concurrent_bi_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)), max_concurrent_bi_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)),
}) }
} }
async fn accept(&self) -> Result<(), Error> { async fn accept(&self) -> Result<(), Error> {
@ -296,8 +319,14 @@ impl Connection {
.set_max_concurrent_uni_streams(VarInt::from((max * 2) as u32)); .set_max_concurrent_uni_streams(VarInt::from((max * 2) as u32));
} }
async fn pre_process(conn: &Connection, recv: RecvStream) -> Result<Task, Error> { async fn pre_process(
let task = conn.model.accept_uni_stream(recv).await?; conn: &Connection,
recv: RecvStream,
task_negotiation_timeout: Duration,
) -> Result<Task, Error> {
let task = time::timeout(task_negotiation_timeout, conn.model.accept_uni_stream(recv))
.await
.map_err(|_| Error::TaskNegotiationTimeout)??;
if let Task::Authenticate(auth) = &task { if let Task::Authenticate(auth) = &task {
if conn.is_authed() { if conn.is_authed() {
@ -315,7 +344,7 @@ impl Connection {
tokio::select! { tokio::select! {
() = conn.authed() => {} () = conn.authed() => {}
err = conn.inner.closed() => Err(err)?, err = conn.inner.closed() => return Err(Error::Connection(err)),
}; };
let same_pkt_src = matches!(task, Task::Packet(_)) let same_pkt_src = matches!(task, Task::Packet(_))
@ -327,7 +356,7 @@ impl Connection {
Ok(task) Ok(task)
} }
match pre_process(&self, recv).await { match pre_process(&self, recv, self.task_negotiation_timeout).await {
Ok(Task::Authenticate(auth)) => log::info!("[{addr}] authenticated as {}", auth.uuid()), Ok(Task::Authenticate(auth)) => log::info!("[{addr}] authenticated as {}", auth.uuid()),
Ok(Task::Packet(pkt)) => { Ok(Task::Packet(pkt)) => {
let assoc_id = pkt.assoc_id(); let assoc_id = pkt.assoc_id();
@ -380,18 +409,24 @@ impl Connection {
conn: &Connection, conn: &Connection,
send: SendStream, send: SendStream,
recv: RecvStream, recv: RecvStream,
task_negotiation_timeout: Duration,
) -> Result<Task, Error> { ) -> Result<Task, Error> {
let task = conn.model.accept_bi_stream(send, recv).await?; let task = time::timeout(
task_negotiation_timeout,
conn.model.accept_bi_stream(send, recv),
)
.await
.map_err(|_| Error::TaskNegotiationTimeout)??;
tokio::select! { tokio::select! {
() = conn.authed() => {} () = conn.authed() => {}
err = conn.inner.closed() => Err(err)?, err = conn.inner.closed() => return Err(Error::Connection(err)),
}; };
Ok(task) Ok(task)
} }
match pre_process(&self, send, recv).await { match pre_process(&self, send, recv, self.task_negotiation_timeout).await {
Ok(Task::Connect(conn)) => { Ok(Task::Connect(conn)) => {
let target_addr = conn.addr().to_string(); let target_addr = conn.addr().to_string();
log::info!("[{addr}] [connect] [{target_addr}]"); log::info!("[{addr}] [connect] [{target_addr}]");
@ -418,7 +453,7 @@ impl Connection {
tokio::select! { tokio::select! {
() = conn.authed() => {} () = conn.authed() => {}
err = conn.inner.closed() => Err(err)?, err = conn.inner.closed() => return Err(Error::Connection(err)),
}; };
let same_pkt_src = matches!(task, Task::Packet(_)) let same_pkt_src = matches!(task, Task::Packet(_))