Skip to content

Commit

Permalink
Sync replication count with the api server on pod creation/deletion.
Browse files Browse the repository at this point in the history
  • Loading branch information
bprashanth committed Mar 11, 2015
1 parent 1231e65 commit 28d9260
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 129 deletions.
5 changes: 2 additions & 3 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,8 @@ func startComponents(manifestURL string) (apiServerURL string) {

controllerManager := replicationControllerPkg.NewReplicationManager(cl)

// Prove that controllerManager's watch works by making it not sync until after this
// test is over. (Hopefully we don't take 10 minutes!)
controllerManager.Run(10 * time.Minute)
// TODO: Write an integration test for the replication controllers watch.
controllerManager.Run(1 * time.Second)

nodeResources := &api.NodeResources{}

Expand Down
10 changes: 9 additions & 1 deletion pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
return err
}
filteredList := FilterActivePods(podList.Items)
diff := len(filteredList) - controller.Spec.Replicas
activePods := len(filteredList)
diff := activePods - controller.Spec.Replicas
if diff < 0 {
diff *= -1
wait := sync.WaitGroup{}
Expand All @@ -221,6 +222,13 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
}
wait.Wait()
}
if controller.Status.Replicas != activePods {
controller.Status.Replicas = activePods
_, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller)
if err != nil {
return err
}
}
return nil
}

Expand Down
215 changes: 138 additions & 77 deletions pkg/controller/replication_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (f *FakePodControl) deletePod(namespace string, podName string) error {

func newReplicationController(replicas int) api.ReplicationController {
return api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "default", ResourceVersion: "18"},
Spec: api.ReplicationControllerSpec{
Replicas: replicas,
Template: &api.PodTemplateSpec{
Expand All @@ -81,8 +83,14 @@ func newReplicationController(replicas int) api.ReplicationController {
Containers: []api.Container{
{
Image: "foo/bar",
TerminationMessagePath: api.TerminationMessagePathDefault,
ImagePullPolicy: api.PullIfNotPresent,
},
},
RestartPolicy: api.RestartPolicy{
Always: &api.RestartPolicyAlways{},
},
DNSPolicy: api.DNSDefault,
NodeSelector: map[string]string{
"baz": "blah",
},
Expand Down Expand Up @@ -159,23 +167,37 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {

func TestSyncReplicationControllerCreates(t *testing.T) {
body := runtime.EncodeOrDie(testapi.Codec(), newPodList(0))
fakeHandler := util.FakeHandler{
fakePodHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewServer(&fakeHandler)
fakePodControl := FakePodControl{}

controller := newReplicationController(2)
fakeUpdateHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &controller),
T: t,
}

testServerMux := http.NewServeMux()
testServerMux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
testServerMux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", controller.Name), &fakeUpdateHandler)
testServer := httptest.NewServer(testServerMux)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})

fakePodControl := FakePodControl{}

manager := NewReplicationManager(client)
manager.podControl = &fakePodControl

controllerSpec := newReplicationController(2)

manager.syncReplicationController(controllerSpec)
manager.syncReplicationController(controller)
validateSyncReplication(t, &fakePodControl, 2, 0)

// No Status.Replicas update expected even though 2 pods were just created,
// because the controller manager can't observe the pods till the next sync cycle.
if fakeUpdateHandler.RequestReceived != nil {
t.Errorf("Unexpected updates for controller via %v",
fakeUpdateHandler.RequestReceived.URL)
}
}

func TestCreateReplica(t *testing.T) {
Expand All @@ -193,33 +215,7 @@ func TestCreateReplica(t *testing.T) {
kubeClient: client,
}

controllerSpec := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
Spec: api.ReplicationControllerSpec{
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "foo",
"type": "production",
"replicationController": "test",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
NodeSelector: map[string]string{
"foo": "bar",
},
},
},
},
}

controllerSpec := newReplicationController(1)
podControl.createReplica(ns, controllerSpec)

manifest := api.ContainerManifest{}
Expand All @@ -245,48 +241,13 @@ func TestCreateReplica(t *testing.T) {
}
}

func TestSynchonize(t *testing.T) {
controllerSpec1 := api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
Spec: api.ReplicationControllerSpec{
Replicas: 4,
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "foo",
"type": "production",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
},
},
},
}
controllerSpec2 := api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
Spec: api.ReplicationControllerSpec{
Replicas: 3,
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "bar",
"type": "production",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "bar/baz",
},
},
},
},
},
func TestSynchronize(t *testing.T) {
controllerSpec1 := newReplicationController(4)
controllerSpec2 := newReplicationController(3)
controllerSpec2.Name = "bar"
controllerSpec2.Spec.Template.ObjectMeta.Labels = map[string]string{
"name": "bar",
"type": "production",
}

fakeEtcd := tools.NewFakeEtcdClient(t)
Expand Down Expand Up @@ -339,6 +300,106 @@ func TestSynchonize(t *testing.T) {
validateSyncReplication(t, &fakePodControl, 7, 0)
}

func TestControllerNoReplicaUpdate(t *testing.T) {
// Steady state for the replication controller, no Status.Replicas updates expected
rc := newReplicationController(5)
rc.Status = api.ReplicationControllerStatus{Replicas: 5}
activePods := 5

body, _ := latest.Codec.Encode(newPodList(activePods))
fakePodHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
T: t,
}
fakeControllerHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{
Items: []api.ReplicationController{rc},
}),
T: t,
}
fakeUpdateHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc),
T: t,
}

