Skip to content

Commit

Permalink
kube: wrapper around informer (istio#43870)
Browse files Browse the repository at this point in the history
* kube: wrapper around informer

This PR introduces a new abstraction around a Kubernetes Informer, Lister, and Client.

Benefits:
* Automatic handling of HasSynced for Handlers. Normally, HasSynced only covers the *store* - handlers are not guaranteed to be called. Kubernetes recently added a way to check for informers syncing, but using it is verbose.
* Easier use - Lister Get and List can not actually fail but return errors; this removes those to make code simpler. Also use of the underlying Indexer or Store or Controller is not required, which are even harder to use correctly.
* Better typing - less use of `any` requiring casting
* less verbose - For example, `s.client.NetworkingV1().Ingresses(currIng.Namespace).UpdateStatus(context.TODO(), currIng, metav1.UpdateOptions{})` becomes `s.ingresses.UpdateStatus(currIng)`
* Common type for everything - this allows us to implement higher level abstractions like CreateOrUpdate, a test wrapper (t.Fatal's any errors), etc
* Easier to properly remove handlers from informers, which is very desirable for some cases.
* Easier ability to create an informer on LabelSelector or FieldSelector, which before was like 15 lines just to create one
* Easier to use the DiscoveryNamespaceFilter, and usage uses the same type as un-filtered making interop easier
* Handle SetWatchErrorHandler from centralized location, removing need to handle it on every controller and removing HasStopped
* Introduced an example that is a fully commented example of writing a proper controller in Istio

* lint

* lint

* Fix filter on filter

* Renamed erased

* fix sort

* Rename things

* Check for conflicting filters

* lint

* change lint
  • Loading branch information
howardjohn authored Mar 15, 2023
1 parent 9657885 commit 4e05079
Show file tree
Hide file tree
Showing 77 changed files with 1,550 additions and 1,501 deletions.
35 changes: 9 additions & 26 deletions cni/pkg/ambient/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,15 @@ package ambient

import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
klabels "k8s.io/apimachinery/pkg/labels"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"

"istio.io/istio/cni/pkg/ambient/ambientpod"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
)

var ErrLegacyLabel = "Namespace %s has sidecar label istio-injection or istio.io/rev " +
Expand All @@ -43,24 +38,14 @@ func (s *Server) setupHandlers() {
)

// We only need to handle pods on our node
podInformer := s.kubeClient.KubeInformer().InformerFor(&corev1.Pod{}, func(k kubernetes.Interface, resync time.Duration) cache.SharedIndexInformer {
return informersv1.NewFilteredPodInformer(
k, metav1.NamespaceAll, resync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
func(options *metav1.ListOptions) {
options.FieldSelector = "spec.nodeName=" + NodeName
},
)
})
_ = podInformer.SetTransform(kube.StripUnusedFields)
_, _ = podInformer.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
s.pods = kclient.NewFiltered[*corev1.Pod](s.kubeClient, kclient.Filter{FieldSelector: "spec.nodeName=" + NodeName})
s.pods.AddEventHandler(controllers.FromEventHandler(func(o controllers.Event) {
s.queue.Add(o)
}))
s.podLister = listerv1.NewPodLister(podInformer.GetIndexer())

// Namespaces could be anything though, so we watch all of those
ns := s.kubeClient.KubeInformer().Core().V1().Namespaces()
s.nsLister = ns.Lister()
_, _ = ns.Informer().AddEventHandler(controllers.ObjectHandler(s.EnqueueNamespace))
s.namespaces = kclient.New[*corev1.Namespace](s.kubeClient)
s.namespaces.AddEventHandler(controllers.ObjectHandler(s.EnqueueNamespace))
}

