1
0

implement blocking (un)marshal for protocol

This commit is contained in:
EAimTY 2023-01-27 13:22:45 +09:00
parent 6229a08e61
commit 8b985b1586
4 changed files with 142 additions and 6 deletions

View File

@ -5,6 +5,7 @@ edition = "2021"
[features]
async_marshal = ["bytes", "futures-util"]
marshal = ["bytes"]
model = ["parking_lot", "thiserror"]
[dependencies]
@ -14,4 +15,4 @@ parking_lot = { version = "0.12.1", default-features = false, optional = true }
thiserror = { version = "1.0.38", default-features = false, optional = true }
[dev-dependencies]
tuic = { path = ".", features = ["async_marshal", "model"] }
tuic = { path = ".", features = ["async_marshal", "marshal", "model"] }

View File

@ -6,13 +6,13 @@ pub use self::protocol::{
Address, Authenticate, Connect, Dissociate, Header, Heartbeat, Packet, VERSION,
};
#[cfg(feature = "async_marshal")]
#[cfg(any(feature = "async_marshal", feature = "marshal"))]
mod marshal;
#[cfg(feature = "async_marshal")]
#[cfg(any(feature = "async_marshal", feature = "marshal"))]
mod unmarshal;
#[cfg(feature = "async_marshal")]
#[cfg(any(feature = "async_marshal", feature = "marshal"))]
pub use self::unmarshal::UnmarshalError;
#[cfg(feature = "model")]

View File

