Skip to content

Commit

Permalink
kclient: apply client side filter to index result (istio#54316)
Browse files Browse the repository at this point in the history
* kclient: apply client side filter to index result

* fix test
  • Loading branch information
howardjohn authored Jan 22, 2025
1 parent 4836653 commit b869fe7
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 9 deletions.
10 changes: 8 additions & 2 deletions pkg/kube/kclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"istio.io/istio/pkg/kube/kubetypes"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
)

Expand Down Expand Up @@ -95,14 +96,18 @@ func (n *informerClient[T]) Start(stopCh <-chan struct{}) {
type internalIndex struct {
key string
indexer cache.Indexer
filter func(t any) bool
}

func (i internalIndex) Lookup(key string) []interface{} {
func (i internalIndex) Lookup(key string) []any {
res, err := i.indexer.ByIndex(i.key, key)
if err != nil {
// This should only happen if the index key (i.key, not key) does not exist which should be impossible.
log.Fatalf("index lookup failed: %v", err)
}
if i.filter != nil {
return slices.FilterInPlace(res, i.filter)
}
return res
}

Expand All @@ -112,7 +117,7 @@ func (n *informerClient[T]) Index(extract func(o T) []string) RawIndexer {
// We just need some unique key, any will do
key := fmt.Sprintf("%p", extract)
if err := n.informer.AddIndexers(map[string]cache.IndexFunc{
key: func(obj interface{}) ([]string, error) {
key: func(obj any) ([]string, error) {
t := controllers.Extract[T](obj)
return extract(t), nil
},
Expand All @@ -123,6 +128,7 @@ func (n *informerClient[T]) Index(extract func(o T) []string) RawIndexer {
ret := internalIndex{
key: key,
indexer: n.informer.GetIndexer(),
filter: n.filter,
}
return ret
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/kube/kclient/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ func (i index[K, O]) Lookup(k K) []O {
// CreateStringIndex creates a simple index, keyed by a string, over an informer for O. This is similar to
// Informer.AddIndex, but is easier to use and can be added after an informer has already started.
// This is split from CreateIndex because string does not implement fmt.Stringer.
// WARNING: This index will not respect client-side filtering, and filters
// should be re-applied to the index on lookup. see https://github.com/istio/istio/issues/54280
//
// If an informer is filtered, the underlying index will still store all data. Items that do not match the filter
// are removed at Lookup() time.
// If the filter changes, there is no "notification" to the user of an Index, as there are no events for indexes.
func CreateStringIndex[O controllers.ComparableObject](
client Informer[O],
extract func(o O) []string,
Expand All @@ -60,8 +62,10 @@ func CreateStringIndex[O controllers.ComparableObject](
// CreateIndex creates a simple index, keyed by key K, over an informer for O. This is similar to
// Informer.AddIndex, but is easier to use and can be added after an informer has already started.
// Keys can be any object, but they must encode down to a *unique* value with String().
// WARNING: This index will not respect client-side filtering, and filters
// should be re-applied to the index on lookup. see https://github.com/istio/istio/issues/54280
//
// If an informer is filtered, the underlying index will still store all data. Items that do not match the filter
// are removed at Lookup() time.
// If the filter changes, there is no "notification" to the user of an Index, as there are no events for indexes.
func CreateIndex[K fmt.Stringer, O controllers.ComparableObject](
client Informer[O],
extract func(o O) []K,
Expand Down
98 changes: 95 additions & 3 deletions pkg/kube/kclient/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kclient
package kclient_test

import (
"context"
"testing"
"time"

"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/kclient/clienttest"
"istio.io/istio/pkg/kube/kubetypes"
"istio.io/istio/pkg/test"
"istio.io/istio/pkg/test/util/assert"
"istio.io/istio/pkg/test/util/retry"
Expand All @@ -40,9 +45,9 @@ func (s SaNode) String() string {

func TestIndex(t *testing.T) {
c := kube.NewFakeClient()
pods := New[*corev1.Pod](c)
pods := kclient.New[*corev1.Pod](c)
c.RunAndWait(test.NewStop(t))
index := CreateIndex[SaNode, *corev1.Pod](pods, func(pod *corev1.Pod) []SaNode {
index := kclient.CreateIndex[SaNode, *corev1.Pod](pods, func(pod *corev1.Pod) []SaNode {
if len(pod.Spec.NodeName) == 0 {
return nil
}
Expand Down Expand Up @@ -150,3 +155,90 @@ func TestIndex(t *testing.T) {
assertIndex(k1)
assertIndex(k2)
}

func TestIndexFilters(t *testing.T) {
c := kube.NewFakeClient()

currentAllowedNamespace := atomic.NewString("a")
filter := kubetypes.NewStaticObjectFilter(func(obj any) bool {
return controllers.ExtractObject(obj).GetNamespace() == currentAllowedNamespace.Load()
})
pods := kclient.NewFiltered[*corev1.Pod](c, kubetypes.Filter{
ObjectFilter: filter,
})
pc := clienttest.NewWriter[*corev1.Pod](t, c)
c.RunAndWait(test.NewStop(t))
index := kclient.CreateStringIndex[*corev1.Pod](pods, func(pod *corev1.Pod) []string {
if pod.Status.PodIP == "" {
return nil
}
return []string{pod.Status.PodIP}
})

// Initial state should be empty
assert.Equal(t, index.Lookup("1.1.1.1"), nil)

assertIndex := func(k string, pods ...*corev1.Pod) {
t.Helper()
assert.EventuallyEqual(t, func() []*corev1.Pod { return index.Lookup(k) }, pods, retry.Timeout(time.Second*5))
}

// Add a pod matching the filter, we should see it.
podA1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "a",
},
Status: corev1.PodStatus{PodIP: "1.1.1.1"},
}
pc.CreateOrUpdateStatus(podA1)
assertIndex("1.1.1.1", podA1)

podA2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "a",
},
Status: corev1.PodStatus{PodIP: "2.2.2.2"},
}
pc.CreateOrUpdateStatus(podA2)
assertIndex("1.1.1.1", podA1)
assertIndex("2.2.2.2", podA2)

// Create a pod not matching the filter with overlapping IP
podB1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "b",
},
Status: corev1.PodStatus{PodIP: "1.1.1.1"},
}
pc.CreateOrUpdateStatus(podB1)
assertIndex("1.1.1.1", podA1)
assertIndex("2.2.2.2", podA2)

// Another pod, not matching filter with distinct IP
podB2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "b",
},
Status: corev1.PodStatus{PodIP: "3.3.3.3"},
}
pc.CreateOrUpdateStatus(podB2)
assertIndex("1.1.1.1", podA1)
assertIndex("2.2.2.2", podA2)
assertIndex("3.3.3.3")

// Switch the filter
currentAllowedNamespace.Store("b")
assertIndex("1.1.1.1", podB1)
assertIndex("2.2.2.2")
assertIndex("3.3.3.3", podB2)

// Switch the filter again
currentAllowedNamespace.Store("c")
assertIndex("1.1.1.1")
assertIndex("2.2.2.2")
assertIndex("3.3.3.3")
}

0 comments on commit b869fe7

Please sign in to comment.