|
|
package connection
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
"github.com/panjf2000/gnet/v2"
|
|
|
"github.com/rs/xid"
|
|
|
)
|
|
|
|
|
|
// writeBufferPool 写入缓冲区池(用于零拷贝优化)
|
|
|
var writeBufferPool = sync.Pool{
|
|
|
New: func() interface{} {
|
|
|
return make([]byte, 0, 4096)
|
|
|
},
|
|
|
}
|
|
|
|
|
|
// Connection 连接实现
|
|
|
type Connection struct {
|
|
|
id string
|
|
|
conn gnet.Conn
|
|
|
remoteAddr string
|
|
|
localAddr string
|
|
|
ctx context.Context
|
|
|
mu sync.RWMutex
|
|
|
createdAt time.Time
|
|
|
lastActive time.Time
|
|
|
attributes map[string]interface{}
|
|
|
}
|
|
|
|
|
|
// NewConnection 创建新连接
|
|
|
// 如果id为空,将自动生成唯一的xid
|
|
|
func NewConnection(id string, conn gnet.Conn) *Connection {
|
|
|
if id == "" {
|
|
|
id = xid.New().String()
|
|
|
}
|
|
|
return &Connection{
|
|
|
id: id,
|
|
|
conn: conn,
|
|
|
remoteAddr: conn.RemoteAddr().String(),
|
|
|
localAddr: conn.LocalAddr().String(),
|
|
|
createdAt: time.Now(),
|
|
|
lastActive: time.Now(),
|
|
|
attributes: make(map[string]interface{}),
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// ID 获取连接ID
|
|
|
func (c *Connection) ID() string {
|
|
|
return c.id
|
|
|
}
|
|
|
|
|
|
// RemoteAddr 获取远程地址
|
|
|
func (c *Connection) RemoteAddr() string {
|
|
|
return c.remoteAddr
|
|
|
}
|
|
|
|
|
|
// LocalAddr 获取本地地址
|
|
|
func (c *Connection) LocalAddr() string {
|
|
|
return c.localAddr
|
|
|
}
|
|
|
|
|
|
// Write 写入数据(异步写入,零拷贝优化)
|
|
|
func (c *Connection) Write(data []byte) error {
|
|
|
c.mu.Lock()
|
|
|
c.lastActive = time.Now()
|
|
|
c.mu.Unlock()
|
|
|
|
|
|
var buf []byte
|
|
|
var shouldRecycle bool
|
|
|
|
|
|
// 从池中获取缓冲区(零拷贝优化)
|
|
|
pooledBuf := writeBufferPool.Get().([]byte)
|
|
|
if cap(pooledBuf) >= len(data) {
|
|
|
// 池中的缓冲区足够大,复用
|
|
|
buf = pooledBuf[:len(data)]
|
|
|
shouldRecycle = true
|
|
|
} else {
|
|
|
// 池中的缓冲区太小,重新分配
|
|
|
buf = make([]byte, len(data))
|
|
|
shouldRecycle = false
|
|
|
// 将池中的缓冲区归还(虽然太小,但可以供其他小数据使用)
|
|
|
pooledBuf = pooledBuf[:0]
|
|
|
if cap(pooledBuf) <= 64*1024 {
|
|
|
writeBufferPool.Put(pooledBuf)
|
|
|
}
|
|
|
}
|
|
|
copy(buf, data)
|
|
|
|
|
|
// 保存用于回收的缓冲区引用
|
|
|
recycleBuf := buf
|
|
|
if !shouldRecycle {
|
|
|
recycleBuf = nil
|
|
|
}
|
|
|
|
|
|
// 使用AsyncWrite异步写入,避免阻塞事件循环
|
|
|
err := c.conn.AsyncWrite(buf, func(c gnet.Conn, err error) error {
|
|
|
// 写入完成后,将缓冲区归还到池中(零拷贝优化)
|
|
|
if recycleBuf != nil {
|
|
|
// 重置长度但保留容量
|
|
|
recycleBuf = recycleBuf[:0]
|
|
|
// 如果容量太大,不回收(避免池中积累大对象)
|
|
|
if cap(recycleBuf) <= 64*1024 {
|
|
|
writeBufferPool.Put(recycleBuf)
|
|
|
}
|
|
|
}
|
|
|
if err != nil {
|
|
|
// 写入错误处理(可以在这里记录日志或更新metrics)
|
|
|
// 注意:这里不能访问Connection的锁,因为可能在不同goroutine中执行
|
|
|
// TODO: 添加日志记录写入错误
|
|
|
}
|
|
|
return nil
|
|
|
})
|
|
|
if err != nil {
|
|
|
// AsyncWrite失败,记录错误
|
|
|
// 注意:这里不能访问logger,因为Connection不包含logger
|
|
|
// 错误会在AsyncWrite的回调中处理
|
|
|
}
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
// Close 关闭连接
|
|
|
func (c *Connection) Close() error {
|
|
|
return c.conn.Close()
|
|
|
}
|
|
|
|
|
|
// Context 获取上下文
|
|
|
func (c *Connection) Context() context.Context {
|
|
|
c.mu.RLock()
|
|
|
defer c.mu.RUnlock()
|
|
|
return c.ctx
|
|
|
}
|
|
|
|
|
|
// SetContext 设置上下文
|
|
|
func (c *Connection) SetContext(ctx context.Context) {
|
|
|
c.mu.Lock()
|
|
|
defer c.mu.Unlock()
|
|
|
c.ctx = ctx
|
|
|
}
|
|
|
|
|
|
// SetAttribute 设置属性
|
|
|
func (c *Connection) SetAttribute(key string, value interface{}) {
|
|
|
c.mu.Lock()
|
|
|
defer c.mu.Unlock()
|
|
|
if c.attributes == nil {
|
|
|
c.attributes = make(map[string]interface{})
|
|
|
}
|
|
|
c.attributes[key] = value
|
|
|
}
|
|
|
|
|
|
// GetAttribute 获取属性
|
|
|
func (c *Connection) GetAttribute(key string) interface{} {
|
|
|
c.mu.RLock()
|
|
|
defer c.mu.RUnlock()
|
|
|
if c.attributes == nil {
|
|
|
return nil
|
|
|
}
|
|
|
return c.attributes[key]
|
|
|
}
|
|
|
|
|
|
// LastActive 获取最后活动时间
|
|
|
func (c *Connection) LastActive() time.Time {
|
|
|
c.mu.RLock()
|
|
|
defer c.mu.RUnlock()
|
|
|
return c.lastActive
|
|
|
}
|
|
|
|
|
|
// UpdateActive 更新活动时间
|
|
|
func (c *Connection) UpdateActive() {
|
|
|
c.mu.Lock()
|
|
|
defer c.mu.Unlock()
|
|
|
c.lastActive = time.Now()
|
|
|
}
|
|
|
|
|
|
// GnetConn 获取gnet连接(内部使用)
|
|
|
func (c *Connection) GnetConn() gnet.Conn {
|
|
|
return c.conn
|
|
|
}
|