You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ntool/ncmd/io.go

240 lines
8.0 KiB
Go

package ncmd
import (
"bufio"
"bytes"
"fmt"
"sync"
)
// os/exec.Cmd.StdoutPipe is usually used incorrectly. The docs are clear:
// "it is incorrect to call Wait before all reads from the pipe have completed."
// Therefore, we can't read from the pipe in another goroutine: that causes a
// race condition, because one goroutine is still reading while the original
// goroutine calls Wait, and Wait closes the pipe.
// The proper solution is to use an io.Writer for cmd.Stdout. I couldn't find
// an io.Writer that's also safe for concurrent reads (as lines in a []string,
// no less), so I created one:
// OutputBuffer stores command output in memory and hands it back line by
// line. Nothing is ever discarded, so the buffer grows with the output; it
// suits output that is small (a few megabytes) and not read frequently.
//
// The pending bytes and the collected lines are guarded by the embedded
// mutex, so Lines may be called from several goroutines while the command is
// still writing, as well as after it has finished.
//
// A Cmd in this package uses an OutputBuffer for both STDOUT and STDERR by
// default when created by calling NewCmd. An OutputBuffer can also be plugged
// straight into a Go standard library os/exec.Command:
//
//	runnableCmd := exec.Command(...)
//	stdout := NewOutputBuffer()
//	runnableCmd.Stdout = stdout
//
// While runnableCmd is running, call stdout.Lines() to read all output
// currently written.
type OutputBuffer struct {
	*sync.Mutex // guards buf and lines; its Lock and Unlock are promoted

	buf   *bytes.Buffer // bytes written but not yet split into lines
	lines []string      // every line extracted from buf so far
}
// NewOutputBuffer returns an empty OutputBuffer with its byte buffer, line
// slice and mutex allocated. The buffer is unbounded, and Lines may be called
// by multiple goroutines while the command is still running.
func NewOutputBuffer() *OutputBuffer {
	return &OutputBuffer{
		Mutex: new(sync.Mutex),
		buf:   new(bytes.Buffer),
		lines: make([]string, 0),
	}
}
// Write makes OutputBuffer implement the io.Writer interface. Do not call
// this function directly.
//
// p is appended to the internal byte buffer while the mutex is held, and the
// byte count and error reported by that buffer are returned unchanged.
//
// The unlock is deferred so the mutex is released even if the underlying
// write panics (bytes.Buffer panics with ErrTooLarge when it cannot grow);
// otherwise every later Write or Lines call would block forever.
func (rw *OutputBuffer) Write(p []byte) (n int, err error) {
	rw.Lock()
	defer rw.Unlock()
	return rw.buf.Write(p)
}
// Lines returns every line of output collected so far. It is safe to call
// while the Cmd is running and after it has finished; each call returns the
// lines from previous calls plus any written since. Line terminators ("\n"
// or "\r\n") are stripped and each line is passed through decodeBytes.
//
// Bytes that are not yet newline-terminated are consumed as well:
// bufio.Scanner reports the trailing fragment as a line when it reaches the
// end of the buffer, so a line read while it is still being written ends up
// split across two entries.
//
// The returned slice has its capacity clipped to its length, so appending to
// it never writes into the storage the buffer keeps appending to.
func (rw *OutputBuffer) Lines() []string {
	rw.Lock()
	// Deferred so that rw.lines is read for the return value while the lock
	// is still held; reading it after Unlock races with concurrent callers.
	defer rw.Unlock()

	// Scanners are io.Readers which effectively destroy the buffer by reading
	// to EOF. So once we scan the buf to lines, the buf is empty again.
	s := bufio.NewScanner(rw.buf)
	for s.Scan() {
		rw.lines = append(rw.lines, decodeBytes(s.Bytes()))
	}

	return rw.lines[:len(rw.lines):len(rw.lines)]
}
// DEFAULT_LINE_BUFFER_SIZE is the default size, in bytes, of the OutputStream
// line buffer. The default value is usually sufficient, but if
// ErrLineBufferOverflow errors occur, try increasing the size by calling
// OutputStream.SetLineBufferSize.
const DEFAULT_LINE_BUFFER_SIZE = 16384

