You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

534 lines
15 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package integration
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/noahlann/nnet/pkg/nnet"
"github.com/stretchr/testify/assert"
)
// TestHighConcurrencyConnections 测试高并发连接
func TestHighConcurrencyConnections(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0", // 使用随机端口
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("ping", func(ctx nnet.Context) error {
return ctx.Response().WriteBytes([]byte("pong\n"))
})
})
defer CleanupTestServer(t, ts)
// 并发连接数(提高并发量)
concurrency := 500
requestsPerConn := 20
var successCount int64
var errorCount int64
var wg sync.WaitGroup
startTime := time.Now()
// 创建并发连接
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
// 每个连接发送多个请求
for j := 0; j < requestsPerConn; j++ {
resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second)
if string(resp) == "pong\n" {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errorCount, 1)
}
}
}(i)
}
wg.Wait()
duration := time.Since(startTime)
totalRequests := int64(concurrency * requestsPerConn)
successRate := float64(successCount) / float64(totalRequests) * 100
t.Logf("压力测试结果:")
t.Logf(" 并发连接数: %d", concurrency)
t.Logf(" 每连接请求数: %d", requestsPerConn)
t.Logf(" 总请求数: %d", totalRequests)
t.Logf(" 成功请求数: %d", successCount)
t.Logf(" 失败请求数: %d", errorCount)
t.Logf(" 成功率: %.2f%%", successRate)
t.Logf(" 总耗时: %v", duration)
t.Logf(" QPS (每秒请求数): %.2f", float64(totalRequests)/duration.Seconds())
t.Logf(" TPS (每秒事务数): %.2f", float64(successCount)/duration.Seconds())
t.Logf(" 平均响应时间: %.2f ms", duration.Seconds()*1000/float64(totalRequests))
t.Logf(" 峰值并发连接: %d", concurrency)
// 验证成功率应该大于95%
assert.GreaterOrEqual(t, successRate, 95.0, "成功率应该大于95%%")
assert.Equal(t, int64(0), errorCount, "不应该有错误")
}
// TestHighConcurrencyRequests 测试高并发请求(单连接)
// 注意:单连接并发存在技术限制:
// 1. TCP连接是串行的多个goroutine同时读写会导致数据混乱
// 2. 客户端实现使用互斥锁,高并发时锁竞争成为瓶颈
// 3. 请求和响应的匹配可能出错
// 建议单连接并发控制在较低水平50-100高并发应使用多连接
func TestHighConcurrencyRequests(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0", // 使用随机端口
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("echo", func(ctx nnet.Context) error {
data := ctx.Request().Raw()
return ctx.Response().WriteBytes(data)
})
})
defer CleanupTestServer(t, ts)
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
// 并发请求数(单连接,降低到合理范围)
// 注意单连接并发建议控制在50-100超过此范围可能导致数据混乱或锁竞争
concurrency := 100
message := []byte("echo")
var successCount int64
var errorCount int64
var wg sync.WaitGroup
startTime := time.Now()
// 并发发送请求
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp := RequestWithTimeout(t, client, message, 5*time.Second)
if string(resp) == string(message) {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errorCount, 1)
}
}()
}
wg.Wait()
duration := time.Since(startTime)
successRate := float64(successCount) / float64(concurrency) * 100
t.Logf("高并发请求测试结果(单连接):")
t.Logf(" 并发请求数: %d", concurrency)
t.Logf(" 成功请求数: %d", successCount)
t.Logf(" 失败请求数: %d", errorCount)
t.Logf(" 成功率: %.2f%%", successRate)
t.Logf(" 总耗时: %v", duration)
t.Logf(" QPS (每秒请求数): %.2f", float64(concurrency)/duration.Seconds())
t.Logf(" TPS (每秒事务数): %.2f", float64(successCount)/duration.Seconds())
t.Logf(" 平均响应时间: %.2f ms", duration.Seconds()*1000/float64(concurrency))
t.Logf(" 注意:单连接并发存在技术限制,建议使用多连接并发测试")
// 验证成功率应该大于90%(单连接并发可能受到客户端实现限制)
assert.GreaterOrEqual(t, successRate, 90.0, "成功率应该大于90%%(单连接并发可能受到客户端实现限制)")
}
// TestMemoryUsage 测试内存使用情况
func TestMemoryUsage(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0", // 使用随机端口
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("echo", func(ctx nnet.Context) error {
data := ctx.Request().Raw()
return ctx.Response().WriteBytes(data)
})
})
defer CleanupTestServer(t, ts)
// 记录初始内存
runtime.GC()
var m1, m2 runtime.MemStats
runtime.ReadMemStats(&m1)
// 创建多个连接并发送请求(提高并发量)
concurrency := 200
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
// 发送多个请求
for j := 0; j < 100; j++ {
_ = RequestWithTimeout(t, client, []byte("echo"), 5*time.Second)
}
}()
}
wg.Wait()
// 记录最终内存
runtime.GC()
runtime.ReadMemStats(&m2)
// 计算内存增长
memAlloc := m2.Alloc - m1.Alloc
memTotalAlloc := m2.TotalAlloc - m1.TotalAlloc
memMallocs := m2.Mallocs - m1.Mallocs
t.Logf("内存使用测试结果:")
t.Logf(" 并发连接数: %d", concurrency)
t.Logf(" 每连接请求数: 100")
t.Logf(" 内存分配增长: %.2f MB", float64(memAlloc)/1024/1024)
t.Logf(" 总内存分配: %.2f MB", float64(memTotalAlloc)/1024/1024)
t.Logf(" 内存分配次数: %d", memMallocs)
t.Logf(" 当前内存使用: %.2f MB", float64(m2.Alloc)/1024/1024)
t.Logf(" 系统内存使用: %.2f MB", float64(m2.Sys)/1024/1024)
// 验证内存使用合理(每个连接+请求应该小于1MB
maxExpectedMem := float64(concurrency) * 1.0 // 每个连接最多1MB
assert.Less(t, float64(memAlloc)/1024/1024, maxExpectedMem, "内存使用应该合理")
}
// TestSustainedLoad 测试持续负载
func TestSustainedLoad(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0", // 使用随机端口
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("ping", func(ctx nnet.Context) error {
return ctx.Response().WriteBytes([]byte("pong\n"))
})
})
defer CleanupTestServer(t, ts)
// 持续负载参数(提高并发量)
duration := 10 * time.Second
concurrency := 100
requestInterval := 50 * time.Millisecond
var successCount int64
var errorCount int64
var wg sync.WaitGroup
stopCh := make(chan struct{})
startTime := time.Now()
// 启动并发客户端
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
ticker := time.NewTicker(requestInterval)
defer ticker.Stop()
for {
select {
case <-stopCh:
return
case <-ticker.C:
resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second)
if string(resp) == "pong\n" {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errorCount, 1)
}
}
}
}()
}
// 运行指定时间
time.Sleep(duration)
close(stopCh)
wg.Wait()
totalDuration := time.Since(startTime)
totalRequests := successCount + errorCount
successRate := float64(successCount) / float64(totalRequests) * 100
t.Logf("持续负载测试结果:")
t.Logf(" 持续时间: %v", duration)
t.Logf(" 并发连接数: %d", concurrency)
t.Logf(" 请求间隔: %v", requestInterval)
t.Logf(" 总请求数: %d", totalRequests)
t.Logf(" 成功请求数: %d", successCount)
t.Logf(" 失败请求数: %d", errorCount)
t.Logf(" 成功率: %.2f%%", successRate)
t.Logf(" 总耗时: %v", totalDuration)
t.Logf(" 平均QPS (每秒请求数): %.2f", float64(totalRequests)/totalDuration.Seconds())
t.Logf(" 平均TPS (每秒事务数): %.2f", float64(successCount)/totalDuration.Seconds())
t.Logf(" 峰值并发连接: %d", concurrency)
// 验证成功率应该大于95%
assert.GreaterOrEqual(t, successRate, 95.0, "成功率应该大于95%%")
}
// TestLargeMessageStress 测试大消息压力
func TestLargeMessageStress(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0", // 使用随机端口
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("echo", func(ctx nnet.Context) error {
data := ctx.Request().Raw()
return ctx.Response().WriteBytes(data)
})
})
defer CleanupTestServer(t, ts)
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
// 大消息大小100KB降低到合理范围
messageSize := 100 * 1024
message := make([]byte, messageSize)
for i := range message {
message[i] = byte(i % 256)
}
// 注意大消息测试使用echo路由消息本身不包含路由匹配字符串
// 需要在消息前添加路由标识
echoMsg := append([]byte("echo"), message...)
// 并发发送大消息(提高超时时间)
concurrency := 30
var successCount int64
var errorCount int64
var wg sync.WaitGroup
startTime := time.Now()
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 大消息需要更长的超时时间60秒
resp := RequestWithTimeout(t, client, echoMsg, 60*time.Second)
// 响应应该包含原始消息echo路由会返回原始数据
if len(resp) >= len(echoMsg) {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errorCount, 1)
}
}()
}
wg.Wait()
duration := time.Since(startTime)
successRate := float64(successCount) / float64(concurrency) * 100
// 计算吞吐量(使用实际消息大小)
actualMessageSize := int64(len(echoMsg))
throughput := float64(successCount*actualMessageSize) / duration.Seconds() / 1024 / 1024
t.Logf("大消息压力测试结果:")
t.Logf(" 消息大小: %d bytes (%.2f KB)", messageSize, float64(messageSize)/1024)
t.Logf(" 并发数: %d", concurrency)
t.Logf(" 成功请求数: %d", successCount)
t.Logf(" 失败请求数: %d", errorCount)
t.Logf(" 成功率: %.2f%%", successRate)
t.Logf(" 总耗时: %v", duration)
t.Logf(" 吞吐量: %.2f MB/s", throughput)
t.Logf(" QPS (每秒请求数): %.2f", float64(concurrency)/duration.Seconds())
t.Logf(" TPS (每秒事务数): %.2f", float64(successCount)/duration.Seconds())
t.Logf(" 平均响应时间: %.2f ms", duration.Seconds()*1000/float64(concurrency))
// 验证成功率应该大于90%(大消息可能更容易失败)
assert.GreaterOrEqual(t, successRate, 90.0, "成功率应该大于90%%")
}
// TestMaxConcurrency 测试最大并发连接数(极限测试)
func TestMaxConcurrency(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0",
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("ping", func(ctx nnet.Context) error {
return ctx.Response().WriteBytes([]byte("pong\n"))
})
})
defer CleanupTestServer(t, ts)
// 逐步增加并发数,找到最大并发
maxConcurrency := 0
successThreshold := 95.0
for testConcurrency := 100; testConcurrency <= 2000; testConcurrency += 100 {
var successCount int64
var errorCount int64
var wg sync.WaitGroup
startTime := time.Now()
// 创建并发连接
for i := 0; i < testConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
// 每个连接发送一个请求
resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second)
if string(resp) == "pong\n" {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errorCount, 1)
}
}()
}
wg.Wait()
duration := time.Since(startTime)
totalRequests := int64(testConcurrency)
successRate := float64(successCount) / float64(totalRequests) * 100
qps := float64(totalRequests) / duration.Seconds()
tps := float64(successCount) / duration.Seconds()
t.Logf("并发数: %d, 成功率: %.2f%%, QPS: %.2f, TPS: %.2f, 耗时: %v", testConcurrency, successRate, qps, tps, duration)
if successRate >= successThreshold {
maxConcurrency = testConcurrency
} else {
// 如果成功率低于阈值,停止测试
break
}
}
t.Logf("最大并发连接数(成功率>=%.0f%%: %d", successThreshold, maxConcurrency)
assert.Greater(t, maxConcurrency, 0, "应该找到最大并发数")
}
// TestMaxQPS 测试最大QPS极限测试
func TestMaxQPS(t *testing.T) {
cfg := &nnet.Config{
Addr: "tcp://127.0.0.1:0",
Codec: &nnet.CodecConfig{
DefaultCodec: "plain",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) {
server.Router().RegisterString("ping", func(ctx nnet.Context) error {
return ctx.Response().WriteBytes([]byte("pong\n"))
})
})
defer CleanupTestServer(t, ts)
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
// 测试不同并发请求数下的QPS
maxQPS := 0.0
maxTPS := 0.0
bestConcurrency := 0
for testConcurrency := 100; testConcurrency <= 2000; testConcurrency += 100 {
var successCount int64
var errorCount int64
var wg sync.WaitGroup
startTime := time.Now()
// 并发发送请求
for i := 0; i < testConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second)
if string(resp) == "pong\n" {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errorCount, 1)
}
}()
}
wg.Wait()
duration := time.Since(startTime)
successRate := float64(successCount) / float64(testConcurrency) * 100
qps := float64(testConcurrency) / duration.Seconds()
tps := float64(successCount) / duration.Seconds()
t.Logf("并发数: %d, 成功率: %.2f%%, QPS: %.2f, TPS: %.2f, 耗时: %v", testConcurrency, successRate, qps, tps, duration)
if successRate >= 95.0 && qps > float64(maxQPS) {
maxQPS = qps
maxTPS = tps
bestConcurrency = testConcurrency
}
// 如果成功率太低,停止测试
if successRate < 80.0 {
break
}
}
t.Logf("最大QPS: %.2f, 最大TPS: %.2f (并发数: %d)", maxQPS, maxTPS, bestConcurrency)
assert.Greater(t, maxQPS, 0.0, "应该找到最大QPS")
}