Skip to content

Commit

Permalink
Events in separate etcd
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtek-t committed Oct 5, 2015
1 parent b9cfab8 commit 0f1cbe3
Show file tree
Hide file tree
Showing 14 changed files with 290 additions and 88 deletions.
2 changes: 1 addition & 1 deletion cluster/saltbase/salt/etcd/etcd.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"command": [
"/bin/sh",
"-c",
"/usr/local/bin/etcd --listen-peer-urls=http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1"
"/usr/local/bin/etcd --listen-peer-urls http://127.0.0.1:{{ server_port }} --addr 127.0.0.1:{{ port }} --bind-addr 127.0.0.1:{{ port }} --data-dir /var/etcd/data{{ suffix }} 1>>/var/log/etcd{{ suffix }}.log 2>&1"
],
"livenessProbe": {
"httpGet": {
Expand Down
3 changes: 2 additions & 1 deletion cluster/saltbase/salt/kube-apiserver/kube-apiserver.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
{% endif -%}

{% set etcd_servers = "--etcd-servers=http://127.0.0.1:4001" -%}
{% set etcd_servers_overrides = "--etcd-servers-overrides=/events#http://127.0.0.1:4002" -%}

{% set service_cluster_ip_range = "" -%}
{% if pillar['service_cluster_ip_range'] is defined -%}
Expand Down Expand Up @@ -88,7 +89,7 @@
{% set runtime_config = "--runtime-config=" + grains.runtime_config -%}
{% endif -%}

{% set params = address + " " + etcd_servers + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = address + " " + etcd_servers + " " + etcd_servers_overrides + " " + cloud_provider + " " + cloud_config + " " + runtime_config + " " + admission_control + " " + service_cluster_ip_range + " " + client_ca_file + " " + basic_auth_file + " " + min_request_timeout -%}
{% set params = params + " " + cluster_name + " " + cert_file + " " + key_file + " --secure-port=" + secure_port + " " + token_auth_file + " " + bind_address + " " + pillar['log_level'] + " " + advertise_address + " " + proxy_ssh_options -%}

# test_args has to be kept at the end, so they'll overwrite any prior configuration
Expand Down
7 changes: 4 additions & 3 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,12 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
}
expEtcdStorage, err := master.NewEtcdStorage(etcdClient, latest.GroupOrDie("experimental").InterfacesFor, testapi.Experimental.GroupAndVersion(), etcdtest.PathPrefix())
storageVersions["experimental"] = testapi.Experimental.GroupAndVersion()

if err != nil {
glog.Fatalf("Unable to get etcd storage for experimental: %v", err)
}
storageDestinations := master.NewStorageDestinations()
storageDestinations.AddAPIGroup("", etcdStorage)
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)

// Master
host, port, err := net.SplitHostPort(strings.TrimLeft(apiServer.URL, "http://"))
Expand All @@ -166,8 +168,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string

