You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
ngs/cluster/cluster.go

173 lines
4.2 KiB
Go

package cluster
import (
"context"
"fmt"
"git.noahlan.cn/northlan/ngs/cluster/clusterpb"
"git.noahlan.cn/northlan/ngs/internal/log"
"sync"
)
// cluster represents a ngs cluster, which contains a bunch of ngs nodes,
// each of which provides a group of different services. All service requests
// from clients are sent to the gate first and then forwarded to the
// appropriate node.
type cluster struct {
// currentNode is the local node; its handler is updated whenever a member
// registers or unregisters.
currentNode *Node
// rpcClient hands out the connection pools used to call other members.
rpcClient *rpcClient
// mu synchronizes access to members.
mu sync.RWMutex
// members is a plain slice: as long as the cluster is not large, a linear
// scan over a slice is good enough.
members []*Member
}
// newCluster builds a cluster bound to currentNode. The member list starts
// out empty and no RPC client is set yet.
func newCluster(currentNode *Node) *cluster {
	c := new(cluster)
	c.currentNode = currentNode
	return c
}
// Register implements the MasterServer gRPC service.
//
// It rejects a request that carries no member info or whose service address
// is already registered, notifies every non-master member about the newcomer,
// adds the newcomer's services to the current node's handler and finally
// appends the newcomer to the member list. The response lists the members
// that were known before the call.
//
// An error from obtaining a connection pool or from a NewMember notification
// aborts the registration and is returned unchanged; members that were
// notified before the failure are not rolled back.
func (c *cluster) Register(_ context.Context, req *clusterpb.RegisterRequest) (*clusterpb.RegisterResponse, error) {
	if req.MemberInfo == nil {
		return nil, ErrInvalidRegisterReq
	}

	resp := &clusterpb.RegisterResponse{}

	// The member list is mutated under c.mu elsewhere, so it must be read under
	// the lock as well; an unlocked range is a data race. Everything needed
	// later is copied out here so that no lock is held during the peer calls.
	var peers []string
	c.mu.RLock()
	for _, m := range c.members {
		if m.memberInfo.ServiceAddr == req.MemberInfo.ServiceAddr {
			c.mu.RUnlock()
			return nil, fmt.Errorf("address %s has registered", req.MemberInfo.ServiceAddr)
		}
		resp.Members = append(resp.Members, m.memberInfo)
		// The master does not need to be told about the new member.
		if !m.isMaster {
			peers = append(peers, m.memberInfo.ServiceAddr)
		}
	}
	c.mu.RUnlock()

	// Notify registered nodes to update their remote services.
	newMember := &clusterpb.NewMemberRequest{MemberInfo: req.MemberInfo}
	for _, addr := range peers {
		pool, err := c.rpcClient.getConnPool(addr)
		if err != nil {
			return nil, err
		}
		client := clusterpb.NewMemberClient(pool.Get())
		if _, err = client.NewMember(context.Background(), newMember); err != nil {
			return nil, err
		}
	}

	log.Println("New peer register to cluster", req.MemberInfo.ServiceAddr)

	// Register services to current node.
	c.currentNode.handler.addRemoteService(req.MemberInfo)

	// NOTE(review): the duplicate check above and this append are not one
	// atomic step; two concurrent registrations of the same address could both
	// pass the check. Confirm whether callers can trigger that.
	c.mu.Lock()
	c.members = append(c.members, &Member{isMaster: false, memberInfo: req.MemberInfo})
	c.mu.Unlock()

	return resp, nil
}
// Unregister implements the MasterServer gRPC service.
//
// It rejects an empty service address or one that is not in the member list,
// notifies every member other than the current node that the address is
// leaving, removes the address from the current node's handler and finally
// drops it from the member list.
//
// An error from obtaining a connection pool or from a DelMember notification
// aborts the operation and is returned unchanged; members that were notified
// before the failure are not rolled back.
func (c *cluster) Unregister(_ context.Context, req *clusterpb.UnregisterRequest) (*clusterpb.UnregisterResponse, error) {
	if req.ServiceAddr == "" {
		return nil, ErrInvalidRegisterReq
	}

	resp := &clusterpb.UnregisterResponse{}

	// The member list is mutated under c.mu elsewhere, so it must be read under
	// the lock as well. The addresses to notify are copied out so that no lock
	// is held during the peer calls.
	var (
		registered bool
		peers      []string
	)
	c.mu.RLock()
	for _, m := range c.members {
		if m.memberInfo.ServiceAddr == req.ServiceAddr {
			registered = true
		}
		// The current node is updated directly below, not over RPC.
		if m.MemberInfo().ServiceAddr == c.currentNode.ServiceAddr {
			continue
		}
		peers = append(peers, m.memberInfo.ServiceAddr)
	}
	c.mu.RUnlock()

	if !registered {
		return nil, fmt.Errorf("address %s has not registered", req.ServiceAddr)
	}

	// Notify registered nodes to update their remote services.
	delMember := &clusterpb.DelMemberRequest{ServiceAddr: req.ServiceAddr}
	for _, addr := range peers {
		pool, err := c.rpcClient.getConnPool(addr)
		if err != nil {
			return nil, err
		}
		client := clusterpb.NewMemberClient(pool.Get())
		if _, err = client.DelMember(context.Background(), delMember); err != nil {
			return nil, err
		}
	}

	log.Println("Exists peer unregister to cluster", req.ServiceAddr)

	// Remove the member's services from the current node.
	c.currentNode.handler.delMember(req.ServiceAddr)

	// Look the member up again under the write lock: the list may have changed
	// while the peers were being notified, so a position found earlier could
	// point at a different member or lie past the end of the slice.
	c.mu.Lock()
	for i, m := range c.members {
		if m.memberInfo.ServiceAddr == req.ServiceAddr {
			c.members = append(c.members[:i], c.members[i+1:]...)
			break
		}
	}
	c.mu.Unlock()

	return resp, nil
}
// setRpcClient stores client as the RPC client of the cluster.
func (c *cluster) setRpcClient(client *rpcClient) {
	c.rpcClient = client
}
// remoteAddrs returns the service address of every member in member-list
// order. The list is read under the read lock; the result is nil when there
// are no members.
func (c *cluster) remoteAddrs() []string {
	c.mu.RLock()
	defer c.mu.RUnlock()

	var result []string
	for i := range c.members {
		result = append(result, c.members[i].memberInfo.ServiceAddr)
	}
	return result
}
// initMembers appends one Member per entry of members to the member list,
// holding the write lock for the whole operation. Existing entries are kept
// and no duplicate check is performed.
func (c *cluster) initMembers(members []*clusterpb.MemberInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for i := range members {
		c.members = append(c.members, &Member{memberInfo: members[i]})
	}
}
// addMember inserts info into the member list or, when a member with the same
// service address already exists, replaces that member's info in place. The
// write lock is held for the whole operation.
func (c *cluster) addMember(info *clusterpb.MemberInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, existing := range c.members {
		if existing.memberInfo.ServiceAddr == info.ServiceAddr {
			// Known address: refresh its info instead of adding a second entry.
			existing.memberInfo = info
			return
		}
	}
	c.members = append(c.members, &Member{memberInfo: info})
}
// delMember removes the first member whose service address equals addr from
// the member list. It does nothing when no member matches. The write lock is
// held for the whole operation.
func (c *cluster) delMember(addr string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for pos := range c.members {
		if c.members[pos].memberInfo.ServiceAddr != addr {
			continue
		}
		c.members = append(c.members[:pos], c.members[pos+1:]...)
		return
	}
}