refactor: nsys/cmdn -> ncmd

main v1.2.0
NoahLan 11 months ago
parent b01002828e
commit b3872ce052

1
ncmd/.gitignore vendored

@ -0,0 +1 @@
tests

@ -0,0 +1,381 @@
package ncmd
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nlog"
"io"
"os/exec"
"sync"
"syscall"
"time"
)
var (
ErrNotStarted = errors.New("command not running")
ErrMarshalerNil = errors.New("marshaller is nil but data not []byte type")
)
type (
Options struct {
Marshaler ndef.Marshaler // 序列化
// 缓存 STDOUT 和 STDERR最终写入 Status 内
// 通过 Cmd.Status 获取
Buffered bool
// STDOUT 和 STDERR 写入到 Status.Stdout 中,类似 linux 中的 2>&1
// 注当Streaming开启时此特性无效
CombinedOutput bool
// 流式处理,会将所有数据即时输出
Streaming bool
// LineBufferSize 设置 OutputStream 行缓冲区的大小,默认值 DEFAULT_LINE_BUFFER_SIZE
// 但如果发生 ErrLineBufferOverflow 错误,请尝试增加大小
LineBufferSize uint
// DevMode 开发模式打印log
DevMode bool
}
Option func(opt *Options)
)
// Cmd 处理器
type Cmd struct {
*Options
// Runtime
Cmd *exec.Cmd // CMD
Name string // name of binary(command) to run
Args []string // arguments
Env []string // *optional environment
Dir string // working dir
stdIn io.WriteCloser // 标准输入通道
stdoutStream *OutputStream // 标准输出通道
stderrStream *OutputStream // 标准错误输出通道
// flags
started bool // cmd.Start called, no error
stopped bool // cmd.Stop called
done bool // run() done
final bool // status finalized in Status
doneChan chan struct{} // closed when done running
status Status
statusChan chan Status // nil until Start() called
startTime time.Time
// messages
Stdout chan string // Stdout sets streaming STDOUT if enabled, else nil (see Options).
Stderr chan string // Stderr sets streaming STDERR if enabled, else nil (see Options).
stdoutBuf *OutputBuffer // 标准输出缓冲区
stderrBuf *OutputBuffer // 标准错误缓冲区
chSend chan []byte // 发送的数据缓冲区
*sync.Mutex
}
type Status struct {
Cmd string
PID int
Complete bool // 如果被主动停止或kill该值为false
Exit int // exitCode
Error error // go error
StartTs int64 // 启动时间(纳秒) 0表示未启动(started)
StopTs int64 // 停止时间(纳秒) 0表示未启动(started)或未运行(running)
Runtime float64 // 运行时长
Stdout []string // buffered STDOUT
Stderr []string // buffered STDIN
}
func NewCmd(opts ...Option) *Cmd {
tmp := &Cmd{
Options: &Options{
LineBufferSize: DEFAULT_LINE_BUFFER_SIZE,
Buffered: true,
},
// flags
started: false,
stopped: false,
done: false,
final: false,
doneChan: make(chan struct{}),
status: Status{
Exit: -1,
},
statusChan: nil,
// messages
chSend: make(chan []byte, 1024),
Mutex: &sync.Mutex{},
}
for _, opt := range opts {
opt(tmp.Options)
}
options := tmp.Options
if options.Buffered {
tmp.stdoutBuf = NewOutputBuffer()
tmp.stderrBuf = NewOutputBuffer()
}
if options.CombinedOutput {
tmp.stdoutBuf = NewOutputBuffer()
tmp.stderrBuf = nil
}
if options.Streaming {
tmp.Stdout = make(chan string, DEFAULT_STREAM_CHAN_SIZE)
tmp.stdoutStream = NewOutputStream(tmp.Stdout)
tmp.stdoutStream.SetLineBufferSize(int(options.LineBufferSize))
tmp.Stderr = make(chan string, DEFAULT_STREAM_CHAN_SIZE)
tmp.stderrStream = NewOutputStream(tmp.Stderr)
tmp.stderrStream.SetLineBufferSize(int(options.LineBufferSize))
}
return tmp
}
func (c *Cmd) Start(name string, args ...string) <-chan Status {
c.Name = name
c.Args = args
c.status.Cmd = name
c.Lock()
defer c.Unlock()
if c.statusChan != nil {
return c.statusChan
}
c.statusChan = make(chan Status, 1)
go c.run()
return c.statusChan
}
func (c *Cmd) Stop() error {
c.Lock()
defer c.Unlock()
if c.statusChan == nil || !c.started {
return ErrNotStarted
}
// 如果 done 为true表示程序已经执行完毕无需再Stop
if c.done {
return nil
}
c.stopped = true
// Signal the process group (-pid), not just the process, so that the process
// and all its children are signaled. Else, child procs can keep running and
// keep the stdout/stderr fd open and cause cmd.Wait to hang.
return terminateProcess(c.Cmd.Process.Pid)
}
func (c *Cmd) Status() Status {
c.Lock()
defer c.Unlock()
// Return default status if cmd hasn't been started
if c.statusChan == nil || !c.started {
return c.status
}
if c.done {
// No longer running
if !c.final {
if c.stdoutBuf != nil {
c.status.Stdout = c.stdoutBuf.Lines()
c.stdoutBuf = nil // release buffers
}
if c.stderrBuf != nil {
c.status.Stderr = c.stderrBuf.Lines()
c.stderrBuf = nil // release buffers
}
c.final = true
}
} else {
// Still running
c.status.Runtime = time.Now().Sub(c.startTime).Seconds()
if c.stdoutBuf != nil {
c.status.Stdout = c.stdoutBuf.Lines()
}
if c.stderrBuf != nil {
c.status.Stderr = c.stderrBuf.Lines()
}
}
return c.status
}
// Done 返回一个在Cmd停止运行时关闭的通道。
// 等待所有的 goroutine 完成执行
// 命令完成后调用 Status 来获取其最终状态
func (c *Cmd) Done() <-chan struct{} {
return c.doneChan
}
func (c *Cmd) run() {
defer func() {
c.statusChan <- c.Status()
close(c.doneChan)
}()
cmd := exec.Command(c.Name, c.Args...)
c.Cmd = cmd
setProcessGroupID(cmd)
switch {
case c.stdoutBuf != nil && c.stderrBuf != nil && c.stdoutStream != nil: // buffer and stream
cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf)
cmd.Stderr = io.MultiWriter(c.stderrStream, c.stderrBuf)
case c.stdoutBuf != nil && c.stderrBuf == nil && c.stdoutStream != nil: // combined buffer and stream
cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf)
cmd.Stderr = io.MultiWriter(c.stderrStream, c.stdoutBuf)
case c.stdoutBuf != nil && c.stderrBuf != nil: // buffer only
cmd.Stdout = c.stdoutBuf
cmd.Stderr = c.stderrBuf
case c.stdoutBuf != nil && c.stderrBuf == nil: // buffer combining stderr into stdout
cmd.Stdout = c.stdoutBuf
cmd.Stderr = c.stdoutBuf
case c.stdoutStream != nil: // stream only
cmd.Stdout = c.stdoutStream
cmd.Stderr = c.stderrStream
default: // no output (cmd >/dev/null 2>&1)
cmd.Stdout = nil
cmd.Stderr = nil
}
c.stdIn, _ = cmd.StdinPipe()
c.Env = cmd.Env
c.Dir = cmd.Dir
// Always close output streams. Do not do this after Wait because if Start
// fails and we return without closing these, it could deadlock the caller
// who's waiting for us to close them.
if c.stdoutStream != nil {
defer func() {
c.stdoutStream.Flush()
c.stderrStream.Flush()
// exec.Cmd.Wait has already waited for all output:
// Otherwise, during the execution of the command a separate goroutine
// reads from the process over a pipe and delivers that data to the
// corresponding Writer. In this case, Wait does not complete until the
// goroutine reaches EOF or encounters an error.
// from https://golang.org/pkg/os/exec/#Cmd
close(c.Stdout)
close(c.Stderr)
}()
}
now := time.Now()
if err := cmd.Start(); err != nil {
c.Lock()
c.status.Error = err
c.status.StartTs = now.UnixNano()
c.status.StopTs = time.Now().UnixNano()
c.done = true
c.Unlock()
return
}
// Set initial status
c.Lock()
c.startTime = now // running
c.status.PID = cmd.Process.Pid
c.status.StartTs = now.UnixNano()
c.started = true
c.Unlock()
// inner write
go c.writeLoop()
err := cmd.Wait()
now = time.Now()
// Get exit code of the command. According to the manual, Wait() returns:
// "If the command fails to run or doesn't complete successfully, the error
// is of type *ExitError. Other error types may be returned for I/O problems."
exitCode := 0
signaled := false
if err != nil && fmt.Sprintf("%T", err) == "*exec.ExitError" {
// This is the normal case which is not really an error. It's string
// representation is only "*exec.ExitError". It only means the cmd
// did not exit zero and caller should see ExitError.Stderr, which
// we already have. So first we'll have this as the real/underlying
// type, then discard err so status.Error doesn't contain a useless
// "*exec.ExitError". With the real type we can get the non-zero
// exit code and determine if the process was signaled, which yields
// a more specific error message, so we set err again in that case.
exiterr := err.(*exec.ExitError)
err = nil
if waitStatus, ok := exiterr.Sys().(syscall.WaitStatus); ok {
exitCode = waitStatus.ExitStatus() // -1 if signaled
if waitStatus.Signaled() {
signaled = true
err = errors.New(exiterr.Error()) // "signal: terminated"
}
}
}
// Set final status
c.Lock()
if !c.stopped && !signaled {
c.status.Complete = true
}
c.status.Runtime = now.Sub(c.startTime).Seconds()
c.status.StopTs = now.UnixNano()
c.status.Exit = exitCode
c.status.Error = err
c.done = true
c.Unlock()
}
// Send 发送指令
func (c *Cmd) Send(data any) (err error) {
var bytes []byte
if c.Marshaler != nil {
if bs, ok := data.([]byte); ok {
bytes = bs
} else {
return ErrMarshalerNil
}
}
bytes, err = c.Marshaler.Marshal(data)
if err != nil {
nlog.Errorf("marshal data [%v] err %v", data, err)
return err
}
c.chSend <- bytes
return nil
}
func (c *Cmd) writeLoop() {
defer func() {
close(c.chSend)
_ = c.stdIn.Close()
}()
for {
select {
case <-c.Done():
return
case data := <-c.chSend:
// 实际写入数据
if c.DevMode {
nlog.Debugf("发送数据: [%s]", string(data))
}
data = append(data, '\n')
if _, err := c.stdIn.Write(data); err != nil {
return
}
}
}
}

