Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

krt: initial debug interface #53597

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions pilot/pkg/serviceregistry/kube/controller/ambient/ambientindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,13 @@ type Options struct {
LookupNetwork LookupNetwork
LookupNetworkGateways LookupNetworkGateways
StatusNotifier *activenotifier.ActiveNotifier

Debugger *krt.DebugHandler
}

func New(options Options) Index {
a := &index{
networkUpdateTrigger: krt.NewRecomputeTrigger(false),
networkUpdateTrigger: krt.NewRecomputeTrigger(false, krt.WithName("NetworkTrigger")),

SystemNamespace: options.SystemNamespace,
DomainSuffix: options.DomainSuffix,
Expand All @@ -140,53 +142,54 @@ func New(options Options) Index {
filter := kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}
ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"))
withDebug := krt.WithDebugging(options.Debugger)
ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"), withDebug)

authzPolicies := kclient.NewDelayedInformer[*securityclient.AuthorizationPolicy](options.Client,
gvr.AuthorizationPolicy, kubetypes.StandardInformer, filter)
AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"))
AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"), withDebug)

peerAuths := kclient.NewDelayedInformer[*securityclient.PeerAuthentication](options.Client,
gvr.PeerAuthentication, kubetypes.StandardInformer, filter)
PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"))
PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"), withDebug)

serviceEntries := kclient.NewDelayedInformer[*networkingclient.ServiceEntry](options.Client,
gvr.ServiceEntry, kubetypes.StandardInformer, filter)
ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"))
ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"), withDebug)

workloadEntries := kclient.NewDelayedInformer[*networkingclient.WorkloadEntry](options.Client,
gvr.WorkloadEntry, kubetypes.StandardInformer, filter)
WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"))
WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"), withDebug)

gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](options.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, filter)
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"))
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"), withDebug)

gatewayClassClient := kclient.NewDelayedInformer[*v1beta1.GatewayClass](options.Client, gvr.GatewayClass, kubetypes.StandardInformer, filter)
GatewayClasses := krt.WrapClient[*v1beta1.GatewayClass](gatewayClassClient, krt.WithName("GatewayClasses"))
GatewayClasses := krt.WrapClient[*v1beta1.GatewayClass](gatewayClassClient, krt.WithName("GatewayClasses"), withDebug)

servicesClient := kclient.NewFiltered[*v1.Service](options.Client, filter)
Services := krt.WrapClient[*v1.Service](servicesClient, krt.WithName("Services"))
Services := krt.WrapClient[*v1.Service](servicesClient, krt.WithName("Services"), withDebug)
Nodes := krt.NewInformerFiltered[*v1.Node](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
ObjectTransform: kubeclient.StripNodeUnusedFields,
}, krt.WithName("Nodes"))
}, krt.WithName("Nodes"), withDebug)
Pods := krt.NewInformerFiltered[*v1.Pod](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
ObjectTransform: kubeclient.StripPodUnusedFields,
}, krt.WithName("Pods"))
}, krt.WithName("Pods"), withDebug)

// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"))
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"), withDebug)

EndpointSlices := krt.NewInformerFiltered[*discovery.EndpointSlice](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}, krt.WithName("EndpointSlices"))
}, krt.WithName("EndpointSlices"), withDebug)

MeshConfig := MeshConfigCollection(ConfigMaps, options)
Waypoints := a.WaypointsCollection(Gateways, GatewayClasses, Pods)
MeshConfig := MeshConfigCollection(ConfigMaps, options, withDebug)
Waypoints := a.WaypointsCollection(Gateways, GatewayClasses, Pods, withDebug)

