Skip to content

Commit

Permalink
add federation service controller
Browse files Browse the repository at this point in the history
  • Loading branch information
mfanjie committed May 28, 2016
1 parent 4983183 commit 6133db3
Show file tree
Hide file tree
Showing 14 changed files with 1,969 additions and 10 deletions.
3 changes: 2 additions & 1 deletion docs/admin/federation-controller-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ federation-controller-manager
```
--address=0.0.0.0: The IP address to serve on (set to 0.0.0.0 for all interfaces)
--cluster-monitor-period=40s: The period for syncing ClusterStatus in ClusterController.
--concurrent-service-syncs=10: The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
--federated-api-burst=30: Burst to use while talking with federation apiserver
--federated-api-qps=20: QPS to use while talking with federation apiserver
--kube-api-content-type="": ContentType of requests sent to apiserver. Passing application/vnd.kubernetes.protobuf is an experimental feature now.
Expand All @@ -65,7 +66,7 @@ federation-controller-manager
--profiling[=true]: Enable profiling via web interface host:port/debug/pprof/
```

###### Auto generated by spf13/cobra on 25-May-2016
###### Auto generated by spf13/cobra on 29-May-2016


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
Expand Down
35 changes: 32 additions & 3 deletions federation/client/cache/cluster_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ limitations under the License.
package cache

import (
federation_v1alpha1 "k8s.io/kubernetes/federation/apis/federation/v1alpha1"
"github.com/golang/glog"
"k8s.io/kubernetes/federation/apis/federation"
kubeCache "k8s.io/kubernetes/pkg/client/cache"
)

Expand All @@ -27,9 +28,37 @@ type StoreToClusterLister struct {
kubeCache.Store
}

