Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Futher storage isolation and removal of the tools interface. #18383

Merged
merged 1 commit into from
Dec 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 12 additions & 32 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"

"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -99,7 +96,6 @@ type APIServer struct {
AdmissionControl string
AdmissionControlConfigFile string
EtcdServerList []string
EtcdConfigFile string
EtcdServersOverrides []string
EtcdPathPrefix string
CorsAllowedOriginList []string
Expand Down Expand Up @@ -234,7 +230,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
Expand Down Expand Up @@ -283,36 +278,21 @@ func (s *APIServer) verifyClusterIPFlags() {
}
}

type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
type newEtcdFunc func([]string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)

func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageVersion == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
var client tools.EtcdClient
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
if err != nil {
return nil, err
}
} else {
etcdClient := etcd.NewClient(etcdServerList)
transport := &http.Transport{
Dial: forked.Dial,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConnsPerHost: 500,
}
etcdClient.SetTransport(transport)
client = etcdClient
}
var storageConfig etcdstorage.EtcdConfig
storageConfig.ServerList = etcdServerList
storageConfig.Prefix = pathPrefix
versionedInterface, err := interfacesFunc(storageVersion)
if err != nil {
return nil, err
}
etcdStorage = etcdstorage.NewEtcdStorage(client, versionedInterface.Codec, pathPrefix)
return etcdStorage, nil
storageConfig.Codec = versionedInterface.Codec
return storageConfig.NewStorage()
}

// convert to a map between group and groupVersions.
Expand Down Expand Up @@ -360,7 +340,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
}

servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
etcdOverrideStorage, err := newEtcdFn(servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
Expand All @@ -386,8 +366,8 @@ func (s *APIServer) Run(_ []string) error {
}
glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress)

