From bbb447916674cc4bba3ad1e98414af4de356f9d6 Mon Sep 17 00:00:00 2001 From: Joe Beda Date: Fri, 30 Jan 2015 15:31:36 -0800 Subject: [PATCH] Convert controller-manager to hyperkube. --- cmd/hyperkube/hyperkube.go | 2 + .../controller-manager.go | 99 +--------- pkg/api/resource/quantity.go | 8 +- pkg/controllermanager/controllermanager.go | 173 ++++++++++++++++++ .../controllermanager}/plugins.go | 8 +- 5 files changed, 192 insertions(+), 98 deletions(-) create mode 100644 pkg/controllermanager/controllermanager.go rename {cmd/kube-controller-manager => pkg/controllermanager}/plugins.go (81%) diff --git a/cmd/hyperkube/hyperkube.go b/cmd/hyperkube/hyperkube.go index fc00b55110791..c6af4c7ed3585 100644 --- a/cmd/hyperkube/hyperkube.go +++ b/cmd/hyperkube/hyperkube.go @@ -21,6 +21,7 @@ package main import ( "os" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controllermanager" "github.com/GoogleCloudPlatform/kubernetes/pkg/hyperkube" apiserver "github.com/GoogleCloudPlatform/kubernetes/pkg/master/server" ) @@ -32,6 +33,7 @@ func main() { } hk.AddServer(apiserver.NewHyperkubeServer()) + hk.AddServer(controllermanager.NewHyperkubeServer()) hk.RunToExit(os.Args) } diff --git a/cmd/kube-controller-manager/controller-manager.go b/cmd/kube-controller-manager/controller-manager.go index 1eeb101b53564..c777099cd247b 100644 --- a/cmd/kube-controller-manager/controller-manager.go +++ b/cmd/kube-controller-manager/controller-manager.go @@ -21,109 +21,22 @@ limitations under the License. package main import ( - "net" - "net/http" - "strconv" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" - nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" - replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" - _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" - "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" - "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" - "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controllermanager" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version/verflag" - "github.com/golang/glog" - flag "github.com/spf13/pflag" -) - -var ( - port = flag.Int("port", ports.ControllerManagerPort, "The port that the controller-manager's http service runs on") - address = util.IP(net.ParseIP("127.0.0.1")) - clientConfig = &client.Config{} - cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.") - cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.") - minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") - nodeSyncPeriod = flag.Duration("node_sync_period", 10*time.Second, ""+ - "The period for syncing nodes from cloudprovider. Longer periods will result in "+ - "fewer calls to cloud provider, but may delay addition of new nodes to cluster.") - resourceQuotaSyncPeriod = flag.Duration("resource_quota_sync_period", 10*time.Second, "The period for syncing quota usage status in the system") - registerRetryCount = flag.Int("register_retry_count", 10, ""+ - "The number of retries for initial node registration. Retry interval equals node_sync_period.") - machineList util.StringList - // TODO: Discover these by pinging the host machines, and rip out these flags. - // TODO: in the meantime, use resource.QuantityFlag() instead of these - nodeMilliCPU = flag.Int64("node_milli_cpu", 1000, "The amount of MilliCPU provisioned on each node") - nodeMemory = resource.QuantityFlag("node_memory", "3Gi", "The amount of memory (in bytes) provisioned on each node") - kubeletConfig = client.KubeletConfig{Port: ports.KubeletPort, EnableHttps: false} + "github.com/spf13/pflag" ) -func init() { - flag.Var(&address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") - flag.Var(&machineList, "machines", "List of machines to schedule onto, comma separated.") - client.BindClientConfigFlags(flag.CommandLine, clientConfig) - client.BindKubeletClientConfigFlags(flag.CommandLine, &kubeletConfig) -} - -func verifyMinionFlags() { - if *cloudProvider == "" || *minionRegexp == "" { - if len(machineList) == 0 { - glog.Info("No machines specified!") - } - return - } - if len(machineList) != 0 { - glog.Info("-machines is overwritten by -minion_regexp") - } -} - func main() { + s := controllermanager.NewCMServer() + s.AddFlags(pflag.CommandLine) + util.InitFlags() util.InitLogs() defer util.FlushLogs() verflag.PrintAndExitIfRequested() - verifyMinionFlags() - - if len(clientConfig.Host) == 0 { - glog.Fatal("usage: controller-manager -master ") - } - - kubeClient, err := client.New(clientConfig) - if err != nil { - glog.Fatalf("Invalid API configuration: %v", err) - } - - go http.ListenAndServe(net.JoinHostPort(address.String(), strconv.Itoa(*port)), nil) - - endpoints := service.NewEndpointController(kubeClient) - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) - - controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) - controllerManager.Run(10 * time.Second) - - kubeletClient, err := client.NewKubeletClient(&kubeletConfig) - if err != nil { - glog.Fatalf("Failure to start kubelet client: %v", err) - } - cloud := cloudprovider.InitCloudProvider(*cloudProvider, *cloudConfigFile) - nodeResources := &api.NodeResources{ - Capacity: api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(*nodeMilliCPU, resource.DecimalSI), - api.ResourceMemory: *nodeMemory, - }, - } - nodeController := nodeControllerPkg.NewNodeController(cloud, *minionRegexp, machineList, nodeResources, kubeClient, kubeletClient) - nodeController.Run(*nodeSyncPeriod, *registerRetryCount) - - resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) - resourceQuotaManager.Run(*resourceQuotaSyncPeriod) - select {} + s.Run(pflag.CommandLine.Args()) } diff --git a/pkg/api/resource/quantity.go b/pkg/api/resource/quantity.go index 7ec8bf9c4cdbe..01a7e60a64464 100644 --- a/pkg/api/resource/quantity.go +++ b/pkg/api/resource/quantity.go @@ -411,6 +411,12 @@ func (qf qFlag) Type() string { // Will panic if defaultValue is not a valid quantity. func QuantityFlag(flagName, defaultValue, description string) *Quantity { q := MustParse(defaultValue) - flag.Var(qFlag{&q}, flagName, description) + flag.Var(NewQuantityFlagValue(&q), flagName, description) return &q } + +// NewQuantityFlagValue returns an object that can be used to back a flag, +// pointing at the given Quantity variable. +func NewQuantityFlagValue(q *Quantity) flag.Value { + return qFlag{q} +} diff --git a/pkg/controllermanager/controllermanager.go b/pkg/controllermanager/controllermanager.go new file mode 100644 index 0000000000000..149a002701596 --- /dev/null +++ b/pkg/controllermanager/controllermanager.go @@ -0,0 +1,173 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package controllermanager implements a server that runs a set of active +// components. This includes replication controllers, service endpoints and +// nodes. +package controllermanager + +import ( + "net" + "net/http" + "strconv" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" + replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" + _ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz" + "github.com/GoogleCloudPlatform/kubernetes/pkg/hyperkube" + "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + "github.com/GoogleCloudPlatform/kubernetes/pkg/resourcequota" + "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + + "github.com/golang/glog" + "github.com/spf13/pflag" +) + +// CMServer is the mail context object for the controller manager. +type CMServer struct { + Port int + Address util.IP + ClientConfig client.Config + CloudProvider string + CloudConfigFile string + MinionRegexp string + NodeSyncPeriod time.Duration + ResourceQuotaSyncPeriod time.Duration + RegisterRetryCount int + MachineList util.StringList + + // TODO: Discover these by pinging the host machines, and rip out these params. + NodeMilliCPU int64 + NodeMemory resource.Quantity + + KubeletConfig client.KubeletConfig +} + +// NewCMServer creates a new CMServer with default a default config. +func NewCMServer() *CMServer { + s := CMServer{ + Port: ports.ControllerManagerPort, + Address: util.IP(net.ParseIP("127.0.0.1")), + NodeSyncPeriod: 10 * time.Second, + ResourceQuotaSyncPeriod: 10 * time.Second, + RegisterRetryCount: 10, + NodeMilliCPU: 1000, + NodeMemory: resource.MustParse("3Gi"), + KubeletConfig: client.KubeletConfig{ + Port: ports.KubeletPort, + EnableHttps: false, + }, + } + return &s +} + +// NewHyperkubeServer creates a new hyperkube Server object that includes the +// description and flags. +func NewHyperkubeServer() *hyperkube.Server { + s := NewCMServer() + + hks := hyperkube.Server{ + SimpleUsage: "controller-manager", + Long: "A server that runs a set of active components. This includes replication controllers, service endpoints and nodes.", + Run: func(_ *hyperkube.Server, args []string) error { + return s.Run(args) + }, + } + s.AddFlags(hks.Flags()) + return &hks +} + +// AddFlags adds flags for a specific CMServer to the specified FlagSet +func (s *CMServer) AddFlags(fs *pflag.FlagSet) { + fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on") + fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") + client.BindClientConfigFlags(fs, &s.ClientConfig) + fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") + fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.") + fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ + "The period for syncing nodes from cloudprovider. Longer periods will result in "+ + "fewer calls to cloud provider, but may delay addition of new nodes to cluster.") + fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") + fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+ + "The number of retries for initial node registration. Retry interval equals node_sync_period.") + fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") + // TODO: Discover these by pinging the host machines, and rip out these flags. + // TODO: in the meantime, use resource.QuantityFlag() instead of these + fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") + fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node_memory", "The amount of memory (in bytes) provisioned on each node") + client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig) +} + +func (s *CMServer) verifyMinionFlags() { + if s.CloudProvider == "" || s.MinionRegexp == "" { + if len(s.MachineList) == 0 { + glog.Info("No machines specified!") + } + return + } + if len(s.MachineList) != 0 { + glog.Info("--machines is overwritten by --minion_regexp") + } +} + +// Run runs the CMServer. This should never exit. +func (s *CMServer) Run(_ []string) error { + s.verifyMinionFlags() + + if len(s.ClientConfig.Host) == 0 { + glog.Fatal("usage: controller-manager --master ") + } + + kubeClient, err := client.New(&s.ClientConfig) + if err != nil { + glog.Fatalf("Invalid API configuration: %v", err) + } + + go http.ListenAndServe(net.JoinHostPort(s.Address.String(), strconv.Itoa(s.Port)), nil) + + endpoints := service.NewEndpointController(kubeClient) + go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) + + controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) + controllerManager.Run(10 * time.Second) + + kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig) + if err != nil { + glog.Fatalf("Failure to start kubelet client: %v", err) + } + cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) + nodeResources := &api.NodeResources{ + Capacity: api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(s.NodeMilliCPU, resource.DecimalSI), + api.ResourceMemory: s.NodeMemory, + }, + } + nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient) + nodeController.Run(s.NodeSyncPeriod, s.RegisterRetryCount) + + resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) + resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod) + + select {} + return nil +} diff --git a/cmd/kube-controller-manager/plugins.go b/pkg/controllermanager/plugins.go similarity index 81% rename from cmd/kube-controller-manager/plugins.go rename to pkg/controllermanager/plugins.go index 64908e509a38a..f58afbdc4c8f7 100644 --- a/cmd/kube-controller-manager/plugins.go +++ b/pkg/controllermanager/plugins.go @@ -14,12 +14,12 @@ See the License for the specific language governing permissions and limitations under the License. */ -package main +package controllermanager -// This file exists to force the desired plugin implementations to be linked. -// This should probably be part of some configuration fed into the build for a -// given binary target. import ( + // This file exists to force the desired plugin implementations to be linked. + // This should probably be part of some configuration fed into the build for a + // given binary target. _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/openstack"