From 5a16b4751be6d8a3841ff35fd44504e026263e26 Mon Sep 17 00:00:00 2001 From: Derek Gonyeo Date: Sat, 21 Nov 2015 00:56:35 +0000 Subject: [PATCH] rkt: rewrote GetPods to use rkt's api service This involved adding annotations to the rkt pod's manifest that contain information about the kubernetes pod, which is later read by the kubelet. --- pkg/kubelet/rkt/fake_rkt_interface.go | 28 ++- pkg/kubelet/rkt/rkt.go | 281 +++++++++++++++++++++----- pkg/kubelet/rkt/rkt_test.go | 262 ++++++++++++++++++++++++ 3 files changed, 512 insertions(+), 59 deletions(-) diff --git a/pkg/kubelet/rkt/fake_rkt_interface.go b/pkg/kubelet/rkt/fake_rkt_interface.go index f6d7b7a2384a4..e5ae640075fe7 100644 --- a/pkg/kubelet/rkt/fake_rkt_interface.go +++ b/pkg/kubelet/rkt/fake_rkt_interface.go @@ -30,10 +30,12 @@ import ( // fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose. type fakeRktInterface struct { sync.Mutex - info rktapi.Info - images []*rktapi.Image - called []string - err error + info rktapi.Info + images []*rktapi.Image + podFilter *rktapi.PodFilter + pods []*rktapi.Pod + called []string + err error } func newFakeRktInterface() *fakeRktInterface { @@ -55,11 +57,25 @@ func (f *fakeRktInterface) GetInfo(ctx context.Context, in *rktapi.GetInfoReques } func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequest, opts ...grpc.CallOption) (*rktapi.ListPodsResponse, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + + f.called = append(f.called, "ListPods") + f.podFilter = in.Filter + return &rktapi.ListPodsResponse{f.pods}, f.err } func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPodRequest, opts ...grpc.CallOption) (*rktapi.InspectPodResponse, error) { - return nil, fmt.Errorf("Not implemented") + f.Lock() + defer f.Unlock() + + f.called = append(f.called, "InspectPod") + for _, pod := range f.pods { + if pod.Id == in.Id { + return &rktapi.InspectPodResponse{pod}, f.err + 
} + } + return &rktapi.InspectPodResponse{nil}, f.err } func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 03672d41da355..e7714dd62c549 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -72,6 +72,16 @@ const ( unitRktID = "RktID" unitRestartCount = "RestartCount" + k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet" + k8sRktKubeletAnnoValue = "true" + k8sRktUIDAnno = "rkt.kubernetes.io/uid" + k8sRktNameAnno = "rkt.kubernetes.io/name" + k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace" + //TODO: remove the creation time annotation once this is closed: https://github.com/coreos/rkt/issues/1789 + k8sRktCreationTimeAnno = "rkt.kubernetes.io/created" + k8sRktContainerHashAnno = "rkt.kubernetes.io/containerhash" + k8sRktRestartCountAnno = "rkt.kubernetes.io/restartcount" + dockerPrefix = "docker://" authDir = "auth.d" @@ -415,50 +425,60 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc var globalPortMappings []kubecontainer.PortMapping manifest := appcschema.BlankPodManifest() - for _, c := range pod.Spec.Containers { - if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { - return nil, err - } - imgManifest, err := r.getImageManifest(c.Image) + listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{ + Filter: kubernetesPodFilter(pod), + }) + if err != nil { + return nil, fmt.Errorf("couldn't list pods: %v", err) + } + + restartCount := 0 + for _, rktpod := range listResp.Pods { + //TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786 + inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id}) if err != nil { - return nil, err + glog.Warningf("rkt: error while inspecting pod %s", rktpod.Id) + continue } - if 
imgManifest.App == nil { - imgManifest.App = new(appctypes.App) + if inspectResp.Pod == nil { + glog.Warningf("rkt: pod %s vanished?!", rktpod.Id) + continue } - img, err := r.getImageByName(c.Image) + manifest := &appcschema.PodManifest{} + err = json.Unmarshal(inspectResp.Pod.Manifest, manifest) if err != nil { - return nil, err - } - hash, err := appctypes.NewHash(img.ID) - if err != nil { - return nil, err + glog.Warningf("rkt: error unmarshaling pod manifest: %v", err) + continue } - opts, err := r.generator.GenerateRunContainerOptions(pod, &c) - if err != nil { - return nil, err + if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok { + num, err := strconv.Atoi(countString) + if err != nil { + glog.Warningf("rkt: error reading restart count on pod: %v", err) + continue + } + if num+1 > restartCount { + restartCount = num + 1 + } } + } - globalPortMappings = append(globalPortMappings, opts.PortMappings...) - - if err := setApp(imgManifest.App, &c, opts); err != nil { - return nil, err - } + manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktKubeletAnno), k8sRktKubeletAnnoValue) + manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktUIDAnno), string(pod.UID)) + manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNameAnno), pod.Name) + manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNamespaceAnno), pod.Namespace) + manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktCreationTimeAnno), strconv.FormatInt(time.Now().Unix(), 10)) + manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount)) - name, err := appctypes.SanitizeACName(c.Name) + for _, c := range pod.Spec.Containers { + app, portMappings, err := r.newAppcRuntimeApp(pod, c, pullSecrets) if err != nil { return nil, err } - appName := appctypes.MustACName(name) - - manifest.Apps = append(manifest.Apps, appcschema.RuntimeApp{ - Name: *appName, - Image: appcschema.RuntimeImage{ID: *hash}, - App: 
imgManifest.App, - }) + manifest.Apps = append(manifest.Apps, *app) + globalPortMappings = append(globalPortMappings, portMappings...) } volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID) @@ -495,6 +515,80 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc return manifest, nil } +func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets []api.Secret) (*appcschema.RuntimeApp, []kubecontainer.PortMapping, error) { + if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { + return nil, nil, err + } + imgManifest, err := r.getImageManifest(c.Image) + if err != nil { + return nil, nil, err + } + + if imgManifest.App == nil { + imgManifest.App = new(appctypes.App) + } + + img, err := r.getImageByName(c.Image) + if err != nil { + return nil, nil, err + } + hash, err := appctypes.NewHash(img.ID) + if err != nil { + return nil, nil, err + } + + opts, err := r.generator.GenerateRunContainerOptions(pod, &c) + if err != nil { + return nil, nil, err + } + + if err := setApp(imgManifest.App, &c, opts); err != nil { + return nil, nil, err + } + + name, err := appctypes.SanitizeACName(c.Name) + if err != nil { + return nil, nil, err + } + appName := appctypes.MustACName(name) + + kubehash := kubecontainer.HashContainer(&c) + + return &appcschema.RuntimeApp{ + Name: *appName, + Image: appcschema.RuntimeImage{ID: *hash}, + App: imgManifest.App, + Annotations: []appctypes.Annotation{ + { + Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno), + Value: strconv.FormatUint(kubehash, 10), + }, + }, + }, opts.PortMappings, nil +} + +func kubernetesPodFilter(pod *api.Pod) *rktapi.PodFilter { + return &rktapi.PodFilter{ + States: []rktapi.PodState{ + //TODO: In the future some pods can remain running after some apps exit: https://github.com/appc/spec/pull/500 + rktapi.PodState_POD_STATE_RUNNING, + rktapi.PodState_POD_STATE_EXITED, + rktapi.PodState_POD_STATE_DELETING, + rktapi.PodState_POD_STATE_GARBAGE, + }, + 
Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + { + Key: k8sRktUIDAnno, + Value: string(pod.UID), + }, + }, + } +} + func newUnitOption(section, name, value string) *unit.UnitOption { return &unit.UnitOption{Section: section, Name: name, Value: value} } @@ -719,6 +813,79 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { return nil } +// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod +func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) { + manifest := &appcschema.PodManifest{} + err := json.Unmarshal(rktpod.Manifest, manifest) + if err != nil { + return nil, err + } + + podUID, ok := manifest.Annotations.Get(k8sRktUIDAnno) + if !ok { + return nil, fmt.Errorf("pod is missing annotation %s", k8sRktUIDAnno) + } + podName, ok := manifest.Annotations.Get(k8sRktNameAnno) + if !ok { + return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNameAnno) + } + podNamespace, ok := manifest.Annotations.Get(k8sRktNamespaceAnno) + if !ok { + return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno) + } + podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno) + if !ok { + return nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno) + } + podCreated, err := strconv.ParseInt(podCreatedString, 10, 64) + if err != nil { + return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err) + } + + var state kubecontainer.ContainerState + switch rktpod.State { + case rktapi.PodState_POD_STATE_RUNNING: + state = kubecontainer.ContainerStateRunning + case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED, + rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE: + state = kubecontainer.ContainerStateExited + default: + state = kubecontainer.ContainerStateUnknown + } + + kubepod := &kubecontainer.Pod{ + ID: types.UID(podUID), + Name: podName, + Namespace: 
podNamespace, + } + for _, app := range rktpod.Apps { + manifest := &appcschema.ImageManifest{} + err := json.Unmarshal(app.Image.Manifest, manifest) + if err != nil { + return nil, err + } + containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno) + if !ok { + return nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno) + } + containerHash, err := strconv.ParseUint(containerHashString, 10, 64) + if err != nil { + return nil, fmt.Errorf("couldn't parse container's hash: %v", err) + } + + kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{ + ID: buildContainerID(&containerID{rktpod.Id, app.Name}), + Name: app.Name, + Image: app.Image.Name, + Hash: containerHash, + Created: podCreated, + State: state, + }) + } + + return kubepod, nil +} + // readServiceFile reads the service file and constructs the runtime pod and the rkt info. func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) { f, err := os.Open(serviceFilePath(serviceName)) @@ -770,34 +937,42 @@ func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { glog.V(4).Infof("Rkt getting pods") - units, err := r.systemd.ListUnits() + listReq := &rktapi.ListPodsRequest{ + Filter: &rktapi.PodFilter{ + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + }, + }, + } + if !all { + listReq.Filter.States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING} + } + listResp, err := r.apisvc.ListPods(context.Background(), listReq) if err != nil { - return nil, err + return nil, fmt.Errorf("couldn't list pods: %v", err) } var pods []*kubecontainer.Pod - for _, u := range units { - if strings.HasPrefix(u.Name, kubernetesUnitPrefix) { - var state kubecontainer.ContainerState - switch { - case u.SubState == "running": - state = kubecontainer.ContainerStateRunning - default: - state = 
kubecontainer.ContainerStateExited - } - if !all && state != kubecontainer.ContainerStateRunning { - continue - } - pod, _, err := r.readServiceFile(u.Name) - if err != nil { - glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err) - continue - } - for _, c := range pod.Containers { - c.State = state - } - pods = append(pods, pod) + for _, rktpod := range listResp.Pods { + //TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786 + inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id}) + if err != nil { + return nil, err + } + + if inspectResp.Pod == nil { + return nil, fmt.Errorf("pod %s vanished?!", rktpod.Id) + } + + pod, err := r.convertRktPod(*inspectResp.Pod) + if err != nil { + glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err) + continue } + pods = append(pods, pod) } return pods, nil } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index df47d1e50bd02..7a71545de2cbb 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -17,11 +17,15 @@ limitations under the License. 
package rkt import ( + "encoding/json" "fmt" "testing" + appcschema "github.com/appc/spec/schema" + appctypes "github.com/appc/spec/schema/types" rktapi "github.com/coreos/rkt/api/v1alpha" "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/pkg/types" ) func TestCheckVersion(t *testing.T) { @@ -197,3 +201,261 @@ func TestListImages(t *testing.T) { fr.CleanCalls() } } + +func TestGetPods(t *testing.T) { + fr := newFakeRktInterface() + fs := newFakeSystemd() + r := &Runtime{apisvc: fr, systemd: fs} + + tests := []struct { + k8sUID types.UID + k8sName string + k8sNamespace string + k8sCreation int64 + k8sRestart int + k8sContHashes []uint64 + rktPodState rktapi.PodState + pods []*rktapi.Pod + }{ + {}, + { + k8sUID: types.UID("0"), + k8sName: "guestbook", + k8sNamespace: "default", + k8sCreation: 10000000000, + k8sRestart: 1, + k8sContHashes: []uint64{2353434678}, + rktPodState: rktapi.PodState_POD_STATE_RUNNING, + pods: []*rktapi.Pod{ + { + State: rktapi.PodState_POD_STATE_RUNNING, + Apps: []*rktapi.App{ + { + Name: "test", + Image: &rktapi.Image{ + Name: "test", + Manifest: mustMarshalImageManifest( + &appcschema.ImageManifest{ + ACKind: appcschema.ImageManifestKind, + ACVersion: appcschema.AppContainerVersion, + Name: *appctypes.MustACIdentifier("test"), + Annotations: appctypes.Annotations{ + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno), + Value: "2353434678", + }, + }, + }, + ), + }, + }, + }, + Manifest: mustMarshalPodManifest( + &appcschema.PodManifest{ + ACKind: appcschema.PodManifestKind, + ACVersion: appcschema.AppContainerVersion, + Annotations: appctypes.Annotations{ + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno), + Value: k8sRktKubeletAnnoValue, + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktUIDAnno), + Value: "0", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktNameAnno), + Value: "guestbook", + }, + appctypes.Annotation{ + Name: 
*appctypes.MustACIdentifier(k8sRktNamespaceAnno), + Value: "default", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno), + Value: "10000000000", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno), + Value: "1", + }, + }, + }, + ), + }, + }, + }, + { + k8sUID: types.UID("1"), + k8sName: "test-pod", + k8sNamespace: "default", + k8sCreation: 10000000001, + k8sRestart: 3, + k8sContHashes: []uint64{2353434682, 8732645}, + rktPodState: rktapi.PodState_POD_STATE_EXITED, + pods: []*rktapi.Pod{ + { + State: rktapi.PodState_POD_STATE_EXITED, + Apps: []*rktapi.App{ + { + Name: "test", + Image: &rktapi.Image{ + Name: "test", + Manifest: mustMarshalImageManifest( + &appcschema.ImageManifest{ + ACKind: appcschema.ImageManifestKind, + ACVersion: appcschema.AppContainerVersion, + Name: *appctypes.MustACIdentifier("test"), + Annotations: appctypes.Annotations{ + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno), + Value: "2353434682", + }, + }, + }, + ), + }, + }, + { + Name: "test2", + Image: &rktapi.Image{ + Name: "test2", + Manifest: mustMarshalImageManifest( + &appcschema.ImageManifest{ + ACKind: appcschema.ImageManifestKind, + ACVersion: appcschema.AppContainerVersion, + Name: *appctypes.MustACIdentifier("test2"), + Annotations: appctypes.Annotations{ + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno), + Value: "8732645", + }, + }, + }, + ), + }, + }, + }, + Manifest: mustMarshalPodManifest( + &appcschema.PodManifest{ + ACKind: appcschema.PodManifestKind, + ACVersion: appcschema.AppContainerVersion, + Annotations: appctypes.Annotations{ + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktKubeletAnno), + Value: k8sRktKubeletAnnoValue, + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktUIDAnno), + Value: "1", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktNameAnno), + Value: 
"test-pod", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno), + Value: "default", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno), + Value: "10000000001", + }, + appctypes.Annotation{ + Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno), + Value: "3", + }, + }, + }, + ), + }, + }, + }, + } + + for i, tt := range tests { + fr.pods = tt.pods + + pods, err := r.GetPods(true) + if err != nil { + t.Errorf("%v", err) + } + assert.Equal(t, len(pods), len(tt.pods), fmt.Sprintf("test case %d: mismatched number of pods", i)) + + for j, pod := range pods { + assert.Equal(t, pod.ID, tt.k8sUID, fmt.Sprintf("test case %d: mismatched UIDs", i)) + assert.Equal(t, pod.Name, tt.k8sName, fmt.Sprintf("test case %d: mismatched Names", i)) + assert.Equal(t, pod.Namespace, tt.k8sNamespace, fmt.Sprintf("test case %d: mismatched Namespaces", i)) + assert.Equal(t, len(pod.Containers), len(tt.pods[j].Apps), fmt.Sprintf("test case %d: mismatched number of containers", i)) + for k, cont := range pod.Containers { + assert.Equal(t, cont.Created, tt.k8sCreation, fmt.Sprintf("test case %d: mismatched creation times", i)) + assert.Equal(t, cont.Hash, tt.k8sContHashes[k], fmt.Sprintf("test case %d: mismatched container hashes", i)) + } + } + + var inspectPodCalls []string + for range pods { + inspectPodCalls = append(inspectPodCalls, "InspectPod") + } + assert.Equal(t, append([]string{"ListPods"}, inspectPodCalls...), fr.called, fmt.Sprintf("test case %d: unexpected called list", i)) + + fr.CleanCalls() + } +} + +func TestGetPodsFilter(t *testing.T) { + fr := newFakeRktInterface() + fs := newFakeSystemd() + r := &Runtime{apisvc: fr, systemd: fs} + + for _, test := range []struct { + All bool + ExpectedFilter *rktapi.PodFilter + }{ + { + true, + &rktapi.PodFilter{ + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + }, + }, + }, + { + false, + &rktapi.PodFilter{ + 
States: []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}, + Annotations: []*rktapi.KeyValue{ + { + Key: k8sRktKubeletAnno, + Value: k8sRktKubeletAnnoValue, + }, + }, + }, + }, + } { + _, err := r.GetPods(test.All) + if err != nil { + t.Errorf("%v", err) + } + assert.Equal(t, test.ExpectedFilter, fr.podFilter, "filters didn't match when all=%t", test.All) + } +} + +func mustMarshalPodManifest(man *appcschema.PodManifest) []byte { + manblob, err := json.Marshal(man) + if err != nil { + panic(err) + } + return manblob +} + +func mustMarshalImageManifest(man *appcschema.ImageManifest) []byte { + manblob, err := json.Marshal(man) + if err != nil { + panic(err) + } + return manblob +}