func (s *StoreToClusterLister) List() (clusters federation_v1alpha1.ClusterList, err error) {
func (s *StoreToClusterLister) List() (clusters federation.ClusterList, err error) {
for _, m := range s.Store.List() {
clusters.Items = append(clusters.Items, *(m.(*federation_v1alpha1.Cluster)))
clusters.Items = append(clusters.Items, *(m.(*federation.Cluster)))
}
return clusters, nil
}

// ClusterConditionPredicate is a function that indicates whether the given cluster's conditions meet
// some set of criteria defined by the function.
type ClusterConditionPredicate func(cluster federation.Cluster) bool

// storeToClusterConditionLister filters and returns nodes matching the given type and status from the store.
type storeToClusterConditionLister struct {
store kubeCache.Store
predicate ClusterConditionPredicate
}

// ClusterCondition returns a storeToClusterConditionLister
func (s *StoreToClusterLister) ClusterCondition(predicate ClusterConditionPredicate) storeToClusterConditionLister {
return storeToClusterConditionLister{s.Store, predicate}
}

// List returns a list of clusters that match the conditions defined by the predicate functions in the storeToClusterConditionLister.
func (s storeToClusterConditionLister) List() (clusters federation.ClusterList, err error) {
for _, m := range s.store.List() {
cluster := *m.(*federation.Cluster)
if s.predicate(cluster) {
clusters.Items = append(clusters.Items, cluster)
} else {
glog.V(5).Infof("Cluster %s matches none of the conditions", cluster.Name)
}
}
return
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,15 @@ import (
"k8s.io/kubernetes/federation/cmd/federation-controller-manager/app/options"
"k8s.io/kubernetes/pkg/client/restclient"

internalclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_internalclientset"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_release_1_3"
"k8s.io/kubernetes/federation/pkg/dnsprovider"
clustercontroller "k8s.io/kubernetes/federation/pkg/federation-controller/cluster"
servicecontroller "k8s.io/kubernetes/federation/pkg/federation-controller/service"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -103,7 +107,17 @@ func Run(s *options.CMServer) error {
}

func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) error {

federationClientSet := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "cluster-controller"))
go clustercontroller.NewclusterController(federationClientSet, s.ClusterMonitorPeriod.Duration).Run()
dns, err := dnsprovider.InitDnsProvider(s.DnsProvider, s.DnsConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)
}
scclientset := internalclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, servicecontroller.UserAgentName))
servicecontroller := servicecontroller.New(scclientset, dns)
if err := servicecontroller.Run(s.ConcurrentServiceSyncs, wait.NeverStop); err != nil {
glog.Errorf("Failed to start service controller: %v", err)
}
select {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ type ControllerManagerConfiguration struct {
Port int `json:"port"`
// address is the IP address to serve on (set to 0.0.0.0 for all interfaces).
Address string `json:"address"`
// dnsProvider is the provider for dns services.
DnsProvider string `json:"dnsProvider"`
// dnsConfigFile is the path to the dns provider configuration file.
DnsConfigFile string `json:"ndsConfigFile"`
// concurrentServiceSyncs is the number of services that are
// allowed to sync concurrently. Larger number = more responsive service
// management, but more CPU (and network) load.
ConcurrentServiceSyncs int `json:"concurrentServiceSyncs"`
// clusterMonitorPeriod is the period for syncing ClusterStatus in cluster controller.
ClusterMonitorPeriod unversioned.Duration `json:"clusterMonitorPeriod"`
// APIServerQPS is the QPS to use while talking with federation apiserver.
Expand Down Expand Up @@ -63,12 +71,13 @@ const (
func NewCMServer() *CMServer {
s := CMServer{
ControllerManagerConfiguration: ControllerManagerConfiguration{
Port: FederatedControllerManagerPort,
Address: "0.0.0.0",
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
APIServerQPS: 20.0,
APIServerBurst: 30,
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
Port: FederatedControllerManagerPort,
Address: "0.0.0.0",
ConcurrentServiceSyncs: 10,
ClusterMonitorPeriod: unversioned.Duration{Duration: 40 * time.Second},
APIServerQPS: 20.0,
APIServerBurst: 30,
LeaderElection: leaderelection.DefaultLeaderElectionConfiguration(),
},
}
return &s
Expand All @@ -78,6 +87,7 @@ func NewCMServer() *CMServer {
func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.Port, "port", s.Port, "The port that the controller-manager's http service runs on")
fs.Var(componentconfig.IPVar{Val: &s.Address}, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.IntVar(&s.ConcurrentServiceSyncs, "concurrent-service-syncs", s.ConcurrentServiceSyncs, "The number of service syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.DurationVar(&s.ClusterMonitorPeriod.Duration, "cluster-monitor-period", s.ClusterMonitorPeriod.Duration, "The period for syncing ClusterStatus in ClusterController.")
fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
fs.StringVar(&s.Master, "master", s.Master, "The address of the federation API server (overrides any value in kubeconfig)")
Expand Down
207 changes: 207 additions & 0 deletions federation/pkg/federation-controller/service/cluster_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

import (
"sync"

"k8s.io/kubernetes/federation/apis/federation"
"k8s.io/kubernetes/pkg/api"
cache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
"k8s.io/kubernetes/pkg/watch"

"github.com/golang/glog"
"reflect"
)

type clusterCache struct {
clientset *clientset.Clientset
cluster *federation.Cluster
// A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister
// Watches changes to all services
serviceController *framework.Controller
// A store of endpoint, populated by the serviceController
endpointStore cache.StoreToEndpointsLister
// Watches changes to all endpoints
endpointController *framework.Controller
// services that need to be synced
serviceQueue *workqueue.Type
// endpoints that need to be synced
endpointQueue *workqueue.Type
}

type clusterClientCache struct {
rwlock sync.Mutex // protects serviceMap
clientMap map[string]*clusterCache
}

func (cc *clusterClientCache) startClusterLW(cluster *federation.Cluster, clusterName string) {
cachedClusterClient, ok := cc.clientMap[clusterName]
// only create when no existing cachedClusterClient
if ok {
if !reflect.DeepEqual(cachedClusterClient.cluster.Spec, cluster.Spec) {
//rebuild clientset when cluster spec is changed
clientset, err := newClusterClientset(cluster)
if err != nil || clientset == nil {
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
}
glog.V(4).Infof("Cluster spec changed, rebuild clientset for cluster %s", clusterName)
cachedClusterClient.clientset = clientset
go cachedClusterClient.serviceController.Run(wait.NeverStop)
go cachedClusterClient.endpointController.Run(wait.NeverStop)
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
} else {
// do nothing when there is no spec change
glog.V(4).Infof("Keep clientset for cluster %s", clusterName)
return
}
} else {
glog.V(4).Infof("No client cache for cluster %s, building new", clusterName)
clientset, err := newClusterClientset(cluster)
if err != nil || clientset == nil {
glog.Errorf("Failed to create corresponding restclient of kubernetes cluster: %v", err)
}
cachedClusterClient = &clusterCache{
cluster: cluster,
clientset: clientset,
serviceQueue: workqueue.New(),
endpointQueue: workqueue.New(),
}
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Endpoints(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Endpoints(api.NamespaceAll).Watch(options)
},
},
&api.Endpoints{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueEndpoint(obj, clusterName)
},
UpdateFunc: func(old, cur interface{}) {
cc.enqueueEndpoint(cur, clusterName)
},
DeleteFunc: func(obj interface{}) {
cc.enqueueEndpoint(obj, clusterName)
},
},
)

cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Services(api.NamespaceAll).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Services(api.NamespaceAll).Watch(options)
},
},
&api.Service{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueService(obj, clusterName)
},
UpdateFunc: func(old, cur interface{}) {
oldService, ok := old.(*api.Service)

if !ok {
return
}
curService, ok := cur.(*api.Service)
if !ok {
return
}
if !reflect.DeepEqual(oldService.Status.LoadBalancer, curService.Status.LoadBalancer) {
cc.enqueueService(cur, clusterName)
}
},
DeleteFunc: func(obj interface{}) {
service, _ := obj.(*api.Service)
cc.enqueueService(obj, clusterName)
glog.V(2).Infof("Service %s/%s deletion found and enque to service store %s", service.Namespace, service.Name, clusterName)
},
},
)
cc.clientMap[clusterName] = cachedClusterClient
go cachedClusterClient.serviceController.Run(wait.NeverStop)
go cachedClusterClient.endpointController.Run(wait.NeverStop)
glog.V(2).Infof("Start watching services and endpoints on cluster %s", clusterName)
}

}