@ -1,4 +1,4 @@
package cmdn package ncmd
import ( import (
"os/exec" "os/exec"
@ -16,6 +16,9 @@ func terminateProcess(pid int) error {
return syscall.Kill(-pid, syscall.SIGTERM) return syscall.Kill(-pid, syscall.SIGTERM)
} }
// Set process group ID so the cmd and all its children become a new
// process group. This allows Stop to SIGTERM the cmd's process group
// without killing this process (i.e. this code here).
func setProcessGroupID(cmd *exec.Cmd) { func setProcessGroupID(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setgpid: true} cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
} }

@ -1,4 +1,4 @@
package cmdn package ncmd
import ( import (
"os/exec" "os/exec"
@ -16,6 +16,9 @@ func terminateProcess(pid int) error {
return syscall.Kill(-pid, syscall.SIGTERM) return syscall.Kill(-pid, syscall.SIGTERM)
} }
// Set process group ID so the cmd and all its children become a new
// process group. This allows Stop to SIGTERM the cmd's process group
// without killing this process (i.e. this code here).
func setProcessGroupID(cmd *exec.Cmd) { func setProcessGroupID(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setgpid: true} cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
} }

@ -1,4 +1,4 @@
package cmdn package ncmd
import ( import (
"os/exec" "os/exec"
@ -16,6 +16,9 @@ func terminateProcess(pid int) error {
return syscall.Kill(-pid, syscall.SIGTERM) return syscall.Kill(-pid, syscall.SIGTERM)
} }
// Set process group ID so the cmd and all its children become a new
// process group. This allows Stop to SIGTERM the cmd's process group
// without killing this process (i.e. this code here).
func setProcessGroupID(cmd *exec.Cmd) { func setProcessGroupID(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
} }

@ -1,4 +1,4 @@
package cmdn package ncmd
import ( import (
"os" "os"

@ -0,0 +1,7 @@
package ncmd
import "git.noahlan.cn/noahlan/ntool/nstr"
func decodeBytes(bs []byte) string {
return nstr.ToGBKStrSafe(bs)
}

@ -0,0 +1,239 @@
package ncmd
import (
"bufio"
"bytes"
"fmt"
"sync"
)
// os/exec.Cmd.StdoutPipe is usually used incorrectly. The docs are clear:
// "it is incorrect to call Wait before all reads from the pipe have completed."
// Therefore, we can't read from the pipe in another goroutine because it
// causes a race condition: we'll read in one goroutine and the original
// goroutine that calls Wait will write on close which is what Wait does.
// The proper solution is using an io.Writer for cmd.Stdout. I couldn't find
// an io.Writer that's also safe for concurrent reads (as lines in a []string
// no less), so I created one:
// OutputBuffer represents command output that is saved, line by line, in an
// unbounded buffer. It is safe for multiple goroutines to read while the command
// is running and after it has finished. If output is small (a few megabytes)
// and not read frequently, an output buffer is a good solution.
//
// A Cmd in this package uses an OutputBuffer for both STDOUT and STDERR by
// default when created by calling NewCmd. To use OutputBuffer directly with
// a Go standard library os/exec.Command:
//
// import "os/exec"
// import "github.com/go-cmd/cmd"
// runnableCmd := exec.Command(...)
// stdout := cmd.NewOutputBuffer()
// runnableCmd.Stdout = stdout
//
// While runnableCmd is running, call stdout.Lines() to read all output
// currently written.
type OutputBuffer struct {
buf *bytes.Buffer
lines []string
*sync.Mutex
}
// NewOutputBuffer creates a new output buffer. The buffer is unbounded and safe
// for multiple goroutines to read while the command is running by calling Lines.
func NewOutputBuffer() *OutputBuffer {
out := &OutputBuffer{
buf: &bytes.Buffer{},
lines: []string{},
Mutex: &sync.Mutex{},
}
return out
}
// Write makes OutputBuffer implement the io.Writer interface. Do not call
// this function directly.
func (rw *OutputBuffer) Write(p []byte) (n int, err error) {
rw.Lock()
n, err = rw.buf.Write(p) // and bytes.Buffer implements io.Writer
rw.Unlock()
return // implicit
}
// Lines returns lines of output written by the Cmd. It is safe to call while
// the Cmd is running and after it has finished. Subsequent calls returns more
// lines, if more lines were written. "\r\n" are stripped from the lines.
func (rw *OutputBuffer) Lines() []string {
rw.Lock()
// Scanners are io.Readers which effectively destroy the buffer by reading
// to EOF. So once we scan the buf to lines, the buf is empty again.
s := bufio.NewScanner(rw.buf)
for s.Scan() {
rw.lines = append(rw.lines, decodeBytes(s.Bytes()))
}
rw.Unlock()
return rw.lines
}
const (
// DEFAULT_LINE_BUFFER_SIZE is the default size of the OutputStream line buffer.
// The default value is usually sufficient, but if ErrLineBufferOverflow errors
// occur, try increasing the size by calling OutputBuffer.SetLineBufferSize.
DEFAULT_LINE_BUFFER_SIZE = 16384
// DEFAULT_STREAM_CHAN_SIZE is the default string channel size for a Cmd when
// Options.Streaming is true. The string channel size can have a minor
// performance impact if too small by causing OutputStream.Write to block
// excessively.
DEFAULT_STREAM_CHAN_SIZE = 1000
)
// ErrLineBufferOverflow is returned by OutputStream.Write when the internal
// line buffer is filled before a newline character is written to terminate a
// line. Increasing the line buffer size by calling OutputStream.SetLineBufferSize
// can help prevent this error.
type ErrLineBufferOverflow struct {
Line string // Unterminated line that caused the error
BufferSize int // Internal line buffer size
BufferFree int // Free bytes in line buffer
}
func (e ErrLineBufferOverflow) Error() string {
return fmt.Sprintf("line does not contain newline and is %d bytes too long to buffer (buffer size: %d)",
len(e.Line)-e.BufferSize, e.BufferSize)
}
// OutputStream represents real time, line by line output from a running Cmd.
// Lines are terminated by a single newline preceded by an optional carriage
// return. Both newline and carriage return are stripped from the line when
// sent to a caller-provided channel.
//
// The caller must begin receiving before starting the Cmd. Write blocks on the
// channel; the caller must always read the channel. The channel is closed when
// the Cmd exits and all output has been sent.
//
// A Cmd in this package uses an OutputStream for both STDOUT and STDERR when
// created by calling NewCmdOptions and Options.Streaming is true. To use
// OutputStream directly with a Go standard library os/exec.Command:
//
// import "os/exec"
// import "github.com/go-cmd/cmd"
//
// stdoutChan := make(chan string, 100)
// go func() {
// for line := range stdoutChan {
// // Do something with the line
// }
// }()
//
// runnableCmd := exec.Command(...)
// stdout := cmd.NewOutputStream(stdoutChan)
// runnableCmd.Stdout = stdout
//
// While runnableCmd is running, lines are sent to the channel as soon as they
// are written and newline-terminated by the command.
type OutputStream struct {
streamChan chan string
bufSize int
buf []byte
lastChar int
}
// NewOutputStream creates a new streaming output on the given channel. The
// caller must begin receiving on the channel before the command is started.
// The OutputStream never closes the channel.
func NewOutputStream(streamChan chan string) *OutputStream {
out := &OutputStream{
streamChan: streamChan,
// --
bufSize: DEFAULT_LINE_BUFFER_SIZE,
buf: make([]byte, DEFAULT_LINE_BUFFER_SIZE),
lastChar: 0,
}
return out
}
// Write makes OutputStream implement the io.Writer interface. Do not call
// this function directly.
func (rw *OutputStream) Write(p []byte) (n int, err error) {
n = len(p) // end of buffer
firstChar := 0
for {
// Find next newline in stream buffer. nextLine starts at 0, but buff
// can contain multiple lines, like "foo\nbar". So in that case nextLine
// will be 0 ("foo\nbar\n") then 4 ("bar\n") on next iteration. And i
// will be 3 and 7, respectively. So lines are [0:3] are [4:7].
newlineOffset := bytes.IndexByte(p[firstChar:], '\n')
if newlineOffset < 0 {
break // no newline in stream, next line incomplete
}
// End of line offset is start (nextLine) + newline offset. Like bufio.Scanner,
// we allow \r\n but strip the \r too by decrementing the offset for that byte.
lastChar := firstChar + newlineOffset // "line\n"
if newlineOffset > 0 && p[newlineOffset-1] == '\r' {
lastChar -= 1 // "line\r\n"
}
// Send the line, prepend line buffer if set
var line string
if rw.lastChar > 0 {
line = decodeBytes(rw.buf[0:rw.lastChar])
//line = string(rw.buf[0:rw.lastChar])
rw.lastChar = 0 // reset buffer
}
line += decodeBytes(p[firstChar:lastChar])
//line += string(p[firstChar:lastChar])
rw.streamChan <- line // blocks if chan full
// Next line offset is the first byte (+1) after the newline (i)
firstChar += newlineOffset + 1
}
if firstChar < n {
remain := len(p[firstChar:])
bufFree := len(rw.buf[rw.lastChar:])
if remain > bufFree {
var line string
if rw.lastChar > 0 {
line = string(rw.buf[0:rw.lastChar])
}
line += string(p[firstChar:])
err = ErrLineBufferOverflow{
Line: line,
BufferSize: rw.bufSize,
BufferFree: bufFree,
}
n = firstChar
return // implicit
}
copy(rw.buf[rw.lastChar:], p[firstChar:])
rw.lastChar += remain
}
return // implicit
}
// Lines returns the channel to which lines are sent. This is the same channel
// passed to NewOutputStream.
func (rw *OutputStream) Lines() <-chan string {
return rw.streamChan
}
// SetLineBufferSize sets the internal line buffer size. The default is DEFAULT_LINE_BUFFER_SIZE.
// This function must be called immediately after NewOutputStream, and it is not
// safe to call by multiple goroutines.
//
// Increasing the line buffer size can help reduce ErrLineBufferOverflow errors.
func (rw *OutputStream) SetLineBufferSize(n int) {
rw.bufSize = n
rw.buf = make([]byte, rw.bufSize)
}
// Flush empties the buffer of its last line.
func (rw *OutputStream) Flush() {
if rw.lastChar > 0 {
line := string(rw.buf[0:rw.lastChar])
rw.streamChan <- line
}
}

@ -0,0 +1,41 @@
package ncmd
import (
"git.noahlan.cn/noahlan/ntool/ndef"
)
func WithOptions(o *Options) Option {
return func(opt *Options) {
opt = o
}
}
func WithMarshaler(marshaler ndef.Marshaler) Option {
return func(opt *Options) {
opt.Marshaler = marshaler
}
}
func WithBuffered(v bool) Option {
return func(opt *Options) {
opt.Buffered = v
}
}
func WithCombinedOutput() Option {
return func(opt *Options) {
opt.CombinedOutput = true
}
}
func WithStreaming() Option {
return func(opt *Options) {
opt.Streaming = true
}
}
func WithLineBufferSize(size uint) Option {
return func(opt *Options) {
opt.LineBufferSize = size
}
}

@ -69,3 +69,16 @@ func ToGBK(data []byte) ([]byte, error) {
} }
return transBytes, nil return transBytes, nil
} }
func ToGBKStrSafe(data []byte) string {
if utf8.Valid(data) {
return string(data)
} else if IsGBK(data) {
gbkBytes, err := ToGBK(data)
if err != nil {
return ""
}
return string(gbkBytes)
}
return string(data)
}

