package integration import ( "runtime" "sync" "sync/atomic" "testing" "time" "github.com/noahlann/nnet/pkg/nnet" "github.com/stretchr/testify/assert" ) // TestHighConcurrencyConnections 测试高并发连接 func TestHighConcurrencyConnections(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", // 使用随机端口 Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("ping", func(ctx nnet.Context) error { return ctx.Response().WriteBytes([]byte("pong\n")) }) }) defer CleanupTestServer(t, ts) // 并发连接数(提高并发量) concurrency := 500 requestsPerConn := 20 var successCount int64 var errorCount int64 var wg sync.WaitGroup startTime := time.Now() // 创建并发连接 for i := 0; i < concurrency; i++ { wg.Add(1) go func(id int) { defer wg.Done() client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) // 每个连接发送多个请求 for j := 0; j < requestsPerConn; j++ { resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second) if string(resp) == "pong\n" { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&errorCount, 1) } } }(i) } wg.Wait() duration := time.Since(startTime) totalRequests := int64(concurrency * requestsPerConn) successRate := float64(successCount) / float64(totalRequests) * 100 t.Logf("压力测试结果:") t.Logf(" 并发连接数: %d", concurrency) t.Logf(" 每连接请求数: %d", requestsPerConn) t.Logf(" 总请求数: %d", totalRequests) t.Logf(" 成功请求数: %d", successCount) t.Logf(" 失败请求数: %d", errorCount) t.Logf(" 成功率: %.2f%%", successRate) t.Logf(" 总耗时: %v", duration) t.Logf(" QPS (每秒请求数): %.2f", float64(totalRequests)/duration.Seconds()) t.Logf(" TPS (每秒事务数): %.2f", float64(successCount)/duration.Seconds()) t.Logf(" 平均响应时间: %.2f ms", duration.Seconds()*1000/float64(totalRequests)) t.Logf(" 峰值并发连接: %d", concurrency) // 验证成功率应该大于95% assert.GreaterOrEqual(t, successRate, 95.0, "成功率应该大于95%%") assert.Equal(t, int64(0), errorCount, "不应该有错误") } // TestHighConcurrencyRequests 测试高并发请求(单连接) // 注意:单连接并发存在技术限制: // 1. TCP连接是串行的,多个goroutine同时读写会导致数据混乱 // 2. 客户端实现使用互斥锁,高并发时锁竞争成为瓶颈 // 3. 请求和响应的匹配可能出错 // 建议:单连接并发控制在较低水平(50-100),高并发应使用多连接 func TestHighConcurrencyRequests(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", // 使用随机端口 Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("echo", func(ctx nnet.Context) error { data := ctx.Request().Raw() return ctx.Response().WriteBytes(data) }) }) defer CleanupTestServer(t, ts) client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) // 并发请求数(单连接,降低到合理范围) // 注意:单连接并发建议控制在50-100,超过此范围可能导致数据混乱或锁竞争 concurrency := 100 message := []byte("echo") var successCount int64 var errorCount int64 var wg sync.WaitGroup startTime := time.Now() // 并发发送请求 for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() resp := RequestWithTimeout(t, client, message, 5*time.Second) if string(resp) == string(message) { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&errorCount, 1) } }() } wg.Wait() duration := time.Since(startTime) successRate := float64(successCount) / float64(concurrency) * 100 t.Logf("高并发请求测试结果(单连接):") t.Logf(" 并发请求数: %d", concurrency) t.Logf(" 成功请求数: %d", successCount) t.Logf(" 失败请求数: %d", errorCount) t.Logf(" 成功率: %.2f%%", successRate) t.Logf(" 总耗时: %v", duration) t.Logf(" QPS (每秒请求数): %.2f", float64(concurrency)/duration.Seconds()) t.Logf(" TPS (每秒事务数): %.2f", float64(successCount)/duration.Seconds()) t.Logf(" 平均响应时间: %.2f ms", duration.Seconds()*1000/float64(concurrency)) t.Logf(" 注意:单连接并发存在技术限制,建议使用多连接并发测试") // 验证成功率应该大于90%(单连接并发可能受到客户端实现限制) assert.GreaterOrEqual(t, successRate, 90.0, "成功率应该大于90%%(单连接并发可能受到客户端实现限制)") } // TestMemoryUsage 测试内存使用情况 func TestMemoryUsage(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", // 使用随机端口 Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("echo", func(ctx nnet.Context) error { data := ctx.Request().Raw() return ctx.Response().WriteBytes(data) }) }) defer CleanupTestServer(t, ts) // 记录初始内存 runtime.GC() var m1, m2 runtime.MemStats runtime.ReadMemStats(&m1) // 创建多个连接并发送请求(提高并发量) concurrency := 200 var wg sync.WaitGroup for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) // 发送多个请求 for j := 0; j < 100; j++ { _ = RequestWithTimeout(t, client, []byte("echo"), 5*time.Second) } }() } wg.Wait() // 记录最终内存 runtime.GC() runtime.ReadMemStats(&m2) // 计算内存增长 memAlloc := m2.Alloc - m1.Alloc memTotalAlloc := m2.TotalAlloc - m1.TotalAlloc memMallocs := m2.Mallocs - m1.Mallocs t.Logf("内存使用测试结果:") t.Logf(" 并发连接数: %d", concurrency) t.Logf(" 每连接请求数: 100") t.Logf(" 内存分配增长: %.2f MB", float64(memAlloc)/1024/1024) t.Logf(" 总内存分配: %.2f MB", float64(memTotalAlloc)/1024/1024) t.Logf(" 内存分配次数: %d", memMallocs) t.Logf(" 当前内存使用: %.2f MB", float64(m2.Alloc)/1024/1024) t.Logf(" 系统内存使用: %.2f MB", float64(m2.Sys)/1024/1024) // 验证内存使用合理(每个连接+请求应该小于1MB) maxExpectedMem := float64(concurrency) * 1.0 // 每个连接最多1MB assert.Less(t, float64(memAlloc)/1024/1024, maxExpectedMem, "内存使用应该合理") } // TestSustainedLoad 测试持续负载 func TestSustainedLoad(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", // 使用随机端口 Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("ping", func(ctx nnet.Context) error { return ctx.Response().WriteBytes([]byte("pong\n")) }) }) defer CleanupTestServer(t, ts) // 持续负载参数(提高并发量) duration := 10 * time.Second concurrency := 100 requestInterval := 50 * time.Millisecond var successCount int64 var errorCount int64 var wg sync.WaitGroup stopCh := make(chan struct{}) startTime := time.Now() // 启动并发客户端 for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) ticker := time.NewTicker(requestInterval) defer ticker.Stop() for { select { case <-stopCh: return case <-ticker.C: resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second) if string(resp) == "pong\n" { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&errorCount, 1) } } } }() } // 运行指定时间 time.Sleep(duration) close(stopCh) wg.Wait() totalDuration := time.Since(startTime) totalRequests := successCount + errorCount successRate := float64(successCount) / float64(totalRequests) * 100 t.Logf("持续负载测试结果:") t.Logf(" 持续时间: %v", duration) t.Logf(" 并发连接数: %d", concurrency) t.Logf(" 请求间隔: %v", requestInterval) t.Logf(" 总请求数: %d", totalRequests) t.Logf(" 成功请求数: %d", successCount) t.Logf(" 失败请求数: %d", errorCount) t.Logf(" 成功率: %.2f%%", successRate) t.Logf(" 总耗时: %v", totalDuration) t.Logf(" 平均QPS (每秒请求数): %.2f", float64(totalRequests)/totalDuration.Seconds()) t.Logf(" 平均TPS (每秒事务数): %.2f", float64(successCount)/totalDuration.Seconds()) t.Logf(" 峰值并发连接: %d", concurrency) // 验证成功率应该大于95% assert.GreaterOrEqual(t, successRate, 95.0, "成功率应该大于95%%") } // TestLargeMessageStress 测试大消息压力 func TestLargeMessageStress(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", // 使用随机端口 Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("echo", func(ctx nnet.Context) error { data := ctx.Request().Raw() return ctx.Response().WriteBytes(data) }) }) defer CleanupTestServer(t, ts) client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) // 大消息大小(100KB,降低到合理范围) messageSize := 100 * 1024 message := make([]byte, messageSize) for i := range message { message[i] = byte(i % 256) } // 注意:大消息测试使用echo路由,消息本身不包含路由匹配字符串 // 需要在消息前添加路由标识 echoMsg := append([]byte("echo"), message...) // 并发发送大消息(提高超时时间) concurrency := 30 var successCount int64 var errorCount int64 var wg sync.WaitGroup startTime := time.Now() for i := 0; i < concurrency; i++ { wg.Add(1) go func() { defer wg.Done() // 大消息需要更长的超时时间(60秒) resp := RequestWithTimeout(t, client, echoMsg, 60*time.Second) // 响应应该包含原始消息(echo路由会返回原始数据) if len(resp) >= len(echoMsg) { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&errorCount, 1) } }() } wg.Wait() duration := time.Since(startTime) successRate := float64(successCount) / float64(concurrency) * 100 // 计算吞吐量(使用实际消息大小) actualMessageSize := int64(len(echoMsg)) throughput := float64(successCount*actualMessageSize) / duration.Seconds() / 1024 / 1024 t.Logf("大消息压力测试结果:") t.Logf(" 消息大小: %d bytes (%.2f KB)", messageSize, float64(messageSize)/1024) t.Logf(" 并发数: %d", concurrency) t.Logf(" 成功请求数: %d", successCount) t.Logf(" 失败请求数: %d", errorCount) t.Logf(" 成功率: %.2f%%", successRate) t.Logf(" 总耗时: %v", duration) t.Logf(" 吞吐量: %.2f MB/s", throughput) t.Logf(" QPS (每秒请求数): %.2f", float64(concurrency)/duration.Seconds()) t.Logf(" TPS (每秒事务数): %.2f", float64(successCount)/duration.Seconds()) t.Logf(" 平均响应时间: %.2f ms", duration.Seconds()*1000/float64(concurrency)) // 验证成功率应该大于90%(大消息可能更容易失败) assert.GreaterOrEqual(t, successRate, 90.0, "成功率应该大于90%%") } // TestMaxConcurrency 测试最大并发连接数(极限测试) func TestMaxConcurrency(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("ping", func(ctx nnet.Context) error { return ctx.Response().WriteBytes([]byte("pong\n")) }) }) defer CleanupTestServer(t, ts) // 逐步增加并发数,找到最大并发 maxConcurrency := 0 successThreshold := 95.0 for testConcurrency := 100; testConcurrency <= 2000; testConcurrency += 100 { var successCount int64 var errorCount int64 var wg sync.WaitGroup startTime := time.Now() // 创建并发连接 for i := 0; i < testConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) // 每个连接发送一个请求 resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second) if string(resp) == "pong\n" { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&errorCount, 1) } }() } wg.Wait() duration := time.Since(startTime) totalRequests := int64(testConcurrency) successRate := float64(successCount) / float64(totalRequests) * 100 qps := float64(totalRequests) / duration.Seconds() tps := float64(successCount) / duration.Seconds() t.Logf("并发数: %d, 成功率: %.2f%%, QPS: %.2f, TPS: %.2f, 耗时: %v", testConcurrency, successRate, qps, tps, duration) if successRate >= successThreshold { maxConcurrency = testConcurrency } else { // 如果成功率低于阈值,停止测试 break } } t.Logf("最大并发连接数(成功率>=%.0f%%): %d", successThreshold, maxConcurrency) assert.Greater(t, maxConcurrency, 0, "应该找到最大并发数") } // TestMaxQPS 测试最大QPS(极限测试) func TestMaxQPS(t *testing.T) { cfg := &nnet.Config{ Addr: "tcp://127.0.0.1:0", Codec: &nnet.CodecConfig{ DefaultCodec: "plain", }, } ts := StartTestServerWithRoutes(t, cfg, func(server nnet.Server) { server.Router().RegisterString("ping", func(ctx nnet.Context) error { return ctx.Response().WriteBytes([]byte("pong\n")) }) }) defer CleanupTestServer(t, ts) client := NewTestClient(t, ts.Addr, nil) defer CleanupTestClient(t, client) ConnectTestClient(t, client) // 测试不同并发请求数下的QPS maxQPS := 0.0 maxTPS := 0.0 bestConcurrency := 0 for testConcurrency := 100; testConcurrency <= 2000; testConcurrency += 100 { var successCount int64 var errorCount int64 var wg sync.WaitGroup startTime := time.Now() // 并发发送请求 for i := 0; i < testConcurrency; i++ { wg.Add(1) go func() { defer wg.Done() resp := RequestWithTimeout(t, client, []byte("ping"), 5*time.Second) if string(resp) == "pong\n" { atomic.AddInt64(&successCount, 1) } else { atomic.AddInt64(&errorCount, 1) } }() } wg.Wait() duration := time.Since(startTime) successRate := float64(successCount) / float64(testConcurrency) * 100 qps := float64(testConcurrency) / duration.Seconds() tps := float64(successCount) / duration.Seconds() t.Logf("并发数: %d, 成功率: %.2f%%, QPS: %.2f, TPS: %.2f, 耗时: %v", testConcurrency, successRate, qps, tps, duration) if successRate >= 95.0 && qps > float64(maxQPS) { maxQPS = qps maxTPS = tps bestConcurrency = testConcurrency } // 如果成功率太低,停止测试 if successRate < 80.0 { break } } t.Logf("最大QPS: %.2f, 最大TPS: %.2f (并发数: %d)", maxQPS, maxTPS, bestConcurrency) assert.Greater(t, maxQPS, 0.0, "应该找到最大QPS") }