Skip to content

Commit

Permalink
Add tagged gateways to status and XDS (istio#54465)
Browse files Browse the repository at this point in the history
* Add tagged gateways to status

Signed-off-by: Mitch Connors <mitchconnors@gmail.com>

* remove commented code

Signed-off-by: Mitch Connors <mitchconnors@gmail.com>

* remove unused tag watcher

Signed-off-by: Mitch Connors <mitchconnors@gmail.com>

* improve tag and waypoint creation in test

Signed-off-by: Mitch Connors <mitchconnors@gmail.com>

* discard test changes to see if they are affecting the test outcomes

Signed-off-by: Mitch Connors <mitchconnors@gmail.com>

* Delete .kubeconfig

* only use tagwatcher when mwcs available

* crdclient include all gateways

* fix lint error

* first pass at integ test

* add missing file from last commit

* revert conformance test changes

* add tagged gateway test

* fmt

* address code comments

* fix templating error

* fix test failures

* add release note

* push xds on tag change

* start func is async

* don't need tags for IGW

* add comment

---------

Signed-off-by: Mitch Connors <mitchconnors@gmail.com>
  • Loading branch information
therealmitchconnors authored Jan 22, 2025
1 parent 94b6c4e commit 4836653
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 17 deletions.
12 changes: 12 additions & 0 deletions pilot/pkg/bootstrap/configcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"istio.io/istio/pkg/config/schema/gvr"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/revisions"
"istio.io/istio/pkg/util/sets"
)

