Skip to content

Commit

Permalink
Cleanup watch encoding (remove dupe Encoding)
Browse files Browse the repository at this point in the history
Move standard watch encode / decode streams to use
runtime.RawExtension and embed API decoding based on
a provided codec.
  • Loading branch information
smarterclayton committed Sep 22, 2014
1 parent 6778a8d commit e3da2ba
Show file tree
Hide file tree
Showing 11 changed files with 239 additions and 145 deletions.
15 changes: 4 additions & 11 deletions pkg/apiserver/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@ limitations under the License.
package apiserver

import (
"encoding/json"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"

"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
)

type WatchHandler struct {
Expand Down Expand Up @@ -120,7 +119,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) {
// End of results.
return
}
obj, err := api.NewJSONWatchEvent(w.codec, event)
obj, err := watchjson.Object(w.codec, &event)
if err != nil {
// Client disconnect.
w.watching.Stop()
Expand Down Expand Up @@ -158,7 +157,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
flusher.Flush()

encoder := json.NewEncoder(w)
encoder := watchjson.NewEncoder(w, self.codec)
for {
select {
case <-cn.CloseNotify():
Expand All @@ -169,13 +168,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// End of results.
return
}
obj, err := api.NewJSONWatchEvent(self.codec, event)
if err != nil {
// Client disconnect.
self.watching.Stop()
return
}
if err := encoder.Encode(obj); err != nil {
if err := encoder.Encode(&event); err != nil {
// Client disconnect.
self.watching.Stop()
return
Expand Down
23 changes: 14 additions & 9 deletions pkg/apiserver/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ import (
"testing"

"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// watchJSON defines the expected JSON wire equivalent of watch.Event
type watchJSON struct {
Type watch.EventType `json:"type,omitempty" yaml:"type,omitempty"`
Object json.RawMessage `json:"object,omitempty" yaml:"object,omitempty"`
}

var watchTestTable = []struct {
t watch.EventType
obj runtime.Object
Expand Down Expand Up @@ -61,16 +66,16 @@ func TestWatchWebsocket(t *testing.T) {
// Send
simpleStorage.fakeWatch.Action(action, object)
// Test receive
var got api.WatchEvent
var got watchJSON
err := websocket.JSON.Receive(ws, &got)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if got.Type != action {
t.Errorf("Unexpected type: %v", got.Type)
}
if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
if e, a := runtime.EncodeOrDie(codec, object), string(got.Object); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}

Expand All @@ -79,7 +84,7 @@ func TestWatchWebsocket(t *testing.T) {
}
simpleStorage.fakeWatch.Stop()

var got api.WatchEvent
var got watchJSON
err = websocket.JSON.Receive(ws, &got)
if err == nil {
t.Errorf("Unexpected non-error")
Expand Down Expand Up @@ -118,21 +123,21 @@ func TestWatchHTTP(t *testing.T) {
// Send
simpleStorage.fakeWatch.Action(item.t, item.obj)
// Test receive
var got api.WatchEvent
var got watchJSON
err := decoder.Decode(&got)
if err != nil {
t.Fatalf("%d: Unexpected error: %v", i, err)
}
if got.Type != item.t {
t.Errorf("%d: Unexpected type: %v", i, got.Type)
}
if e, a := item.obj, got.Object.Object; !reflect.DeepEqual(e, a) {
t.Errorf("%d: Expected %v, got %v", i, e, a)
if e, a := runtime.EncodeOrDie(codec, item.obj), string(got.Object); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %#v, got %#v", e, a)
}
}
simpleStorage.fakeWatch.Stop()

var got api.WatchEvent
var got watchJSON
err = decoder.Decode(&got)
if err == nil {
t.Errorf("Unexpected non-error")
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
cwatch "github.com/GoogleCloudPlatform/kubernetes/pkg/client/watch"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
"github.com/golang/glog"
)

Expand Down Expand Up @@ -269,7 +269,7 @@ func (r *Request) Watch() (watch.Interface, error) {
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
}
return watch.NewStreamWatcher(cwatch.NewAPIEventDecoder(response.Body)), nil
return watch.NewStreamWatcher(watchjson.NewDecoder(response.Body, r.c.Codec)), nil
}

// Do formats and executes the request. Returns the API object received, or an error.
Expand Down
13 changes: 5 additions & 8 deletions pkg/client/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package client
import (
"bytes"
"encoding/base64"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand All @@ -29,12 +28,14 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
)

func TestDoRequestNewWay(t *testing.T) {
Expand Down Expand Up @@ -386,7 +387,7 @@ func TestWatch(t *testing.T) {
}{
{watch.Added, &api.Pod{JSONBase: api.JSONBase{ID: "first"}}},
{watch.Modified, &api.Pod{JSONBase: api.JSONBase{ID: "second"}}},
{watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "third"}}},
{watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "last"}}},
}

auth := AuthInfo{User: "user", Password: "pass"}
Expand All @@ -401,13 +402,9 @@ func TestWatch(t *testing.T) {
w.WriteHeader(http.StatusOK)
flusher.Flush()

encoder := json.NewEncoder(w)
encoder := watchjson.NewEncoder(w, latest.Codec)
for _, item := range table {
data, err := api.NewJSONWatchEvent(v1beta1.Codec, watch.Event{item.t, item.obj})
if err != nil {
panic(err)
}
if err := encoder.Encode(data); err != nil {
if err := encoder.Encode(&watch.Event{item.t, item.obj}); err != nil {
panic(err)
}
flusher.Flush()
Expand Down
64 changes: 0 additions & 64 deletions pkg/client/watch/decoder.go

This file was deleted.

69 changes: 69 additions & 0 deletions pkg/watch/json/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package json

import (
"encoding/json"
"fmt"
"io"

"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// Decoder implements the watch.Decoder interface for io.ReadClosers that
// have contents which consist of a series of watchEvent objects encoded via JSON.
// It will decode any object registered in the supplied codec.
type Decoder struct {
r io.ReadCloser
decoder *json.Decoder
codec runtime.Codec
}

// NewDecoder creates an Decoder for the given writer and codec.
func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder {
return &Decoder{
r: r,
decoder: json.NewDecoder(r),
codec: codec,
}
}

// Decode blocks until it can return the next object in the writer. Returns an error
// if the writer is closed or an object can't be decoded.
func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
var got watchEvent
if err := d.decoder.Decode(&got); err != nil {
return "", nil, err
}
switch got.Type {
case watch.Added, watch.Modified, watch.Deleted:
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}

obj, err := d.codec.Decode(got.Object.RawJSON)
if err != nil {
return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
}
return got.Type, obj, nil
}

// Close closes the underlying r.
func (d *Decoder) Close() {
d.r.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package watch
package json

import (
"encoding/json"
Expand All @@ -25,17 +25,13 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

type watchSerialization struct {
Type watch.EventType
Object json.RawMessage
}

func TestDecoder(t *testing.T) {
out, in := io.Pipe()
decoder := NewAPIEventDecoder(out)
decoder := NewDecoder(out, v1beta1.Codec)

expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
encoder := json.NewEncoder(in)
Expand All @@ -44,7 +40,7 @@ func TestDecoder(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if err := encoder.Encode(&watchSerialization{watch.Added, json.RawMessage(data)}); err != nil {
if err := encoder.Encode(&watchEvent{watch.Added, runtime.RawExtension{json.RawMessage(data)}}); err != nil {
t.Errorf("Unexpected error %v", err)
}
in.Close()
Expand Down Expand Up @@ -82,7 +78,7 @@ func TestDecoder(t *testing.T) {

func TestDecoder_SourceClose(t *testing.T) {
out, in := io.Pipe()
decoder := NewAPIEventDecoder(out)
decoder := NewDecoder(out, v1beta1.Codec)

done := make(chan struct{})

Expand Down
Loading

0 comments on commit e3da2ba

Please sign in to comment.