// AllPolicies includes peer-authentication converted policies
AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig, Waypoints)
AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig, Waypoints, withDebug)
AllPolicies.RegisterBatch(PushXds(a.XDSUpdater,
func(i model.WorkloadAuthorization) model.ConfigKey {
if i.Authorization == nil {
Expand All @@ -199,14 +202,15 @@ func New(options Options) Index {
servicesWriter := kclient.NewWriteClient[*v1.Service](options.Client)

// these are workloadapi-style services combined from kube services and service entries
WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces)
WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces, withDebug)

WaypointPolicyStatus := WaypointPolicyStatusCollection(
AuthzPolicies,
Waypoints,
Services,
ServiceEntries,
Namespaces,
withDebug,
)

authorizationPoliciesWriter := kclient.NewWriteClient[*securityclient.AuthorizationPolicy](options.Client)
Expand Down Expand Up @@ -270,6 +274,7 @@ func New(options Options) Index {
ServiceEntries,
EndpointSlices,
Namespaces,
withDebug,
)

WorkloadAddressIndex := krt.NewIndex[networkAddress, model.WorkloadInfo](Workloads, networkAddressFromWorkload)
Expand Down Expand Up @@ -311,6 +316,7 @@ func New(options Options) Index {
WorkloadServiceIndex,
WorkloadServices,
ServiceAddressIndex,
withDebug,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
k8sv1 "sigs.k8s.io/gateway-api/apis/v1"
k8sbeta "sigs.k8s.io/gateway-api/apis/v1beta1"
"sigs.k8s.io/yaml"

"istio.io/api/annotation"
"istio.io/api/label"
Expand Down Expand Up @@ -1486,6 +1487,7 @@ func newAmbientTestServer(t *testing.T, clusterID cluster.ID, networkID network.
} {
clienttest.MakeCRD(t, cl, crd)
}
debugger := &krt.DebugHandler{}
idx := New(Options{
Client: cl,
SystemNamespace: systemNS,
Expand All @@ -1499,19 +1501,12 @@ func newAmbientTestServer(t *testing.T, clusterID cluster.ID, networkID network.
return nil
},
StatusNotifier: activenotifier.New(true),
Debugger: debugger,
})
idx.NetworksSynced()
cl.RunAndWait(test.NewStop(t))

t.Cleanup(func() {
if t.Failed() {
idx := idx.(*index)
krt.Dump(idx.authorizationPolicies)
krt.Dump(idx.workloads.Collection)
krt.Dump(idx.services.Collection)
krt.Dump(idx.waypoints.Collection)
}
})
dumpOnFailure(t, debugger)
a := &ambientTestServer{
t: t,
clusterID: clusterID,
Expand Down Expand Up @@ -1551,6 +1546,15 @@ func newAmbientTestServer(t *testing.T, clusterID cluster.ID, networkID network.
return a
}

func dumpOnFailure(t *testing.T, debugger *krt.DebugHandler) {
t.Cleanup(func() {
if t.Failed() {
b, _ := yaml.Marshal(debugger)
t.Log(string(b))
}
})
}

func (s *ambientTestServer) addWaypoint(t *testing.T, ip, name, trafficType string, ready bool) {
s.addWaypointSpecificAddress(t, ip, fmt.Sprintf("%s.%s.svc.%s", name, testNS, s.DomainSuffix), name, trafficType, ready)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestWaypointPolicyStatusCollection(t *testing.T) {
}
})

wpsCollection := WaypointPolicyStatusCollection(authzPolCol, waypointCol, svcCol, seCol, nsCol)
wpsCollection := WaypointPolicyStatusCollection(authzPolCol, waypointCol, svcCol, seCol, nsCol, krt.WithDebugging(&krt.DebugHandler{}))
c.RunAndWait(ctx.Done())

_, err := clientNs.Create(&v1.Namespace{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type MeshConfig struct {
*meshapi.MeshConfig
}

func (m MeshConfig) ResourceName() string { return " " }
func (m MeshConfig) ResourceName() string { return "MeshConfig" }

func (m MeshConfig) Equals(other MeshConfig) bool { return proto.Equal(m.MeshConfig, other.MeshConfig) }

func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Options) krt.Singleton[MeshConfig] {
func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Options, withDebug krt.CollectionOption) krt.Singleton[MeshConfig] {
cmName := "istio"
if options.Revision != "" && options.Revision != "default" {
cmName = cmName + "-" + options.Revision
Expand All @@ -60,7 +60,7 @@ func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Opti
meshCfg = n
}
return &MeshConfig{meshCfg}
}, krt.WithName("MeshConfig"),
}, krt.WithName("MeshConfig"), withDebug,
)
}

Expand Down
16 changes: 10 additions & 6 deletions pilot/pkg/serviceregistry/kube/controller/ambient/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (
"istio.io/istio/pkg/workloadapi/security"
)

func WaypointPolicyStatusCollection(authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
func WaypointPolicyStatusCollection(
authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
waypoints krt.Collection[Waypoint],
services krt.Collection[*corev1.Service],
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
namespaces krt.Collection[*corev1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.WaypointPolicyStatus] {
return krt.NewCollection(authzPolicies,
func(ctx krt.HandlerContext, i *securityclient.AuthorizationPolicy) *model.WaypointPolicyStatus {
Expand Down Expand Up @@ -118,14 +120,15 @@ func WaypointPolicyStatusCollection(authzPolicies krt.Collection[*securityclient
Source: MakeSource(i),
Conditions: conditions,
}
}, krt.WithName("WaypointPolicyStatuses"))
}, krt.WithName("WaypointPolicyStatuses"), withDebug)
}

func PolicyCollections(
authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
peerAuths krt.Collection[*securityclient.PeerAuthentication],
meshConfig krt.Singleton[MeshConfig],
waypoints krt.Collection[Waypoint],
withDebug krt.CollectionOption,
) (krt.Collection[model.WorkloadAuthorization], krt.Collection[model.WorkloadAuthorization]) {
AuthzDerivedPolicies := krt.NewCollection(authzPolicies, func(ctx krt.HandlerContext, i *securityclient.AuthorizationPolicy) *model.WorkloadAuthorization {
meshCfg := krt.FetchOne(ctx, meshConfig.AsCollection())
Expand All @@ -145,7 +148,7 @@ func PolicyCollections(
Bound: pol != nil,
},
}
}, krt.WithName("AuthzDerivedPolicies"))
}, krt.WithName("AuthzDerivedPolicies"), withDebug)

PeerAuthDerivedPolicies := krt.NewCollection(peerAuths, func(ctx krt.HandlerContext, i *securityclient.PeerAuthentication) *model.WorkloadAuthorization {
meshCfg := krt.FetchOne(ctx, meshConfig.AsCollection())
Expand All @@ -157,11 +160,11 @@ func PolicyCollections(
Authorization: pol,
LabelSelector: model.NewSelector(i.Spec.GetSelector().GetMatchLabels()),
}
}, krt.WithName("PeerAuthDerivedPolicies"))
}, krt.WithName("PeerAuthDerivedPolicies"), withDebug)

