Skip to content

Commit

Permalink
Fix concurrent uploads that share layers
Browse files Browse the repository at this point in the history
Concurrent uploads which share layers worked correctly as of moby#18353,
but unfortunately moby#18785 caused a regression. This PR removed the logic
that shares digests between different push sessions. This overlooked the
case where one session was waiting for another session to upload a
layer.

This commit adds back the ability to propagate this digest information,
using the distribution.Descriptor type because this is what is received
from stats and uploads, and also what is ultimately needed for building
the manifest.

Surprisingly, there was no test covering this case. This commit adds
one. It fails without the fix.

See recent comments on moby#9132.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
  • Loading branch information
aaronlehmann committed Mar 1, 2016
1 parent d883002 commit 5c99eeb
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 32 deletions.
42 changes: 22 additions & 20 deletions distribution/push_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type v2Pusher struct {
config *ImagePushConfig
repo distribution.Repository

// pushState is state built by the Download functions.
// pushState is state built by the Upload functions.
pushState pushState
}

Expand Down Expand Up @@ -224,6 +224,7 @@ type v2PushDescriptor struct {
repoInfo reference.Named
repo distribution.Repository
pushState *pushState
remoteDescriptor distribution.Descriptor
}

func (pd *v2PushDescriptor) Key() string {
Expand All @@ -238,16 +239,16 @@ func (pd *v2PushDescriptor) DiffID() layer.DiffID {
return pd.layer.DiffID()
}

func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
diffID := pd.DiffID()

pd.pushState.Lock()
if _, ok := pd.pushState.remoteLayers[diffID]; ok {
if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
// it is already known that the push is not needed and
// therefore doing a stat is unnecessary
pd.pushState.Unlock()
progress.Update(progressOutput, pd.ID(), "Layer already exists")
return nil
return descriptor, nil
}
pd.pushState.Unlock()

Expand All @@ -257,14 +258,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
if err != nil {
progress.Update(progressOutput, pd.ID(), "Image push failed")
return retryOnError(err)
return distribution.Descriptor{}, retryOnError(err)
}
if exists {
progress.Update(progressOutput, pd.ID(), "Layer already exists")
pd.pushState.Lock()
pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock()
return nil
return descriptor, nil
}
}

Expand Down Expand Up @@ -328,9 +329,9 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.

// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return xfer.DoNotRetry{Err: err}
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
return nil
return err.Descriptor, nil
case nil:
// blob upload session created successfully, so begin the upload
mountAttemptsRemaining = 0
Expand All @@ -345,14 +346,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
if layerUpload == nil {
layerUpload, err = bs.Create(ctx)
if err != nil {
return retryOnError(err)
return distribution.Descriptor{}, retryOnError(err)
}
}
defer layerUpload.Close()

arch, err := pd.layer.TarStream()
if err != nil {
return xfer.DoNotRetry{Err: err}
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}

// don't care if this fails; best effort
Expand All @@ -371,20 +372,20 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
nn, err := layerUpload.ReadFrom(tee)
compressedReader.Close()
if err != nil {
return retryOnError(err)
return distribution.Descriptor{}, retryOnError(err)
}

pushDigest := digester.Digest()
if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
return retryOnError(err)
return distribution.Descriptor{}, retryOnError(err)
}

logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
progress.Update(progressOutput, pd.ID(), "Pushed")

// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return xfer.DoNotRetry{Err: err}
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}

pd.pushState.Lock()
Expand All @@ -393,23 +394,24 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// speaks the v2 protocol.
pd.pushState.confirmedV2 = true

pd.pushState.remoteLayers[diffID] = distribution.Descriptor{
descriptor := distribution.Descriptor{
Digest: pushDigest,
MediaType: schema2.MediaTypeLayer,
Size: nn,
}
pd.pushState.remoteLayers[diffID] = descriptor

pd.pushState.Unlock()

return nil
return descriptor, nil
}

// SetRemoteDescriptor stores descriptor in pd.remoteDescriptor. Per the
// xfer.UploadDescriptor interface, the value passed in is the
// distribution.Descriptor that was returned by Upload for this layer.
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
}

func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
// Not necessary to lock pushStatus because this is always
// called after all the mutation in pushStatus.
// By the time this function is called, every layer will have
// an entry in remoteLayers.
return pd.pushState.remoteLayers[pd.DiffID()]
return pd.remoteDescriptor
}

// layerAlreadyExists checks if the registry already knows about any of the
Expand Down
23 changes: 16 additions & 7 deletions distribution/xfer/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
"golang.org/x/net/context"
Expand All @@ -28,8 +29,8 @@ func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
type uploadTransfer struct {
Transfer

diffID layer.DiffID
err error
remoteDescriptor distribution.Descriptor
err error
}

// An UploadDescriptor references a layer that may need to be uploaded.
Expand All @@ -41,7 +42,12 @@ type UploadDescriptor interface {
// DiffID should return the DiffID for this layer.
DiffID() layer.DiffID
// Upload is called to perform the Upload.
Upload(ctx context.Context, progressOutput progress.Output) error
Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
// SetRemoteDescriptor provides the distribution.Descriptor that was
// returned by Upload. This descriptor is not to be confused with
// the UploadDescriptor interface, which is used for internally
// identifying layers that are being uploaded.
SetRemoteDescriptor(descriptor distribution.Descriptor)
}