// Create a master and install handlers into mux.
m := master.New(&master.Config{
DatabaseStorage: etcdStorage,
ExpDatabaseStorage: expEtcdStorage,
StorageDestinations: storageDestinations,
KubeletClient: fakeKubeletClient{},
EnableCoreControllers: true,
EnableLogsSupport: false,
Expand Down
58 changes: 52 additions & 6 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type APIServer struct {
AdmissionControlConfigFile string
EtcdServerList []string
EtcdConfigFile string
EtcdServersOverrides []string
EtcdPathPrefix string
CorsAllowedOriginList []string
AllowPrivileged bool
Expand Down Expand Up @@ -211,6 +212,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.AdmissionControlConfigFile, "admission-control-config-file", s.AdmissionControlConfigFile, "File with admission control configuration.")
fs.StringSliceVar(&s.EtcdServerList, "etcd-servers", s.EtcdServerList, "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringSliceVar(&s.EtcdServersOverrides, "etcd-servers-overrides", s.EtcdServersOverrides, "Per-resource etcd servers overrides, comma separated. The individual override format: group/resource#servers, where servers are http://ip:port, semicolon separated.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringSliceVar(&s.CorsAllowedOriginList, "cors-allowed-origins", s.CorsAllowedOriginList, "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
Expand Down Expand Up @@ -253,6 +255,8 @@ func (s *APIServer) verifyClusterIPFlags() {
}
}

type newEtcdFunc func(string, []string, meta.VersionInterfacesFunc, string, string) (storage.Interface, error)

func newEtcd(etcdConfigFile string, etcdServerList []string, interfacesFunc meta.VersionInterfacesFunc, storageVersion, pathPrefix string) (etcdStorage storage.Interface, err error) {
if storageVersion == "" {
return etcdStorage, fmt.Errorf("storageVersion is required to create a etcd storage")
Expand Down Expand Up @@ -294,6 +298,45 @@ func generateStorageVersionMap(legacyVersion string, storageVersions string) map
return storageVersionMap
}

// parse the value of --etcd-servers-overrides and update given storageDestinations.
func updateEtcdOverrides(overrides []string, storageVersions map[string]string, prefix string, storageDestinations *master.StorageDestinations, newEtcdFn newEtcdFunc) {
if len(overrides) == 0 {
return
}
for _, override := range overrides {
tokens := strings.Split(override, "#")
if len(tokens) != 2 {
glog.Errorf("invalid value of etcd server overrides: %s", override)
continue
}

apiresource := strings.Split(tokens[0], "/")
if len(apiresource) != 2 {
glog.Errorf("invalid resource definition: %s", tokens[0])
}
group := apiresource[0]
resource := apiresource[1]

apigroup, err := latest.Group(group)
if err != nil {
glog.Errorf("invalid api group %s: %v", group, err)
continue
}
if _, found := storageVersions[apigroup.Group]; !found {
glog.Errorf("Couldn't find the storage version for group %s", apigroup.Group)
continue
}

servers := strings.Split(tokens[1], ";")
etcdOverrideStorage, err := newEtcdFn("", servers, apigroup.InterfacesFor, storageVersions[apigroup.Group], prefix)
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd for %s: %v", tokens[0], err)
}

storageDestinations.AddStorageOverride(group, resource, etcdOverrideStorage)
}
}

// Run runs the specified APIServer. This should never exit.
func (s *APIServer) Run(_ []string) error {
s.verifyClusterIPFlags()
Expand Down Expand Up @@ -369,6 +412,8 @@ func (s *APIServer) Run(_ []string) error {
return err
}

storageDestinations := master.NewStorageDestinations()

storageVersions := generateStorageVersionMap(s.DeprecatedStorageVersion, s.StorageVersions)
if _, found := storageVersions[legacyV1Group.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", legacyV1Group.Group, storageVersions)
Expand All @@ -377,8 +422,8 @@ func (s *APIServer) Run(_ []string) error {
if err != nil {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup("", etcdStorage)

var expEtcdStorage storage.Interface
if enableExp {
expGroup, err := latest.Group("experimental")
if err != nil {
Expand All @@ -387,12 +432,15 @@ func (s *APIServer) Run(_ []string) error {
if _, found := storageVersions[expGroup.Group]; !found {
glog.Fatalf("Couldn't find the storage version for group: %q in storageVersions: %v", expGroup.Group, storageVersions)
}
expEtcdStorage, err = newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix)
expEtcdStorage, err := newEtcd(s.EtcdConfigFile, s.EtcdServerList, expGroup.InterfacesFor, storageVersions[expGroup.Group], s.EtcdPathPrefix)
if err != nil {
glog.Fatalf("Invalid experimental storage version or misconfigured etcd: %v", err)
}
storageDestinations.AddAPIGroup("experimental", expEtcdStorage)
}

updateEtcdOverrides(s.EtcdServersOverrides, storageVersions, s.EtcdPathPrefix, &storageDestinations, newEtcd)

n := s.ServiceClusterIPRange

// Default to the private server key for service account token signing
Expand Down Expand Up @@ -460,10 +508,8 @@ func (s *APIServer) Run(_ []string) error {
}
}
config := &master.Config{
DatabaseStorage: etcdStorage,
ExpDatabaseStorage: expEtcdStorage,
StorageVersions: storageVersions,

StorageDestinations: storageDestinations,
StorageVersions: storageVersions,
EventTTL: s.EventTTL,
KubeletClient: kubeletClient,
ServiceClusterIPRange: &n,
Expand Down
56 changes: 56 additions & 0 deletions cmd/kube-apiserver/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ package app
import (
"reflect"
"regexp"
"strings"
"testing"

"k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/storage"
)

func TestLongRunningRequestRegexp(t *testing.T) {
Expand Down Expand Up @@ -98,3 +103,54 @@ func TestGenerateStorageVersionMap(t *testing.T) {
}
}
}

func TestUpdateEtcdOverrides(t *testing.T) {
storageVersions := generateStorageVersionMap("", "v1,experimental/v1alpha1")

testCases := []struct {
apigroup string
resource string
servers []string
}{
{
apigroup: "",
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
{
apigroup: "",
resource: "resource",
servers: []string{"http://127.0.0.1:10000", "http://127.0.0.1:20000"},
},
{
apigroup: "experimental",
resource: "resource",
servers: []string{"http://127.0.0.1:10000"},
},
}

for _, test := range testCases {
newEtcd := func(_ string, serverList []string, _ meta.VersionInterfacesFunc, _, _ string) (storage.Interface, error) {
if !reflect.DeepEqual(test.servers, serverList) {
t.Errorf("unexpected server list, expected: %#v, got: %#v", test.servers, serverList)
}
return nil, nil
}
storageDestinations := master.NewStorageDestinations()
override := test.apigroup + "/" + test.resource + "#" + strings.Join(test.servers, ";")
updateEtcdOverrides([]string{override}, storageVersions, "", &storageDestinations, newEtcd)
apigroup, ok := storageDestinations.APIGroups[test.apigroup]
if !ok {
t.Errorf("apigroup: %s not created", test.apigroup)
continue
}
if apigroup.Overrides == nil {
t.Errorf("Overrides not created for: %s", test.apigroup)
continue
}
if _, ok := apigroup.Overrides[test.resource]; !ok {
t.Errorf("override not created for: %s", test.resource)
continue
}
}
}
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ etcd-config
etcd-prefix
etcd-server
etcd-servers
etcd-servers-overrides
event-burst
event-qps
event-ttl
Expand Down
Loading

0 comments on commit 0f1cbe3

Please sign in to comment.