@ -1,18 +0,0 @@
package cmdn
type ICommand interface {
// MessageID 消息ID
MessageID() string
}
// PlainCommand 基于明文传输数据的命令
type PlainCommand struct {
MID string // 消息ID
ID string
Cmd string
Args []string
}
func (c *PlainCommand) MessageID() string {
return c.MID
}

@ -1,49 +0,0 @@
package cmdn
import (
"git.noahlan.cn/noahlan/ntool/ndef"
"time"
)
func WithSerializer(serializer ndef.Serializer) Option {
return func(opt *Options) {
opt.Marshaler = &ndef.MarshalerWrapper{Marshaler: serializer}
opt.Unmarshaler = &ndef.UnmarshalerWrapper{Unmarshaler: serializer}
}
}
func WithMarshaler(marshaler ndef.Marshaler) Option {
return func(opt *Options) {
opt.Marshaler = marshaler
}
}
func WithUnmarshaler(unmarshaler ndef.Unmarshaler) Option {
return func(opt *Options) {
opt.Unmarshaler = unmarshaler
}
}
func WithStartupDecidedFunc(startupDecidedFunc LineFunc) Option {
return func(opt *Options) {
opt.StartupDecidedFunc = startupDecidedFunc
}
}
func WithEndLineDecidedFunc(endLineDecidedFunc LineFunc) Option {
return func(opt *Options) {
opt.EndLineDecidedFunc = endLineDecidedFunc
}
}
func WithReadIDFunc(readIDFunc ReadIDFunc) Option {
return func(opt *Options) {
opt.ReadIDFunc = readIDFunc
}
}
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.Timeout = timeout
}
}

