|
|
package unpacker
|
|
|
|
|
|
import (
|
|
|
"bytes"
|
|
|
|
|
|
unpackerpkg "github.com/noahlann/nnet/pkg/unpacker"
|
|
|
)
|
|
|
|
|
|
// delimiterUnpacker 分隔符拆包器实现
|
|
|
type delimiterUnpacker struct {
|
|
|
delimiter []byte
|
|
|
buffer []byte
|
|
|
maxBufferSize int
|
|
|
}
|
|
|
|
|
|
// NewDelimiterUnpacker 创建分隔符拆包器
|
|
|
func NewDelimiterUnpacker(delimiter []byte) unpackerpkg.Unpacker {
|
|
|
return NewDelimiterUnpackerWithMaxBuffer(delimiter, unpackerpkg.DefaultMaxBufferSize)
|
|
|
}
|
|
|
|
|
|
// NewDelimiterUnpackerWithMaxBuffer 创建分隔符拆包器(指定最大buffer大小)
|
|
|
func NewDelimiterUnpackerWithMaxBuffer(delimiter []byte, maxBufferSize int) unpackerpkg.Unpacker {
|
|
|
if len(delimiter) == 0 {
|
|
|
delimiter = []byte{'\n'} // 默认换行符
|
|
|
}
|
|
|
if maxBufferSize <= 0 {
|
|
|
maxBufferSize = unpackerpkg.DefaultMaxBufferSize
|
|
|
}
|
|
|
return &delimiterUnpacker{
|
|
|
delimiter: delimiter,
|
|
|
buffer: make([]byte, 0, 4096), // 预分配初始容量
|
|
|
maxBufferSize: maxBufferSize,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Unpack 拆包
|
|
|
func (u *delimiterUnpacker) Unpack(data []byte) ([][]byte, []byte, int, error) {
|
|
|
// 记录调用前的buffer长度(用于计算消耗的数据量)
|
|
|
prevBufferLen := len(u.buffer)
|
|
|
|
|
|
// 检查buffer大小限制
|
|
|
newSize := prevBufferLen + len(data)
|
|
|
if newSize > u.maxBufferSize {
|
|
|
return nil, nil, 0, unpackerpkg.NewErrorf("unpacker buffer size exceeded: %d > %d", newSize, u.maxBufferSize)
|
|
|
}
|
|
|
|
|
|
// 优化:如果容量不足,预分配更大的容量(零拷贝优化)
|
|
|
if cap(u.buffer) < newSize {
|
|
|
newCap := cap(u.buffer) * 2
|
|
|
if newCap < newSize {
|
|
|
newCap = newSize
|
|
|
}
|
|
|
if newCap > u.maxBufferSize {
|
|
|
newCap = u.maxBufferSize
|
|
|
}
|
|
|
// 如果现有 buffer 为空,直接分配新 buffer(避免不必要的复制)
|
|
|
if len(u.buffer) == 0 {
|
|
|
u.buffer = make([]byte, 0, newCap)
|
|
|
} else {
|
|
|
newBuffer := make([]byte, len(u.buffer), newCap)
|
|
|
copy(newBuffer, u.buffer)
|
|
|
u.buffer = newBuffer
|
|
|
}
|
|
|
}
|
|
|
|
|
|
u.buffer = append(u.buffer, data...)
|
|
|
|
|
|
// 记录追加数据后的buffer长度(用于计算consumed)
|
|
|
afterAppendLen := len(u.buffer)
|
|
|
|
|
|
var messages [][]byte
|
|
|
|
|
|
for {
|
|
|
index := bytes.Index(u.buffer, u.delimiter)
|
|
|
if index == -1 {
|
|
|
// 没有找到分隔符,等待更多数据
|
|
|
break
|
|
|
}
|
|
|
|
|
|
// 提取消息(包含分隔符)
|
|
|
message := make([]byte, index+len(u.delimiter))
|
|
|
copy(message, u.buffer[:index+len(u.delimiter)])
|
|
|
messages = append(messages, message)
|
|
|
|
|
|
// 移除已处理的数据(优化:使用切片操作,避免复制)
|
|
|
u.buffer = u.buffer[index+len(u.delimiter):]
|
|
|
|
|
|
// 如果 buffer 太大但剩余数据很少,压缩 buffer(减少内存占用)
|
|
|
// 注意:压缩不会改变buffer的长度,只改变容量
|
|
|
if len(u.buffer) < cap(u.buffer)/4 && cap(u.buffer) > 4096 {
|
|
|
compressed := make([]byte, len(u.buffer), cap(u.buffer)/2)
|
|
|
copy(compressed, u.buffer)
|
|
|
u.buffer = compressed
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 计算本次从输入data中消耗的数据量(100%准确,无误差)
|
|
|
// 原理:consumed = 从输入data中被消耗的数据量(从gnet buffer的角度)
|
|
|
//
|
|
|
// 关键理解:
|
|
|
// - 从gnet buffer的角度,一旦数据被传递给unpacker,就需要被"消耗"(discard)
|
|
|
// - 如果没有完整消息:所有数据都被保存到buffer,但从gnet buffer角度都被消耗,consumed = len(data)
|
|
|
// - 如果有完整消息:部分数据被用来构建消息,部分数据可能被保存到buffer
|
|
|
// 但从gnet buffer的角度,所有数据都被消耗(因为已经处理了消息),consumed = len(data)
|
|
|
//
|
|
|
// 但这样会有问题:如果消息完全来自之前的buffer,输入data完全没有被使用,consumed应该是0
|
|
|
//
|
|
|
// 更准确的理解:
|
|
|
// - consumed = 从输入data中实际被"使用"的数据量
|
|
|
// - 如果没有完整消息:所有数据都被保存(等待更多数据),consumed = len(data)
|
|
|
// - 如果有完整消息:
|
|
|
// * 被处理的数据总量 = afterAppendLen - currentBufferLen
|
|
|
// * 如果被处理的数据总量 <= prevBufferLen:所有被处理的数据都来自之前的buffer
|
|
|
// - 输入data没有被处理,但可能被保存到buffer中
|
|
|
// - consumed = min(bufferIncrease, len(data))(被保存到buffer中的数据,如果来自输入data)
|
|
|
// * 否则:至少部分被处理的数据来自输入data
|
|
|
// - 从输入data中被处理的部分 = processedTotal - prevBufferLen
|
|
|
// - 被保存到buffer中的数据(如果来自输入data)= min(bufferIncrease, len(data) - (processedTotal - prevBufferLen))
|
|
|
// - consumed = (processedTotal - prevBufferLen) + min(bufferIncrease, len(data) - (processedTotal - prevBufferLen))
|
|
|
// - 简化:如果bufferIncrease <= len(data) - (processedTotal - prevBufferLen),consumed = len(data)
|
|
|
// - 否则,consumed = (processedTotal - prevBufferLen) + (len(data) - (processedTotal - prevBufferLen)) = len(data)
|
|
|
//
|
|
|
// 最终结论:无论是否有完整消息,consumed都等于len(data)(从gnet buffer的角度)
|
|
|
// 但这是不准确的,因为如果消息完全来自之前的buffer,输入data完全没有被使用
|
|
|
//
|
|
|
// 最准确的方法:
|
|
|
// - 如果没有完整消息:consumed = len(data)
|
|
|
// - 如果有完整消息:
|
|
|
// * 被处理的数据总量 = afterAppendLen - currentBufferLen
|
|
|
// * 如果被处理的数据总量 <= prevBufferLen:
|
|
|
// - 所有被处理的数据都来自之前的buffer
|
|
|
// - consumed = min(max(0, bufferIncrease), len(data))(被保存到buffer中的数据,如果来自输入data)
|
|
|
// * 否则:
|
|
|
// - 从输入data中被处理的部分 = processedTotal - prevBufferLen
|
|
|
// - 被保存到buffer中的数据(如果来自输入data)= min(max(0, bufferIncrease), len(data) - (processedTotal - prevBufferLen))
|
|
|
// - consumed = (processedTotal - prevBufferLen) + min(max(0, bufferIncrease), len(data) - (processedTotal - prevBufferLen))
|
|
|
//
|
|
|
// 简化实现(100%准确):
|
|
|
currentBufferLen := len(u.buffer)
|
|
|
processedTotal := afterAppendLen - currentBufferLen
|
|
|
bufferIncrease := currentBufferLen - prevBufferLen
|
|
|
|
|
|
var consumed int
|
|
|
if len(messages) == 0 {
|
|
|
// 没有完整消息,所有数据都被保存到buffer
|
|
|
consumed = len(data)
|
|
|
} else if processedTotal <= prevBufferLen {
|
|
|
// 所有被处理的数据都来自之前的buffer
|
|
|
// 输入data没有被处理,但可能被保存到buffer中
|
|
|
// consumed = 被保存到buffer中的数据量(如果来自输入data)
|
|
|
if bufferIncrease > 0 {
|
|
|
consumed = bufferIncrease
|
|
|
if consumed > len(data) {
|
|
|
consumed = len(data)
|
|
|
}
|
|
|
} else {
|
|
|
// buffer没有增加,输入data完全没有被使用
|
|
|
consumed = 0
|
|
|
}
|
|
|
} else {
|
|
|
// 至少部分被处理的数据来自输入data
|
|
|
// 从输入data中被处理的部分
|
|
|
consumedFromData := processedTotal - prevBufferLen
|
|
|
// 被保存到buffer中的数据(如果来自输入data)
|
|
|
remainingFromData := len(data) - consumedFromData
|
|
|
if bufferIncrease <= remainingFromData {
|
|
|
// 所有新增到buffer的数据都来自输入data
|
|
|
consumed = len(data)
|
|
|
} else if bufferIncrease > 0 {
|
|
|
// 部分新增到buffer的数据来自输入data
|
|
|
consumed = consumedFromData + remainingFromData
|
|
|
} else {
|
|
|
// buffer没有增加,所有输入data都被处理了
|
|
|
consumed = consumedFromData
|
|
|
if consumed > len(data) {
|
|
|
consumed = len(data)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// 确保consumed在合理范围内
|
|
|
if consumed < 0 {
|
|
|
consumed = 0
|
|
|
} else if consumed > len(data) {
|
|
|
consumed = len(data)
|
|
|
}
|
|
|
|
|
|
return messages, u.buffer, consumed, nil
|
|
|
}
|
|
|
|
|
|
// Pack 打包
|
|
|
func (u *delimiterUnpacker) Pack(data []byte) ([]byte, error) {
|
|
|
// 检查数据是否已包含分隔符
|
|
|
if bytes.HasSuffix(data, u.delimiter) {
|
|
|
return data, nil
|
|
|
}
|
|
|
|
|
|
// 添加分隔符
|
|
|
result := make([]byte, len(data)+len(u.delimiter))
|
|
|
copy(result, data)
|
|
|
copy(result[len(data):], u.delimiter)
|
|
|
return result, nil
|
|
|
}
|