1
0

assembling packet to existed buffer

This commit is contained in:
EAimTY 2023-01-25 20:08:05 +09:00
parent 000f38da15
commit a4d178c95e
2 changed files with 38 additions and 24 deletions

View File

@ -5,6 +5,7 @@ use crate::protocol::{
use parking_lot::Mutex; use parking_lot::Mutex;
use std::{ use std::{
collections::HashMap, collections::HashMap,
mem,
sync::{ sync::{
atomic::{AtomicU16, Ordering}, atomic::{AtomicU16, Ordering},
Arc, Weak, Arc, Weak,
@ -202,7 +203,7 @@ where
Dissociate::<side::Rx>::new(assoc_id) Dissociate::<side::Rx>::new(assoc_id)
} }
fn insert<A>( fn insert(
&mut self, &mut self,
assoc_id: u16, assoc_id: u16,
pkt_id: u16, pkt_id: u16,
@ -211,10 +212,7 @@ where
size: u16, size: u16,
addr: Address, addr: Address,
data: B, data: B,
) -> Result<Option<(A, Address)>, AssembleError> ) -> Result<Option<Assemblable<B>>, AssembleError> {
where
A: Assembled<B>,
{
self.sessions self.sessions
.entry(assoc_id) .entry(assoc_id)
.or_insert_with(|| UdpSession::new(self.task_associate_count.register())) .or_insert_with(|| UdpSession::new(self.task_associate_count.register()))
@ -273,7 +271,7 @@ where
Packet::<side::Rx, B>::new(sessions, assoc_id, pkt_id, frag_total, frag_id, size, addr) Packet::<side::Rx, B>::new(sessions, assoc_id, pkt_id, frag_total, frag_id, size, addr)
} }
fn insert<A>( fn insert(
&mut self, &mut self,
pkt_id: u16, pkt_id: u16,
frag_total: u8, frag_total: u8,
@ -281,10 +279,7 @@ where
size: u16, size: u16,
addr: Address, addr: Address,
data: B, data: B,
) -> Result<Option<(A, Address)>, AssembleError> ) -> Result<Option<Assemblable<B>>, AssembleError> {
where
A: Assembled<B>,
{
let res = self let res = self
.pkt_buf .pkt_buf
.entry(pkt_id) .entry(pkt_id)
@ -328,17 +323,14 @@ where
} }
} }
fn insert<A>( fn insert(
&mut self, &mut self,
frag_total: u8, frag_total: u8,
frag_id: u8, frag_id: u8,
size: u16, size: u16,
addr: Address, addr: Address,
data: B, data: B,
) -> Result<Option<(A, Address)>, AssembleError> ) -> Result<Option<Assemblable<B>>, AssembleError> {
where
A: Assembled<B>,
{
if data.as_ref().len() != size as usize { if data.as_ref().len() != size as usize {
return Err(AssembleError::InvalidFragmentSize); return Err(AssembleError::InvalidFragmentSize);
} }
@ -363,20 +355,45 @@ where
} }
if self.frag_received == self.frag_total { if self.frag_received == self.frag_total {
let iter = self.buf.iter_mut().map(|x| x.take().unwrap()); Ok(Some(Assemblable::new(
Ok(Some((A::assemble(iter)?, self.addr.take()))) mem::take(&mut self.buf),
self.addr.take(),
)))
} else { } else {
Ok(None) Ok(None)
} }
} }
} }
pub trait Assembled<B> pub struct Assemblable<B> {
buf: Vec<Option<B>>,
addr: Address,
}
impl<B> Assemblable<B>
where
B: AsRef<[u8]>,
{
fn new(buf: Vec<Option<B>>, addr: Address) -> Self {
Self { buf, addr }
}
pub fn assemble<A>(self, buf: &mut A) -> Address
where
A: Assembler<B>,
{
let data = self.buf.into_iter().map(|b| b.unwrap());
buf.assemble(data);
self.addr
}
}
pub trait Assembler<B>
where where
Self: Sized, Self: Sized,
B: AsRef<[u8]>, B: AsRef<[u8]>,
{ {
fn assemble(buf: impl IntoIterator<Item = B>) -> Result<Self, AssembleError>; fn assemble(&mut self, data: impl IntoIterator<Item = B>);
} }
#[derive(Debug, Error)] #[derive(Debug, Error)]

View File

@ -1,6 +1,6 @@
use super::{ use super::{
side::{self, Side}, side::{self, Side},
AssembleError, Assembled, UdpSessions, Assemblable, AssembleError, UdpSessions,
}; };
use crate::protocol::{Address, Header, Packet as PacketHeader}; use crate::protocol::{Address, Header, Packet as PacketHeader};
use parking_lot::Mutex; use parking_lot::Mutex;
@ -77,10 +77,7 @@ where
} }
} }
pub fn assemble<A>(self, data: B) -> Result<Option<(A, Address)>, AssembleError> pub fn assemble(self, data: B) -> Result<Option<Assemblable<B>>, AssembleError> {
where
A: Assembled<B>,
{
let Side::Rx(rx) = self.inner else { unreachable!() }; let Side::Rx(rx) = self.inner else { unreachable!() };
let mut sessions = rx.sessions.lock(); let mut sessions = rx.sessions.lock();