From 20384701a0c72535a9b03463f7ff9780e1dbe443 Mon Sep 17 00:00:00 2001 From: EAimTY Date: Fri, 27 Jan 2023 00:16:51 +0900 Subject: [PATCH] adding datagram and bi stream parsing methods --- tuic-quinn/src/lib.rs | 112 +++++++++++++++++++++++++++++++++++------- 1 file changed, 95 insertions(+), 17 deletions(-) diff --git a/tuic-quinn/src/lib.rs b/tuic-quinn/src/lib.rs index b247b11..0cfc3e1 100644 --- a/tuic-quinn/src/lib.rs +++ b/tuic-quinn/src/lib.rs @@ -38,7 +38,7 @@ pub mod side { pub struct Connection<'conn, Side> { conn: &'conn QuinnConnection, - model: ConnectionModel>, + model: ConnectionModel, _marker: Side, } @@ -46,8 +46,8 @@ impl<'conn, Side> Connection<'conn, Side> { pub async fn packet_native( &self, pkt: impl AsRef<[u8]>, - assoc_id: u16, addr: Address, + assoc_id: u16, ) -> Result<(), Error> { let Some(max_pkt_size) = self.conn.max_datagram_size() else { return Err(Error::SendDatagram(SendDatagramError::Disabled)); @@ -68,8 +68,8 @@ impl<'conn, Side> Connection<'conn, Side> { pub async fn packet_quic( &self, pkt: impl AsRef<[u8]>, - assoc_id: u16, addr: Address, + assoc_id: u16, ) -> Result<(), Error> { let model = self.model.send_packet(assoc_id, addr, u16::MAX as usize); let mut frags = model.into_fragments(pkt); @@ -79,23 +79,58 @@ impl<'conn, Side> Connection<'conn, Side> { let mut send = self.conn.open_uni().await?; header.marshal(&mut send).await?; AsyncWriteExt::write_all(&mut send, frag).await?; + send.close().await?; Ok(()) } + pub async fn accept_datagram(&self, dg: Bytes) -> Result { + let mut dg = Cursor::new(dg); + + match Header::unmarshal(&mut dg).await? { + Header::Authenticate(_) => Err(Error::BadCommand("authenticate")), + Header::Connect(_) => Err(Error::BadCommand("connect")), + Header::Packet(pkt) => { + let model = self.model.recv_packet(pkt); + let pos = dg.position() as usize; + let buf = dg.into_inner().slice(pos..pos + *model.size() as usize); + Ok(Task::Packet(self.accept_packet_native(model, buf).await?)) + } + Header::Dissociate(_) => Err(Error::BadCommand("dissociate")), + Header::Heartbeat(hb) => { + let _ = self.model.recv_heartbeat(hb); + Ok(Task::Heartbeat) + } + _ => unreachable!(), + } + } + async fn accept_packet_quic( &self, - model: PacketModel>, + model: PacketModel, mut recv: &mut RecvStream, - ) -> Result, Address, u16)>, Error> { + ) -> Result, Error> { let mut buf = vec![0; *model.size() as usize]; AsyncReadExt::read_exact(&mut recv, &mut buf).await?; let mut asm = Vec::new(); Ok(model - .assemble(buf)? + .assemble(Bytes::from(buf))? .map(|pkt| pkt.assemble(&mut asm)) - .map(|(addr, assoc_id)| (asm, addr, assoc_id))) + .map(|(addr, assoc_id)| (Bytes::from(asm), addr, assoc_id))) + } + + async fn accept_packet_native( + &self, + model: PacketModel, + data: Bytes, + ) -> Result, Error> { + let mut asm = Vec::new(); + + Ok(model + .assemble(data)? + .map(|pkt| pkt.assemble(&mut asm)) + .map(|(addr, assoc_id)| (Bytes::from(asm), addr, assoc_id))) } } @@ -108,6 +143,14 @@ impl<'conn> Connection<'conn, side::Client> { } } + pub async fn authenticate(&self, token: [u8; 8]) -> Result<(), Error> { + let mut send = self.conn.open_uni().await?; + let model = self.model.send_authenticate(token); + model.header().marshal(&mut send).await?; + send.close().await?; + Ok(()) + } + pub async fn connect(&self, addr: Address) -> Result { let (mut send, recv) = self.conn.open_bi().await?; let model = self.model.send_connect(addr); @@ -115,7 +158,7 @@ impl<'conn> Connection<'conn, side::Client> { Ok(Connect::new(Side::Client(model), send, recv)) } - pub async fn handle_uni_stream(&self, mut recv: RecvStream) -> Result { + pub async fn accept_uni_stream(&self, mut recv: RecvStream) -> Result { match Header::unmarshal(&mut recv).await? { Header::Authenticate(_) => Err(Error::BadCommand("authenticate")), Header::Connect(_) => Err(Error::BadCommand("connect")), @@ -126,13 +169,33 @@ impl<'conn> Connection<'conn, side::Client> { )) } Header::Dissociate(_) => Err(Error::BadCommand("dissociate")), - Header::Heartbeat(hb) => { - let _ = self.model.recv_heartbeat(hb); - Ok(Task::Heartbeat) - } + Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")), _ => unreachable!(), } } + + pub async fn accept_bi_stream( + &self, + _send: SendStream, + mut recv: RecvStream, + ) -> Result { + match Header::unmarshal(&mut recv).await? { + Header::Authenticate(_) => Err(Error::BadCommand("authenticate")), + Header::Connect(_) => Err(Error::BadCommand("connect")), + Header::Packet(_) => Err(Error::BadCommand("packet")), + Header::Dissociate(_) => Err(Error::BadCommand("dissociate")), + Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")), + _ => unreachable!(), + } + } + + pub async fn heartbeat(&self) -> Result<(), Error> { + let model = self.model.send_heartbeat(); + let mut buf = Vec::with_capacity(model.header().len()); + model.header().marshal(&mut buf).await.unwrap(); + self.conn.send_datagram(Bytes::from(buf))?; + Ok(()) + } } impl<'conn> Connection<'conn, side::Server> { @@ -144,7 +207,7 @@ impl<'conn> Connection<'conn, side::Server> { } } - pub async fn handle_uni_stream(&self, mut recv: RecvStream) -> Result { + pub async fn accept_uni_stream(&self, mut recv: RecvStream) -> Result { match Header::unmarshal(&mut recv).await? { Header::Authenticate(auth) => { let model = self.model.recv_authenticate(auth); @@ -161,10 +224,25 @@ impl<'conn> Connection<'conn, side::Server> { let _ = self.model.recv_dissociate(dissoc); Ok(Task::Dissociate) } - Header::Heartbeat(hb) => { - let _ = self.model.recv_heartbeat(hb); - Ok(Task::Heartbeat) + Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")), + _ => unreachable!(), + } + } + + pub async fn accept_bi_stream( + &self, + send: SendStream, + mut recv: RecvStream, + ) -> Result { + match Header::unmarshal(&mut recv).await? { + Header::Authenticate(_) => Err(Error::BadCommand("authenticate")), + Header::Connect(conn) => { + let model = self.model.recv_connect(conn); + Ok(Task::Connect(Connect::new(Side::Server(model), send, recv))) } + Header::Packet(_) => Err(Error::BadCommand("packet")), + Header::Dissociate(_) => Err(Error::BadCommand("dissociate")), + Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")), _ => unreachable!(), } } @@ -228,7 +306,7 @@ impl AsyncWrite for Connect { pub enum Task { Authenticate([u8; 8]), Connect(Connect), - Packet(Option<(Vec, Address, u16)>), + Packet(Option<(Bytes, Address, u16)>), Dissociate, Heartbeat, }