// URL schemes supported by the config store
Expand Down Expand Up @@ -132,6 +133,17 @@ func (s *Server) initK8SConfigStore(args *PilotArgs) error {
}
configController := s.makeKubeConfigController(args)
s.ConfigStores = append(s.ConfigStores, configController)
tw := revisions.NewTagWatcher(s.kubeClient, args.Revision)
s.addStartFunc("tag-watcher", func(stop <-chan struct{}) error {
go tw.Run(stop)
return nil
})
tw.AddHandler(func(sets.String) {
s.XDSServer.ConfigUpdate(&model.PushRequest{
Full: true,
Reason: model.NewReasonStats(model.TagUpdate),
})
})
if features.EnableGatewayAPI {
if s.statusManager == nil && features.EnableGatewayAPIStatus {
s.initStatusManager(args)
Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (p *PilotArgs) applyDefaults() {
p.CniNamespace = PodNamespace
p.PodName = PodName
p.Revision = Revision
p.RegistryOptions.KubeOptions.Revision = Revision
p.JwtRule = JwtRule
p.KeepaliveOptions = keepalive.DefaultOption()
p.RegistryOptions.ClusterRegistriesNamespace = p.Namespace
Expand Down
4 changes: 4 additions & 0 deletions pilot/pkg/config/kube/crdclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/collections"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/config/schema/resource"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
Expand Down Expand Up @@ -367,6 +368,9 @@ func (cl *Client) addCRD(name string) {
ObjectFilter: composeFilters(namespaceFilter, cl.inRevision, extraFilter),
ObjectTransform: transform,
}
if resourceGVK == gvk.KubernetesGateway {
filter.ObjectFilter = composeFilters(namespaceFilter, extraFilter)
}

var kc kclient.Untyped
if s.IsBuiltin() {
Expand Down
12 changes: 11 additions & 1 deletion pilot/pkg/config/kube/gateway/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"istio.io/istio/pkg/kube/kubetypes"
istiolog "istio.io/istio/pkg/log"
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/revisions"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/util/sets"
)
Expand Down Expand Up @@ -88,6 +89,8 @@ type Controller struct {
// is only the case when we are the leader.
statusController *atomic.Pointer[status.Controller]

tagWatcher revisions.TagWatcher

waitForCRD func(class schema.GroupVersionResource, stop <-chan struct{}) bool
}

Expand All @@ -111,6 +114,7 @@ func NewController(
cluster: options.ClusterID,
domain: options.DomainSuffix,
statusController: atomic.NewPointer(ctl),
tagWatcher: revisions.NewTagWatcher(kc, options.Revision),
waitForCRD: waitForCRD,
}

Expand Down Expand Up @@ -185,6 +189,11 @@ func (c *Controller) Reconcile(ps *model.PushContext) error {
referenceGrant := c.cache.List(gvk.ReferenceGrant, metav1.NamespaceAll)
serviceEntry := c.cache.List(gvk.ServiceEntry, metav1.NamespaceAll) // TODO lazy load only referenced SEs?

// all other types are filtered by revision, but for gateways we need to select tags as well
gateway = slices.FilterInPlace(gateway, func(gw config.Config) bool {
return c.tagWatcher.IsMine(gw.ToObjectMeta())
})

input := GatewayResources{
GatewayClass: deepCopyStatus(gatewayClass),
Gateway: deepCopyStatus(gateway),
Expand Down Expand Up @@ -296,10 +305,11 @@ func (c *Controller) Run(stop <-chan struct{}) {
}
}()
}
go c.tagWatcher.Run(stop)
}

func (c *Controller) HasSynced() bool {
return c.cache.HasSynced() && c.namespaces.HasSynced()
return c.cache.HasSynced() && c.namespaces.HasSynced() && c.tagWatcher.HasSynced()
}

func (c *Controller) SecretAllowed(resourceName string, namespace string) bool {
Expand Down
11 changes: 9 additions & 2 deletions pilot/pkg/config/kube/gateway/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,24 @@ var AlwaysReady = func(class schema.GroupVersionResource, stop <-chan struct{})
func TestListInvalidGroupVersionKind(t *testing.T) {
g := NewWithT(t)
clientSet := kube.NewFakeClient()
clientSet.RunAndWait(test.NewStop(t))
stop := test.NewStop(t)
clientSet.RunAndWait(stop)
store := memory.NewController(memory.Make(collections.All))
controller := NewController(clientSet, store, AlwaysReady, nil, controller.Options{})

typ := config.GroupVersionKind{Kind: "wrong-kind"}
c := controller.List(typ, "ns1")
g.Expect(c).To(HaveLen(0))
controller.Run(stop)
kube.WaitForCacheSync("test", stop, controller.HasSynced)
}

func TestListGatewayResourceType(t *testing.T) {
g := NewWithT(t)

clientSet := kube.NewFakeClient()
clientSet.RunAndWait(test.NewStop(t))
stop := test.NewStop(t)
clientSet.RunAndWait(stop)
store := memory.NewController(memory.Make(collections.All))
controller := NewController(clientSet, store, AlwaysReady, nil, controller.Options{})

Expand Down Expand Up @@ -138,12 +142,15 @@ func TestListGatewayResourceType(t *testing.T) {
g.Expect(c.Namespace).To(Equal("ns1"))
g.Expect(c.Spec).To(Equal(expectedgw))
}
controller.Run(stop)
kube.WaitForCacheSync("test", stop, controller.HasSynced)
}

func TestNamespaceEvent(t *testing.T) {
clientSet := kube.NewFakeClient()
store := memory.NewController(memory.Make(collections.All))
c := NewController(clientSet, store, AlwaysReady, nil, controller.Options{})

s := xdsfake.NewFakeXDS()

c.RegisterEventHandler(gvk.Namespace, func(_, cfg config.Config, _ model.Event) {
Expand Down
12 changes: 1 addition & 11 deletions pilot/pkg/config/kube/gateway/deploymentcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,7 @@ func (d *DeploymentController) Reconcile(req types.NamespacedName) error {
}

// find the tag or revision indicated by the object
selectedTag, ok := gw.Labels[label.IoIstioRev.Name]
if !ok {
ns := d.namespaces.Get(gw.Namespace, "")
if ns == nil {
log.Debugf("gateway is not for this revision, skipping")
return nil
}
selectedTag = ns.Labels[label.IoIstioRev.Name]
}
myTags := d.tagWatcher.GetMyTags()
if !myTags.Contains(selectedTag) && !(selectedTag == "" && myTags.Contains("default")) {
if !d.tagWatcher.IsMine(gw.ObjectMeta) {
log.Debugf("gateway is not for this revision, skipping")
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions pilot/pkg/model/push_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ const (
NamespaceUpdate TriggerReason = "namespace"
// ClusterUpdate describes a push triggered by a Cluster change
ClusterUpdate TriggerReason = "cluster"
// TagUpdate occurs when the revision's tags change, and all resources must be recalculated.
TagUpdate TriggerReason = "tag"
)

// Merge two update requests together
Expand Down
10 changes: 10 additions & 0 deletions pkg/config/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/util/gogoprotomarshal"
"istio.io/istio/pkg/util/protomarshal"
"istio.io/istio/pkg/util/sets"
)

// Meta is metadata attached to each configuration unit.
Expand Down Expand Up @@ -122,6 +123,15 @@ func LabelsInRevision(lbls map[string]string, rev string) bool {
return configEnv == rev
}

func LabelsInRevisionOrTags(lbls map[string]string, rev string, tags sets.Set[string]) bool {
if LabelsInRevision(lbls, rev) {
return true
}
configEnv := lbls[label.IoIstioRev.Name]
// Otherwise, only return true if revisions equal
return tags.Contains(configEnv)
}

func ObjectInRevision(o *Config, rev string) bool {
return LabelsInRevision(o.Labels, rev)
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/revisions/leak_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package revisions

import (
"testing"

"istio.io/istio/tests/util/leak"
)

func TestMain(m *testing.M) {
// CheckMain asserts that no goroutines are leaked after a test package exits.
leak.CheckMain(m)
}
24 changes: 21 additions & 3 deletions pkg/revisions/tag_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package revisions

import (
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"istio.io/api/label"
Expand All @@ -33,6 +35,7 @@ type TagWatcher interface {
HasSynced() bool
AddHandler(handler TagHandler)
GetMyTags() sets.String
IsMine(metav1.ObjectMeta) bool
}

// TagHandler is a callback for when the tags revision change.
Expand All @@ -42,9 +45,10 @@ type tagWatcher struct {
revision string
handlers []TagHandler

queue controllers.Queue
webhooks kclient.Client[*admissionregistrationv1.MutatingWebhookConfiguration]
index kclient.Index[string, *admissionregistrationv1.MutatingWebhookConfiguration]
namespaces kclient.Client[*corev1.Namespace]
queue controllers.Queue
webhooks kclient.Client[*admissionregistrationv1.MutatingWebhookConfiguration]
index kclient.Index[string, *admissionregistrationv1.MutatingWebhookConfiguration]
}

func NewTagWatcher(client kube.Client, revision string) TagWatcher {
Expand All @@ -67,6 +71,7 @@ func NewTagWatcher(client kube.Client, revision string) TagWatcher {
}
return []string{rev}
})
p.namespaces = kclient.New[*corev1.Namespace](client)
return p
}

Expand All @@ -88,6 +93,19 @@ func (p *tagWatcher) HasSynced() bool {
return p.queue.HasSynced()
}

func (p *tagWatcher) IsMine(obj metav1.ObjectMeta) bool {
selectedTag, ok := obj.Labels[label.IoIstioRev.Name]
if !ok {
ns := p.namespaces.Get(obj.Namespace, "")
if ns == nil {
return true
}
selectedTag = ns.Labels[label.IoIstioRev.Name]
}
myTags := p.GetMyTags()
return myTags.Contains(selectedTag) || (selectedTag == "" && myTags.Contains("default"))
}

func (p *tagWatcher) GetMyTags() sets.String {
res := sets.New(p.revision)
for _, wh := range p.index.Lookup(p.revision) {
Expand Down
9 changes: 9 additions & 0 deletions releasenotes/notes/54458.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: release-notes/v2
kind: bug-fix
area: traffic-management
issue:
- 54458
releaseNotes:
- |
**Fixed** an issue where using a tag in the istio.io/rev label on a gateway causes the gateway to be improperly programmed, and to lack status.
66 changes: 66 additions & 0 deletions tests/integration/pilot/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pilot
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand All @@ -37,6 +38,7 @@ import (
"istio.io/istio/pkg/test/framework/components/echo"
"istio.io/istio/pkg/test/framework/components/echo/check"
"istio.io/istio/pkg/test/framework/components/istio"
"istio.io/istio/pkg/test/framework/components/istioctl"
testKube "istio.io/istio/pkg/test/kube"
"istio.io/istio/pkg/test/util/assert"
"istio.io/istio/pkg/test/util/retry"
Expand All @@ -51,6 +53,7 @@ func TestGateway(t *testing.T) {

t.NewSubTest("unmanaged").Run(UnmanagedGatewayTest)
t.NewSubTest("managed").Run(ManagedGatewayTest)
t.NewSubTest("tagged").Run(TaggedGatewayTest)
t.NewSubTest("managed-owner").Run(ManagedOwnerGatewayTest)
t.NewSubTest("status").Run(StatusGatewayTest)
t.NewSubTest("managed-short-name").Run(ManagedGatewayShortNameTest)
Expand Down Expand Up @@ -252,6 +255,69 @@ spec:
}
}

func TaggedGatewayTest(t framework.TestContext) {
istioctl.NewOrFail(t, istioctl.Config{}).InvokeOrFail(
t, strings.Split("tag set tag --revision default", " "))

testCases := []struct {
check echo.Checker
revisionValue string
}{
{
check: check.OK(),
revisionValue: "tag",
},
{
check: check.NotOK(),
revisionValue: "badtag",
},
}
for _, tc := range testCases {
t.NewSubTest(fmt.Sprintf("gateway-connectivity-tagged-%s", tc.revisionValue)).Run(func(t framework.TestContext) {
t.ConfigIstio().Eval(apps.Namespace.Name(),
map[string]string{"revision": tc.revisionValue}, `apiVersion: gateway.networking.k8s.io/v1beta1
kind: Gateway
metadata:
name: gateway
labels:
istio.io/rev: {{.revision}}
spec:
gatewayClassName: istio
listeners:
- name: default
hostname: "*.example.com"
port: 80
protocol: HTTP
---
apiVersion: gateway.networking.k8s.io/v1beta1
kind: HTTPRoute
metadata:
name: http-1
spec:
parentRefs:
- name: gateway
hostnames: ["bar.example.com"]
rules:
- backendRefs:
- name: b
port: 80
`).ApplyOrFail(t)
apps.B[0].CallOrFail(t, echo.CallOptions{
Port: echo.Port{
Protocol: protocol.HTTP,
ServicePort: 80,
},
Scheme: scheme.HTTP,
HTTP: echo.HTTP{
Headers: headers.New().WithHost("bar.example.com").Build(),
},
Address: fmt.Sprintf("gateway-istio.%s.svc.cluster.local", apps.Namespace.Name()),
Check: tc.check,
})
})
}
}

func ManagedGatewayShortNameTest(t framework.TestContext) {
t.ConfigIstio().YAML(apps.Namespace.Name(), `apiVersion: gateway.networking.k8s.io/v1beta1
kind: Gateway
Expand Down

0 comments on commit 4836653

Please sign in to comment.