Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Periodically update pod status from kubelet. #5555

Merged
merged 1 commit into from
Mar 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ func runServiceTest(client *client.Client) {
glog.Fatalf("Failed to create service: %v, %v", svc1, err)
}

// create an identical service in the default namespace
// create an identical service in the non-default namespace
svc3 := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "service1"},
Spec: api.ServiceSpec{
Expand Down
1 change: 1 addition & 0 deletions pkg/api/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func ValidatePodStatusUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList {
allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly"))
}

// For status update we ignore changes to pod spec.
newPod.Spec = oldPod.Spec

return allErrs
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/fake_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,8 @@ func (c *FakePods) Bind(bind *api.Binding) error {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
return nil
}

// UpdateStatus records an "update-status-pod" action carrying the pod name and
// returns an empty pod with a nil error. The status argument is not inspected.
func (c *FakePods) UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) {
	recorded := FakeAction{Action: "update-status-pod", Value: name}
	c.Fake.Actions = append(c.Fake.Actions, recorded)
	return &api.Pod{}, nil
}
21 changes: 17 additions & 4 deletions pkg/client/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type PodInterface interface {
Update(pod *api.Pod) (*api.Pod, error)
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
Bind(binding *api.Binding) error
UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error)
}

// pods implements PodsNamespacer interface
Expand All @@ -63,7 +64,7 @@ func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) {
return
}

// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
// Get takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
func (c *pods) Get(name string) (result *api.Pod, err error) {
if len(name) == 0 {
return nil, errors.New("name is required parameter to Get")
Expand All @@ -74,19 +75,19 @@ func (c *pods) Get(name string) (result *api.Pod, err error) {
return
}

// DeletePod takes the name of the pod, and returns an error if one occurs
// Delete takes the name of the pod, and returns an error if one occurs
func (c *pods) Delete(name string) error {
	// Build the DELETE request for the named pod in this client's namespace,
	// then execute it and surface only the resulting error.
	req := c.r.Delete().Namespace(c.ns).Resource("pods").Name(name)
	return req.Do().Error()
}

// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
// Create takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) {
	// The response is decoded into a freshly allocated pod, which is returned
	// together with whatever error the request produced.
	result = &api.Pod{}
	req := c.r.Post().Namespace(c.ns).Resource("pods").Body(pod)
	err = req.Do().Into(result)
	return result, err
}

// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
// Update takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
result = &api.Pod{}
if len(pod.ResourceVersion) == 0 {
Expand All @@ -113,3 +114,15 @@ func (c *pods) Watch(label labels.Selector, field fields.Selector, resourceVersi
// Bind posts the binding to the "binding" subresource of the pod named by
// binding.Name in this client's namespace, and returns an error if one occurs.
func (c *pods) Bind(binding *api.Binding) error {
	req := c.r.Post().
		Namespace(c.ns).
		Resource("pods").
		Name(binding.Name).
		SubResource("binding").
		Body(binding)
	return req.Do().Error()
}

// UpdateStatus takes the name of the pod and the new status. Returns the server's representation of the pod, and an error, if it occurs.
func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result *api.Pod, err error) {
result = &api.Pod{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the right implementation - a client should be required to send in an api.Pod to do this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for this is that you want to be able to force clients to deal with specifying a resource version, and this doesn't allow it (it's basically an overwrite).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to submit a pull that deals with this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, thanks @smarterclayton

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@smarterclayton, could you hold off on the PR to fix this? UpdateStatus needs to update the status of the mirror pods too. We'll need to keep the mirror pod objects and use them here. I am working on a pod manager refactoring which will take care of this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wondered why the mirror status test is failing. I'll wait until after your change to merge #5738. Note my pull request changes SimpleRunKubelet so we can dial down the sync selector in the integration test.

----- Original Message -----

@@ -113,3 +114,15 @@ func (c *pods) Watch(label labels.Selector, field
fields.Selector, resourceVersi
func (c *pods) Bind(binding *api.Binding) error {
return
c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()
}
+
+// UpdateStatus takes the name of the pod and the new status. Returns the
server's representation of the pod, and an error, if it occurs.
+func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result
*api.Pod, err error) {

  • result = &api.Pod{}

@smarterclayton, could you hold off on the PR to fix this? UpdateStatus needs
to update the status of the mirror pods too. We'll need to keep the mirror pod
objects and use them here. I am working on a pod manager refactoring which
will take care of this.


Reply to this email directly or view it on GitHub:
https://github.com/GoogleCloudPlatform/kubernetes/pull/5555/files#r26880558

pod, err := c.Get(name)
if err != nil {
return
}
pod.Status = *newStatus
err = c.r.Put().Namespace(c.ns).Resource("pods").Name(pod.Name).SubResource("status").Body(pod).Do().Into(result)
return
}
2 changes: 1 addition & 1 deletion pkg/kubelet/config/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i
newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver.
// newSourceApiserverFromLW creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []api.Pod
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/dockertools/fake_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
Running: true,
Pid: 42,
},
NetworkSettings: &docker.NetworkSettings{IPAddress: "1.2.3.4"},
}
return f.Err
}
Expand Down
63 changes: 48 additions & 15 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder

