package cmdn import ( "bufio" "context" "errors" "fmt" "git.noahlan.cn/noahlan/ntool/ndef" "git.noahlan.cn/noahlan/ntool/nstr" atomic2 "git.noahlan.cn/noahlan/ntool/nsys/atomic" "io" "log" "os/exec" "strings" "sync" "sync/atomic" "time" ) const ( DefaultBlockPrefix = "block" DefaultNonBlockPrefix = "non-block" DefaultTimeout = time.Second * 0 ) var ( ErrBrokenPipe = errors.New("broken low-level pipe") defaultStartupDecidedFunc = func(_ *strings.Builder, line string) bool { return true } defaultEndLineDecidedFunc = func(sb *strings.Builder, line string) bool { return true } defaultReadIDFunc = func(serializer ndef.Serializer, data string) (string, error) { return "", nil } ) type ( pendingMsg struct { id string chWait chan struct{} callback CallbackFunc } ) type ( LineFunc func(sb *strings.Builder, line string) bool ReadIDFunc func(serializer ndef.Serializer, data string) (string, error) CallbackFunc func(serializer ndef.Serializer, data string) Options struct { Marshaler ndef.Marshaler // 序列化 Unmarshaler ndef.Unmarshaler // 反序列化 StartupDecidedFunc LineFunc // 启动决议方法 EndLineDecidedFunc LineFunc // 行尾判断方法 ReadIDFunc ReadIDFunc // 从数据中获取ID的方法 Timeout time.Duration // 超时时间 } Option func(opt *Options) ) // Processor 处理器 type Processor struct { *Options Context context.Context cancelFunc context.CancelFunc Cmd *exec.Cmd // CMD stdIn io.WriteCloser // 标准输入通道 stdOut io.ReadCloser // 标准输出通道 stdErr io.ReadCloser // 标准错误输出通道,一些程序会在此通道输出错误信息和一般信息 isBlock bool // 底层是否为同步逻辑 isStartup *atomic.Bool // 子进程是否真正启动完毕 chStart chan struct{} // 程序真正启动完毕信号 chSend chan ICommand // 待发送数据 chWrite chan []byte // 实际发送的数据 sendIdx *atomic2.AtomicInt64 // 发送缓冲ID recIdx *atomic2.AtomicInt64 // 接收缓冲ID pendingMsgMap map[string]*pendingMsg // 发送缓存区map *sync.Mutex } func NewProcessor(block bool, opts ...Option) *Processor { defaultSerializer := NewPlainSerializer() tmp := &Processor{ Options: &Options{ Marshaler: defaultSerializer, Unmarshaler: defaultSerializer, StartupDecidedFunc: defaultStartupDecidedFunc, EndLineDecidedFunc: defaultEndLineDecidedFunc, ReadIDFunc: defaultReadIDFunc, Timeout: DefaultTimeout, }, isBlock: block, isStartup: &atomic.Bool{}, chStart: make(chan struct{}), chSend: make(chan ICommand, 64), chWrite: make(chan []byte, 64*8), sendIdx: atomic2.NewAtomicInt64(), recIdx: atomic2.NewAtomicInt64(), pendingMsgMap: make(map[string]*pendingMsg), Mutex: &sync.Mutex{}, } for _, opt := range opts { opt(tmp.Options) } if tmp.Timeout == 0 { tmp.Context, tmp.cancelFunc = context.WithCancel(context.Background()) } else { tmp.Context, tmp.cancelFunc = context.WithTimeout(context.Background(), tmp.Timeout) } return tmp } func (s *Processor) Run(name string, args ...string) error { s.Cmd = exec.CommandContext(s.Context, name, args...) s.stdIn, _ = s.Cmd.StdinPipe() s.stdOut, _ = s.Cmd.StdoutPipe() s.stdErr, _ = s.Cmd.StderrPipe() setProcessGroupID(s.Cmd) err := s.Cmd.Start() go func() { err := s.Cmd.Wait() if err != nil { log.Println(fmt.Sprintf("错误:命令行 %+v", err)) } _ = s.stdErr.Close() _ = s.stdIn.Close() _ = s.stdOut.Close() }() s.listen() return err } // Listen 开始监听 func (s *Processor) listen() { go s.handle(s.stdOut, "stdOut") go s.handle(s.stdErr, "stdErr") // 等待程序启动完毕 select { case <-s.chStart: } go s.writeLoop() } func (s *Processor) Stop() error { s.Lock() defer s.Unlock() s.cancelFunc() return terminateProcess(s.Cmd.Process.Pid) } // Exec 异步执行命令 func (s *Processor) Exec(data ICommand, callback CallbackFunc) (err error) { _, err = s.exec(data, false, callback) return } // ExecAsync 同步执行命令 func (s *Processor) ExecAsync(data ICommand, callback CallbackFunc) error { pm, err := s.exec(data, true, callback) if err != nil { return err } // 同步等待消息回复 <-pm.chWait return err } func (s *Processor) exec(data ICommand, withWait bool, callback CallbackFunc) (pm *pendingMsg, err error) { defer func() { if e := recover(); e != nil { err = ErrBrokenPipe _ = s.sendIdx.DecrementAndGet() } }() cID := data.MessageID() if len(cID) == 0 { if s.isBlock { // block-1 cID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.sendIdx.IncrementAndGet()) } else { cID = fmt.Sprintf("%s-%s-%d", DefaultNonBlockPrefix, cID, s.sendIdx.IncrementAndGet()) // error return nil, errors.New("异步底层必须消息必须传递消息ID") } } var chWait chan struct{} if withWait { chWait = make(chan struct{}) } pm = &pendingMsg{ id: cID, chWait: chWait, callback: callback, } s.Lock() s.pendingMsgMap[pm.id] = pm s.Unlock() s.chSend <- data return pm, err } func (s *Processor) writeLoop() { defer func() { close(s.chSend) close(s.chWrite) }() for { select { case <-s.Context.Done(): return case data := <-s.chSend: var ( bytes []byte err error ) bytes, err = s.Marshaler.Marshal(data) if err != nil { fmt.Println(fmt.Sprintf("序列化失败: %+v", err)) break } s.chWrite <- bytes case data := <-s.chWrite: // 实际写入数据 fmt.Println(fmt.Sprintf("发送数据: [%s]", string(data))) data = append(data, '\n') if _, err := s.stdIn.Write(data); err != nil { return } } } } func (s *Processor) handle(reader io.ReadCloser, typ string) { defer func() { _ = reader.Close() }() buffer := bufio.NewReader(reader) endLine := false content := strings.Builder{} for { select { case <-s.Context.Done(): return default: break } lineBytes, isPrefix, err := buffer.ReadLine() if err != nil || err == io.EOF { fmt.Println(fmt.Sprintf("[%s] 读取数据时发生错误: %v", typ, err)) break } line, err := s.readBytesString(lineBytes) if err != nil { break } if !s.Started() { fmt.Println(fmt.Sprintf("[%s] 接收普通消息:[%s]", typ, line)) // 判断程序成功启动 外部逻辑 if s.StartupDecidedFunc(&content, line) { s.storeStarted(true) s.chStart <- struct{}{} fmt.Println(fmt.Sprintf("[%s] 启动完毕,等待输出...", typ)) } continue } content.WriteString(line) if !isPrefix && len(line) > 0 { content.WriteByte('\n') } // 最后一行的判定逻辑 if !endLine && s.EndLineDecidedFunc(&content, line) { endLine = true } if endLine { endLine = false cStr := content.String() if len(cStr) > 0 { cStr = cStr[:len(cStr)-1] } content.Reset() revID, err := s.ReadIDFunc(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr) if err != nil { continue } //fmt.Println(fmt.Sprintf("[%s] 接收指令消息:[%s]", typ, cStr)) // block 需要自行维护ID,是无法透传的 if len(revID) <= 0 { revID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.recIdx.IncrementAndGet()) } pending, ok := s.pendingMsgMap[revID] if !ok { fmt.Println("找不到已发送数据,无法对应处理数据!") continue } delete(s.pendingMsgMap, revID) go func() { pending.callback(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr) if pending.chWait != nil { pending.chWait <- struct{}{} } }() } } } func (s *Processor) readBytesString(data []byte) (string, error) { // 编码 if nstr.Charset(data) == nstr.GBK { gbk, err := nstr.ToGBK(data) if err != nil { return "", err } return string(gbk), nil } else { return string(data), nil } } // Started 是否启动完成 func (s *Processor) Started() bool { return s.isStartup.Load() } func (s *Processor) storeStarted(val bool) { s.isStartup.Store(val) }