package middleware import ( "git.noahlan.cn/noahlan/nnet" "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/event" "git.noahlan.cn/noahlan/nnet/packet" rt "git.noahlan.cn/noahlan/nnet/router" "git.noahlan.cn/noahlan/ntool/nlog" "sync/atomic" "time" ) type HeartbeatMiddleware struct { lastAt int64 interval time.Duration hbdFn func(conn *conn.Connection) []byte } func WithHeartbeat(interval time.Duration, hbdFn func(conn *conn.Connection) []byte) nnet.RunOption { m := &HeartbeatMiddleware{ lastAt: time.Now().Unix(), interval: interval, hbdFn: hbdFn, } if hbdFn == nil { nlog.Error("dataFn must not be nil") panic("dataFn must not be nil") } return func(ngin *nnet.Engine) { ngin.EventManager().RegisterEvent(event.EvtOnConnected, m.start) ngin.Use(func(next rt.HandlerFunc) rt.HandlerFunc { return func(conn *conn.Connection, pkg packet.IPacket) { m.handle(conn, pkg) next(conn, pkg) } }) } } func (m *HeartbeatMiddleware) start(conn *conn.Connection) { ticker := time.NewTicker(m.interval) defer func() { ticker.Stop() }() for { select { case <-ticker.C: deadline := time.Now().Add(-2 * m.interval).Unix() if atomic.LoadInt64(&m.lastAt) < deadline { nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) return } err := conn.SendBytes(m.hbdFn(conn)) if err != nil { nlog.Errorf("Heartbeat err: %v", err) return } } } } func (m *HeartbeatMiddleware) handle(_ *conn.Connection, _ packet.IPacket) { atomic.StoreInt64(&m.lastAt, time.Now().Unix()) }