You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ntool/ncmd/cmd.go

387 lines
9.7 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package ncmd
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"io"
"os/exec"
"sync"
"syscall"
"time"
)
var (
ErrNotStarted = errors.New("command not running")
ErrMarshalerNil = errors.New("marshaller is nil but data not []byte type")
)
type (
Options struct {
Marshaler ndef.Marshaler // 序列化
// 缓存 STDOUT 和 STDERR最终写入 Status 内
// 通过 Cmd.Status 获取
Buffered bool
// STDOUT 和 STDERR 写入到 Status.Stdout 中,类似 linux 中的 2>&1
// 注当Streaming开启时此特性无效
CombinedOutput bool
// 流式处理,会将所有数据即时输出
Streaming bool
// LineBufferSize 设置 OutputStream 行缓冲区的大小,默认值 DEFAULT_LINE_BUFFER_SIZE
// 但如果发生 ErrLineBufferOverflow 错误,请尝试增加大小
LineBufferSize uint
// DevMode 开发模式打印log
DevMode bool
}
Option func(opt *Options)
)
// Cmd 处理器
type Cmd struct {
*Options
// Runtime
Cmd *exec.Cmd // CMD
Name string // name of binary(command) to run
Args []string // arguments
Env []string // *optional environment
Dir string // working dir
stdIn io.WriteCloser // 标准输入通道
stdoutStream *OutputStream // 标准输出通道
stderrStream *OutputStream // 标准错误输出通道
// flags
started bool // cmd.Start called, no error
stopped bool // cmd.Stop called
done bool // run() done
final bool // status finalized in Status
doneChan chan struct{} // closed when done running
status Status
statusChan chan Status // nil until Start() called
startTime time.Time
// messages
Stdout chan string // Stdout sets streaming STDOUT if enabled, else nil (see Options).
Stderr chan string // Stderr sets streaming STDERR if enabled, else nil (see Options).
stdoutBuf *OutputBuffer // 标准输出缓冲区
stderrBuf *OutputBuffer // 标准错误缓冲区
chSend chan []byte // 发送的数据缓冲区
*sync.Mutex
}
type Status struct {
Cmd string
PID int
Complete bool // 如果被主动停止或kill该值为false
Exit int // exitCode
Error error // go error
StartTs int64 // 启动时间(纳秒) 0表示未启动(started)
StopTs int64 // 停止时间(纳秒) 0表示未启动(started)或未运行(running)
Runtime float64 // 运行时长
Stdout []string // buffered STDOUT
Stderr []string // buffered STDIN
}
func NewCmd(opts ...Option) *Cmd {
tmp := &Cmd{
Options: &Options{
Buffered: true,
DevMode: true,
},
// flags
started: false,
stopped: false,
done: false,
final: false,
doneChan: make(chan struct{}),
status: Status{
Exit: -1,
},
statusChan: nil,
// messages
chSend: make(chan []byte, 1024),
Mutex: &sync.Mutex{},
}
for _, opt := range opts {
opt(tmp.Options)
}
options := tmp.Options
if options.LineBufferSize == 0 {
options.LineBufferSize = DEFAULT_LINE_BUFFER_SIZE
}
if options.Buffered {
tmp.stdoutBuf = NewOutputBuffer()
tmp.stderrBuf = NewOutputBuffer()
}
if options.CombinedOutput {
tmp.stdoutBuf = NewOutputBuffer()
tmp.stderrBuf = nil
}
if options.Streaming {
tmp.Stdout = make(chan string, DEFAULT_STREAM_CHAN_SIZE)
tmp.stdoutStream = NewOutputStream(tmp.Stdout)
tmp.stdoutStream.SetLineBufferSize(int(options.LineBufferSize))
tmp.Stderr = make(chan string, DEFAULT_STREAM_CHAN_SIZE)
tmp.stderrStream = NewOutputStream(tmp.Stderr)
tmp.stderrStream.SetLineBufferSize(int(options.LineBufferSize))
}
return tmp
}
func (c *Cmd) Start(name string, args ...string) <-chan Status {
c.Name = name
c.Args = args
c.status.Cmd = name
c.Lock()
defer c.Unlock()
if c.statusChan != nil {
return c.statusChan
}
c.statusChan = make(chan Status, 1)
go c.run()
return c.statusChan
}
func (c *Cmd) Stop() error {
c.Lock()
defer c.Unlock()
if c.statusChan == nil || !c.started {
return ErrNotStarted
}
// 如果 done 为true表示程序已经执行完毕无需再Stop
if c.done {
return nil
}
c.stopped = true
// Signal the process group (-pid), not just the process, so that the process
// and all its children are signaled. Else, child procs can keep running and
// keep the stdout/stderr fd open and cause cmd.Wait to hang.
return terminateProcess(c.Cmd.Process.Pid)
}
func (c *Cmd) Status() Status {
c.Lock()
defer c.Unlock()
// Return default status if cmd hasn't been started
if c.statusChan == nil || !c.started {
return c.status
}
if c.done {
// No longer running
if !c.final {
if c.stdoutBuf != nil {
c.status.Stdout = c.stdoutBuf.Lines()
c.stdoutBuf = nil // release buffers
}
if c.stderrBuf != nil {
c.status.Stderr = c.stderrBuf.Lines()
c.stderrBuf = nil // release buffers
}
c.final = true
}
} else {
// Still running
c.status.Runtime = time.Now().Sub(c.startTime).Seconds()
if c.stdoutBuf != nil {
c.status.Stdout = c.stdoutBuf.Lines()
}
if c.stderrBuf != nil {
c.status.Stderr = c.stderrBuf.Lines()
}
}
return c.status
}
// Done 返回一个在Cmd停止运行时关闭的通道。
// 等待所有的 goroutine 完成执行
// 命令完成后调用 Status 来获取其最终状态
func (c *Cmd) Done() <-chan struct{} {
return c.doneChan
}
func (c *Cmd) run() {
defer func() {
c.statusChan <- c.Status()
close(c.doneChan)
}()
cmd := exec.Command(c.Name, c.Args...)
c.Cmd = cmd
setProcessGroupID(cmd)
switch {
case c.stdoutBuf != nil && c.stderrBuf != nil && c.stdoutStream != nil: // buffer and stream
cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf)
cmd.Stderr = io.MultiWriter(c.stderrStream, c.stderrBuf)
case c.stdoutBuf != nil && c.stderrBuf == nil && c.stdoutStream != nil: // combined buffer and stream
cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf)
cmd.Stderr = io.MultiWriter(c.stderrStream, c.stdoutBuf)
case c.stdoutBuf != nil && c.stderrBuf != nil: // buffer only
cmd.Stdout = c.stdoutBuf
cmd.Stderr = c.stderrBuf
case c.stdoutBuf != nil && c.stderrBuf == nil: // buffer combining stderr into stdout
cmd.Stdout = c.stdoutBuf
cmd.Stderr = c.stdoutBuf
case c.stdoutStream != nil: // stream only
cmd.Stdout = c.stdoutStream
cmd.Stderr = c.stderrStream
default: // no output (cmd >/dev/null 2>&1)
cmd.Stdout = nil
cmd.Stderr = nil
}
c.stdIn, _ = cmd.StdinPipe()
c.Env = cmd.Env
c.Dir = cmd.Dir
// Always close output streams. Do not do this after Wait because if Start
// fails and we return without closing these, it could deadlock the caller
// who's waiting for us to close them.
if c.stdoutStream != nil {
defer func() {
c.stdoutStream.Flush()
c.stderrStream.Flush()
// exec.Cmd.Wait has already waited for all output:
// Otherwise, during the execution of the command a separate goroutine
// reads from the process over a pipe and delivers that data to the
// corresponding Writer. In this case, Wait does not complete until the
// goroutine reaches EOF or encounters an error.
// from https://golang.org/pkg/os/exec/#Cmd
close(c.Stdout)
close(c.Stderr)
}()
}
now := time.Now()
if err := cmd.Start(); err != nil {
c.Lock()
c.status.Error = err
c.status.StartTs = now.UnixNano()
c.status.StopTs = time.Now().UnixNano()
c.done = true
c.Unlock()
return
}
// Set initial status
c.Lock()
c.startTime = now // running
c.status.PID = cmd.Process.Pid
c.status.StartTs = now.UnixNano()
c.started = true
c.Unlock()
// inner write
go c.writeLoop()
err := cmd.Wait()
now = time.Now()
// Get exit code of the command. According to the manual, Wait() returns:
// "If the command fails to run or doesn't complete successfully, the error
// is of type *ExitError. Other error types may be returned for I/O problems."
exitCode := 0
signaled := false
if err != nil && fmt.Sprintf("%T", err) == "*exec.ExitError" {
// This is the normal case which is not really an error. It's string
// representation is only "*exec.ExitError". It only means the cmd
// did not exit zero and caller should see ExitError.Stderr, which
// we already have. So first we'll have this as the real/underlying
// type, then discard err so status.Error doesn't contain a useless
// "*exec.ExitError". With the real type we can get the non-zero
// exit code and determine if the process was signaled, which yields
// a more specific error message, so we set err again in that case.
exiterr := err.(*exec.ExitError)
err = nil
if waitStatus, ok := exiterr.Sys().(syscall.WaitStatus); ok {
exitCode = waitStatus.ExitStatus() // -1 if signaled
if waitStatus.Signaled() {
signaled = true
err = errors.New(exiterr.Error()) // "signal: terminated"
}
}
}
// Set final status
c.Lock()
if !c.stopped && !signaled {
c.status.Complete = true
}
c.status.Runtime = now.Sub(c.startTime).Seconds()
c.status.StopTs = now.UnixNano()
c.status.Exit = exitCode
c.status.Error = err
c.done = true
c.Unlock()
}
// Send 发送指令
func (c *Cmd) Send(data any) (err error) {
var bytes []byte
if c.Marshaler == nil {
if bs, ok := data.([]byte); ok {
bytes = bs
} else {
return ErrMarshalerNil
}
} else {
bytes, err = c.Marshaler.Marshal(data)
if err != nil {
nlog.Errorf("marshal data [%v] err %v", data, err)
return err
}
}
c.chSend <- bytes
return nil
}
func (c *Cmd) writeLoop() {
defer func() {
close(c.chSend)
_ = c.stdIn.Close()
}()
for {
select {
case <-c.Done():
return
case data := <-c.chSend:
// 实际写入数据
if c.DevMode {
nlog.Debugf("发送数据: [%s]", string(data))
}
data = append(data, '\n')
if _, err := c.stdIn.Write(data); err != nil {
return
}
}
}
}