Skip to content

Commit

Permalink
Embed the diode lib to avoid test dependencies
Browse files Browse the repository at this point in the history
This commit introduces a breaking change in the diode API in order to
hide the diodes package interface. This removes a good number of
dependencies introduced by the test framework used by the diodes
package.
  • Loading branch information
rs committed May 25, 2018
1 parent 64faaa6 commit 77db4b4
Show file tree
Hide file tree
Showing 8 changed files with 438 additions and 18 deletions.
16 changes: 9 additions & 7 deletions diode/diode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"
"time"

diodes "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog/diode/internal/diodes"
)

var bufPool = &sync.Pool{
Expand All @@ -17,6 +17,8 @@ var bufPool = &sync.Pool{
},
}

type Alerter func(missed int)

// Writer is a io.Writer wrapper that uses a diode to make Write lock-free,
// non-blocking and thread safe.
type Writer struct {
Expand All @@ -33,19 +35,19 @@ type Writer struct {
//
// Use a diode.Writer when
//
// d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
// w := diode.NewWriter(w, 1000, 10 * time.Millisecond, func(missed int) {
// log.Printf("Dropped %d messages", missed)
// }))
// w := diode.NewWriter(w, d, 10 * time.Millisecond)
// })
// log := zerolog.New(w)
//
// See code.cloudfoundry.org/go-diodes for more info on diode.
func NewWriter(w io.Writer, manyToOneDiode *diodes.ManyToOne, poolInterval time.Duration) Writer {
func NewWriter(w io.Writer, size int, poolInterval time.Duration, f Alerter) Writer {
ctx, cancel := context.WithCancel(context.Background())
d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
dw := Writer{
w: w,
d: manyToOneDiode,
p: diodes.NewPoller(manyToOneDiode,
d: d,
p: diodes.NewPoller(d,
diodes.WithPollingInterval(poolInterval),
diodes.WithPollingContext(ctx)),
c: cancel,
Expand Down
6 changes: 2 additions & 4 deletions diode/diode_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,14 @@ import (
"os"
"time"

diodes "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog"
"github.com/rs/zerolog/diode"
)

func ExampleNewWriter() {
d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
w := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
}))
w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond)
})
log := zerolog.New(w)
log.Print("test")

Expand Down
11 changes: 4 additions & 7 deletions diode/diode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@ import (
"testing"
"time"

diodes "code.cloudfoundry.org/go-diodes"
"github.com/rs/zerolog"
"github.com/rs/zerolog/diode"
"github.com/rs/zerolog/internal/cbor"
)

func TestNewWriter(t *testing.T) {
d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
}))
buf := bytes.Buffer{}
w := diode.NewWriter(&buf, d, 10*time.Millisecond)
w := diode.NewWriter(&buf, 1000, 10*time.Millisecond, func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
})
log := zerolog.New(w)
log.Print("test")

Expand All @@ -35,8 +33,7 @@ func TestNewWriter(t *testing.T) {
func Benchmark(b *testing.B) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
d := diodes.NewManyToOne(100000, nil)
w := diode.NewWriter(ioutil.Discard, d, 10*time.Millisecond)
w := diode.NewWriter(ioutil.Discard, 100000, 10*time.Millisecond, nil)
log := zerolog.New(w)
defer w.Close()

Expand Down
1 change: 1 addition & 0 deletions diode/internal/diodes/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Copied from https://github.com/cloudfoundry/go-diodes to avoid test dependencies.
130 changes: 130 additions & 0 deletions diode/internal/diodes/many_to_one.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package diodes

import (
"log"
"sync/atomic"
"unsafe"
)

// ManyToOne diode is optimal for many writers (go-routines B-n) and a single
// reader (go-routine A). It is not thread safe for multiple readers.
type ManyToOne struct {
buffer []unsafe.Pointer
writeIndex uint64
readIndex uint64
alerter Alerter
}

// NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
// is optimzed for many writers (on go-routines B-n) and a single reader
// (on go-routine A). The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewManyToOne(size int, alerter Alerter) *ManyToOne {
if alerter == nil {
alerter = AlertFunc(func(int) {})
}

d := &ManyToOne{
buffer: make([]unsafe.Pointer, size),
alerter: alerter,
}

// Start write index at the value before 0
// to allow the first write to use AddUint64
// and still have a beginning index of 0
d.writeIndex = ^d.writeIndex
return d
}

// Set sets the data in the next slot of the ring buffer.
func (d *ManyToOne) Set(data GenericDataType) {
for {
writeIndex := atomic.AddUint64(&d.writeIndex, 1)
idx := writeIndex % uint64(len(d.buffer))
old := atomic.LoadPointer(&d.buffer[idx])

if old != nil &&
(*bucket)(old) != nil &&
(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
log.Println("Diode set collision: consider using a larger diode")
continue
}

newBucket := &bucket{
data: data,
seq: writeIndex,
}

if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
log.Println("Diode set collision: consider using a larger diode")
continue
}

return
}
}

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is not data available, it will return (nil, false).
func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
// Read a value from the ring buffer based on the readIndex.
idx := d.readIndex % uint64(len(d.buffer))
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

// When the result is nil that means the writer has not had the
// opportunity to write a value into the diode. This value must be ignored
// and the read head must not increment.
if result == nil {
return nil, false
}

