You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ntool/nsys/cmdn/proc.go

365 lines
8.0 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package cmdn
import (
"bufio"
"context"
"errors"
"fmt"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nstr"
atomic2 "git.noahlan.cn/noahlan/ntool/nsys/atomic"
"io"
"log"
"os/exec"
"strings"
"sync"
"sync/atomic"
"time"
)
// Package-wide defaults used by Processor.
const (
	// DefaultBlockPrefix is the prefix of the message IDs that are generated
	// internally ("block-<n>") when the processor runs in blocking mode.
	DefaultBlockPrefix = "block"
	// DefaultNonBlockPrefix is the ID prefix associated with non-blocking mode.
	DefaultNonBlockPrefix = "non-block"
	// DefaultTimeout is the default value of Options.Timeout. It is the zero
	// duration, which NewProcessor treats as "no deadline".
	DefaultTimeout = time.Duration(0)
)
var (
	// ErrBrokenPipe is the error exec reports when queuing a command panics.
	ErrBrokenPipe = errors.New("broken low-level pipe")

	// defaultStartupDecidedFunc accepts every line, so the very first line
	// it is asked about marks the child process as started.
	defaultStartupDecidedFunc = func(*strings.Builder, string) bool { return true }

	// defaultEndLineDecidedFunc accepts every line, so each line it is asked
	// about is treated as the end of a message.
	defaultEndLineDecidedFunc = func(*strings.Builder, string) bool { return true }

	// defaultReadIDFunc never extracts an ID: it always yields the empty
	// string and a nil error.
	defaultReadIDFunc = func(ndef.Serializer, string) (string, error) { return "", nil }
)
// pendingMsg is the bookkeeping record for one command that has been queued
// and whose reply has not been dispatched yet.
type pendingMsg struct {
	id       string        // key under which the record is stored in Processor.pendingMsgMap
	chWait   chan struct{} // signalled after the callback ran; nil when nobody waits
	callback CallbackFunc  // invoked with the reply text
}
// LineFunc is a per-line decision hook. It receives the content builder of
// the current message and the line just read, and returns its verdict.
type LineFunc func(sb *strings.Builder, line string) bool

// ReadIDFunc extracts the message ID from a complete reply.
type ReadIDFunc func(serializer ndef.Serializer, data string) (string, error)

// CallbackFunc is invoked with the reply that matches a previously sent
// command.
type CallbackFunc func(serializer ndef.Serializer, data string)

// Options holds the configurable parts of a Processor.
type Options struct {
	Marshaler          ndef.Marshaler   // serialization of outgoing commands
	Unmarshaler        ndef.Unmarshaler // deserialization of replies
	StartupDecidedFunc LineFunc         // decides whether the child process has started
	EndLineDecidedFunc LineFunc         // decides whether a line ends the current message
	ReadIDFunc         ReadIDFunc       // extracts the message ID from reply data
	Timeout            time.Duration    // timeout; zero means no deadline
}

