Skip to content

Commit

Permalink
Add watch capability to our client.
Browse files Browse the repository at this point in the history
Next steps: Make an etcd watcher... decide on a state field for pods...
move the scheduler to its own binary.
  • Loading branch information
lavalamp committed Jul 21, 2014
1 parent f672edd commit dbd0d41
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 23 deletions.
128 changes: 115 additions & 13 deletions pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@ package client

import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)

Expand Down Expand Up @@ -191,25 +196,51 @@ func (r *Request) PollPeriod(d time.Duration) *Request {
return r
}

func (r *Request) finalURL() string {
finalURL := r.c.host + r.path
query := url.Values{}
if r.selector != nil {
query.Add("labels", r.selector.String())
}
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalURL += "?" + query.Encode()
return finalURL
}

// Attempts to begin watching the requested location. Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) {
if r.err != nil {
return nil, r.err
}
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return nil, err
}
if r.c.auth != nil {
req.SetBasicAuth(r.c.auth.User, r.c.auth.Password)
}
response, err := r.c.httpClient.Do(req)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
}
return newHTTPWatcher(response.Body), nil
}

// Do formats and executes the request. Returns the API object received, or an error.
func (r *Request) Do() Result {
for {
if r.err != nil {
return Result{err: r.err}
}
finalURL := r.c.host + r.path
query := url.Values{}
if r.selector != nil {
query.Add("labels", r.selector.String())
}
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalURL += "?" + query.Encode()
req, err := http.NewRequest(r.verb, finalURL, r.body)
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return Result{err: err}
}
Expand Down Expand Up @@ -262,3 +293,74 @@ func (r Result) Into(obj interface{}) error {
func (r Result) Error() error {
return r.err
}

type httpWatcher struct {
source io.ReadCloser
result chan watch.Event
done chan struct{}
sync.Mutex
stopped bool
}

func newHTTPWatcher(source io.ReadCloser) *httpWatcher {
hw := &httpWatcher{
source: source,
result: make(chan watch.Event),
done: make(chan struct{}),
}
go hw.receive()
return hw
}

// Implements watch.Interface
func (hw *httpWatcher) ResultChan() <-chan watch.Event {
return hw.result
}

// Implements watch.Interface
func (hw *httpWatcher) Stop() {
hw.Lock()
defer hw.Unlock()
if !hw.stopped {
close(hw.done)
hw.stopped = true
}
}

// In a loop, read results from http, decode, and send down the result channel.
func (hw *httpWatcher) receive() {
defer close(hw.result)
defer hw.source.Close()
defer util.HandleCrash()

decoder := json.NewDecoder(hw.source)

decoded := make(chan *api.WatchEvent)

// Read one at a time. Have to do this separately because Decode blocks and
// we want to wait on the done channel, too.
go func() {
defer util.HandleCrash()
for {
var got api.WatchEvent
err := decoder.Decode(&got)
if err != nil {
hw.Stop()
return
}
decoded <- &got
}
}()

for {
select {
case <-hw.done:
return
case got := <-decoded:
hw.result <- watch.Event{
Type: got.Type,
Object: got.Object.Object,
}
}
}
}
92 changes: 92 additions & 0 deletions pkg/client/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,20 @@ package client

import (
"bytes"
"encoding/base64"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

func TestDoRequestNewWay(t *testing.T) {
Expand Down Expand Up @@ -303,3 +307,91 @@ func TestPolling(t *testing.T) {
f()
}
}

func authFromReq(r *http.Request) (*AuthInfo, bool) {
auth, ok := r.Header["Authorization"]
if !ok {
return nil, false
}

encoded := strings.Split(auth[0], " ")
if len(encoded) != 2 || encoded[0] != "Basic" {
return nil, false
}

decoded, err := base64.StdEncoding.DecodeString(encoded[1])
if err != nil {
return nil, false
}
parts := strings.Split(string(decoded), ":")
if len(parts) != 2 {
return nil, false
}
return &AuthInfo{User: parts[0], Password: parts[1]}, true
}

// checkAuth sets errors if the auth found in r doesn't match the expectation.
// TODO: Move to util, test in more places.
func checkAuth(t *testing.T, expect AuthInfo, r *http.Request) {
foundAuth, found := authFromReq(r)
if !found {
t.Errorf("no auth found")
} else if e, a := expect, *foundAuth; !reflect.DeepEqual(e, a) {
t.Fatalf("Wrong basic auth: wanted %#v, got %#v", e, a)
}
}

func TestWatch(t *testing.T) {
var table = []struct {
t watch.EventType
obj interface{}
}{
{watch.Added, &api.Pod{JSONBase: api.JSONBase{ID: "first"}}},
{watch.Modified, &api.Pod{JSONBase: api.JSONBase{ID: "second"}}},
{watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "third"}}},
}

auth := AuthInfo{User: "user", Password: "pass"}
testServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
checkAuth(t, auth, r)
flusher, ok := w.(http.Flusher)
if !ok {
panic("need flusher!")
}

w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()

encoder := json.NewEncoder(w)
for _, item := range table {
encoder.Encode(&api.WatchEvent{item.t, api.APIObject{item.obj}})
flusher.Flush()
}
}))

s := New(testServer.URL, &auth)

watching, err := s.Get().Path("path/to/watch/thing").Watch()
if err != nil {
t.Fatalf("Unexpected error")
}

for _, item := range table {
got, ok := <-watching.ResultChan()
if !ok {
t.Fatalf("Unexpected early close")
}
if e, a := item.t, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := item.obj, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}

_, ok := <-watching.ResultChan()
if ok {
t.Fatal("Unexpected non-close")
}
}
22 changes: 12 additions & 10 deletions pkg/watch/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Interface interface {
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
ResultChan() <-chan *Event
ResultChan() <-chan Event
}

// EventType defines the possible types of events.
Expand All @@ -52,45 +52,47 @@ type Event struct {

// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
type FakeWatcher struct {
result chan *Event
result chan Event
Stopped bool
sync.Mutex
}

func NewFake() *FakeWatcher {
return &FakeWatcher{
result: make(chan *Event),
result: make(chan Event),
}
}

// Stop implements Interface.Stop().
func (f *FakeWatcher) Stop() {
f.Lock()
defer f.Unlock()
close(f.result)
f.Stopped = true
if !f.Stopped {
close(f.result)
f.Stopped = true
}
}

func (f *FakeWatcher) ResultChan() <-chan *Event {
func (f *FakeWatcher) ResultChan() <-chan Event {
return f.result
}

// Add sends an add event.
func (f *FakeWatcher) Add(obj interface{}) {
f.result <- &Event{Added, obj}
f.result <- Event{Added, obj}
}

// Modify sends a modify event.
func (f *FakeWatcher) Modify(obj interface{}) {
f.result <- &Event{Modified, obj}
f.result <- Event{Modified, obj}
}

// Delete sends a delete event.
func (f *FakeWatcher) Delete(lastValue interface{}) {
f.result <- &Event{Deleted, lastValue}
f.result <- Event{Deleted, lastValue}
}

// Action sends an event of the requested type, for table-based testing.
func (f *FakeWatcher) Action(action EventType, obj interface{}) {
f.result <- &Event{action, obj}
f.result <- Event{action, obj}
}

0 comments on commit dbd0d41

Please sign in to comment.