diff --git a/ncmd/.gitignore b/ncmd/.gitignore new file mode 100644 index 0000000..3598c30 --- /dev/null +++ b/ncmd/.gitignore @@ -0,0 +1 @@ +tests \ No newline at end of file diff --git a/ncmd/cmd.go b/ncmd/cmd.go new file mode 100644 index 0000000..1e328ed --- /dev/null +++ b/ncmd/cmd.go @@ -0,0 +1,381 @@ +package ncmd + +import ( + "errors" + "fmt" + "git.noahlan.cn/noahlan/ntool/ndef" + "git.noahlan.cn/noahlan/ntool/nlog" + "io" + "os/exec" + "sync" + "syscall" + "time" +) + +var ( + ErrNotStarted = errors.New("command not running") + ErrMarshalerNil = errors.New("marshaller is nil but data not []byte type") +) + +type ( + Options struct { + Marshaler ndef.Marshaler // 序列化 + + // 缓存 STDOUT 和 STDERR,最终写入 Status 内 + // 通过 Cmd.Status 获取 + Buffered bool + + // STDOUT 和 STDERR 写入到 Status.Stdout 中,类似 linux 中的 2>&1 + // 注:当Streaming开启时,此特性无效 + CombinedOutput bool + + // 流式处理,会将所有数据即时输出 + Streaming bool + + // LineBufferSize 设置 OutputStream 行缓冲区的大小,默认值 DEFAULT_LINE_BUFFER_SIZE + // 但如果发生 ErrLineBufferOverflow 错误,请尝试增加大小 + LineBufferSize uint + + // DevMode 开发模式,打印log + DevMode bool + } + Option func(opt *Options) +) + +// Cmd 处理器 +type Cmd struct { + *Options + + // Runtime + Cmd *exec.Cmd // CMD + Name string // name of binary(command) to run + Args []string // arguments + Env []string // *optional environment + Dir string // working dir + stdIn io.WriteCloser // 标准输入通道 + + stdoutStream *OutputStream // 标准输出通道 + stderrStream *OutputStream // 标准错误输出通道 + + // flags + started bool // cmd.Start called, no error + stopped bool // cmd.Stop called + done bool // run() done + final bool // status finalized in Status + doneChan chan struct{} // closed when done running + status Status + statusChan chan Status // nil until Start() called + startTime time.Time + + // messages + Stdout chan string // Stdout sets streaming STDOUT if enabled, else nil (see Options). + Stderr chan string // Stderr sets streaming STDERR if enabled, else nil (see Options). + stdoutBuf *OutputBuffer // 标准输出缓冲区 + stderrBuf *OutputBuffer // 标准错误缓冲区 + chSend chan []byte // 发送的数据缓冲区 + + *sync.Mutex +} + +type Status struct { + Cmd string + PID int + Complete bool // 如果被主动停止或kill,该值为false + Exit int // exitCode + Error error // go error + StartTs int64 // 启动时间(纳秒) 0表示未启动(started) + StopTs int64 // 停止时间(纳秒) 0表示未启动(started)或未运行(running) + Runtime float64 // 运行时长 + Stdout []string // buffered STDOUT + Stderr []string // buffered STDIN +} + +func NewCmd(opts ...Option) *Cmd { + tmp := &Cmd{ + Options: &Options{ + LineBufferSize: DEFAULT_LINE_BUFFER_SIZE, + Buffered: true, + }, + // flags + started: false, + stopped: false, + done: false, + final: false, + doneChan: make(chan struct{}), + status: Status{ + Exit: -1, + }, + statusChan: nil, + // messages + chSend: make(chan []byte, 1024), + Mutex: &sync.Mutex{}, + } + + for _, opt := range opts { + opt(tmp.Options) + } + + options := tmp.Options + if options.Buffered { + tmp.stdoutBuf = NewOutputBuffer() + tmp.stderrBuf = NewOutputBuffer() + } + if options.CombinedOutput { + tmp.stdoutBuf = NewOutputBuffer() + tmp.stderrBuf = nil + } + if options.Streaming { + tmp.Stdout = make(chan string, DEFAULT_STREAM_CHAN_SIZE) + tmp.stdoutStream = NewOutputStream(tmp.Stdout) + tmp.stdoutStream.SetLineBufferSize(int(options.LineBufferSize)) + + tmp.Stderr = make(chan string, DEFAULT_STREAM_CHAN_SIZE) + tmp.stderrStream = NewOutputStream(tmp.Stderr) + tmp.stderrStream.SetLineBufferSize(int(options.LineBufferSize)) + } + + return tmp +} + +func (c *Cmd) Start(name string, args ...string) <-chan Status { + c.Name = name + c.Args = args + c.status.Cmd = name + + c.Lock() + defer c.Unlock() + + if c.statusChan != nil { + return c.statusChan + } + c.statusChan = make(chan Status, 1) + + go c.run() + + return c.statusChan +} + +func (c *Cmd) Stop() error { + c.Lock() + defer c.Unlock() + + if c.statusChan == nil || !c.started { + return ErrNotStarted + } + + // 如果 done 为true,表示程序已经执行完毕,无需再Stop + if c.done { + return nil + } + + c.stopped = true + + // Signal the process group (-pid), not just the process, so that the process + // and all its children are signaled. Else, child procs can keep running and + // keep the stdout/stderr fd open and cause cmd.Wait to hang. + return terminateProcess(c.Cmd.Process.Pid) +} + +func (c *Cmd) Status() Status { + c.Lock() + defer c.Unlock() + + // Return default status if cmd hasn't been started + if c.statusChan == nil || !c.started { + return c.status + } + if c.done { + // No longer running + if !c.final { + if c.stdoutBuf != nil { + c.status.Stdout = c.stdoutBuf.Lines() + c.stdoutBuf = nil // release buffers + + } + if c.stderrBuf != nil { + c.status.Stderr = c.stderrBuf.Lines() + c.stderrBuf = nil // release buffers + } + c.final = true + } + } else { + // Still running + c.status.Runtime = time.Now().Sub(c.startTime).Seconds() + if c.stdoutBuf != nil { + c.status.Stdout = c.stdoutBuf.Lines() + + } + if c.stderrBuf != nil { + c.status.Stderr = c.stderrBuf.Lines() + } + } + + return c.status +} + +// Done 返回一个在Cmd停止运行时关闭的通道。 +// 等待所有的 goroutine 完成执行 +// 命令完成后调用 Status 来获取其最终状态 +func (c *Cmd) Done() <-chan struct{} { + return c.doneChan +} + +func (c *Cmd) run() { + defer func() { + c.statusChan <- c.Status() + close(c.doneChan) + }() + + cmd := exec.Command(c.Name, c.Args...) + c.Cmd = cmd + + setProcessGroupID(cmd) + + switch { + case c.stdoutBuf != nil && c.stderrBuf != nil && c.stdoutStream != nil: // buffer and stream + cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf) + cmd.Stderr = io.MultiWriter(c.stderrStream, c.stderrBuf) + case c.stdoutBuf != nil && c.stderrBuf == nil && c.stdoutStream != nil: // combined buffer and stream + cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf) + cmd.Stderr = io.MultiWriter(c.stderrStream, c.stdoutBuf) + case c.stdoutBuf != nil && c.stderrBuf != nil: // buffer only + cmd.Stdout = c.stdoutBuf + cmd.Stderr = c.stderrBuf + case c.stdoutBuf != nil && c.stderrBuf == nil: // buffer combining stderr into stdout + cmd.Stdout = c.stdoutBuf + cmd.Stderr = c.stdoutBuf + case c.stdoutStream != nil: // stream only + cmd.Stdout = c.stdoutStream + cmd.Stderr = c.stderrStream + default: // no output (cmd >/dev/null 2>&1) + cmd.Stdout = nil + cmd.Stderr = nil + } + c.stdIn, _ = cmd.StdinPipe() + + c.Env = cmd.Env + c.Dir = cmd.Dir + + // Always close output streams. Do not do this after Wait because if Start + // fails and we return without closing these, it could deadlock the caller + // who's waiting for us to close them. + if c.stdoutStream != nil { + defer func() { + c.stdoutStream.Flush() + c.stderrStream.Flush() + // exec.Cmd.Wait has already waited for all output: + // Otherwise, during the execution of the command a separate goroutine + // reads from the process over a pipe and delivers that data to the + // corresponding Writer. In this case, Wait does not complete until the + // goroutine reaches EOF or encounters an error. + // from https://golang.org/pkg/os/exec/#Cmd + close(c.Stdout) + close(c.Stderr) + }() + } + + now := time.Now() + if err := cmd.Start(); err != nil { + c.Lock() + c.status.Error = err + c.status.StartTs = now.UnixNano() + c.status.StopTs = time.Now().UnixNano() + c.done = true + c.Unlock() + return + } + + // Set initial status + c.Lock() + c.startTime = now // running + c.status.PID = cmd.Process.Pid + c.status.StartTs = now.UnixNano() + c.started = true + c.Unlock() + + // inner write + go c.writeLoop() + + err := cmd.Wait() + now = time.Now() + + // Get exit code of the command. According to the manual, Wait() returns: + // "If the command fails to run or doesn't complete successfully, the error + // is of type *ExitError. Other error types may be returned for I/O problems." + exitCode := 0 + signaled := false + if err != nil && fmt.Sprintf("%T", err) == "*exec.ExitError" { + // This is the normal case which is not really an error. It's string + // representation is only "*exec.ExitError". It only means the cmd + // did not exit zero and caller should see ExitError.Stderr, which + // we already have. So first we'll have this as the real/underlying + // type, then discard err so status.Error doesn't contain a useless + // "*exec.ExitError". With the real type we can get the non-zero + // exit code and determine if the process was signaled, which yields + // a more specific error message, so we set err again in that case. + exiterr := err.(*exec.ExitError) + err = nil + if waitStatus, ok := exiterr.Sys().(syscall.WaitStatus); ok { + exitCode = waitStatus.ExitStatus() // -1 if signaled + if waitStatus.Signaled() { + signaled = true + err = errors.New(exiterr.Error()) // "signal: terminated" + } + } + } + // Set final status + c.Lock() + if !c.stopped && !signaled { + c.status.Complete = true + } + c.status.Runtime = now.Sub(c.startTime).Seconds() + c.status.StopTs = now.UnixNano() + c.status.Exit = exitCode + c.status.Error = err + c.done = true + c.Unlock() +} + +// Send 发送指令 +func (c *Cmd) Send(data any) (err error) { + var bytes []byte + if c.Marshaler != nil { + if bs, ok := data.([]byte); ok { + bytes = bs + } else { + return ErrMarshalerNil + } + } + bytes, err = c.Marshaler.Marshal(data) + if err != nil { + nlog.Errorf("marshal data [%v] err %v", data, err) + return err + } + c.chSend <- bytes + + return nil +} + +func (c *Cmd) writeLoop() { + defer func() { + close(c.chSend) + _ = c.stdIn.Close() + }() + + for { + select { + case <-c.Done(): + return + case data := <-c.chSend: + // 实际写入数据 + if c.DevMode { + nlog.Debugf("发送数据: [%s]", string(data)) + } + + data = append(data, '\n') + if _, err := c.stdIn.Write(data); err != nil { + return + } + } + } +} diff --git a/nsys/cmdn/cmd_darwin.go b/ncmd/cmd_darwin.go similarity index 71% rename from nsys/cmdn/cmd_darwin.go rename to ncmd/cmd_darwin.go index d09f7d5..64989ee 100644 --- a/nsys/cmdn/cmd_darwin.go +++ b/ncmd/cmd_darwin.go @@ -1,4 +1,4 @@ -package cmdn +package ncmd import ( "os/exec" @@ -16,6 +16,9 @@ func terminateProcess(pid int) error { return syscall.Kill(-pid, syscall.SIGTERM) } +// Set process group ID so the cmd and all its children become a new +// process group. This allows Stop to SIGTERM the cmd's process group +// without killing this process (i.e. this code here). func setProcessGroupID(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{Setgpid: true} + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } diff --git a/nsys/cmdn/cmd_freebsd.go b/ncmd/cmd_freebsd.go similarity index 71% rename from nsys/cmdn/cmd_freebsd.go rename to ncmd/cmd_freebsd.go index d09f7d5..64989ee 100644 --- a/nsys/cmdn/cmd_freebsd.go +++ b/ncmd/cmd_freebsd.go @@ -1,4 +1,4 @@ -package cmdn +package ncmd import ( "os/exec" @@ -16,6 +16,9 @@ func terminateProcess(pid int) error { return syscall.Kill(-pid, syscall.SIGTERM) } +// Set process group ID so the cmd and all its children become a new +// process group. This allows Stop to SIGTERM the cmd's process group +// without killing this process (i.e. this code here). func setProcessGroupID(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{Setgpid: true} + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } diff --git a/nsys/cmdn/cmd_linux.go b/ncmd/cmd_linux.go similarity index 77% rename from nsys/cmdn/cmd_linux.go rename to ncmd/cmd_linux.go index 37b9088..64989ee 100644 --- a/nsys/cmdn/cmd_linux.go +++ b/ncmd/cmd_linux.go @@ -1,4 +1,4 @@ -package cmdn +package ncmd import ( "os/exec" @@ -16,6 +16,9 @@ func terminateProcess(pid int) error { return syscall.Kill(-pid, syscall.SIGTERM) } +// Set process group ID so the cmd and all its children become a new +// process group. This allows Stop to SIGTERM the cmd's process group +// without killing this process (i.e. this code here). func setProcessGroupID(cmd *exec.Cmd) { cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} } diff --git a/nsys/cmdn/cmd_windows.go b/ncmd/cmd_windows.go similarity index 97% rename from nsys/cmdn/cmd_windows.go rename to ncmd/cmd_windows.go index 169c446..c1dd4db 100644 --- a/nsys/cmdn/cmd_windows.go +++ b/ncmd/cmd_windows.go @@ -1,4 +1,4 @@ -package cmdn +package ncmd import ( "os" diff --git a/ncmd/codec.go b/ncmd/codec.go new file mode 100644 index 0000000..5674623 --- /dev/null +++ b/ncmd/codec.go @@ -0,0 +1,7 @@ +package ncmd + +import "git.noahlan.cn/noahlan/ntool/nstr" + +func decodeBytes(bs []byte) string { + return nstr.ToGBKStrSafe(bs) +} diff --git a/ncmd/io.go b/ncmd/io.go new file mode 100644 index 0000000..84f303e --- /dev/null +++ b/ncmd/io.go @@ -0,0 +1,239 @@ +package ncmd + +import ( + "bufio" + "bytes" + "fmt" + "sync" +) + +// os/exec.Cmd.StdoutPipe is usually used incorrectly. The docs are clear: +// "it is incorrect to call Wait before all reads from the pipe have completed." +// Therefore, we can't read from the pipe in another goroutine because it +// causes a race condition: we'll read in one goroutine and the original +// goroutine that calls Wait will write on close which is what Wait does. +// The proper solution is using an io.Writer for cmd.Stdout. I couldn't find +// an io.Writer that's also safe for concurrent reads (as lines in a []string +// no less), so I created one: + +// OutputBuffer represents command output that is saved, line by line, in an +// unbounded buffer. It is safe for multiple goroutines to read while the command +// is running and after it has finished. If output is small (a few megabytes) +// and not read frequently, an output buffer is a good solution. +// +// A Cmd in this package uses an OutputBuffer for both STDOUT and STDERR by +// default when created by calling NewCmd. To use OutputBuffer directly with +// a Go standard library os/exec.Command: +// +// import "os/exec" +// import "github.com/go-cmd/cmd" +// runnableCmd := exec.Command(...) +// stdout := cmd.NewOutputBuffer() +// runnableCmd.Stdout = stdout +// +// While runnableCmd is running, call stdout.Lines() to read all output +// currently written. +type OutputBuffer struct { + buf *bytes.Buffer + lines []string + *sync.Mutex +} + +// NewOutputBuffer creates a new output buffer. The buffer is unbounded and safe +// for multiple goroutines to read while the command is running by calling Lines. +func NewOutputBuffer() *OutputBuffer { + out := &OutputBuffer{ + buf: &bytes.Buffer{}, + lines: []string{}, + Mutex: &sync.Mutex{}, + } + return out +} + +// Write makes OutputBuffer implement the io.Writer interface. Do not call +// this function directly. +func (rw *OutputBuffer) Write(p []byte) (n int, err error) { + rw.Lock() + n, err = rw.buf.Write(p) // and bytes.Buffer implements io.Writer + rw.Unlock() + return // implicit +} + +// Lines returns lines of output written by the Cmd. It is safe to call while +// the Cmd is running and after it has finished. Subsequent calls returns more +// lines, if more lines were written. "\r\n" are stripped from the lines. +func (rw *OutputBuffer) Lines() []string { + rw.Lock() + // Scanners are io.Readers which effectively destroy the buffer by reading + // to EOF. So once we scan the buf to lines, the buf is empty again. + s := bufio.NewScanner(rw.buf) + for s.Scan() { + rw.lines = append(rw.lines, decodeBytes(s.Bytes())) + } + rw.Unlock() + return rw.lines +} + +const ( + // DEFAULT_LINE_BUFFER_SIZE is the default size of the OutputStream line buffer. + // The default value is usually sufficient, but if ErrLineBufferOverflow errors + // occur, try increasing the size by calling OutputBuffer.SetLineBufferSize. + DEFAULT_LINE_BUFFER_SIZE = 16384 + + // DEFAULT_STREAM_CHAN_SIZE is the default string channel size for a Cmd when + // Options.Streaming is true. The string channel size can have a minor + // performance impact if too small by causing OutputStream.Write to block + // excessively. + DEFAULT_STREAM_CHAN_SIZE = 1000 +) + +// ErrLineBufferOverflow is returned by OutputStream.Write when the internal +// line buffer is filled before a newline character is written to terminate a +// line. Increasing the line buffer size by calling OutputStream.SetLineBufferSize +// can help prevent this error. +type ErrLineBufferOverflow struct { + Line string // Unterminated line that caused the error + BufferSize int // Internal line buffer size + BufferFree int // Free bytes in line buffer +} + +func (e ErrLineBufferOverflow) Error() string { + return fmt.Sprintf("line does not contain newline and is %d bytes too long to buffer (buffer size: %d)", + len(e.Line)-e.BufferSize, e.BufferSize) +} + +// OutputStream represents real time, line by line output from a running Cmd. +// Lines are terminated by a single newline preceded by an optional carriage +// return. Both newline and carriage return are stripped from the line when +// sent to a caller-provided channel. +// +// The caller must begin receiving before starting the Cmd. Write blocks on the +// channel; the caller must always read the channel. The channel is closed when +// the Cmd exits and all output has been sent. +// +// A Cmd in this package uses an OutputStream for both STDOUT and STDERR when +// created by calling NewCmdOptions and Options.Streaming is true. To use +// OutputStream directly with a Go standard library os/exec.Command: +// +// import "os/exec" +// import "github.com/go-cmd/cmd" +// +// stdoutChan := make(chan string, 100) +// go func() { +// for line := range stdoutChan { +// // Do something with the line +// } +// }() +// +// runnableCmd := exec.Command(...) +// stdout := cmd.NewOutputStream(stdoutChan) +// runnableCmd.Stdout = stdout +// +// While runnableCmd is running, lines are sent to the channel as soon as they +// are written and newline-terminated by the command. +type OutputStream struct { + streamChan chan string + bufSize int + buf []byte + lastChar int +} + +// NewOutputStream creates a new streaming output on the given channel. The +// caller must begin receiving on the channel before the command is started. +// The OutputStream never closes the channel. +func NewOutputStream(streamChan chan string) *OutputStream { + out := &OutputStream{ + streamChan: streamChan, + // -- + bufSize: DEFAULT_LINE_BUFFER_SIZE, + buf: make([]byte, DEFAULT_LINE_BUFFER_SIZE), + lastChar: 0, + } + return out +} + +// Write makes OutputStream implement the io.Writer interface. Do not call +// this function directly. +func (rw *OutputStream) Write(p []byte) (n int, err error) { + n = len(p) // end of buffer + firstChar := 0 + + for { + // Find next newline in stream buffer. nextLine starts at 0, but buff + // can contain multiple lines, like "foo\nbar". So in that case nextLine + // will be 0 ("foo\nbar\n") then 4 ("bar\n") on next iteration. And i + // will be 3 and 7, respectively. So lines are [0:3] are [4:7]. + newlineOffset := bytes.IndexByte(p[firstChar:], '\n') + if newlineOffset < 0 { + break // no newline in stream, next line incomplete + } + + // End of line offset is start (nextLine) + newline offset. Like bufio.Scanner, + // we allow \r\n but strip the \r too by decrementing the offset for that byte. + lastChar := firstChar + newlineOffset // "line\n" + if newlineOffset > 0 && p[newlineOffset-1] == '\r' { + lastChar -= 1 // "line\r\n" + } + + // Send the line, prepend line buffer if set + var line string + if rw.lastChar > 0 { + line = decodeBytes(rw.buf[0:rw.lastChar]) + //line = string(rw.buf[0:rw.lastChar]) + rw.lastChar = 0 // reset buffer + } + line += decodeBytes(p[firstChar:lastChar]) + //line += string(p[firstChar:lastChar]) + rw.streamChan <- line // blocks if chan full + + // Next line offset is the first byte (+1) after the newline (i) + firstChar += newlineOffset + 1 + } + + if firstChar < n { + remain := len(p[firstChar:]) + bufFree := len(rw.buf[rw.lastChar:]) + if remain > bufFree { + var line string + if rw.lastChar > 0 { + line = string(rw.buf[0:rw.lastChar]) + } + line += string(p[firstChar:]) + err = ErrLineBufferOverflow{ + Line: line, + BufferSize: rw.bufSize, + BufferFree: bufFree, + } + n = firstChar + return // implicit + } + copy(rw.buf[rw.lastChar:], p[firstChar:]) + rw.lastChar += remain + } + + return // implicit +} + +// Lines returns the channel to which lines are sent. This is the same channel +// passed to NewOutputStream. +func (rw *OutputStream) Lines() <-chan string { + return rw.streamChan +} + +// SetLineBufferSize sets the internal line buffer size. The default is DEFAULT_LINE_BUFFER_SIZE. +// This function must be called immediately after NewOutputStream, and it is not +// safe to call by multiple goroutines. +// +// Increasing the line buffer size can help reduce ErrLineBufferOverflow errors. +func (rw *OutputStream) SetLineBufferSize(n int) { + rw.bufSize = n + rw.buf = make([]byte, rw.bufSize) +} + +// Flush empties the buffer of its last line. +func (rw *OutputStream) Flush() { + if rw.lastChar > 0 { + line := string(rw.buf[0:rw.lastChar]) + rw.streamChan <- line + } +} diff --git a/ncmd/options.go b/ncmd/options.go new file mode 100644 index 0000000..f12dfd9 --- /dev/null +++ b/ncmd/options.go @@ -0,0 +1,41 @@ +package ncmd + +import ( + "git.noahlan.cn/noahlan/ntool/ndef" +) + +func WithOptions(o *Options) Option { + return func(opt *Options) { + opt = o + } +} + +func WithMarshaler(marshaler ndef.Marshaler) Option { + return func(opt *Options) { + opt.Marshaler = marshaler + } +} + +func WithBuffered(v bool) Option { + return func(opt *Options) { + opt.Buffered = v + } +} + +func WithCombinedOutput() Option { + return func(opt *Options) { + opt.CombinedOutput = true + } +} + +func WithStreaming() Option { + return func(opt *Options) { + opt.Streaming = true + } +} + +func WithLineBufferSize(size uint) Option { + return func(opt *Options) { + opt.LineBufferSize = size + } +} diff --git a/nstr/codec.go b/nstr/codec.go index c020066..be1533b 100644 --- a/nstr/codec.go +++ b/nstr/codec.go @@ -69,3 +69,16 @@ func ToGBK(data []byte) ([]byte, error) { } return transBytes, nil } + +func ToGBKStrSafe(data []byte) string { + if utf8.Valid(data) { + return string(data) + } else if IsGBK(data) { + gbkBytes, err := ToGBK(data) + if err != nil { + return "" + } + return string(gbkBytes) + } + return string(data) +} diff --git a/nsys/cmdn/command.go b/nsys/cmdn/command.go deleted file mode 100644 index d0e4900..0000000 --- a/nsys/cmdn/command.go +++ /dev/null @@ -1,18 +0,0 @@ -package cmdn - -type ICommand interface { - // MessageID 消息ID - MessageID() string -} - -// PlainCommand 基于明文传输数据的命令 -type PlainCommand struct { - MID string // 消息ID - ID string - Cmd string - Args []string -} - -func (c *PlainCommand) MessageID() string { - return c.MID -} diff --git a/nsys/cmdn/options.go b/nsys/cmdn/options.go deleted file mode 100644 index b3b1603..0000000 --- a/nsys/cmdn/options.go +++ /dev/null @@ -1,49 +0,0 @@ -package cmdn - -import ( - "git.noahlan.cn/noahlan/ntool/ndef" - "time" -) - -func WithSerializer(serializer ndef.Serializer) Option { - return func(opt *Options) { - opt.Marshaler = &ndef.MarshalerWrapper{Marshaler: serializer} - opt.Unmarshaler = &ndef.UnmarshalerWrapper{Unmarshaler: serializer} - } -} - -func WithMarshaler(marshaler ndef.Marshaler) Option { - return func(opt *Options) { - opt.Marshaler = marshaler - } -} - -func WithUnmarshaler(unmarshaler ndef.Unmarshaler) Option { - return func(opt *Options) { - opt.Unmarshaler = unmarshaler - } -} - -func WithStartupDecidedFunc(startupDecidedFunc LineFunc) Option { - return func(opt *Options) { - opt.StartupDecidedFunc = startupDecidedFunc - } -} - -func WithEndLineDecidedFunc(endLineDecidedFunc LineFunc) Option { - return func(opt *Options) { - opt.EndLineDecidedFunc = endLineDecidedFunc - } -} - -func WithReadIDFunc(readIDFunc ReadIDFunc) Option { - return func(opt *Options) { - opt.ReadIDFunc = readIDFunc - } -} - -func WithTimeout(timeout time.Duration) Option { - return func(opt *Options) { - opt.Timeout = timeout - } -} diff --git a/nsys/cmdn/proc.go b/nsys/cmdn/proc.go deleted file mode 100644 index dd5b9e6..0000000 --- a/nsys/cmdn/proc.go +++ /dev/null @@ -1,364 +0,0 @@ -package cmdn - -import ( - "bufio" - "context" - "errors" - "fmt" - "git.noahlan.cn/noahlan/ntool/ndef" - "git.noahlan.cn/noahlan/ntool/nstr" - atomic2 "git.noahlan.cn/noahlan/ntool/nsys/atomic" - "io" - "log" - "os/exec" - "strings" - "sync" - "sync/atomic" - "time" -) - -const ( - DefaultBlockPrefix = "block" - DefaultNonBlockPrefix = "non-block" - DefaultTimeout = time.Second * 0 -) - -var ( - ErrBrokenPipe = errors.New("broken low-level pipe") - - defaultStartupDecidedFunc = func(_ *strings.Builder, line string) bool { - return true - } - defaultEndLineDecidedFunc = func(sb *strings.Builder, line string) bool { - return true - } - - defaultReadIDFunc = func(serializer ndef.Serializer, data string) (string, error) { - return "", nil - } -) - -type ( - pendingMsg struct { - id string - chWait chan struct{} - callback CallbackFunc - } -) - -type ( - LineFunc func(sb *strings.Builder, line string) bool - ReadIDFunc func(serializer ndef.Serializer, data string) (string, error) - CallbackFunc func(serializer ndef.Serializer, data string) - - Options struct { - Marshaler ndef.Marshaler // 序列化 - Unmarshaler ndef.Unmarshaler // 反序列化 - StartupDecidedFunc LineFunc // 启动决议方法 - EndLineDecidedFunc LineFunc // 行尾判断方法 - ReadIDFunc ReadIDFunc // 从数据中获取ID的方法 - Timeout time.Duration // 超时时间 - } - Option func(opt *Options) -) - -// Processor 处理器 -type Processor struct { - *Options - Context context.Context - cancelFunc context.CancelFunc - - Cmd *exec.Cmd // CMD - stdIn io.WriteCloser // 标准输入通道 - stdOut io.ReadCloser // 标准输出通道 - stdErr io.ReadCloser // 标准错误输出通道,一些程序会在此通道输出错误信息和一般信息 - - isBlock bool // 底层是否为同步逻辑 - isStartup *atomic.Bool // 子进程是否真正启动完毕 - chStart chan struct{} // 程序真正启动完毕信号 - - chSend chan ICommand // 待发送数据 - chWrite chan []byte // 实际发送的数据 - - sendIdx *atomic2.AtomicInt64 // 发送缓冲ID - recIdx *atomic2.AtomicInt64 // 接收缓冲ID - pendingMsgMap map[string]*pendingMsg // 发送缓存区map - - *sync.Mutex -} - -func NewProcessor(block bool, opts ...Option) *Processor { - defaultSerializer := NewPlainSerializer() - tmp := &Processor{ - Options: &Options{ - Marshaler: defaultSerializer, - Unmarshaler: defaultSerializer, - StartupDecidedFunc: defaultStartupDecidedFunc, - EndLineDecidedFunc: defaultEndLineDecidedFunc, - ReadIDFunc: defaultReadIDFunc, - Timeout: DefaultTimeout, - }, - - isBlock: block, - isStartup: &atomic.Bool{}, - chStart: make(chan struct{}), - - chSend: make(chan ICommand, 64), - chWrite: make(chan []byte, 64*8), - sendIdx: atomic2.NewAtomicInt64(), - recIdx: atomic2.NewAtomicInt64(), - pendingMsgMap: make(map[string]*pendingMsg), - Mutex: &sync.Mutex{}, - } - - for _, opt := range opts { - opt(tmp.Options) - } - - if tmp.Timeout == 0 { - tmp.Context, tmp.cancelFunc = context.WithCancel(context.Background()) - } else { - tmp.Context, tmp.cancelFunc = context.WithTimeout(context.Background(), tmp.Timeout) - } - return tmp -} - -func (s *Processor) Run(name string, args ...string) error { - s.Cmd = exec.CommandContext(s.Context, name, args...) - s.stdIn, _ = s.Cmd.StdinPipe() - s.stdOut, _ = s.Cmd.StdoutPipe() - s.stdErr, _ = s.Cmd.StderrPipe() - - setProcessGroupID(s.Cmd) - - err := s.Cmd.Start() - - go func() { - err := s.Cmd.Wait() - if err != nil { - log.Println(fmt.Sprintf("错误:命令行 %+v", err)) - } - _ = s.stdErr.Close() - _ = s.stdIn.Close() - _ = s.stdOut.Close() - }() - s.listen() - - return err -} - -// Listen 开始监听 -func (s *Processor) listen() { - go s.handle(s.stdOut, "stdOut") - go s.handle(s.stdErr, "stdErr") - - // 等待程序启动完毕 - select { - case <-s.chStart: - } - - go s.writeLoop() -} - -func (s *Processor) Stop() error { - s.Lock() - defer s.Unlock() - - s.cancelFunc() - return terminateProcess(s.Cmd.Process.Pid) -} - -// Exec 异步执行命令 -func (s *Processor) Exec(data ICommand, callback CallbackFunc) (err error) { - _, err = s.exec(data, false, callback) - return -} - -// ExecAsync 同步执行命令 -func (s *Processor) ExecAsync(data ICommand, callback CallbackFunc) error { - pm, err := s.exec(data, true, callback) - if err != nil { - return err - } - - // 同步等待消息回复 - <-pm.chWait - - return err -} - -func (s *Processor) exec(data ICommand, withWait bool, callback CallbackFunc) (pm *pendingMsg, err error) { - defer func() { - if e := recover(); e != nil { - err = ErrBrokenPipe - _ = s.sendIdx.DecrementAndGet() - } - }() - - cID := data.MessageID() - if len(cID) == 0 { - if s.isBlock { - // block-1 - cID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.sendIdx.IncrementAndGet()) - } else { - cID = fmt.Sprintf("%s-%s-%d", DefaultNonBlockPrefix, cID, s.sendIdx.IncrementAndGet()) - // error - return nil, errors.New("异步底层必须消息必须传递消息ID") - } - } - - var chWait chan struct{} - if withWait { - chWait = make(chan struct{}) - } - pm = &pendingMsg{ - id: cID, - chWait: chWait, - callback: callback, - } - - s.Lock() - s.pendingMsgMap[pm.id] = pm - s.Unlock() - - s.chSend <- data - return pm, err -} - -func (s *Processor) writeLoop() { - defer func() { - close(s.chSend) - close(s.chWrite) - }() - - for { - select { - case <-s.Context.Done(): - return - case data := <-s.chSend: - var ( - bytes []byte - err error - ) - bytes, err = s.Marshaler.Marshal(data) - if err != nil { - fmt.Println(fmt.Sprintf("序列化失败: %+v", err)) - break - } - s.chWrite <- bytes - case data := <-s.chWrite: - // 实际写入数据 - fmt.Println(fmt.Sprintf("发送数据: [%s]", string(data))) - - data = append(data, '\n') - if _, err := s.stdIn.Write(data); err != nil { - return - } - } - } -} - -func (s *Processor) handle(reader io.ReadCloser, typ string) { - defer func() { - _ = reader.Close() - }() - - buffer := bufio.NewReader(reader) - endLine := false - content := strings.Builder{} - for { - select { - case <-s.Context.Done(): - return - default: - break - } - lineBytes, isPrefix, err := buffer.ReadLine() - if err != nil || err == io.EOF { - fmt.Println(fmt.Sprintf("[%s] 读取数据时发生错误: %v", typ, err)) - break - } - - line, err := s.readBytesString(lineBytes) - if err != nil { - break - } - - if !s.Started() { - fmt.Println(fmt.Sprintf("[%s] 接收普通消息:[%s]", typ, line)) - - // 判断程序成功启动 外部逻辑 - if s.StartupDecidedFunc(&content, line) { - s.storeStarted(true) - s.chStart <- struct{}{} - fmt.Println(fmt.Sprintf("[%s] 启动完毕,等待输出...", typ)) - } - continue - } - - content.WriteString(line) - if !isPrefix && len(line) > 0 { - content.WriteByte('\n') - } - // 最后一行的判定逻辑 - if !endLine && s.EndLineDecidedFunc(&content, line) { - endLine = true - } - - if endLine { - endLine = false - cStr := content.String() - if len(cStr) > 0 { - cStr = cStr[:len(cStr)-1] - } - content.Reset() - - revID, err := s.ReadIDFunc(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr) - if err != nil { - continue - } - //fmt.Println(fmt.Sprintf("[%s] 接收指令消息:[%s]", typ, cStr)) - - // block 需要自行维护ID,是无法透传的 - if len(revID) <= 0 { - revID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.recIdx.IncrementAndGet()) - } - - pending, ok := s.pendingMsgMap[revID] - if !ok { - fmt.Println("找不到已发送数据,无法对应处理数据!") - continue - } - delete(s.pendingMsgMap, revID) - - go func() { - pending.callback(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr) - if pending.chWait != nil { - pending.chWait <- struct{}{} - } - }() - } - } -} - -func (s *Processor) readBytesString(data []byte) (string, error) { - // 编码 - if nstr.Charset(data) == nstr.GBK { - gbk, err := nstr.ToGBK(data) - if err != nil { - return "", err - } - return string(gbk), nil - } else { - return string(data), nil - } -} - -// Started 是否启动完成 -func (s *Processor) Started() bool { - return s.isStartup.Load() -} - -func (s *Processor) storeStarted(val bool) { - s.isStartup.Store(val) -} diff --git a/nsys/cmdn/response.go b/nsys/cmdn/response.go deleted file mode 100644 index 366c74b..0000000 --- a/nsys/cmdn/response.go +++ /dev/null @@ -1,40 +0,0 @@ -package cmdn - -import ( - "errors" - "fmt" - "regexp" - "strings" -) - -type IResponse interface { - // MessageID 消息ID - MessageID() string -} - -type PlainResp struct { - ID string - Command string - Result string - Err error -} - -func (r *PlainResp) MessageID() string { - return r.ID -} - -func (r *PlainResp) GetResult() (string, error) { - reg, _ := regexp.Compile(`\t`) - result := strings.TrimSpace(reg.ReplaceAllString(r.Result, " ")) - - res := strings.Fields(result) - l := len(res) - if l > 0 { - if res[0] == "=" { - return strings.TrimSpace(strings.Join(res[1:], " ")), nil - } else if res[0] == "?" { - return "", errors.New(strings.Join(res[1:], " ")) - } - } - return "", errors.New(fmt.Sprintf("错误(未知应答): %s", r.Err)) -} diff --git a/nsys/cmdn/serializer_plain.go b/nsys/cmdn/serializer_plain.go deleted file mode 100644 index 58d91e1..0000000 --- a/nsys/cmdn/serializer_plain.go +++ /dev/null @@ -1,48 +0,0 @@ -package cmdn - -import ( - "errors" - "fmt" - "git.noahlan.cn/noahlan/ntool/ndef" - "runtime" - "strings" -) - -type PlainSerializer struct { - sysType string -} - -func NewPlainSerializer() ndef.Serializer { - return &PlainSerializer{ - sysType: runtime.GOOS, - } -} - -func (s *PlainSerializer) Marshal(v any) ([]byte, error) { - ret, ok := v.(*PlainCommand) - if !ok { - return nil, errors.New(fmt.Sprintf("参数类型必须为 %T", PlainCommand{})) - } - // ret arg0 arg1 arg2 ... - // cmd arg0 arg1 arg2 ... - sb := strings.Builder{} - if ret.ID != "" { - sb.WriteString(ret.ID) - sb.WriteString(" ") - } - sb.WriteString(ret.Cmd) - sb.WriteString(" ") - sb.WriteString(strings.Join(ret.Args, " ")) - return []byte(sb.String()), nil -} - -func (s *PlainSerializer) Unmarshal(data []byte, v any) error { - t, ok := v.(*PlainResp) - if !ok { - return errors.New(fmt.Sprintf("参数类型必须为 %T", PlainResp{})) - } - t.ID = "" - //t.Command - t.Result = string(data) - return nil -}