|
|
package client
|
|
|
|
|
|
import (
|
|
|
"context"
|
|
|
"sync"
|
|
|
"sync/atomic"
|
|
|
"time"
|
|
|
|
|
|
clientpkg "github.com/noahlann/nnet/pkg/client"
|
|
|
)
|
|
|
|
|
|
// Pool 连接池
|
|
|
type Pool struct {
|
|
|
config *PoolConfig
|
|
|
clients []clientpkg.Client
|
|
|
mu sync.RWMutex
|
|
|
available chan clientpkg.Client
|
|
|
maxSize int
|
|
|
currentSize int32 // 使用原子操作
|
|
|
closed int32 // 使用原子操作标记是否已关闭
|
|
|
ctx context.Context
|
|
|
cancel context.CancelFunc
|
|
|
wg sync.WaitGroup
|
|
|
}
|
|
|
|
|
|
// PoolConfig 连接池配置
|
|
|
type PoolConfig struct {
|
|
|
// 最大连接数
|
|
|
MaxSize int
|
|
|
|
|
|
// 最小连接数
|
|
|
MinSize int
|
|
|
|
|
|
// 客户端配置
|
|
|
ClientConfig *clientpkg.Config
|
|
|
|
|
|
// 连接超时时间
|
|
|
AcquireTimeout time.Duration
|
|
|
|
|
|
// 空闲连接超时时间
|
|
|
IdleTimeout time.Duration
|
|
|
}
|
|
|
|
|
|
// DefaultPoolConfig 返回默认连接池配置
|
|
|
func DefaultPoolConfig() *PoolConfig {
|
|
|
return &PoolConfig{
|
|
|
MaxSize: 10,
|
|
|
MinSize: 2,
|
|
|
ClientConfig: clientpkg.DefaultConfig(),
|
|
|
AcquireTimeout: 10 * time.Second,
|
|
|
IdleTimeout: 5 * time.Minute,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// NewPool 创建连接池
|
|
|
func NewPool(config *PoolConfig) (*Pool, error) {
|
|
|
if config == nil {
|
|
|
config = DefaultPoolConfig()
|
|
|
}
|
|
|
|
|
|
if config.MaxSize <= 0 {
|
|
|
config.MaxSize = 10
|
|
|
}
|
|
|
if config.MinSize < 0 {
|
|
|
config.MinSize = 0
|
|
|
}
|
|
|
if config.MinSize > config.MaxSize {
|
|
|
config.MinSize = config.MaxSize
|
|
|
}
|
|
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
pool := &Pool{
|
|
|
config: config,
|
|
|
clients: make([]clientpkg.Client, 0),
|
|
|
available: make(chan clientpkg.Client, config.MaxSize),
|
|
|
maxSize: config.MaxSize,
|
|
|
ctx: ctx,
|
|
|
cancel: cancel,
|
|
|
}
|
|
|
|
|
|
// 创建最小连接数
|
|
|
for i := 0; i < config.MinSize; i++ {
|
|
|
client := NewTCPClient(config.ClientConfig)
|
|
|
if err := client.Connect(); err != nil {
|
|
|
// 创建失败,清理已创建的连接
|
|
|
pool.Close()
|
|
|
return nil, err
|
|
|
}
|
|
|
pool.mu.Lock()
|
|
|
pool.clients = append(pool.clients, client)
|
|
|
pool.mu.Unlock()
|
|
|
pool.available <- client
|
|
|
atomic.AddInt32(&pool.currentSize, 1)
|
|
|
}
|
|
|
|
|
|
// 启动空闲连接清理goroutine
|
|
|
if config.IdleTimeout > 0 {
|
|
|
pool.wg.Add(1)
|
|
|
go pool.idleCleanup()
|
|
|
}
|
|
|
|
|
|
return pool, nil
|
|
|
}
|
|
|
|
|
|
// Acquire 获取连接
|
|
|
func (p *Pool) Acquire() (clientpkg.Client, error) {
|
|
|
// 检查连接池是否已关闭
|
|
|
if atomic.LoadInt32(&p.closed) == 1 {
|
|
|
return nil, clientpkg.NewError("pool is closed")
|
|
|
}
|
|
|
|
|
|
// 尝试从可用连接中获取
|
|
|
select {
|
|
|
case client := <-p.available:
|
|
|
// 检查连接是否有效
|
|
|
if client.IsConnected() {
|
|
|
return client, nil
|
|
|
}
|
|
|
// 连接无效,从池中移除并创建新连接
|
|
|
p.mu.Lock()
|
|
|
p.removeClientUnlocked(client)
|
|
|
p.mu.Unlock()
|
|
|
return p.createClient()
|
|
|
case <-time.After(p.config.AcquireTimeout):
|
|
|
// 超时,尝试创建新连接
|
|
|
return p.createClient()
|
|
|
case <-p.ctx.Done():
|
|
|
return nil, clientpkg.NewError("pool is closed")
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Release 释放连接
|
|
|
func (p *Pool) Release(client clientpkg.Client) error {
|
|
|
// 检查连接池是否已关闭
|
|
|
if atomic.LoadInt32(&p.closed) == 1 {
|
|
|
// 池已关闭,直接关闭连接
|
|
|
if client != nil {
|
|
|
client.Close()
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
if client == nil {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
p.mu.Lock()
|
|
|
defer p.mu.Unlock()
|
|
|
|
|
|
// 检查连接是否有效
|
|
|
if !client.IsConnected() {
|
|
|
// 连接无效,从池中移除
|
|
|
p.removeClientUnlocked(client)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// 检查池是否已满
|
|
|
if len(p.available) >= p.maxSize {
|
|
|
// 池已满,关闭连接
|
|
|
client.Close()
|
|
|
p.removeClientUnlocked(client)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// 将连接放回池中
|
|
|
select {
|
|
|
case p.available <- client:
|
|
|
return nil
|
|
|
default:
|
|
|
// channel已满,关闭连接
|
|
|
client.Close()
|
|
|
p.removeClientUnlocked(client)
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// createClient 创建新连接
|
|
|
func (p *Pool) createClient() (clientpkg.Client, error) {
|
|
|
// 检查是否超过最大连接数
|
|
|
if atomic.LoadInt32(&p.currentSize) >= int32(p.maxSize) {
|
|
|
return nil, clientpkg.NewError("pool is full")
|
|
|
}
|
|
|
|
|
|
client := NewTCPClient(p.config.ClientConfig)
|
|
|
if err := client.Connect(); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
p.mu.Lock()
|
|
|
// 再次检查是否超过最大连接数(双重检查)
|
|
|
if p.currentSize >= int32(p.maxSize) {
|
|
|
p.mu.Unlock()
|
|
|
client.Close()
|
|
|
return nil, clientpkg.NewError("pool is full")
|
|
|
}
|
|
|
p.clients = append(p.clients, client)
|
|
|
atomic.AddInt32(&p.currentSize, 1)
|
|
|
p.mu.Unlock()
|
|
|
|
|
|
return client, nil
|
|
|
}
|
|
|
|
|
|
// removeClientUnlocked 从池中移除连接(需要在持有锁的情况下调用)
|
|
|
func (p *Pool) removeClientUnlocked(client clientpkg.Client) {
|
|
|
for i, c := range p.clients {
|
|
|
if c == client {
|
|
|
p.clients = append(p.clients[:i], p.clients[i+1:]...)
|
|
|
atomic.AddInt32(&p.currentSize, -1)
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Close 关闭连接池
|
|
|
func (p *Pool) Close() error {
|
|
|
// 标记为已关闭
|
|
|
if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
|
|
|
// 已经关闭
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// 取消上下文,停止所有goroutine
|
|
|
p.cancel()
|
|
|
|
|
|
// 等待所有goroutine退出
|
|
|
p.wg.Wait()
|
|
|
|
|
|
p.mu.Lock()
|
|
|
defer p.mu.Unlock()
|
|
|
|
|
|
// 关闭所有连接
|
|
|
for _, client := range p.clients {
|
|
|
client.Close()
|
|
|
}
|
|
|
|
|
|
// 清空channel
|
|
|
close(p.available)
|
|
|
for range p.available {
|
|
|
// 清空channel
|
|
|
}
|
|
|
|
|
|
p.clients = nil
|
|
|
atomic.StoreInt32(&p.currentSize, 0)
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// idleCleanup 空闲连接清理goroutine
|
|
|
func (p *Pool) idleCleanup() {
|
|
|
defer p.wg.Done()
|
|
|
|
|
|
ticker := time.NewTicker(p.config.IdleTimeout / 2)
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
case <-p.ctx.Done():
|
|
|
return
|
|
|
case <-ticker.C:
|
|
|
p.cleanupIdleConnections()
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// cleanupIdleConnections 清理空闲连接
|
|
|
func (p *Pool) cleanupIdleConnections() {
|
|
|
// 注意:这个实现简化了,实际应该记录每个连接的最后使用时间
|
|
|
// 当前实现只是检查连接是否有效,如果无效则移除
|
|
|
p.mu.Lock()
|
|
|
defer p.mu.Unlock()
|
|
|
|
|
|
// 检查可用连接是否有效
|
|
|
for {
|
|
|
select {
|
|
|
case client := <-p.available:
|
|
|
if !client.IsConnected() {
|
|
|
// 连接无效,移除
|
|
|
p.removeClientUnlocked(client)
|
|
|
client.Close()
|
|
|
} else {
|
|
|
// 连接有效,放回去
|
|
|
select {
|
|
|
case p.available <- client:
|
|
|
default:
|
|
|
// channel已满,放不回去,关闭连接
|
|
|
client.Close()
|
|
|
p.removeClientUnlocked(client)
|
|
|
}
|
|
|
}
|
|
|
default:
|
|
|
// 没有更多可用连接
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Size 获取当前连接数
|
|
|
func (p *Pool) Size() int {
|
|
|
return int(atomic.LoadInt32(&p.currentSize))
|
|
|
}
|
|
|
|
|
|
// Available 获取可用连接数
|
|
|
func (p *Pool) Available() int {
|
|
|
p.mu.RLock()
|
|
|
defer p.mu.RUnlock()
|
|
|
return len(p.available)
|
|
|
}
|
|
|
|
|
|
// IsClosed 检查连接池是否已关闭
|
|
|
func (p *Pool) IsClosed() bool {
|
|
|
return atomic.LoadInt32(&p.closed) == 1
|
|
|
}
|