// DEFAULT_STREAM_CHAN_SIZE is the default string channel size for a Cmd when
// Options.Streaming is true. A channel that is too small can have a minor
// performance impact by causing OutputStream.Write to block excessively.
const DEFAULT_STREAM_CHAN_SIZE = 1000
// ErrLineBufferOverflow is returned by OutputStream.Write when the internal
// line buffer is filled before a newline character is written to terminate a
// line. Increasing the line buffer size by calling OutputStream.SetLineBufferSize
// can help prevent this error.
//
// OutputStream.Write returns it as a value (not a pointer), so callers can
// match it with a type assertion on ErrLineBufferOverflow.
type ErrLineBufferOverflow struct {
Line string // Unterminated line that caused the error: bytes already buffered plus the remainder that did not fit
BufferSize int // Internal line buffer size, in bytes
BufferFree int // Free bytes in line buffer when the write was attempted
}
// Error implements the error interface. The message states by how many bytes
// the unterminated line exceeds the line buffer (len(Line) - BufferSize) and
// the buffer size itself.
func (e ErrLineBufferOverflow) Error() string {
	const format = "line does not contain newline and is %d bytes too long to buffer (buffer size: %d)"
	excess := len(e.Line) - e.BufferSize
	return fmt.Sprintf(format, excess, e.BufferSize)
}
// OutputStream delivers command output in real time, one line at a time, on
// a channel supplied by the caller. A line ends at a single newline,
// optionally preceded by a carriage return; neither character is part of the
// string sent on the channel.
//
// Write blocks on the channel, so the caller must begin receiving before
// starting the Cmd and must always keep reading the channel.
// NOTE(review): the earlier comment here stated that the channel is closed
// when the Cmd exits and all output has been sent. Nothing in OutputStream
// closes it, so that would have to happen in Cmd; confirm against Cmd.
//
// A Cmd in this package uses an OutputStream for both STDOUT and STDERR when
// created by calling NewCmdOptions and Options.Streaming is true. To use
// OutputStream directly with a Go standard library os/exec.Command:
//
//	stdoutChan := make(chan string, 100)
//	go func() {
//		for line := range stdoutChan {
//			// Do something with the line
//		}
//	}()
//
//	runnableCmd := exec.Command(...)
//	stdout := NewOutputStream(stdoutChan)
//	runnableCmd.Stdout = stdout
//
// While runnableCmd is running, lines are sent to the channel as soon as they
// are written and newline-terminated by the command.
type OutputStream struct {
	streamChan chan string // destination for completed lines

	// Line buffer: holds the bytes of a line whose newline has not been
	// written yet.
	buf      []byte // storage for the incomplete line
	bufSize  int    // configured size of buf, reported in ErrLineBufferOverflow
	lastChar int    // number of bytes of buf currently in use
}
// NewOutputStream returns an OutputStream that sends lines on streamChan and
// uses a line buffer of DEFAULT_LINE_BUFFER_SIZE bytes. The caller must begin
// receiving on the channel before the command is started. The OutputStream
// never closes the channel.
func NewOutputStream(streamChan chan string) *OutputStream {
	out := new(OutputStream)
	out.streamChan = streamChan
	out.bufSize = DEFAULT_LINE_BUFFER_SIZE
	out.buf = make([]byte, out.bufSize)
	out.lastChar = 0 // the line buffer starts out empty
	return out
}
// Write makes OutputStream implement the io.Writer interface. Do not call
// this function directly.
func (rw *OutputStream) Write(p []byte) (n int, err error) {
n = len(p) // end of buffer
firstChar := 0
for {
// Find next newline in stream buffer. nextLine starts at 0, but buff
// can contain multiple lines, like "foo\nbar". So in that case nextLine
// will be 0 ("foo\nbar\n") then 4 ("bar\n") on next iteration. And i
// will be 3 and 7, respectively. So lines are [0:3] are [4:7].
newlineOffset := bytes.IndexByte(p[firstChar:], '\n')
if newlineOffset < 0 {
break // no newline in stream, next line incomplete
}
// End of line offset is start (nextLine) + newline offset. Like bufio.Scanner,
// we allow \r\n but strip the \r too by decrementing the offset for that byte.
lastChar := firstChar + newlineOffset // "line\n"
if newlineOffset > 0 && p[newlineOffset-1] == '\r' {
lastChar -= 1 // "line\r\n"
}
// Send the line, prepend line buffer if set
var line string
if rw.lastChar > 0 {
line = decodeBytes(rw.buf[0:rw.lastChar])
//line = string(rw.buf[0:rw.lastChar])
rw.lastChar = 0 // reset buffer
}
line += decodeBytes(p[firstChar:lastChar])
//line += string(p[firstChar:lastChar])
rw.streamChan <- line // blocks if chan full
// Next line offset is the first byte (+1) after the newline (i)
firstChar += newlineOffset + 1
}
if firstChar < n {
remain := len(p[firstChar:])
bufFree := len(rw.buf[rw.lastChar:])
if remain > bufFree {
var line string
if rw.lastChar > 0 {
line = string(rw.buf[0:rw.lastChar])
}
line += string(p[firstChar:])
err = ErrLineBufferOverflow{
Line: line,
BufferSize: rw.bufSize,
BufferFree: bufFree,
}
n = firstChar
return // implicit
}
copy(rw.buf[rw.lastChar:], p[firstChar:])
rw.lastChar += remain
}
return // implicit
}
// Lines exposes, as a receive-only channel, the channel on which lines are
// delivered. It is the same channel that was passed to NewOutputStream.
func (rw *OutputStream) Lines() <-chan string {
	var recvOnly <-chan string = rw.streamChan
	return recvOnly
}
// SetLineBufferSize sets the internal line buffer size to n bytes. The
// default is DEFAULT_LINE_BUFFER_SIZE. This function should be called
// immediately after NewOutputStream, and it is not safe to call by multiple
// goroutines. n must not be negative (allocating the buffer would panic).
//
// If an unterminated line is already buffered when this is called, its bytes
// are carried over into the new buffer so the line is not corrupted; when n
// is smaller than the number of buffered bytes, only the first n are kept.
//
// Increasing the line buffer size can help reduce ErrLineBufferOverflow errors.
func (rw *OutputStream) SetLineBufferSize(n int) {
	resized := make([]byte, n)
	// copy returns the number of bytes that fit, which keeps lastChar within
	// the bounds of the new buffer.
	rw.lastChar = copy(resized, rw.buf[:rw.lastChar])
	rw.buf = resized
	rw.bufSize = n
}
// Flush empties the buffer of its last line: if the line buffer holds bytes
// that were never terminated by a newline, they are sent on the stream
// channel as one final line, decoded with decodeBytes just like the lines
// sent by Write. It does nothing when the buffer is empty.
//
// The buffer is marked empty once its content has been taken, so a repeated
// Flush or a later Write does not send the same bytes again. The send blocks
// if the channel is full.
func (rw *OutputStream) Flush() {
	if rw.lastChar == 0 {
		return
	}
	line := decodeBytes(rw.buf[:rw.lastChar])
	rw.lastChar = 0 // reset buffer
	rw.streamChan <- line
}