Skip to content

Commit

Permalink
rkt: rewrote GetPods to use rkt's api service
Browse files Browse the repository at this point in the history
This involved adding annotations to the rkt pod's manifest that contain
information about the kubernetes pod, which is later read by the
kubelet.
  • Loading branch information
Derek Gonyeo committed Dec 9, 2015
1 parent b7dc117 commit 5a16b47
Show file tree
Hide file tree
Showing 3 changed files with 512 additions and 59 deletions.
28 changes: 22 additions & 6 deletions pkg/kubelet/rkt/fake_rkt_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import (
// fakeRktInterface mocks the rktapi.PublicAPIClient interface for testing purpose.
type fakeRktInterface struct {
sync.Mutex
info rktapi.Info
images []*rktapi.Image
called []string
err error
info rktapi.Info
images []*rktapi.Image
podFilter *rktapi.PodFilter
pods []*rktapi.Pod
called []string
err error
}

func newFakeRktInterface() *fakeRktInterface {
Expand All @@ -55,11 +57,25 @@ func (f *fakeRktInterface) GetInfo(ctx context.Context, in *rktapi.GetInfoReques
}

func (f *fakeRktInterface) ListPods(ctx context.Context, in *rktapi.ListPodsRequest, opts ...grpc.CallOption) (*rktapi.ListPodsResponse, error) {
return nil, fmt.Errorf("Not implemented")
f.Lock()
defer f.Unlock()

f.called = append(f.called, "ListPods")
f.podFilter = in.Filter
return &rktapi.ListPodsResponse{f.pods}, f.err
}

func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPodRequest, opts ...grpc.CallOption) (*rktapi.InspectPodResponse, error) {
return nil, fmt.Errorf("Not implemented")
f.Lock()
defer f.Unlock()

f.called = append(f.called, "InspectPod")
for _, pod := range f.pods {
if pod.Id == in.Id {
return &rktapi.InspectPodResponse{pod}, f.err
}
}
return &rktapi.InspectPodResponse{nil}, f.err
}

func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) {
Expand Down
281 changes: 228 additions & 53 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ const (
unitRktID = "RktID"
unitRestartCount = "RestartCount"

k8sRktKubeletAnno = "rkt.kubernetes.io/managed-by-kubelet"
k8sRktKubeletAnnoValue = "true"
k8sRktUIDAnno = "rkt.kubernetes.io/uid"
k8sRktNameAnno = "rkt.kubernetes.io/name"
k8sRktNamespaceAnno = "rkt.kubernetes.io/namespace"
//TODO: remove the creation time annotation once this is closed: https://github.com/coreos/rkt/issues/1789
k8sRktCreationTimeAnno = "rkt.kubernetes.io/created"
k8sRktContainerHashAnno = "rkt.kubernetes.io/containerhash"
k8sRktRestartCountAnno = "rkt.kubernetes.io/restartcount"

dockerPrefix = "docker://"

authDir = "auth.d"
Expand Down Expand Up @@ -415,50 +425,60 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
var globalPortMappings []kubecontainer.PortMapping
manifest := appcschema.BlankPodManifest()

for _, c := range pod.Spec.Containers {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
listResp, err := r.apisvc.ListPods(context.Background(), &rktapi.ListPodsRequest{
Filter: kubernetesPodFilter(pod),
})
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

restartCount := 0
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
return nil, err
glog.Warningf("rkt: error while inspecting pod %s", rktpod.Id)
continue
}

if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
if inspectResp.Pod == nil {
glog.Warningf("rkt: pod %s vanished?!", rktpod.Id)
continue
}

img, err := r.getImageByName(c.Image)
manifest := &appcschema.PodManifest{}
err = json.Unmarshal(inspectResp.Pod.Manifest, manifest)
if err != nil {
return nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, err
glog.Warningf("rkt: error unmatshaling pod manifest: %v", err)
continue
}

opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, err
if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
num, err := strconv.Atoi(countString)
if err != nil {
glog.Warningf("rkt: error reading restart count on pod: %v", err)
continue
}
if num+1 > restartCount {
restartCount = num + 1
}
}
}

globalPortMappings = append(globalPortMappings, opts.PortMappings...)

if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, err
}
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktKubeletAnno), k8sRktKubeletAnnoValue)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktUIDAnno), string(pod.UID))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNameAnno), pod.Name)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktNamespaceAnno), pod.Namespace)
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktCreationTimeAnno), strconv.FormatInt(time.Now().Unix(), 10))
manifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktRestartCountAnno), strconv.Itoa(restartCount))

name, err := appctypes.SanitizeACName(c.Name)
for _, c := range pod.Spec.Containers {
app, portMappings, err := r.newAppcRuntimeApp(pod, c, pullSecrets)
if err != nil {
return nil, err
}
appName := appctypes.MustACName(name)

manifest.Apps = append(manifest.Apps, appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
})
manifest.Apps = append(manifest.Apps, *app)
globalPortMappings = append(globalPortMappings, portMappings...)
}

