package middleware

import (
	"sync/atomic"
	"time"

	"git.noahlan.cn/noahlan/nnet/core"
	"git.noahlan.cn/noahlan/nnet/entity"
	"git.noahlan.cn/noahlan/nnet/packet"
	"git.noahlan.cn/noahlan/ntools-go/core/nlog"
)

// HeartbeatMiddleware periodically sends a heartbeat payload to an entity and
// stops doing so once no packet has been observed for two intervals.
//
// NOTE(review): a single HeartbeatMiddleware (and therefore a single lastAt)
// is created per WithHeartbeat call and is used by every entity that passes
// through the server, so traffic from one entity refreshes the timestamp seen
// by all heartbeat loops. Per-entity tracking looks like the intent — confirm.
type HeartbeatMiddleware struct {
	// lastAt is the time of the most recently observed packet, in Unix
	// nanoseconds. It is only read and written through sync/atomic and is
	// kept as the first field so it stays 64-bit aligned on 32-bit platforms.
	lastAt int64
	// interval is the period between two heartbeats; the timeout is 2*interval.
	interval time.Duration
	// hbdFn builds the heartbeat payload that is sent to the entity on each tick.
	hbdFn func(entity entity.NetworkEntity) []byte
}

// WithHeartbeat returns a core.RunOption that installs the heartbeat on a
// server: the heartbeat loop is registered on the server's OnOpen lifetime
// hook, and a middleware records the arrival time of every packet before
// passing it on to the next handler.
//
// interval is the period between heartbeats and must be positive. hbdFn
// produces the bytes sent on each tick and must not be nil. Both conditions
// are checked immediately and violations panic, so misconfiguration surfaces
// when the option is built rather than when the first ticker is created.
func WithHeartbeat(interval time.Duration, hbdFn func(entity entity.NetworkEntity) []byte) core.RunOption {
	if hbdFn == nil {
		nlog.Error("dataFn must not be nil")
		panic("dataFn must not be nil")
	}
	// time.NewTicker panics on a non-positive duration; fail here with a
	// message that names the actual problem.
	if interval <= 0 {
		nlog.Error("heartbeat interval must be positive")
		panic("heartbeat interval must be positive")
	}

	m := &HeartbeatMiddleware{
		lastAt:   time.Now().UnixNano(),
		interval: interval,
		hbdFn:    hbdFn,
	}

	return func(server *core.NNet) {
		server.Lifetime().OnOpen(m.start)
		server.Use(func(next core.HandlerFunc) core.HandlerFunc {
			return func(ent entity.NetworkEntity, pkg packet.IPacket) {
				m.handle(ent, pkg)
				next(ent, pkg)
			}
		})
	}
}

// start runs the heartbeat loop for ent. On every tick it first checks whether
// a packet has been observed within the last 2*interval; if not, it logs the
// timeout and returns. Otherwise it sends the payload built by hbdFn, and
// returns if sending fails.
//
// The call blocks until the loop ends. NOTE(review): this presumably relies on
// the OnOpen hook invoking it on its own goroutine — confirm. On timeout the
// loop only stops; nothing in this function closes the entity.
func (m *HeartbeatMiddleware) start(ent entity.NetworkEntity) {
	// Restart the clock when the loop begins. The timestamp set in
	// WithHeartbeat may be arbitrarily old by now, which would otherwise make
	// the very first tick report a timeout.
	atomic.StoreInt64(&m.lastAt, time.Now().UnixNano())

	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()

	for range ticker.C {
		// Load once so the value that is logged is the value that was compared.
		last := atomic.LoadInt64(&m.lastAt)
		deadline := time.Now().Add(-2 * m.interval)
		if last < deadline.UnixNano() {
			// The comparison uses nanoseconds; the log keeps reporting Unix
			// seconds, as it always has.
			nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", last/int64(time.Second), deadline.Unix())
			return
		}

		if err := ent.SendBytes(m.hbdFn(ent)); err != nil {
			nlog.Errorf("Heartbeat err: %v", err)
			return
		}
	}
}

// handle records the current time as the moment the latest packet was seen.
// The entity and the packet themselves are not inspected.
func (m *HeartbeatMiddleware) handle(_ entity.NetworkEntity, _ packet.IPacket) {
	atomic.StoreInt64(&m.lastAt, time.Now().UnixNano())
}