feat: 添加 plain 内置协议,仅支持一个handler

main v0.5.3
NoahLan 2 years ago
parent 02775f087b
commit 2cc3e06721

@ -1,5 +1,12 @@
package packet
import "errors"
var (
// ErrWrongPacketType represents a wrong packet type.
ErrWrongPacketType = errors.New("wrong packet type")
)
type (
// IPacket 数据帧
IPacket interface {

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"encoding/json"
@ -15,9 +15,9 @@ type OnReadyFunc func()
func WithNNetClientPipeline(onReady OnReadyFunc, packer packet.Packer) core.RunOption {
return func(server *core.NNet) {
server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error {
pkg, ok := v.(*NNetPacket)
pkg, ok := v.(*Packet)
if !ok {
return ErrWrongPacketType
return packet.ErrWrongPacketType
}
conn, _ := entity.Conn()

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"git.noahlan.cn/noahlan/nnet/core"
@ -10,7 +10,7 @@ import (
)
type (
NNetConfig struct {
Config struct {
HeartbeatInterval time.Duration
HandshakeValidator HandshakeValidatorFunc
}
@ -37,36 +37,34 @@ type (
)
func WithNNetClientProtocol(onReady OnReadyFunc) []core.RunOption {
router := NewNNetRouter().(*nNetRouter)
packer := NewNNetPacker(router.routeMap)
router := NewRouter().(*nRouter)
packer := NewPacker(router.routeMap)
opts := []core.RunOption{
WithNNetClientPipeline(onReady, packer),
core.WithRouter(router),
core.WithPacker(func() packet.Packer { return NewNNetPacker(router.routeMap) }),
core.WithPacker(func() packet.Packer { return NewPacker(router.routeMap) }),
}
return opts
}
func WithNNetProtocol(config NNetConfig) []core.RunOption {
func WithNNetProtocol(config Config) []core.RunOption {
if config.HandshakeValidator == nil {
config.HandshakeValidator = func(data *HandshakeReq) error {
return nil
}
}
router := NewNNetRouter().(*nNetRouter)
router := NewRouter().(*nRouter)
handshakeAckData := &HandshakeResp{
Heartbeat: int64(config.HeartbeatInterval.Seconds()),
RouteMap: router.routeMap,
}
router.routeMap.Routes["hahah"] = 222
packer := NewNNetPacker(router.routeMap)
packer := NewPacker(router.routeMap)
opts := []core.RunOption{
WithNNetPipeline(handshakeAckData, config.HandshakeValidator, packer),
withNNetPipeline(handshakeAckData, config.HandshakeValidator, packer),
core.WithRouter(router),
core.WithPacker(func() packet.Packer { return NewNNetPacker(router.routeMap) }),
core.WithPacker(func() packet.Packer { return NewPacker(router.routeMap) }),
}
if config.HeartbeatInterval.Seconds() > 0 {

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"bytes"
@ -7,7 +7,7 @@ import (
"git.noahlan.cn/noahlan/nnet/packet"
)
type NNetPacker struct {
type Packer struct {
buf *bytes.Buffer
size int // 最近一次 length
typ byte // 最近一次 packet type
@ -31,13 +31,10 @@ var (
ErrWrongMessageType = errors.New("wrong message type")
ErrRouteInfoNotFound = errors.New("route info not found in dictionary")
ErrWrongMessage = errors.New("wrong message")
// ErrWrongPacketType represents a wrong packet type.
ErrWrongPacketType = errors.New("wrong packet type")
)
func NewNNetPacker(routeMap *RouteMap) *NNetPacker {
p := &NNetPacker{
func NewPacker(routeMap *RouteMap) *Packer {
p := &Packer{
buf: bytes.NewBuffer(nil),
routeMap: routeMap,
}
@ -45,29 +42,29 @@ func NewNNetPacker(routeMap *RouteMap) *NNetPacker {
return p
}
func (d *NNetPacker) resetFlags() {
func (d *Packer) resetFlags() {
d.size = -1
d.typ = byte(Unknown)
d.flag = 0x00
}
func (d *NNetPacker) routable(t MsgType) bool {
func (d *Packer) routable(t MsgType) bool {
return t == Request || t == Notify || t == Push
}
func (d *NNetPacker) invalidType(t MsgType) bool {
func (d *Packer) invalidType(t MsgType) bool {
return t < Request || t > Push
}
func (d *NNetPacker) Pack(header interface{}, data []byte) ([]byte, error) {
func (d *Packer) Pack(header interface{}, data []byte) ([]byte, error) {
h, ok := header.(Header)
if !ok {
return nil, ErrWrongPacketType
return nil, packet.ErrWrongPacketType
}
typ := h.PacketType
if typ < Handshake || typ > Kick {
return nil, ErrWrongPacketType
return nil, packet.ErrWrongPacketType
}
if d.invalidType(h.MsgType) {
@ -123,7 +120,7 @@ func (d *NNetPacker) Pack(header interface{}, data []byte) ([]byte, error) {
}
// Encode packet data length to bytes(Big end)
func (d *NNetPacker) intToBytes(n uint32) []byte {
func (d *Packer) intToBytes(n uint32) []byte {
buf := make([]byte, 3)
buf[0] = byte((n >> 16) & 0xFF)
buf[1] = byte((n >> 8) & 0xFF)
@ -131,7 +128,7 @@ func (d *NNetPacker) intToBytes(n uint32) []byte {
return buf
}
func (d *NNetPacker) Unpack(data []byte) ([]packet.IPacket, error) {
func (d *Packer) Unpack(data []byte) ([]packet.IPacket, error) {
d.buf.Write(data) // copy
var (
@ -219,11 +216,11 @@ func (d *NNetPacker) Unpack(data []byte) ([]packet.IPacket, error) {
return packets, nil
}
func (d *NNetPacker) readHeader() error {
func (d *Packer) readHeader() error {
header := d.buf.Next(headLength)
d.typ = header[0]
if d.typ < Handshake || d.typ > Kick {
return ErrWrongPacketType
return packet.ErrWrongPacketType
}
d.size = d.bytesToInt(header[1 : len(header)-1])
d.flag = header[len(header)-1]
@ -236,7 +233,7 @@ func (d *NNetPacker) readHeader() error {
}
// Decode packet data length byte to int(Big end)
func (d *NNetPacker) bytesToInt(b []byte) int {
func (d *Packer) bytesToInt(b []byte) int {
result := 0
for _, v := range b {
result = result<<8 + int(v)

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"encoding/hex"
@ -7,7 +7,7 @@ import (
)
func TestPacker(t *testing.T) {
p := NewNNetPacker()
p := NewPacker(NewRouteMap())
body := []byte("")
header := Header{
@ -88,7 +88,7 @@ func TestPacker(t *testing.T) {
func TestUnPack(t *testing.T) {
data := []byte{0x04, 0x00, 0x00, 0x23, 0x04, 0x03, 0xE6, 0x9C, 0x8D, 0xE5, 0x8A, 0xA1, 0xE5, 0x99, 0xA8, 0xE6, 0x8E, 0xA5, 0xE6, 0x94, 0xB6, 0xE5, 0x88, 0xB0, 0xE6, 0x95, 0xB0, 0xE6, 0x8D, 0xAE, 0xE4, 0xB8, 0xBA, 0x3A, 0x20, 0x6E, 0x69, 0x20, 0x68, 0x61, 0x6F}
p := NewNNetPacker()
p := NewPacker(NewRouteMap())
unPacked, err := p.Unpack(data)
if err != nil {

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"encoding/hex"
@ -61,14 +61,14 @@ type (
Route string // route for locating service
compressed bool // if message compressed
}
NNetPacket struct {
Packet struct {
Header
Data []byte // 原始数据
}
)
func newPacket(typ Type) *NNetPacket {
return &NNetPacket{
func newPacket(typ Type) *Packet {
return &Packet{
Header: Header{
PacketType: typ,
MessageHeader: MessageHeader{},
@ -76,19 +76,19 @@ func newPacket(typ Type) *NNetPacket {
}
}
func (p *NNetPacket) GetHeader() interface{} {
func (p *Packet) GetHeader() interface{} {
return p.Header
}
func (p *NNetPacket) GetLen() uint64 {
func (p *Packet) GetLen() uint64 {
return uint64(p.Length)
}
func (p *NNetPacket) GetBody() []byte {
func (p *Packet) GetBody() []byte {
return p.Data
}
func (p *NNetPacket) String() string {
return fmt.Sprintf("NNetPacket[Type: %d, Len: %d] Message[Type: %s, ID: %d, Route: %s, Compressed: %v] BodyStr: [%s], BodyHex: [%s]",
func (p *Packet) String() string {
return fmt.Sprintf("Packet[Type: %d, Len: %d] Message[Type: %s, ID: %d, Route: %s, Compressed: %v] BodyStr: [%s], BodyHex: [%s]",
p.PacketType, p.Length, p.MsgType, p.ID, p.Route, p.compressed, string(p.Data), hex.EncodeToString(p.Data))
}

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"encoding/json"
@ -15,16 +15,16 @@ type (
HandshakeAckPayloadFunc func() interface{}
)
func WithNNetPipeline(
func withNNetPipeline(
handshakeResp *HandshakeResp,
validator HandshakeValidatorFunc,
packer packet.Packer,
) core.RunOption {
return func(server *core.NNet) {
server.Pipeline().Inbound().PushFront(func(entity entity.NetworkEntity, v interface{}) error {
pkg, ok := v.(*NNetPacket)
pkg, ok := v.(*Packet)
if !ok {
return ErrWrongPacketType
return packet.ErrWrongPacketType
}
conn, _ := entity.Conn()

@ -1,4 +1,4 @@
package protocol
package nnet
import (
"errors"
@ -9,11 +9,6 @@ import (
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
)
var (
Routes = make(map[string]uint16) // route map to code
Codes = make(map[uint16]string) // code map to route
)
type (
RouteMap struct {
// 路由
@ -26,7 +21,7 @@ type (
Code uint16 // 路由编码
}
nNetRouter struct {
nRouter struct {
routeMap *RouteMap
handlers map[string]core.Handler
notFound core.Handler
@ -40,17 +35,17 @@ func NewRouteMap() *RouteMap {
}
}
func NewNNetRouter() core.Router {
return &nNetRouter{
func NewRouter() core.Router {
return &nRouter{
routeMap: NewRouteMap(),
handlers: make(map[string]core.Handler),
}
}
func (r *nNetRouter) Handle(entity entity.NetworkEntity, p packet.IPacket) {
pkg, ok := p.(*NNetPacket)
func (r *nRouter) Handle(entity entity.NetworkEntity, p packet.IPacket) {
pkg, ok := p.(*Packet)
if !ok {
nlog.Error(ErrWrongMessage)
nlog.Error(packet.ErrWrongPacketType)
return
}
handler, ok := r.handlers[pkg.Header.Route]
@ -65,7 +60,7 @@ func (r *nNetRouter) Handle(entity entity.NetworkEntity, p packet.IPacket) {
handler.Handle(entity, p)
}
func (r *nNetRouter) Register(matches interface{}, handler core.Handler) error {
func (r *nRouter) Register(matches interface{}, handler core.Handler) error {
match, ok := matches.(Match)
if !ok {
return errors.New(fmt.Sprintf("the type of matches must be %T", Match{}))
@ -77,6 +72,6 @@ func (r *nNetRouter) Register(matches interface{}, handler core.Handler) error {
return nil
}
func (r *nNetRouter) SetNotFoundHandler(handler core.Handler) {
func (r *nRouter) SetNotFoundHandler(handler core.Handler) {
r.notFound = handler
}

@ -0,0 +1,41 @@
package plain
import (
"bytes"
"git.noahlan.cn/noahlan/nnet/packet"
)
type Packer struct {
buf *bytes.Buffer
size int // 最近一次 length
typ byte // 最近一次 packet type
flag byte // 最近一次 flag
}
func NewPacker() *Packer {
p := &Packer{
buf: bytes.NewBuffer(nil),
}
p.resetFlags()
return p
}
func (d *Packer) resetFlags() {
}
func (d *Packer) Pack(_ interface{}, data []byte) ([]byte, error) {
buf := make([]byte, len(data))
copy(buf, data)
return buf, nil
}
func (d *Packer) Unpack(data []byte) ([]packet.IPacket, error) {
d.buf.Write(data) // copy
var packets []packet.IPacket
p := newPacket()
p.Data = d.buf.Next(d.buf.Len())
packets = append(packets, p)
return packets, nil
}

@ -0,0 +1,31 @@
package plain
import (
"encoding/hex"
"fmt"
)
type Packet struct {
Data []byte // 原始数据
}
func newPacket() *Packet {
return &Packet{}
}
func (p *Packet) GetHeader() interface{} {
return nil
}
func (p *Packet) GetLen() uint64 {
return uint64(len(p.Data))
}
func (p *Packet) GetBody() []byte {
return p.Data
}
func (p *Packet) String() string {
return fmt.Sprintf("PlainPacket[Len: %d] BodyStr: [%s], BodyHex: [%s]",
len(p.Data), string(p.Data), hex.EncodeToString(p.Data))
}

@ -0,0 +1,22 @@
package plain
import (
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
)
func withPipeline() core.RunOption {
return func(net *core.NNet) {
net.Pipeline().Inbound().PushFront(func(et entity.NetworkEntity, v interface{}) error {
_, ok := v.(*Packet)
if !ok {
return packet.ErrWrongPacketType
}
if et.Status() != core.StatusWorking {
et.SetStatus(core.StatusWorking)
}
return nil
})
}
}

@ -0,0 +1,15 @@
package plain
import (
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/packet"
)
func WithPlainProtocol() []core.RunOption {
opts := []core.RunOption{
withPipeline(),
core.WithRouter(NewRouter()),
core.WithPacker(func() packet.Packer { return NewPacker() }),
}
return opts
}

@ -0,0 +1,43 @@
package plain
import (
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
)
type Router struct {
plainHandler core.Handler
notFound core.Handler
}
func NewRouter() core.Router {
return &Router{}
}
func (r *Router) Handle(entity entity.NetworkEntity, pkg packet.IPacket) {
p, ok := pkg.(*Packet)
if !ok {
nlog.Error(packet.ErrWrongPacketType)
return
}
if r.plainHandler == nil {
if r.notFound == nil {
nlog.Error("message handler not found")
return
}
r.notFound.Handle(entity, p)
return
}
r.plainHandler.Handle(entity, p)
}
func (r *Router) Register(_ interface{}, handler core.Handler) error {
r.plainHandler = handler
return nil
}
func (r *Router) SetNotFoundHandler(handler core.Handler) {
r.notFound = handler
}

@ -6,7 +6,7 @@ import (
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol"
"git.noahlan.cn/noahlan/nnet/protocol/nnet"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"git.noahlan.cn/noahlan/ntools-go/core/pool"
"math"
@ -29,23 +29,23 @@ func runServer(addr string) {
Nonblocking: false,
DisablePurge: false,
},
}, protocol.WithNNetProtocol(protocol.NNetConfig{
}, nnet.WithNNetProtocol(nnet.Config{
HeartbeatInterval: 0,
HandshakeValidator: nil,
})...)
server.AddRoutes([]core.Route{
{
Matches: protocol.Match{
Matches: nnet.Match{
Route: "ping",
Code: 1,
},
Handler: func(et entity.NetworkEntity, pkg packet.IPacket) {
nlog.Info("client ping, server pong -> ")
err := et.Send(protocol.Header{
PacketType: protocol.Data,
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Request,
err := et.Send(nnet.Header{
PacketType: nnet.Data,
MessageHeader: nnet.MessageHeader{
MsgType: nnet.Request,
ID: 1,
Route: "pong",
},
@ -70,13 +70,13 @@ func runClient(addr string) (client *core.Client, et entity.NetworkEntity) {
Nonblocking: false,
DisablePurge: false,
},
}, protocol.WithNNetClientProtocol(func() {
}, nnet.WithNNetClientProtocol(func() {
chReady <- struct{}{}
})...)
client.AddRoutes([]core.Route{
{
Matches: protocol.Match{
Matches: nnet.Match{
Route: "test.client",
Code: 1,
},
@ -87,7 +87,7 @@ func runClient(addr string) (client *core.Client, et entity.NetworkEntity) {
})
et = client.Dial(addr)
handshake, err := json.Marshal(&protocol.HandshakeReq{
handshake, err := json.Marshal(&nnet.HandshakeReq{
Version: "1.0.0",
Type: "test",
ClientId: "a",
@ -98,10 +98,10 @@ func runClient(addr string) (client *core.Client, et entity.NetworkEntity) {
})
nlog.Must(err)
packer := protocol.NewNNetPacker(protocol.NewRouteMap())
hsd, err := packer.Pack(protocol.Header{
PacketType: protocol.Handshake,
MessageHeader: protocol.MessageHeader{
packer := nnet.NewPacker(nnet.NewRouteMap())
hsd, err := packer.Pack(nnet.Header{
PacketType: nnet.Handshake,
MessageHeader: nnet.MessageHeader{
MsgType: 0,
ID: 0,
Route: "",

@ -4,7 +4,7 @@ import (
"git.noahlan.cn/noahlan/nnet/core"
"git.noahlan.cn/noahlan/nnet/entity"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/nnet/protocol"
"git.noahlan.cn/noahlan/nnet/protocol/nnet"
"git.noahlan.cn/noahlan/ntools-go/core/nlog"
"sync"
"testing"
@ -17,16 +17,16 @@ func TestServer(t *testing.T) {
func TestClient(t *testing.T) {
client, et := runClient("127.0.0.1:6666")
client.AddRoute(core.Route{
Matches: protocol.Match{
Matches: nnet.Match{
Route: "pong",
Code: 2,
},
Handler: func(et entity.NetworkEntity, pkg packet.IPacket) {
nlog.Info("server pong, client ping ->")
_ = et.Send(protocol.Header{
PacketType: protocol.Data,
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Request,
_ = et.Send(nnet.Header{
PacketType: nnet.Data,
MessageHeader: nnet.MessageHeader{
MsgType: nnet.Request,
ID: 1,
Route: "ping",
},
@ -34,10 +34,10 @@ func TestClient(t *testing.T) {
},
})
_ = et.Send(protocol.Header{
PacketType: protocol.Data,
MessageHeader: protocol.MessageHeader{
MsgType: protocol.Request,
_ = et.Send(nnet.Header{
PacketType: nnet.Data,
MessageHeader: nnet.MessageHeader{
MsgType: nnet.Request,
ID: 1,
Route: "ping",
},

Loading…
Cancel
Save