@ -3,15 +3,26 @@ use crate::protocol::{
};
use bytes::BufMut;
use futures_util::{AsyncWrite, AsyncWriteExt};
use std::{io::Error as IoError, net::SocketAddr};
use std::{
io::{Error as IoError, Write},
net::SocketAddr,
};
impl Header {
#[cfg(feature = "async_marshal")]
pub async fn async_marshal(&self, s: &mut (impl AsyncWrite + Unpin)) -> Result<(), IoError> {
let mut buf = vec![0; self.len()];
self.write(&mut buf);
s.write_all(&buf).await
}
#[cfg(feature = "marshal")]
pub fn marshal(&self, s: &mut impl Write) -> Result<(), IoError> {
let mut buf = vec![0; self.len()];
self.write(&mut buf);
s.write_all(&buf)
}
pub fn write(&self, buf: &mut impl BufMut) {
buf.put_u8(VERSION);
buf.put_u8(self.type_code());

View File

@ -2,10 +2,15 @@ use crate::protocol::{
Address, Authenticate, Connect, Dissociate, Header, Heartbeat, Packet, VERSION,
};
use futures_util::{AsyncRead, AsyncReadExt};
use std::{io::Error as IoError, net::SocketAddr, string::FromUtf8Error};
use std::{
io::{Error as IoError, Read},
net::SocketAddr,
string::FromUtf8Error,
};
use thiserror::Error;
impl Header {
#[cfg(feature = "async_marshal")]
pub async fn async_unmarshal(s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
let mut buf = [0; 1];
s.read_exact(&mut buf).await?;
@ -30,9 +35,34 @@ impl Header {
_ => Err(UnmarshalError::InvalidCommand(cmd)),
}
}
#[cfg(feature = "marshal")]
pub fn unmarshal(s: &mut impl Read) -> Result<Self, UnmarshalError> {
let mut buf = [0; 1];
s.read_exact(&mut buf)?;
let ver = buf[0];
if ver != VERSION {
return Err(UnmarshalError::InvalidVersion(ver));
}
let mut buf = [0; 1];
s.read_exact(&mut buf)?;
let cmd = buf[0];
match cmd {
Header::TYPE_CODE_AUTHENTICATE => Authenticate::read(s).map(Self::Authenticate),
Header::TYPE_CODE_CONNECT => Connect::read(s).map(Self::Connect),
Header::TYPE_CODE_PACKET => Packet::read(s).map(Self::Packet),
Header::TYPE_CODE_DISSOCIATE => Dissociate::read(s).map(Self::Dissociate),
Header::TYPE_CODE_HEARTBEAT => Heartbeat::read(s).map(Self::Heartbeat),
_ => Err(UnmarshalError::InvalidCommand(cmd)),
}
}
}
impl Address {
#[cfg(feature = "async_marshal")]
async fn async_read(s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
let mut buf = [0; 1];
s.read_exact(&mut buf).await?;
@ -80,23 +110,87 @@ impl Address {
_ => Err(UnmarshalError::InvalidAddressType(type_code)),
}
}
#[cfg(feature = "marshal")]
fn read(s: &mut impl Read) -> Result<Self, UnmarshalError> {
let mut buf = [0; 1];
s.read_exact(&mut buf)?;
let type_code = buf[0];
match type_code {
Address::TYPE_CODE_NONE => Ok(Self::None),
Address::TYPE_CODE_DOMAIN => {
let mut buf = [0; 1];
s.read_exact(&mut buf)?;
let len = buf[0] as usize;
let mut buf = vec![0; len + 2];
s.read_exact(&mut buf)?;
let port = u16::from_be_bytes([buf[len], buf[len + 1]]);
buf.truncate(len);
let domain = String::from_utf8(buf)?;
Ok(Self::DomainAddress(domain, port))
}
Address::TYPE_CODE_IPV4 => {
let mut buf = [0; 6];
s.read_exact(&mut buf)?;
let ip = [buf[0], buf[1], buf[2], buf[3]];
let port = u16::from_be_bytes([buf[4], buf[5]]);
Ok(Self::SocketAddress(SocketAddr::from((ip, port))))
}
Address::TYPE_CODE_IPV6 => {
let mut buf = [0; 18];
s.read_exact(&mut buf)?;
let ip = [
u16::from_be_bytes([buf[0], buf[1]]),
u16::from_be_bytes([buf[2], buf[3]]),
u16::from_be_bytes([buf[4], buf[5]]),
u16::from_be_bytes([buf[6], buf[7]]),
u16::from_be_bytes([buf[8], buf[9]]),
u16::from_be_bytes([buf[10], buf[11]]),
u16::from_be_bytes([buf[12], buf[13]]),
u16::from_be_bytes([buf[14], buf[15]]),
];
let port = u16::from_be_bytes([buf[16], buf[17]]);
Ok(Self::SocketAddress(SocketAddr::from((ip, port))))
}
_ => Err(UnmarshalError::InvalidAddressType(type_code)),
}
}
}
impl Authenticate {
#[cfg(feature = "async_marshal")]
async fn async_read(s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
let mut buf = [0; 8];
s.read_exact(&mut buf).await?;
Ok(Self::new(buf))
}
#[cfg(feature = "marshal")]
fn read(s: &mut impl Read) -> Result<Self, UnmarshalError> {
let mut buf = [0; 8];
s.read_exact(&mut buf)?;
Ok(Self::new(buf))
}
}
impl Connect {
#[cfg(feature = "async_marshal")]
async fn async_read(s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
Ok(Self::new(Address::async_read(s).await?))
}
#[cfg(feature = "marshal")]
fn read(s: &mut impl Read) -> Result<Self, UnmarshalError> {
Ok(Self::new(Address::read(s)?))
}
}
impl Packet {
#[cfg(feature = "async_marshal")]
async fn async_read(s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
let mut buf = [0; 8];
s.read_exact(&mut buf).await?;
@ -110,21 +204,51 @@ impl Packet {
Ok(Self::new(assoc_id, pkt_id, frag_total, frag_id, size, addr))
}
#[cfg(feature = "marshal")]
fn read(s: &mut impl Read) -> Result<Self, UnmarshalError> {
let mut buf = [0; 8];
s.read_exact(&mut buf)?;
let assoc_id = u16::from_be_bytes([buf[0], buf[1]]);
let pkt_id = u16::from_be_bytes([buf[2], buf[3]]);
let frag_total = buf[4];
let frag_id = buf[5];
let size = u16::from_be_bytes([buf[6], buf[7]]);
let addr = Address::read(s)?;
Ok(Self::new(assoc_id, pkt_id, frag_total, frag_id, size, addr))
}
}
impl Dissociate {
#[cfg(feature = "async_marshal")]
async fn async_read(s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
let mut buf = [0; 2];
s.read_exact(&mut buf).await?;
let assoc_id = u16::from_be_bytes(buf);
Ok(Self::new(assoc_id))
}
#[cfg(feature = "marshal")]
fn read(s: &mut impl Read) -> Result<Self, UnmarshalError> {
let mut buf = [0; 2];
s.read_exact(&mut buf)?;
let assoc_id = u16::from_be_bytes(buf);
Ok(Self::new(assoc_id))
}
}
impl Heartbeat {
#[cfg(feature = "async_marshal")]
async fn async_read(_s: &mut (impl AsyncRead + Unpin)) -> Result<Self, UnmarshalError> {
Ok(Self::new())
}
#[cfg(feature = "marshal")]
fn read(_s: &mut impl Read) -> Result<Self, UnmarshalError> {
Ok(Self::new())
}
}
#[derive(Debug, Error)]