if (s.EtcdConfigFile != "" && len(s.EtcdServerList) != 0) || (s.EtcdConfigFile == "" && len(s.EtcdServerList) == 0) {
glog.Fatalf("Specify either --etcd-servers or --etcd-config")
if len(s.EtcdServerList) == 0 {
glog.Fatalf("--etcd-servers must be specified")
}

if s.KubernetesServiceNodePort > 0 && !s.ServiceNodePortRange.Contains(s.KubernetesServiceNodePort) {
Expand Down Expand Up @@ -471,7 +451,7 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
}
etcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
etcdStorage, err := newEtcd(s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
Expand All @@ -485,7 +465,7 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
}

for _, test := range testCases {
newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
newEtcd := func(serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
Expand Down
5 changes: 2 additions & 3 deletions contrib/mesos/pkg/election/etcd_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
Expand All @@ -38,15 +37,15 @@ type Master string
func (Master) IsAnAPIObject() {}

// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
func NewEtcdMasterElector(h tools.EtcdClient) MasterElector {
func NewEtcdMasterElector(h *etcd.Client) MasterElector {
return &etcdMasterElector{etcd: h}
}

type empty struct{}

// internal implementation struct
type etcdMasterElector struct {
etcd tools.EtcdClient
etcd *etcd.Client
done chan empty
events chan watch.Event
}
Expand Down
71 changes: 6 additions & 65 deletions contrib/mesos/pkg/election/etcd_master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ limitations under the License.
package election

import (
"reflect"
"testing"

"github.com/coreos/go-etcd/etcd"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/watch"
)

Expand Down Expand Up @@ -57,77 +54,21 @@ func TestEtcdMasterNoOther(t *testing.T) {
w.Stop()
}

// MockClient is wrapper aroung tools.EtcdClient.
type MockClient struct {
client tools.EtcdClient
t *testing.T
// afterGetFunc is called after each Get() call.
afterGetFunc func()
calls []string
}

func (m *MockClient) GetCluster() []string {
return m.client.GetCluster()
}

func (m *MockClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
m.calls = append(m.calls, "get")
defer m.afterGetFunc()
response, err := m.client.Get(key, sort, recursive)
return response, err
}

func (m *MockClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
return m.client.Set(key, value, ttl)
}

func (m *MockClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
m.calls = append(m.calls, "create")
return m.client.Create(key, value, ttl)
}

func (m *MockClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
return m.client.CompareAndSwap(key, value, ttl, prevValue, prevIndex)
}

func (m *MockClient) Delete(key string, recursive bool) (*etcd.Response, error) {
return m.client.Delete(key, recursive)
}

func (m *MockClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
return m.client.Watch(prefix, waitIndex, recursive, receiver, stop)
}

func TestEtcdMasterNoOtherThenConflict(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)

// We set up the following scenario:
// - after each Get() call, we write "baz" to a path
// - this is simulating someone else writing a data
// - the value written by someone else is the new value
path := "foo"
client := &MockClient{
client: server.Client,
t: t,
afterGetFunc: func() {
if _, err := server.Client.Set(path, "baz", 0); err != nil {
t.Errorf("unexpected error: %v", err)
}
},
calls: make([]string, 0),
}
master := NewEtcdMasterElector(server.Client)
leader := NewEtcdMasterElector(server.Client)

master := NewEtcdMasterElector(client)
w_ldr := leader.Elect(path, "baz")
result := <-w_ldr.ResultChan()
w := master.Elect(path, "bar")
result := <-w.ResultChan()
result = <-w.ResultChan()
if result.Type != watch.Modified || result.Object.(Master) != "baz" {
t.Errorf("unexpected event: %#v", result)
}
w.Stop()

expectedCalls := []string{"get", "create", "get"}
if !reflect.DeepEqual(client.calls, expectedCalls) {
t.Errorf("unexpected calls: %#v", client.calls)
}
w_ldr.Stop()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need to show that the calls happened in the expected order?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would argue no, b/c the goal of the test is to check that (X) is not the leader.

Operational ordering imho was a false test that wrapped the tool.interface.

}
7 changes: 3 additions & 4 deletions contrib/mesos/pkg/scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"

// lock to this API version, compilation will fail when this becomes unsupported
_ "k8s.io/kubernetes/pkg/api/v1"
Expand Down Expand Up @@ -630,7 +629,7 @@ func validateLeadershipTransition(desired, current string) {
}

// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdClient, err error) {
func newEtcd(etcdConfigFile string, etcdServerList []string) (client *etcd.Client, err error) {
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
} else {
Expand All @@ -639,7 +638,7 @@ func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdC
return
}

func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *mesos.ExecutorID) {
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, *etcd.Client, *mesos.ExecutorID) {
s.frameworkName = strings.TrimSpace(s.frameworkName)
if s.frameworkName == "" {
log.Fatalf("framework-name must be a non-empty string")
Expand Down Expand Up @@ -917,7 +916,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
return
}

func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.FrameworkID, error) {
if s.failoverTimeout > 0 {
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
if !etcdutil.IsEtcdNotFound(err) {
Expand Down
3 changes: 1 addition & 2 deletions docs/admin/kube-apiserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ kube-apiserver
--cloud-provider="": The provider for cloud services. Empty string for no provider.
--cluster-name="kubernetes": The instance prefix for the cluster
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
--etcd-config="": The config file for the etcd client. Mutually exclusive with -etcd-servers.
--etcd-prefix="/registry": The prefix for all resource paths in etcd.
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
--etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.
Expand Down Expand Up @@ -107,7 +106,7 @@ kube-apiserver
--watch-cache[=true]: Enable watch caching in the apiserver
```

###### Auto generated by spf13/cobra on 3-Dec-2015
###### Auto generated by spf13/cobra on 9-Dec-2015


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
42 changes: 38 additions & 4 deletions pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,64 @@ limitations under the License.
package etcd

import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"path"
"reflect"
"strings"
"time"

"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"

"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"golang.org/x/net/context"
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"
)

func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface {
// storage.Config object for etcd.
type EtcdConfig struct {
ServerList []string
Codec runtime.Codec
Prefix string
}

// implements storage.Config
func (c *EtcdConfig) GetType() string {
return "etcd"
}

// implements storage.Config
func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
etcdClient := etcd.NewClient(c.ServerList)
if etcdClient == nil {
return nil, errors.New("Failed to create new etcd client from serverlist")
}
transport := &http.Transport{
Dial: forked.Dial,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConnsPerHost: 500,
}
etcdClient.SetTransport(transport)

return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil
}

// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
return &etcdHelper{
client: client,
codec: codec,
Expand All @@ -53,7 +87,7 @@ func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string)

// etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct {
client tools.EtcdClient
client *etcd.Client
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
Expand Down
7 changes: 2 additions & 5 deletions pkg/storage/etcd/etcd_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"testing"

"github.com/coreos/go-etcd/etcd"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
Expand All @@ -34,10 +35,6 @@ import (
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"

// TODO: once fakeClient has been purged move utils
// and eliminate these deps
"k8s.io/kubernetes/pkg/tools"
)

const validEtcdVersion = "etcd 2.0.9"
Expand All @@ -58,7 +55,7 @@ func init() {
)
}

func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string) etcdHelper {
func newEtcdHelper(client *etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
}

Expand Down
Loading