Skip to content

Commit

Permalink
add a knob to enable quorum read
Browse files Browse the repository at this point in the history
  • Loading branch information
mqliang committed Jan 30, 2016
1 parent 63ec304 commit b0e06c1
Show file tree
Hide file tree
Showing 17 changed files with 60 additions and 37 deletions.
1 change: 1 addition & 0 deletions cmd/kube-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.BoolVar(&s.EtcdQuorumRead, "etcd-quorum-read", s.EtcdQuorumRead, "If true, enable quorum read")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
Expand Down
15 changes: 8 additions & 7 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func verifyClusterIPFlags(s *options.APIServer) {
}
}

type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string) (storage.Interface, error)
type newEtcdFunc func([]string, runtime.NegotiatedSerializer, string, string, bool) (storage.Interface, error)

func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string) (etcdStorage storage.Interface, err error) {
func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGroupVersionString, pathPrefix string, quorum bool) (etcdStorage storage.Interface, err error) {
if storageGroupVersionString == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
Expand All @@ -96,6 +96,7 @@ func newEtcd(etcdServerList []string, ns runtime.NegotiatedSerializer, storageGr
var storageConfig etcdstorage.EtcdConfig
storageConfig.ServerList = etcdServerList
storageConfig.Prefix = pathPrefix
storageConfig.Quorum = quorum
s, ok := ns.SerializerForMediaType("application/json", nil)
if !ok {
return nil, fmt.Errorf("unable to find serializer for JSON")
Expand All @@ -120,7 +121,7 @@ func generateStorageVersionMap(legacyVersion string, storageVersions string) map
}

// parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, quorum bool, storageDestinations *genericapiserver.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 {
return
}
Expand Down Expand Up @@ -149,7 +150,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
}

servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix)
etcdOverrideStorage, err := newEtcdFn(servers, api.Codecs, storageVersions[apigroup.GroupVersion.Group], prefix, quorum)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
Expand Down Expand Up @@ -260,7 +261,7 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
}
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
etcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
Expand All @@ -274,14 +275,14 @@ func Run(s *options.APIServer) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdServerList, api.Codecs, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix, s.EtcdQuorumRead)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup(extensions.GroupName, expEtcdStorage)
}

updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd)
updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, s.EtcdQuorumRead, &storageDestinations, newEtcd)

n := s.ServiceClusterIPRange

Expand Down
4 changes: 2 additions & 2 deletions cmd/kube-apiserver/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,15 @@ func TestUpdateEtcdOverrides(t *testing.T) {
}

