diff --git a/middleware/heartbeat_ws.go b/middleware/heartbeat_ws.go index da2a489..4fcdb29 100644 --- a/middleware/heartbeat_ws.go +++ b/middleware/heartbeat_ws.go @@ -31,8 +31,37 @@ func WithHeartbeatWS(interval time.Duration, hbdFn WsHeartbeatFn) nnet.RunOption panic("dataFn must not be nil") } + var startFn event.OnConnectedFn = func(nc *conn.Connection) { + ticker := time.NewTicker(m.interval) + + defer func() { + ticker.Stop() + }() + + for { + select { + case <-ticker.C: + if nc.Type() != conn.ConnTypeWS { + break + } + + deadline := time.Now().Add(-2 * m.interval).Unix() + if atomic.LoadInt64(&m.lastAt) < deadline { + nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) + return + } + + err := m.hbdFn(nc.WsConn()) + if err != nil { + nlog.Errorf("Heartbeat err: %v", err) + return + } + } + } + } + return func(ngin *nnet.Engine) { - ngin.EventManager().RegisterEvent(event.EvtOnConnected, m.start) + ngin.EventManager().RegisterEvent(event.EvtOnConnected, startFn) ngin.Use(func(next rt.HandlerFunc) rt.HandlerFunc { return func(conn *conn.Connection, pkg packet.IPacket) { @@ -44,35 +73,6 @@ func WithHeartbeatWS(interval time.Duration, hbdFn WsHeartbeatFn) nnet.RunOption } } -func (m *HeartbeatWsMiddleware) start(nc *conn.Connection) { - ticker := time.NewTicker(m.interval) - - defer func() { - ticker.Stop() - }() - - for { - select { - case <-ticker.C: - if nc.Type() != conn.ConnTypeWS { - break - } - - deadline := time.Now().Add(-2 * m.interval).Unix() - if atomic.LoadInt64(&m.lastAt) < deadline { - nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) - return - } - - err := m.hbdFn(nc.WsConn()) - if err != nil { - nlog.Errorf("Heartbeat err: %v", err) - return - } - } - } -} - func (m *HeartbeatWsMiddleware) handle(_ *conn.Connection, _ packet.IPacket) { atomic.StoreInt64(&m.lastAt, time.Now().Unix()) }