Skip to content

Commit

Permalink
Merge pull request kubernetes#5730 from yujuhong/static_stats
Browse files Browse the repository at this point in the history
Kubelet: support retrieving stats using UID of mirror pods
  • Loading branch information
vmarmol committed Mar 21, 2015
2 parents cc17181 + 15e9760 commit 4d2e798
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 47 deletions.
42 changes: 36 additions & 6 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet,
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods,
startTime time.Time) error
}

Expand Down Expand Up @@ -267,7 +267,7 @@ type Kubelet struct {
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods util.StringSet
mirrorPods mirrorPods

// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
Expand Down Expand Up @@ -1488,7 +1488,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
}

// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
Expand Down Expand Up @@ -1536,7 +1536,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() {
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})

Expand Down Expand Up @@ -1606,7 +1606,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager)
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)

return err
}
Expand Down Expand Up @@ -1885,7 +1885,7 @@ func (kl *Kubelet) GetHostname() string {

// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
Expand Down Expand Up @@ -2057,6 +2057,8 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)

// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
Expand Down Expand Up @@ -2119,6 +2121,8 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {

// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
}
Expand All @@ -2136,6 +2140,8 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
Expand All @@ -2153,6 +2159,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
Expand Down Expand Up @@ -2185,8 +2193,30 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}

// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}

kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {

uid = kl.translatePodUID(uid)

dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err
Expand Down
132 changes: 114 additions & 18 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -805,15 +805,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -1731,7 +1731,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, util.NewStringSet(), time.Now())
}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -2936,7 +2936,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
Expand Down Expand Up @@ -3183,23 +3183,119 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
mirrorPods := util.NewStringSet()
for _, name := range orphanedPodNames {
mirrorPods.Insert(name)
orphanPods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "pod1",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "12345679",
Name: "pod2",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
}

mirrorPods := newMirrorPods()
for _, pod := range orphanPods {
mirrorPods.Insert(&pod)
}
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
for _, name := range orphanedPodNames {
for _, pod := range orphanPods {
name := GetPodFullName(&pod)
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
}
}
}

func TestGetContainerInfoForMirrorPods(t *testing.T) {
// pods contain one static and one mirror pod with the same name but
// different UIDs.
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "1234",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "5678",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
}

containerID := "ab2cdf"
containerPath := fmt.Sprintf("/docker/%v", containerID)
containerInfo := cadvisorApi.ContainerInfo{
ContainerReference: cadvisorApi.ContainerReference{
Name: containerPath,
},
}

testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
mockCadvisor := testKubelet.fakeCadvisor
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)

fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s_foo_qux_ns_1234_42"},
},
}

kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
// Use the mirror pod UID to retrieve the stats.
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if stats == nil {
t.Fatalf("stats should not be nil")
}
mockCadvisor.AssertExpectations(t)
}
Loading

0 comments on commit 4d2e798

Please sign in to comment.