You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
dcg/game/msg-transfer/danmaku_consumer_handler.go

52 lines
1.5 KiB
Go

package msg_transfer
import (
"dcg/game/pb"
"dcg/pkg/kafka"
"dcg/pkg/logger"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
)
// fcb is the signature of a message callback: it receives the raw message
// payload and the message key as a string.
type fcb func(data []byte, msgKey string)
// DanmakuConsumerHandler consumes Kafka messages and dispatches each one to a
// callback selected by the message's topic.
type DanmakuConsumerHandler struct {
// msgHandle maps a topic name to the callback invoked for messages on it.
msgHandle map[string]fcb
// cg is the consumer group created by Init.
cg *kafka.ConsumerGroup
}
// Init prepares the handler for use: it builds the topic-to-callback dispatch
// table (currently only "danmaku") and creates the Kafka consumer group.
//
// Init has no error return, so a failure from kafka.NewConsumerGroup is logged
// instead of being silently discarded. h.cg is still assigned whatever
// NewConsumerGroup returned, and must not be relied on after a logged failure.
func (h *DanmakuConsumerHandler) Init() {
	h.msgHandle = map[string]fcb{
		"danmaku": h.handleDanmaku,
	}

	// NOTE(review): the two string slices and the trailing empty string are
	// passed positionally; presumably topics, broker addresses and group ID —
	// confirm against kafka.NewConsumerGroup's signature.
	cg, err := kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
		KafkaVersion:   sarama.V3_1_0_0,
		OffsetsInitial: sarama.OffsetNewest,
		IsReturnErr:    false,
	}, []string{"danmaku"}, []string{"danmaku"}, "")
	if err != nil {
		logger.SLog.Error("new danmaku consumer group err", err)
	}
	h.cg = cg
}
// handleDanmaku is the callback registered for the "danmaku" topic. It logs
// the incoming payload and key at debug level, then decodes the payload as a
// pb.Danmaku protobuf message. A decode failure is logged and the message is
// dropped; nothing further is done with a successfully decoded message yet.
func (h *DanmakuConsumerHandler) handleDanmaku(msg []byte, msgKey string) {
	logger.SLog.Debug("msg come", string(msg), msgKey)

	// Decode the raw bytes into the danmaku protobuf message.
	danmaku := new(pb.Danmaku)
	if err := proto.Unmarshal(msg, danmaku); err != nil {
		logger.SLog.Error("unmarshal msg err", err)
		return
	}
}
// Setup is part of the sarama.ConsumerGroupHandler method set. This handler
// needs no per-session preparation, so it does nothing and returns nil.
func (DanmakuConsumerHandler) Setup(sarama.ConsumerGroupSession) error {
	return nil
}
// Cleanup is part of the sarama.ConsumerGroupHandler method set. This handler
// holds no per-session state to release, so it does nothing and returns nil.
func (DanmakuConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim processes every message delivered on claim until its message
// channel is closed. Each message is logged, dispatched to the callback
// registered for its topic, and then marked on the session.
//
// A message whose topic has no registered callback is logged as an error and
// still marked, rather than invoking a nil function value (which would panic
// the consumer goroutine). ConsumeClaim always returns nil.
func (h *DanmakuConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		logger.SLog.Infow("kafka get info to mysql", "msgTopic", msg.Topic, "msgPartition", msg.Partition, "msg", string(msg.Value))

		// Indexing a map (even a nil one) with a missing key yields a nil
		// func; calling it would panic, so guard before dispatching.
		if handle := h.msgHandle[msg.Topic]; handle != nil {
			handle(msg.Value, string(msg.Key))
		} else {
			logger.SLog.Error("no handler registered for topic", msg.Topic)
		}

		sess.MarkMessage(msg, "")
	}
	return nil
}