@ -1,364 +0,0 @@
package cmdn
import (
"bufio"
"context"
"errors"
"fmt"
"git.noahlan.cn/noahlan/ntool/ndef"
"git.noahlan.cn/noahlan/ntool/nstr"
atomic2 "git.noahlan.cn/noahlan/ntool/nsys/atomic"
"io"
"log"
"os/exec"
"strings"
"sync"
"sync/atomic"
"time"
)
const (
DefaultBlockPrefix = "block"
DefaultNonBlockPrefix = "non-block"
DefaultTimeout = time.Second * 0
)
var (
ErrBrokenPipe = errors.New("broken low-level pipe")
defaultStartupDecidedFunc = func(_ *strings.Builder, line string) bool {
return true
}
defaultEndLineDecidedFunc = func(sb *strings.Builder, line string) bool {
return true
}
defaultReadIDFunc = func(serializer ndef.Serializer, data string) (string, error) {
return "", nil
}
)
type (
pendingMsg struct {
id string
chWait chan struct{}
callback CallbackFunc
}
)
type (
LineFunc func(sb *strings.Builder, line string) bool
ReadIDFunc func(serializer ndef.Serializer, data string) (string, error)
CallbackFunc func(serializer ndef.Serializer, data string)
Options struct {
Marshaler ndef.Marshaler // 序列化
Unmarshaler ndef.Unmarshaler // 反序列化
StartupDecidedFunc LineFunc // 启动决议方法
EndLineDecidedFunc LineFunc // 行尾判断方法
ReadIDFunc ReadIDFunc // 从数据中获取ID的方法
Timeout time.Duration // 超时时间
}
Option func(opt *Options)
)
// Processor 处理器
type Processor struct {
*Options
Context context.Context
cancelFunc context.CancelFunc
Cmd *exec.Cmd // CMD
stdIn io.WriteCloser // 标准输入通道
stdOut io.ReadCloser // 标准输出通道
stdErr io.ReadCloser // 标准错误输出通道,一些程序会在此通道输出错误信息和一般信息
isBlock bool // 底层是否为同步逻辑
isStartup *atomic.Bool // 子进程是否真正启动完毕
chStart chan struct{} // 程序真正启动完毕信号
chSend chan ICommand // 待发送数据
chWrite chan []byte // 实际发送的数据
sendIdx *atomic2.AtomicInt64 // 发送缓冲ID
recIdx *atomic2.AtomicInt64 // 接收缓冲ID
pendingMsgMap map[string]*pendingMsg // 发送缓存区map
*sync.Mutex
}
func NewProcessor(block bool, opts ...Option) *Processor {
defaultSerializer := NewPlainSerializer()
tmp := &Processor{
Options: &Options{
Marshaler: defaultSerializer,
Unmarshaler: defaultSerializer,
StartupDecidedFunc: defaultStartupDecidedFunc,
EndLineDecidedFunc: defaultEndLineDecidedFunc,
ReadIDFunc: defaultReadIDFunc,
Timeout: DefaultTimeout,
},
isBlock: block,
isStartup: &atomic.Bool{},
chStart: make(chan struct{}),
chSend: make(chan ICommand, 64),
chWrite: make(chan []byte, 64*8),
sendIdx: atomic2.NewAtomicInt64(),
recIdx: atomic2.NewAtomicInt64(),
pendingMsgMap: make(map[string]*pendingMsg),
Mutex: &sync.Mutex{},
}
for _, opt := range opts {
opt(tmp.Options)
}
if tmp.Timeout == 0 {
tmp.Context, tmp.cancelFunc = context.WithCancel(context.Background())
} else {
tmp.Context, tmp.cancelFunc = context.WithTimeout(context.Background(), tmp.Timeout)
}
return tmp
}
func (s *Processor) Run(name string, args ...string) error {
s.Cmd = exec.CommandContext(s.Context, name, args...)
s.stdIn, _ = s.Cmd.StdinPipe()
s.stdOut, _ = s.Cmd.StdoutPipe()
s.stdErr, _ = s.Cmd.StderrPipe()
setProcessGroupID(s.Cmd)
err := s.Cmd.Start()
go func() {
err := s.Cmd.Wait()
if err != nil {
log.Println(fmt.Sprintf("错误:命令行 %+v", err))
}
_ = s.stdErr.Close()
_ = s.stdIn.Close()
_ = s.stdOut.Close()
}()
s.listen()
return err
}
// Listen 开始监听
func (s *Processor) listen() {
go s.handle(s.stdOut, "stdOut")
go s.handle(s.stdErr, "stdErr")
// 等待程序启动完毕
select {
case <-s.chStart:
}
go s.writeLoop()
}
func (s *Processor) Stop() error {
s.Lock()
defer s.Unlock()
s.cancelFunc()
return terminateProcess(s.Cmd.Process.Pid)
}
// Exec 异步执行命令
func (s *Processor) Exec(data ICommand, callback CallbackFunc) (err error) {
_, err = s.exec(data, false, callback)
return
}
// ExecAsync 同步执行命令
func (s *Processor) ExecAsync(data ICommand, callback CallbackFunc) error {
pm, err := s.exec(data, true, callback)
if err != nil {
return err
}
// 同步等待消息回复
<-pm.chWait
return err
}
func (s *Processor) exec(data ICommand, withWait bool, callback CallbackFunc) (pm *pendingMsg, err error) {
defer func() {
if e := recover(); e != nil {
err = ErrBrokenPipe
_ = s.sendIdx.DecrementAndGet()
}
}()
cID := data.MessageID()
if len(cID) == 0 {
if s.isBlock {
// block-1
cID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.sendIdx.IncrementAndGet())
} else {
cID = fmt.Sprintf("%s-%s-%d", DefaultNonBlockPrefix, cID, s.sendIdx.IncrementAndGet())
// error
return nil, errors.New("异步底层必须消息必须传递消息ID")
}
}
var chWait chan struct{}
if withWait {
chWait = make(chan struct{})
}
pm = &pendingMsg{
id: cID,
chWait: chWait,
callback: callback,
}
s.Lock()
s.pendingMsgMap[pm.id] = pm
s.Unlock()
s.chSend <- data
return pm, err
}
func (s *Processor) writeLoop() {
defer func() {
close(s.chSend)
close(s.chWrite)
}()
for {
select {
case <-s.Context.Done():
return
case data := <-s.chSend:
var (
bytes []byte
err error
)
bytes, err = s.Marshaler.Marshal(data)
if err != nil {
fmt.Println(fmt.Sprintf("序列化失败: %+v", err))
break
}
s.chWrite <- bytes
case data := <-s.chWrite:
// 实际写入数据
fmt.Println(fmt.Sprintf("发送数据: [%s]", string(data)))
data = append(data, '\n')
if _, err := s.stdIn.Write(data); err != nil {
return
}
}
}
}
func (s *Processor) handle(reader io.ReadCloser, typ string) {
defer func() {
_ = reader.Close()
}()
buffer := bufio.NewReader(reader)
endLine := false
content := strings.Builder{}
for {
select {
case <-s.Context.Done():
return
default:
break
}
lineBytes, isPrefix, err := buffer.ReadLine()
if err != nil || err == io.EOF {
fmt.Println(fmt.Sprintf("[%s] 读取数据时发生错误: %v", typ, err))
break
}
line, err := s.readBytesString(lineBytes)
if err != nil {
break
}
if !s.Started() {
fmt.Println(fmt.Sprintf("[%s] 接收普通消息:[%s]", typ, line))
// 判断程序成功启动 外部逻辑
if s.StartupDecidedFunc(&content, line) {
s.storeStarted(true)
s.chStart <- struct{}{}
fmt.Println(fmt.Sprintf("[%s] 启动完毕,等待输出...", typ))
}
continue
}
content.WriteString(line)
if !isPrefix && len(line) > 0 {
content.WriteByte('\n')
}
// 最后一行的判定逻辑
if !endLine && s.EndLineDecidedFunc(&content, line) {
endLine = true
}
if endLine {
endLine = false
cStr := content.String()
if len(cStr) > 0 {
cStr = cStr[:len(cStr)-1]
}
content.Reset()
revID, err := s.ReadIDFunc(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr)
if err != nil {
continue
}
//fmt.Println(fmt.Sprintf("[%s] 接收指令消息:[%s]", typ, cStr))
// block 需要自行维护ID是无法透传的
if len(revID) <= 0 {
revID = fmt.Sprintf("%s-%d", DefaultBlockPrefix, s.recIdx.IncrementAndGet())
}
pending, ok := s.pendingMsgMap[revID]
if !ok {
fmt.Println("找不到已发送数据,无法对应处理数据!")
continue
}
delete(s.pendingMsgMap, revID)
go func() {
pending.callback(ndef.NewSerializerWrapper(s.Marshaler, s.Unmarshaler), cStr)
if pending.chWait != nil {
pending.chWait <- struct{}{}
}
}()
}
}
}
func (s *Processor) readBytesString(data []byte) (string, error) {
// 编码
if nstr.Charset(data) == nstr.GBK {
gbk, err := nstr.ToGBK(data)
if err != nil {
return "", err
}
return string(gbk), nil
} else {
return string(data), nil
}
}
// Started 是否启动完成
func (s *Processor) Started() bool {
return s.isStartup.Load()
}
func (s *Processor) storeStarted(val bool) {
s.isStartup.Store(val)
}

