You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
6.1 KiB
6.1 KiB
NNet协议粘包拆包问题分析与解决方案
问题分析
当前nnet协议的问题
nnet协议的Decode方法目前存在以下问题:
- 假设数据完整:
Decode方法假设传入的数据已经是一个完整的包 - 无法处理粘包:如果一次接收到多个包,无法正确拆分
- 无法处理半包:如果数据不完整,会直接返回错误,无法等待更多数据
// 当前实现的问题
func (p *NNetProtocol) Decode(data []byte) (protocolpkg.FrameHeader, []byte, error) {
// 检查最小长度
if len(data) < 11 {
return nil, nil, fmt.Errorf("invalid packet length: %d", len(data))
}
// ... 如果数据不完整,直接返回错误
}
NNet协议格式
[Magic(4 bytes)][Version(1 byte)][Length(4 bytes)][Data(N bytes)][Checksum(2 bytes)]
- Magic: 4字节,固定为"NNET"
- Version: 1字节
- Length: 4字节,大端序,表示Data部分的长度
- Data: N字节,实际数据
- Checksum: 2字节,大端序
总长度 = 5(Magic+Version) + 4(Length字段) + Length(数据长度) + 2(Checksum) = 11 + Length
解决方案:使用内置Unpacker
1. 使用LengthFieldUnpacker(推荐)
nnet协议有长度字段,最适合使用LengthFieldUnpacker:
// nnet协议的长度字段配置
unpackerConfig := unpacker.LengthFieldUnpacker{
LengthFieldOffset: 5, // Magic(4) + Version(1) = 5
LengthFieldLength: 4, // Length字段是4字节
LengthAdjustment: 2, // 需要加上Checksum(2字节)
InitialBytesToStrip: 0, // 不跳过任何字节,保留完整包
}
// 创建拆包器
nnetUnpacker := unpacker.NewLengthFieldUnpacker(unpackerConfig)
工作原理:
- 从偏移5的位置读取4字节的长度字段
- 实际包长度 = 长度字段值 + 2(Checksum)
- 总长度 = 5(Magic+Version) + 4(Length字段) + Length(数据) + 2(Checksum)
2. 使用FrameHeaderUnpacker
也可以使用FrameHeaderUnpacker,通过自定义函数从帧头获取长度:
// nnet协议的帧头拆包器配置
unpackerConfig := unpacker.FrameHeaderUnpacker{
HeaderLength: 9, // Magic(4) + Version(1) + Length(4) = 9
GetLength: func(header []byte) int {
// 从header中提取Length字段(偏移5-8)
if len(header) < 9 {
return 0
}
length := binary.BigEndian.Uint32(header[5:9])
// 总长度 = 9(帧头) + length(数据) + 2(Checksum)
return int(length) + 2
},
}
// 创建拆包器
nnetUnpacker := unpacker.NewFrameHeaderUnpacker(unpackerConfig)
集成到服务器
当前服务器代码的问题
当前OnTraffic方法直接调用协议的Decode方法,无法处理粘包拆包:
// 当前实现(有问题)
data, _ := c.Peek(-1) // 读取所有数据
if protocol != nil {
parseProtocolHeader(ctx.Request(), data, protocol) // 直接解码,无法处理粘包
}
正确的实现方式
需要在协议解码之前先使用unpacker进行拆包:
// 正确的实现
// 1. 获取或创建连接的unpacker(每个连接一个,保持状态)
unpacker := getOrCreateUnpacker(connID, protocol)
// 2. 读取新数据
newData, _ := c.Peek(-1)
// 3. 使用unpacker拆包(处理粘包拆包)
messages, remaining, err := unpacker.Unpack(newData)
if err != nil {
// 处理错误
}
// 4. 对每个完整的消息进行协议解码
for _, message := range messages {
// 使用协议的Decode方法解码单个完整消息
header, body, err := protocol.Decode(message)
// ... 处理
}
// 5. 保存剩余数据到连接的缓冲区
saveRemainingData(connID, remaining)
// 6. 丢弃已处理的数据
c.Discard(processedBytes)
内置Unpacker的使用
1. FixedLengthUnpacker(固定长度)
适用于固定长度的协议:
unpacker := unpacker.NewFixedLengthUnpacker(1024) // 每个包固定1024字节
2. LengthFieldUnpacker(长度字段)
适用于有长度字段的协议(如nnet):
config := unpacker.LengthFieldUnpacker{
LengthFieldOffset: 5,
LengthFieldLength: 4,
LengthAdjustment: 2,
InitialBytesToStrip: 0,
}
unpacker := unpacker.NewLengthFieldUnpacker(config)
3. DelimiterUnpacker(分隔符)
适用于用分隔符分割的协议:
unpacker := unpacker.NewDelimiterUnpacker([]byte{'\n'}) // 使用换行符分隔
4. FrameHeaderUnpacker(帧头)
适用于有帧头的协议,可以自定义长度提取逻辑:
config := unpacker.FrameHeaderUnpacker{
HeaderLength: 9,
GetLength: func(header []byte) int {
// 自定义长度提取逻辑
return int(binary.BigEndian.Uint32(header[5:9])) + 2
},
}
unpacker := unpacker.NewFrameHeaderUnpacker(config)
建议的改进方案
方案1:在协议层面集成Unpacker
为每个协议配置Unpacker,协议管理器负责管理:
type Protocol interface {
// ... 现有方法
// Unpacker 获取协议的拆包器
Unpacker() unpacker.Unpacker
}
方案2:在连接层面管理Unpacker
每个连接维护自己的Unpacker实例和缓冲区:
type Connection struct {
// ... 现有字段
unpacker unpacker.Unpacker
buffer []byte // 累积不完整的数据
}
方案3:在服务器层面统一处理
在OnTraffic中统一使用Unpacker处理所有数据:
func (h *eventHandler) OnTraffic(c gnet.Conn) {
// 1. 获取连接的unpacker
unpacker := h.getUnpacker(connID, protocol)
// 2. 读取新数据
newData, _ := c.Peek(-1)
// 3. 拆包
messages, remaining, _ := unpacker.Unpack(newData)
// 4. 处理每个完整消息
for _, msg := range messages {
// 协议解码
// 路由匹配
// 处理请求
}
}
总结
- nnet协议当前无法正确处理粘包拆包,需要集成Unpacker
- 推荐使用LengthFieldUnpacker,因为nnet协议有长度字段
- 内置的4种Unpacker都可以使用,根据协议特点选择
- 需要在服务器层面集成Unpacker,在协议解码之前先拆包