You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

311 lines
6.6 KiB

package connection
import (
1 year ago
var (
ErrCloseClosedSession = errors.New("close closed session")
// ErrBrokenPipe represents the low-level connection has broken.
ErrBrokenPipe = errors.New("broken low-level pipe")
const (
// StatusStart 开始阶段
StatusStart int32 = iota + 1
// StatusPrepare 准备阶段
// StatusPending 等待工作阶段
// StatusWorking 工作阶段
// StatusClosed 连接关闭
type ConnType int
const (
ConnTypeTCP ConnType = iota // TCP connection
ConnTypeWS // Websocket connection
type (
Connection struct {
conf Config // 配置
session *session.Session // Session
1 year ago
status int32 // 连接状态
conn net.Conn // low-level conn fd
typ ConnType // 连接类型
packer packet.Packer // 封包、拆包器
serializer serialize.Serializer // 消息序列化/反序列化器
pipeline Pipeline // 连接生命周期管理
handleFn func(conn *Connection, pkg packet.IPacket) // 消息处理方法
lastMid uint64 // 最近一次消息ID
chDie chan struct{} // 停止通道
1 year ago
chSend chan PendingMessage // 消息发送通道(结构化消息)
chWrite chan []byte // 消息发送通道(二进制消息)
packetFn func(conn *Connection, pkg packet.IPacket)
Config struct {
LogDebug bool
LogPrefix string
1 year ago
PendingMessage struct {
header interface{}
payload interface{}
func NewConnection(
id int64,
conn net.Conn,
conf Config,
packerBuilder packet.PackerBuilder,
serializer serialize.Serializer,
pipeline Pipeline,
handleFn packetFn) *Connection {
r := &Connection{
conf: conf,
session: session.NewSession(id),
status: StatusStart,
conn: conn,
typ: ConnTypeTCP,
packer: packerBuilder(),
serializer: serializer,
pipeline: pipeline,
handleFn: handleFn,
lastMid: 0,
chDie: make(chan struct{}),
1 year ago
chSend: make(chan PendingMessage, 128),
chWrite: make(chan []byte, 128),
_, ok := conn.(*WSConn)
if ok {
r.typ = ConnTypeWS
return r
func (r *Connection) Send(header, payload interface{}) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
1 year ago
r.chSend <- PendingMessage{
header: header,
payload: payload,
return err
func (r *Connection) SendBytes(data []byte) (err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
r.chWrite <- data
return err
func (r *Connection) Status() int32 {
return atomic.LoadInt32(&r.status)
func (r *Connection) SetStatus(s int32) {
atomic.StoreInt32(&r.status, s)
func (r *Connection) Conn() (net.Conn, ConnType) {
return r.conn, r.typ
func (r *Connection) ID() int64 {
return r.session.ID()
func (r *Connection) Session() *session.Session {
return r.session
func (r *Connection) LastMID() uint64 {
return r.lastMid
func (r *Connection) SetLastMID(mid uint64) {
atomic.StoreUint64(&r.lastMid, mid)
func (r *Connection) Serve() {
1 year ago
_ = pool.Submit(func() {
1 year ago
_ = pool.Submit(func() {
func (r *Connection) write() {
defer func() {
_ = r.Close()
if r.conf.LogDebug {
nlog.Debugf("%s [writeLoop] connection write goroutine exit, ConnID=%d, SessionUID=%s",
r.conf.LogPrefix, r.ID(), r.session.UID())
for {
select {
case data := <-r.chSend:
// marshal packet body (data)
if r.serializer == nil {
if _, ok := data.payload.([]byte); !ok {
nlog.Errorf("%s serializer is nil, but payload type not []byte", r.conf.LogPrefix)
} else {
payload, err := r.serializer.Marshal(data.payload)
if err != nil {
nlog.Errorf("%s message body marshal err: %v", r.conf.LogPrefix, err)
data.payload = payload
// invoke pipeline
if pipe := r.pipeline; pipe != nil {
err := pipe.Outbound().Process(r, data)
if err != nil {
nlog.Errorf("%s pipeline err: %s", r.conf.LogPrefix, err.Error())
// packet pack data
p, err := r.packer.Pack(data.header, data.payload.([]byte))
if err != nil {
nlog.Errorf("%s pack err: %s", r.conf.LogPrefix, err.Error())
r.chWrite <- p
case data := <-r.chWrite:
// 回写数据
if _, err := r.conn.Write(data); err != nil {
nlog.Errorf("%s write data err: %s", r.conf.LogPrefix, err.Error())
1 year ago
//nlog.Debugf("write data %v", data)
case <-r.chDie: // connection close signal
//case <-r.ngin.dieChan: // application quit signal
// return
func (r *Connection) read() {
defer func() {
1 year ago
_ = r.Close()
buf := make([]byte, 4096)
for {
n, err := r.conn.Read(buf)
//nlog.Debugf("receive data %v", buf[:n])
if err != nil {
nlog.Errorf("%s [readLoop] Read message error: %s, session will be closed immediately",
r.conf.LogPrefix, err.Error())
if n == 0 {
nlog.Errorf("%s [readLoop] Read empty message, session will be closed immediately",
if r.packer == nil {
nlog.Errorf("%s [readLoop] unexpected error: packer is nil", r.conf.LogPrefix)
// warning: 为性能考虑复用slice处理数据buf传入后必须要copy再处理
packets, err := r.packer.Unpack(buf[:n])
if err != nil {
nlog.Errorf("%s unpack err: %s", r.conf.LogPrefix, err.Error())
// packets 处理
for _, p := range packets {
if err := r.processPacket(p); err != nil {
nlog.Errorf("%s process packet err: %s", r.conf.LogPrefix, err.Error())
func (r *Connection) processPacket(packet packet.IPacket) error {
if pipe := r.pipeline; pipe != nil {
err := pipe.Inbound().Process(r, packet)
if err != nil {
return errors.New(fmt.Sprintf("pipeline process failed: %v", err.Error()))
if r.Status() == StatusWorking {
// 处理包消息
1 year ago
_ = pool.Submit(func() {
r.handleFn(r, packet)
1 year ago
return nil
func (r *Connection) DieChan() chan struct{} {
return r.chDie
func (r *Connection) Close() error {
if r.Status() == StatusClosed {
return ErrCloseClosedSession
if r.conf.LogDebug {
nlog.Debugf("%s close connection, ID: %d", r.conf.LogPrefix, r.ID())
select {
case <-r.chDie:
1 year ago
return r.conn.Close()