You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
4.1 KiB
Go

package integration
import (
"fmt"
"net"
"strings"
"testing"
"time"
internalprotocol "github.com/noahlann/nnet/internal/protocol/nnet"
"github.com/noahlann/nnet/pkg/nnet"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestLargeMessage 测试大消息处理
func TestLargeMessage(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
listener.Close()
cfg := &nnet.Config{
Addr: fmt.Sprintf("tcp://127.0.0.1:%d", port),
Codec: &nnet.CodecConfig{
DefaultCodec: "json",
},
}
ts := StartTestServerWithRoutes(t, cfg, func(srv nnet.Server) {
srv.Router().RegisterString("large", func(ctx nnet.Context) error {
// 返回大消息
largeData := strings.Repeat("x", 10000)
return ctx.Response().Write(map[string]any{
"size": len(largeData),
"data": largeData,
})
})
})
defer CleanupTestServer(t, ts)
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
time.Sleep(100 * time.Millisecond)
// 发送请求
resp := RequestWithTimeout(t, client, []byte("large"), 5*time.Second)
t.Logf("Response size: %d bytes", len(resp))
assert.Greater(t, len(resp), 1000, "Response should be large")
}
// TestMultipleMessages 测试多个消息处理
func TestMultipleMessages(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
listener.Close()
cfg := &nnet.Config{
Addr: fmt.Sprintf("tcp://127.0.0.1:%d", port),
}
ts := StartTestServerWithRoutes(t, cfg, func(srv nnet.Server) {
srv.Router().RegisterString("test", func(ctx nnet.Context) error {
return ctx.Response().WriteBytes([]byte("ok\n"))
})
})
defer CleanupTestServer(t, ts)
client := NewTestClient(t, ts.Addr, nil)
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
time.Sleep(100 * time.Millisecond)
// 发送多个请求
const numRequests = 10
for i := 0; i < numRequests; i++ {
resp := RequestWithTimeout(t, client, []byte("test"), 3*time.Second)
assert.Contains(t, string(resp), "ok", "Request %d should succeed", i)
}
}
// TestUnpacker 测试粘包拆包
func TestUnpacker(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
port := listener.Addr().(*net.TCPAddr).Port
listener.Close()
cfg := &nnet.Config{
Addr: fmt.Sprintf("tcp://127.0.0.1:%d", port),
ApplicationProtocol: "nnet",
Codec: &nnet.CodecConfig{
DefaultCodec: "json",
EnableProtocolEncode: true,
},
}
server, err := nnet.NewServer(cfg)
require.NoError(t, err)
// 注册协议
pm := server.ProtocolManager()
proto := internalprotocol.NewNNetProtocol("1.0")
require.NoError(t, pm.Register(proto), "Should register protocol")
var messageCount int
server.Router().RegisterString("test", func(ctx nnet.Context) error {
messageCount++
return ctx.Response().WriteBytes([]byte("ok\n"))
})
ts := &TestServer{
Server: server,
Addr: cfg.Addr,
stopCh: make(chan struct{}),
}
ts.wg.Add(1)
go func() {
defer ts.wg.Done()
if err := server.Start(); err != nil {
t.Logf("Server error: %v", err)
}
}()
require.Eventually(t, func() bool {
return server.Started()
}, 3*time.Second, 50*time.Millisecond, "Server should start")
time.Sleep(200 * time.Millisecond)
defer CleanupTestServer(t, ts)
client := NewTestClient(t, ts.Addr, &nnet.ClientConfig{
ApplicationProtocol: "nnet",
})
defer CleanupTestClient(t, client)
ConnectTestClient(t, client)
time.Sleep(100 * time.Millisecond)
// 发送多个消息(粘包)
testData := []byte("test")
packet1, _ := proto.Encode(testData, nil)
packet2, _ := proto.Encode(testData, nil)
combinedPacket := append(packet1, packet2...)
// 发送合并的数据包
err = client.Send(combinedPacket)
require.NoError(t, err, "Send should succeed")
// 等待处理
time.Sleep(200 * time.Millisecond)
// 验证消息被正确拆包
assert.GreaterOrEqual(t, messageCount, 1, "At least one message should be processed")
t.Logf("Processed %d messages", messageCount)
}