You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

150 lines
4.1 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package unpacker
import (
unpackerpkg "github.com/noahlann/nnet/pkg/unpacker"
)
// fixedLengthUnpacker is the fixed-length framing implementation: it
// accumulates incoming bytes and cuts them into frames of exactly length bytes.
type fixedLengthUnpacker struct {
	// length is the size, in bytes, of every frame.
	length int

	// buffer holds the bytes received so far that have not yet been emitted
	// as part of a complete frame.
	buffer []byte

	// maxBufferSize is the upper bound on the number of buffered bytes; input
	// that would push the buffer past it is rejected by Unpack.
	maxBufferSize int
}
// NewFixedLengthUnpacker creates a fixed-length unpacker that splits the
// stream into frames of length bytes, using unpackerpkg.DefaultMaxBufferSize
// as the buffer limit.
func NewFixedLengthUnpacker(length int) unpackerpkg.Unpacker {
	var limit int = unpackerpkg.DefaultMaxBufferSize
	return NewFixedLengthUnpackerWithMaxBuffer(length, limit)
}
// NewFixedLengthUnpackerWithMaxBuffer creates a fixed-length unpacker that
// splits the stream into frames of length bytes and refuses to buffer more
// than maxBufferSize bytes.
//
// A non-positive length falls back to 1024. A non-positive maxBufferSize falls
// back to unpackerpkg.DefaultMaxBufferSize.
//
// NOTE: if length is larger than maxBufferSize, a complete frame can never fit
// in the buffer, so Unpack will only ever buffer partial data and then fail
// once the limit is crossed.
func NewFixedLengthUnpackerWithMaxBuffer(length int, maxBufferSize int) unpackerpkg.Unpacker {
	if length <= 0 {
		length = 1024 // default frame length
	}
	if maxBufferSize <= 0 {
		maxBufferSize = unpackerpkg.DefaultMaxBufferSize
	}

	// Pre-allocate room for two frames, but never more than the unpacker is
	// allowed to hold. Comparing against maxBufferSize/2 rather than computing
	// length*2 first also keeps a very large length from overflowing int.
	initialCap := maxBufferSize
	if length <= maxBufferSize/2 {
		initialCap = length * 2
	}

	return &fixedLengthUnpacker{
		length:        length,
		buffer:        make([]byte, 0, initialCap),
		maxBufferSize: maxBufferSize,
	}
}
// Unpack appends data to the internal buffer and splits the buffered bytes
// into as many complete frames of u.length bytes as are available.
//
// Returns:
//   - messages: independently allocated copies of each complete frame, in
//     stream order; nil when no complete frame is available yet.
//   - remaining: the buffered bytes (fewer than u.length) that do not yet form
//     a frame. This slice aliases the unpacker's internal buffer, so callers
//     should treat it as read-only.
//   - consumed: the number of bytes of data that were accepted. A successful
//     call always absorbs the whole input into the internal buffer, so this is
//     always len(data).
//   - err: non-nil when buffering data would exceed u.maxBufferSize; in that
//     case nothing is buffered and the other results are zero values.
//
// The extraction loop relies on u.length > 0, which both constructors
// guarantee.
func (u *fixedLengthUnpacker) Unpack(data []byte) ([][]byte, []byte, int, error) {
	// Reject the input up front so the buffer can never grow past the limit.
	needed := len(u.buffer) + len(data)
	if needed > u.maxBufferSize {
		return nil, nil, 0, unpackerpkg.NewErrorf("unpacker buffer size exceeded: %d > %d", needed, u.maxBufferSize)
	}

	// Grow geometrically, bounded by maxBufferSize, so a stream of small
	// writes does not reallocate on every call. needed <= maxBufferSize here,
	// so the clamped capacity is always large enough.
	if cap(u.buffer) < needed {
		newCap := cap(u.buffer) * 2
		if newCap < needed {
			newCap = needed
		}
		if newCap > u.maxBufferSize {
			newCap = u.maxBufferSize
		}
		grown := make([]byte, len(u.buffer), newCap)
		copy(grown, u.buffer)
		u.buffer = grown
	}
	u.buffer = append(u.buffer, data...)

	// Cut off complete frames. Each frame is copied so that it stays valid
	// after the internal buffer is reused or reallocated.
	var messages [][]byte
	for len(u.buffer) >= u.length {
		frame := make([]byte, u.length)
		copy(frame, u.buffer)
		messages = append(messages, frame)
		u.buffer = u.buffer[u.length:]
	}

	// If the leftover occupies less than a quarter of a large buffer, move it
	// to a buffer of half the capacity. This is checked once, after the loop:
	// only the leftover matters, and shrinking mid-loop would copy bytes that
	// the following iterations consume anyway.
	if cap(u.buffer) > 4096 && len(u.buffer) < cap(u.buffer)/4 {
		shrunk := make([]byte, len(u.buffer), cap(u.buffer)/2)
		copy(shrunk, u.buffer)
		u.buffer = shrunk
	}

	return messages, u.buffer, len(data), nil
}
// Pack normalizes data to exactly u.length bytes so that it forms one frame.
//
//   - Shorter input is copied into a new slice and zero-padded on the right.
//   - Longer input is truncated to its first u.length bytes. The result shares
//     memory with data, but its capacity is capped at u.length so that an
//     append on the returned frame cannot overwrite the caller's bytes that
//     follow the truncation point.
//   - Input that already has the right length is returned as is.
//
// The returned error is always nil.
func (u *fixedLengthUnpacker) Pack(data []byte) ([]byte, error) {
	switch {
	case len(data) < u.length:
		padded := make([]byte, u.length)
		copy(padded, data)
		return padded, nil
	case len(data) > u.length:
		return data[:u.length:u.length], nil
	default:
		return data, nil
	}
}