Skip to content

Commit

Permalink
Make Reflector helpers reusable.
Browse files Browse the repository at this point in the history
Scheduler uses Reflector from pkg/client/cache.
It defines some helper classes.
I'd like to use those helpers with pkg/client/cache
in kube-proxy and kubelet too.
  • Loading branch information
erictune committed Jan 7, 2015
1 parent 4c57ec0 commit 7d5ac85
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 144 deletions.
4 changes: 2 additions & 2 deletions hack/test-go.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ kube::test::find_pkgs() {
}

# -covermode=atomic becomes default with -race in Go >=1.3
KUBE_COVER=${KUBE_COVER:--cover -covermode=atomic}
KUBE_COVER="" #${KUBE_COVER:--cover -covermode=atomic}
KUBE_TIMEOUT=${KUBE_TIMEOUT:--timeout 120s}
KUBE_RACE=${KUBE_RACE:--race}
KUBE_RACE="" #${KUBE_RACE:--race}

kube::test::usage() {
kube::log::usage_from_stdin <<EOF
Expand Down
52 changes: 52 additions & 0 deletions pkg/client/cache/listwatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
type ListWatch struct {
Client *client.Client
FieldSelector labels.Selector
Resource string
}

// ListWatch knows how to list and watch a set of apiserver resources.
func (lw *ListWatch) List() (runtime.Object, error) {
return lw.Client.
Get().
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Do().
Get()
}

func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.Client.
Get().
Prefix("watch").
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Param("resourceVersion", resourceVersion).
Watch()
}
123 changes: 123 additions & 0 deletions pkg/client/cache/listwatch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
"net/http/httptest"
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func parseSelectorOrDie(s string) labels.Selector {
selector, err := labels.ParseSelector(s)
if err != nil {
panic(err)
}
return selector
}

func TestListWatchesCanList(t *testing.T) {
table := []struct {
location string
lw ListWatch
}{
// Minion
{
location: "/api/" + testapi.Version() + "/minions",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
},
// pod with "assigned" field selector.
{
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D",
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
},
},
}

for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.lw.List()
handler.ValidateRequest(t, item.location, "GET", nil)
}
}

func TestListWatchesCanWatch(t *testing.T) {
table := []struct {
rv string
location string
lw ListWatch
}{
// Minion
{
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=",
rv: "",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
},
{
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=42",
rv: "42",
lw: ListWatch{
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
},
},
// pod with "assigned" field selector.
{
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
rv: "0",
lw: ListWatch{
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
},
},
}

for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})

// This test merely tests that the correct request is made.
item.lw.Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
64 changes: 19 additions & 45 deletions plugin/pkg/scheduler/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler"

"github.com/golang/glog"
Expand Down Expand Up @@ -130,38 +128,13 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
}, nil
}

type listWatch struct {
client *client.Client
fieldSelector labels.Selector
resource string
}

func (lw *listWatch) List() (runtime.Object, error) {
return lw.client.
Get().
Resource(lw.resource).
SelectorParam("fields", lw.fieldSelector).
Do().
Get()
}

func (lw *listWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.client.
Get().
Prefix("watch").
Resource(lw.resource).
SelectorParam("fields", lw.fieldSelector).
Param("resourceVersion", resourceVersion).
Watch()
}

// createUnassignedPodLW returns a listWatch that finds all pods that need to be
// createUnassignedPodLW returns a cache.ListWatch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
resource: "pods",
func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
}
}

Expand All @@ -173,22 +146,23 @@ func parseSelectorOrDie(s string) labels.Selector {
return selector
}

// createAssignedPodLW returns a listWatch that finds all pods that are
// createAssignedPodLW returns a cache.ListWatch that finds all pods that are
// already scheduled.
func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
resource: "pods",
// TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie("DesiredState.Host!="),
Resource: "pods",
}
}

// createMinionLW returns a listWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie(""),
resource: "minions",
// createMinionLW returns a cache.ListWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *cache.ListWatch {
return &cache.ListWatch{
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
}
}

Expand Down
97 changes: 0 additions & 97 deletions plugin/pkg/scheduler/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,103 +47,6 @@ func TestCreate(t *testing.T) {
factory.Create()
}

func TestCreateLists(t *testing.T) {
factory := NewConfigFactory(nil)
table := []struct {
location string
factory func() *listWatch
}{
// Minion
{
location: "/api/" + testapi.Version() + "/minions",
factory: factory.createMinionLW,
},
// Assigned pod
{
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host!%3D",
factory: factory.createAssignedPodLW,
},
// Unassigned pod
{
location: "/api/" + testapi.Version() + "/pods?fields=DesiredState.Host%3D",
factory: factory.createUnassignedPodLW,
},
}

for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
// This test merely tests that the correct request is made.
item.factory().List()
handler.ValidateRequest(t, item.location, "GET", nil)
}
}

func TestCreateWatches(t *testing.T) {
factory := NewConfigFactory(nil)
table := []struct {
rv string
location string
factory func() *listWatch
}{
// Minion watch
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=",
factory: factory.createMinionLW,
}, {
rv: "0",
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=0",
factory: factory.createMinionLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/minions?resourceVersion=42",
factory: factory.createMinionLW,
},
// Assigned pod watches
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=",
factory: factory.createAssignedPodLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
factory: factory.createAssignedPodLW,
},
// Unassigned pod watches
{
rv: "",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=",
factory: factory.createUnassignedPodLW,
}, {
rv: "42",
location: "/api/" + testapi.Version() + "/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
factory: factory.createUnassignedPodLW,
},
}

for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
defer server.Close()
factory.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})

// This test merely tests that the correct request is made.
item.factory().Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}

func TestPollMinions(t *testing.T) {
table := []struct {
minions []api.Node
Expand Down

0 comments on commit 7d5ac85

Please sign in to comment.