From 58d5c8f8e9c5d7a032efc8114f8a76735ce02936 Mon Sep 17 00:00:00 2001 From: NoahLan <6995syu@163.com> Date: Thu, 14 Sep 2023 11:54:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0websocket=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E4=B8=AD=E9=97=B4=E4=BB=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/heartbeat_ws.go | 78 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 middleware/heartbeat_ws.go diff --git a/middleware/heartbeat_ws.go b/middleware/heartbeat_ws.go new file mode 100644 index 0000000..da2a489 --- /dev/null +++ b/middleware/heartbeat_ws.go @@ -0,0 +1,78 @@ +package middleware + +import ( + "git.noahlan.cn/noahlan/nnet" + "git.noahlan.cn/noahlan/nnet/conn" + "git.noahlan.cn/noahlan/nnet/event" + "git.noahlan.cn/noahlan/nnet/packet" + rt "git.noahlan.cn/noahlan/nnet/router" + "git.noahlan.cn/noahlan/ntool/nlog" + "sync/atomic" + "time" +) + +type ( + HeartbeatWsMiddleware struct { + lastAt int64 + interval time.Duration + hbdFn WsHeartbeatFn + } + WsHeartbeatFn func(conn *conn.WSConn) error +) + +func WithHeartbeatWS(interval time.Duration, hbdFn WsHeartbeatFn) nnet.RunOption { + m := &HeartbeatWsMiddleware{ + lastAt: time.Now().Unix(), + interval: interval, + hbdFn: hbdFn, + } + if hbdFn == nil { + nlog.Error("dataFn must not be nil") + panic("dataFn must not be nil") + } + + return func(ngin *nnet.Engine) { + ngin.EventManager().RegisterEvent(event.EvtOnConnected, m.start) + + ngin.Use(func(next rt.HandlerFunc) rt.HandlerFunc { + return func(conn *conn.Connection, pkg packet.IPacket) { + m.handle(conn, pkg) + + next(conn, pkg) + } + }) + } +} + +func (m *HeartbeatWsMiddleware) start(nc *conn.Connection) { + ticker := time.NewTicker(m.interval) + + defer func() { + ticker.Stop() + }() + + for { + select { + case <-ticker.C: + if nc.Type() != conn.ConnTypeWS { + break + } + + deadline := time.Now().Add(-2 * m.interval).Unix() + if atomic.LoadInt64(&m.lastAt) < deadline { + nlog.Errorf("Heartbeat timeout, LastTime=%d, Deadline=%d", atomic.LoadInt64(&m.lastAt), deadline) + return + } + + err := m.hbdFn(nc.WsConn()) + if err != nil { + nlog.Errorf("Heartbeat err: %v", err) + return + } + } + } +} + +func (m *HeartbeatWsMiddleware) handle(_ *conn.Connection, _ packet.IPacket) { + atomic.StoreInt64(&m.lastAt, time.Now().Unix()) +}