Skip to content

Commit

Permalink
Merge pull request kubernetes#4901 from vmarmol/mon-startup
Browse files Browse the repository at this point in the history
Adding sync pod latency metric (again).
  • Loading branch information
rjnagal committed Feb 27, 2015
2 parents 537d8cf + ed0f588 commit c6175fa
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 38 deletions.
52 changes: 44 additions & 8 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ const podOomScoreAdj = -100

// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]api.BoundPod) error
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
}

type SourceReadyFn func(source string) bool
Expand Down Expand Up @@ -942,7 +945,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.Docke
func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
start := time.Now()
defer func() {
metrics.ImagePullLatency.Observe(float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()))
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
}()

if err := kl.dockerPuller.Pull(img); err != nil {
Expand Down Expand Up @@ -1270,7 +1273,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.BoundPod, running []*docker
}

// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
func (kl *Kubelet) SyncPods(pods []api.BoundPod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
glog.V(4).Infof("Desired: %#v", pods)
var err error
desiredContainers := make(map[podContainer]empty)
Expand All @@ -1296,7 +1299,9 @@ func (kl *Kubelet) SyncPods(pods []api.BoundPod) error {
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(*pod)
kl.podWorkers.UpdatePod(pod, func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})
}

// Stop the workers for no-longer existing pods.
Expand Down Expand Up @@ -1416,19 +1421,21 @@ func (kl *Kubelet) handleUpdate(u PodUpdate) {
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
unsyncedPod := false
podSyncTypes := make(map[types.UID]metrics.SyncPodType)
select {
case u := <-updates:
kl.updatePods(u)
kl.updatePods(u, podSyncTypes)
unsyncedPod = true
case <-time.After(kl.resyncInterval):
glog.V(4).Infof("Periodic sync")
}
start := time.Now()
// If we already caught some update, try to wait for some short time
// to possibly batch it with other incoming updates.
for unsyncedPod {
select {
case u := <-updates:
kl.updatePods(u)
kl.updatePods(u, podSyncTypes)
case <-time.After(5 * time.Millisecond):
// Break the for loop.
unsyncedPod = false
Expand All @@ -1440,25 +1447,54 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
glog.Errorf("Failed to get bound pods.")
return
}
if err := handler.SyncPods(pods); err != nil {
if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
}

func (kl *Kubelet) updatePods(u PodUpdate) {
// Updated the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
switch u.Op {
case SET:
glog.V(3).Infof("SET: Containers changed")

// Store the new pods. Don't worry about filtering host ports since those
// pods will never be looked up.
existingPods := make(map[types.UID]struct{})
for i := range kl.pods {
existingPods[kl.pods[i].UID] = struct{}{}
}
for i := range u.Pods {
if _, ok := existingPods[u.Pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
}
}

kl.pods = u.Pods
kl.pods = filterHostPortConflicts(kl.pods)
case UPDATE:
glog.V(3).Infof("Update: Containers changed")

// Store the updated pods. Don't worry about filtering host ports since those
// pods will never be looked up.
for i := range u.Pods {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
}

kl.pods = updateBoundPods(u.Pods, kl.pods)
kl.pods = filterHostPortConflicts(kl.pods)
default:
panic("syncLoop does not support incremental changes")
}

// Mark all remaining pods as sync.
for i := range kl.pods {
if _, ok := podSyncTypes[kl.pods[i].UID]; !ok {
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodSync
}
}
}

// Returns Docker version for this Kubelet.
Expand Down
29 changes: 16 additions & 13 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
Expand Down Expand Up @@ -382,6 +383,8 @@ func (cr *channelReader) GetList() [][]api.BoundPod {
return cr.list
}

var emptyPodUIDs map[types.UID]metrics.SyncPodType

func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, fakeDocker, waitGroup := newTestKubelet(t)
container := api.Container{Name: "bar"}
Expand Down Expand Up @@ -413,7 +416,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -444,7 +447,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -491,7 +494,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -542,7 +545,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -590,7 +593,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -645,7 +648,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -690,7 +693,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -728,15 +731,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
Expand Down Expand Up @@ -787,15 +790,15 @@ func TestSyncPodsDeletesWhenContainerSourceReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.BoundPod{}); err != nil {
if err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
Expand Down Expand Up @@ -833,7 +836,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.BoundPod{})
err := kubelet.SyncPods([]api.BoundPod{}, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -2091,7 +2094,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
})
}, emptyPodUIDs, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
38 changes: 37 additions & 1 deletion pkg/kubelet/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package metrics

import (
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
Expand All @@ -35,8 +36,16 @@ var (
Help: "Image pull latency in microseconds.",
},
)
// TODO(vmarmol): Break down by number of containers in pod?
SyncPodLatency = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Subsystem: kubeletSubsystem,
Name: "sync_pod_latency_microseconds",
Help: "Latency in microseconds to sync a single pod. Broken down by operation type: create, update, or sync",
},
[]string{"operation_type"},
)
// TODO(vmarmol): Containers per pod
// TODO(vmarmol): Latency of pod startup
// TODO(vmarmol): Latency of SyncPods
)

Expand All @@ -47,10 +56,37 @@ func Register(containerCache dockertools.DockerCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency)
prometheus.MustRegister(SyncPodLatency)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
})
}

type SyncPodType int

const (
SyncPodCreate SyncPodType = iota
SyncPodUpdate
SyncPodSync
)

func (self SyncPodType) String() string {
switch self {
case SyncPodCreate:
return "create"
case SyncPodUpdate:
return "update"
case SyncPodSync:
return "sync"
default:
return "unknown"
}
}

// Gets the time since the specified start in microseconds.
func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}

func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector {
return &podAndContainerCollector{
containerCache: containerCache,
Expand Down
Loading

0 comments on commit c6175fa

Please sign in to comment.