Skip to content

Commit

Permalink
Watch support in PodInterface.
Browse files Browse the repository at this point in the history
Added Watch support to PodInterface. Extended pods e2e to test watch. Resolves kubernetes#4777.
  • Loading branch information
jszczepkowski authored and akram committed Apr 4, 2015
1 parent 1f4a41e commit b3f359c
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 3 deletions.
6 changes: 6 additions & 0 deletions pkg/client/fake_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// FakePods implements PodsInterface. Meant to be embedded into a struct to get a default
Expand Down Expand Up @@ -53,6 +54,11 @@ func (c *FakePods) Update(pod *api.Pod) (*api.Pod, error) {
return &api.Pod{}, nil
}

func (c *FakePods) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-pods", Value: resourceVersion})
return c.Fake.Watch, c.Fake.Err
}

func (c *FakePods) Bind(bind *api.Binding) error {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "bind-pod", Value: bind.Name})
return nil
Expand Down
15 changes: 14 additions & 1 deletion pkg/client/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// PodsNamespacer has methods to work with Pod resources in a namespace
Expand All @@ -36,7 +37,7 @@ type PodInterface interface {
Delete(name string) error
Create(pod *api.Pod) (*api.Pod, error)
Update(pod *api.Pod) (*api.Pod, error)

Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error)
Bind(binding *api.Binding) error
}

Expand Down Expand Up @@ -95,6 +96,18 @@ func (c *pods) Update(pod *api.Pod) (result *api.Pod, err error) {
return
}

// Watch returns a watch.Interface that watches the requested pods.
func (c *pods) Watch(label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return c.r.Get().
Prefix("watch").
Namespace(c.ns).
Resource("pods").
Param("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}

// Bind applies the provided binding to the named pod in the current namespace (binding.Namespace is ignored).
func (c *pods) Bind(binding *api.Binding) error {
return c.r.Post().Namespace(c.ns).Resource("pods").Name(binding.Name).SubResource("binding").Body(binding).Do().Error()
Expand Down
48 changes: 46 additions & 2 deletions test/e2e/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -122,27 +123,70 @@ var _ = Describe("Pods", func() {
},
}

By("setting up watch")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
if err != nil {
Fail(fmt.Sprintf("Failed to query for pods: %v", err))
}
Expect(len(pods.Items)).To(Equal(0))
w, err := podClient.Watch(
labels.SelectorFromSet(labels.Set(map[string]string{"time": value})), labels.Everything(), pods.ListMeta.ResourceVersion)
if err != nil {
Fail(fmt.Sprintf("Failed to set up watch: %v", err))
}

By("submitting the pod to kubernetes")
// We call defer here in case there is a problem with
// the test so we can ensure that we clean up after
// ourselves
defer podClient.Delete(pod.Name)
_, err := podClient.Create(pod)
_, err = podClient.Create(pod)
if err != nil {
Fail(fmt.Sprintf("Failed to create pod: %v", err))
}

By("verifying the pod is in kubernetes")
pods, err := podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
if err != nil {
Fail(fmt.Sprintf("Failed to query for pods: %v", err))
}
Expect(len(pods.Items)).To(Equal(1))

By("veryfying pod creation was observed")
select {
case event, _ := <-w.ResultChan():
if event.Type != watch.Added {
Fail(fmt.Sprintf("Failed to observe pod creation: %v", event))
}
case <-time.After(podStartTimeout):
Fail("Timeout while waiting for pod creation")
}

By("deleting the pod")
podClient.Delete(pod.Name)
pods, err = podClient.List(labels.SelectorFromSet(labels.Set(map[string]string{"time": value})))
if err != nil {
Fail(fmt.Sprintf("Failed to delete pod: %v", err))
}
Expect(len(pods.Items)).To(Equal(0))

By("veryfying pod deletion was observed")
deleted := false
timeout := false
timer := time.After(podStartTimeout)
for !deleted && !timeout {
select {
case event, _ := <-w.ResultChan():
if event.Type == watch.Deleted {
deleted = true
}
case <-timer:
timeout = true
}
}
if !deleted {
Fail("Failed to observe pod deletion")
}
})

It("should be updated", func() {
Expand Down

0 comments on commit b3f359c

Please sign in to comment.