Skip to content

Commit

Permalink
Periodically reporing image pulling progress in log
Browse files Browse the repository at this point in the history
  • Loading branch information
Random-Liu committed May 24, 2016
1 parent 9625926 commit 151d0ab
Showing 1 changed file with 75 additions and 1 deletion.
76 changes: 75 additions & 1 deletion pkg/kubelet/dockertools/kube_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
"fmt"
"io"
"io/ioutil"
"sync"
"time"

"github.com/golang/glog"

dockermessage "github.com/docker/docker/pkg/jsonmessage"
dockerstdcopy "github.com/docker/docker/pkg/stdcopy"
dockerapi "github.com/docker/engine-api/client"
Expand Down Expand Up @@ -58,6 +61,9 @@ const (

// defaultShmSize is the default ShmSize to use (in bytes) if not specified.
defaultShmSize = int64(1024 * 1024 * 64)

// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
defaultImagePullingProgressReportInterval = 10 * time.Second
)

// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
Expand Down Expand Up @@ -192,6 +198,71 @@ func base64EncodeAuth(auth dockertypes.AuthConfig) (string, error) {
return base64.URLEncoding.EncodeToString(buf.Bytes()), nil
}

// progress is a wrapper of dockermessage.JSONMessage with a lock protecting it.
type progress struct {
sync.RWMutex
// message stores the latest docker json message.
message *dockermessage.JSONMessage
}

func (p *progress) set(msg *dockermessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.message = msg
}

func (p *progress) get() string {
p.RLock()
defer p.RUnlock()
if p.message == nil {
return "No progress"
}
var prefix string
if p.message.ID != "" {
prefix = fmt.Sprintf("%s: ", p.message.ID)
}
if p.message.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.message.Status)
}
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String())
}

// progressReporter keeps the newest image pulling progress and periodically report the newest progress.
type progressReporter struct {
progress
image string
interval time.Duration
stopCh chan struct{}
}

// newProgressReporter creates a new progressReporter for specific image with specified reporting interval
func newProgressReporter(image string, interval time.Duration) *progressReporter {
return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})}
}

// start starts the progressReporter
func (p *progressReporter) start() {
go func() {
ticker := time.NewTicker(p.interval)
defer ticker.Stop()
for {
// TODO(random-liu): Report as events.
select {
case <-ticker.C:
glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get())
case <-p.stopCh:
glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get())
return
}
}
}()
}

// stop stops the progressReporter
func (p *progressReporter) stop() {
close(p.stopCh)
}

func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error {
// RegistryAuth is the base64 encoded credentials for the registry
base64Auth, err := base64EncodeAuth(auth)
Expand All @@ -209,7 +280,9 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
return err
}
defer resp.Close()
// TODO(random-liu): Use the image pulling progress information.
reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval)
reporter.start()
defer reporter.stop()
decoder := json.NewDecoder(resp)
for {
var msg dockermessage.JSONMessage
Expand All @@ -223,6 +296,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
if msg.Error != nil {
return msg.Error
}
reporter.set(&msg)
}
return nil
}
Expand Down

0 comments on commit 151d0ab

Please sign in to comment.