Skip to content

Commit

Permalink
Merge pull request kubernetes#17969 from dgonyeo/rkt_api_get_pods
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Dec 11, 2015
2 parents 6e29e84 + 5a16b47 commit 6b8eb90
Show file tree
Hide file tree
Showing 3 changed files with 512 additions and 59 deletions.
28 changes: 22 additions & 6 deletions pkg/kubelet/rkt/fake_rkt_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import (
// fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose.
type fakeRktInterface struct {
sync.Mutex
info rktapi.Info
images []*rktapi.Image
called []string
err error
info rktapi.Info
images []*rktapi.Image
podFilter *rktapi.PodFilter
pods []*rktapi.Pod
called []string
err error
}

func newFakeRktInterface() *fakeRktInterface {
Expand All @@ -55,11 +57,25 @@ func (f *fakeRktInterface) GetInfo(ctx context.Context, in *rktapi.GetInfoReques
}

func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequest, opts ...grpc.CallOption) (*rktapi.ListPodsResponse, error) {
return nil, fmt.Errorf("Not implemented")
f.Lock()
defer f.Unlock()

f.called = append(f.called, "ListPods")
f.podFilter = in.Filter
return &rktapi.ListPodsResponse{f.pods}, f.err
}

func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPodRequest, opts ...grpc.CallOption) (*rktapi.InspectPodResponse, error) {
return nil, fmt.Errorf("Not implemented")
f.Lock()
defer f.Unlock()

f.called = append(f.called, "InspectPod")
for _, pod := range f.pods {
if pod.Id == in.Id {
return &rktapi.InspectPodResponse{pod}, f.err
}
}
return &rktapi.InspectPodResponse{nil}, f.err
}

func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) {
Expand Down
281 changes: 228 additions & 53 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ const (
unitRktID = "RktID"
unitRestartCount = "RestartCount"

k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
k8sRktKubeletAnnoValue = "true"
k8sRktUIDAnno = "rkt.kubernetes.io/uid"
k8sRktNameAnno = "rkt.kubernetes.io/name"
k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace"
//TODO: remove the creation time annotation once this is closed: https://github.com/coreos/rkt/issues/1789
k8sRktCreationTimeAnno = "rkt.kubernetes.io/created"
k8sRktContainerHashAnno = "rkt.kubernetes.io/containerhash"
k8sRktRestartCountAnno = "rkt.kubernetes.io/restartcount"

dockerPrefix = "docker://"

authDir = "auth.d"
Expand Down Expand Up @@ -415,50 +425,60 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
var globalPortMappings []kubecontainer.PortMapping
manifest := appcschema.BlankPodManifest()

for _, c := range pod.Spec.Containers {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Filter: kubernetesPodFilter(pod),
})
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

restartCount := 0
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
return nil, err
glog.Warningf("rkt: error while inspecting pod %s", rktpod.Id)
continue
}

if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
if inspectResp.Pod == nil {
glog.Warningf("rkt: pod %s vanished?!", rktpod.Id)
continue
}

img, err := r.getImageByName(c.Image)
manifest := &appcschema.PodManifest{}
err = json.Unmarshal(inspectResp.Pod.Manifest, manifest)
if err != nil {
return nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, err
glog.Warningf("rkt: error unmatshaling pod manifest: %v", err)
continue
}

opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, err
if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
num, err := strconv.Atoi(countString)
if err != nil {
glog.Warningf("rkt: error reading restart count on pod: %v", err)
continue
}
if num+1 > restartCount {
restartCount = num + 1
}
}
}

globalPortMappings = append(globalPortMappings, opts.PortMappings...)

if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, err
}
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktKubeletAnno), k8sRktKubeletAnnoValue)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktUIDAnno), string(pod.UID))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNameAnno), pod.Name)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNamespaceAnno), pod.Namespace)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktCreationTimeAnno), strconv.FormatInt(time.Now().Unix(), 10))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount))

