Skip to content

Commit

Permalink
upgrade github.com/klauspost/compress to v1.15.1 and remove the need …
Browse files Browse the repository at this point in the history
…for runtime finalizers (segmentio#889)

* upgrade github.com/klauspost/compress to v1.15.1 and remove the need for runtime finalizers

* remove lowmem config
  • Loading branch information
Achille authored Apr 20, 2022
1 parent 886284f commit 6691dda
Showing 1 changed file with 24 additions and 36 deletions.
60 changes: 24 additions & 36 deletions compress/zstd/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package zstd

import (
"io"
"runtime"
"sync"

"github.com/klauspost/compress/zstd"
Expand All @@ -29,19 +28,17 @@ func (c *Codec) Name() string { return "zstd" }
// NewReader implements the compress.Codec interface.
func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
p := new(reader)
if dec, _ := decoderPool.Get().(*decoder); dec == nil {
z, err := zstd.NewReader(r)
if p.dec, _ = decoderPool.Get().(*zstd.Decoder); p.dec != nil {
p.dec.Reset(r)
} else {
z, err := zstd.NewReader(r,
zstd.WithDecoderConcurrency(1),
)
if err != nil {
p.err = err
} else {
p.dec = &decoder{z}
// We need a finalizer because the reader spawns goroutines
// that will only be stopped if the Close method is called.
runtime.SetFinalizer(p.dec, (*decoder).finalize)
p.dec = z
}
} else {
p.dec = dec
p.err = dec.Reset(r)
}
return p
}
Expand All @@ -57,18 +54,10 @@ func (c *Codec) zstdLevel() zstd.EncoderLevel {
return zstd.EncoderLevelFromZstd(c.level())
}

var decoderPool sync.Pool // *decoder

type decoder struct {
*zstd.Decoder
}

func (d *decoder) finalize() {
d.Close()
}
var decoderPool sync.Pool // *zstd.Decoder

type reader struct {
dec *decoder
dec *zstd.Decoder
err error
}

Expand All @@ -88,6 +77,9 @@ func (r *reader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
if r.dec == nil {
return 0, io.EOF
}
return r.dec.Read(p)
}

Expand All @@ -96,21 +88,25 @@ func (r *reader) WriteTo(w io.Writer) (int64, error) {
if r.err != nil {
return 0, r.err
}
if r.dec == nil {
return 0, io.ErrClosedPipe
}
return r.dec.WriteTo(w)
}

// NewWriter implements the compress.Codec interface.
func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
p := new(writer)
if enc, _ := c.encoderPool.Get().(*encoder); enc == nil {
z, err := zstd.NewWriter(w, zstd.WithEncoderLevel(c.zstdLevel()))
if enc, _ := c.encoderPool.Get().(*zstd.Encoder); enc == nil {
z, err := zstd.NewWriter(w,
zstd.WithEncoderLevel(c.zstdLevel()),
zstd.WithEncoderConcurrency(1),
zstd.WithZeroFrames(true),
)
if err != nil {
p.err = err
} else {
p.enc = &encoder{z}
// We need a finalizer because the writer spawns goroutines
// that will only be stopped if the Close method is called.
runtime.SetFinalizer(p.enc, (*encoder).finalize)
p.enc = z
}
} else {
p.enc = enc
Expand All @@ -120,17 +116,9 @@ func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
return p
}

type encoder struct {
*zstd.Encoder
}

func (e *encoder) finalize() {
e.Close()
}

type writer struct {
c *Codec
enc *encoder
enc *zstd.Encoder
err error
}

Expand All @@ -149,7 +137,7 @@ func (w *writer) Close() error {
w.enc = nil
return err
}
return nil
return w.err
}

// WriteTo implements the io.WriterTo interface.
Expand Down

0 comments on commit 6691dda

Please sign in to comment.