ImplicitWaypointPolicies := krt.NewCollection(waypoints, func(ctx krt.HandlerContext, waypoint Waypoint) *model.WorkloadAuthorization {
return implicitWaypointPolicy(ctx, meshConfig, waypoint)
}, krt.WithName("DefaultAllowFromWaypointPolicies"))
}, krt.WithName("DefaultAllowFromWaypointPolicies"), withDebug)

DefaultPolicy := krt.NewSingleton[model.WorkloadAuthorization](func(ctx krt.HandlerContext) *model.WorkloadAuthorization {
if len(krt.Fetch(ctx, peerAuths)) == 0 {
Expand Down Expand Up @@ -195,9 +198,10 @@ func PolicyCollections(
},
},
}
}, krt.WithName("DefaultPolicy"))
}, krt.WithName("DefaultPolicy"), withDebug)

// Policies contains all of the policies we will send down to clients
// No need to add withDebug on join since it is trivial
Policies := krt.JoinCollection([]krt.Collection[model.WorkloadAuthorization]{
AuthzDerivedPolicies,
PeerAuthDerivedPolicies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func (a *index) ServicesCollection(
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
waypoints krt.Collection[Waypoint],
namespaces krt.Collection[*v1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.ServiceInfo] {
ServicesInfo := krt.NewCollection(services, a.serviceServiceBuilder(waypoints, namespaces), krt.WithName("ServicesInfo"))
ServiceEntriesInfo := krt.NewManyCollection(serviceEntries, a.serviceEntryServiceBuilder(waypoints, namespaces), krt.WithName("ServiceEntriesInfo"))
ServicesInfo := krt.NewCollection(services, a.serviceServiceBuilder(waypoints, namespaces),
krt.WithName("ServicesInfo"), withDebug)
ServiceEntriesInfo := krt.NewManyCollection(serviceEntries, a.serviceEntryServiceBuilder(waypoints, namespaces),
krt.WithName("ServiceEntriesInfo"), withDebug)
WorkloadServices := krt.JoinCollection([]krt.Collection[model.ServiceInfo]{ServicesInfo, ServiceEntriesInfo}, krt.WithName("WorkloadServices"))
return WorkloadServices
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterEdsShim(
WorkloadsByServiceKey krt.Index[string, model.WorkloadInfo],
Services krt.Collection[model.ServiceInfo],
ServicesByAddress krt.Index[networkAddress, model.ServiceInfo],
withDebug krt.CollectionOption,
) {
ServiceEds := krt.NewCollection(
Services,
Expand Down Expand Up @@ -91,7 +92,7 @@ func RegisterEdsShim(
}),
}
},
krt.WithName("ServiceEds"))
krt.WithName("ServiceEds"), withDebug)
ServiceEds.RegisterBatch(
PushXds(xdsUpdater, func(svc serviceEDS) model.ConfigKey {
ns, hostname, _ := strings.Cut(svc.ServiceKey, "/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (a *index) WaypointsCollection(
gateways krt.Collection[*v1beta1.Gateway],
gatewayClasses krt.Collection[*v1beta1.GatewayClass],
pods krt.Collection[*v1.Pod],
withDebug krt.CollectionOption,
) krt.Collection[Waypoint] {
podsByNamespace := krt.NewNamespaceIndex(pods)
return krt.NewCollection(gateways, func(ctx krt.HandlerContext, gateway *v1beta1.Gateway) *Waypoint {
Expand Down Expand Up @@ -241,7 +242,7 @@ func (a *index) WaypointsCollection(
}

return a.makeWaypoint(gateway, gatewayClass, serviceAccounts, trafficType)
}, krt.WithName("Waypoints"))
}, krt.WithName("Waypoints"), withDebug)
}

func makeInboundBinding(gateway *v1beta1.Gateway, gatewayClass *v1beta1.GatewayClass) *InboundBinding {
Expand Down
11 changes: 6 additions & 5 deletions pilot/pkg/serviceregistry/kube/controller/ambient/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (a *index) WorkloadsCollection(
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
endpointSlices krt.Collection[*discovery.EndpointSlice],
namespaces krt.Collection[*v1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.WorkloadInfo] {
WorkloadServicesNamespaceIndex := krt.NewNamespaceIndex(workloadServices)
EndpointSlicesByIPIndex := endpointSliceAddressIndex(endpointSlices)
Expand All @@ -82,20 +83,20 @@ func (a *index) WorkloadsCollection(
namespaces,
nodes,
),
krt.WithName("PodWorkloads"),
krt.WithName("PodWorkloads"), withDebug,
)
// Workloads coming from workloadEntries. These are 1:1 with WorkloadEntry.
WorkloadEntryWorkloads := krt.NewCollection(
workloadEntries,
a.workloadEntryWorkloadBuilder(meshConfig, authorizationPolicies, peerAuths, waypoints, workloadServices, WorkloadServicesNamespaceIndex, namespaces),
krt.WithName("WorkloadEntryWorkloads"),
krt.WithName("WorkloadEntryWorkloads"), withDebug,
)
// Workloads coming from serviceEntries. These are inlined workloadEntries (under `spec.endpoints`); these serviceEntries will
// also be generating `workloadapi.Service` definitions in the `ServicesCollection` logic.
ServiceEntryWorkloads := krt.NewManyCollection(
serviceEntries,
a.serviceEntryWorkloadBuilder(meshConfig, authorizationPolicies, peerAuths, waypoints, namespaces),
krt.WithName("ServiceEntryWorkloads"),
krt.WithName("ServiceEntryWorkloads"), withDebug,
)
// Workloads coming from endpointSlices. These are for *manually added* endpoints. Typically, Kubernetes will insert each pod
// into the EndpointSlice. This is because Kubernetes has 3 APIs in its model: Service, Pod, and EndpointSlice.
Expand All @@ -105,12 +106,12 @@ func (a *index) WorkloadsCollection(
EndpointSliceWorkloads := krt.NewManyCollection(
endpointSlices,
a.endpointSlicesBuilder(meshConfig, workloadServices),
krt.WithName("EndpointSliceWorkloads"))
krt.WithName("EndpointSliceWorkloads"), withDebug)

NetworkGatewayWorkloads := krt.NewManyFromNothing[model.WorkloadInfo](func(ctx krt.HandlerContext) []model.WorkloadInfo {
a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
return slices.Map(a.LookupNetworkGateways(), convertGateway)
}, krt.WithName("NetworkGatewayWorkloads"))
}, krt.WithName("NetworkGatewayWorkloads"), withDebug)

Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{
PodWorkloads,
Expand Down
2 changes: 2 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
kubelib "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
istiolog "istio.io/istio/pkg/log"
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/monitoring"
Expand Down Expand Up @@ -290,6 +291,7 @@ func NewController(kubeClient kubelib.Client, options Options) *Controller {
LookupNetwork: c.Network,
LookupNetworkGateways: c.NetworkGateways,
StatusNotifier: options.StatusWritingEnabled,
Debugger: krt.GlobalDebugHandler,
})
}
c.exports = newServiceExportCache(c)
Expand Down
Loading