Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pull] master from istio:master #971

Merged
merged 2 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pilot/pkg/bootstrap/configcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"istio.io/istio/pkg/config/schema/gvr"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/revisions"
"istio.io/istio/pkg/util/sets"
)

// URL schemes supported by the config store
Expand Down Expand Up @@ -132,6 +133,17 @@ func (s *Server) initK8SConfigStore(args *PilotArgs) error {
}
configController := s.makeKubeConfigController(args)
s.ConfigStores = append(s.ConfigStores, configController)
tw := revisions.NewTagWatcher(s.kubeClient, args.Revision)
s.addStartFunc("tag-watcher", func(stop <-chan struct{}) error {
go tw.Run(stop)
return nil
})
tw.AddHandler(func(sets.String) {
s.XDSServer.ConfigUpdate(&model.PushRequest{
Full: true,
Reason: model.NewReasonStats(model.TagUpdate),
})
})
if features.EnableGatewayAPI {
if s.statusManager == nil && features.EnableGatewayAPIStatus {
s.initStatusManager(args)
Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (p *PilotArgs) applyDefaults() {
p.CniNamespace = PodNamespace
p.PodName = PodName
p.Revision = Revision
p.RegistryOptions.KubeOptions.Revision = Revision
p.JwtRule = JwtRule
p.KeepaliveOptions = keepalive.DefaultOption()
p.RegistryOptions.ClusterRegistriesNamespace = p.Namespace
Expand Down
4 changes: 4 additions & 0 deletions pilot/pkg/config/kube/crdclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/collections"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/schema/resource"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
Expand Down Expand Up @@ -367,6 +368,9 @@ func (cl *Client) addCRD(name string) {
ObjectFilter: composeFilters(namespaceFilter, cl.inRevision, extraFilter),
ObjectTransform: transform,
}
if resourceGVK == gvk.KubernetesGateway {
filter.ObjectFilter = composeFilters(namespaceFilter, extraFilter)
}

var kc kclient.Untyped
if s.IsBuiltin() {
Expand Down
12 changes: 11 additions & 1 deletion pilot/pkg/config/kube/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"istio.io/istio/pkg/kube/kubetypes"
istiolog "istio.io/istio/pkg/log"
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/revisions"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
)
Expand Down Expand Up @@ -88,6 +89,8 @@ type Controller struct {
// is only the case when we are the leader.
statusController *atomic.Pointer[status.Controller]

tagWatcher revisions.TagWatcher

waitForCRD func(class schema.GroupVersionResource, stop <-chan struct{}) bool
}

Expand All @@ -111,6 +114,7 @@ func NewController(
cluster: options.ClusterID,
domain: options.DomainSuffix,
statusController: atomic.NewPointer(ctl),
tagWatcher: revisions.NewTagWatcher(kc, options.Revision),
waitForCRD: waitForCRD,
}

Expand Down Expand Up @@ -185,6 +189,11 @@ func (c *Controller) Reconcile(ps *model.PushContext) error {
referenceGrant := c.cache.List(gvk.ReferenceGrant, metav1.NamespaceAll)
serviceEntry := c.cache.List(gvk.ServiceEntry, metav1.NamespaceAll) // TODO lazy load only referenced SEs?

// all other types are filtered by revision, but for gateways we need to select tags as well
gateway = slices.FilterInPlace(gateway, func(gw config.Config) bool {
return c.tagWatcher.IsMine(gw.ToObjectMeta())
})

input := GatewayResources{
GatewayClass: deepCopyStatus(gatewayClass),
Gateway: deepCopyStatus(gateway),
Expand Down Expand Up @@ -296,10 +305,11 @@ func (c *Controller) Run(stop <-chan struct{}) {
}
}()
}
go c.tagWatcher.Run(stop)
}

func (c *Controller) HasSynced() bool {
return c.cache.HasSynced() && c.namespaces.HasSynced()
return c.cache.HasSynced() && c.namespaces.HasSynced() && c.tagWatcher.HasSynced()
}

func (c *Controller) SecretAllowed(resourceName string, namespace string) bool {
Expand Down
11 changes: 9 additions & 2 deletions pilot/pkg/config/kube/gateway/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,24 @@ var AlwaysReady = func(class schema.GroupVersionResource, stop <-chan struct{})
func TestListInvalidGroupVersionKind(t *testing.T) {
g := NewWithT(t)
clientSet := kube.NewFakeClient()
clientSet.RunAndWait(test.NewStop(t))
stop := test.NewStop(t)
clientSet.RunAndWait(stop)
store := memory.NewController(memory.Make(collections.All))
controller := NewController(clientSet, store, AlwaysReady, nil, controller.Options{})

typ := config.GroupVersionKind{Kind: "wrong-kind"}
c := controller.List(typ, "ns1")
g.Expect(c).To(HaveLen(0))
controller.Run(stop)
kube.WaitForCacheSync("test", stop, controller.HasSynced)
}

func TestListGatewayResourceType(t *testing.T) {
g := NewWithT(t)

clientSet := kube.NewFakeClient()
clientSet.RunAndWait(test.NewStop(t))
stop := test.NewStop(t)
clientSet.RunAndWait(stop)
store := memory.NewController(memory.Make(collections.All))
controller := NewController(clientSet, store, AlwaysReady, nil, controller.Options{})

Expand Down Expand Up @@ -138,12 +142,15 @@ func TestListGatewayResourceType(t *testing.T) {
g.Expect(c.Namespace).To(Equal("ns1"))
g.Expect(c.Spec).To(Equal(expectedgw))
}
controller.Run(stop)
kube.WaitForCacheSync("test", stop, controller.HasSynced)
}

func TestNamespaceEvent(t *testing.T) {
clientSet := kube.NewFakeClient()
store := memory.NewController(memory.Make(collections.All))
c := NewController(clientSet, store, AlwaysReady, nil, controller.Options{})

s := xdsfake.NewFakeXDS()

c.RegisterEventHandler(gvk.Namespace, func(_, cfg config.Config, _ model.Event) {
Expand Down
12 changes: 1 addition & 11 deletions pilot/pkg/config/kube/gateway/deploymentcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,7 @@ func (d *DeploymentController) Reconcile(req types.NamespacedName) error {
}

// find the tag or revision indicated by the object
selectedTag, ok := gw.Labels[label.IoIstioRev.Name]
if !ok {
ns := d.namespaces.Get(gw.Namespace, "")
if ns == nil {
log.Debugf("gateway is not for this revision, skipping")
return nil
}
selectedTag = ns.Labels[label.IoIstioRev.Name]
}
myTags := d.tagWatcher.GetMyTags()
if !myTags.Contains(selectedTag) && !(selectedTag == "" && myTags.Contains("default")) {
if !d.tagWatcher.IsMine(gw.ObjectMeta) {
log.Debugf("gateway is not for this revision, skipping")
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions pilot/pkg/model/push_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ const (
NamespaceUpdate TriggerReason = "namespace"
// ClusterUpdate describes a push triggered by a Cluster change
ClusterUpdate TriggerReason = "cluster"
// TagUpdate occurs when the revision's tags change, and all resources must be recalculated.
TagUpdate TriggerReason = "tag"
)

// Merge two update requests together
Expand Down
10 changes: 10 additions & 0 deletions pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/util/gogoprotomarshal"
"istio.io/istio/pkg/util/protomarshal"
"istio.io/istio/pkg/util/sets"
)

// Meta is metadata attached to each configuration unit.
Expand Down Expand Up @@ -122,6 +123,15 @@ func LabelsInRevision(lbls map[string]string, rev string) bool {
return configEnv == rev
}

func LabelsInRevisionOrTags(lbls map[string]string, rev string, tags sets.Set[string]) bool {
if LabelsInRevision(lbls, rev) {
return true
}
configEnv := lbls[label.IoIstioRev.Name]
// Otherwise, only return true if revisions equal
return tags.Contains(configEnv)
}

func ObjectInRevision(o *Config, rev string) bool {
return LabelsInRevision(o.Labels, rev)
}
Expand Down
10 changes: 8 additions & 2 deletions pkg/kube/kclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"istio.io/istio/pkg/kube/kubetypes"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
)

Expand Down Expand Up @@ -95,14 +96,18 @@ func (n *informerClient[T]) Start(stopCh <-chan struct{}) {
type internalIndex struct {
key string
indexer cache.Indexer
filter func(t any) bool
}

func (i internalIndex) Lookup(key string) []interface{} {
func (i internalIndex) Lookup(key string) []any {
res, err := i.indexer.ByIndex(i.key, key)
if err != nil {
// This should only happen if the index key (i.key, not key) does not exist which should be impossible.
log.Fatalf("index lookup failed: %v", err)
}
if i.filter != nil {
return slices.FilterInPlace(res, i.filter)
}
return res
}

Expand All @@ -112,7 +117,7 @@ func (n *informerClient[T]) Index(extract func(o T) []string) RawIndexer {
// We just need some unique key, any will do
key := fmt.Sprintf("%p", extract)
if err := n.informer.AddIndexers(map[string]cache.IndexFunc{
key: func(obj interface{}) ([]string, error) {
key: func(obj any) ([]string, error) {
t := controllers.Extract[T](obj)
return extract(t), nil
},
Expand All @@ -123,6 +128,7 @@ func (n *informerClient[T]) Index(extract func(o T) []string) RawIndexer {
ret := internalIndex{
key: key,
indexer: n.informer.GetIndexer(),
filter: n.filter,
}
return ret
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/kube/kclient/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ func (i index[K, O]) Lookup(k K) []O {
// CreateStringIndex creates a simple index, keyed by a string, over an informer for O. This is similar to
// Informer.AddIndex, but is easier to use and can be added after an informer has already started.
// This is split from CreateIndex because string does not implement fmt.Stringer.
// WARNING: This index will not respect client-side filtering, and filters
// should be re-applied to the index on lookup. see https://github.com/istio/istio/issues/54280
//
// If an informer is filtered, the underlying index will still store all data. Items that do not match the filter
// are removed at Lookup() time.
// If the filter changes, there is no "notification" to the user of an Index, as there are no events for indexes.
func CreateStringIndex[O controllers.ComparableObject](
client Informer[O],
extract func(o O) []string,
Expand All @@ -60,8 +62,10 @@ func CreateStringIndex[O controllers.ComparableObject](
// CreateIndex creates a simple index, keyed by key K, over an informer for O. This is similar to
// Informer.AddIndex, but is easier to use and can be added after an informer has already started.
// Keys can be any object, but they must encode down to a *unique* value with String().
// WARNING: This index will not respect client-side filtering, and filters
// should be re-applied to the index on lookup. see https://github.com/istio/istio/issues/54280
//
// If an informer is filtered, the underlying index will still store all data. Items that do not match the filter
// are removed at Lookup() time.
// If the filter changes, there is no "notification" to the user of an Index, as there are no events for indexes.
func CreateIndex[K fmt.Stringer, O controllers.ComparableObject](
client Informer[O],
extract func(o O) []K,
Expand Down
98 changes: 95 additions & 3 deletions pkg/kube/kclient/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package kclient
package kclient_test

import (
"context"
"testing"
"time"

"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/kclient/clienttest"
"istio.io/istio/pkg/kube/kubetypes"
"istio.io/istio/pkg/test"
"istio.io/istio/pkg/test/util/assert"
"istio.io/istio/pkg/test/util/retry"
Expand All @@ -40,9 +45,9 @@ func (s SaNode) String() string {

func TestIndex(t *testing.T) {
c := kube.NewFakeClient()
pods := New[*corev1.Pod](c)
pods := kclient.New[*corev1.Pod](c)
c.RunAndWait(test.NewStop(t))
index := CreateIndex[SaNode, *corev1.Pod](pods, func(pod *corev1.Pod) []SaNode {
index := kclient.CreateIndex[SaNode, *corev1.Pod](pods, func(pod *corev1.Pod) []SaNode {
if len(pod.Spec.NodeName) == 0 {
return nil
}
Expand Down Expand Up @@ -150,3 +155,90 @@ func TestIndex(t *testing.T) {
assertIndex(k1)
assertIndex(k2)
}

func TestIndexFilters(t *testing.T) {
c := kube.NewFakeClient()

currentAllowedNamespace := atomic.NewString("a")
filter := kubetypes.NewStaticObjectFilter(func(obj any) bool {
return controllers.ExtractObject(obj).GetNamespace() == currentAllowedNamespace.Load()
})
pods := kclient.NewFiltered[*corev1.Pod](c, kubetypes.Filter{
ObjectFilter: filter,
})
pc := clienttest.NewWriter[*corev1.Pod](t, c)
c.RunAndWait(test.NewStop(t))
index := kclient.CreateStringIndex[*corev1.Pod](pods, func(pod *corev1.Pod) []string {
if pod.Status.PodIP == "" {
return nil
}
return []string{pod.Status.PodIP}
})

// Initial state should be empty
assert.Equal(t, index.Lookup("1.1.1.1"), nil)

assertIndex := func(k string, pods ...*corev1.Pod) {
t.Helper()
assert.EventuallyEqual(t, func() []*corev1.Pod { return index.Lookup(k) }, pods, retry.Timeout(time.Second*5))
}

// Add a pod matching the filter, we should see it.
podA1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "a",
},
Status: corev1.PodStatus{PodIP: "1.1.1.1"},
}
pc.CreateOrUpdateStatus(podA1)
assertIndex("1.1.1.1", podA1)

podA2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "a",
},
Status: corev1.PodStatus{PodIP: "2.2.2.2"},
}
pc.CreateOrUpdateStatus(podA2)
assertIndex("1.1.1.1", podA1)
assertIndex("2.2.2.2", podA2)

// Create a pod not matching the filter with overlapping IP
podB1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "b",
},
Status: corev1.PodStatus{PodIP: "1.1.1.1"},
}
pc.CreateOrUpdateStatus(podB1)
assertIndex("1.1.1.1", podA1)
assertIndex("2.2.2.2", podA2)

// Another pod, not matching filter with distinct IP
podB2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod2",
Namespace: "b",
},
Status: corev1.PodStatus{PodIP: "3.3.3.3"},
}
pc.CreateOrUpdateStatus(podB2)
assertIndex("1.1.1.1", podA1)
assertIndex("2.2.2.2", podA2)
assertIndex("3.3.3.3")

// Switch the filter
currentAllowedNamespace.Store("b")
assertIndex("1.1.1.1", podB1)
assertIndex("2.2.2.2")
assertIndex("3.3.3.3", podB2)

// Switch the filter again
currentAllowedNamespace.Store("c")
assertIndex("1.1.1.1")
assertIndex("2.2.2.2")
assertIndex("3.3.3.3")
}
Loading