package nnet import ( "encoding/json" "git.noahlan.cn/noahlan/nnet" "git.noahlan.cn/noahlan/nnet/conn" "git.noahlan.cn/noahlan/nnet/event" "git.noahlan.cn/noahlan/nnet/packet" "git.noahlan.cn/noahlan/ntool/nlog" ) type ( HandshakeValidatorFunc func(*HandshakeReq) error HandshakeAckPayloadFunc func() any ) func withNNetEvents( handshakeResp *HandshakeResp, validator HandshakeValidatorFunc, packer packet.Packer, ) nnet.RunOption { return func(ngin *nnet.Engine) { ngin.EventManager().RegisterEventFront(event.EvtOnReceive, onServerReceiveEvent(handshakeResp, validator, packer)) } } func onServerReceiveEvent( handshakeResp *HandshakeResp, validator HandshakeValidatorFunc, packer packet.Packer, ) event.OnReceiveFn { return func(nc *conn.Connection, p packet.IPacket) { pkg, ok := p.(*Packet) if !ok { nlog.Error(packet.ErrWrongPacketType) return } switch pkg.PacketType { case Handshake: var handshakeData HandshakeReq err := json.Unmarshal(pkg.Data, &handshakeData) nlog.Must(err) if err := validator(&handshakeData); err != nil { nlog.Error(err) return } handshakeResp.Payload = handshakeData.Payload data, err := json.Marshal(handshakeResp) nlog.Must(err) hrd, _ := packer.Pack(Header{ PacketType: Handshake, MessageHeader: MessageHeader{}, }, data) if err := nc.SendBytes(hrd); err != nil { nlog.Error(err) return } nc.SetStatus(conn.StatusPrepare) nlog.Debugf("connection handshake Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr()) case HandshakeAck: nc.SetStatus(conn.StatusPending) nlog.Debugf("receive handshake ACK Id=%d, Remote=%s", nc.Session().ID(), nc.Conn().RemoteAddr()) case Data: if nc.Status() < conn.StatusPending { nlog.Errorf("receive data on socket which not yet ACK, session will be closed immediately, remote=%s", nc.Conn().RemoteAddr()) return } nc.SetStatus(conn.StatusWorking) var lastMid uint64 switch pkg.MsgType { case Request: lastMid = pkg.ID case Notify: lastMid = 0 default: nlog.Errorf("Invalid message type: %s ", pkg.MsgType.String()) } nc.SetLastMID(lastMid) } } }