// Option mutates an Options value; it is the functional-option type accepted
// by NewProcessor.
type Option func(opt *Options)
// Processor drives a child process: it starts the command, writes commands
// to its standard input and matches the lines it prints against the
// commands that are still pending.
type Processor struct {
	*Options

	Context    context.Context
	cancelFunc context.CancelFunc

	Cmd    *exec.Cmd      // the child command
	stdIn  io.WriteCloser // standard input pipe
	stdOut io.ReadCloser  // standard output pipe
	stdErr io.ReadCloser  // standard error pipe; some programs print both errors and regular output here

	isBlock   bool          // whether the underlying process works synchronously
	isStartup *atomic.Bool  // whether the child process has really finished starting
	chStart   chan struct{} // signal that the program has really finished starting

	chSend  chan ICommand // commands waiting to be sent
	chWrite chan []byte   // data that is actually written

	sendIdx       *atomic2.AtomicInt64   // ID counter on the sending side
	recIdx        *atomic2.AtomicInt64   // ID counter on the receiving side
	pendingMsgMap map[string]*pendingMsg // sent commands awaiting a reply, keyed by ID

	*sync.Mutex
}
// NewProcessor builds a Processor with the default options and then applies
// opts in order.
//
// block selects blocking mode (see Processor.isBlock). The processor's
// context is created here: cancel-only when the resulting Timeout is zero,
// otherwise with a deadline of Timeout counted from this call.
func NewProcessor(block bool, opts ...Option) *Processor {
	serializer := NewPlainSerializer()
	options := &Options{
		Marshaler:          serializer,
		Unmarshaler:        serializer,
		StartupDecidedFunc: defaultStartupDecidedFunc,
		EndLineDecidedFunc: defaultEndLineDecidedFunc,
		ReadIDFunc:         defaultReadIDFunc,
		Timeout:            DefaultTimeout,
	}
	p := &Processor{
		Options:       options,
		isBlock:       block,
		isStartup:     new(atomic.Bool),
		chStart:       make(chan struct{}),
		chSend:        make(chan ICommand, 64),
		chWrite:       make(chan []byte, 64*8),
		sendIdx:       atomic2.NewAtomicInt64(),
		recIdx:        atomic2.NewAtomicInt64(),
		pendingMsgMap: make(map[string]*pendingMsg),
		Mutex:         new(sync.Mutex),
	}

	for i := range opts {
		opts[i](options)
	}

	root := context.Background()
	switch p.Timeout {
	case 0:
		p.Context, p.cancelFunc = context.WithCancel(root)
	default:
		p.Context, p.cancelFunc = context.WithTimeout(root, p.Timeout)
	}
	return p
}
// Run starts the command name with args under the processor's context,
// connects its stdin/stdout/stderr pipes and then blocks in listen until the
// startup phase is over.
//
// It returns an error when one of the pipes cannot be created or when the
// process fails to start. In both cases nothing is left running: no reader
// goroutine is spawned and Run does not wait for a startup signal that could
// never arrive.
func (s *Processor) Run(name string, args ...string) error {
	s.Cmd = exec.CommandContext(s.Context, name, args...)

	var err error
	if s.stdIn, err = s.Cmd.StdinPipe(); err != nil {
		return fmt.Errorf("open stdin pipe: %w", err)
	}
	if s.stdOut, err = s.Cmd.StdoutPipe(); err != nil {
		return fmt.Errorf("open stdout pipe: %w", err)
	}
	if s.stdErr, err = s.Cmd.StderrPipe(); err != nil {
		return fmt.Errorf("open stderr pipe: %w", err)
	}
	setProcessGroupID(s.Cmd)

	// Returned unwrapped so callers can keep inspecting the concrete error
	// type produced by os/exec.
	if err = s.Cmd.Start(); err != nil {
		return err
	}

	// Reap the process in the background and release the pipes once it exits.
	// NOTE(review): the os/exec documentation states that Wait must not be
	// called before all reads from StdoutPipe/StderrPipe have completed; the
	// reader goroutines started by listen run concurrently with this Wait, so
	// trailing output may be lost. Confirm whether that is acceptable.
	go func() {
		if waitErr := s.Cmd.Wait(); waitErr != nil {
			log.Printf("错误:命令行 %+v", waitErr)
		}
		_ = s.stdErr.Close()
		_ = s.stdIn.Close()
		_ = s.stdOut.Close()
	}()

	s.listen()
	return nil
}
// listen starts one reader goroutine per output stream, waits for the
// startup phase to end and then starts the write loop.
//
// The wait ends when a reader signals chStart or when the processor's
// context is done, whichever happens first. Without the second case a
// cancellation or timeout that fires before the child reports startup would
// block the caller forever. The write loop is started in both cases; when
// the context is already done it returns right away and closes the send
// channels, so later exec calls fail instead of queuing into the void.
// Callers can use Started to tell the two outcomes apart.
func (s *Processor) listen() {
	go s.handle(s.stdOut, "stdOut")
	go s.handle(s.stdErr, "stdErr")

	// Wait until the program has finished starting (or we were cancelled).
	select {
	case <-s.chStart:
	case <-s.Context.Done():
	}

	go s.writeLoop()
}
// Stop cancels the processor's context and terminates the child process.
//
// It holds the processor mutex for the duration of the call. When no process
// has been started (Run was never called, or starting failed) there is
// nothing to terminate: the context is still cancelled and nil is returned
// instead of dereferencing a nil Cmd/Process.
func (s *Processor) Stop() error {
	s.Lock()
	defer s.Unlock()

	s.cancelFunc()
	if s.Cmd == nil || s.Cmd.Process == nil {
		return nil
	}
	return terminateProcess(s.Cmd.Process.Pid)
}
// Exec queues data for sending and returns without waiting for the reply;
// callback is invoked later, once the matching reply has been read.
// The returned error is whatever exec reports while queuing the command.
func (s *Processor) Exec(data ICommand, callback CallbackFunc) (err error) {
	const waitForReply = false
	_, err = s.exec(data, waitForReply, callback)
	return err
}
// ExecAsync queues data and, despite its name, blocks until the reply has
// been received and callback has run.
//
// It returns the error reported by exec when the command cannot be queued.
// If the processor's context ends (Stop, timeout) before a reply arrives, the
// wait is abandoned, the pending entry is removed and the context's error is
// returned, so the caller is never blocked forever on a dead process.
func (s *Processor) ExecAsync(data ICommand, callback CallbackFunc) error {
	pm, err := s.exec(data, true, callback)
	if err != nil {
		return err
	}

	// Wait synchronously for the reply.
	select {
	case <-pm.chWait:
		return nil
	case <-s.Context.Done():
		// Drop our own record only; another command may have reused the ID.
		s.Lock()
		if s.pendingMsgMap[pm.id] == pm {
			delete(s.pendingMsgMap, pm.id)
		}
		s.Unlock()
		return s.Context.Err()
	}
}
// exec registers data as a pending message and queues it on chSend.
//
// The message ID comes from data.MessageID(). When that is empty, blocking
// mode generates "block-<n>" from sendIdx; non-blocking mode cannot match
// replies without an ID and returns an error without touching any state.
//
// withWait controls whether the returned pendingMsg carries a chWait channel
// that is signalled after the callback ran.
//
// Any panic raised while queuing (for example a send on chSend after
// writeLoop closed it) is converted into ErrBrokenPipe. In that case the
// bookkeeping done so far is rolled back: the generated index is returned
// and the pending entry is removed. The returned pendingMsg is nil whenever
// err is non-nil.
func (s *Processor) exec(data ICommand, withWait bool, callback CallbackFunc) (pm *pendingMsg, err error) {
	idGenerated := false
	defer func() {
		if e := recover(); e == nil {
			return
		}
		err = ErrBrokenPipe
		// Only give the index back if this call actually took one.
		if idGenerated {
			_ = s.sendIdx.DecrementAndGet()
		}
		// Remove the record registered by this call, if any.
		if pm != nil {
			s.Lock()
			if s.pendingMsgMap[pm.id] == pm {
				delete(s.pendingMsgMap, pm.id)
			}
			s.Unlock()
			pm = nil
		}
	}()

	cID := data.MessageID()
	if len(cID) == 0 {
		if !s.isBlock {
			return nil, errors.New("异步底层必须消息必须传递消息ID")
		}
		// e.g. block-1
		cID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.sendIdx.IncrementAndGet())
		idGenerated = true
	}

	var chWait chan struct{}
	if withWait {
		// Buffered so the goroutine that signals completion never blocks,
		// even if the waiter is no longer receiving.
		chWait = make(chan struct{}, 1)
	}
	pm = &pendingMsg{
		id:       cID,
		chWait:   chWait,
		callback: callback,
	}

	s.Lock()
	s.pendingMsgMap[pm.id] = pm
	s.Unlock()

	s.chSend <- data
	return pm, nil
}
// writeLoop serializes queued commands and writes them, one per line, to the
// child's standard input. It runs until the context is done or a write
// fails, and closes chSend and chWrite on exit so that later sends panic
// (exec turns that panic into ErrBrokenPipe).
//
// Serialized commands are written directly instead of being re-queued on
// chWrite: this goroutine is the only reader of chWrite in this file, so a
// send into a full chWrite from here would block the loop forever. Raw
// payloads that arrive on chWrite are still drained and written.
func (s *Processor) writeLoop() {
	defer func() {
		close(s.chSend)
		close(s.chWrite)
	}()

	// write logs the payload and sends it to the child, newline-terminated.
	write := func(payload []byte) error {
		fmt.Printf("发送数据: [%s]\n", payload)
		_, err := s.stdIn.Write(append(payload, '\n'))
		return err
	}

	for {
		select {
		case <-s.Context.Done():
			return
		case cmd := <-s.chSend:
			payload, err := s.Marshaler.Marshal(cmd)
			if err != nil {
				fmt.Printf("序列化失败: %+v\n", err)
				continue
			}
			if err := write(payload); err != nil {
				fmt.Printf("写入数据失败: %v\n", err)
				return
			}
		case payload := <-s.chWrite:
			// The actual write of pre-serialized data.
			if err := write(payload); err != nil {
				fmt.Printf("写入数据失败: %v\n", err)
				return
			}
		}
	}
}
// handle consumes one output stream of the child process line by line until
// the context is done or reading fails, and closes reader on exit. typ is a
// label ("stdOut"/"stdErr") used in log output only.
//
// Before startup is confirmed every line is offered to StartupDecidedFunc;
// the first handler for which it returns true flips the startup flag and
// signals chStart. Afterwards lines are accumulated into a message until
// EndLineDecidedFunc returns true, and the message is dispatched to the
// pending command it belongs to.
func (s *Processor) handle(reader io.ReadCloser, typ string) {
	defer func() {
		_ = reader.Close()
	}()

	buffer := bufio.NewReader(reader)
	var content strings.Builder
	for {
		select {
		case <-s.Context.Done():
			return
		default:
		}

		lineBytes, err := readLogicalLine(buffer)
		if err != nil {
			fmt.Printf("[%s] 读取数据时发生错误: %v\n", typ, err)
			return
		}
		line, err := s.readBytesString(lineBytes)
		if err != nil {
			fmt.Printf("[%s] 解码数据时发生错误: %v\n", typ, err)
			return
		}

		if !s.Started() {
			fmt.Printf("[%s] 接收普通消息:[%s]\n", typ, line)
			// Startup is decided by the external hook. The compare-and-swap
			// guarantees that only one of the two stream handlers signals
			// chStart; a second send on the unbuffered channel would block
			// that handler forever.
			if s.StartupDecidedFunc(&content, line) && s.isStartup.CompareAndSwap(false, true) {
				select {
				case s.chStart <- struct{}{}:
				case <-s.Context.Done():
					return
				}
				fmt.Printf("[%s] 启动完毕,等待输出...\n", typ)
			}
			continue
		}

		content.WriteString(line)
		if len(line) > 0 {
			content.WriteByte('\n')
		}
		// Keep accumulating until the hook recognises the last line.
		if !s.EndLineDecidedFunc(&content, line) {
			continue
		}

		// Strip the separator appended above; never cut into real content.
		message := strings.TrimSuffix(content.String(), "\n")
		content.Reset()
		s.dispatchReply(message)
	}
}