name, err := appctypes.SanitizeACName(c.Name)
for _, c := range pod.Spec.Containers {
app, portMappings, err := r.newAppcRuntimeApp(pod, c, pullSecrets)
if err != nil {
return nil, err
}
appName := appctypes.MustACName(name)

manifest.Apps = append(manifest.Apps, appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
})
manifest.Apps = append(manifest.Apps, *app)
globalPortMappings = append(globalPortMappings, portMappings...)
}

volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID)
Expand Down Expand Up @@ -495,6 +515,80 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
return manifest, nil
}

func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets []api.Secret) (*appcschema.RuntimeApp, []kubecontainer.PortMapping, error) {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
if err != nil {
return nil, nil, err
}

if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
}

img, err := r.getImageByName(c.Image)
if err != nil {
return nil, nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, nil, err
}

opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, nil, err
}

if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, nil, err
}

name, err := appctypes.SanitizeACName(c.Name)
if err != nil {
return nil, nil, err
}
appName := appctypes.MustACName(name)

kubehash := kubecontainer.HashContainer(&c)

return &appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
Annotations: []appctypes.Annotation{
{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: strconv.FormatUint(kubehash, 10),
},
},
}, opts.PortMappings, nil
}

func kubernetesPodFilter(pod *api.Pod) *rktapi.PodFilter {
return &rktapi.PodFilter{
States: []rktapi.PodState{
//TODO: In the future some pods can remain running after some apps exit: https://github.com/appc/spec/pull/500
rktapi.PodState_POD_STATE_RUNNING,
rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING,
rktapi.PodState_POD_STATE_GARBAGE,
},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(pod.UID),
},
},
}
}

func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value}
}
Expand Down Expand Up @@ -719,6 +813,79 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
return nil
}

// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
manifest := &appcschema.PodManifest{}
err := json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
return nil, err
}

podUID, ok := manifest.Annotations.Get(k8sRktUIDAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktUIDAnno)
}
podName, ok := manifest.Annotations.Get(k8sRktNameAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNameAnno)
}
podNamespace, ok := manifest.Annotations.Get(k8sRktNamespaceAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno)
}
podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno)
}
podCreated, err := strconv.ParseInt(podCreatedString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}

var state kubecontainer.ContainerState
switch rktpod.State {
case rktapi.PodState_POD_STATE_RUNNING:
state = kubecontainer.ContainerStateRunning
case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE:
state = kubecontainer.ContainerStateExited
default:
state = kubecontainer.ContainerStateUnknown
}

kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
Name: podName,
Namespace: podNamespace,
}
for _, app := range rktpod.Apps {
manifest := &appcschema.ImageManifest{}
err := json.Unmarshal(app.Image.Manifest, manifest)
if err != nil {
return nil, err
}
containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno)
}
containerHash, err := strconv.ParseUint(containerHashString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse container's hash: %v", err)
}

kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
ID: buildContainerID(&containerID{rktpod.Id, app.Name}),
Name: app.Name,
Image: app.Image.Name,
Hash: containerHash,
Created: podCreated,
State: state,
})
}

return kubepod, nil
}

// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
Expand Down Expand Up @@ -770,34 +937,42 @@ func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")

units, err := r.systemd.ListUnits()
listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
}
if !all {
listReq.Filter.States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

var pods []*kubecontainer.Pod
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
var state kubecontainer.ContainerState
switch {
case u.SubState == "running":
state = kubecontainer.ContainerStateRunning
default:
state = kubecontainer.ContainerStateExited
}
if !all && state != kubecontainer.ContainerStateRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
for _, c := range pod.Containers {
c.State = state
}
pods = append(pods, pod)
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
return nil, err
}

if inspectResp.Pod == nil {
return nil, fmt.Errorf("pod %s vanished?!", rktpod.Id)
}

pod, err := r.convertRktPod(*inspectResp.Pod)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
pods = append(pods, pod)
}
return pods, nil
}
Expand Down
Loading

0 comments on commit 6b8eb90

Please sign in to comment.