@ -1,40 +0,0 @@
package cmdn
import (
"errors"
"fmt"
"regexp"
"strings"
)
type IResponse interface {
// MessageID 消息ID
MessageID() string
}
type PlainResp struct {
ID string
Command string
Result string
Err error
}
func (r *PlainResp) MessageID() string {
return r.ID
}
func (r *PlainResp) GetResult() (string, error) {
reg, _ := regexp.Compile(`\t`)
result := strings.TrimSpace(reg.ReplaceAllString(r.Result, " "))
res := strings.Fields(result)
l := len(res)
if l > 0 {
if res[0] == "=" {
return strings.TrimSpace(strings.Join(res[1:], " ")), nil
} else if res[0] == "?" {
return "", errors.New(strings.Join(res[1:], " "))
}
}
return "", errors.New(fmt.Sprintf("错误(未知应答): %s", r.Err))
}

@ -1,48 +0,0 @@
package cmdn
import (
"errors"
"fmt"
"git.noahlan.cn/noahlan/ntool/ndef"
"runtime"
"strings"
)
type PlainSerializer struct {
sysType string
}
func NewPlainSerializer() ndef.Serializer {
return &PlainSerializer{
sysType: runtime.GOOS,
}
}
func (s *PlainSerializer) Marshal(v any) ([]byte, error) {
ret, ok := v.(*PlainCommand)
if !ok {
return nil, errors.New(fmt.Sprintf("参数类型必须为 %T", PlainCommand{}))
}
// ret arg0 arg1 arg2 ...
// cmd arg0 arg1 arg2 ...
sb := strings.Builder{}
if ret.ID != "" {
sb.WriteString(ret.ID)
sb.WriteString(" ")
}
sb.WriteString(ret.Cmd)
sb.WriteString(" ")
sb.WriteString(strings.Join(ret.Args, " "))
return []byte(sb.String()), nil
}
func (s *PlainSerializer) Unmarshal(data []byte, v any) error {
t, ok := v.(*PlainResp)
if !ok {
return errors.New(fmt.Sprintf("参数类型必须为 %T", PlainResp{}))
}
t.ID = ""
//t.Command
t.Result = string(data)
return nil
}
Loading…
Cancel
Save