// A pod status cache currently used to store rejected pods and their statuses.
// A pod status cache stores statuses for pods (both rejected and synced).
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus

Expand Down Expand Up @@ -487,6 +487,7 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
go util.Forever(kl.syncStatus, kl.resyncInterval)
kl.syncLoop(updates, kl)
}

Expand Down Expand Up @@ -1265,6 +1266,17 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, c
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uid := pod.UID

// Before returning, regenerate status and store it in the cache.
defer func() {
status, err := kl.generatePodStatus(podFullName, uid)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
kl.setPodStatusInCache(podFullName, status)
}
}()

containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil {
Expand Down Expand Up @@ -1633,17 +1645,33 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}

pods, mirrorPods, err := kl.GetPods()
if err != nil {
glog.Errorf("Failed to get bound pods.")
return
}
pods, mirrorPods := kl.GetPods()
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
glog.Errorf("Couldn't sync containers: %v", err)
}
}
}

// syncStatus pushes the status of every pod known to this kubelet to the
// apiserver. A failure for one pod is logged and does not stop the remaining
// pods from being processed; the next invocation retries them.
func (kl *Kubelet) syncStatus() {
	// Without an apiserver client there is nowhere to report to; bail out
	// instead of dereferencing a nil client below.
	if kl.kubeClient == nil {
		glog.V(3).Infof("No api server defined - skipping pods status sync")
		return
	}
	glog.V(3).Infof("Syncing pods status")

	// GetPods returns a copy of the pod list, so indexing into it is safe and
	// avoids copying each pod again per iteration.
	pods, _ := kl.GetPods()
	for i := range pods {
		pod := &pods[i]
		status, err := kl.GetPodStatus(GetPodFullName(pod), pod.UID)
		if err != nil {
			glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
			continue
		}
		if _, err := kl.kubeClient.Pods(pod.Namespace).UpdateStatus(pod.Name, &status); err != nil {
			// %+v rather than %s: the pod is a struct, not a string.
			glog.Warningf("Error updating status for pod %s: %v (full pod: %+v)", pod.Name, err, *pod)
			continue
		}
		glog.V(3).Infof("Status for pod %q updated successfully: %+v", pod.Name, *pod)
	}
}

// Update the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
Expand Down Expand Up @@ -1753,10 +1781,10 @@ func (kl *Kubelet) GetHostname() string {

// GetPods returns all pods bound to the kubelet and their spec, and the mirror
// pod map.
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet, error) {
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet) {
kl.podLock.RLock()
defer kl.podLock.RUnlock()
return append([]api.Pod{}, kl.pods...), kl.mirrorPods, nil
return append([]api.Pod{}, kl.pods...), kl.mirrorPods
}

