You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
74 lines
2.1 KiB
Go
74 lines
2.1 KiB
Go
2 years ago
|
package mq
|
||
3 years ago
|
|
||
|
import (
|
||
|
"dcg/game/manager"
|
||
|
"dcg/game/pb"
|
||
|
pbCommon "dcg/game/pb/common"
|
||
|
pbMq "dcg/game/pb/mq"
|
||
|
"dcg/game/svc"
|
||
|
kfk "dcg/pkg/kafka"
|
||
|
"git.noahlan.cn/northlan/ntools-go/kafka"
|
||
|
"git.noahlan.cn/northlan/ntools-go/logger"
|
||
|
"github.com/Shopify/sarama"
|
||
|
"github.com/golang/protobuf/proto"
|
||
|
)
|
||
|
|
||
|
type CoinTransferHandler struct {
|
||
|
svcCtx *svc.ServiceContext
|
||
|
msgHandle map[string]kafkaMsgHandlerFunc
|
||
|
ConsumerGroup *kafka.ConsumerGroup
|
||
|
}
|
||
|
|
||
|
func (h *CoinTransferHandler) Init(svcCtx *svc.ServiceContext) {
|
||
|
h.svcCtx = svcCtx
|
||
2 years ago
|
cfg := svcCtx.Config.Kafka.UserCoin
|
||
3 years ago
|
h.msgHandle = make(map[string]kafkaMsgHandlerFunc)
|
||
|
h.msgHandle[cfg.Topic] = h.handleTopic
|
||
|
|
||
|
var err error
|
||
|
h.ConsumerGroup, err = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
|
||
|
KafkaVersion: sarama.V3_1_0_0,
|
||
|
OffsetsInitial: sarama.OffsetNewest,
|
||
|
IsReturnErr: false,
|
||
|
UnMarshaler: kfk.ProtobufMarshaler,
|
||
|
}, cfg.Addr, []string{cfg.Topic}, cfg.ConsumerGroup)
|
||
|
|
||
|
if err != nil {
|
||
|
logger.SLog.Error(err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (h *CoinTransferHandler) handleTopic(data []byte, _ string) {
|
||
|
// msg proto
|
||
|
var msgFromMq pbMq.MqUserCoinChanged
|
||
|
err := proto.Unmarshal(data, &msgFromMq)
|
||
|
if err != nil {
|
||
|
logger.SLog.Error("unmarshal msg err", err)
|
||
|
return
|
||
|
}
|
||
|
manager.GameManager.Broadcast(pb.PushCoinChanged, &pbCommon.UserCoinChangedMsg{
|
||
|
User: &pbCommon.PbUser{
|
||
|
UserId: msgFromMq.UserId,
|
||
|
Username: msgFromMq.Username,
|
||
|
Avatar: msgFromMq.Avatar,
|
||
2 years ago
|
Platform: "bilibili", // TODO 平台
|
||
3 years ago
|
},
|
||
|
Reason: msgFromMq.Reason,
|
||
|
Change: msgFromMq.Change,
|
||
|
Current: msgFromMq.Current,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Setup does nothing and always returns nil; the session argument is ignored.
func (CoinTransferHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}
|
||
|
// Cleanup does nothing and always returns nil; the session argument is ignored.
func (CoinTransferHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}
|
||
|
func (h *CoinTransferHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
||
|
for msg := range claim.Messages() {
|
||
|
//logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))
|
||
|
if hFunc, ok := h.msgHandle[msg.Topic]; ok {
|
||
|
hFunc(msg.Value, string(msg.Key))
|
||
|
}
|
||
|
sess.MarkMessage(msg, "")
|
||
|
}
|
||
|
return nil
|
||
|
}
|