You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ntool/nsys/cmdn/proc.go

365 lines
8.0 KiB
Go

1 year ago
package cmdn
import (
"bufio"
"context"
"errors"
"fmt"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nstr"
atomic2 "git.noahlan.cn/noahlan/ntool/nsys/atomic"
"io"
"log"
"os/exec"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
DefaultBlockPrefix = "block"
DefaultNonBlockPrefix = "non-block"
DefaultTimeout = time.Second * 0
)
var (
ErrBrokenPipe = errors.New("broken low-level pipe")
defaultStartupDecidedFunc = func(_ *strings.Builder, line string) bool {
return true
}
defaultEndLineDecidedFunc = func(sb *strings.Builder, line string) bool {
return true
}
defaultReadIDFunc = func(serializer ndef.Serializer, data string) (string, error) {
return "", nil
}
)
type (
pendingMsg struct {
id string
chWait chan struct{}
callback CallbackFunc
}
)
type (
LineFunc func(sb *strings.Builder, line string) bool
ReadIDFunc func(serializer ndef.Serializer, data string) (string, error)
CallbackFunc func(serializer ndef.Serializer, data string)
Options struct {
Marshaler ndef.Marshaler // 序列化
Unmarshaler ndef.Unmarshaler // 反序列化
StartupDecidedFunc LineFunc // 启动决议方法
EndLineDecidedFunc LineFunc // 行尾判断方法
ReadIDFunc ReadIDFunc // 从数据中获取ID的方法
Timeout time.Duration // 超时时间
}
Option func(opt *Options)
)
// Processor 处理器
type Processor struct {
*Options
Context context.Context
cancelFunc context.CancelFunc
Cmd *exec.Cmd // CMD
stdIn io.WriteCloser // 标准输入通道
stdOut io.ReadCloser // 标准输出通道
stdErr io.ReadCloser // 标准错误输出通道,一些程序会在此通道输出错误信息和一般信息
isBlock bool // 底层是否为同步逻辑
isStartup *atomic.Bool // 子进程是否真正启动完毕
chStart chan struct{} // 程序真正启动完毕信号
chSend chan ICommand // 待发送数据
chWrite chan []byte // 实际发送的数据
sendIdx *atomic2.AtomicInt64 // 发送缓冲ID
recIdx *atomic2.AtomicInt64 // 接收缓冲ID
pendingMsgMap map[string]*pendingMsg // 发送缓存区map
*sync.Mutex
}
func NewProcessor(block bool, opts ...Option) *Processor {
defaultSerializer := NewPlainSerializer()
tmp := &Processor{
Options: &Options{
Marshaler: defaultSerializer,
Unmarshaler: defaultSerializer,
StartupDecidedFunc: defaultStartupDecidedFunc,
EndLineDecidedFunc: defaultEndLineDecidedFunc,
ReadIDFunc: defaultReadIDFunc,
Timeout: DefaultTimeout,
},
isBlock: block,
isStartup: &atomic.Bool{},
chStart: make(chan struct{}),
chSend: make(chan ICommand, 64),
chWrite: make(chan []byte, 64*8),
sendIdx: atomic2.NewAtomicInt64(),
recIdx: atomic2.NewAtomicInt64(),
pendingMsgMap: make(map[string]*pendingMsg),
Mutex: &sync.Mutex{},
}
for _, opt := range opts {
opt(tmp.Options)
}
if tmp.Timeout == 0 {
tmp.Context, tmp.cancelFunc = context.WithCancel(context.Background())
} else {
tmp.Context, tmp.cancelFunc = context.WithTimeout(context.Background(), tmp.Timeout)
}
return tmp
}
func (s *Processor) Run(name string, args ...string) error {
s.Cmd = exec.CommandContext(s.Context, name, args...)
s.stdIn, _ = s.Cmd.StdinPipe()
s.stdOut, _ = s.Cmd.StdoutPipe()
s.stdErr, _ = s.Cmd.StderrPipe()
setProcessGroupID(s.Cmd)
err := s.Cmd.Start()
go func() {
err := s.Cmd.Wait()
if err != nil {
log.Println(fmt.Sprintf("错误:命令行 %+v", err))
}
_ = s.stdErr.Close()
_ = s.stdIn.Close()
_ = s.stdOut.Close()
}()
s.listen()
return err
}
// Listen 开始监听
func (s *Processor) listen() {
go s.handle(s.stdOut, "stdOut")
go s.handle(s.stdErr, "stdErr")
// 等待程序启动完毕
select {
case <-s.chStart:
}
go s.writeLoop()
}
func (s *Processor) Stop() error {
s.Lock()
defer s.Unlock()
s.cancelFunc()
return terminateProcess(s.Cmd.Process.Pid)
}
// Exec 异步执行命令
func (s *Processor) Exec(data ICommand, callback CallbackFunc) (err error) {
_, err = s.exec(data, false, callback)
return
}
// ExecAsync 同步执行命令
func (s *Processor) ExecAsync(data ICommand, callback CallbackFunc) error {
pm, err := s.exec(data, true, callback)
if err != nil {
return err
}
// 同步等待消息回复
<-pm.chWait
return err
}
func (s *Processor) exec(data ICommand, withWait bool, callback CallbackFunc) (pm *pendingMsg, err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
_ = s.sendIdx.DecrementAndGet()
}
}()
cID := data.MessageID()
if len(cID) == 0 {
if s.isBlock {
// block-1
cID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.sendIdx.IncrementAndGet())
} else {
cID = fmt.Sprintf("%s-%s-%d", DefaultNonBlockPrefix, cID, s.sendIdx.IncrementAndGet())
// error
return nil, errors.New("异步底层必须消息必须传递消息ID")
}
}
var chWait chan struct{}
if withWait {
chWait = make(chan struct{})
}
pm = &pendingMsg{
id: cID,
chWait: chWait,
callback: callback,
}
s.Lock()
s.pendingMsgMap[pm.id] = pm
s.Unlock()
s.chSend <- data
return pm, err
}
func (s *Processor) writeLoop() {
defer func() {
close(s.chSend)
close(s.chWrite)
}()
for {
select {
case <-s.Context.Done():
return
case data := <-s.chSend:
var (
bytes []byte
err error
)
bytes, err = s.Marshaler.Marshal(data)
if err != nil {
fmt.Println(fmt.Sprintf("序列化失败: %+v", err))
break
}
s.chWrite <- bytes
case data := <-s.chWrite:
// 实际写入数据
fmt.Println(fmt.Sprintf("发送数据: [%s]", string(data)))
data = append(data, '\n')
if _, err := s.stdIn.Write(data); err != nil {
return
}
}
}
}
func (s *Processor) handle(reader io.ReadCloser, typ string) {
defer func() {
_ = reader.Close()
}()
buffer := bufio.NewReader(reader)
endLine := false
content := strings.Builder{}
for {
select {
case <-s.Context.Done():
return
default:
break
}
lineBytes, isPrefix, err := buffer.ReadLine()
if err != nil || err == io.EOF {
fmt.Println(fmt.Sprintf("[%s] 读取数据时发生错误: %v", typ, err))
break
}
line, err := s.readBytesString(lineBytes)
if err != nil {
break
}
if !s.Started() {
fmt.Println(fmt.Sprintf("[%s] 接收普通消息:[%s]", typ, line))
// 判断程序成功启动 外部逻辑
if s.StartupDecidedFunc(&content, line) {
s.storeStarted(true)
s.chStart <- struct{}{}
fmt.Println(fmt.Sprintf("[%s] 启动完毕,等待输出...", typ))
}
continue
}
content.WriteString(line)
if !isPrefix && len(line) > 0 {
content.WriteByte('\n')
}
// 最后一行的判定逻辑
if !endLine && s.EndLineDecidedFunc(&content, line) {
endLine = true
}
if endLine {
endLine = false
cStr := content.String()
if len(cStr) > 0 {
cStr = cStr[:len(cStr)-1]
}
content.Reset()
revID, err := s.ReadIDFunc(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr)
if err != nil {
continue
}
//fmt.Println(fmt.Sprintf("[%s] 接收指令消息:[%s]", typ, cStr))
// block 需要自行维护ID是无法透传的
if len(revID) <= 0 {
revID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.recIdx.IncrementAndGet())
}
pending, ok := s.pendingMsgMap[revID]
if !ok {
fmt.Println("找不到已发送数据,无法对应处理数据!")
continue
}
delete(s.pendingMsgMap, revID)
go func() {
pending.callback(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr)
if pending.chWait != nil {
pending.chWait <- struct{}{}
}
}()
}
}
}
func (s *Processor) readBytesString(data []byte) (string, error) {
// 编码
if nstr.Charset(data) == nstr.GBK {
gbk, err := nstr.ToGBK(data)
if err != nil {
return "", err
}
return string(gbk), nil
} else {
return string(data), nil
}
}
// Started 是否启动完成
func (s *Processor) Started() bool {
return s.isStartup.Load()
}
func (s *Processor) storeStarted(val bool) {
s.isStartup.Store(val)
}