Skip to content

Commit

Permalink
Merge pull request kubernetes#18383 from timothysc/tools_removal
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Dec 11, 2015
2 parents e34e6e7 + 413d8d1 commit d3243b8
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 176 deletions.
44 changes: 12 additions & 32 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ import (
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/storage"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"

"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -99,7 +96,6 @@ type APIServer struct {
AdmissionControl string
AdmissionControlConfigFile string
EtcdServerList []string
EtcdConfigFile string
EtcdServersOverrides []string
EtcdPathPrefix string
CorsAllowedOriginList []string
Expand Down Expand Up @@ -234,7 +230,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AdmissionControl, "admission-control", s.AdmissionControl, "Ordered list of plug-ins to do admission control of resources into cluster. Comma-delimited list of: "+strings.Join(admission.GetPlugins(), ", "))
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
Expand Down Expand Up @@ -283,36 +278,21 @@ func (s *APIServer) verifyClusterIPFlags() {
}
}

type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)
type newEtcdFunc func([]string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)

func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
func newEtcd(etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageVersion == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
}
var client tools.EtcdClient
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
if err != nil {
return nil, err
}
} else {
etcdClient := etcd.NewClient(etcdServerList)
transport := &http.Transport{
Dial: forked.Dial,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConnsPerHost: 500,
}
etcdClient.SetTransport(transport)
client = etcdClient
}
var storageConfig etcdstorage.EtcdConfig
storageConfig.ServerList = etcdServerList
storageConfig.Prefix = pathPrefix
versionedInterface, err := interfacesFunc(storageVersion)
if err != nil {
return nil, err
}
etcdStorage = etcdstorage.NewEtcdStorage(client, versionedInterface.Codec, pathPrefix)
return etcdStorage, nil
storageConfig.Codec = versionedInterface.Codec
return storageConfig.NewStorage()
}

// convert to a map between group and groupVersions.
Expand Down Expand Up @@ -360,7 +340,7 @@ func updateEtcdOverrides(overrides []string, storageVersions map[string]string,
}

servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
etcdOverrideStorage, err := newEtcdFn(servers, apigroup.InterfacesFor, storageVersions[apigroup.GroupVersion.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}
Expand All @@ -386,8 +366,8 @@ func (s *APIServer) Run(_ []string) error {
}
glog.Infof("Will report %v as public IP address.", s.AdvertiseAddress)

if (s.EtcdConfigFile != "" && len(s.EtcdServerList) != 0) || (s.EtcdConfigFile == "" && len(s.EtcdServerList) == 0) {
glog.Fatalf("Specify either --etcd-servers or --etcd-config")
if len(s.EtcdServerList) == 0 {
glog.Fatalf("--etcd-servers must be specified")
}

if s.KubernetesServiceNodePort > 0 && !s.ServiceNodePortRange.Contains(s.KubernetesServiceNodePort) {
Expand Down Expand Up @@ -471,7 +451,7 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[legacyV1Group.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.GroupVersion.Group, storageVersions)
}
etcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
etcdStorage, err := newEtcd(s.EtcdServerList, legacyV1Group.InterfacesFor, storageVersions[legacyV1Group.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
Expand All @@ -485,7 +465,7 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[expGroup.GroupVersion.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.GroupVersion.Group, storageVersions)
}
expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.GroupVersion.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid extensions storage version or misconfigured etcd: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestUpdateEtcdOverrides(t *testing.T) {
}

for _, test := range testCases {
newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
newEtcd := func(serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
Expand Down
5 changes: 2 additions & 3 deletions contrib/mesos/pkg/election/etcd_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
Expand All @@ -38,15 +37,15 @@ type Master string
func (Master) IsAnAPIObject() {}

// NewEtcdMasterElector returns an implementation of election.MasterElector backed by etcd.
func NewEtcdMasterElector(h tools.EtcdClient) MasterElector {
func NewEtcdMasterElector(h *etcd.Client) MasterElector {
return &etcdMasterElector{etcd: h}
}

type empty struct{}

// internal implementation struct
type etcdMasterElector struct {
etcd tools.EtcdClient
etcd *etcd.Client
done chan empty
events chan watch.Event
}
Expand Down
71 changes: 6 additions & 65 deletions contrib/mesos/pkg/election/etcd_master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ limitations under the License.
package election

import (
"reflect"
"testing"

"github.com/coreos/go-etcd/etcd"
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/watch"
)

Expand Down Expand Up @@ -57,77 +54,21 @@ func TestEtcdMasterNoOther(t *testing.T) {
w.Stop()
}

// MockClient is wrapper aroung tools.EtcdClient.
type MockClient struct {
client tools.EtcdClient
t *testing.T
// afterGetFunc is called after each Get() call.
afterGetFunc func()
calls []string
}

func (m *MockClient) GetCluster() []string {
return m.client.GetCluster()
}

func (m *MockClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
m.calls = append(m.calls, "get")
defer m.afterGetFunc()
response, err := m.client.Get(key, sort, recursive)
return response, err
}

func (m *MockClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
return m.client.Set(key, value, ttl)
}

func (m *MockClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
m.calls = append(m.calls, "create")
return m.client.Create(key, value, ttl)
}

func (m *MockClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) {
return m.client.CompareAndSwap(key, value, ttl, prevValue, prevIndex)
}

func (m *MockClient) Delete(key string, recursive bool) (*etcd.Response, error) {
return m.client.Delete(key, recursive)
}

func (m *MockClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
return m.client.Watch(prefix, waitIndex, recursive, receiver, stop)
}

func TestEtcdMasterNoOtherThenConflict(t *testing.T) {
server := etcdtesting.NewEtcdTestClientServer(t)
defer server.Terminate(t)

// We set up the following scenario:
// - after each Get() call, we write "baz" to a path
// - this is simulating someone else writing a data
// - the value written by someone else is the new value
path := "foo"
client := &MockClient{
client: server.Client,
t: t,
afterGetFunc: func() {
if _, err := server.Client.Set(path, "baz", 0); err != nil {
t.Errorf("unexpected error: %v", err)
}
},
calls: make([]string, 0),
}
master := NewEtcdMasterElector(server.Client)
leader := NewEtcdMasterElector(server.Client)

master := NewEtcdMasterElector(client)
w_ldr := leader.Elect(path, "baz")
result := <-w_ldr.ResultChan()
w := master.Elect(path, "bar")
result := <-w.ResultChan()
result = <-w.ResultChan()
if result.Type != watch.Modified || result.Object.(Master) != "baz" {
t.Errorf("unexpected event: %#v", result)
}
w.Stop()

expectedCalls := []string{"get", "create", "get"}
if !reflect.DeepEqual(client.calls, expectedCalls) {
t.Errorf("unexpected calls: %#v", client.calls)
}
w_ldr.Stop()
}
7 changes: 3 additions & 4 deletions contrib/mesos/pkg/scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ import (
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"

// lock to this API version, compilation will fail when this becomes unsupported
_ "k8s.io/kubernetes/pkg/api/v1"
Expand Down Expand Up @@ -630,7 +629,7 @@ func validateLeadershipTransition(desired, current string) {
}

// hacked from https://github.com/GoogleCloudPlatform/kubernetes/blob/release-0.14/cmd/kube-apiserver/app/server.go
func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdClient, err error) {
func newEtcd(etcdConfigFile string, etcdServerList []string) (client *etcd.Client, err error) {
if etcdConfigFile != "" {
client, err = etcd.NewClientFromFile(etcdConfigFile)
} else {
Expand All @@ -639,7 +638,7 @@ func newEtcd(etcdConfigFile string, etcdServerList []string) (client tools.EtcdC
return
}

func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, tools.EtcdClient, *mesos.ExecutorID) {
func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config) (*ha.SchedulerProcess, ha.DriverFactory, *etcd.Client, *mesos.ExecutorID) {
s.frameworkName = strings.TrimSpace(s.frameworkName)
if s.frameworkName == "" {
log.Fatalf("framework-name must be a non-empty string")
Expand Down Expand Up @@ -917,7 +916,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred
return
}

func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) {
func (s *SchedulerServer) fetchFrameworkID(client *etcd.Client) (*mesos.FrameworkID, error) {
if s.failoverTimeout > 0 {
if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil {
if !etcdutil.IsEtcdNotFound(err) {
Expand Down
3 changes: 1 addition & 2 deletions docs/admin/kube-apiserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ kube-apiserver
--cloud-provider="": The provider for cloud services. Empty string for no provider.
--cluster-name="kubernetes": The instance prefix for the cluster
--cors-allowed-origins=[]: List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.
--etcd-config="": The config file for the etcd client. Mutually exclusive with -etcd-servers.
--etcd-prefix="/registry": The prefix for all resource paths in etcd.
--etcd-servers=[]: List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config
--etcd-servers-overrides=[]: Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.
Expand Down Expand Up @@ -107,7 +106,7 @@ kube-apiserver
--watch-cache[=true]: Enable watch caching in the apiserver
```

###### Auto generated by spf13/cobra on 3-Dec-2015
###### Auto generated by spf13/cobra on 9-Dec-2015


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
42 changes: 38 additions & 4 deletions pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,64 @@ limitations under the License.
package etcd

import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"path"
"reflect"
"strings"
"time"

"github.com/coreos/go-etcd/etcd"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/tools"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"

"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"golang.org/x/net/context"
forked "k8s.io/kubernetes/third_party/forked/coreos/go-etcd/etcd"
)

func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface {
// storage.Config object for etcd.
type EtcdConfig struct {
ServerList []string
Codec runtime.Codec
Prefix string
}

// implements storage.Config
func (c *EtcdConfig) GetType() string {
return "etcd"
}

// implements storage.Config
func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
etcdClient := etcd.NewClient(c.ServerList)
if etcdClient == nil {
return nil, errors.New("Failed to create new etcd client from serverlist")
}
transport := &http.Transport{
Dial: forked.Dial,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConnsPerHost: 500,
}
etcdClient.SetTransport(transport)

return NewEtcdStorage(etcdClient, c.Codec, c.Prefix), nil
}

// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
func NewEtcdStorage(client *etcd.Client, codec runtime.Codec, prefix string) storage.Interface {
return &etcdHelper{
client: client,
codec: codec,
Expand All @@ -53,7 +87,7 @@ func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string)

// etcdHelper is the reference implementation of storage.Interface.
type etcdHelper struct {
client tools.EtcdClient
client *etcd.Client
codec runtime.Codec
copier runtime.ObjectCopier
// optional, has to be set to perform any atomic operations
Expand Down
7 changes: 2 additions & 5 deletions pkg/storage/etcd/etcd_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"testing"

"github.com/coreos/go-etcd/etcd"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/api"
Expand All @@ -34,10 +35,6 @@ import (
etcdtesting "k8s.io/kubernetes/pkg/storage/etcd/testing"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
storagetesting "k8s.io/kubernetes/pkg/storage/testing"

// TODO: once fakeClient has been purged move utils
// and eliminate these deps
"k8s.io/kubernetes/pkg/tools"
)

const validEtcdVersion = "etcd 2.0.9"
Expand All @@ -58,7 +55,7 @@ func init() {
)
}

func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string) etcdHelper {
func newEtcdHelper(client *etcd.Client, codec runtime.Codec, prefix string) etcdHelper {
return *NewEtcdStorage(client, codec, prefix).(*etcdHelper)
}

Expand Down
Loading

0 comments on commit d3243b8

Please sign in to comment.