Skip to content

Commit

Permalink
Merge pull request kubernetes#25369 from liggitt/cached-watch
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Return 'too old' errors from watch cache via watch stream

Fixes kubernetes#25151

This PR updates the API server to produce the same results when a watch is attempted with a resourceVersion that is too old, regardless of whether the etcd watch cache is enabled. The expected result is a `200` http status, with a single watch event of type `ERROR`. Previously, the watch cache would deliver a `410` http response.

This is the uncached watch impl:
```
// Implements storage.Interface.
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
	if ctx == nil {
		glog.Errorf("Context is nil")
	}
	watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
	if err != nil {
		return nil, err
	}
	key = h.prefixEtcdKey(key)
	w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
	go w.etcdWatch(ctx, h.etcdKeysAPI, key, watchRV)
	return w, nil
}
```

once the resourceVersion parses, there is no path that returns a direct error, so all errors would have to be returned as an `ERROR` event via the ResultChan().
  • Loading branch information
k8s-merge-robot committed May 11, 2016
2 parents bf7dad0 + f80b59b commit aa8fddb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
48 changes: 47 additions & 1 deletion pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ package storage

import (
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
Expand Down Expand Up @@ -259,7 +262,10 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
return nil, err
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}

c.Lock()
Expand Down Expand Up @@ -455,6 +461,46 @@ func (lw *cacherListerWatcher) Watch(options api.ListOptions) (watch.Interface,
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}

// cacherWatch implements watch.Interface to return a single error
type errWatcher struct {
result chan watch.Event
}

func newErrWatcher(err error) *errWatcher {
// Create an error event
errEvent := watch.Event{Type: watch.Error}
switch err := err.(type) {
case runtime.Object:
errEvent.Object = err
case *errors.StatusError:
errEvent.Object = &err.ErrStatus
default:
errEvent.Object = &unversioned.Status{
Status: unversioned.StatusFailure,
Message: err.Error(),
Reason: unversioned.StatusReasonInternalError,
Code: http.StatusInternalServerError,
}
}

// Create a watcher with room for a single event, populate it, and close the channel
watcher := &errWatcher{result: make(chan watch.Event, 1)}
watcher.result <- errEvent
close(watcher.result)

return watcher
}

// Implements watch.Interface.
func (c *errWatcher) ResultChan() <-chan watch.Event {
return c.result
}

// Implements watch.Interface.
func (c *errWatcher) Stop() {
// no-op
}

// cacherWatch implements watch.Interface
type cacheWatcher struct {
sync.Mutex
Expand Down
13 changes: 9 additions & 4 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/api/testapi"
apitesting "k8s.io/kubernetes/pkg/api/testing"
Expand Down Expand Up @@ -231,11 +232,15 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Added, podFoo)
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)

// Check whether we get too-old error.
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
if err == nil {
t.Errorf("Expected 'error too old' error")
// Check whether we get too-old error via the watch channel
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
if err != nil {
t.Fatalf("Expected no direct error, got %v", err)
}
defer tooOldWatcher.Stop()
// Ensure we get a "Gone" error
expectedGoneError := errors.NewGone("").(*errors.StatusError).ErrStatus
verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedGoneError)

initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
if err != nil {
Expand Down

0 comments on commit aa8fddb

Please sign in to comment.