Skip to content

Commit

Permalink
move informer and controller to pkg/client/cache
Browse files Browse the repository at this point in the history
Signed-off-by: Mike Danese <mikedanese@google.com>
  • Loading branch information
mikedanese committed Sep 15, 2016
1 parent 0a62dab commit a765d59
Show file tree
Hide file tree
Showing 78 changed files with 425 additions and 498 deletions.
2 changes: 1 addition & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/informers"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ import (
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/disruption"
endpointcontroller "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/controller/job"
namespacecontroller "k8s.io/kubernetes/pkg/controller/namespace"
nodecontroller "k8s.io/kubernetes/pkg/controller/node"
Expand Down
5 changes: 2 additions & 3 deletions contrib/mesos/pkg/scheduler/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
controllerfw "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/healthz"
"k8s.io/kubernetes/pkg/master/ports"
Expand Down Expand Up @@ -786,10 +785,10 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Fatalf("Cannot create client to watch nodes: %v", err)
}
nodeLW := cache.NewListWatchFromClient(nodesClient.CoreClient, "nodes", api.NamespaceAll, fields.Everything())
nodeStore, nodeCtl := controllerfw.NewInformer(nodeLW, &api.Node{}, s.nodeRelistPeriod, &controllerfw.ResourceEventHandlerFuncs{
nodeStore, nodeCtl := cache.NewInformer(nodeLW, &api.Node{}, s.nodeRelistPeriod, &cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
if eiRegistry != nil {
// TODO(jdef) use controllerfw.DeletionHandlingMetaNamespaceKeyFunc at some point?
// TODO(jdef) use cache.DeletionHandlingMetaNamespaceKeyFunc at some point?
nodeName := ""
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
nodeName = tombstone.Key
Expand Down
15 changes: 7 additions & 8 deletions contrib/mesos/pkg/service/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
kservice "k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/intstr"
Expand All @@ -43,7 +42,7 @@ import (
)

var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)

type EndpointController interface {
Expand All @@ -56,7 +55,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
client: client,
queue: workqueue.New(),
}
e.serviceStore.Store, e.serviceController = framework.NewInformer(
e.serviceStore.Store, e.serviceController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Services(api.NamespaceAll).List(options)
Expand All @@ -67,7 +66,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
},
&api.Service{},
kservice.FullServiceResyncPeriod,
framework.ResourceEventHandlerFuncs{
cache.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService,
UpdateFunc: func(old, cur interface{}) {
e.enqueueService(cur)
Expand All @@ -76,7 +75,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
},
)

e.podStore.Indexer, e.podController = framework.NewIndexerInformer(
e.podStore.Indexer, e.podController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return e.client.Core().Pods(api.NamespaceAll).List(options)
Expand All @@ -87,7 +86,7 @@ func NewEndpointController(client *clientset.Clientset) *endpointController {
},
&api.Pod{},
5*time.Minute,
framework.ResourceEventHandlerFuncs{
cache.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
Expand All @@ -113,8 +112,8 @@ type endpointController struct {

// Since we join two objects, we'll watch both of them with
// controllers.
serviceController *framework.Controller
podController *framework.Controller
serviceController *cache.Controller
podController *cache.Controller
}

// Runs e; will not return until stopCh is closed. workers determines how many
Expand Down
4 changes: 2 additions & 2 deletions federation/cmd/federation-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.

// Package app does all of the work necessary to create a Kubernetes
// APIServer by binding together the API, master and APIServer infrastructure.
// It can be configured and called directly or via the hyperkube framework.
// It can be configured and called directly or via the hyperkube cache.
package app

import (
Expand All @@ -33,7 +33,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/apiserver/authenticator"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/informers"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/genericapiserver/authorizer"
genericoptions "k8s.io/kubernetes/pkg/genericapiserver/options"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
Expand All @@ -50,7 +49,7 @@ type ClusterController struct {
clusterKubeClientMap map[string]ClusterClient

// cluster framework and store
clusterController *framework.Controller
clusterController *cache.Controller
clusterStore cluster_cache.StoreToClusterLister
}

Expand All @@ -63,7 +62,7 @@ func NewclusterController(federationClient federationclientset.Interface, cluste
clusterClusterStatusMap: make(map[string]federation_v1beta1.ClusterStatus),
clusterKubeClientMap: make(map[string]ClusterClient),
}
cc.clusterStore.Store, cc.clusterController = framework.NewInformer(
cc.clusterStore.Store, cc.clusterController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return cc.federationClient.Federation().Clusters().List(options)
Expand All @@ -74,7 +73,7 @@ func NewclusterController(federationClient federationclientset.Interface, cluste
},
&federation_v1beta1.Cluster{},
controller.NoResyncPeriodFunc(),
framework.ResourceEventHandlerFuncs{
cache.ResourceEventHandlerFuncs{
DeleteFunc: cc.delFromClusterSet,
AddFunc: cc.addToClusterSet,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
Expand Down Expand Up @@ -81,7 +80,7 @@ type IngressController struct {
// Definitions of ingresses that should be federated.
ingressInformerStore cache.Store
// Informer controller for ingresses that should be federated.
ingressInformerController framework.ControllerInterface
ingressInformerController cache.ControllerInterface

// Client to federated api server.
federatedApiClient federation_release_1_4.Interface
Expand Down Expand Up @@ -125,7 +124,7 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
ic.configMapDeliverer = util.NewDelayingDeliverer()

// Start informer in federated API servers on ingresses that should be federated.
ic.ingressInformerStore, ic.ingressInformerController = framework.NewInformer(
ic.ingressInformerStore, ic.ingressInformerController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return client.Extensions().Ingresses(api.NamespaceAll).List(options)
Expand All @@ -145,8 +144,8 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
// Federated informer on ingresses in members of federation.
ic.ingressFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
return framework.NewInformer(
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return targetClient.Extensions().Ingresses(api.NamespaceAll).List(options)
Expand Down Expand Up @@ -177,9 +176,9 @@ func NewIngressController(client federation_release_1_4.Interface) *IngressContr
// Federated informer on configmaps for ingress controllers in members of the federation.
ic.configMapFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, cache.ControllerInterface) {
glog.V(4).Infof("Returning new informer for cluster %q", cluster.Name)
return framework.NewInformer(
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
if targetClient == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/sets"
Expand Down Expand Up @@ -62,7 +61,7 @@ type NamespaceController struct {
// Definitions of namespaces that should be federated.
namespaceInformerStore cache.Store
// Informer controller for namespaces that should be federated.
namespaceInformerController framework.ControllerInterface
namespaceInformerController cache.ControllerInterface

// Client to federated api server.
federatedApiClient federation_release_1_4.Interface
Expand Down Expand Up @@ -100,7 +99,7 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
nc.clusterDeliverer = util.NewDelayingDeliverer()

// Start informer in federated API servers on namespaces that should be federated.
nc.namespaceInformerStore, nc.namespaceInformerController = framework.NewInformer(
nc.namespaceInformerStore, nc.namespaceInformerController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return client.Core().Namespaces().List(options)
Expand All @@ -116,8 +115,8 @@ func NewNamespaceController(client federation_release_1_4.Interface) *NamespaceC
// Federated informer on namespaces in members of federation.
nc.namespaceFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
return framework.NewInformer(
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return targetClient.Core().Namespaces().List(options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"
Expand Down Expand Up @@ -79,7 +78,7 @@ func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.Fede
type ReplicaSetController struct {
fedClient fedclientset.Interface

replicaSetController *framework.Controller
replicaSetController *cache.Controller
replicaSetStore cache.StoreToReplicaSetLister

fedReplicaSetInformer fedutil.FederatedInformer
Expand Down Expand Up @@ -118,8 +117,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
eventRecorder: recorder,
}

replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) {
return framework.NewInformer(
replicaSetFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options)
Expand All @@ -145,8 +144,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
}
frsc.fedReplicaSetInformer = fedutil.NewFederatedInformer(federationClient, replicaSetFedInformerFactory, &clusterLifecycle)

podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, framework.ControllerInterface) {
return framework.NewInformer(
podFedInformerFactory := func(cluster *fedv1.Cluster, clientset kubeclientset.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Core().Pods(apiv1.NamespaceAll).List(options)
Expand All @@ -166,7 +165,7 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
}
frsc.fedPodInformer = fedutil.NewFederatedInformer(federationClient, podFedInformerFactory, &fedutil.ClusterLifecycleHandlerFuncs{})

frsc.replicaSetStore.Store, frsc.replicaSetController = framework.NewInformer(
frsc.replicaSetStore.Store, frsc.replicaSetController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return frsc.fedClient.Extensions().ReplicaSets(apiv1.NamespaceAll).List(options)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
kube_release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/watch"
Expand Down Expand Up @@ -59,7 +58,7 @@ type SecretController struct {
// Definitions of secrets that should be federated.
secretInformerStore cache.Store
// Informer controller for secrets that should be federated.
secretInformerController framework.ControllerInterface
secretInformerController cache.ControllerInterface

// Client to federated api server.
federatedApiClient federation_release_1_4.Interface
Expand Down Expand Up @@ -97,7 +96,7 @@ func NewSecretController(client federation_release_1_4.Interface) *SecretControl
secretcontroller.clusterDeliverer = util.NewDelayingDeliverer()

// Start informer in federated API servers on secrets that should be federated.
secretcontroller.secretInformerStore, secretcontroller.secretInformerController = framework.NewInformer(
secretcontroller.secretInformerStore, secretcontroller.secretInformerController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return client.Core().Secrets(api_v1.NamespaceAll).List(options)
Expand All @@ -113,8 +112,8 @@ func NewSecretController(client federation_release_1_4.Interface) *SecretControl
// Federated informer on secrets in members of federation.
secretcontroller.secretFederatedInformer = util.NewFederatedInformer(
client,
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, framework.ControllerInterface) {
return framework.NewInformer(
func(cluster *federation_api.Cluster, targetClient kube_release_1_4.Interface) (cache.Store, cache.ControllerInterface) {
return cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return targetClient.Core().Secrets(api_v1.NamespaceAll).List(options)
Expand Down
13 changes: 6 additions & 7 deletions federation/pkg/federation-controller/service/cluster_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
cache "k8s.io/kubernetes/pkg/client/cache"
release_1_4 "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_4"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/controller/framework"
pkg_runtime "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/util/workqueue"
Expand All @@ -43,11 +42,11 @@ type clusterCache struct {
// A store of services, populated by the serviceController
serviceStore cache.StoreToServiceLister
// Watches changes to all services
serviceController *framework.Controller
serviceController *cache.Controller
// A store of endpoint, populated by the serviceController
endpointStore cache.StoreToEndpointsLister
// Watches changes to all endpoints
endpointController *framework.Controller
endpointController *cache.Controller
// services that need to be synced
serviceQueue *workqueue.Type
// endpoints that need to be synced
Expand Down Expand Up @@ -91,7 +90,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
serviceQueue: workqueue.New(),
endpointQueue: workqueue.New(),
}
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = framework.NewInformer(
cachedClusterClient.endpointStore.Store, cachedClusterClient.endpointController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Endpoints(v1.NamespaceAll).List(options)
Expand All @@ -102,7 +101,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
},
&v1.Endpoints{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueEndpoint(obj, clusterName)
},
Expand All @@ -115,7 +114,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
},
)

cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = framework.NewInformer(
cachedClusterClient.serviceStore.Store, cachedClusterClient.serviceController = cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
return clientset.Core().Services(v1.NamespaceAll).List(options)
Expand All @@ -126,7 +125,7 @@ func (cc *clusterClientCache) startClusterLW(cluster *v1beta1.Cluster, clusterNa
},
&v1.Service{},
serviceSyncPeriod,
framework.ResourceEventHandlerFuncs{
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
cc.enqueueService(obj, clusterName)
},
Expand Down
Loading

0 comments on commit a765d59

Please sign in to comment.