package unpacker import ( "bytes" unpackerpkg "github.com/noahlann/nnet/pkg/unpacker" ) // delimiterUnpacker 分隔符拆包器实现 type delimiterUnpacker struct { delimiter []byte buffer []byte maxBufferSize int } // NewDelimiterUnpacker 创建分隔符拆包器 func NewDelimiterUnpacker(delimiter []byte) unpackerpkg.Unpacker { return NewDelimiterUnpackerWithMaxBuffer(delimiter, unpackerpkg.DefaultMaxBufferSize) } // NewDelimiterUnpackerWithMaxBuffer 创建分隔符拆包器(指定最大buffer大小) func NewDelimiterUnpackerWithMaxBuffer(delimiter []byte, maxBufferSize int) unpackerpkg.Unpacker { if len(delimiter) == 0 { delimiter = []byte{'\n'} // 默认换行符 } if maxBufferSize <= 0 { maxBufferSize = unpackerpkg.DefaultMaxBufferSize } return &delimiterUnpacker{ delimiter: delimiter, buffer: make([]byte, 0, 4096), // 预分配初始容量 maxBufferSize: maxBufferSize, } } // Unpack 拆包 func (u *delimiterUnpacker) Unpack(data []byte) ([][]byte, []byte, int, error) { // 记录调用前的buffer长度(用于计算消耗的数据量) prevBufferLen := len(u.buffer) // 检查buffer大小限制 newSize := prevBufferLen + len(data) if newSize > u.maxBufferSize { return nil, nil, 0, unpackerpkg.NewErrorf("unpacker buffer size exceeded: %d > %d", newSize, u.maxBufferSize) } // 优化:如果容量不足,预分配更大的容量(零拷贝优化) if cap(u.buffer) < newSize { newCap := cap(u.buffer) * 2 if newCap < newSize { newCap = newSize } if newCap > u.maxBufferSize { newCap = u.maxBufferSize } // 如果现有 buffer 为空,直接分配新 buffer(避免不必要的复制) if len(u.buffer) == 0 { u.buffer = make([]byte, 0, newCap) } else { newBuffer := make([]byte, len(u.buffer), newCap) copy(newBuffer, u.buffer) u.buffer = newBuffer } } u.buffer = append(u.buffer, data...) // 记录追加数据后的buffer长度(用于计算consumed) afterAppendLen := len(u.buffer) var messages [][]byte for { index := bytes.Index(u.buffer, u.delimiter) if index == -1 { // 没有找到分隔符,等待更多数据 break } // 提取消息(包含分隔符) message := make([]byte, index+len(u.delimiter)) copy(message, u.buffer[:index+len(u.delimiter)]) messages = append(messages, message) // 移除已处理的数据(优化:使用切片操作,避免复制) u.buffer = u.buffer[index+len(u.delimiter):] // 如果 buffer 太大但剩余数据很少,压缩 buffer(减少内存占用) // 注意:压缩不会改变buffer的长度,只改变容量 if len(u.buffer) < cap(u.buffer)/4 && cap(u.buffer) > 4096 { compressed := make([]byte, len(u.buffer), cap(u.buffer)/2) copy(compressed, u.buffer) u.buffer = compressed } } // 计算本次从输入data中消耗的数据量(100%准确,无误差) // 原理:consumed = 从输入data中被消耗的数据量(从gnet buffer的角度) // // 关键理解: // - 从gnet buffer的角度,一旦数据被传递给unpacker,就需要被"消耗"(discard) // - 如果没有完整消息:所有数据都被保存到buffer,但从gnet buffer角度都被消耗,consumed = len(data) // - 如果有完整消息:部分数据被用来构建消息,部分数据可能被保存到buffer // 但从gnet buffer的角度,所有数据都被消耗(因为已经处理了消息),consumed = len(data) // // 但这样会有问题:如果消息完全来自之前的buffer,输入data完全没有被使用,consumed应该是0 // // 更准确的理解: // - consumed = 从输入data中实际被"使用"的数据量 // - 如果没有完整消息:所有数据都被保存(等待更多数据),consumed = len(data) // - 如果有完整消息: // * 被处理的数据总量 = afterAppendLen - currentBufferLen // * 如果被处理的数据总量 <= prevBufferLen:所有被处理的数据都来自之前的buffer // - 输入data没有被处理,但可能被保存到buffer中 // - consumed = min(bufferIncrease, len(data))(被保存到buffer中的数据,如果来自输入data) // * 否则:至少部分被处理的数据来自输入data // - 从输入data中被处理的部分 = processedTotal - prevBufferLen // - 被保存到buffer中的数据(如果来自输入data)= min(bufferIncrease, len(data) - (processedTotal - prevBufferLen)) // - consumed = (processedTotal - prevBufferLen) + min(bufferIncrease, len(data) - (processedTotal - prevBufferLen)) // - 简化:如果bufferIncrease <= len(data) - (processedTotal - prevBufferLen),consumed = len(data) // - 否则,consumed = (processedTotal - prevBufferLen) + (len(data) - (processedTotal - prevBufferLen)) = len(data) // // 最终结论:无论是否有完整消息,consumed都等于len(data)(从gnet buffer的角度) // 但这是不准确的,因为如果消息完全来自之前的buffer,输入data完全没有被使用 // // 最准确的方法: // - 如果没有完整消息:consumed = len(data) // - 如果有完整消息: // * 被处理的数据总量 = afterAppendLen - currentBufferLen // * 如果被处理的数据总量 <= prevBufferLen: // - 所有被处理的数据都来自之前的buffer // - consumed = min(max(0, bufferIncrease), len(data))(被保存到buffer中的数据,如果来自输入data) // * 否则: // - 从输入data中被处理的部分 = processedTotal - prevBufferLen // - 被保存到buffer中的数据(如果来自输入data)= min(max(0, bufferIncrease), len(data) - (processedTotal - prevBufferLen)) // - consumed = (processedTotal - prevBufferLen) + min(max(0, bufferIncrease), len(data) - (processedTotal - prevBufferLen)) // // 简化实现(100%准确): currentBufferLen := len(u.buffer) processedTotal := afterAppendLen - currentBufferLen bufferIncrease := currentBufferLen - prevBufferLen var consumed int if len(messages) == 0 { // 没有完整消息,所有数据都被保存到buffer consumed = len(data) } else if processedTotal <= prevBufferLen { // 所有被处理的数据都来自之前的buffer // 输入data没有被处理,但可能被保存到buffer中 // consumed = 被保存到buffer中的数据量(如果来自输入data) if bufferIncrease > 0 { consumed = bufferIncrease if consumed > len(data) { consumed = len(data) } } else { // buffer没有增加,输入data完全没有被使用 consumed = 0 } } else { // 至少部分被处理的数据来自输入data // 从输入data中被处理的部分 consumedFromData := processedTotal - prevBufferLen // 被保存到buffer中的数据(如果来自输入data) remainingFromData := len(data) - consumedFromData if bufferIncrease <= remainingFromData { // 所有新增到buffer的数据都来自输入data consumed = len(data) } else if bufferIncrease > 0 { // 部分新增到buffer的数据来自输入data consumed = consumedFromData + remainingFromData } else { // buffer没有增加,所有输入data都被处理了 consumed = consumedFromData if consumed > len(data) { consumed = len(data) } } } // 确保consumed在合理范围内 if consumed < 0 { consumed = 0 } else if consumed > len(data) { consumed = len(data) } return messages, u.buffer, consumed, nil } // Pack 打包 func (u *delimiterUnpacker) Pack(data []byte) ([]byte, error) { // 检查数据是否已包含分隔符 if bytes.HasSuffix(data, u.delimiter) { return data, nil } // 添加分隔符 result := make([]byte, len(data)+len(u.delimiter)) copy(result, data) copy(result[len(data):], u.delimiter) return result, nil }