Skip to content

Commit

Permalink
Merge pull request #8379 from jedevc/docker-pusher-concurrency
Browse files Browse the repository at this point in the history
Fix various timing issues with docker pusher
  • Loading branch information
dmcgowan authored Feb 21, 2024
2 parents 4510ca3 + a9152eb commit b8654e3
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 60 deletions.
8 changes: 1 addition & 7 deletions core/content/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// maxResets is the no.of times the Copy() method can tolerate a reset of the body
const maxResets = 5

var ErrReset = errors.New("writer has been reset")

var bufPool = sync.Pool{
Expand Down Expand Up @@ -149,7 +146,7 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
// Copy is buffered, so no need to wrap reader in buffered io.
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
r := or
for i := 0; i < maxResets; i++ {
for i := 0; ; i++ {
if i >= 1 {
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
}
Expand Down Expand Up @@ -189,9 +186,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig
}
return nil
}

log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
return fmt.Errorf("failed to copy after %d retries", maxResets)
}

// CopyReaderAt copies to a writer from a given reader at for the given
Expand Down
43 changes: 36 additions & 7 deletions core/content/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"bytes"
"context"
_ "crypto/sha256" // required by go-digest
"fmt"
"errors"
"io"
"strings"
"testing"
Expand All @@ -42,7 +42,7 @@ func TestCopy(t *testing.T) {
cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets the first time
// function resets the first time, but then succeeds after
if i == 0 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
Expand All @@ -55,11 +55,28 @@ func TestCopy(t *testing.T) {
}
}

cf2err := errors.New("commit failed")
cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets for more than the maxReset value
if i < maxResets+1 {
// function resets a lot of times, and eventually fails
if i < 10 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
i++
buf.Reset()
st.Offset = 0
return ErrReset
}
return cf2err
}
}

cf3 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets a lot of times, and eventually succeeds
if i < 10 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
i++
Expand All @@ -73,8 +90,10 @@ func TestCopy(t *testing.T) {

s1 := Status{}
s2 := Status{}
s3 := Status{}
b1 := bytes.Buffer{}
b2 := bytes.Buffer{}
b3 := bytes.Buffer{}

var testcases = []struct {
name string
Expand Down Expand Up @@ -130,15 +149,25 @@ func TestCopy(t *testing.T) {
expected: "content to copy",
},
{
name: "write fails more than maxReset times due to reset",
name: "write fails after lots of resets",
source: newCopySource("content to copy"),
writer: fakeWriter{
Buffer: &b2,
status: s2,
commitFunc: cf2(&b2, s2),
},
expected: "",
expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets),
expectedErr: cf2err,
},
{
name: "write succeeds after lots of resets",
source: newCopySource("content to copy"),
writer: fakeWriter{
Buffer: &b3,
status: s3,
commitFunc: cf3(&b3, s3),
},
expected: "content to copy",
},
}

Expand All @@ -153,7 +182,7 @@ func TestCopy(t *testing.T) {

// if an error is expected then further comparisons are not required
if testcase.expectedErr != nil {
assert.Equal(t, testcase.expectedErr, err)
assert.ErrorIs(t, err, testcase.expectedErr)
return
}

Expand Down
109 changes: 63 additions & 46 deletions core/remotes/docker/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,14 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
req.body = func() (io.ReadCloser, error) {
pr, pw := io.Pipe()
pushw.setPipe(pw)
return io.NopCloser(pr), nil
return pr, nil
}
req.size = desc.Size

go func() {
resp, err := req.doWithRetries(ctx, nil)
if err != nil {
pushw.setError(err)
pushw.Close()
return
}

Expand All @@ -298,7 +297,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
err := remoteserrors.NewUnexpectedStatusErr(resp)
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
pushw.setError(err)
pushw.Close()
return
}
pushw.setResponse(resp)
}()
Expand Down Expand Up @@ -331,10 +330,12 @@ type pushWriter struct {

pipe *io.PipeWriter

pipeC chan *io.PipeWriter
respC chan *http.Response
done chan struct{}
closeOnce sync.Once
errC chan error

pipeC chan *io.PipeWriter
respC chan *http.Response
errC chan error

isManifest bool

Expand All @@ -352,19 +353,51 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
pipeC: make(chan *io.PipeWriter, 1),
respC: make(chan *http.Response, 1),
errC: make(chan error, 1),
done: make(chan struct{}),
isManifest: isManifest,
}
}

func (pw *pushWriter) setPipe(p *io.PipeWriter) {
pw.pipeC <- p
select {
case <-pw.done:
case pw.pipeC <- p:
}
}

func (pw *pushWriter) setError(err error) {
pw.errC <- err
select {
case <-pw.done:
case pw.errC <- err:
}
}

func (pw *pushWriter) setResponse(resp *http.Response) {
pw.respC <- resp
select {
case <-pw.done:
case pw.respC <- resp:
}
}

func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
if pw.pipe == nil {
pw.pipe = p
return nil
}

pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}

func (pw *pushWriter) Write(p []byte) (n int, err error) {
Expand All @@ -374,26 +407,18 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
}

if pw.pipe == nil {
p, ok := <-pw.pipeC
if !ok {
select {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.replacePipe(p)
}
pw.pipe = p
} else {
select {
case p, ok := <-pw.pipeC:
if !ok {
return 0, io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written and the caller must reset
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
return 0, pw.replacePipe(p)
default:
}
}
Expand All @@ -403,9 +428,13 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
// if the pipe is closed, we might have the original error on the error
// channel - so we should try and get it
select {
case err2 := <-pw.errC:
err = err2
default:
case <-pw.done:
case err = <-pw.errC:
pw.Close()
case p := <-pw.pipeC:
return 0, pw.replacePipe(p)
case resp := <-pw.respC:
pw.setResponse(resp)
}
}
status.Offset += int64(n)
Expand All @@ -418,7 +447,7 @@ func (pw *pushWriter) Close() error {
// Ensure pipeC is closed but handle `Close()` being
// called multiple times without panicking
pw.closeOnce.Do(func() {
close(pw.pipeC)
close(pw.done)
})
if pw.pipe != nil {
status, err := pw.tracker.GetStatus(pw.ref)
Expand Down Expand Up @@ -458,30 +487,18 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
// TODO: timeout waiting for response
var resp *http.Response
select {
case <-pw.done:
return io.ErrClosedPipe
case err := <-pw.errC:
pw.Close()
return err
case resp = <-pw.respC:
defer resp.Body.Close()
case p, ok := <-pw.pipeC:
case p := <-pw.pipeC:
// check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset.
if !ok {
return io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p

// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
return pw.replacePipe(p)
}

// 201 is specified return status, some registries return
Expand Down

0 comments on commit b8654e3

Please sign in to comment.