mux := http.NewServeMux()
mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler)
mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
t.Errorf("Unexpected request for %v", req.RequestURI)
})
testServer := httptest.NewServer(mux)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl

manager.synchronize()

validateSyncReplication(t, &fakePodControl, 0, 0)
if fakeUpdateHandler.RequestReceived != nil {
t.Errorf("Unexpected updates for controller via %v",
fakeUpdateHandler.RequestReceived.URL)
}
}

func TestControllerUpdateReplicas(t *testing.T) {
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(5)
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
activePods := 4

body, _ := latest.Codec.Encode(newPodList(activePods))
fakePodHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
T: t,
}
fakeControllerHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{
Items: []api.ReplicationController{rc},
}),
T: t,
}
fakeUpdateHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc),
T: t,
}

mux := http.NewServeMux()

mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler)
mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
t.Errorf("Unexpected request for %v", req.RequestURI)
})
testServer := httptest.NewServer(mux)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl

manager.synchronize()

// Status.Replicas should go up from 2->4 even though we created 5-4=1 pod
rc.Status = api.ReplicationControllerStatus{Replicas: 4}
decRc := runtime.EncodeOrDie(testapi.Codec(), &rc)
fakeUpdateHandler.ValidateRequest(t, fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s?namespace=%s", rc.Name, rc.Namespace), "PUT", &decRc)
validateSyncReplication(t, &fakePodControl, 1, 0)
}

type FakeWatcher struct {
w *watch.FakeWatcher
*client.Fake
Expand Down
16 changes: 0 additions & 16 deletions pkg/registry/controller/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
rc "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -78,7 +77,6 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) {
if err != nil {
return nil, err
}
rs.fillCurrentState(ctx, controller)
return controller, err
}

Expand All @@ -94,7 +92,6 @@ func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Select
filtered := []api.ReplicationController{}
for _, controller := range controllers.Items {
if label.Matches(labels.Set(controller.Labels)) {
rs.fillCurrentState(ctx, &controller)
filtered = append(filtered, controller)
}
}
Expand Down Expand Up @@ -133,16 +130,3 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.WatchControllers(ctx, label, field, resourceVersion)
}

// TODO #2726: The controller should populate the current state, not the apiserver
func (rs *REST) fillCurrentState(ctx api.Context, controller *api.ReplicationController) error {
if rs.podLister == nil {
return nil
}
list, err := rs.podLister.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector())
if err != nil {
return err
}
controller.Status.Replicas = len(rc.FilterActivePods(list.Items))
return nil
}
32 changes: 0 additions & 32 deletions pkg/registry/controller/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"strings"
"testing"

Expand Down Expand Up @@ -354,37 +353,6 @@ func (f *fakePodLister) ListPods(ctx api.Context, s labels.Selector) (*api.PodLi
return &f.l, f.e
}

func TestFillCurrentState(t *testing.T) {
fakeLister := fakePodLister{
l: api.PodList{
Items: []api.Pod{
{ObjectMeta: api.ObjectMeta{Name: "foo"}},
{ObjectMeta: api.ObjectMeta{Name: "bar"}},
},
},
}
mockRegistry := registrytest.ControllerRegistry{}
storage := REST{
registry: &mockRegistry,
podLister: &fakeLister,
}
controller := api.ReplicationController{
Spec: api.ReplicationControllerSpec{
Selector: map[string]string{
"foo": "bar",
},
},
}
ctx := api.NewContext()
storage.fillCurrentState(ctx, &controller)
if controller.Status.Replicas != 2 {
t.Errorf("expected 2, got: %d", controller.Status.Replicas)
}
if !reflect.DeepEqual(fakeLister.s, labels.Set(controller.Spec.Selector).AsSelector()) {
t.Errorf("unexpected output: %#v %#v", labels.Set(controller.Spec.Selector).AsSelector(), fakeLister.s)
}
}

// TODO: remove, covered by TestCreate
func TestCreateControllerWithGeneratedName(t *testing.T) {
storage := NewREST(&registrytest.ControllerRegistry{}, nil)
Expand Down

0 comments on commit 28d9260

Please sign in to comment.