diff --git a/bilibili/live.go b/bilibili/live.go index b7a7103..294a5e9 100644 --- a/bilibili/live.go +++ b/bilibili/live.go @@ -3,11 +3,11 @@ package bilibili import ( "encoding/json" "fmt" + "git.noahlan.cn/northlan/ntools-go/logger" "io/ioutil" "live-gateway/bilibili/msg_handler" "live-gateway/config" "live-gateway/live" - "live-gateway/pkg/logger" "live-gateway/ws" "net/http" "time" @@ -46,13 +46,6 @@ func NewLiveBilibili() *live.Live { msgHandlerMapper: make(map[string]MsgHandler, 6), entered: make(chan struct{}), } - // 内置处理器 - bl.AddMessageHandler( - msg_handler.NewDanmakuHandler(), - msg_handler.NewSendGiftHandler(), - //&msg_handler.InterActWordHandler{}, - //&msg_handler.SendGiftHandler{}, - ) l := live.NewLive( live.WithWsOptions( @@ -103,6 +96,14 @@ func (l *LiveBilibili) PreConnect() (url string, err error) { l.RoomInfo = getRoomInfoResp.Data logger.SLog.Infof("获取房间数据: %+v\n", l.RoomInfo) //log.With("获取房间数据", l.RoomInfo) + + // 获取房间信息后再添加消息处理器 是因为需要房间号 + l.AddMessageHandler( + msg_handler.NewDanmakuHandler(l.RoomInfo.RoomId), + msg_handler.NewSendGiftHandler(l.RoomInfo.RoomId), + //&msg_handler.InterActWordHandler{}, + //&msg_handler.SendGiftHandler{}, + ) return } diff --git a/bilibili/msg_handler/danmaku.go b/bilibili/msg_handler/danmaku.go index 82b0f4b..7a12762 100644 --- a/bilibili/msg_handler/danmaku.go +++ b/bilibili/msg_handler/danmaku.go @@ -2,10 +2,11 @@ package msg_handler import ( "encoding/json" + "git.noahlan.cn/northlan/ntools-go/kafka" + "git.noahlan.cn/northlan/ntools-go/logger" "live-gateway/config" "live-gateway/pb" - "live-gateway/pkg/kafka" - "live-gateway/pkg/logger" + kfk "live-gateway/pkg/kafka" "strconv" ) @@ -39,13 +40,15 @@ type Danmaku struct { } type DanmakuHandler struct { - producer *kafka.Producer + producer *kafka.Producer + liveRoomId int64 } -func NewDanmakuHandler() *DanmakuHandler { +func NewDanmakuHandler(liveRoomId int64) *DanmakuHandler { cfg := config.Config.Kafka.Danmaku return &DanmakuHandler{ - producer: kafka.NewKafkaProducer(cfg.Addr, cfg.Topic), + producer: kafka.NewKafkaProducer(kfk.DefaultProducerConfig, cfg.Addr, cfg.Topic), + liveRoomId: liveRoomId, } } @@ -111,14 +114,15 @@ func (d *DanmakuHandler) HandlerMessage(data []byte) { logger.SLog.Debugf("%s 说: %s", dm.Uname, dm.Content) dmMsg := &pbMq.MqDanmaku{ - Platform: "bilibili", - Uid: dm.UID, - Uname: dm.Uname, - Content: dm.Content, - SendTime: dm.SendTime, + Platform: "bilibili", + LiveRoomId: d.liveRoomId, + Uid: dm.UID, + Uname: dm.Uname, + Content: dm.Content, + SendTime: dm.SendTime, } - _ = d.producer.SendMessageAsync(dmMsg, strconv.Itoa(int(dm.UID))) + _ = d.producer.SendMessageAsync(dmMsg, strconv.FormatInt(dm.UID, 10)) } func floatToBool(v float64) bool { diff --git a/bilibili/msg_handler/interact_word.go b/bilibili/msg_handler/interact_word.go index d59c1a7..949ae53 100644 --- a/bilibili/msg_handler/interact_word.go +++ b/bilibili/msg_handler/interact_word.go @@ -2,7 +2,6 @@ package msg_handler import ( "encoding/json" - "live-gateway/pkg/logger" ) type InterActWord struct { @@ -45,5 +44,5 @@ func (h *InterActWordHandler) HandlerMessage(data []byte) { return } - logger.SLog.Debugf("%s进入直播间", baseMsg.Data.Uname) + //logger.SLog.Debugf("%s进入直播间", baseMsg.Data.Uname) } diff --git a/bilibili/msg_handler/send_gift.go b/bilibili/msg_handler/send_gift.go index b66a07c..b850e78 100644 --- a/bilibili/msg_handler/send_gift.go +++ b/bilibili/msg_handler/send_gift.go @@ -2,9 +2,10 @@ package msg_handler import ( "encoding/json" + "git.noahlan.cn/northlan/ntools-go/kafka" "live-gateway/config" "live-gateway/pb" - "live-gateway/pkg/kafka" + kfk "live-gateway/pkg/kafka" "strconv" ) @@ -60,13 +61,15 @@ type SendGift struct { } type SendGiftHandler struct { - producer *kafka.Producer + producer *kafka.Producer + liveRoomId int64 } -func NewSendGiftHandler() *SendGiftHandler { +func NewSendGiftHandler(liveRoomId int64) *SendGiftHandler { cfg := config.Config.Kafka.Gift return &SendGiftHandler{ - producer: kafka.NewKafkaProducer(cfg.Addr, cfg.Topic), + producer: kafka.NewKafkaProducer(kfk.DefaultProducerConfig, cfg.Addr, cfg.Topic), + liveRoomId: liveRoomId, } } @@ -86,14 +89,15 @@ func (h *SendGiftHandler) HandlerMessage(data []byte) { //logger.SLog.Infof("%s %s礼物 %s x%d", baseMsg.Data.Uname, baseMsg.Data.Action, baseMsg.Data.GiftName, baseMsg.Data.Num) dmMsg := &pbMq.MqGift{ - Platform: "bilibili", - Uid: int64(baseMsg.Data.Uid), - Uname: baseMsg.Data.Uname, - GiftId: int32(baseMsg.Data.GiftId), - GiftName: baseMsg.Data.GiftName, - TotalCoin: int64(baseMsg.Data.TotalCoin), - SendTime: int64(baseMsg.Data.Timestamp), + Platform: "bilibili", + LiveRoomId: h.liveRoomId, + Uid: int64(baseMsg.Data.Uid), + Uname: baseMsg.Data.Uname, + GiftId: int32(baseMsg.Data.GiftId), + GiftName: baseMsg.Data.GiftName, + TotalCoin: int64(baseMsg.Data.TotalCoin), + SendTime: int64(baseMsg.Data.Timestamp), } - _ = h.producer.SendMessageAsync(dmMsg, strconv.Itoa(int(dmMsg.Uid))) + _ = h.producer.SendMessageAsync(dmMsg, strconv.FormatInt(dmMsg.Uid, 10)) } diff --git a/config.yml b/config.yml index 0ad7f75..9f83f27 100644 --- a/config.yml +++ b/config.yml @@ -3,7 +3,7 @@ Bilibili: GetRoomUrl: https://api.live.bilibili.com/room/v1/Room/room_init?id= HeartbeatInterval: 30 UserId: 111222 - RoomId: 6925399 + RoomId: 7777 Log: Console: Level: info diff --git a/config/config.go b/config/config.go index 841d39a..87e679e 100644 --- a/config/config.go +++ b/config/config.go @@ -2,9 +2,9 @@ package config import ( "fmt" + "git.noahlan.cn/northlan/ntools-go/logger" c "github.com/gookit/config/v2" "github.com/gookit/config/v2/yaml" - "live-gateway/pkg/logger" "time" ) diff --git a/go.mod b/go.mod index 141cbb5..4d54ffa 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,18 @@ module live-gateway go 1.18 require ( + git.noahlan.cn/northlan/ntools-go/kafka v1.0.1 + git.noahlan.cn/northlan/ntools-go/logger v1.0.1 + github.com/Shopify/sarama v1.32.0 github.com/andybalholm/brotli v1.0.4 github.com/gookit/config/v2 v2.1.0 github.com/gorilla/websocket v1.5.0 github.com/jpillora/backoff v1.0.0 github.com/pkg/errors v0.9.1 + google.golang.org/protobuf v1.28.0 ) require ( - github.com/Shopify/sarama v1.32.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect @@ -36,8 +39,7 @@ require ( go.uber.org/multierr v1.8.0 // indirect go.uber.org/zap v1.21.0 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect - golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 // indirect + golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect - google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 61262f8..9d9c1c1 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,13 @@ +git.noahlan.cn/northlan/ntools-go/kafka v1.0.1 h1:SDUwYRzksZ3Vcu7PTZxk+TEMF2f3gBiQEboKOhi1yfI= +git.noahlan.cn/northlan/ntools-go/kafka v1.0.1/go.mod h1:RxX9JSUIr3Gbk+cvUwE5k+i08AgIK3TA9ayDJCMn2n8= +git.noahlan.cn/northlan/ntools-go/logger v1.0.1 h1:+08dMbsKGECM1B7H8GqwtRzGqOl5yrNNbJYo9tFoMf0= +git.noahlan.cn/northlan/ntools-go/logger v1.0.1/go.mod h1:QQwgylABV9P8MFGvXKlujJO5NV0MP0JUPzqQt3I0Y+w= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/Shopify/sarama v1.32.0 h1:P+RUjEaRU0GMMbYexGMDyrMkLhbbBVUVISDywi+IlFU= github.com/Shopify/sarama v1.32.0/go.mod h1:+EmJJKZWVT/faR9RcOxJerP+LId4iWdQPBGLy1Y1Njs= +github.com/Shopify/toxiproxy/v2 v2.3.0 h1:62YkpiP4bzdhKMH+6uC5E95y608k3zDwdzuBMsnn3uQ= github.com/Shopify/toxiproxy/v2 v2.3.0/go.mod h1:KvQTtB6RjCJY4zqNJn7C7JDFgsG5uoHYDirfUfpIm0c= github.com/agext/levenshtein v1.2.1/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558= github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= @@ -9,6 +15,7 @@ github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHG github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM= github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/Nj9VFpLOpjS5yuumk= github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -21,7 +28,9 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns= github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/golang/protobuf v1.1.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -32,6 +41,7 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o= github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gookit/color v1.5.0 h1:1Opow3+BWDwqor78DcJkJCIwnkviFi+rrOANki9BUFw= @@ -60,6 +70,7 @@ github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8 github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= @@ -71,13 +82,13 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A= github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= @@ -101,6 +112,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= @@ -132,6 +144,7 @@ github.com/zclconf/go-cty-debug v0.0.0-20191215020915-b22d67c1ba0b/go.mod h1:ZRK go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= @@ -158,8 +171,8 @@ golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 h1:6mzvA99KwZxbOrxww4EvWVQUnN1+xEu9tafK5ZxkYeA= -golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 h1:yssD99+7tqHWO5Gwh81phT+67hg+KttniBr6UnEXOY8= +golang.org/x/net v0.0.0-20220421235706-1d1ef9303861/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -192,6 +205,7 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= @@ -199,10 +213,11 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/live/live.go b/live/live.go index 5071f56..afea7aa 100644 --- a/live/live.go +++ b/live/live.go @@ -1,8 +1,8 @@ package live import ( + "git.noahlan.cn/northlan/ntools-go/logger" "github.com/pkg/errors" - "live-gateway/pkg/logger" "live-gateway/ws" ) diff --git a/main.go b/main.go index 760c5c0..301016f 100644 --- a/main.go +++ b/main.go @@ -2,9 +2,9 @@ package main import ( "flag" + "git.noahlan.cn/northlan/ntools-go/logger" "live-gateway/bilibili" "live-gateway/config" - "live-gateway/pkg/logger" "sync" ) diff --git a/pb/mq.pb.go b/pb/mq.pb.go index 20332ed..42465c4 100644 --- a/pb/mq.pb.go +++ b/pb/mq.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 +// protoc-gen-go v1.27.1 // protoc v3.19.4 // source: mq.proto @@ -25,11 +25,12 @@ type MqDanmaku struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` - Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` - Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` - Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` - SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"` + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + LiveRoomId int64 `protobuf:"varint,2,opt,name=liveRoomId,proto3" json:"liveRoomId,omitempty"` + Uid int64 `protobuf:"varint,3,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,4,opt,name=uname,proto3" json:"uname,omitempty"` + Content string `protobuf:"bytes,5,opt,name=content,proto3" json:"content,omitempty"` + SendTime int64 `protobuf:"varint,6,opt,name=sendTime,proto3" json:"sendTime,omitempty"` } func (x *MqDanmaku) Reset() { @@ -71,6 +72,13 @@ func (x *MqDanmaku) GetPlatform() string { return "" } +func (x *MqDanmaku) GetLiveRoomId() int64 { + if x != nil { + return x.LiveRoomId + } + return 0 +} + func (x *MqDanmaku) GetUid() int64 { if x != nil { return x.Uid @@ -104,13 +112,14 @@ type MqGift struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` - Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` - Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` - GiftId int32 `protobuf:"varint,4,opt,name=giftId,proto3" json:"giftId,omitempty"` - GiftName string `protobuf:"bytes,5,opt,name=giftName,proto3" json:"giftName,omitempty"` - TotalCoin int64 `protobuf:"varint,6,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` - SendTime int64 `protobuf:"varint,7,opt,name=sendTime,proto3" json:"sendTime,omitempty"` + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + LiveRoomId int64 `protobuf:"varint,2,opt,name=liveRoomId,proto3" json:"liveRoomId,omitempty"` + Uid int64 `protobuf:"varint,3,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,4,opt,name=uname,proto3" json:"uname,omitempty"` + GiftId int32 `protobuf:"varint,5,opt,name=giftId,proto3" json:"giftId,omitempty"` + GiftName string `protobuf:"bytes,6,opt,name=giftName,proto3" json:"giftName,omitempty"` + TotalCoin int64 `protobuf:"varint,7,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` + SendTime int64 `protobuf:"varint,8,opt,name=sendTime,proto3" json:"sendTime,omitempty"` } func (x *MqGift) Reset() { @@ -152,6 +161,13 @@ func (x *MqGift) GetPlatform() string { return "" } +func (x *MqGift) GetLiveRoomId() int64 { + if x != nil { + return x.LiveRoomId + } + return 0 +} + func (x *MqGift) GetUid() int64 { if x != nil { return x.Uid @@ -197,27 +213,31 @@ func (x *MqGift) GetSendTime() int64 { var File_mq_proto protoreflect.FileDescriptor var file_mq_proto_rawDesc = []byte{ - 0x0a, 0x08, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x85, + 0x0a, 0x08, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0xa5, 0x01, 0x0a, 0x09, 0x4d, 0x71, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, - 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6c, 0x69, 0x76, 0x65, + 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x6c, 0x69, + 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, - 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, - 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xba, 0x01, 0x0a, 0x06, 0x4d, 0x71, 0x47, 0x69, 0x66, + 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, + 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xda, 0x01, 0x0a, 0x06, 0x4d, 0x71, 0x47, 0x69, 0x66, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, - 0x03, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, - 0x14, 0x0a, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x1e, 0x0a, + 0x0a, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0a, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x12, 0x10, 0x0a, + 0x03, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, + 0x14, 0x0a, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, - 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, + 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x6f, 0x74, - 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, + 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, - 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, + 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x07, 0x5a, 0x05, 0x2f, 0x70, 0x62, 0x4d, 0x71, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } diff --git a/pb/mq.proto b/pb/mq.proto index 769c0b6..fb888d1 100644 --- a/pb/mq.proto +++ b/pb/mq.proto @@ -6,18 +6,20 @@ option go_package = "/pbMq"; message MqDanmaku { string platform = 1; - int64 uid = 2; - string uname = 3; - string content = 4; - int64 sendTime = 5; + int64 liveRoomId = 2; + int64 uid = 3; + string uname = 4; + string content = 5; + int64 sendTime = 6; } message MqGift { string platform = 1; - int64 uid = 2; - string uname = 3; - int32 giftId = 4; - string giftName = 5; - int64 totalCoin = 6; - int64 sendTime = 7; + int64 liveRoomId = 2; + int64 uid = 3; + string uname = 4; + int32 giftId = 5; + string giftName = 6; + int64 totalCoin = 7; + int64 sendTime = 8; } \ No newline at end of file diff --git a/pkg/kafka/codec.go b/pkg/kafka/codec.go new file mode 100644 index 0000000..d5933f9 --- /dev/null +++ b/pkg/kafka/codec.go @@ -0,0 +1,29 @@ +package kafka + +import ( + "errors" + "git.noahlan.cn/northlan/ntools-go/kafka" + "google.golang.org/protobuf/proto" +) + +var _ kafka.Marshaler = (*protobufMarshaler)(nil) +var _ kafka.UnMarshaler = (*protobufMarshaler)(nil) + +var ProtobufMarshaler = &protobufMarshaler{} + +type protobufMarshaler struct { +} + +func (p *protobufMarshaler) Marshal(v interface{}) ([]byte, error) { + if msg, ok := v.(proto.Message); ok { + return proto.Marshal(msg) + } + return nil, errors.New("v must be proto message") +} + +func (p *protobufMarshaler) UnMarshal(data []byte, v interface{}) error { + if msg, ok := v.(proto.Message); ok { + return proto.Unmarshal(data, msg) + } + return errors.New("v must be proto message") +} diff --git a/pkg/kafka/consumer.go b/pkg/kafka/consumer.go deleted file mode 100644 index d108021..0000000 --- a/pkg/kafka/consumer.go +++ /dev/null @@ -1,41 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" - "live-gateway/pkg/logger" -) - -type Consumer struct { - client sarama.Client - topic string - consumer sarama.Consumer - partitions []int32 -} - -func NewKafkaConsumer(addr []string, topic string) (*Consumer, error) { - p := Consumer{} - p.topic = topic - - config := sarama.NewConfig() - config.Version = sarama.V3_1_0_0 - config.Consumer.Offsets.Initial = sarama.OffsetNewest - - var err error - p.client, err = sarama.NewClient(addr, config) - if err != nil { - logger.SLog.Error("new kafka client err:", err) - return nil, err - } - - p.consumer, err = sarama.NewConsumerFromClient(p.client) - if err != nil { - logger.SLog.Error("new kafka consumer err:", err) - return nil, err - } - p.partitions, err = p.consumer.Partitions(topic) - if err != nil { - logger.SLog.Errorf("get partitions for topic %s err", topic) - return nil, err - } - return &p, nil -} diff --git a/pkg/kafka/consumer_group.go b/pkg/kafka/consumer_group.go deleted file mode 100644 index 7a4f4e1..0000000 --- a/pkg/kafka/consumer_group.go +++ /dev/null @@ -1,47 +0,0 @@ -package kafka - -import ( - "context" - "github.com/Shopify/sarama" - "live-gateway/pkg/logger" -) - -type ConsumerGroup struct { - sarama.ConsumerGroup - groupId string - topics []string -} - -type ConsumerGroupConfig struct { - KafkaVersion sarama.KafkaVersion - OffsetsInitial int64 - IsReturnErr bool -} - -func NewConsumerGroup(config *ConsumerGroupConfig, addr, topics []string, groupId string) (*ConsumerGroup, error) { - c := sarama.NewConfig() - c.Version = config.KafkaVersion - c.Consumer.Offsets.Initial = config.OffsetsInitial - c.Consumer.Return.Errors = config.IsReturnErr - - client, err := sarama.NewClient(addr, c) - if err != nil { - return nil, err - } - - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, client) - if err != nil { - return nil, err - } - return &ConsumerGroup{consumerGroup, groupId, topics}, nil -} - -func (cg *ConsumerGroup) RegisterHandlerAndConsumer(handler sarama.ConsumerGroupHandler) { - ctx := context.Background() - for { - err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler) - if err != nil { - logger.SLog.Error("RegisterHandlerAndConsumer error: ", err) - } - } -} diff --git a/pkg/kafka/producer.go b/pkg/kafka/producer.go deleted file mode 100644 index 08b9c54..0000000 --- a/pkg/kafka/producer.go +++ /dev/null @@ -1,62 +0,0 @@ -package kafka - -import ( - "github.com/Shopify/sarama" - "google.golang.org/protobuf/proto" - "live-gateway/pkg/logger" -) - -type Producer struct { - topic string - client sarama.Client - producer sarama.AsyncProducer -} - -func NewKafkaProducer(addr []string, topic string) *Producer { - p := Producer{} - - config := sarama.NewConfig() //Instantiate a sarama Config - config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully - config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all - config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - - p.topic = topic - - var err error - p.client, err = sarama.NewClient(addr, config) - if err != nil { - logger.SLog.Error("new kafka client err:", err) - return &p - } - p.producer, err = sarama.NewAsyncProducerFromClient(p.client) - if err != nil { - logger.SLog.Error("new kafka producer err:", err) - return &p - } - - go func() { - for range p.producer.Successes() { - } - }() - - return &p -} - -func (p *Producer) SendMessageAsync(m proto.Message, key ...string) error { - kMsg := &sarama.ProducerMessage{} - kMsg.Topic = p.topic - if len(key) > 0 { - kMsg.Key = sarama.StringEncoder(key[0]) - } - bMsg, err := proto.Marshal(m) - if err != nil { - logger.SLog.Error("proto marshal err:", err) - return err - } - kMsg.Value = sarama.ByteEncoder(bMsg) - - select { - case p.producer.Input() <- kMsg: - } - return nil -} diff --git a/pkg/kafka/vars.go b/pkg/kafka/vars.go new file mode 100644 index 0000000..3b43a5e --- /dev/null +++ b/pkg/kafka/vars.go @@ -0,0 +1,13 @@ +package kafka + +import ( + nkfk "git.noahlan.cn/northlan/ntools-go/kafka" + "github.com/Shopify/sarama" +) + +var DefaultProducerConfig = &nkfk.ProducerConfig{ + RequiredAcks: sarama.WaitForAll, + Partitioner: sarama.NewHashPartitioner, + IsReturnSuccess: true, + Marshaler: ProtobufMarshaler, +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go deleted file mode 100644 index 2cee6cf..0000000 --- a/pkg/logger/logger.go +++ /dev/null @@ -1,141 +0,0 @@ -package logger - -import ( - "github.com/natefinch/lumberjack" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "os" - "path/filepath" -) - -var Log *zap.Logger -var SLog *zap.SugaredLogger - -const DefaultLogPath = "./logs" - -type ( - // FileConfig 日志文件配置 - FileConfig struct { - Level string // 日志打印级别 debug info warning error - Format string // 输出日志格式 console, json - Enabled bool // 是否开启 - - Path string // 输出日志文件路径 - FileName string // 输出日志文件名称 - FileMaxSize int // 【日志分割】单个日志文件最多存储量 单位(mb) - FileMaxBackups int // 【日志分割】日志备份文件最多数量 - MaxAge int // 日志保留时间,单位: 天 (day) - Compress bool // 是否压缩日志 - } - - // ConsoleConfig 控制台日志配置 - ConsoleConfig struct { - Level string // 日志打印级别 debug info warning error - Format string // 输出日志格式 console, json - } -) - -var logLevel = map[string]zapcore.Level{ - "debug": zapcore.DebugLevel, - "info": zapcore.InfoLevel, - "warn": zapcore.WarnLevel, - "error": zapcore.ErrorLevel, -} - -func Sync() { - if SLog != nil { - _ = SLog.Sync() - } - if Log != nil { - _ = Log.Sync() - } -} - -// InitLogger 初始化 log -func InitLogger(fileConf *FileConfig, consoleConf *ConsoleConfig) error { - cores := make([]zapcore.Core, 0, 2) - - consoleCore := zapcore.NewCore(getEncoder(consoleConf), zapcore.AddSync(os.Stdout), getLogLevel(consoleConf.Level)) - cores = append(cores, consoleCore) - - if fileConf.Enabled { - writeSyncer, err := getLogWriter(fileConf) // 日志文件配置 文件位置和切割 - if err != nil { - return err - } - fileCore := zapcore.NewCore(getEncoder(fileConf), writeSyncer, getLogLevel(fileConf.Level)) - cores = append(cores, fileCore) - } - - // 控制台/文件 配置分离 - core := zapcore.NewTee(cores...) - - logger := zap.New(core, zap.AddCaller()) //zap.AddCaller() 输出日志打印文件和行数如: logger/logger_test.go:33 - SLog = logger.Sugar() - Log = logger - return nil -} - -// getLogLevel 获取日志打印级别 -func getLogLevel(level string) zapcore.Level { - l, ok := logLevel[level] // 日志打印级别 - if !ok { - l = logLevel["info"] - } - return l -} - -// getLogWriter 获取日志输出方式 日志文件 控制台 -func getLogWriter(conf *FileConfig) (zapcore.WriteSyncer, error) { - // 判断日志路径是否存在,如果不存在就创建 - if conf.Path == "" { - conf.Path = DefaultLogPath - } - if exist := isExist(conf.Path); !exist { - if err := os.MkdirAll(conf.Path, os.ModePerm); err != nil { - conf.Path = DefaultLogPath - if err := os.MkdirAll(conf.Path, os.ModePerm); err != nil { - return nil, err - } - } - } - - // 日志文件 与 日志切割 配置 - lumberJackLogger := &lumberjack.Logger{ - Filename: filepath.Join(conf.Path, conf.FileName), // 日志文件路径 - MaxSize: conf.FileMaxSize, // 单个日志文件最大多少 mb - MaxBackups: conf.FileMaxBackups, // 日志备份数量 - MaxAge: conf.MaxAge, // 日志最长保留时间 - Compress: conf.Compress, // 是否压缩日志 - } - return zapcore.AddSync(lumberJackLogger), nil -} - -// getEncoder 编码器(如何写入日志) -func getEncoder(conf interface{}) zapcore.Encoder { - encoderConfig := zap.NewProductionEncoderConfig() - encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000Z07") // log 时间格式 例如: 2021-09-11t20:05:54.852+0800 - encoderConfig.EncodeCaller = zapcore.FullCallerEncoder - - var format string - switch conf.(type) { - case FileConfig: - format = conf.(FileConfig).Format - encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder - case ConsoleConfig: - format = conf.(ConsoleConfig).Format - // 输出level序列化为全大写字符串,如 INFO DEBUG ERROR 彩色 - encoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder - } - - if format == "json" { - return zapcore.NewJSONEncoder(encoderConfig) // 以json格式写入 - } - return zapcore.NewConsoleEncoder(encoderConfig) // 以默认console格式写入 -} - -// isExist 判断文件或者目录是否存在 -func isExist(path string) bool { - _, err := os.Stat(path) - return err == nil || os.IsExist(err) -} diff --git a/ws/connection.go b/ws/connection.go index ec9b345..06596bb 100644 --- a/ws/connection.go +++ b/ws/connection.go @@ -400,7 +400,7 @@ func (c *NWebsocket) readInternalLoop() { for { select { default: - _ = c.webSocket.wsConn.SetReadDeadline(time.Now().Add(c.Config.ReadDeadline)) + //_ = c.webSocket.wsConn.SetReadDeadline(time.Now().Add(c.Config.ReadDeadline)) msgType, data, err := c.webSocket.wsConn.ReadMessage() if err != nil { if c.onDisconnected != nil {