Skip to content

Commit

Permalink
Merge pull request kubernetes#5542 from GoogleCloudPlatform/revert-52…
Browse files Browse the repository at this point in the history
…05-sync_pod_status

Revert "Periodically update pod status from kubelet."
  • Loading branch information
wojtek-t committed Mar 17, 2015
2 parents c04ceec + 18b728f commit 5c99bc9
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 115 deletions.
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func runServiceTest(client *client.Client) {
glog.Fatalf("Failed to create service: %v, %v", svc1, err)
}

// create an identical service in the non-default namespace
// create an identical service in the default namespace
svc3 := &api.Service{
ObjectMeta: api.ObjectMeta{Name: "service1"},
Spec: api.ServiceSpec{
Expand Down
1 change: 0 additions & 1 deletion pkg/api/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,6 @@ func ValidatePodStatusUpdate(newPod, oldPod *api.Pod) errs.ValidationErrorList {
allErrs = append(allErrs, errs.NewFieldInvalid("status.host", newPod.Status.Host, "pod host cannot be changed directly"))
}

// For status update we ignore changes to pod spec.
newPod.Spec = oldPod.Spec

return allErrs
Expand Down
5 changes: 0 additions & 5 deletions pkg/client/fake_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,3 @@ func (c *FakePods) Bind(bind *api.Binding) error {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
return nil
}

func (c *FakePods) UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-status-pod", Value: name})
return &api.Pod{}, nil
}
21 changes: 4 additions & 17 deletions pkg/client/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type PodInterface interface {
Update(pod *api.Pod) (*api.Pod, error)
Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
Bind(binding *api.Binding) error
UpdateStatus(name string, status *api.PodStatus) (*api.Pod, error)
}

// pods implements PodsNamespacer interface
Expand All @@ -63,7 +62,7 @@ func (c *pods) List(selector labels.Selector) (result *api.PodList, err error) {
return
}

// Get takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
// GetPod takes the name of the pod, and returns the corresponding Pod object, and an error if it occurs
func (c *pods) Get(name string) (result *api.Pod, err error) {
if len(name) == 0 {
return nil, errors.New("name is required parameter to Get")
Expand All @@ -74,19 +73,19 @@ func (c *pods) Get(name string) (result *api.Pod, err error) {
return
}

// Delete takes the name of the pod, and returns an error if one occurs
// DeletePod takes the name of the pod, and returns an error if one occurs
func (c *pods) Delete(name string) error {
return c.r.Delete().Namespace(c.ns).Resource("pods").Name(name).Do().Error()
}

// Create takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
// CreatePod takes the representation of a pod. Returns the server's representation of the pod, and an error, if it occurs.
func (c *pods) Create(pod *api.Pod) (result *api.Pod, err error) {
result = &api.Pod{}
err = c.r.Post().Namespace(c.ns).Resource("pods").Body(pod).Do().Into(result)
return
}

// Update takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
// UpdatePod takes the representation of a pod to update. Returns the server's representation of the pod, and an error, if it occurs.
func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
result = &api.Pod{}
if len(pod.ResourceVersion) == 0 {
Expand All @@ -113,15 +112,3 @@ func (c *pods) Watch(label, field labels.Selector, resourceVersion string) (watc
func (c *pods) Bind(binding *api.Binding) error {
return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()
}

// UpdateStatus takes the name of the pod and the new status. Returns the server's representation of the pod, and an error, if it occurs.
func (c *pods) UpdateStatus(name string, newStatus *api.PodStatus) (result *api.Pod, err error) {
result = &api.Pod{}
pod, err := c.Get(name)
if err != nil {
return
}
pod.Status = *newStatus
err = c.r.Put().Namespace(c.ns).Resource("pods").Name(pod.Name).SubResource("status").Body(pod).Do().Into(result)
return
}
2 changes: 1 addition & 1 deletion pkg/kubelet/config/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewSourceApiserver(client *client.Client, hostname string, updates chan<- i
newSourceApiserverFromLW(lw, updates)
}

// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
// newSourceApiserverFromLW holds creates a config source that watches an pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []api.Pod
Expand Down
1 change: 0 additions & 1 deletion pkg/kubelet/dockertools/fake_docker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (f *FakeDockerClient) StartContainer(id string, hostConfig *docker.HostConf
Running: true,
Pid: 42,
},
NetworkSettings: &docker.NetworkSettings{IPAddress: "1.2.3.4"},
}
return f.Err
}
Expand Down
67 changes: 8 additions & 59 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ type Kubelet struct {
// the EventRecorder to use
recorder record.EventRecorder

// A pod status cache stores statuses for pods (both rejected and synced).
// A pod status cache currently used to store rejected pods and their statuses.
podStatusesLock sync.RWMutex
podStatuses map[string]api.PodStatus
}
Expand Down Expand Up @@ -568,7 +568,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
glog.Warning("No api server defined - no node status update will be sent.")
}
go kl.syncNodeStatus()
go util.Forever(kl.syncStatus, kl.resyncInterval)
kl.syncLoop(updates, kl)
}

Expand Down Expand Up @@ -1347,17 +1346,6 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dock
func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error {
podFullName := GetPodFullName(pod)
uid := pod.UID

// Before returning, regenerate status and store it in the cache.
defer func() {
status, err := kl.generatePodStatus(podFullName, uid)
if err != nil {
glog.Errorf("Unable to generate status for pod with name %q and uid %q info with error(%v)", podFullName, uid, err)
} else {
kl.setPodStatusInCache(podFullName, status)
}
}()

containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil {
Expand Down Expand Up @@ -1727,40 +1715,6 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
}
}

// syncStatus syncs pods statuses with the apiserver.
func (kl *Kubelet) syncStatus() {
glog.V(3).Infof("Syncing pods status")

statuses := make(map[string]api.PodStatus)
func() {
kl.podLock.Lock()
defer kl.podLock.Unlock()
for _, pod := range kl.pods {
source := pod.Annotations[ConfigSourceAnnotationKey]
if source != ApiserverSource {
glog.V(3).Infof("Pod status for %q is not updated due to its source %s", pod.Name, source)
continue
}
status, err := kl.GetPodStatus(GetPodFullName(&pod), pod.UID)
if err != nil {
glog.Warningf("Error getting pod %q status: %v, retry later", pod.Name, err)
continue
}
statuses[GetPodFullName(&pod)] = status
}
}()

for podFullName, status := range statuses {
name, namespace := ParsePodFullName(podFullName)
pod, err := kl.kubeClient.Pods(namespace).UpdateStatus(name, &status)
if err != nil {
glog.Warningf("Error updating status for pod %s: %v (full pod: %s)", name, err, pod)
} else {
glog.V(3).Infof("Status for pod %q updated successfully: %s", name, pod)
}
}
}

// Updated the Kubelet's internal pods with those provided by the update.
// Records new and updated pods in newPods and updatedPods.
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
Expand Down Expand Up @@ -2046,23 +2000,19 @@ func (kl *Kubelet) GetPodByFullName(podFullName string) (*api.PodSpec, bool) {

// GetPodStatus returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
// Check to see if we have a cached version of the status.
cachedPodStatus, found := kl.getPodStatusFromCache(podFullName)
if found {
glog.V(3).Infof("Returning cached status for %s", podFullName)
return cachedPodStatus, nil
}
return kl.generatePodStatus(podFullName, uid)
}

func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %s", podFullName)
var podStatus api.PodStatus
spec, found := kl.GetPodByFullName(podFullName)

if !found {
return podStatus, fmt.Errorf("Couldn't find spec for pod %s", podFullName)
}

// Check to see if the pod has been rejected.
mappedPodStatus, ok := kl.getPodStatusFromCache(podFullName)
if ok {
return mappedPodStatus, nil
}

info, err := dockertools.GetDockerPodInfo(kl.dockerClient, *spec, podFullName, uid)

if err != nil {
Expand Down Expand Up @@ -2092,7 +2042,6 @@ func (kl *Kubelet) generatePodStatus(podFullName string, uid types.UID) (api.Pod
if found {
podStatus.PodIP = netContainerInfo.PodIP
}
podStatus.Host = kl.hostname

return podStatus, nil
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "list", "list", "inspect_container", "inspect_container"})
}

