diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index dd79e32d5c27d..9d2cf33a92d5d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -95,7 +95,7 @@ type SyncHandler interface { // Syncs current state to match the specified pods. SyncPodType specified what // type of sync is occuring per pod. StartTime specifies the time at which // syncing began (for use in monitoring). - SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, + SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, startTime time.Time) error } @@ -267,7 +267,7 @@ type Kubelet struct { // similar to pods, this is not immutable and is protected by the same podLock. // Note that Kubelet.pods do not contain mirror pods as they are filtered // out beforehand. - mirrorPods util.StringSet + mirrorPods mirrorPods // A pod status cache stores statuses for pods (both rejected and synced). // Note that currently no thread attempts to acquire podStatusesLock while @@ -1488,7 +1488,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error { +func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods mirrorPods, start time.Time) error { defer func() { metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start)) }() @@ -1536,7 +1536,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Run the sync in an async manifest worker. - kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() { + kl.podWorkers.UpdatePod(pod, kl.mirrorPods.HasMirrorPod(uid), func() { metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start)) }) @@ -1606,7 +1606,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric } // Remove any orphaned mirror pods. - deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager) + deleteOrphanedMirrorPods(mirrorPods, kl.mirrorManager) return err } @@ -1885,7 +1885,7 @@ func (kl *Kubelet) GetHostname() string { // GetPods returns all pods bound to the kubelet and their spec, and the mirror // pod map. -func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) { +func (kl *Kubelet) GetPods() ([]api.Pod, mirrorPods) { kl.podLock.RLock() defer kl.podLock.RUnlock() return append([]api.Pod{}, kl.pods...), kl.mirrorPods @@ -2057,6 +2057,8 @@ func getPodReadyCondition(spec *api.PodSpec, info api.PodInfo) []api.PodConditio // GetPodStatus returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { + uid = kl.translatePodUID(uid) + // Check to see if we have a cached version of the status. cachedPodStatus, found := kl.getPodStatusFromCache(podFullName) if found { @@ -2119,6 +2121,8 @@ func (kl *Kubelet) ServeLogs(w http.ResponseWriter, req *http.Request) { // Run a command in a container, returns the combined stdout, stderr as an array of bytes func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container string, cmd []string) ([]byte, error) { + uid = kl.translatePodUID(uid) + if kl.runner == nil { return nil, fmt.Errorf("no runner specified.") } @@ -2136,6 +2140,8 @@ func (kl *Kubelet) RunInContainer(podFullName string, uid types.UID, container s // ExecInContainer executes a command in a container, connecting the supplied // stdin/stdout/stderr to the command's IO streams. func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool) error { + uid = kl.translatePodUID(uid) + if kl.runner == nil { return fmt.Errorf("no runner specified.") } @@ -2153,6 +2159,8 @@ func (kl *Kubelet) ExecInContainer(podFullName string, uid types.UID, container // PortForward connects to the pod's port and copies data between the port // and the stream. func (kl *Kubelet) PortForward(podFullName string, uid types.UID, port uint16, stream io.ReadWriteCloser) error { + uid = kl.translatePodUID(uid) + if kl.runner == nil { return fmt.Errorf("no runner specified.") } @@ -2185,8 +2193,30 @@ func (kl *Kubelet) StreamingConnectionIdleTimeout() time.Duration { return kl.streamingConnectionIdleTimeout } +// If the UID belongs to a mirror pod, maps it to the UID of its static pod. +// Otherwise, return the original UID. All public-facing functions should +// perform this translation for UIDs because user may provide a mirror pod UID, +// which is not recognized by internal Kubelet functions. +func (kl *Kubelet) translatePodUID(uid types.UID) types.UID { + if uid == "" { + return uid + } + + kl.podLock.RLock() + defer kl.podLock.RUnlock() + staticUID, ok := kl.mirrorPods.GetStaticUID(uid) + if ok { + return staticUID + } else { + return uid + } +} + // GetContainerInfo returns stats (from Cadvisor) for a container. func (kl *Kubelet) GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) { + + uid = kl.translatePodUID(uid) + dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) if err != nil { return nil, err diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e9fdb0bfef967..7d198526561f9 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -585,7 +585,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -694,7 +694,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { }, } waitGroup.Add(1) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -764,7 +764,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { }, } waitGroup.Add(2) - err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -805,7 +805,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { ID: "9876", }, } - if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil { + if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } // Validate nothing happened. @@ -813,7 +813,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) { fakeDocker.ClearCalls() ready = true - if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil { + if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()); err != nil { t.Errorf("unexpected error: %v", err) } verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"}) @@ -852,7 +852,7 @@ func TestSyncPodsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()) + err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -1731,7 +1731,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { }, }, }, - }, emptyPodUIDs, util.NewStringSet(), time.Now()) + }, emptyPodUIDs, *newMirrorPods(), time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -2936,7 +2936,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) { t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses) } // Sync with empty pods so that the entry in status map will be removed. - kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()) + kl.SyncPods([]api.Pod{}, emptyPodUIDs, *newMirrorPods(), time.Now()) if len(kl.podStatuses) != 0 { t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses) } @@ -3183,23 +3183,119 @@ func TestDeleteOrphanedMirrorPods(t *testing.T) { testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil) kl := testKubelet.kubelet manager := testKubelet.fakeMirrorManager - orphanedPodNames := []string{"pod1_ns", "pod2_ns"} - mirrorPods := util.NewStringSet() - for _, name := range orphanedPodNames { - mirrorPods.Insert(name) + orphanPods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "12345678", + Name: "pod1", + Namespace: "ns", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "api", + ConfigMirrorAnnotationKey: "mirror", + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "12345679", + Name: "pod2", + Namespace: "ns", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "api", + ConfigMirrorAnnotationKey: "mirror", + }, + }, + }, + } + + mirrorPods := newMirrorPods() + for _, pod := range orphanPods { + mirrorPods.Insert(&pod) } // Sync with an empty pod list to delete all mirror pods. - err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now()) + err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, *mirrorPods, time.Now()) if err != nil { t.Errorf("unexpected error: %v", err) } if manager.NumOfPods() != 0 { t.Errorf("expected zero mirror pods, got %v", manager.GetPods()) } - for _, name := range orphanedPodNames { + for _, pod := range orphanPods { + name := GetPodFullName(&pod) creates, deletes := manager.GetCounts(name) if creates != 0 || deletes != 1 { t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes) } } } + +func TestGetContainerInfoForMirrorPods(t *testing.T) { + // pods contain one static and one mirror pod with the same name but + // different UIDs. + pods := []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "1234", + Name: "qux", + Namespace: "ns", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "file", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "foo"}, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "5678", + Name: "qux", + Namespace: "ns", + Annotations: map[string]string{ + ConfigSourceAnnotationKey: "api", + ConfigMirrorAnnotationKey: "mirror", + }, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "foo"}, + }, + }, + }, + } + + containerID := "ab2cdf" + containerPath := fmt.Sprintf("/docker/%v", containerID) + containerInfo := cadvisorApi.ContainerInfo{ + ContainerReference: cadvisorApi.ContainerReference{ + Name: containerPath, + }, + } + + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + fakeDocker := testKubelet.fakeDocker + mockCadvisor := testKubelet.fakeCadvisor + cadvisorReq := &cadvisorApi.ContainerInfoRequest{} + mockCadvisor.On("DockerContainer", containerID, cadvisorReq).Return(containerInfo, nil) + + fakeDocker.ContainerList = []docker.APIContainers{ + { + ID: containerID, + Names: []string{"/k8s_foo_qux_ns_1234_42"}, + }, + } + + kubelet.pods, kubelet.mirrorPods = filterAndCategorizePods(pods) + // Use the mirror pod UID to retrieve the stats. + stats, err := kubelet.GetContainerInfo("qux_ns", "5678", "foo", cadvisorReq) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if stats == nil { + t.Fatalf("stats should not be nil") + } + mockCadvisor.AssertExpectations(t) +} diff --git a/pkg/kubelet/mirror_manager.go b/pkg/kubelet/mirror_manager.go index 97ddb4d6f1f80..1d1ab68dc7b03 100644 --- a/pkg/kubelet/mirror_manager.go +++ b/pkg/kubelet/mirror_manager.go @@ -21,7 +21,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/golang/glog" ) @@ -86,15 +86,10 @@ func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error { } // Delete all orphaned mirror pods. -func deleteOrphanedMirrorPods(pods []api.Pod, mirrorPods util.StringSet, manager mirrorManager) { - existingPods := util.NewStringSet() - for _, pod := range pods { - existingPods.Insert(GetPodFullName(&pod)) - } - for podFullName := range mirrorPods { - if !existingPods.Has(podFullName) { - manager.DeleteMirrorPod(podFullName) - } +func deleteOrphanedMirrorPods(mirrorPods mirrorPods, manager mirrorManager) { + podFullNames := mirrorPods.GetOrphanedMirrorPodNames() + for _, podFullName := range podFullNames { + manager.DeleteMirrorPod(podFullName) } } @@ -123,16 +118,90 @@ func isMirrorPod(pod *api.Pod) bool { // This function separate the mirror pods from regular pods to // facilitate pods syncing and mirror pod creation/deletion. -func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, util.StringSet) { +func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, mirrorPods) { filteredPods := []api.Pod{} - mirrorPods := util.NewStringSet() + mirrorPods := newMirrorPods() + for _, pod := range pods { - name := GetPodFullName(&pod) - if isMirrorPod(&pod) { - mirrorPods.Insert(name) - } else { + mirrorPods.Insert(&pod) + if !isMirrorPod(&pod) { filteredPods = append(filteredPods, pod) } } - return filteredPods, mirrorPods + return filteredPods, *mirrorPods +} + +// mirrorPods is thread-compatible. +// TODO (yujuhong): Replace this with a pod manager that manages both regular +// pods and mirror pods. +type mirrorPods struct { + // Static pod UIDs indexed by pod full name. + static map[string]types.UID + // Mirror pod UIDs indexed by pod full name. + mirror map[string]types.UID + + // Bi-directional UID mappings. + staticToMirror map[types.UID]types.UID + mirrorToStatic map[types.UID]types.UID +} + +func newMirrorPods() *mirrorPods { + mirrorPods := mirrorPods{} + mirrorPods.static = make(map[string]types.UID) + mirrorPods.mirror = make(map[string]types.UID) + mirrorPods.staticToMirror = make(map[types.UID]types.UID) + mirrorPods.mirrorToStatic = make(map[types.UID]types.UID) + return &mirrorPods +} + +func (self *mirrorPods) Insert(pod *api.Pod) { + podFullName := GetPodFullName(pod) + if isMirrorPod(pod) { + self.mirror[podFullName] = pod.UID + } else if isStaticPod(pod) { + self.static[podFullName] = pod.UID + } + staticUID, found1 := self.static[podFullName] + mirrorUID, found2 := self.mirror[podFullName] + // Update the UID mappings. + if found1 && found2 { + self.staticToMirror[staticUID] = mirrorUID + self.mirrorToStatic[mirrorUID] = staticUID + } +} + +func (self *mirrorPods) HasStaticPod(key types.UID) bool { + _, ok := self.mirrorToStatic[key] + return ok +} + +func (self *mirrorPods) HasMirrorPod(key types.UID) bool { + _, ok := self.staticToMirror[key] + return ok +} + +func (self *mirrorPods) GetMirrorUID(key types.UID) (types.UID, bool) { + value, ok := self.staticToMirror[key] + if !ok { + return "", false + } + return value, true +} + +func (self *mirrorPods) GetStaticUID(key types.UID) (types.UID, bool) { + value, ok := self.mirrorToStatic[key] + if !ok { + return "", false + } + return value, true +} + +func (self *mirrorPods) GetOrphanedMirrorPodNames() []string { + orphanedPodNames := []string{} + for podFullName := range self.mirror { + if _, ok := self.static[podFullName]; !ok { + orphanedPodNames = append(orphanedPodNames, podFullName) + } + } + return orphanedPodNames } diff --git a/pkg/kubelet/mirror_manager_test.go b/pkg/kubelet/mirror_manager_test.go index 7bf42d9cb38df..da4222db60b65 100644 --- a/pkg/kubelet/mirror_manager_test.go +++ b/pkg/kubelet/mirror_manager_test.go @@ -121,7 +121,7 @@ func TestFilterOutMirrorPods(t *testing.T) { if !reflect.DeepEqual(expectedPods, actualPods) { t.Errorf("expected %#v, got %#v", expectedPods, actualPods) } - if !actualMirrorPods.Has(GetPodFullName(&mirrorPod)) { + if _, ok := actualMirrorPods.mirror[GetPodFullName(&mirrorPod)]; !ok { t.Errorf("mirror pod is not recorded") } } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 1700f753afa46..aa39082d21171 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -37,7 +37,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy" "github.com/golang/glog" @@ -83,7 +82,7 @@ type HostInterface interface { GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) GetDockerVersion() ([]uint, error) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) - GetPods() ([]api.Pod, util.StringSet) + GetPods() ([]api.Pod, mirrorPods) GetPodByName(namespace, name string) (*api.Pod, bool) GetPodStatus(name string, uid types.UID) (api.PodStatus, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 0b3f29ec7a333..1daa66c377c5f 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -33,7 +33,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy" cadvisorApi "github.com/google/cadvisor/info/v1" @@ -45,7 +44,7 @@ type fakeKubelet struct { containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) machineInfoFunc func() (*cadvisorApi.MachineInfo, error) - podsFunc func() ([]api.Pod, util.StringSet) + podsFunc func() ([]api.Pod, mirrorPods) logFunc func(w http.ResponseWriter, req *http.Request) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) dockerVersionFunc func() ([]uint, error) @@ -80,7 +79,7 @@ func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) return fk.machineInfoFunc() } -func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet) { +func (fk *fakeKubelet) GetPods() ([]api.Pod, mirrorPods) { return fk.podsFunc() }