Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubelet: support retrieving stats using UID of mirror pods #5730

Merged
merged 1 commit into from
Mar 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type SyncHandler interface {
// Syncs current state to match the specified pods. SyncPodType specified what
// type of sync is occuring per pod. StartTime specifies the time at which
// syncing began (for use in monitoring).
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet,
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods,
startTime time.Time) error
}

Expand Down Expand Up @@ -267,7 +267,7 @@ type Kubelet struct {
// similar to pods, this is not immutable and is protected by the same podLock.
// Note that Kubelet.pods do not contain mirror pods as they are filtered
// out beforehand.
mirrorPods util.StringSet
mirrorPods mirrorPods

// A pod status cache stores statuses for pods (both rejected and synced).
// Note that currently no thread attempts to acquire podStatusesLock while
Expand Down Expand Up @@ -1488,7 +1488,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
}

// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error {
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error {
defer func() {
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
}()
Expand Down Expand Up @@ -1536,7 +1536,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Run the sync in an async manifest worker.
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() {
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() {
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
})

Expand Down Expand Up @@ -1606,7 +1606,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
}

// Remove any orphaned mirror pods.
deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager)
deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager)

return err
}
Expand Down Expand Up @@ -1885,7 +1885,7 @@ func (kl *Kubelet) GetHostname() string {

// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
Expand Down Expand Up @@ -2057,6 +2057,8 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
uid = kl.translatePodUID(uid)

// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
Expand Down Expand Up @@ -2119,6 +2121,8 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {

// Run a command in a container, returns the combined stdout, stderr as an array of bytes
func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return nil, fmt.Errorf("no runner specified.")
}
Expand All @@ -2136,6 +2140,8 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s
// ExecInContainer executes a command in a container, connecting the supplied
// stdin/stdout/stderr to the command's IO streams.
func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
Expand All @@ -2153,6 +2159,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container
// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
uid = kl.translatePodUID(uid)

if kl.runner == nil {
return fmt.Errorf("no runner specified.")
}
Expand Down Expand Up @@ -2185,8 +2193,30 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration {
return kl.streamingConnectionIdleTimeout
}

// If the UID belongs to a mirror pod, maps it to the UID of its static pod.
// Otherwise, return the original UID. All public-facing functions should
// perform this translation for UIDs because user may provide a mirror pod UID,
// which is not recognized by internal Kubelet functions.
func (kl *Kubelet) translatePodUID(uid types.UID) types.UID {
if uid == "" {
return uid
}

kl.podLock.RLock()
defer kl.podLock.RUnlock()
staticUID, ok := kl.mirrorPods.GetStaticUID(uid)
if ok {
return staticUID
} else {
return uid
}
}

// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) {

uid = kl.translatePodUID(uid)

dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
if err != nil {
return nil, err
Expand Down
132 changes: 114 additions & 18 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
},
}
waitGroup.Add(1)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -764,7 +764,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
},
}
waitGroup.Add(2)
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -805,15 +805,15 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ID: "9876",
},
}
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Validate nothing happened.
verifyCalls(t, fakeDocker, []string{"list"})
fakeDocker.ClearCalls()

ready = true
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -1731,7 +1731,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
},
},
},
}, emptyPodUIDs, util.NewStringSet(), time.Now())
}, emptyPodUIDs, *newMirrorPods(), time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -2936,7 +2936,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Sync with empty pods so that the entry in status map will be removed.
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now())
if len(kl.podStatuses) != 0 {
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
}
Expand Down Expand Up @@ -3183,23 +3183,119 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
mirrorPods := util.NewStringSet()
for _, name := range orphanedPodNames {
mirrorPods.Insert(name)
orphanPods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "pod1",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "12345679",
Name: "pod2",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
},
}

mirrorPods := newMirrorPods()
for _, pod := range orphanPods {
mirrorPods.Insert(&pod)
}
// Sync with an empty pod list to delete all mirror pods.
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if manager.NumOfPods() != 0 {
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
}
for _, name := range orphanedPodNames {
for _, pod := range orphanPods {
name := GetPodFullName(&pod)
creates, deletes := manager.GetCounts(name)
if creates != 0 || deletes != 1 {
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
}
}
}

func TestGetContainerInfoForMirrorPods(t *testing.T) {
// pods contain one static and one mirror pod with the same name but
// different UIDs.
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "1234",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "file",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "5678",
Name: "qux",
Namespace: "ns",
Annotations: map[string]string{
ConfigSourceAnnotationKey: "api",
ConfigMirrorAnnotationKey: "mirror",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "foo"},
},
},
},
}

containerID := "ab2cdf"
containerPath := fmt.Sprintf("/docker/%v", containerID)
containerInfo := cadvisorApi.ContainerInfo{
ContainerReference: cadvisorApi.ContainerReference{
Name: containerPath,
},
}

testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
mockCadvisor := testKubelet.fakeCadvisor
cadvisorReq := &cadvisorApi.ContainerInfoRequest{}
mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil)

fakeDocker.ContainerList = []docker.APIContainers{
{
ID: containerID,
Names: []string{"/k8s_foo_qux_ns_1234_42"},
},
}

kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods)
// Use the mirror pod UID to retrieve the stats.
stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if stats == nil {
t.Fatalf("stats should not be nil")
}
mockCadvisor.AssertExpectations(t)
}
Loading