You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ngs/internal/message/message.go

223 lines
5.3 KiB
Go

package message
import (
"encoding/binary"
"errors"
"fmt"
"git.noahlan.cn/northlan/ngs/internal/log"
"strings"
)
// Type identifies the kind of a message: Request, Notify, Response or Push.
// It is stored as a single byte.
type Type byte
// Message types. Encode writes the numeric value into the flag byte shifted
// left by one bit, and Decode reads it back from there.
//
// NOTE(review): only Request is declared with the explicit type Type. Notify,
// Response and Push are untyped integer constants that convert to Type
// implicitly wherever a Type is expected — confirm this is intentional.
const (
Request Type = 0x00 // encoded with a message id and a route
Notify = 0x01 // encoded with a route only
Response = 0x02 // encoded with a message id only
Push = 0x03 // encoded with a route only
)
// Bit masks and sizes used when reading and writing the message header.
const (
msgRouteCompressMask = 0x01 // 0000 0001: lowest bit of the flag byte, set when the route is sent as a dictionary code
msgTypeMask = 0x07 // 0000 0111: extracts the message type once the flag byte has been shifted right by one
msgRouteLengthMask = 0xFF // 1111 1111: low 8 bits; an uncompressed route length is encoded in a single byte
msgHeadLength = 0x02 // minimum number of bytes Decode requires (a count of bytes, not bits)
)
// types maps every known message type to its human-readable name. It backs
// the String methods of Type and Message; a type missing from the map yields
// the empty string there.
var types = map[Type]string{
Request: "Request",
Notify: "Notify",
Response: "Response",
Push: "Push",
}
// String returns the human-readable name of the message type, or the empty
// string when t is not one of the known types.
func (t Type) String() string {
	if name, known := types[t]; known {
		return name
	}
	return ""
}
// Route compression dictionary, filled in by SetDictionary.
//
// NOTE(review): no locking is visible around these maps in this file, so
// calling SetDictionary while messages are being encoded or decoded looks
// unsafe — confirm against the callers.
var (
routes = make(map[string]uint16) // route -> code, looked up by Encode
codes = make(map[uint16]string) // code -> route, looked up by Decode
)
// Errors that can be returned by the message codec.
var (
ErrWrongMessageType = errors.New("wrong message type") // the message type is not one of Request, Notify, Response, Push
ErrInvalidMessage = errors.New("invalid message") // the message is not valid, e.g. input shorter than the minimum header
ErrRouteInfoNotFound = errors.New("route info not found in dictionary") // a compressed route code has no dictionary entry
ErrWrongMessage = errors.New("wrong message") // the encoded message is truncated or inconsistent
)
// Message is the in-memory form of a message: the result of Decode or the
// input to Encode.
type Message struct {
Type Type // message type, carried in the flag byte on the wire
ID uint64 // unique id; only encoded for Request and Response, zero for the other types
Route string // route for locating the service; encoded for every type except Response
Data []byte // payload, appended after the header as-is
compressed bool // set by Decode when the route arrived as a dictionary code
}
// New returns a pointer to a freshly allocated, zero-valued Message.
func New() *Message {
	var m Message
	return &m
}
// String implements fmt.Stringer. It renders the name of the message type,
// the route and the payload size in bytes.
func (m *Message) String() string {
	kind := types[m.Type]
	size := len(m.Data)
	return fmt.Sprintf("%s %s (%dbytes)", kind, m.Route, size)
}
// Encode marshals the message into its binary wire format by delegating to
// the package-level Encode function.
func (m *Message) Encode() ([]byte, error) {
	encoded, err := Encode(m)
	return encoded, err
}
// routable reports whether messages of type t carry a route on the wire.
// That is the case for Request, Notify and Push, but not for Response.
func routable(t Type) bool {
	switch t {
	case Request, Notify, Push:
		return true
	}
	return false
}
// invalidType reports whether t is something other than one of the four known
// message types (Request, Notify, Response, Push).
func invalidType(t Type) bool {
	switch t {
	case Request, Notify, Response, Push:
		return false
	default:
		return true
	}
}
// Encode marshals a message into its binary wire format.
//
// The first byte is the flag: bits 1-3 hold the message type and bit 0 is set
// when the route is found in the compression dictionary. What follows the flag
// depends on the message type:
//
//	------------------------------------------
//	|   type   |  flag  |       other        |
//	|----------|--------|--------------------|
//	| request  |----000-| message id, route  |
//	| notify   |----001-| route              |
//	| response |----010-| message id         |
//	| push     |----011-| route              |
//	------------------------------------------
//
// A '-' in the flag column marks a bit that does not affect the message type.
//
// The message id is written as a base-128 varint (7 bits per byte, least
// significant group first, high bit set on every byte but the last). A route
// present in the dictionary is written as its 2-byte big-endian code; any
// other route is written as one length byte followed by the route bytes. The
// payload is appended unchanged.
//
// Encode returns ErrWrongMessageType when m.Type is not a known type, and
// ErrInvalidMessage when an uncompressed route is longer than 255 bytes and
// therefore cannot be described by the single length byte.
func Encode(m *Message) ([]byte, error) {
	if invalidType(m.Type) {
		return nil, ErrWrongMessageType
	}

	code, compressed := routes[m.Route]
	withRoute := routable(m.Type)

	// The length prefix is a single byte. Refuse to emit a frame whose prefix
	// would silently wrap around and no longer match the route that follows.
	if withRoute && !compressed && len(m.Route) > msgRouteLengthMask {
		return nil, ErrInvalidMessage
	}

	// flag + varint id (at most 10 bytes) + route length byte + route + payload
	buf := make([]byte, 0, 1+10+1+len(m.Route)+len(m.Data))

	flag := byte(m.Type << 1) // the conversion is required: m.Type is a Type, not a byte
	if compressed {
		flag |= msgRouteCompressMask
	}
	buf = append(buf, flag)

	if m.Type == Request || m.Type == Response {
		// variable-length encoding of the id, low 7 bits first
		id := m.ID
		for id >= 0x80 {
			buf = append(buf, byte(id)|0x80)
			id >>= 7
		}
		buf = append(buf, byte(id))
	}

	if withRoute {
		if compressed {
			buf = append(buf, byte(code>>8), byte(code))
		} else {
			buf = append(buf, byte(len(m.Route)))
			buf = append(buf, m.Route...)
		}
	}

	buf = append(buf, m.Data...)
	return buf, nil
}
// Decode unmarshals a byte slice in the format produced by Encode into a
// Message.
//
// It reads the flag byte, then (for Request and Response) the varint-encoded
// message id, then (for Request, Notify and Push) the route, which is either a
// 2-byte big-endian dictionary code or a length-prefixed string depending on
// the compression bit of the flag. Everything after the header becomes the
// payload; m.Data aliases data rather than copying it. A Response may have an
// empty payload.
//
// Errors:
//   - ErrInvalidMessage: data is shorter than the minimum header length.
//   - ErrWrongMessageType: the flag byte carries an unknown message type.
//   - ErrWrongMessage: the id or the route is truncated, or the id does not
//     fit in 64 bits.
//   - ErrRouteInfoNotFound: a compressed route code is not in the dictionary.
func Decode(data []byte) (*Message, error) {
	if len(data) < msgHeadLength {
		return nil, ErrInvalidMessage
	}

	flag := data[0]
	typ := Type((flag >> 1) & msgTypeMask) // the conversion is required: the masked flag is a byte
	if invalidType(typ) {
		return nil, ErrWrongMessageType
	}

	m := New()
	m.Type = typ
	offset := 1

	if typ == Request || typ == Response {
		// Base-128 varint, least significant group first. Uvarint reports
		// n == 0 when the terminating byte is missing and n < 0 when the value
		// overflows 64 bits; both mean the input is malformed.
		id, n := binary.Uvarint(data[offset:])
		if n <= 0 {
			return nil, ErrWrongMessage
		}
		m.ID = id
		offset += n
	}

	if routable(typ) {
		if flag&msgRouteCompressMask == 1 {
			// The dictionary code occupies exactly two bytes.
			if offset+2 > len(data) {
				return nil, ErrWrongMessage
			}
			route, ok := codes[binary.BigEndian.Uint16(data[offset:offset+2])]
			if !ok {
				return nil, ErrRouteInfoNotFound
			}
			m.compressed = true
			m.Route = route
			offset += 2
		} else {
			// One length byte followed by that many route bytes.
			if offset >= len(data) {
				return nil, ErrWrongMessage
			}
			routeLen := int(data[offset])
			offset++
			if offset+routeLen > len(data) {
				return nil, ErrWrongMessage
			}
			m.compressed = false
			m.Route = string(data[offset : offset+routeLen])
			offset += routeLen
		}
	}

	m.Data = data[offset:]
	return m, nil
}
// SetDictionary registers the given route -> code pairs in the route
// compression dictionary used by Encode and Decode. Route names are trimmed of
// surrounding white space before they are stored.
//
// When a route or a code is already registered, a message is logged and the
// new pair wins. The entry that the new pair displaces is removed from the
// opposite map as well, so the route -> code and code -> route maps always
// describe the same set of pairs.
//
// TODO(warning): set dictionary in runtime would be a dangerous operation!!!!!!
func SetDictionary(dict map[string]uint16) {
	for route, code := range dict {
		r := strings.TrimSpace(route)

		// duplication check
		if prevCode, ok := routes[r]; ok {
			log.Println(fmt.Sprintf("duplicated route(route: %s, code: %d)", r, code))
			// The route is moving to another code: its old code must stop
			// resolving to it.
			if prevCode != code && codes[prevCode] == r {
				delete(codes, prevCode)
			}
		}
		if prevRoute, ok := codes[code]; ok {
			log.Println(fmt.Sprintf("duplicated route(route: %s, code: %d)", r, code))
			// The code is moving to another route: the old route must stop
			// being compressed to it.
			if prevRoute != r && routes[prevRoute] == code {
				delete(routes, prevRoute)
			}
		}

		// update map, using last value when key duplicated
		routes[r] = code
		codes[code] = r
	}
}