adding datagram and bi stream parsing methods
This commit is contained in:
parent
10cee5276e
commit
20384701a0
@ -38,7 +38,7 @@ pub mod side {
|
|||||||
|
|
||||||
pub struct Connection<'conn, Side> {
|
pub struct Connection<'conn, Side> {
|
||||||
conn: &'conn QuinnConnection,
|
conn: &'conn QuinnConnection,
|
||||||
model: ConnectionModel<Vec<u8>>,
|
model: ConnectionModel<Bytes>,
|
||||||
_marker: Side,
|
_marker: Side,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,8 +46,8 @@ impl<'conn, Side> Connection<'conn, Side> {
|
|||||||
pub async fn packet_native(
|
pub async fn packet_native(
|
||||||
&self,
|
&self,
|
||||||
pkt: impl AsRef<[u8]>,
|
pkt: impl AsRef<[u8]>,
|
||||||
assoc_id: u16,
|
|
||||||
addr: Address,
|
addr: Address,
|
||||||
|
assoc_id: u16,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let Some(max_pkt_size) = self.conn.max_datagram_size() else {
|
let Some(max_pkt_size) = self.conn.max_datagram_size() else {
|
||||||
return Err(Error::SendDatagram(SendDatagramError::Disabled));
|
return Err(Error::SendDatagram(SendDatagramError::Disabled));
|
||||||
@ -68,8 +68,8 @@ impl<'conn, Side> Connection<'conn, Side> {
|
|||||||
pub async fn packet_quic(
|
pub async fn packet_quic(
|
||||||
&self,
|
&self,
|
||||||
pkt: impl AsRef<[u8]>,
|
pkt: impl AsRef<[u8]>,
|
||||||
assoc_id: u16,
|
|
||||||
addr: Address,
|
addr: Address,
|
||||||
|
assoc_id: u16,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let model = self.model.send_packet(assoc_id, addr, u16::MAX as usize);
|
let model = self.model.send_packet(assoc_id, addr, u16::MAX as usize);
|
||||||
let mut frags = model.into_fragments(pkt);
|
let mut frags = model.into_fragments(pkt);
|
||||||
@ -79,23 +79,58 @@ impl<'conn, Side> Connection<'conn, Side> {
|
|||||||
let mut send = self.conn.open_uni().await?;
|
let mut send = self.conn.open_uni().await?;
|
||||||
header.marshal(&mut send).await?;
|
header.marshal(&mut send).await?;
|
||||||
AsyncWriteExt::write_all(&mut send, frag).await?;
|
AsyncWriteExt::write_all(&mut send, frag).await?;
|
||||||
|
send.close().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn accept_datagram(&self, dg: Bytes) -> Result<Task, Error> {
|
||||||
|
let mut dg = Cursor::new(dg);
|
||||||
|
|
||||||
|
match Header::unmarshal(&mut dg).await? {
|
||||||
|
Header::Authenticate(_) => Err(Error::BadCommand("authenticate")),
|
||||||
|
Header::Connect(_) => Err(Error::BadCommand("connect")),
|
||||||
|
Header::Packet(pkt) => {
|
||||||
|
let model = self.model.recv_packet(pkt);
|
||||||
|
let pos = dg.position() as usize;
|
||||||
|
let buf = dg.into_inner().slice(pos..pos + *model.size() as usize);
|
||||||
|
Ok(Task::Packet(self.accept_packet_native(model, buf).await?))
|
||||||
|
}
|
||||||
|
Header::Dissociate(_) => Err(Error::BadCommand("dissociate")),
|
||||||
|
Header::Heartbeat(hb) => {
|
||||||
|
let _ = self.model.recv_heartbeat(hb);
|
||||||
|
Ok(Task::Heartbeat)
|
||||||
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn accept_packet_quic(
|
async fn accept_packet_quic(
|
||||||
&self,
|
&self,
|
||||||
model: PacketModel<Rx, Vec<u8>>,
|
model: PacketModel<Rx, Bytes>,
|
||||||
mut recv: &mut RecvStream,
|
mut recv: &mut RecvStream,
|
||||||
) -> Result<Option<(Vec<u8>, Address, u16)>, Error> {
|
) -> Result<Option<(Bytes, Address, u16)>, Error> {
|
||||||
let mut buf = vec![0; *model.size() as usize];
|
let mut buf = vec![0; *model.size() as usize];
|
||||||
AsyncReadExt::read_exact(&mut recv, &mut buf).await?;
|
AsyncReadExt::read_exact(&mut recv, &mut buf).await?;
|
||||||
let mut asm = Vec::new();
|
let mut asm = Vec::new();
|
||||||
|
|
||||||
Ok(model
|
Ok(model
|
||||||
.assemble(buf)?
|
.assemble(Bytes::from(buf))?
|
||||||
.map(|pkt| pkt.assemble(&mut asm))
|
.map(|pkt| pkt.assemble(&mut asm))
|
||||||
.map(|(addr, assoc_id)| (asm, addr, assoc_id)))
|
.map(|(addr, assoc_id)| (Bytes::from(asm), addr, assoc_id)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn accept_packet_native(
|
||||||
|
&self,
|
||||||
|
model: PacketModel<Rx, Bytes>,
|
||||||
|
data: Bytes,
|
||||||
|
) -> Result<Option<(Bytes, Address, u16)>, Error> {
|
||||||
|
let mut asm = Vec::new();
|
||||||
|
|
||||||
|
Ok(model
|
||||||
|
.assemble(data)?
|
||||||
|
.map(|pkt| pkt.assemble(&mut asm))
|
||||||
|
.map(|(addr, assoc_id)| (Bytes::from(asm), addr, assoc_id)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,6 +143,14 @@ impl<'conn> Connection<'conn, side::Client> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn authenticate(&self, token: [u8; 8]) -> Result<(), Error> {
|
||||||
|
let mut send = self.conn.open_uni().await?;
|
||||||
|
let model = self.model.send_authenticate(token);
|
||||||
|
model.header().marshal(&mut send).await?;
|
||||||
|
send.close().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn connect(&self, addr: Address) -> Result<Connect, Error> {
|
pub async fn connect(&self, addr: Address) -> Result<Connect, Error> {
|
||||||
let (mut send, recv) = self.conn.open_bi().await?;
|
let (mut send, recv) = self.conn.open_bi().await?;
|
||||||
let model = self.model.send_connect(addr);
|
let model = self.model.send_connect(addr);
|
||||||
@ -115,7 +158,7 @@ impl<'conn> Connection<'conn, side::Client> {
|
|||||||
Ok(Connect::new(Side::Client(model), send, recv))
|
Ok(Connect::new(Side::Client(model), send, recv))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_uni_stream(&self, mut recv: RecvStream) -> Result<Task, Error> {
|
pub async fn accept_uni_stream(&self, mut recv: RecvStream) -> Result<Task, Error> {
|
||||||
match Header::unmarshal(&mut recv).await? {
|
match Header::unmarshal(&mut recv).await? {
|
||||||
Header::Authenticate(_) => Err(Error::BadCommand("authenticate")),
|
Header::Authenticate(_) => Err(Error::BadCommand("authenticate")),
|
||||||
Header::Connect(_) => Err(Error::BadCommand("connect")),
|
Header::Connect(_) => Err(Error::BadCommand("connect")),
|
||||||
@ -126,13 +169,33 @@ impl<'conn> Connection<'conn, side::Client> {
|
|||||||
))
|
))
|
||||||
}
|
}
|
||||||
Header::Dissociate(_) => Err(Error::BadCommand("dissociate")),
|
Header::Dissociate(_) => Err(Error::BadCommand("dissociate")),
|
||||||
Header::Heartbeat(hb) => {
|
Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")),
|
||||||
let _ = self.model.recv_heartbeat(hb);
|
|
||||||
Ok(Task::Heartbeat)
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn accept_bi_stream(
|
||||||
|
&self,
|
||||||
|
_send: SendStream,
|
||||||
|
mut recv: RecvStream,
|
||||||
|
) -> Result<Task, Error> {
|
||||||
|
match Header::unmarshal(&mut recv).await? {
|
||||||
|
Header::Authenticate(_) => Err(Error::BadCommand("authenticate")),
|
||||||
|
Header::Connect(_) => Err(Error::BadCommand("connect")),
|
||||||
|
Header::Packet(_) => Err(Error::BadCommand("packet")),
|
||||||
|
Header::Dissociate(_) => Err(Error::BadCommand("dissociate")),
|
||||||
|
Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")),
|
||||||
|
_ => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn heartbeat(&self) -> Result<(), Error> {
|
||||||
|
let model = self.model.send_heartbeat();
|
||||||
|
let mut buf = Vec::with_capacity(model.header().len());
|
||||||
|
model.header().marshal(&mut buf).await.unwrap();
|
||||||
|
self.conn.send_datagram(Bytes::from(buf))?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'conn> Connection<'conn, side::Server> {
|
impl<'conn> Connection<'conn, side::Server> {
|
||||||
@ -144,7 +207,7 @@ impl<'conn> Connection<'conn, side::Server> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_uni_stream(&self, mut recv: RecvStream) -> Result<Task, Error> {
|
pub async fn accept_uni_stream(&self, mut recv: RecvStream) -> Result<Task, Error> {
|
||||||
match Header::unmarshal(&mut recv).await? {
|
match Header::unmarshal(&mut recv).await? {
|
||||||
Header::Authenticate(auth) => {
|
Header::Authenticate(auth) => {
|
||||||
let model = self.model.recv_authenticate(auth);
|
let model = self.model.recv_authenticate(auth);
|
||||||
@ -161,10 +224,25 @@ impl<'conn> Connection<'conn, side::Server> {
|
|||||||
let _ = self.model.recv_dissociate(dissoc);
|
let _ = self.model.recv_dissociate(dissoc);
|
||||||
Ok(Task::Dissociate)
|
Ok(Task::Dissociate)
|
||||||
}
|
}
|
||||||
Header::Heartbeat(hb) => {
|
Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")),
|
||||||
let _ = self.model.recv_heartbeat(hb);
|
_ => unreachable!(),
|
||||||
Ok(Task::Heartbeat)
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn accept_bi_stream(
|
||||||
|
&self,
|
||||||
|
send: SendStream,
|
||||||
|
mut recv: RecvStream,
|
||||||
|
) -> Result<Task, Error> {
|
||||||
|
match Header::unmarshal(&mut recv).await? {
|
||||||
|
Header::Authenticate(_) => Err(Error::BadCommand("authenticate")),
|
||||||
|
Header::Connect(conn) => {
|
||||||
|
let model = self.model.recv_connect(conn);
|
||||||
|
Ok(Task::Connect(Connect::new(Side::Server(model), send, recv)))
|
||||||
|
}
|
||||||
|
Header::Packet(_) => Err(Error::BadCommand("packet")),
|
||||||
|
Header::Dissociate(_) => Err(Error::BadCommand("dissociate")),
|
||||||
|
Header::Heartbeat(_) => Err(Error::BadCommand("heartbeat")),
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -228,7 +306,7 @@ impl AsyncWrite for Connect {
|
|||||||
pub enum Task {
|
pub enum Task {
|
||||||
Authenticate([u8; 8]),
|
Authenticate([u8; 8]),
|
||||||
Connect(Connect),
|
Connect(Connect),
|
||||||
Packet(Option<(Vec<u8>, Address, u16)>),
|
Packet(Option<(Bytes, Address, u16)>),
|
||||||
Dissociate,
|
Dissociate,
|
||||||
Heartbeat,
|
Heartbeat,
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user