func (s *Server) Run(stop <-chan struct{}) {
Expand All @@ -69,8 +54,7 @@ func (s *Server) Run(stop <-chan struct{}) {
}

func (s *Server) ReconcileNamespaces() {
namespaces, _ := s.nsLister.List(klabels.Everything())
for _, ns := range namespaces {
for _, ns := range s.namespaces.List(metav1.NamespaceAll, klabels.Everything()) {
s.EnqueueNamespace(ns)
}
}
Expand All @@ -79,10 +63,9 @@ func (s *Server) ReconcileNamespaces() {
func (s *Server) EnqueueNamespace(o controllers.Object) {
namespace := o.GetName()
matchAmbient := o.GetLabels()[constants.DataplaneMode] == constants.DataplaneModeAmbient
pods, _ := s.podLister.Pods(namespace).List(klabels.Everything())
if matchAmbient {
log.Infof("Namespace %s is enabled in ambient mesh", namespace)
for _, pod := range pods {
for _, pod := range s.pods.List(namespace, klabels.Everything()) {
s.queue.Add(controllers.Event{
New: pod,
Old: pod,
Expand All @@ -91,7 +74,7 @@ func (s *Server) EnqueueNamespace(o controllers.Object) {
}
} else {
log.Infof("Namespace %s is disabled from ambient mesh", namespace)
for _, pod := range pods {
for _, pod := range s.pods.List(namespace, klabels.Everything()) {
s.queue.Add(controllers.Event{
New: pod,
Event: controllers.EventDelete,
Expand All @@ -113,7 +96,7 @@ func (s *Server) Reconcile(input any) error {
// For update, we just need to handle opt outs
newPod := event.New.(*corev1.Pod)
oldPod := event.Old.(*corev1.Pod)
ns, _ := s.nsLister.Get(newPod.Namespace)
ns := s.namespaces.Get(newPod.Namespace, "")
if ns == nil {
return fmt.Errorf("failed to find namespace %v", ns)
}
Expand Down
11 changes: 3 additions & 8 deletions cni/pkg/ambient/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"

pconstants "istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/kube/kclient"
istiolog "istio.io/pkg/log"
)

Expand Down Expand Up @@ -181,15 +180,11 @@ func DelPodFromMesh(client kubernetes.Interface, pod *corev1.Pod) {
}

// GetHostIPByRoute get the automatically chosen host ip to the Pod's CIDR
func GetHostIPByRoute(podLister listerv1.PodLister) (string, error) {
func GetHostIPByRoute(pods kclient.Client[*corev1.Pod]) (string, error) {
// We assume per node POD's CIDR is the same block, so the route to the POD
// from host should be "same". Otherwise, there may multiple host IPs will be
// used as source to dial to PODs.
pods, err := podLister.List(labels.Set{"app": "ztunnel"}.AsSelector())
if err != nil {
return "", fmt.Errorf("error getting ztunnel pod: %v", err)
}
for _, pod := range pods {
for _, pod := range pods.List(metav1.NamespaceAll, ztunnelLabels) {
targetIP := pod.Status.PodIP
if hostIP := getOutboundIP(targetIP); hostIP != nil {
return hostIP.String(), nil
Expand Down
17 changes: 7 additions & 10 deletions cni/pkg/ambient/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"

"istio.io/istio/cni/pkg/ambient/constants"
ebpf "istio.io/istio/cni/pkg/ebpf/server"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/lazy"
)

Expand All @@ -40,8 +40,8 @@ type Server struct {
ctx context.Context
queue controllers.Queue

nsLister listerv1.NamespaceLister
podLister listerv1.PodLister
namespaces kclient.Client[*corev1.Namespace]
pods kclient.Client[*corev1.Pod]

mu sync.Mutex
ztunnelPod *corev1.Pod
Expand Down Expand Up @@ -111,7 +111,7 @@ func buildKubeClient(kubeConfig string) (kube.Client, error) {
return nil, fmt.Errorf("failed creating kube config: %v", err)
}

client, err := kube.NewClient(kube.NewClientConfigForRestConfig(kubeRestConfig))
client, err := kube.NewClient(kube.NewClientConfigForRestConfig(kubeRestConfig), "")
if err != nil {
return nil, fmt.Errorf("failed creating kube client: %v", err)
}
Expand Down Expand Up @@ -149,10 +149,7 @@ func (s *Server) UpdateConfig() {
var ztunnelLabels = labels.SelectorFromValidatedSet(labels.Set{"app": "ztunnel"})

func (s *Server) ReconcileZtunnel() error {
pods, err := s.podLister.Pods(metav1.NamespaceAll).List(ztunnelLabels)
if err != nil {
return err
}
pods := s.pods.List(metav1.NamespaceAll, ztunnelLabels)
var activePod *corev1.Pod
for _, p := range pods {
ready := kube.CheckPodReady(p) == nil
Expand Down Expand Up @@ -216,7 +213,7 @@ func (s *Server) ReconcileZtunnel() error {
if err != nil {
return fmt.Errorf("failed to get ns name: %v", err)
}
hostIP, err := GetHostIPByRoute(s.podLister)
hostIP, err := GetHostIPByRoute(s.pods)
if err != nil || hostIP == "" {
log.Warnf("failed to getting host IP: %v", err)
}
Expand All @@ -230,7 +227,7 @@ func (s *Server) ReconcileZtunnel() error {
return fmt.Errorf("failed to configure node for ztunnel: %v", err)
}
case EbpfMode:
h, err := GetHostIPByRoute(s.podLister)
h, err := GetHostIPByRoute(s.pods)
if err != nil || h == "" {
log.Warnf("failed to getting host IP: %v", err)
} else if HostIP != h {
Expand Down
4 changes: 2 additions & 2 deletions istioctl/cmd/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Analyze() *cobra.Command {

// check whether selected namespace exists.
if namespace != "" && useKube {
client, err := kube.NewClient(kube.BuildClientCmd(kubeconfig, configContext))
client, err := kube.NewClient(kube.BuildClientCmd(kubeconfig, configContext), "")
if err != nil {
return err
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func Analyze() *cobra.Command {
if err != nil {
return err
}
k, err := kube.NewClient(kube.NewClientConfigForRestConfig(restConfig))
k, err := kube.NewClient(kube.NewClientConfigForRestConfig(restConfig), "")
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion istioctl/cmd/deprecated_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func newConfigStore() (istioclient.Interface, error) {
if err != nil {
return nil, err
}
kclient, err := kubecfg.NewClient(kubecfg.NewClientConfigForRestConfig(cfg))
kclient, err := kubecfg.NewClient(kubecfg.NewClientConfigForRestConfig(cfg), "")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion istioctl/cmd/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func checkControlPlane(cli kube.CLIClient) (diag.Messages, error) {
return nil, err
}

k, err := kube.NewClient(kube.NewClientConfigForRestConfig(restConfig))
k, err := kube.NewClient(kube.NewClientConfigForRestConfig(restConfig), "")
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion istioctl/cmd/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func analyzeWebhook(name, wh, revision string, config *rest.Config) error {
if err := sa.AddReaderKubeSource([]local.ReaderSource{{Name: "", Reader: strings.NewReader(wh)}}); err != nil {
return err
}
k, err := kube.NewClient(kube.NewClientConfigForRestConfig(config))
k, err := kube.NewClient(kube.NewClientConfigForRestConfig(config), "")
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func mergeIOPSWithProfile(iop *iopv1alpha1.IstioOperator) (*v1alpha1.IstioOperat
// and Start it when the Manager is Started. It also provides additional options to modify internal reconciler behavior.
func Add(mgr manager.Manager, options *Options) error {
restConfig = mgr.GetConfig()
kubeClient, err := kube.NewClient(kube.NewClientConfigForRestConfig(restConfig))
kubeClient, err := kube.NewClient(kube.NewClientConfigForRestConfig(restConfig), "")
if err != nil {
return fmt.Errorf("create Kubernetes client: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ func (s *Server) initKubeClient(args *PilotArgs) error {
return fmt.Errorf("failed creating kube config: %v", err)
}

s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig))
s.kubeClient, err = kubelib.NewClient(kubelib.NewClientConfigForRestConfig(kubeRestConfig), args.RegistryOptions.KubeOptions.ClusterID)
if err != nil {
return fmt.Errorf("failed creating kube client: %v", err)
}
Expand Down
23 changes: 0 additions & 23 deletions pilot/pkg/config/aggregate/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ package aggregate
import (
"errors"

"github.com/hashicorp/go-multierror"
"k8s.io/client-go/tools/cache"

"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/collection"
Expand Down Expand Up @@ -167,16 +164,6 @@ type storeCache struct {
caches []model.ConfigStoreController
}

func (cr *storeCache) HasStarted() bool {
for _, cache := range cr.caches {
if !cache.HasStarted() {
return false
}
}

return true
}

func (cr *storeCache) HasSynced() bool {
for _, cache := range cr.caches {
if !cache.HasSynced() {
Expand All @@ -194,16 +181,6 @@ func (cr *storeCache) RegisterEventHandler(kind config.GroupVersionKind, handler
}
}

func (cr *storeCache) SetWatchErrorHandler(handler func(r *cache.Reflector, err error)) error {
var errs error
for _, cache := range cr.caches {
if err := cache.SetWatchErrorHandler(handler); err != nil {
errs = multierror.Append(errs, err)
}
}
return errs
}

func (cr *storeCache) Run(stop <-chan struct{}) {
for _, cache := range cr.caches {
go cache.Run(stop)
Expand Down
9 changes: 0 additions & 9 deletions pilot/pkg/config/file/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
kubeJson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/cache"

kubeyaml2 "istio.io/istio/pilot/pkg/config/file/util/kubeyaml"
"istio.io/istio/pilot/pkg/config/memory"
Expand Down Expand Up @@ -121,14 +120,6 @@ func (s *KubeSource) RegisterEventHandler(kind config.GroupVersionKind, handler
func (s *KubeSource) Run(stop <-chan struct{}) {
}

func (s *KubeSource) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
panic("implement me")
}

func (s *KubeSource) HasStarted() bool {
return true
}

func (s *KubeSource) HasSynced() bool {
return true
}
Expand Down
16 changes: 3 additions & 13 deletions pilot/pkg/config/kube/crdclient/cache_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package crdclient

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // import GKE cluster authentication plugin
Expand All @@ -26,16 +25,15 @@ import (
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/resource"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/informer"
"istio.io/istio/pkg/kube/kclient"
"istio.io/pkg/log"
)

// cacheHandler abstracts the logic of an informer with a set of handlers. Handlers can be added at runtime
// and will be invoked on each informer event.
type cacheHandler struct {
client *Client
informer informer.FilteredSharedIndexInformer
lister func(namespace string) cache.GenericNamespaceLister
informer kclient.Untyped
schema resource.Schema
}

Expand Down Expand Up @@ -92,15 +90,7 @@ func createCacheHandler(cl *Client, schema resource.Schema, i informers.GenericI
h := &cacheHandler{
client: cl,
schema: schema,
informer: informer.NewFilteredSharedIndexInformer(cl.namespacesFilter, i.Informer()),
}

h.lister = func(namespace string) cache.GenericNamespaceLister {
gr := schema.GroupVersionResource().GroupResource()
if schema.IsClusterScoped() || namespace == metav1.NamespaceAll {
return cache.NewGenericLister(h.informer.GetIndexer(), gr)
}
return cache.NewGenericLister(h.informer.GetIndexer(), gr).ByNamespace(namespace)
informer: kclient.NewUntyped(cl.client, i.Informer(), kclient.Filter{ObjectFilter: cl.namespacesFilter}),
}

kind := schema.Kind()
Expand Down
Loading

0 comments on commit 4e05079

Please sign in to comment.