diff --git a/config.yml b/config.yml index a5beb07..7373c06 100644 --- a/config.yml +++ b/config.yml @@ -1,8 +1,8 @@ Server: Debug: false - Listen: 0.0.0.0:8889 + Listen: 0.0.0.0:8888 Command: - Regex: "([jJ])|([sS])|([wW])|([cC]\\d)|([mM]\\d)" + Regex: "([jJ])|([sS])|([wW])|([cC]\\d)|([mM]\\d)|([bB]\\d)" Log: Console: Level: debug diff --git a/config/Config.go b/config/Config.go index 0ee734d..7fe093a 100644 --- a/config/Config.go +++ b/config/Config.go @@ -1,8 +1,8 @@ package config import ( - "dcg/pkg/logger" "fmt" + "git.noahlan.cn/northlan/ntools-go/logger" c "github.com/gookit/config/v2" "github.com/gookit/config/v2/yaml" ) diff --git a/game/command/manager.go b/game/command/manager.go index b77cf34..488f585 100644 --- a/game/command/manager.go +++ b/game/command/manager.go @@ -1,11 +1,11 @@ package command import ( - "dcg/game/pb" + pushPb "dcg/game/pb/push" "strings" ) -type HandlerFunc func(cmd string, user *pb.User) +type HandlerFunc func(roomId int64, cmd string, user *pushPb.User) type Manager struct { handlers map[string]HandlerFunc @@ -24,13 +24,13 @@ func (m *Manager) Register(cmd string, h HandlerFunc) { m.handlers[cmd] = h } -func (m *Manager) Handle(cmd string, user *pb.User) { +func (m *Manager) Handle(roomId int64, cmd string, user *pushPb.User) { if len(cmd) < 0 { return } fChar := cmd[0] c := strings.ToLower(string(fChar)) if h, ok := m.handlers[c]; ok { - h(cmd, user) + h(roomId, cmd, user) } } diff --git a/game/msg_transfer/danmaku/msg_to_db.go b/game/msg_transfer/danmaku/msg_to_db.go index 964e008..258efb6 100644 --- a/game/msg_transfer/danmaku/msg_to_db.go +++ b/game/msg_transfer/danmaku/msg_to_db.go @@ -3,11 +3,12 @@ package danmaku import ( "dcg/config" "dcg/game/command" - "dcg/game/pb" pbMq "dcg/game/pb/mq" + pbPush "dcg/game/pb/push" "dcg/game/svc" - "dcg/pkg/kafka" - "dcg/pkg/logger" + kfk "dcg/pkg/kafka" + "git.noahlan.cn/northlan/ntools-go/kafka" + "git.noahlan.cn/northlan/ntools-go/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "regexp" @@ -45,6 +46,7 @@ func (h *MsgToDbHandler) Init(ctx *svc.ServiceContext) { KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + UnMarshaler: kfk.ProtobufMarshaler, }, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.MsgToDb) if err != nil { @@ -52,7 +54,7 @@ func (h *MsgToDbHandler) Init(ctx *svc.ServiceContext) { } } -func (h *MsgToDbHandler) handleAllCmd(_ string, user *pb.User) { +func (h *MsgToDbHandler) handleAllCmd(_ int64, _ string, user *pbPush.User) { } @@ -66,7 +68,7 @@ func (h *MsgToDbHandler) handleDanmaku(data []byte, msgKey string) { } cmdArr := h.parseCommands(msgFromMq.Content) for _, cmd := range cmdArr { - h.commandManager.Handle(cmd, &pb.User{ + h.commandManager.Handle(msgFromMq.LiveRoomId, cmd, &pbPush.User{ UId: msgFromMq.Uid, Uname: msgFromMq.Uname, }) diff --git a/game/msg_transfer/danmaku/msg_to_push.go b/game/msg_transfer/danmaku/msg_to_push.go index 4538a55..c3e5275 100644 --- a/game/msg_transfer/danmaku/msg_to_push.go +++ b/game/msg_transfer/danmaku/msg_to_push.go @@ -3,11 +3,12 @@ package danmaku import ( "dcg/config" "dcg/game/command" - "dcg/game/pb" pbMq "dcg/game/pb/mq" + pushPb "dcg/game/pb/push" "dcg/game/svc" - "dcg/pkg/kafka" - "dcg/pkg/logger" + kfk "dcg/pkg/kafka" + "git.noahlan.cn/northlan/ntools-go/kafka" + "git.noahlan.cn/northlan/ntools-go/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" "regexp" @@ -34,6 +35,7 @@ func (h *MsgToPushHandler) Init(ctx *svc.ServiceContext) { h.commandManager.Register("s", h.handleOutbreak) h.commandManager.Register("m", h.handleMove) h.commandManager.Register("w", h.handleWai) + h.commandManager.Register("b", h.handleMode) var err error h.regex, err = regexp.Compile(config.Config.Command.Regex) @@ -45,6 +47,7 @@ func (h *MsgToPushHandler) Init(ctx *svc.ServiceContext) { KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + UnMarshaler: kfk.ProtobufMarshaler, }, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.MsgToPush) if err != nil { @@ -52,38 +55,49 @@ func (h *MsgToPushHandler) Init(ctx *svc.ServiceContext) { } } -func (h *MsgToPushHandler) handleJoinGame(_ string, user *pb.User) { - h.ctx.RoomManager.Broadcast("game.join", &pb.JoinGame{User: user}) +func (h *MsgToPushHandler) handleJoinGame(roomId int64, _ string, user *pushPb.User) { + h.ctx.RoomManager.PushToRoom(roomId, "game.join", &pushPb.JoinGame{User: user}) } -func (h *MsgToPushHandler) handleOutbreak(_ string, user *pb.User) { - h.ctx.RoomManager.Broadcast("game.outbreak", &pb.Outbreak{User: user}) +func (h *MsgToPushHandler) handleOutbreak(roomId int64, _ string, user *pushPb.User) { + h.ctx.RoomManager.PushToRoom(roomId, "game.outbreak", &pushPb.Outbreak{User: user}) } -func (h *MsgToPushHandler) handleCreateUnit(cmd string, user *pb.User) { +func (h *MsgToPushHandler) handleCreateUnit(roomId int64, cmd string, user *pushPb.User) { if len(cmd) < 2 { return } unit := cmd[1] - h.ctx.RoomManager.Broadcast("game.createUnit", &pb.CreateUnit{ + h.ctx.RoomManager.PushToRoom(roomId, "game.createUnit", &pushPb.CreateUnit{ User: user, Unit: string(unit), }) } -func (h *MsgToPushHandler) handleMove(cmd string, user *pb.User) { +func (h *MsgToPushHandler) handleMove(roomId int64, cmd string, user *pushPb.User) { if len(cmd) < 2 { return } line := cmd[1] - h.ctx.RoomManager.Broadcast("game.move", &pb.Move{ + h.ctx.RoomManager.PushToRoom(roomId, "game.move", &pushPb.Move{ User: user, Line: string(line), }) } -func (h *MsgToPushHandler) handleWai(cmd string, user *pb.User) { - h.ctx.RoomManager.Broadcast("game.wai", &pb.Wai{User: user}) +func (h *MsgToPushHandler) handleWai(roomId int64, _ string, user *pushPb.User) { + h.ctx.RoomManager.PushToRoom(roomId, "game.wai", &pushPb.Wai{User: user}) +} + +func (h *MsgToPushHandler) handleMode(roomId int64, cmd string, user *pushPb.User) { + if len(cmd) < 2 { + return + } + line := cmd[1] + h.ctx.RoomManager.PushToRoom(roomId, "game.mode", &pushPb.BuildingMode{ + User: user, + Mode: string(line), + }) } func (h *MsgToPushHandler) handleDanmaku(data []byte, msgKey string) { @@ -94,11 +108,19 @@ func (h *MsgToPushHandler) handleDanmaku(data []byte, msgKey string) { logger.SLog.Error("unmarshal msg err", err) return } + pbUser := &pushPb.User{ + UId: msgFromMq.Uid, + Uname: msgFromMq.Uname, + } cmdArr := h.parseCommands(msgFromMq.Content) for _, cmd := range cmdArr { - h.commandManager.Handle(cmd, &pb.User{ - UId: msgFromMq.Uid, - Uname: msgFromMq.Uname, + h.commandManager.Handle(msgFromMq.LiveRoomId, cmd, pbUser) + } + // 发送正常非命令弹幕消息 + if len(cmdArr) <= 0 { + h.ctx.RoomManager.PushToRoom(msgFromMq.LiveRoomId, "live.danmaku", &pushPb.DanmakuMsg{ + User: pbUser, + Content: msgFromMq.Content, }) } } diff --git a/game/msg_transfer/gift/msg_to_push.go b/game/msg_transfer/gift/msg_to_push.go index 749d094..7c99a73 100644 --- a/game/msg_transfer/gift/msg_to_push.go +++ b/game/msg_transfer/gift/msg_to_push.go @@ -2,11 +2,12 @@ package gift import ( "dcg/config" - "dcg/game/pb" pbMq "dcg/game/pb/mq" + pushPb "dcg/game/pb/push" "dcg/game/svc" - "dcg/pkg/kafka" - "dcg/pkg/logger" + kfk "dcg/pkg/kafka" + "git.noahlan.cn/northlan/ntools-go/kafka" + "git.noahlan.cn/northlan/ntools-go/logger" "github.com/Shopify/sarama" "github.com/golang/protobuf/proto" ) @@ -29,6 +30,7 @@ func (h *MsgToPushHandler) Init(ctx *svc.ServiceContext) { KafkaVersion: sarama.V3_1_0_0, OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false, + UnMarshaler: kfk.ProtobufMarshaler, }, cfg.Addr, []string{cfg.Topic}, config.Config.ConsumerGroupId.GiftToPush) if err != nil { @@ -44,8 +46,8 @@ func (h *MsgToPushHandler) handleGift(data []byte, msgKey string) { logger.SLog.Error("unmarshal msg err", err) return } - h.ctx.RoomManager.Broadcast("game.gift", &pb.Gift{ - User: &pb.User{ + h.ctx.RoomManager.Broadcast("game.gift", &pushPb.Gift{ + User: &pushPb.User{ UId: msgFromMq.Uid, Uname: msgFromMq.Uname, }, diff --git a/game/pb/mq/mq.pb.go b/game/pb/mq/mq.pb.go index 20332ed..42465c4 100644 --- a/game/pb/mq/mq.pb.go +++ b/game/pb/mq/mq.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 +// protoc-gen-go v1.27.1 // protoc v3.19.4 // source: mq.proto @@ -25,11 +25,12 @@ type MqDanmaku struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` - Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` - Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` - Content string `protobuf:"bytes,4,opt,name=content,proto3" json:"content,omitempty"` - SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"` + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + LiveRoomId int64 `protobuf:"varint,2,opt,name=liveRoomId,proto3" json:"liveRoomId,omitempty"` + Uid int64 `protobuf:"varint,3,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,4,opt,name=uname,proto3" json:"uname,omitempty"` + Content string `protobuf:"bytes,5,opt,name=content,proto3" json:"content,omitempty"` + SendTime int64 `protobuf:"varint,6,opt,name=sendTime,proto3" json:"sendTime,omitempty"` } func (x *MqDanmaku) Reset() { @@ -71,6 +72,13 @@ func (x *MqDanmaku) GetPlatform() string { return "" } +func (x *MqDanmaku) GetLiveRoomId() int64 { + if x != nil { + return x.LiveRoomId + } + return 0 +} + func (x *MqDanmaku) GetUid() int64 { if x != nil { return x.Uid @@ -104,13 +112,14 @@ type MqGift struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` - Uid int64 `protobuf:"varint,2,opt,name=uid,proto3" json:"uid,omitempty"` - Uname string `protobuf:"bytes,3,opt,name=uname,proto3" json:"uname,omitempty"` - GiftId int32 `protobuf:"varint,4,opt,name=giftId,proto3" json:"giftId,omitempty"` - GiftName string `protobuf:"bytes,5,opt,name=giftName,proto3" json:"giftName,omitempty"` - TotalCoin int64 `protobuf:"varint,6,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` - SendTime int64 `protobuf:"varint,7,opt,name=sendTime,proto3" json:"sendTime,omitempty"` + Platform string `protobuf:"bytes,1,opt,name=platform,proto3" json:"platform,omitempty"` + LiveRoomId int64 `protobuf:"varint,2,opt,name=liveRoomId,proto3" json:"liveRoomId,omitempty"` + Uid int64 `protobuf:"varint,3,opt,name=uid,proto3" json:"uid,omitempty"` + Uname string `protobuf:"bytes,4,opt,name=uname,proto3" json:"uname,omitempty"` + GiftId int32 `protobuf:"varint,5,opt,name=giftId,proto3" json:"giftId,omitempty"` + GiftName string `protobuf:"bytes,6,opt,name=giftName,proto3" json:"giftName,omitempty"` + TotalCoin int64 `protobuf:"varint,7,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` + SendTime int64 `protobuf:"varint,8,opt,name=sendTime,proto3" json:"sendTime,omitempty"` } func (x *MqGift) Reset() { @@ -152,6 +161,13 @@ func (x *MqGift) GetPlatform() string { return "" } +func (x *MqGift) GetLiveRoomId() int64 { + if x != nil { + return x.LiveRoomId + } + return 0 +} + func (x *MqGift) GetUid() int64 { if x != nil { return x.Uid @@ -197,27 +213,31 @@ func (x *MqGift) GetSendTime() int64 { var File_mq_proto protoreflect.FileDescriptor var file_mq_proto_rawDesc = []byte{ - 0x0a, 0x08, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x85, + 0x0a, 0x08, 0x6d, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0xa5, 0x01, 0x0a, 0x09, 0x4d, 0x71, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e, - 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, - 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6c, 0x69, 0x76, 0x65, + 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x6c, 0x69, + 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x75, 0x6e, + 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, - 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, - 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xba, 0x01, 0x0a, 0x06, 0x4d, 0x71, 0x47, 0x69, 0x66, + 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, + 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xda, 0x01, 0x0a, 0x06, 0x4d, 0x71, 0x47, 0x69, 0x66, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x10, 0x0a, - 0x03, 0x75, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, - 0x14, 0x0a, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x6c, 0x61, 0x74, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x1e, 0x0a, + 0x0a, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0a, 0x6c, 0x69, 0x76, 0x65, 0x52, 0x6f, 0x6f, 0x6d, 0x49, 0x64, 0x12, 0x10, 0x0a, + 0x03, 0x75, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x03, 0x75, 0x69, 0x64, 0x12, + 0x14, 0x0a, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, - 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, + 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x6f, 0x74, - 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, + 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, - 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, + 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x07, 0x5a, 0x05, 0x2f, 0x70, 0x62, 0x4d, 0x71, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } diff --git a/game/pb/mq/mq.proto b/game/pb/mq/mq.proto index 769c0b6..fb888d1 100644 --- a/game/pb/mq/mq.proto +++ b/game/pb/mq/mq.proto @@ -6,18 +6,20 @@ option go_package = "/pbMq"; message MqDanmaku { string platform = 1; - int64 uid = 2; - string uname = 3; - string content = 4; - int64 sendTime = 5; + int64 liveRoomId = 2; + int64 uid = 3; + string uname = 4; + string content = 5; + int64 sendTime = 6; } message MqGift { string platform = 1; - int64 uid = 2; - string uname = 3; - int32 giftId = 4; - string giftName = 5; - int64 totalCoin = 6; - int64 sendTime = 7; + int64 liveRoomId = 2; + int64 uid = 3; + string uname = 4; + int32 giftId = 5; + string giftName = 6; + int64 totalCoin = 7; + int64 sendTime = 8; } \ No newline at end of file diff --git a/game/pb/broadcast.pb.go b/game/pb/push/broadcast.pb.go similarity index 67% rename from game/pb/broadcast.pb.go rename to game/pb/push/broadcast.pb.go index 4f71c5d..b55de0b 100644 --- a/game/pb/broadcast.pb.go +++ b/game/pb/push/broadcast.pb.go @@ -1,10 +1,10 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 +// protoc-gen-go v1.27.1 // protoc v3.19.4 // source: broadcast.proto -package pb +package pbPush import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" @@ -284,9 +284,11 @@ type Gift struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - User *User `protobuf:"bytes,1,opt,name=user,proto3" json:"user,omitempty"` - GiftId int32 `protobuf:"varint,2,opt,name=giftId,proto3" json:"giftId,omitempty"` - TotalCoin int64 `protobuf:"varint,3,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` + User *User `protobuf:"bytes,1,opt,name=user,proto3" json:"user,omitempty"` + GiftId int32 `protobuf:"varint,2,opt,name=giftId,proto3" json:"giftId,omitempty"` + GiftName string `protobuf:"bytes,3,opt,name=giftName,proto3" json:"giftName,omitempty"` + TotalCoin int64 `protobuf:"varint,4,opt,name=totalCoin,proto3" json:"totalCoin,omitempty"` + SendTime int64 `protobuf:"varint,5,opt,name=sendTime,proto3" json:"sendTime,omitempty"` } func (x *Gift) Reset() { @@ -335,6 +337,13 @@ func (x *Gift) GetGiftId() int32 { return 0 } +func (x *Gift) GetGiftName() string { + if x != nil { + return x.GiftName + } + return "" +} + func (x *Gift) GetTotalCoin() int64 { if x != nil { return x.TotalCoin @@ -342,6 +351,13 @@ func (x *Gift) GetTotalCoin() int64 { return 0 } +func (x *Gift) GetSendTime() int64 { + if x != nil { + return x.SendTime + } + return 0 +} + type Wai struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -389,6 +405,116 @@ func (x *Wai) GetUser() *User { return nil } +type BuildingMode struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + User *User `protobuf:"bytes,1,opt,name=user,proto3" json:"user,omitempty"` + Mode string `protobuf:"bytes,2,opt,name=mode,proto3" json:"mode,omitempty"` +} + +func (x *BuildingMode) Reset() { + *x = BuildingMode{} + if protoimpl.UnsafeEnabled { + mi := &file_broadcast_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BuildingMode) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BuildingMode) ProtoMessage() {} + +func (x *BuildingMode) ProtoReflect() protoreflect.Message { + mi := &file_broadcast_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BuildingMode.ProtoReflect.Descriptor instead. +func (*BuildingMode) Descriptor() ([]byte, []int) { + return file_broadcast_proto_rawDescGZIP(), []int{7} +} + +func (x *BuildingMode) GetUser() *User { + if x != nil { + return x.User + } + return nil +} + +func (x *BuildingMode) GetMode() string { + if x != nil { + return x.Mode + } + return "" +} + +type DanmakuMsg struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + User *User `protobuf:"bytes,1,opt,name=user,proto3" json:"user,omitempty"` + Content string `protobuf:"bytes,2,opt,name=content,proto3" json:"content,omitempty"` +} + +func (x *DanmakuMsg) Reset() { + *x = DanmakuMsg{} + if protoimpl.UnsafeEnabled { + mi := &file_broadcast_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DanmakuMsg) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DanmakuMsg) ProtoMessage() {} + +func (x *DanmakuMsg) ProtoReflect() protoreflect.Message { + mi := &file_broadcast_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DanmakuMsg.ProtoReflect.Descriptor instead. +func (*DanmakuMsg) Descriptor() ([]byte, []int) { + return file_broadcast_proto_rawDescGZIP(), []int{8} +} + +func (x *DanmakuMsg) GetUser() *User { + if x != nil { + return x.User + } + return nil +} + +func (x *DanmakuMsg) GetContent() string { + if x != nil { + return x.Content + } + return "" +} + var File_broadcast_proto protoreflect.FileDescriptor var file_broadcast_proto_rawDesc = []byte{ @@ -409,16 +535,28 @@ var file_broadcast_proto_rawDesc = []byte{ 0x01, 0x28, 0x09, 0x52, 0x04, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0x28, 0x0a, 0x08, 0x4f, 0x75, 0x74, 0x62, 0x72, 0x65, 0x61, 0x6b, 0x12, 0x1c, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x70, 0x62, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, - 0x73, 0x65, 0x72, 0x22, 0x5a, 0x0a, 0x04, 0x47, 0x69, 0x66, 0x74, 0x12, 0x1c, 0x0a, 0x04, 0x75, - 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x70, 0x62, 0x2e, 0x55, - 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x67, 0x69, 0x66, - 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, 0x49, - 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x22, - 0x23, 0x0a, 0x03, 0x57, 0x61, 0x69, 0x12, 0x1c, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x70, 0x62, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, - 0x75, 0x73, 0x65, 0x72, 0x42, 0x05, 0x5a, 0x03, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x73, 0x65, 0x72, 0x22, 0x92, 0x01, 0x0a, 0x04, 0x47, 0x69, 0x66, 0x74, 0x12, 0x1c, 0x0a, 0x04, + 0x75, 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x70, 0x62, 0x2e, + 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x67, 0x69, + 0x66, 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x67, 0x69, 0x66, 0x74, + 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x67, 0x69, 0x66, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1c, + 0x0a, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x09, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x43, 0x6f, 0x69, 0x6e, 0x12, 0x1a, 0x0a, 0x08, + 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, + 0x73, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x23, 0x0a, 0x03, 0x57, 0x61, 0x69, 0x12, + 0x1c, 0x0a, 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, + 0x70, 0x62, 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, 0x22, 0x40, 0x0a, + 0x0c, 0x42, 0x75, 0x69, 0x6c, 0x64, 0x69, 0x6e, 0x67, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1c, 0x0a, + 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x70, 0x62, + 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x6d, + 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x22, + 0x44, 0x0a, 0x0a, 0x44, 0x61, 0x6e, 0x6d, 0x61, 0x6b, 0x75, 0x4d, 0x73, 0x67, 0x12, 0x1c, 0x0a, + 0x04, 0x75, 0x73, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x08, 0x2e, 0x70, 0x62, + 0x2e, 0x55, 0x73, 0x65, 0x72, 0x52, 0x04, 0x75, 0x73, 0x65, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x63, + 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x42, 0x09, 0x5a, 0x07, 0x2f, 0x70, 0x62, 0x50, 0x75, 0x73, 0x68, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -433,15 +571,17 @@ func file_broadcast_proto_rawDescGZIP() []byte { return file_broadcast_proto_rawDescData } -var file_broadcast_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_broadcast_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_broadcast_proto_goTypes = []interface{}{ - (*User)(nil), // 0: pb.User - (*JoinGame)(nil), // 1: pb.JoinGame - (*CreateUnit)(nil), // 2: pb.CreateUnit - (*Move)(nil), // 3: pb.Move - (*Outbreak)(nil), // 4: pb.Outbreak - (*Gift)(nil), // 5: pb.Gift - (*Wai)(nil), // 6: pb.Wai + (*User)(nil), // 0: pb.User + (*JoinGame)(nil), // 1: pb.JoinGame + (*CreateUnit)(nil), // 2: pb.CreateUnit + (*Move)(nil), // 3: pb.Move + (*Outbreak)(nil), // 4: pb.Outbreak + (*Gift)(nil), // 5: pb.Gift + (*Wai)(nil), // 6: pb.Wai + (*BuildingMode)(nil), // 7: pb.BuildingMode + (*DanmakuMsg)(nil), // 8: pb.DanmakuMsg } var file_broadcast_proto_depIdxs = []int32{ 0, // 0: pb.JoinGame.user:type_name -> pb.User @@ -450,11 +590,13 @@ var file_broadcast_proto_depIdxs = []int32{ 0, // 3: pb.Outbreak.user:type_name -> pb.User 0, // 4: pb.Gift.user:type_name -> pb.User 0, // 5: pb.Wai.user:type_name -> pb.User - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 0, // 6: pb.BuildingMode.user:type_name -> pb.User + 0, // 7: pb.DanmakuMsg.user:type_name -> pb.User + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_broadcast_proto_init() } @@ -547,6 +689,30 @@ func file_broadcast_proto_init() { return nil } } + file_broadcast_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BuildingMode); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_broadcast_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DanmakuMsg); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -554,7 +720,7 @@ func file_broadcast_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_broadcast_proto_rawDesc, NumEnums: 0, - NumMessages: 7, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/game/pb/broadcast.proto b/game/pb/push/broadcast.proto similarity index 66% rename from game/pb/broadcast.proto rename to game/pb/push/broadcast.proto index 21b158e..b4f2afa 100644 --- a/game/pb/broadcast.proto +++ b/game/pb/push/broadcast.proto @@ -2,7 +2,7 @@ syntax = "proto3"; package pb; -option go_package = "/pb"; +option go_package = "/pbPush"; message User{ int64 uId = 1;//用户id @@ -30,9 +30,21 @@ message Outbreak{ message Gift{ User user = 1; int32 giftId = 2; - int64 totalCoin = 3; + string giftName = 3; + int64 totalCoin = 4; + int64 sendTime = 5; } message Wai{ User user = 1; +} + +message BuildingMode{ + User user = 1; + string mode = 2; +} + +message DanmakuMsg { + User user = 1; + string content = 2; } \ No newline at end of file diff --git a/game/pb/push/gen.bat b/game/pb/push/gen.bat new file mode 100644 index 0000000..f379068 --- /dev/null +++ b/game/pb/push/gen.bat @@ -0,0 +1,2 @@ +protoc --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative --go-grpc_opt=require_unimplemented_servers=false --go_out=. --go-grpc_out=. --proto_path=. *.proto +protoc --csharp_out=. --proto_path=. *.proto \ No newline at end of file diff --git a/game/room/manager.go b/game/room/manager.go index e0e70d6..aa17f51 100644 --- a/game/room/manager.go +++ b/game/room/manager.go @@ -2,12 +2,12 @@ package room import ( "dcg/game/pb" - "dcg/pkg/logger" "fmt" "git.noahlan.cn/northlan/ngs" "git.noahlan.cn/northlan/ngs/component" "git.noahlan.cn/northlan/ngs/scheduler" "git.noahlan.cn/northlan/ngs/session" + "git.noahlan.cn/northlan/ntools-go/logger" "github.com/golang/protobuf/proto" "time" ) @@ -103,3 +103,12 @@ func (m *Manager) Broadcast(route string, msg proto.Message) { } } } + +func (m *Manager) PushToRoom(roomId int64, route string, msg proto.Message) { + if room, ok := m.rooms[roomId]; ok { + err := room.group.Broadcast(route, msg) + if err != nil { + logger.SLog.Errorf("broadcast message to room %d err:%+v", room.id, err) + } + } +} diff --git a/go.mod b/go.mod index 28f0466..80a4da5 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( ) require ( + git.noahlan.cn/northlan/ntools-go/kafka v1.0.1 // indirect + git.noahlan.cn/northlan/ntools-go/logger v1.0.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.2.0 // indirect @@ -40,7 +42,7 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect - golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 // indirect + golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 // indirect golang.org/x/sys v0.0.0-20220412211240-33da011f77ad // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/genproto v0.0.0-20220414192740-2d67ff6cf2b4 // indirect diff --git a/go.sum b/go.sum index 089d175..6bf95e7 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,14 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= git.noahlan.cn/northlan/ngs v0.1.2 h1:0+cZIAff14VgGBqkCw5Hur9gVD6HzxTmFIvuoWvFphQ= git.noahlan.cn/northlan/ngs v0.1.2/go.mod h1:dWoj94sHXJPFE1BbCvF8hOLtMRUe0V6v7RGpGs4+iAs= +git.noahlan.cn/northlan/ntools-go/kafka v1.0.0 h1:5wTBhunxJRkp7mEBkB8dv61FSRqS9lcQ7wlRGjDijuA= +git.noahlan.cn/northlan/ntools-go/kafka v1.0.0/go.mod h1:RxX9JSUIr3Gbk+cvUwE5k+i08AgIK3TA9ayDJCMn2n8= +git.noahlan.cn/northlan/ntools-go/kafka v1.0.1 h1:SDUwYRzksZ3Vcu7PTZxk+TEMF2f3gBiQEboKOhi1yfI= +git.noahlan.cn/northlan/ntools-go/kafka v1.0.1/go.mod h1:RxX9JSUIr3Gbk+cvUwE5k+i08AgIK3TA9ayDJCMn2n8= +git.noahlan.cn/northlan/ntools-go/logger v1.0.0 h1:u4aqq7v/ZBtvS7LuZXD+V4Co/D5S5ADZbnyx0CRl6lA= +git.noahlan.cn/northlan/ntools-go/logger v1.0.0/go.mod h1:QQwgylABV9P8MFGvXKlujJO5NV0MP0JUPzqQt3I0Y+w= +git.noahlan.cn/northlan/ntools-go/logger v1.0.1 h1:+08dMbsKGECM1B7H8GqwtRzGqOl5yrNNbJYo9tFoMf0= +git.noahlan.cn/northlan/ntools-go/logger v1.0.1/go.mod h1:QQwgylABV9P8MFGvXKlujJO5NV0MP0JUPzqQt3I0Y+w= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I= @@ -227,6 +235,8 @@ golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2 h1:6mzvA99KwZxbOrxww4EvWVQUnN1+xEu9tafK5ZxkYeA= golang.org/x/net v0.0.0-20220418201149-a630d4f3e7a2/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220421235706-1d1ef9303861 h1:yssD99+7tqHWO5Gwh81phT+67hg+KttniBr6UnEXOY8= +golang.org/x/net v0.0.0-20220421235706-1d1ef9303861/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/main.go b/main.go index 83a7d0e..7dc897f 100644 --- a/main.go +++ b/main.go @@ -4,10 +4,10 @@ import ( "dcg/config" "dcg/game/msg_transfer" "dcg/game/svc" - "dcg/pkg/logger" "flag" "git.noahlan.cn/northlan/ngs" "git.noahlan.cn/northlan/ngs/serialize/protobuf" + "git.noahlan.cn/northlan/ntools-go/logger" ) var configFile = flag.String("f", "./config.yml", "the config file") diff --git a/pkg/kafka/codec.go b/pkg/kafka/codec.go new file mode 100644 index 0000000..d5933f9 --- /dev/null +++ b/pkg/kafka/codec.go @@ -0,0 +1,29 @@ +package kafka + +import ( + "errors" + "git.noahlan.cn/northlan/ntools-go/kafka" + "google.golang.org/protobuf/proto" +) + +var _ kafka.Marshaler = (*protobufMarshaler)(nil) +var _ kafka.UnMarshaler = (*protobufMarshaler)(nil) + +var ProtobufMarshaler = &protobufMarshaler{} + +type protobufMarshaler struct { +} + +func (p *protobufMarshaler) Marshal(v interface{}) ([]byte, error) { + if msg, ok := v.(proto.Message); ok { + return proto.Marshal(msg) + } + return nil, errors.New("v must be proto message") +} + +func (p *protobufMarshaler) UnMarshal(data []byte, v interface{}) error { + if msg, ok := v.(proto.Message); ok { + return proto.Unmarshal(data, msg) + } + return errors.New("v must be proto message") +} diff --git a/pkg/kafka/consumer.go b/pkg/kafka/consumer.go deleted file mode 100644 index 4a42f68..0000000 --- a/pkg/kafka/consumer.go +++ /dev/null @@ -1,41 +0,0 @@ -package kafka - -import ( - "dcg/pkg/logger" - "github.com/Shopify/sarama" -) - -type Consumer struct { - client sarama.Client - topic string - consumer sarama.Consumer - partitions []int32 -} - -func NewKafkaConsumer(addr []string, topic string) (*Consumer, error) { - p := Consumer{} - p.topic = topic - - config := sarama.NewConfig() - config.Version = sarama.V3_1_0_0 - config.Consumer.Offsets.Initial = sarama.OffsetNewest - - var err error - p.client, err = sarama.NewClient(addr, config) - if err != nil { - logger.SLog.Error("new kafka client err:", err) - return nil, err - } - - p.consumer, err = sarama.NewConsumerFromClient(p.client) - if err != nil { - logger.SLog.Error("new kafka consumer err:", err) - return nil, err - } - p.partitions, err = p.consumer.Partitions(topic) - if err != nil { - logger.SLog.Errorf("get partitions for topic %s err", topic) - return nil, err - } - return &p, nil -} diff --git a/pkg/kafka/consumer_group.go b/pkg/kafka/consumer_group.go deleted file mode 100644 index 65ef3c6..0000000 --- a/pkg/kafka/consumer_group.go +++ /dev/null @@ -1,47 +0,0 @@ -package kafka - -import ( - "context" - "dcg/pkg/logger" - "github.com/Shopify/sarama" -) - -type ConsumerGroup struct { - sarama.ConsumerGroup - groupId string - topics []string -} - -type ConsumerGroupConfig struct { - KafkaVersion sarama.KafkaVersion - OffsetsInitial int64 - IsReturnErr bool -} - -func NewConsumerGroup(config *ConsumerGroupConfig, addr, topics []string, groupId string) (*ConsumerGroup, error) { - c := sarama.NewConfig() - c.Version = config.KafkaVersion - c.Consumer.Offsets.Initial = config.OffsetsInitial - c.Consumer.Return.Errors = config.IsReturnErr - - client, err := sarama.NewClient(addr, c) - if err != nil { - return nil, err - } - - consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, client) - if err != nil { - return nil, err - } - return &ConsumerGroup{consumerGroup, groupId, topics}, nil -} - -func (cg *ConsumerGroup) RegisterHandlerAndConsumer(handler sarama.ConsumerGroupHandler) { - ctx := context.Background() - for { - err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler) - if err != nil { - logger.SLog.Error("RegisterHandlerAndConsumer error: ", err) - } - } -} diff --git a/pkg/kafka/producer.go b/pkg/kafka/producer.go deleted file mode 100644 index 643f2d6..0000000 --- a/pkg/kafka/producer.go +++ /dev/null @@ -1,62 +0,0 @@ -package kafka - -import ( - "dcg/pkg/logger" - "github.com/Shopify/sarama" - "google.golang.org/protobuf/proto" -) - -type Producer struct { - topic string - client sarama.Client - producer sarama.AsyncProducer -} - -func NewKafkaProducer(addr []string, topic string) *Producer { - p := Producer{} - - config := sarama.NewConfig() //Instantiate a sarama Config - config.Producer.Return.Successes = true //Whether to enable the successes channel to be notified after the message is sent successfully - config.Producer.RequiredAcks = sarama.WaitForAll //Set producer Message Reply level 0 1 all - config.Producer.Partitioner = sarama.NewHashPartitioner //Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly - - p.topic = topic - - var err error - p.client, err = sarama.NewClient(addr, config) - if err != nil { - logger.SLog.Error("new kafka client err:", err) - return &p - } - p.producer, err = sarama.NewAsyncProducerFromClient(p.client) - if err != nil { - logger.SLog.Error("new kafka producer err:", err) - return &p - } - - go func() { - for range p.producer.Successes() { - } - }() - - return &p -} - -func (p *Producer) SendMessageAsync(m proto.Message, key ...string) error { - kMsg := &sarama.ProducerMessage{} - kMsg.Topic = p.topic - if len(key) > 0 { - kMsg.Key = sarama.StringEncoder(key[0]) - } - bMsg, err := proto.Marshal(m) - if err != nil { - logger.SLog.Error("proto marshal err:", err) - return err - } - kMsg.Value = sarama.ByteEncoder(bMsg) - - select { - case p.producer.Input() <- kMsg: - } - return nil -} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go deleted file mode 100644 index 6314624..0000000 --- a/pkg/logger/logger.go +++ /dev/null @@ -1,141 +0,0 @@ -package logger - -import ( - "github.com/natefinch/lumberjack" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "os" - "path/filepath" -) - -var Log *zap.Logger -var SLog *zap.SugaredLogger - -const DefaultLogPath = "/logs" - -type ( - // FileConfig 日志文件配置 - FileConfig struct { - Level string // 日志打印级别 debug info warning error - Format string // 输出日志格式 console, json - Enabled bool // 是否开启 - - Path string // 输出日志文件路径 - FileName string // 输出日志文件名称 - FileMaxSize int // 【日志分割】单个日志文件最多存储量 单位(mb) - FileMaxBackups int // 【日志分割】日志备份文件最多数量 - MaxAge int // 日志保留时间,单位: 天 (day) - Compress bool // 是否压缩日志 - } - - // ConsoleConfig 控制台日志配置 - ConsoleConfig struct { - Level string // 日志打印级别 debug info warning error - Format string // 输出日志格式 console, json - } -) - -var logLevel = map[string]zapcore.Level{ - "debug": zapcore.DebugLevel, - "info": zapcore.InfoLevel, - "warn": zapcore.WarnLevel, - "error": zapcore.ErrorLevel, -} - -func Sync() { - if SLog != nil { - _ = SLog.Sync() - } - if Log != nil { - _ = Log.Sync() - } -} - -// InitLogger 初始化 log -func InitLogger(fileConf *FileConfig, consoleConf *ConsoleConfig) error { - cores := make([]zapcore.Core, 0, 2) - - consoleCore := zapcore.NewCore(getEncoder(consoleConf), zapcore.AddSync(os.Stdout), getLogLevel(consoleConf.Level)) - cores = append(cores, consoleCore) - - if fileConf.Enabled { - writeSyncer, err := getLogWriter(fileConf) // 日志文件配置 文件位置和切割 - if err != nil { - return err - } - fileCore := zapcore.NewCore(getEncoder(fileConf), writeSyncer, getLogLevel(fileConf.Level)) - cores = append(cores, fileCore) - } - - // 控制台/文件 配置分离 - core := zapcore.NewTee(cores...) - - logger := zap.New(core, zap.AddCaller()) //zap.AddCaller() 输出日志打印文件和行数如: logger/logger_test.go:33 - SLog = logger.Sugar() - Log = logger - return nil -} - -// getLogLevel 获取日志打印级别 -func getLogLevel(level string) zapcore.Level { - l, ok := logLevel[level] // 日志打印级别 - if !ok { - l = logLevel["info"] - } - return l -} - -// getLogWriter 获取日志输出方式 日志文件 控制台 -func getLogWriter(conf *FileConfig) (zapcore.WriteSyncer, error) { - // 判断日志路径是否存在,如果不存在就创建 - if conf.Path == "" { - conf.Path = getCurrentAbPath() + DefaultLogPath - } - if exist := isExist(conf.Path); !exist { - if err := os.MkdirAll(conf.Path, os.ModePerm); err != nil { - conf.Path = getCurrentAbPath() + DefaultLogPath - if err := os.MkdirAll(conf.Path, os.ModePerm); err != nil { - return nil, err - } - } - } - - // 日志文件 与 日志切割 配置 - lumberJackLogger := &lumberjack.Logger{ - Filename: filepath.Join(conf.Path, conf.FileName), // 日志文件路径 - MaxSize: conf.FileMaxSize, // 单个日志文件最大多少 mb - MaxBackups: conf.FileMaxBackups, // 日志备份数量 - MaxAge: conf.MaxAge, // 日志最长保留时间 - Compress: conf.Compress, // 是否压缩日志 - } - return zapcore.AddSync(lumberJackLogger), nil -} - -// getEncoder 编码器(如何写入日志) -func getEncoder(conf interface{}) zapcore.Encoder { - encoderConfig := zap.NewProductionEncoderConfig() - encoderConfig.EncodeTime = zapcore.TimeEncoderOfLayout("2006-01-02 15:04:05.000Z07") // log 时间格式 例如: 2021-09-11t20:05:54.852+0800 - encoderConfig.EncodeCaller = zapcore.FullCallerEncoder - - var format string - switch conf.(type) { - case FileConfig: - format = conf.(FileConfig).Format - encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder - case ConsoleConfig: - format = conf.(ConsoleConfig).Format - // 输出level序列化为全大写字符串,如 INFO DEBUG ERROR 彩色 - encoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder - } - - if format == "json" { - return zapcore.NewJSONEncoder(encoderConfig) // 以json格式写入 - } - return zapcore.NewConsoleEncoder(encoderConfig) // 以默认console格式写入 -} - -// isExist 判断文件或者目录是否存在 -func isExist(path string) bool { - _, err := os.Stat(path) - return err == nil || os.IsExist(err) -}