volumeMap, ok := r.volumeGetter.GetVolumes(pod.UID)
Expand Down Expand Up @@ -495,6 +515,80 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
return manifest, nil
}

func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, c api.Container, pullSecrets []api.Secret) (*appcschema.RuntimeApp, []kubecontainer.PortMapping, error) {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, nil, err
}
imgManifest, err := r.getImageManifest(c.Image)
if err != nil {
return nil, nil, err
}

if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
}

img, err := r.getImageByName(c.Image)
if err != nil {
return nil, nil, err
}
hash, err := appctypes.NewHash(img.ID)
if err != nil {
return nil, nil, err
}

opts, err := r.generator.GenerateRunContainerOptions(pod, &c)
if err != nil {
return nil, nil, err
}

if err := setApp(imgManifest.App, &c, opts); err != nil {
return nil, nil, err
}

name, err := appctypes.SanitizeACName(c.Name)
if err != nil {
return nil, nil, err
}
appName := appctypes.MustACName(name)

kubehash := kubecontainer.HashContainer(&c)

return &appcschema.RuntimeApp{
Name: *appName,
Image: appcschema.RuntimeImage{ID: *hash},
App: imgManifest.App,
Annotations: []appctypes.Annotation{
{
Name: *appctypes.MustACIdentifier(k8sRktContainerHashAnno),
Value: strconv.FormatUint(kubehash, 10),
},
},
}, opts.PortMappings, nil
}

func kubernetesPodFilter(pod *api.Pod) *rktapi.PodFilter {
return &rktapi.PodFilter{
States: []rktapi.PodState{
//TODO: In the future some pods can remain running after some apps exit: https://github.com/appc/spec/pull/500
rktapi.PodState_POD_STATE_RUNNING,
rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING,
rktapi.PodState_POD_STATE_GARBAGE,
},
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
{
Key: k8sRktUIDAnno,
Value: string(pod.UID),
},
},
}
}

func newUnitOption(section, name, value string) *unit.UnitOption {
return &unit.UnitOption{Section: section, Name: name, Value: value}
}
Expand Down Expand Up @@ -719,6 +813,79 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
return nil
}

// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, error) {
manifest := &appcschema.PodManifest{}
err := json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
return nil, err
}

podUID, ok := manifest.Annotations.Get(k8sRktUIDAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktUIDAnno)
}
podName, ok := manifest.Annotations.Get(k8sRktNameAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNameAnno)
}
podNamespace, ok := manifest.Annotations.Get(k8sRktNamespaceAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno)
}
podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
return nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno)
}
podCreated, err := strconv.ParseInt(podCreatedString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}

var state kubecontainer.ContainerState
switch rktpod.State {
case rktapi.PodState_POD_STATE_RUNNING:
state = kubecontainer.ContainerStateRunning
case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE:
state = kubecontainer.ContainerStateExited
default:
state = kubecontainer.ContainerStateUnknown
}

kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
Name: podName,
Namespace: podNamespace,
}
for _, app := range rktpod.Apps {
manifest := &appcschema.ImageManifest{}
err := json.Unmarshal(app.Image.Manifest, manifest)
if err != nil {
return nil, err
}
containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno)
}
containerHash, err := strconv.ParseUint(containerHashString, 10, 64)
if err != nil {
return nil, fmt.Errorf("couldn't parse container's hash: %v", err)
}

kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
ID: buildContainerID(&containerID{rktpod.Id, app.Name}),
Name: app.Name,
Image: app.Image.Name,
Hash: containerHash,
Created: podCreated,
State: state,
})
}

return kubepod, nil
}

// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
Expand Down Expand Up @@ -770,34 +937,42 @@ func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")

units, err := r.systemd.ListUnits()
listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
Annotations: []*rktapi.KeyValue{
{
Key: k8sRktKubeletAnno,
Value: k8sRktKubeletAnnoValue,
},
},
},
}
if !all {
listReq.Filter.States = []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING}
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

var pods []*kubecontainer.Pod
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
var state kubecontainer.ContainerState
switch {
case u.SubState == "running":
state = kubecontainer.ContainerStateRunning
default:
state = kubecontainer.ContainerStateExited
}
if !all && state != kubecontainer.ContainerStateRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
for _, c := range pod.Containers {
c.State = state
}
pods = append(pods, pod)
for _, rktpod := range listResp.Pods {
//TODO: get the manifest from listresp.Pods when this gets merged: https://github.com/coreos/rkt/pull/1786
inspectResp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{rktpod.Id})
if err != nil {
return nil, err
}

if inspectResp.Pod == nil {
return nil, fmt.Errorf("pod %s vanished?!", rktpod.Id)
}

pod, err := r.convertRktPod(*inspectResp.Pod)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
pods = append(pods, pod)
}
return pods, nil
}
Expand Down
Loading

0 comments on commit 5a16b47

Please sign in to comment.