//TODO: copied from cluster controller, to make this as common function in pass 2
// delFromClusterSet delete a cluster from clusterSet and
// delete the corresponding restclient from the map clusterKubeClientMap
func (cc *clusterClientCache) delFromClusterSet(obj interface{}) {
cluster, ok := obj.(*federation.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
if ok {
delete(cc.clientMap, cluster.Name)
} else {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
glog.Infof("Object contained wasn't a cluster or a deleted key: %+v", obj)
return
}
glog.Infof("Found tombstone for %v", obj)
delete(cc.clientMap, tombstone.Key)
}
}

// addToClusterSet inserts the new cluster to clusterSet and creates a corresponding
// restclient to map clusterKubeClientMap
func (cc *clusterClientCache) addToClientMap(obj interface{}) {
cluster := obj.(*federation.Cluster)
cc.rwlock.Lock()
defer cc.rwlock.Unlock()
cluster, ok := obj.(*federation.Cluster)
if !ok {
return
}
pred := getClusterConditionPredicate()
// check status
// skip if not ready
if pred(*cluster) {
cc.startClusterLW(cluster, cluster.Name)
}
}

func newClusterClientset(c *federation.Cluster) (*clientset.Clientset, error) {
clusterConfig, err := clientcmd.BuildConfigFromFlags(c.Spec.ServerAddressByClientCIDRs[0].ServerAddress, "")
if err != nil {
return nil, err
}
clusterConfig.QPS = KubeAPIQPS
clusterConfig.Burst = KubeAPIBurst
clientset := clientset.NewForConfigOrDie(restclient.AddUserAgent(clusterConfig, UserAgentName))
return clientset, nil
}
40 changes: 40 additions & 0 deletions federation/pkg/federation-controller/service/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package service

// getClusterZoneName returns the name of the zone where the specified cluster exists (e.g. "us-east1-c" on GCE, or "us-east-1b" on AWS)
func getClusterZoneName(clusterName string) string {
// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
return "zone-of-cluster-" + clusterName
}

// getClusterRegionName returns the name of the region where the specified cluster exists (e.g. us-east1 on GCE, or "us-east-1" on AWS)
func getClusterRegionName(clusterName string) string {
// TODO: quinton: Get this from the cluster API object - from the annotation on a node in the cluster - it doesn't contain this yet.
return "region-of-cluster-" + clusterName
}

// getFederationDNSZoneName returns the name of the managed DNS Zone configured for this federation
func getFederationDNSZoneName() string {
return "mydomain.com" // TODO: quinton: Get this from the federation configuration.
}

func ensureDNSRecords(clusterName string, cachedService *cachedService) error {
// Quinton: Pseudocode....

return nil
}
Loading

0 comments on commit 6133db3

Please sign in to comment.