1
0

less confusing connection initialization

This commit is contained in:
EAimTY 2023-05-27 18:24:31 +09:00
parent c61863df5d
commit 62193b2596

View File

@ -131,11 +131,8 @@ impl Endpoint {
ep: &mut QuinnEndpoint, ep: &mut QuinnEndpoint,
addr: SocketAddr, addr: SocketAddr,
server_name: &str, server_name: &str,
uuid: Uuid,
password: Arc<[u8]>,
udp_relay_mode: UdpRelayMode,
zero_rtt_handshake: bool, zero_rtt_handshake: bool,
) -> Result<Connection, Error> { ) -> Result<QuinnConnection, Error> {
let match_ipv4 = addr.is_ipv4() && ep.local_addr().map_or(false, |addr| addr.is_ipv4()); let match_ipv4 = addr.is_ipv4() && ep.local_addr().map_or(false, |addr| addr.is_ipv4());
let match_ipv6 = addr.is_ipv6() && ep.local_addr().map_or(false, |addr| addr.is_ipv6()); let match_ipv6 = addr.is_ipv6() && ep.local_addr().map_or(false, |addr| addr.is_ipv6());
@ -155,6 +152,7 @@ impl Endpoint {
} }
let conn = ep.connect(addr, server_name)?; let conn = ep.connect(addr, server_name)?;
let conn = if zero_rtt_handshake { let conn = if zero_rtt_handshake {
match conn.into_0rtt() { match conn.into_0rtt() {
Ok((conn, _)) => conn, Ok((conn, _)) => conn,
@ -169,7 +167,7 @@ impl Endpoint {
conn.await? conn.await?
}; };
Ok(Connection::new(conn, udp_relay_mode, uuid, password)) Ok(conn)
} }
let mut last_err = None; let mut last_err = None;
@ -179,9 +177,6 @@ impl Endpoint {
&mut self.ep, &mut self.ep,
addr, addr,
self.server.server_name(), self.server.server_name(),
self.uuid,
self.password.clone(),
self.udp_relay_mode,
self.zero_rtt_handshake, self.zero_rtt_handshake,
) )
.await; .await;
@ -189,12 +184,16 @@ impl Endpoint {
match res { match res {
Ok(conn) => { Ok(conn) => {
log::info!("[connection] connection established"); log::info!("[connection] connection established");
tokio::spawn(conn.clone().init(
return Ok(Connection::new(
conn,
self.udp_relay_mode,
self.uuid,
self.password.clone(),
self.heartbeat, self.heartbeat,
self.gc_interval, self.gc_interval,
self.gc_lifetime, self.gc_lifetime,
)); ));
return Ok(conn);
} }
Err(err) => last_err = Some(err), Err(err) => last_err = Some(err),
} }
@ -218,25 +217,6 @@ pub struct Connection {
} }
impl Connection { impl Connection {
fn new(
conn: QuinnConnection,
udp_relay_mode: UdpRelayMode,
uuid: Uuid,
password: Arc<[u8]>,
) -> Self {
Self {
conn: conn.clone(),
model: Model::<side::Client>::new(conn),
uuid,
password,
udp_relay_mode,
remote_uni_stream_cnt: Counter::new(),
remote_bi_stream_cnt: Counter::new(),
max_concurrent_uni_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)),
max_concurrent_bi_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)),
}
}
pub async fn get() -> Result<Connection, Error> { pub async fn get() -> Result<Connection, Error> {
let try_init_conn = async { let try_init_conn = async {
ENDPOINT ENDPOINT
@ -270,6 +250,57 @@ impl Connection {
Ok(conn) Ok(conn)
} }
fn new(
conn: QuinnConnection,
udp_relay_mode: UdpRelayMode,
uuid: Uuid,
password: Arc<[u8]>,
heartbeat: Duration,
gc_interval: Duration,
gc_lifetime: Duration,
) -> Self {
let conn = Self {
conn: conn.clone(),
model: Model::<side::Client>::new(conn),
uuid,
password,
udp_relay_mode,
remote_uni_stream_cnt: Counter::new(),
remote_bi_stream_cnt: Counter::new(),
max_concurrent_uni_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)),
max_concurrent_bi_streams: Arc::new(AtomicUsize::new(DEFAULT_CONCURRENT_STREAMS)),
};
tokio::spawn(conn.clone().init(heartbeat, gc_interval, gc_lifetime));
conn
}
async fn init(self, heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration) {
tokio::spawn(self.clone().authenticate());
tokio::spawn(self.clone().heartbeat(heartbeat));
tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime));
let err = loop {
tokio::select! {
res = self.accept_uni_stream() => match res {
Ok((recv, reg)) => tokio::spawn(self.clone().handle_uni_stream(recv, reg)),
Err(err) => break err,
},
res = self.accept_bi_stream() => match res {
Ok((send, recv, reg)) => tokio::spawn(self.clone().handle_bi_stream(send, recv, reg)),
Err(err) => break err,
},
res = self.accept_datagram() => match res {
Ok(dg) => tokio::spawn(self.clone().handle_datagram(dg)),
Err(err) => break err,
},
};
};
log::error!("[connection] {err}");
}
pub async fn connect(&self, addr: Address) -> Result<Connect, Error> { pub async fn connect(&self, addr: Address) -> Result<Connect, Error> {
Ok(self.model.connect(addr).await?) Ok(self.model.connect(addr).await?)
} }
@ -445,29 +476,4 @@ impl Connection {
self.model.collect_garbage(gc_lifetime); self.model.collect_garbage(gc_lifetime);
} }
} }
async fn init(self, heartbeat: Duration, gc_interval: Duration, gc_lifetime: Duration) {
tokio::spawn(self.clone().authenticate());
tokio::spawn(self.clone().heartbeat(heartbeat));
tokio::spawn(self.clone().collect_garbage(gc_interval, gc_lifetime));
let err = loop {
tokio::select! {
res = self.accept_uni_stream() => match res {
Ok((recv, reg)) => tokio::spawn(self.clone().handle_uni_stream(recv, reg)),
Err(err) => break err,
},
res = self.accept_bi_stream() => match res {
Ok((send, recv, reg)) => tokio::spawn(self.clone().handle_bi_stream(send, recv, reg)),
Err(err) => break err,
},
res = self.accept_datagram() => match res {
Ok(dg) => tokio::spawn(self.clone().handle_datagram(dg)),
Err(err) => break err,
},
};
};
log::error!("[connection] {err}");
}
} }