From 1a352b74bae91e582f6b24a22d369e46c7047263 Mon Sep 17 00:00:00 2001 From: Filip Grzadkowski Date: Mon, 9 Mar 2015 15:23:52 +0100 Subject: [PATCH] Periodically update pod status from kubelet. --- cmd/integration/integration.go | 2 +- pkg/api/validation/validation.go | 1 + pkg/client/fake_pods.go | 5 ++ pkg/client/pods.go | 21 +++++-- pkg/kubelet/config/apiserver.go | 2 +- pkg/kubelet/dockertools/fake_docker_client.go | 1 + pkg/kubelet/kubelet.go | 63 ++++++++++++++----- pkg/kubelet/kubelet_test.go | 22 +++---- pkg/kubelet/pod_workers.go | 2 +- pkg/kubelet/server.go | 8 +-- pkg/kubelet/server_test.go | 4 +- pkg/master/master.go | 9 ++- 12 files changed, 94 insertions(+), 46 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 3b5cdce42341e..6fa38abc6b9f7 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -716,7 +716,7 @@ func runServiceTest(client *client.Client) { glog.Fatalf("Failed to create service: %v, %v", svc1, err) } - // create an identical service in the default namespace + // create an identical service in the non-default namespace svc3 := &api.Service{ ObjectMeta: api.ObjectMeta{Name: "service1"}, Spec: api.ServiceSpec{ diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 968e58f216b4f..eb9c83c9373e9 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -691,6 +691,7 @@ func ValidatePodStatusUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList { allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly")) } + // For status update we ignore changes to pod spec. newPod.Spec = oldPod.Spec return allErrs diff --git a/pkg/client/fake_pods.go b/pkg/client/fake_pods.go index ae5137bdcdbbb..970904cca2729 100644 --- a/pkg/client/fake_pods.go +++ b/pkg/client/fake_pods.go @@ -64,3 +64,8 @@ func (c *FakePods) Bind(bind *api.Binding) error { c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name}) return nil } + +func (c *FakePods) UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) { + c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-status-pod", Value: name}) + return &api.Pod{}, nil +} diff --git a/pkg/client/pods.go b/pkg/client/pods.go index b9ce669ffb345..fc50cbc65f74d 100644 --- a/pkg/client/pods.go +++ b/pkg/client/pods.go @@ -40,6 +40,7 @@ type PodInterface interface { Update(pod *api.Pod) (*api.Pod, error) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) Bind(binding *api.Binding) error + UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) } // pods implements PodsNamespacer interface @@ -63,7 +64,7 @@ func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) { return } -// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs +// Get takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs func (c *pods) Get(name string) (result *api.Pod, err error) { if len(name) == 0 { return nil, errors.New("name is required parameter to Get") @@ -74,19 +75,19 @@ func (c *pods) Get(name string) (result *api.Pod, err error) { return } -// DeletePod takes the name of the pod, and returns an error if one occurs +// Delete takes the name of the pod, and returns an error if one occurs func (c *pods) Delete(name string) error { return c.r.Delete().Namespace(c.ns).Resource("pods").Name(name).Do().Error() } -// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs. +// Create takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs. func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} err = c.r.Post().Namespace(c.ns).Resource("pods").Body(pod).Do().Into(result) return } -// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs. +// Update takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs. func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) { result = &api.Pod{} if len(pod.ResourceVersion) == 0 { @@ -113,3 +114,15 @@ func (c *pods) Watch(label labels.Selector, field fields.Selector, resourceVersi func (c *pods) Bind(binding *api.Binding) error { return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error() } + +// UpdateStatus takes the name of the pod and the new status. Returns the server's representation of the pod, and an error, if it occurs. +func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result *api.Pod, err error) { + result = &api.Pod{} + pod, err := c.Get(name) + if err != nil { + return + } + pod.Status = *newStatus + err = c.r.Put().Namespace(c.ns).Resource("pods").Name(pod.Name).SubResource("status").Body(pod).Do().Into(result) + return +} diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index ae3e325bc020a..3710e525caf83 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -31,7 +31,7 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i newSourceApiserverFromLW(lw, updates) } -// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver. +// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver. func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) { send := func(objs []interface{}) { var pods []api.Pod diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index 01d14cb250386..35da4c01527e0 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -126,6 +126,7 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf Running: true, Pid: 42, }, + NetworkSettings: &docker.NetworkSettings{IPAddress: "1.2.3.4"}, } return f.Err } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 10f36e8e6fedf..1de26bc71743d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -293,7 +293,7 @@ type Kubelet struct { // the EventRecorder to use recorder record.EventRecorder - // A pod status cache currently used to store rejected pods and their statuses. + // A pod status cache stores statuses for pods (both rejected and synced). podStatusesLock sync.RWMutex podStatuses map[string]api.PodStatus @@ -487,6 +487,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { glog.Warning("No api server defined - no node status update will be sent.") } go kl.syncNodeStatus() + go util.Forever(kl.syncStatus, kl.resyncInterval) kl.syncLoop(updates, kl) } @@ -1265,6 +1266,17 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error { podFullName := GetPodFullName(pod) uid := pod.UID + + // Before returning, regenerate status and store it in the cache. + defer func() { + status, err := kl.generatePodStatus(podFullName, uid) + if err != nil { + glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err) + } else { + kl.setPodStatusInCache(podFullName, status) + } + }() + containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod) glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges) if err != nil { @@ -1633,17 +1645,33 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) { } } - pods, mirrorPods, err := kl.GetPods() - if err != nil { - glog.Errorf("Failed to get bound pods.") - return - } + pods, mirrorPods := kl.GetPods() if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil { glog.Errorf("Couldn't sync containers: %v", err) } } } +// syncStatus syncs pods statuses with the apiserver. +func (kl *Kubelet) syncStatus() { + glog.V(3).Infof("Syncing pods status") + + pods, _ := kl.GetPods() + for _, pod := range pods { + status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID) + if err != nil { + glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err) + continue + } + _, err = kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status) + if err != nil { + glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", pod.Name, err, pod) + } else { + glog.V(3).Infof("Status for pod %q updated successfully: %s", pod.Name, pod) + } + } +} + // Update the Kubelet's internal pods with those provided by the update. // Records new and updated pods in newPods and updatedPods. func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) { @@ -1753,10 +1781,10 @@ func (kl *Kubelet) GetHostname() string { // GetPods returns all pods bound to the kubelet and their spec, and the mirror // pod map. -func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet, error) { +func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) { kl.podLock.RLock() defer kl.podLock.RUnlock() - return append([]api.Pod{}, kl.pods...), kl.mirrorPods, nil + return append([]api.Pod{}, kl.pods...), kl.mirrorPods } // GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found. @@ -1934,19 +1962,23 @@ func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) { // GetPodStatus returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { + // Check to see if we have a cached version of the status. + cachedPodStatus, found := kl.getPodStatusFromCache(podFullName) + if found { + glog.V(3).Infof("Returning cached status for %s", podFullName) + return cachedPodStatus, nil + } + return kl.generatePodStatus(podFullName, uid) +} + +func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) { + glog.V(3).Infof("Generating status for %s", podFullName) var podStatus api.PodStatus spec, found := kl.GetPodByFullName(podFullName) - if !found { return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName) } - // Check to see if the pod has been rejected. - mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName) - if ok { - return mappedPodStatus, nil - } - info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid) if err != nil { @@ -1976,6 +2008,7 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu if found { podStatus.PodIP = netContainerInfo.PodIP } + podStatus.Host = kl.hostname return podStatus, nil } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index ed26deedaa21e..7ddf9a9adbbc2 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -449,7 +449,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { t.Errorf("unexpected error: %v", err) } waitGroup.Wait() - verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"}) + verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"}) } func TestSyncPodsWithTerminationLog(t *testing.T) { @@ -483,7 +483,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) { } waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "create", "start", "inspect_container", "create", "start"}) + "list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":") @@ -533,7 +533,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "create", "start", "inspect_container", "create", "start"}) + "list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() @@ -586,7 +586,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "create", "start", "inspect_container", "create", "start"}) + "list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() @@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -693,7 +693,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"}) + "list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"}) fakeDocker.Lock() if len(fakeDocker.Created) != 1 || @@ -762,7 +762,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) { waitGroup.Wait() verifyCalls(t, fakeDocker, []string{ - "list", "list", "list", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start"}) + "list", "list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) // A map iteration is used to delete containers, so must not depend on // order here. @@ -900,7 +900,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "stop"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "list"}) // Expect one of the duplicates to be killed. if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) @@ -942,7 +942,7 @@ func TestSyncPodBadHash(t *testing.T) { } //verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"}) - verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"}) // A map interation is used to delete containers, so must not depend on // order here. @@ -995,7 +995,7 @@ func TestSyncPodUnhealthy(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"}) + verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list", "inspect_container"}) // A map interation is used to delete containers, so must not depend on // order here. @@ -1685,7 +1685,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) { t.Errorf("unexpected error: %v", err) } - verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop"}) + verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop", "list"}) if len(fakeDocker.Stopped) != 1 { t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped) diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 27e534c65df53..2dfe6ab34f01c 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -31,7 +31,7 @@ import ( type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error type podWorkers struct { - // Protects podUpdates field. + // Protects all per worker fields. podLock sync.Mutex // Tracks all running per-pod goroutines - per-pod goroutine will be diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index b494e870f1e78..bdc66418ced09 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -85,7 +85,7 @@ type HostInterface interface { GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) GetDockerVersion() ([]uint, error) GetMachineInfo() (*cadvisorApi.MachineInfo, error) - GetPods() ([]api.Pod, util.StringSet, error) + GetPods() ([]api.Pod, util.StringSet) GetPodByName(namespace, name string) (*api.Pod, bool) GetPodStatus(name string, uid types.UID) (api.PodStatus, error) RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error) @@ -261,11 +261,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { // handlePods returns a list of pod bound to the Kubelet and their spec func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) { - pods, _, err := s.host.GetPods() - if err != nil { - s.error(w, err) - return - } + pods, _ := s.host.GetPods() podList := &api.PodList{ Items: pods, } diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 99d58b1a69d3c..3e2fbe408b442 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -45,7 +45,7 @@ type fakeKubelet struct { containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error) machineInfoFunc func() (*cadvisorApi.MachineInfo, error) - podsFunc func() ([]api.Pod, util.StringSet, error) + podsFunc func() ([]api.Pod, util.StringSet) logFunc func(w http.ResponseWriter, req *http.Request) runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) dockerVersionFunc func() ([]uint, error) @@ -80,7 +80,7 @@ func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) { return fk.machineInfoFunc() } -func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet, error) { +func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet) { return fk.podsFunc() } diff --git a/pkg/master/master.go b/pkg/master/master.go index 31046ec65f62f..2097b53f9ee68 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -378,13 +378,12 @@ func (m *Master) init(c *Config) { nodeStorageClient.Nodes(), podRegistry, ) + if c.SyncPodStatus { - go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout) + go util.Forever(podCache.UpdateAllContainers, m.cacheTimeout) + go util.Forever(podCache.GarbageCollectPodStatus, time.Minute*30) + podStorage = podStorage.WithPodStatus(podCache) } - go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30) - - // TODO: refactor podCache to sit on top of podStorage via status calls - podStorage = podStorage.WithPodStatus(podCache) // TODO: Factor out the core API registration m.storage = map[string]apiserver.RESTStorage{