// readLogicalLine returns the next complete line from r without its line
// terminator, joining the fragments bufio.Reader.ReadLine yields for lines
// longer than the reader's buffer. The result is only valid until the next
// read from r.
func readLogicalLine(r *bufio.Reader) ([]byte, error) {
	var joined []byte
	for {
		chunk, isPrefix, err := r.ReadLine()
		if err != nil {
			// Stream ended right after a buffer-filling fragment: hand out
			// what was collected; the next call reports the EOF.
			if len(joined) > 0 && errors.Is(err, io.EOF) {
				return joined, nil
			}
			return nil, err
		}
		if !isPrefix && joined == nil {
			return chunk, nil // common case: the line fit into the buffer
		}
		joined = append(joined, chunk...)
		if !isPrefix {
			return joined, nil
		}
	}
}

// dispatchReply matches message against the pending commands and runs the
// matching callback in its own goroutine. Messages whose ID cannot be read
// or that match no pending command are dropped.
func (s *Processor) dispatchReply(message string) {
	revID, err := s.ReadIDFunc(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), message)
	if err != nil {
		return
	}
	// In blocking mode the ID cannot be passed through the child process,
	// so it is reconstructed from the receive counter.
	if len(revID) == 0 {
		revID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.recIdx.IncrementAndGet())
	}

	// exec writes to the map under the same mutex from other goroutines.
	s.Lock()
	pending, ok := s.pendingMsgMap[revID]
	if ok {
		delete(s.pendingMsgMap, revID)
	}
	s.Unlock()
	if !ok {
		fmt.Println("找不到已发送数据,无法对应处理数据!")
		return
	}

	go func() {
		if pending.callback != nil {
			pending.callback(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), message)
		}
		if pending.chWait != nil {
			pending.chWait <- struct{}{}
		}
	}()
}
// readBytesString turns one raw line into a string. Data that nstr.Charset
// classifies as GBK is passed through nstr.ToGBK first; everything else is
// converted as-is. A conversion failure is returned together with an empty
// string.
// NOTE(review): presumably nstr.ToGBK transcodes between GBK and UTF-8 —
// confirm the direction against the nstr package.
func (s *Processor) readBytesString(data []byte) (string, error) {
	if nstr.Charset(data) != nstr.GBK {
		return string(data), nil
	}

	converted, err := nstr.ToGBK(data)
	if err != nil {
		return "", err
	}
	return string(converted), nil
}
// Started reports whether the startup flag has been set, i.e. whether
// startup has completed.
func (s *Processor) Started() bool {
	started := s.isStartup.Load()
	return started
}
// storeStarted atomically sets the startup flag to val.
func (s *Processor) storeStarted(val bool) {
	flag := s.isStartup
	flag.Store(val)
}