func TestSyncPodsWithTerminationLog(t *testing.T) {
Expand Down Expand Up @@ -481,7 +481,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})

fakeDocker.Lock()
parts := strings.Split(fakeDocker.Container.HostConfig.Binds[0], ":")
Expand Down Expand Up @@ -531,7 +531,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})

fakeDocker.Lock()

Expand Down Expand Up @@ -584,7 +584,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
"list", "list", "list", "create", "start", "inspect_container", "create", "start"})

fakeDocker.Lock()

Expand Down Expand Up @@ -634,7 +634,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})

fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
Expand Down Expand Up @@ -691,7 +691,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start", "list", "inspect_container", "inspect_container"})
"list", "list", "list", "inspect_container", "inspect_image", "list", "create", "start"})

fakeDocker.Lock()
if len(fakeDocker.Created) != 1 ||
Expand Down Expand Up @@ -760,7 +760,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
waitGroup.Wait()

verifyCalls(t, fakeDocker, []string{
"list", "list", "list", "list", "inspect_container", "inspect_container", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
"list", "list", "list", "list", "inspect_container", "inspect_container", "stop", "create", "start", "inspect_container", "create", "start"})

// A map iteration is used to delete containers, so must not depend on
// order here.
Expand Down Expand Up @@ -898,7 +898,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "stop", "list"})
verifyCalls(t, fakeDocker, []string{"list", "stop"})
// Expect one of the duplicates to be killed.
if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
Expand Down Expand Up @@ -940,7 +940,7 @@ func TestSyncPodBadHash(t *testing.T) {
}