// When the seq value is less than the current read index that means a
// value was read from idx that was previously written but has since has
// been dropped. This value must be ignored and the read head must not
// increment.
//
// The simulation for this scenario assumes the fast forward occurred as
// detailed below.
//
// 5. The reader reads again getting seq 5. It then reads again expecting
// seq 6 but gets seq 2. This is a read of a stale value that was
// effectively "dropped" so the read fails and the read head stays put.
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
//
if result.seq < d.readIndex {
return nil, false
}

// When the seq value is greater than the current read index that means a
// value was read from idx that overwrote the value that was expected to
// be at this idx. This happens when the writer has lapped the reader. The
// reader needs to catch up to the writer so it moves its write head to
// the new seq, effectively dropping the messages that were not read in
// between the two values.
//
// Here is a simulation of this scenario:
//
// 1. Both the read and write heads start at 0.
// `| nil | nil | nil | nil |` r: 0, w: 0
// 2. The writer fills the buffer.
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
// 3. The writer laps the read head.
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
// this forces the reader to fast forward to 5.
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
//
if result.seq > d.readIndex {
dropped := result.seq - d.readIndex
d.readIndex = result.seq
d.alerter.Alert(int(dropped))
}

// Only increment read index if a regular read occurred (where seq was
// equal to readIndex) or a value was read that caused a fast forward
// (where seq was greater than readIndex).
//
d.readIndex++
return result.data, true
}
129 changes: 129 additions & 0 deletions diode/internal/diodes/one_to_one.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package diodes

import (
"sync/atomic"
"unsafe"
)

// GenericDataType is the data type the diodes operate on.
type GenericDataType unsafe.Pointer

// Alerter is used to report how many values were overwritten since the
// last write.
type Alerter interface {
Alert(missed int)
}

// AlertFunc type is an adapter to allow the use of ordinary functions as
// Alert handlers.
type AlertFunc func(missed int)

// Alert calls f(missed)
func (f AlertFunc) Alert(missed int) {
f(missed)
}

type bucket struct {
data GenericDataType
seq uint64 // seq is the recorded write index at the time of writing
}

// OneToOne diode is meant to be used by a single reader and a single writer.
// It is not thread safe if used otherwise.
type OneToOne struct {
buffer []unsafe.Pointer
writeIndex uint64
readIndex uint64
alerter Alerter
}

// NewOneToOne creates a new diode is meant to be used by a single reader and
// a single writer. The alerter is invoked on the read's go-routine. It is
// called when it notices that the writer go-routine has passed it and wrote
// over data. A nil can be used to ignore alerts.
func NewOneToOne(size int, alerter Alerter) *OneToOne {
if alerter == nil {
alerter = AlertFunc(func(int) {})
}

return &OneToOne{
buffer: make([]unsafe.Pointer, size),
alerter: alerter,
}
}

// Set sets the data in the next slot of the ring buffer.
func (d *OneToOne) Set(data GenericDataType) {
idx := d.writeIndex % uint64(len(d.buffer))

newBucket := &bucket{
data: data,
seq: d.writeIndex,
}
d.writeIndex++

atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket))
}

// TryNext will attempt to read from the next slot of the ring buffer.
// If there is no data available, it will return (nil, false).
func (d *OneToOne) TryNext() (data GenericDataType, ok bool) {
// Read a value from the ring buffer based on the readIndex.
idx := d.readIndex % uint64(len(d.buffer))
result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))

// When the result is nil that means the writer has not had the
// opportunity to write a value into the diode. This value must be ignored
// and the read head must not increment.
if result == nil {
return nil, false
}

// When the seq value is less than the current read index that means a
// value was read from idx that was previously written but has since has
// been dropped. This value must be ignored and the read head must not
// increment.
//
// The simulation for this scenario assumes the fast forward occurred as
// detailed below.
//
// 5. The reader reads again getting seq 5. It then reads again expecting
// seq 6 but gets seq 2. This is a read of a stale value that was
// effectively "dropped" so the read fails and the read head stays put.
// `| 4 | 5 | 2 | 3 |` r: 7, w: 6
//
if result.seq < d.readIndex {
return nil, false
}

// When the seq value is greater than the current read index that means a
// value was read from idx that overwrote the value that was expected to
// be at this idx. This happens when the writer has lapped the reader. The
// reader needs to catch up to the writer so it moves its write head to
// the new seq, effectively dropping the messages that were not read in
// between the two values.
//
// Here is a simulation of this scenario:
//
// 1. Both the read and write heads start at 0.
// `| nil | nil | nil | nil |` r: 0, w: 0
// 2. The writer fills the buffer.
// `| 0 | 1 | 2 | 3 |` r: 0, w: 4
// 3. The writer laps the read head.
// `| 4 | 5 | 2 | 3 |` r: 0, w: 6
// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
// this forces the reader to fast forward to 5.
// `| 4 | 5 | 2 | 3 |` r: 5, w: 6
//
if result.seq > d.readIndex {
dropped := result.seq - d.readIndex
d.readIndex = result.seq
d.alerter.Alert(int(dropped))
}

// Only increment read index if a regular read occurred (where seq was
// equal to readIndex) or a value was read that caused a fast forward
// (where seq was greater than readIndex).
d.readIndex++
return result.data, true
}
Loading

0 comments on commit 77db4b4

Please sign in to comment.