// GetPodByName provides the first pod that matches namespace and name, as well as whether the pod was found.
Expand Down Expand Up @@ -1934,19 +1962,23 @@ func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {

// GetPodStatus returns the status of the pod identified by podFullName and uid.
// A status already present in the kubelet's status cache is returned as-is;
// otherwise a fresh status is generated.
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
	if status, ok := kl.getPodStatusFromCache(podFullName); ok {
		glog.V(3).Infof("Returning cached status for %s", podFullName)
		return status, nil
	}
	// Cache miss: compute the status from scratch.
	return kl.generatePodStatus(podFullName, uid)
}

func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %s", podFullName)
var podStatus api.PodStatus
spec, found := kl.GetPodByFullName(podFullName)

if !found {
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
}

// Check to see if the pod has been rejected.
mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
if ok {
return mappedPodStatus, nil
}

info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)

if err != nil {
Expand Down Expand Up @@ -1976,6 +2008,7 @@ func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatu
if found {
podStatus.PodIP = netContainerInfo.PodIP
}
podStatus.Host = kl.hostname

return podStatus, nil
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"})
}

func TestSyncPodsWithTerminationLog(t *testing.T) {
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})

fakeDocker.Lock()
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
Expand Down Expand Up @@ -533,7 +533,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})

fakeDocker.Lock()

Expand Down Expand Up @@ -586,7 +586,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})

fakeDocker.Lock()

Expand Down Expand Up @@ -636,7 +636,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})

fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
Expand Down Expand Up @@ -693,7 +693,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})

fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
Expand Down Expand Up @@ -762,7 +762,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start"})
"list", "list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})

// A map iteration is used to delete containers, so must not depend on
// order here.
Expand Down Expand Up @@ -900,7 +900,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "stop"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "list"})
// Expect one of the duplicates to be killed.
if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
Expand Down Expand Up @@ -942,7 +942,7 @@ func TestSyncPodBadHash(t *testing.T) {
}

//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})

// A map iteration is used to delete containers, so must not depend on
// order here.
Expand Down Expand Up @@ -995,7 +995,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list", "inspect_container"})

// A map iteration is used to delete containers, so must not depend on
// order here.
Expand Down Expand Up @@ -1685,7 +1685,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop"})
verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop", "list"})

if len(fakeDocker.Stopped) != 1 {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/pod_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error

type podWorkers struct {
// Protects podUpdates field.
// Protects all per worker fields.
podLock sync.Mutex

// Tracks all running per-pod goroutines - per-pod goroutine will be
Expand Down
8 changes: 2 additions & 6 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type HostInterface interface {
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetDockerVersion() ([]uint, error)
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() ([]api.Pod, util.StringSet, error)
GetPods() ([]api.Pod, util.StringSet)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
Expand Down Expand Up @@ -261,11 +261,7 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {

// handlePods returns a list of pods bound to the Kubelet and their spec
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
pods, _, err := s.host.GetPods()
if err != nil {
s.error(w, err)
return
}
pods, _ := s.host.GetPods()
podList := &api.PodList{
Items: pods,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type fakeKubelet struct {
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
podsFunc func() ([]api.Pod, util.StringSet, error)
podsFunc func() ([]api.Pod, util.StringSet)
logFunc func(w http.ResponseWriter, req *http.Request)
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
dockerVersionFunc func() ([]uint, error)
Expand Down Expand Up @@ -80,7 +80,7 @@ func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
return fk.machineInfoFunc()
}

func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet, error) {
func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet) {
return fk.podsFunc()
}

Expand Down
9 changes: 4 additions & 5 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,13 +378,12 @@ func (m *Master) init(c *Config) {
nodeStorageClient.Nodes(),
podRegistry,
)

if c.SyncPodStatus {
go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
go util.Forever(podCache.UpdateAllContainers, m.cacheTimeout)
go util.Forever(podCache.GarbageCollectPodStatus, time.Minute*30)
podStorage = podStorage.WithPodStatus(podCache)
}
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)

// TODO: refactor podCache to sit on top of podStorage via status calls
podStorage = podStorage.WithPodStatus(podCache)

// TODO: Factor out the core API registration
m.storage = map[string]apiserver.RESTStorage{
Expand Down