package client import ( "context" "sync" "sync/atomic" "time" clientpkg "github.com/noahlann/nnet/pkg/client" ) // Pool 连接池 type Pool struct { config *PoolConfig clients []clientpkg.Client mu sync.RWMutex available chan clientpkg.Client maxSize int currentSize int32 // 使用原子操作 closed int32 // 使用原子操作标记是否已关闭 ctx context.Context cancel context.CancelFunc wg sync.WaitGroup } // PoolConfig 连接池配置 type PoolConfig struct { // 最大连接数 MaxSize int // 最小连接数 MinSize int // 客户端配置 ClientConfig *clientpkg.Config // 连接超时时间 AcquireTimeout time.Duration // 空闲连接超时时间 IdleTimeout time.Duration } // DefaultPoolConfig 返回默认连接池配置 func DefaultPoolConfig() *PoolConfig { return &PoolConfig{ MaxSize: 10, MinSize: 2, ClientConfig: clientpkg.DefaultConfig(), AcquireTimeout: 10 * time.Second, IdleTimeout: 5 * time.Minute, } } // NewPool 创建连接池 func NewPool(config *PoolConfig) (*Pool, error) { if config == nil { config = DefaultPoolConfig() } if config.MaxSize <= 0 { config.MaxSize = 10 } if config.MinSize < 0 { config.MinSize = 0 } if config.MinSize > config.MaxSize { config.MinSize = config.MaxSize } ctx, cancel := context.WithCancel(context.Background()) pool := &Pool{ config: config, clients: make([]clientpkg.Client, 0), available: make(chan clientpkg.Client, config.MaxSize), maxSize: config.MaxSize, ctx: ctx, cancel: cancel, } // 创建最小连接数 for i := 0; i < config.MinSize; i++ { client := NewTCPClient(config.ClientConfig) if err := client.Connect(); err != nil { // 创建失败,清理已创建的连接 pool.Close() return nil, err } pool.mu.Lock() pool.clients = append(pool.clients, client) pool.mu.Unlock() pool.available <- client atomic.AddInt32(&pool.currentSize, 1) } // 启动空闲连接清理goroutine if config.IdleTimeout > 0 { pool.wg.Add(1) go pool.idleCleanup() } return pool, nil } // Acquire 获取连接 func (p *Pool) Acquire() (clientpkg.Client, error) { // 检查连接池是否已关闭 if atomic.LoadInt32(&p.closed) == 1 { return nil, clientpkg.NewError("pool is closed") } // 尝试从可用连接中获取 select { case client := <-p.available: // 检查连接是否有效 if client.IsConnected() { return client, nil } // 连接无效,从池中移除并创建新连接 p.mu.Lock() p.removeClientUnlocked(client) p.mu.Unlock() return p.createClient() case <-time.After(p.config.AcquireTimeout): // 超时,尝试创建新连接 return p.createClient() case <-p.ctx.Done(): return nil, clientpkg.NewError("pool is closed") } } // Release 释放连接 func (p *Pool) Release(client clientpkg.Client) error { // 检查连接池是否已关闭 if atomic.LoadInt32(&p.closed) == 1 { // 池已关闭,直接关闭连接 if client != nil { client.Close() } return nil } if client == nil { return nil } p.mu.Lock() defer p.mu.Unlock() // 检查连接是否有效 if !client.IsConnected() { // 连接无效,从池中移除 p.removeClientUnlocked(client) return nil } // 检查池是否已满 if len(p.available) >= p.maxSize { // 池已满,关闭连接 client.Close() p.removeClientUnlocked(client) return nil } // 将连接放回池中 select { case p.available <- client: return nil default: // channel已满,关闭连接 client.Close() p.removeClientUnlocked(client) return nil } } // createClient 创建新连接 func (p *Pool) createClient() (clientpkg.Client, error) { // 检查是否超过最大连接数 if atomic.LoadInt32(&p.currentSize) >= int32(p.maxSize) { return nil, clientpkg.NewError("pool is full") } client := NewTCPClient(p.config.ClientConfig) if err := client.Connect(); err != nil { return nil, err } p.mu.Lock() // 再次检查是否超过最大连接数(双重检查) if p.currentSize >= int32(p.maxSize) { p.mu.Unlock() client.Close() return nil, clientpkg.NewError("pool is full") } p.clients = append(p.clients, client) atomic.AddInt32(&p.currentSize, 1) p.mu.Unlock() return client, nil } // removeClientUnlocked 从池中移除连接(需要在持有锁的情况下调用) func (p *Pool) removeClientUnlocked(client clientpkg.Client) { for i, c := range p.clients { if c == client { p.clients = append(p.clients[:i], p.clients[i+1:]...) atomic.AddInt32(&p.currentSize, -1) break } } } // Close 关闭连接池 func (p *Pool) Close() error { // 标记为已关闭 if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) { // 已经关闭 return nil } // 取消上下文,停止所有goroutine p.cancel() // 等待所有goroutine退出 p.wg.Wait() p.mu.Lock() defer p.mu.Unlock() // 关闭所有连接 for _, client := range p.clients { client.Close() } // 清空channel close(p.available) for range p.available { // 清空channel } p.clients = nil atomic.StoreInt32(&p.currentSize, 0) return nil } // idleCleanup 空闲连接清理goroutine func (p *Pool) idleCleanup() { defer p.wg.Done() ticker := time.NewTicker(p.config.IdleTimeout / 2) defer ticker.Stop() for { select { case <-p.ctx.Done(): return case <-ticker.C: p.cleanupIdleConnections() } } } // cleanupIdleConnections 清理空闲连接 func (p *Pool) cleanupIdleConnections() { // 注意:这个实现简化了,实际应该记录每个连接的最后使用时间 // 当前实现只是检查连接是否有效,如果无效则移除 p.mu.Lock() defer p.mu.Unlock() // 检查可用连接是否有效 for { select { case client := <-p.available: if !client.IsConnected() { // 连接无效,移除 p.removeClientUnlocked(client) client.Close() } else { // 连接有效,放回去 select { case p.available <- client: default: // channel已满,放不回去,关闭连接 client.Close() p.removeClientUnlocked(client) } } default: // 没有更多可用连接 return } } } // Size 获取当前连接数 func (p *Pool) Size() int { return int(atomic.LoadInt32(&p.currentSize)) } // Available 获取可用连接数 func (p *Pool) Available() int { p.mu.RLock() defer p.mu.RUnlock() return len(p.available) } // IsClosed 检查连接池是否已关闭 func (p *Pool) IsClosed() bool { return atomic.LoadInt32(&p.closed) == 1 }