fix: 修复因为goroutine导致ws无法正常发送数据的问题。

main v1.2.7
NoahLan 8 months ago
parent 96689a49b8
commit 379b4883a7

@ -38,22 +38,21 @@ func NewEngine(conf config.EngineConf, opts ...RunOption) *Engine {
router: rt.NewDefaultRouter(),
dieChan: make(chan struct{}),
connMgr: conn.NewConnManager(),
evtMgr: event.NewEventManager(),
sessIdMgr: session.NewSessionIDMgr(),
packerBuilder: func() packet.Packer {
return nil
},
}
if ngin.pool == nil {
ngin.pool, _ = ants.NewPool(math.MaxInt32)
}
ngin.evtMgr = event.NewEventManager(ngin.pool)
for _, opt := range opts {
opt(ngin)
}
if ngin.pool == nil {
ngin.pool, _ = ants.NewPool(math.MaxInt32)
}
return ngin
}

@ -5,7 +5,6 @@ import (
"git.noahlan.cn/noahlan/nnet/conn"
"git.noahlan.cn/noahlan/nnet/packet"
"git.noahlan.cn/noahlan/ntool/nlog"
"github.com/panjf2000/ants/v2"
)
var ErrEventTypeIllegal = errors.New("EventType illegal")
@ -65,8 +64,6 @@ type (
}
eventManager struct {
pool *ants.Pool
onConnected []OnConnectedFn
onConnectError []OnConnectErrorFn
onDisconnected []OnDisconnectedFn
@ -82,9 +79,8 @@ type (
///////////////// type-align
var _ Event = (*eventManager)(nil)
func NewEventManager(pool *ants.Pool) EventManager {
func NewEventManager() EventManager {
return &eventManager{
pool: pool,
onConnected: make([]OnConnectedFn, 0),
onConnectError: make([]OnConnectErrorFn, 0),
onDisconnected: make([]OnDisconnectedFn, 0),
@ -228,9 +224,7 @@ func (m *eventManager) OnConnected(nc *conn.Connection) {
return
}
for _, fn := range m.onConnected {
_ = m.pool.Submit(func() {
fn(nc)
})
fn(nc)
}
}
@ -239,9 +233,7 @@ func (m *eventManager) OnConnectError(err error) {
return
}
for _, fn := range m.onConnectError {
_ = m.pool.Submit(func() {
fn(err)
})
fn(err)
}
}
@ -250,9 +242,7 @@ func (m *eventManager) OnDisconnected(nc *conn.Connection, err error) {
return
}
for _, fn := range m.onDisconnected {
_ = m.pool.Submit(func() {
fn(nc, err)
})
fn(nc, err)
}
}
@ -261,9 +251,7 @@ func (m *eventManager) OnClose(nc *conn.Connection) {
return
}
for _, fn := range m.onClose {
_ = m.pool.Submit(func() {
fn(nc)
})
fn(nc)
}
}
@ -272,9 +260,7 @@ func (m *eventManager) OnSend(nc *conn.Connection, v any) {
return
}
for _, fn := range m.onSend {
_ = m.pool.Submit(func() {
fn(nc, v)
})
fn(nc, v)
}
}
@ -283,9 +269,7 @@ func (m *eventManager) OnSendError(nc *conn.Connection, v any, err error) {
return
}
for _, fn := range m.onSendError {
_ = m.pool.Submit(func() {
fn(nc, v, err)
})
fn(nc, v, err)
}
}
@ -294,9 +278,7 @@ func (m *eventManager) OnReceive(nc *conn.Connection, p packet.IPacket) {
return
}
for _, fn := range m.onReceive {
_ = m.pool.Submit(func() {
fn(nc, p)
})
fn(nc, p)
}
}
@ -305,8 +287,6 @@ func (m *eventManager) OnReceiveError(nc *conn.Connection, err error) {
return
}
for _, fn := range m.onReceiveError {
_ = m.pool.Submit(func() {
fn(nc, err)
})
fn(nc, err)
}
}

@ -29,27 +29,29 @@ func WithHeartbeat(interval time.Duration, hbdFn func(conn *conn.Connection) []b
}
var startFn event.OnConnectedFn = func(nc *conn.Connection) {
ticker := time.NewTicker(m.interval)
go func() {
ticker := time.NewTicker(m.interval)
defer func() {
ticker.Stop()
}()
defer func() {
ticker.Stop()
}()
for {
select {
case <-ticker.C:
deadline := time.Now().Add(-2 * m.interval).Unix()
if atomic.LoadInt64(&m.lastAt) < deadline {
nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline)
break
}
err := nc.SendBytes(m.hbdFn(nc))
if err != nil {
nlog.Errorf("Heartbeat err: %v", err)
return
for {
select {
case <-ticker.C:
deadline := time.Now().Add(-2 * m.interval).Unix()
if atomic.LoadInt64(&m.lastAt) < deadline {
nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline)
break
}
err := nc.SendBytes(m.hbdFn(nc))
if err != nil {
nlog.Errorf("Heartbeat err: %v", err)
return
}
}
}
}
}()
}
return func(ngin *nnet.Engine) {

@ -32,32 +32,33 @@ func WithHeartbeatWS(interval time.Duration, hbdFn WsHeartbeatFn) nnet.RunOption
}
var startFn event.OnConnectedFn = func(nc *conn.Connection) {
ticker := time.NewTicker(m.interval)
go func() {
ticker := time.NewTicker(m.interval)
defer func() {
ticker.Stop()
}()
for {
select {
case <-ticker.C:
if nc.Type() != conn.ConnTypeWS {
break
}
defer func() {
ticker.Stop()
}()
for {
select {
case <-ticker.C:
if nc.Type() != conn.ConnTypeWS {
break
}
//deadline := time.Now().Add(-2 * m.interval).Unix()
//if atomic.LoadInt64(&m.lastAt) < deadline {
// nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline)
// break
//}
//deadline := time.Now().Add(-2 * m.interval).Unix()
//if atomic.LoadInt64(&m.lastAt) < deadline {
// nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline)
// break
//}
err := m.hbdFn(nc.WsConn())
if err != nil {
nlog.Errorf("Heartbeat err: %v", err)
return
err := m.hbdFn(nc.WsConn())
if err != nil {
nlog.Errorf("Heartbeat err: %v", err)
return
}
}
}
}
}()
}
return func(ngin *nnet.Engine) {

Loading…
Cancel
Save