for _, test := range testCases {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string) (storage.Interface, error) {
newEtcd := func(serverList []string, _ runtime.NegotiatedSerializer, _, _ string, _ bool) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
return nil, nil
}
storageDestinations := genericapiserver.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
updateEtcdOverrides([]string{override}, storageVersions, "", &storageDestinations, newEtcd)
updateEtcdOverrides([]string{override}, storageVersions, "", false, &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok {
t.Errorf("apigroup: %s not created", test.apigroup)
Expand Down
3 changes: 2 additions & 1 deletion docs/admin/kube-apiserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ kube-apiserver
--cloud-provider="": The provider for cloud services. Empty string for no provider.
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
--etcd-prefix="/registry": The prefix for all resource paths in etcd.
--etcd-quorum-read[=false]: If true, enable quorum read
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
--etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.
--event-ttl=1h0m0s: Amount of time to retain events. Default 1 hour.
Expand Down Expand Up @@ -107,7 +108,7 @@ kube-apiserver
--watch-cache[=true]: Enable watch caching in the apiserver
```

###### Auto generated by spf13/cobra on 14-Jan-2016
###### Auto generated by spf13/cobra on 26-Jan-2016


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ enable-server
etcd-config
etcd-mutation-timeout
etcd-prefix
etcd-quorum-read
etcd-server
etcd-servers
etcd-servers-overrides
Expand Down
1 change: 1 addition & 0 deletions pkg/genericapiserver/server_run_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ServerRunOptions struct {
BindAddress net.IP
CertDirectory string
ClientCAFile string
EtcdQuorumRead bool
InsecureBindAddress net.IP
InsecurePort int
LongRunningRequestRE string
Expand Down
6 changes: 3 additions & 3 deletions pkg/master/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ func setUp(t *testing.T) (Master, *etcdtesting.EtcdTestServer, Config, *assert.A
storageVersions := make(map[string]string)
storageDestinations := genericapiserver.NewStorageDestinations()
storageDestinations.AddAPIGroup(
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix()))
api.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false))
storageDestinations.AddAPIGroup(
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix()))
extensions.GroupName, etcdstorage.NewEtcdStorage(server.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false))

config.StorageDestinations = storageDestinations
storageVersions[api.GroupName] = testapi.Default.GroupVersion().String()
Expand Down Expand Up @@ -348,7 +348,7 @@ func initThirdParty(t *testing.T, version string) (*Master, *etcdtesting.EtcdTes
},
}
master.HandlerContainer = restful.NewContainer()
master.thirdPartyStorage = etcdstorage.NewEtcdStorage(etcdserver.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix())
master.thirdPartyStorage = etcdstorage.NewEtcdStorage(etcdserver.Client, testapi.Extensions.Codec(), etcdtest.PathPrefix(), false)

if !assert.NoError(master.InstallThirdPartyResource(api)) {
t.FailNow()
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/generic/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
func NewTestGenericEtcdRegistry(t *testing.T) (*etcdtesting.EtcdTestServer, *Etcd) {
podPrefix := "/pods"
server := etcdtesting.NewEtcdTestClientServer(t)
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix())
s := etcdstorage.NewEtcdStorage(server.Client, testapi.Default.Codec(), etcdtest.PathPrefix(), false)
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true}

return server, &Etcd{
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/registrytest/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

func NewEtcdStorage(t *testing.T, group string) (storage.Interface, *etcdtesting.EtcdTestServer) {
server := etcdtesting.NewEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix())
storage := etcdstorage.NewEtcdStorage(server.Client, testapi.Groups[group].Codec(), etcdtest.PathPrefix(), false)
return storage, server
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (

func newEtcdTestStorage(t *testing.T, codec runtime.Codec, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server := etcdtesting.NewEtcdTestClientServer(t)
storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix)
storage := etcdstorage.NewEtcdStorage(server.Client, codec, prefix, false)
return server, storage
}

Expand Down
27 changes: 21 additions & 6 deletions pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type EtcdConfig struct {
ServerList []string
Codec runtime.Codec
Prefix string
Quorum bool
}

// implements storage.Config
Expand All @@ -72,19 +73,20 @@ func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
if err != nil {
return nil, err
}
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil
}

// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface {
return &etcdHelper{
etcdclient: client,
client: etcd.NewKeysAPI(client),
codec: codec,
versioner: APIObjectVersioner{},
copier: api.Scheme,
pathPrefix: path.Join("/", prefix),
quorum: quorum,
cache: util.NewCache(maxEtcdCacheEntries),
}
}
Expand All @@ -99,6 +101,8 @@ type etcdHelper struct {
versioner storage.Versioner
// prefix for all etcd keys
pathPrefix string
// if true, perform quorum read
quorum bool

// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
Expand Down Expand Up @@ -269,7 +273,7 @@ func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion stri
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.client, key, watchRV)
return w, nil
}
Expand All @@ -284,7 +288,7 @@ func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(ctx, h.client, key, watchRV)
return w, nil
}
Expand All @@ -306,7 +310,12 @@ func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr r
glog.Errorf("Context is nil")
}
startTime := time.Now()
response, err := h.client.Get(ctx, key, nil)

opts := &etcd.GetOptions{
Quorum: h.quorum,
}

response, err := h.client.Get(ctx, key, opts)
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)

if err != nil && !etcdutil.IsEtcdNotFound(err) {
Expand Down Expand Up @@ -365,7 +374,12 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.F
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.client.Get(ctx, key, nil)

opts := &etcd.GetOptions{
Quorum: h.quorum,
}
response, err := h.client.Get(ctx, key, opts)

metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
trace.Step("Etcd node read")
if err != nil {
Expand Down Expand Up @@ -473,6 +487,7 @@ func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node
opts := etcd.GetOptions{
Recursive: true,
Sort: true,
Quorum: h.quorum,
}
result, err := h.client.Get(ctx, key, &opts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/etcd/etcd_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func testScheme(t *testing.T) (*runtime.Scheme, runtime.Codec) {
}

func newEtcdHelper(client etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
return *NewEtcdStorage(client, codec, prefix, false).(*etcdHelper)
}

// Returns an encoded version of api.Pod with the given name.
Expand Down
9 changes: 6 additions & 3 deletions pkg/storage/etcd/etcd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type etcdWatcher struct {
transform TransformFunc

list bool // If we're doing a recursive watch, should be true.
quorum bool // If we enable quorum, shoule be true
include includeFunc
filter storage.FilterFunc

Expand Down Expand Up @@ -109,12 +110,13 @@ const watchWaitDuration = 100 * time.Millisecond

// newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform
// and a versioner, the versioner must be able to handle the objects that transform creates.
func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
func newEtcdWatcher(list bool, quorum bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher {
w := &etcdWatcher{
encoding: encoding,
versioner: versioner,
transform: transform,
list: list,
quorum: quorum,
include: include,
filter: filter,
// Buffer this channel, so that the etcd client is not forced
Expand Down Expand Up @@ -171,7 +173,7 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
// Stop() is called in the meantime (which in tests can cause etcd termination and
// strange behavior here).
if resourceVersion == 0 {
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.etcdIncoming)
latest, err := etcdGetInitialWatchState(ctx, client, key, w.list, w.quorum, w.etcdIncoming)
if err != nil {
w.etcdError <- err
return true
Expand Down Expand Up @@ -203,10 +205,11 @@ func (w *etcdWatcher) etcdWatch(ctx context.Context, client etcd.KeysAPI, key st
}

// etcdGetInitialWatchState turns an etcd Get request into a watch equivalent
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
func etcdGetInitialWatchState(ctx context.Context, client etcd.KeysAPI, key string, recursive bool, quorum bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) {
opts := etcd.GetOptions{
Recursive: recursive,
Sort: false,
Quorum: quorum,
}
resp, err := client.Get(ctx, key, &opts)
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/etcd/etcd_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestWatchInterpretations(t *testing.T) {

for name, item := range table {
for _, action := range item.actions {
w := newEtcdWatcher(true, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(true, false, nil, firstLetterIsB, codec, versioner, nil, &fakeEtcdCache{})
emitCalled := false
w.emit = func(event watch.Event) {
emitCalled = true
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestWatchInterpretations(t *testing.T) {

func TestWatchInterpretation_ResponseNotSet(t *testing.T) {
_, codec := testScheme(t)
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -185,7 +185,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) {
_, codec := testScheme(t)
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand All @@ -200,7 +200,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) {
_, codec := testScheme(t)
actions := []string{"create", "set", "compareAndSwap", "delete"}
for _, action := range actions {
w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w := newEtcdWatcher(false, false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{})
w.emit = func(e watch.Event) {
t.Errorf("Unexpected emit: %v", e)
}
Expand Down
8 changes: 4 additions & 4 deletions test/integration/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
func TestSet(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "")
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
Expand All @@ -63,7 +63,7 @@ func TestSet(t *testing.T) {
func TestGet(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "")
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
Expand All @@ -90,7 +90,7 @@ func TestGet(t *testing.T) {
func TestWriteTTL(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "")
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), "", false)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}}
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestWriteTTL(t *testing.T) {
func TestWatch(t *testing.T) {
client := framework.NewEtcdClient()
keysAPI := etcd.NewKeysAPI(client)
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix())
etcdStorage := etcdstorage.NewEtcdStorage(client, testapi.Default.Codec(), etcdtest.PathPrefix(), false)
ctx := context.TODO()
framework.WithEtcdKey(func(key string) {
key = etcdtest.AddPrefix(key)
Expand Down
Loading

0 comments on commit b0e06c1

Please sign in to comment.