// Upload is a blocking function which ensures the listed layers are present on
Expand All @@ -50,7 +56,7 @@ type UploadDescriptor interface {
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
var (
uploads []*uploadTransfer
dedupDescriptors = make(map[string]struct{})
dedupDescriptors = make(map[string]*uploadTransfer)
)

for _, descriptor := range layers {
Expand All @@ -60,12 +66,12 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
if _, present := dedupDescriptors[key]; present {
continue
}
dedupDescriptors[key] = struct{}{}

xferFunc := lum.makeUploadFunc(descriptor)
upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
defer upload.Release(watcher)
uploads = append(uploads, upload.(*uploadTransfer))
dedupDescriptors[key] = upload.(*uploadTransfer)
}

for _, upload := range uploads {
Expand All @@ -78,6 +84,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
}
}
}
for _, l := range layers {
l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
}

return nil
}
Expand All @@ -86,7 +95,6 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
u := &uploadTransfer{
Transfer: NewTransfer(),
diffID: descriptor.DiffID(),
}

go func() {
Expand All @@ -105,8 +113,9 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun

retries := 0
for {
err := descriptor.Upload(u.Transfer.Context(), progressOutput)
remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
if err == nil {
u.remoteDescriptor = remoteDescriptor
break
}

Expand Down
15 changes: 10 additions & 5 deletions distribution/xfer/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
Expand Down Expand Up @@ -35,32 +36,36 @@ func (u *mockUploadDescriptor) DiffID() layer.DiffID {
return u.diffID
}

// SetRemoteDescriptor is intentionally a no-op: the mock does not use the
// remote descriptor. The method is present because the UploadDescriptor
// interface declares it.
func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) {
}

// Upload is called to perform the upload.
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error {
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
if u.currentUploads != nil {
defer atomic.AddInt32(u.currentUploads, -1)

if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
return errors.New("concurrency limit exceeded")
return distribution.Descriptor{}, errors.New("concurrency limit exceeded")
}
}

// Sleep a bit to simulate a time-consuming upload.
for i := int64(0); i <= 10; i++ {
select {
case <-ctx.Done():
return ctx.Err()
return distribution.Descriptor{}, ctx.Err()
case <-time.After(10 * time.Millisecond):
progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
}
}

if u.simulateRetries != 0 {
u.simulateRetries--
return errors.New("simulating retry")
return distribution.Descriptor{}, errors.New("simulating retry")
}

return nil
return distribution.Descriptor{}, nil
}

func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
Expand Down
55 changes: 55 additions & 0 deletions integration-cli/docker_cli_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,61 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) {
testPushEmptyLayer(c)
}

// testConcurrentPush pushes multiple tags to the same repo
// concurrently.
//
// It builds three images on top of busybox (so they share base layers),
// pushes all three tags to the private registry in parallel, removes the
// local images, and then pulls and runs each tag again to confirm that every
// push produced a usable image.
func testConcurrentPush(c *check.C) {
	repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)

	tags := []string{"push1", "push2", "push3"}
	repos := make([]string, 0, len(tags))
	for _, tag := range tags {
		repo := fmt.Sprintf("%v:%v", repoName, tag)
		// The CMD embeds the full reference so that each tag yields a
		// distinct image with distinct output.
		_, err := buildImage(repo, fmt.Sprintf(`
FROM busybox
ENTRYPOINT ["/bin/echo"]
ENV FOO foo
ENV BAR bar
CMD echo %s
`, repo), true)
		c.Assert(err, checker.IsNil)
		repos = append(repos, repo)
	}

	// Push tags, in parallel.
	//
	// The channel is buffered with one slot per push so that every goroutine
	// can deliver its result and exit even if an assertion below aborts the
	// test before all results have been received. With an unbuffered channel
	// the remaining senders would block forever.
	results := make(chan error, len(repos))

	for _, repo := range repos {
		go func(repo string) {
			_, _, err := runCommandWithOutput(exec.Command(dockerBinary, "push", repo))
			results <- err
		}(repo)
	}

	for range repos {
		err := <-results
		c.Assert(err, checker.IsNil, check.Commentf("concurrent push failed with error: %v", err))
	}

	// Clear local images store.
	args := append([]string{"rmi"}, repos...)
	dockerCmd(c, args...)

	// Re-pull and run individual tags, to make sure pushes succeeded.
	for _, repo := range repos {
		dockerCmd(c, "pull", repo)
		dockerCmd(c, "inspect", repo)
		out, _ := dockerCmd(c, "run", "--rm", repo)
		// The expected output is the shell-form CMD echoed back by the
		// /bin/echo entrypoint.
		c.Assert(strings.TrimSpace(out), checker.Equals, "/bin/sh -c echo "+repo)
	}
}

// TestConcurrentPush runs the shared testConcurrentPush scenario as part of
// DockerRegistrySuite.
func (s *DockerRegistrySuite) TestConcurrentPush(c *check.C) {
testConcurrentPush(c)
}

// TestConcurrentPush runs the shared testConcurrentPush scenario as part of
// DockerSchema1RegistrySuite.
// NOTE(review): presumably this suite targets a registry limited to schema1
// manifests — confirm against the suite's setup.
func (s *DockerSchema1RegistrySuite) TestConcurrentPush(c *check.C) {
testConcurrentPush(c)
}

func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) {
sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
// tag the image to upload it to the private registry
Expand Down

0 comments on commit 5c99eeb

Please sign in to comment.