Skip to content

Commit

Permalink
Make stdcopy.stdWriter goroutine safe.
Browse files Browse the repository at this point in the history
Stop using global variables as prefixes to inject the writer header.
That can cause issues when two writers set the length of the buffer in
the same header concurrently.

Stop Writing to the internal buffer twice for each write. This could
mess up with the ordering information is written.

Signed-off-by: David Calavera <david.calavera@gmail.com>
(cherry picked from commit 443a5c2)
  • Loading branch information
calavera authored and Tibor Vass committed Mar 7, 2016
1 parent 97e7e23 commit a483ccc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 42 deletions.
77 changes: 40 additions & 37 deletions pkg/stdcopy/stdcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,57 @@ package stdcopy
import (
"encoding/binary"
"errors"
"fmt"
"io"

"github.com/Sirupsen/logrus"
)

// StdType is the type of standard stream
// a writer can multiplex to.
type StdType byte

const (
// Stdin represents standard input stream type.
Stdin StdType = iota
// Stdout represents standard output stream type.
Stdout
// Stderr represents standard error steam type.
Stderr

stdWriterPrefixLen = 8
stdWriterFdIndex = 0
stdWriterSizeIndex = 4

startingBufLen = 32*1024 + stdWriterPrefixLen + 1
)

// StdType prefixes type and length to standard stream.
type StdType [stdWriterPrefixLen]byte

var (
// Stdin represents standard input stream type.
Stdin = StdType{0: 0}
// Stdout represents standard output stream type.
Stdout = StdType{0: 1}
// Stderr represents standard error steam type.
Stderr = StdType{0: 2}
)

// StdWriter is wrapper of io.Writer with extra customized info.
type StdWriter struct {
// stdWriter is wrapper of io.Writer with extra customized info.
type stdWriter struct {
io.Writer
prefix StdType
sizeBuf []byte
prefix byte
}

func (w *StdWriter) Write(buf []byte) (n int, err error) {
var n1, n2 int
// Write sends the buffer to the underneath writer.
// It insert the prefix header before the buffer,
// so stdcopy.StdCopy knows where to multiplex the output.
// It makes stdWriter to implement io.Writer.
func (w *stdWriter) Write(buf []byte) (n int, err error) {
if w == nil || w.Writer == nil {
return 0, errors.New("Writer not instantiated")
}
binary.BigEndian.PutUint32(w.prefix[4:], uint32(len(buf)))
n1, err = w.Writer.Write(w.prefix[:])
if err != nil {
n = n1 - stdWriterPrefixLen
} else {
n2, err = w.Writer.Write(buf)
n = n1 + n2 - stdWriterPrefixLen
if buf == nil {
return 0, nil
}

header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(buf)))

line := append(header[:], buf...)

n, err = w.Writer.Write(line)
n -= stdWriterPrefixLen

if n < 0 {
n = 0
}
Expand All @@ -60,16 +66,13 @@ func (w *StdWriter) Write(buf []byte) (n int, err error) {
// This allows multiple write streams (e.g. stdout and stderr) to be muxed into a single connection.
// `t` indicates the id of the stream to encapsulate.
// It can be stdcopy.Stdin, stdcopy.Stdout, stdcopy.Stderr.
func NewStdWriter(w io.Writer, t StdType) *StdWriter {
return &StdWriter{
Writer: w,
prefix: t,
sizeBuf: make([]byte, 4),
func NewStdWriter(w io.Writer, t StdType) io.Writer {
return &stdWriter{
Writer: w,
prefix: byte(t),
}
}

var errInvalidStdHeader = errors.New("Unrecognized input header")

// StdCopy is a modified version of io.Copy.
//
// StdCopy will demultiplex `src`, assuming that it contains two streams,
Expand Down Expand Up @@ -110,18 +113,18 @@ func StdCopy(dstout, dsterr io.Writer, src io.Reader) (written int64, err error)
}

// Check the first byte to know where to write
switch buf[stdWriterFdIndex] {
case 0:
switch StdType(buf[stdWriterFdIndex]) {
case Stdin:
fallthrough
case 1:
case Stdout:
// Write on stdout
out = dstout
case 2:
case Stderr:
// Write on stderr
out = dsterr
default:
logrus.Debugf("Error selecting output fd: (%d)", buf[stdWriterFdIndex])
return 0, errInvalidStdHeader
return 0, fmt.Errorf("Unrecognized input header: %d", buf[stdWriterFdIndex])
}

// Retrieve the size of the frame
Expand Down
9 changes: 4 additions & 5 deletions pkg/stdcopy/stdcopy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ func TestNewStdWriter(t *testing.T) {
}

func TestWriteWithUnitializedStdWriter(t *testing.T) {
writer := StdWriter{
Writer: nil,
prefix: Stdout,
sizeBuf: make([]byte, 4),
writer := stdWriter{
Writer: nil,
prefix: byte(Stdout),
}
n, err := writer.Write([]byte("Something here"))
if n != 0 || err == nil {
Expand Down Expand Up @@ -180,7 +179,7 @@ func TestStdCopyDetectsCorruptedFrame(t *testing.T) {
src: buffer}
written, err := StdCopy(ioutil.Discard, ioutil.Discard, reader)
if written != startingBufLen {
t.Fatalf("Expected 0 bytes read, got %d", written)
t.Fatalf("Expected %d bytes read, got %d", startingBufLen, written)
}
if err != nil {
t.Fatal("Didn't get nil error")
Expand Down

0 comments on commit a483ccc

Please sign in to comment.