//verifyCalls(t, fakeDocker, []string{"list", "stop", "list", "create", "start", "stop", "create", "start", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start", "list", "inspect_container", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "create", "start", "inspect_container", "create", "start"})

// A map interation is used to delete containers, so must not depend on
// order here.
Expand Down Expand Up @@ -993,7 +993,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start", "list", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "create", "start"})

// A map interation is used to delete containers, so must not depend on
// order here.
Expand Down Expand Up @@ -1683,7 +1683,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}

verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop", "list"})
verifyCalls(t, fakeDocker, []string{"list", "list", "create", "start", "stop"})

if len(fakeDocker.Stopped) != 1 {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.Stopped)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/pod_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error

type podWorkers struct {
// Protects all per worker fields.
// Protects podUpdates field.
podLock sync.Mutex

// Tracks all running per-pod goroutines - per-pod goroutine will be
Expand Down
8 changes: 0 additions & 8 deletions pkg/kubelet/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package kubelet

import (
"fmt"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)
Expand Down Expand Up @@ -69,19 +68,12 @@ type PodUpdate struct {
}

// GetPodFullName returns a name that uniquely identifies a pod across all config sources.
// NOTE: If changed ParsePodFullName must be also updated.
func GetPodFullName(pod *api.Pod) string {
// Use underscore as the delimiter because it is not allowed in pod name
// (DNS subdomain format), while allowed in the container name format.
return fmt.Sprintf("%s_%s", pod.Name, pod.Namespace)
}

// ParsePodFullName parses full name generated by GetPodFullName and returns parts of it.
func ParsePodFullName(podFullName string) (name, namespace string) {
nameParts := strings.Split(podFullName, "_")
return nameParts[0], nameParts[1]
}

// Build the pod full name from pod name and namespace.
func BuildPodFullName(name, namespace string) string {
return name + "_" + namespace
Expand Down
21 changes: 11 additions & 10 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,19 +371,20 @@ func (m *Master) init(c *Config) {
m.nodeRegistry = registry

nodeStorage := minion.NewREST(m.nodeRegistry)
// TODO: unify the storage -> registry and storage -> client patterns
nodeStorageClient := RESTStorageToNodes(nodeStorage)
podCache := NewPodCache(
c.KubeletClient,
nodeStorageClient.Nodes(),
podRegistry,
)
if c.SyncPodStatus {
// TODO: unify the storage -> registry and storage -> client patterns
nodeStorageClient := RESTStorageToNodes(nodeStorage)

podCache := NewPodCache(
c.KubeletClient,
nodeStorageClient.Nodes(),
podRegistry,
)
go util.Forever(func() { podCache.UpdateAllContainers() }, m.cacheTimeout)
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)
podStorage = podStorage.WithPodStatus(podCache)
}
go util.Forever(func() { podCache.GarbageCollectPodStatus() }, time.Minute*30)

// TODO: refactor podCache to sit on top of podStorage via status calls
podStorage = podStorage.WithPodStatus(podCache)

// TODO: Factor out the core API registration
m.storage = map[string]apiserver.RESTStorage{
Expand Down

0 comments on commit 5c99bc9

Please sign in to comment.