Skip to content

Commit

Permalink
Kubelet: support retrieving stats using UID of mirror pod
Browse files Browse the repository at this point in the history
Kubelet supports retrieving stats for pods/containers with and without UID.
This does not always work for the static pods because users may get the UIDs of
the mirror pods from the API server, and use them to query Kubelet. In this
case, Kubelet would fail to locate the containers due to mismatched UIDs.

This change adds a intenral mirror to static pod UID mapping and teaches all
public-facing functions to perform UID lookup before proceeding. This allows
users to use either mirror or static pod's UID to retrieve stats.
  • Loading branch information
yujuhong committed Mar 20, 2015
1 parent 0250fcf commit 15e9760
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 47 deletions.
42 changes: 36 additions & 6 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet,
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods,
startTime time.Time) error
}

Expand Down Expand Up @@ -267,7 +267,7 @@ type Kubelet struct {
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods util.StringSet
mirrorPods mirrorPods

// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
Expand Down Expand Up @@ -1488,7 +1488,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
}

// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
Expand Down Expand Up @@ -1536,7 +1536,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() {
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})

Expand Down Expand Up @@ -1606,7 +1606,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager)
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)

return err
}
Expand Down Expand Up @@ -1885,7 +1885,7 @@ func (kl *Kubelet) GetHostname() string {

// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
Expand Down Expand Up @@ -2057,6 +2057,8 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)

// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
Expand Down Expand Up @@ -2119,6 +2121,8 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {

// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
}
Expand All @@ -2136,6 +2140,8 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
Expand All @@ -2153,6 +2159,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
Expand Down Expand Up @@ -2185,8 +2193,30 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}

// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}

kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {

uid = kl.translatePodUID(uid)

dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err
Expand Down
132 changes: 114 additions & 18 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -805,15 +805,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -1731,7 +1731,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, util.NewStringSet(), time.Now())
}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -2936,7 +2936,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
Expand Down Expand Up @@ -3183,23 +3183,119 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
mirrorPods := util.NewStringSet()
for _, name := range orphanedPodNames {
mirrorPods.Insert(name)
orphanPods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "pod1",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "12345679",
Name: "pod2",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
}

mirrorPods := newMirrorPods()
for _, pod := range orphanPods {
mirrorPods.Insert(&pod)
}
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
for _, name := range orphanedPodNames {
for _, pod := range orphanPods {
name := GetPodFullName(&pod)
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
}
}
}

func TestGetContainerInfoForMirrorPods(t *testing.T) {
// pods contain one static and one mirror pod with the same name but
// different UIDs.
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "1234",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "5678",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
}

containerID := "ab2cdf"
containerPath := fmt.Sprintf("/docker/%v", containerID)
containerInfo := cadvisorApi.ContainerInfo{
ContainerReference: cadvisorApi.ContainerReference{
Name: containerPath,
},
}

testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
mockCadvisor := testKubelet.fakeCadvisor
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)

fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s_foo_qux_ns_1234_42"},
},
}

kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
// Use the mirror pod UID to retrieve the stats.
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if stats == nil {
t.Fatalf("stats should not be nil")
}
mockCadvisor.AssertExpectations(t)
}
Loading

0 comments on commit 15e9760

Please sign in to comment.