Skip to content

Commit

Permalink
krt: initial debug interface (istio#53597)
Browse files Browse the repository at this point in the history
* krt: initial debug interface

* make it more explicit

* fixes

* banner

* revert api change

* lint
  • Loading branch information
howardjohn authored and 马元元 committed Dec 8, 2024
1 parent 6f24cce commit 9b81de0
Show file tree
Hide file tree
Showing 25 changed files with 458 additions and 86 deletions.
40 changes: 23 additions & 17 deletions pilot/pkg/serviceregistry/kube/controller/ambient/ambientindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,13 @@ type Options struct {
LookupNetwork LookupNetwork
LookupNetworkGateways LookupNetworkGateways
StatusNotifier *activenotifier.ActiveNotifier

Debugger *krt.DebugHandler
}

func New(options Options) Index {
a := &index{
networkUpdateTrigger: krt.NewRecomputeTrigger(false),
networkUpdateTrigger: krt.NewRecomputeTrigger(false, krt.WithName("NetworkTrigger")),

SystemNamespace: options.SystemNamespace,
DomainSuffix: options.DomainSuffix,
Expand All @@ -140,53 +142,54 @@ func New(options Options) Index {
filter := kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}
ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"))
withDebug := krt.WithDebugging(options.Debugger)
ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"), withDebug)

authzPolicies := kclient.NewDelayedInformer[*securityclient.AuthorizationPolicy](options.Client,
gvr.AuthorizationPolicy, kubetypes.StandardInformer, filter)
AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"))
AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"), withDebug)

peerAuths := kclient.NewDelayedInformer[*securityclient.PeerAuthentication](options.Client,
gvr.PeerAuthentication, kubetypes.StandardInformer, filter)
PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"))
PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"), withDebug)

serviceEntries := kclient.NewDelayedInformer[*networkingclient.ServiceEntry](options.Client,
gvr.ServiceEntry, kubetypes.StandardInformer, filter)
ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"))
ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"), withDebug)

workloadEntries := kclient.NewDelayedInformer[*networkingclient.WorkloadEntry](options.Client,
gvr.WorkloadEntry, kubetypes.StandardInformer, filter)
WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"))
WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"), withDebug)

gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](options.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, filter)
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"))
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"), withDebug)

gatewayClassClient := kclient.NewDelayedInformer[*v1beta1.GatewayClass](options.Client, gvr.GatewayClass, kubetypes.StandardInformer, filter)
GatewayClasses := krt.WrapClient[*v1beta1.GatewayClass](gatewayClassClient, krt.WithName("GatewayClasses"))
GatewayClasses := krt.WrapClient[*v1beta1.GatewayClass](gatewayClassClient, krt.WithName("GatewayClasses"), withDebug)

servicesClient := kclient.NewFiltered[*v1.Service](options.Client, filter)
Services := krt.WrapClient[*v1.Service](servicesClient, krt.WithName("Services"))
Services := krt.WrapClient[*v1.Service](servicesClient, krt.WithName("Services"), withDebug)
Nodes := krt.NewInformerFiltered[*v1.Node](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
ObjectTransform: kubeclient.StripNodeUnusedFields,
}, krt.WithName("Nodes"))
}, krt.WithName("Nodes"), withDebug)
Pods := krt.NewInformerFiltered[*v1.Pod](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
ObjectTransform: kubeclient.StripPodUnusedFields,
}, krt.WithName("Pods"))
}, krt.WithName("Pods"), withDebug)

// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"))
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"), withDebug)

EndpointSlices := krt.NewInformerFiltered[*discovery.EndpointSlice](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}, krt.WithName("EndpointSlices"))
}, krt.WithName("EndpointSlices"), withDebug)

MeshConfig := MeshConfigCollection(ConfigMaps, options)
Waypoints := a.WaypointsCollection(Gateways, GatewayClasses, Pods)
MeshConfig := MeshConfigCollection(ConfigMaps, options, withDebug)
Waypoints := a.WaypointsCollection(Gateways, GatewayClasses, Pods, withDebug)

// AllPolicies includes peer-authentication converted policies
AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig, Waypoints)
AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig, Waypoints, withDebug)
AllPolicies.RegisterBatch(PushXds(a.XDSUpdater,
func(i model.WorkloadAuthorization) model.ConfigKey {
if i.Authorization == nil {
Expand All @@ -199,14 +202,15 @@ func New(options Options) Index {
servicesWriter := kclient.NewWriteClient[*v1.Service](options.Client)

// these are workloadapi-style services combined from kube services and service entries
WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces)
WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces, withDebug)

WaypointPolicyStatus := WaypointPolicyStatusCollection(
AuthzPolicies,
Waypoints,
Services,
ServiceEntries,
Namespaces,
withDebug,
)

authorizationPoliciesWriter := kclient.NewWriteClient[*securityclient.AuthorizationPolicy](options.Client)
Expand Down Expand Up @@ -270,6 +274,7 @@ func New(options Options) Index {
ServiceEntries,
EndpointSlices,
Namespaces,
withDebug,
)

WorkloadAddressIndex := krt.NewIndex[networkAddress, model.WorkloadInfo](Workloads, networkAddressFromWorkload)
Expand Down Expand Up @@ -311,6 +316,7 @@ func New(options Options) Index {
WorkloadServiceIndex,
WorkloadServices,
ServiceAddressIndex,
withDebug,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
k8sv1 "sigs.k8s.io/gateway-api/apis/v1"
k8sbeta "sigs.k8s.io/gateway-api/apis/v1beta1"
"sigs.k8s.io/yaml"

"istio.io/api/annotation"
"istio.io/api/label"
Expand Down Expand Up @@ -1486,6 +1487,7 @@ func newAmbientTestServer(t *testing.T, clusterID cluster.ID, networkID network.
} {
clienttest.MakeCRD(t, cl, crd)
}
debugger := &krt.DebugHandler{}
idx := New(Options{
Client: cl,
SystemNamespace: systemNS,
Expand All @@ -1499,19 +1501,12 @@ func newAmbientTestServer(t *testing.T, clusterID cluster.ID, networkID network.
return nil
},
StatusNotifier: activenotifier.New(true),
Debugger: debugger,
})
idx.NetworksSynced()
cl.RunAndWait(test.NewStop(t))

t.Cleanup(func() {
if t.Failed() {
idx := idx.(*index)
krt.Dump(idx.authorizationPolicies)
krt.Dump(idx.workloads.Collection)
krt.Dump(idx.services.Collection)
krt.Dump(idx.waypoints.Collection)
}
})
dumpOnFailure(t, debugger)
a := &ambientTestServer{
t: t,
clusterID: clusterID,
Expand Down Expand Up @@ -1551,6 +1546,15 @@ func newAmbientTestServer(t *testing.T, clusterID cluster.ID, networkID network.
return a
}

func dumpOnFailure(t *testing.T, debugger *krt.DebugHandler) {
t.Cleanup(func() {
if t.Failed() {
b, _ := yaml.Marshal(debugger)
t.Log(string(b))
}
})
}

func (s *ambientTestServer) addWaypoint(t *testing.T, ip, name, trafficType string, ready bool) {
s.addWaypointSpecificAddress(t, ip, fmt.Sprintf("%s.%s.svc.%s", name, testNS, s.DomainSuffix), name, trafficType, ready)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestWaypointPolicyStatusCollection(t *testing.T) {
}
})

wpsCollection := WaypointPolicyStatusCollection(authzPolCol, waypointCol, svcCol, seCol, nsCol)
wpsCollection := WaypointPolicyStatusCollection(authzPolCol, waypointCol, svcCol, seCol, nsCol, krt.WithDebugging(&krt.DebugHandler{}))
c.RunAndWait(ctx.Done())

_, err := clientNs.Create(&v1.Namespace{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type MeshConfig struct {
*meshapi.MeshConfig
}

func (m MeshConfig) ResourceName() string { return " " }
func (m MeshConfig) ResourceName() string { return "MeshConfig" }

func (m MeshConfig) Equals(other MeshConfig) bool { return proto.Equal(m.MeshConfig, other.MeshConfig) }

func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Options) krt.Singleton[MeshConfig] {
func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Options, withDebug krt.CollectionOption) krt.Singleton[MeshConfig] {
cmName := "istio"
if options.Revision != "" && options.Revision != "default" {
cmName = cmName + "-" + options.Revision
Expand All @@ -60,7 +60,7 @@ func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Opti
meshCfg = n
}
return &MeshConfig{meshCfg}
}, krt.WithName("MeshConfig"),
}, krt.WithName("MeshConfig"), withDebug,
)
}

Expand Down
16 changes: 10 additions & 6 deletions pilot/pkg/serviceregistry/kube/controller/ambient/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (
"istio.io/istio/pkg/workloadapi/security"
)

func WaypointPolicyStatusCollection(authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
func WaypointPolicyStatusCollection(
authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
waypoints krt.Collection[Waypoint],
services krt.Collection[*corev1.Service],
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
namespaces krt.Collection[*corev1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.WaypointPolicyStatus] {
return krt.NewCollection(authzPolicies,
func(ctx krt.HandlerContext, i *securityclient.AuthorizationPolicy) *model.WaypointPolicyStatus {
Expand Down Expand Up @@ -118,14 +120,15 @@ func WaypointPolicyStatusCollection(authzPolicies krt.Collection[*securityclient
Source: MakeSource(i),
Conditions: conditions,
}
}, krt.WithName("WaypointPolicyStatuses"))
}, krt.WithName("WaypointPolicyStatuses"), withDebug)
}

func PolicyCollections(
authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
peerAuths krt.Collection[*securityclient.PeerAuthentication],
meshConfig krt.Singleton[MeshConfig],
waypoints krt.Collection[Waypoint],
withDebug krt.CollectionOption,
) (krt.Collection[model.WorkloadAuthorization], krt.Collection[model.WorkloadAuthorization]) {
AuthzDerivedPolicies := krt.NewCollection(authzPolicies, func(ctx krt.HandlerContext, i *securityclient.AuthorizationPolicy) *model.WorkloadAuthorization {
meshCfg := krt.FetchOne(ctx, meshConfig.AsCollection())
Expand All @@ -145,7 +148,7 @@ func PolicyCollections(
Bound: pol != nil,
},
}
}, krt.WithName("AuthzDerivedPolicies"))
}, krt.WithName("AuthzDerivedPolicies"), withDebug)

PeerAuthDerivedPolicies := krt.NewCollection(peerAuths, func(ctx krt.HandlerContext, i *securityclient.PeerAuthentication) *model.WorkloadAuthorization {
meshCfg := krt.FetchOne(ctx, meshConfig.AsCollection())
Expand All @@ -157,11 +160,11 @@ func PolicyCollections(
Authorization: pol,
LabelSelector: model.NewSelector(i.Spec.GetSelector().GetMatchLabels()),
}
}, krt.WithName("PeerAuthDerivedPolicies"))
}, krt.WithName("PeerAuthDerivedPolicies"), withDebug)

ImplicitWaypointPolicies := krt.NewCollection(waypoints, func(ctx krt.HandlerContext, waypoint Waypoint) *model.WorkloadAuthorization {
return implicitWaypointPolicy(ctx, meshConfig, waypoint)
}, krt.WithName("DefaultAllowFromWaypointPolicies"))
}, krt.WithName("DefaultAllowFromWaypointPolicies"), withDebug)

DefaultPolicy := krt.NewSingleton[model.WorkloadAuthorization](func(ctx krt.HandlerContext) *model.WorkloadAuthorization {
if len(krt.Fetch(ctx, peerAuths)) == 0 {
Expand Down Expand Up @@ -195,9 +198,10 @@ func PolicyCollections(
},
},
}
}, krt.WithName("DefaultPolicy"))
}, krt.WithName("DefaultPolicy"), withDebug)

// Policies contains all of the policies we will send down to clients
// No need to add withDebug on join since it is trivial
Policies := krt.JoinCollection([]krt.Collection[model.WorkloadAuthorization]{
AuthzDerivedPolicies,
PeerAuthDerivedPolicies,
Expand Down
7 changes: 5 additions & 2 deletions pilot/pkg/serviceregistry/kube/controller/ambient/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func (a *index) ServicesCollection(
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
waypoints krt.Collection[Waypoint],
namespaces krt.Collection[*v1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.ServiceInfo] {
ServicesInfo := krt.NewCollection(services, a.serviceServiceBuilder(waypoints, namespaces), krt.WithName("ServicesInfo"))
ServiceEntriesInfo := krt.NewManyCollection(serviceEntries, a.serviceEntryServiceBuilder(waypoints, namespaces), krt.WithName("ServiceEntriesInfo"))
ServicesInfo := krt.NewCollection(services, a.serviceServiceBuilder(waypoints, namespaces),
krt.WithName("ServicesInfo"), withDebug)
ServiceEntriesInfo := krt.NewManyCollection(serviceEntries, a.serviceEntryServiceBuilder(waypoints, namespaces),
krt.WithName("ServiceEntriesInfo"), withDebug)
WorkloadServices := krt.JoinCollection([]krt.Collection[model.ServiceInfo]{ServicesInfo, ServiceEntriesInfo}, krt.WithName("WorkloadServices"))
return WorkloadServices
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterEdsShim(
WorkloadsByServiceKey krt.Index[string, model.WorkloadInfo],
Services krt.Collection[model.ServiceInfo],
ServicesByAddress krt.Index[networkAddress, model.ServiceInfo],
withDebug krt.CollectionOption,
) {
ServiceEds := krt.NewCollection(
Services,
Expand Down Expand Up @@ -91,7 +92,7 @@ func RegisterEdsShim(
}),
}
},
krt.WithName("ServiceEds"))
krt.WithName("ServiceEds"), withDebug)
ServiceEds.RegisterBatch(
PushXds(xdsUpdater, func(svc serviceEDS) model.ConfigKey {
ns, hostname, _ := strings.Cut(svc.ServiceKey, "/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (a *index) WaypointsCollection(
gateways krt.Collection[*v1beta1.Gateway],
gatewayClasses krt.Collection[*v1beta1.GatewayClass],
pods krt.Collection[*v1.Pod],
withDebug krt.CollectionOption,
) krt.Collection[Waypoint] {
podsByNamespace := krt.NewNamespaceIndex(pods)
return krt.NewCollection(gateways, func(ctx krt.HandlerContext, gateway *v1beta1.Gateway) *Waypoint {
Expand Down Expand Up @@ -241,7 +242,7 @@ func (a *index) WaypointsCollection(
}

return a.makeWaypoint(gateway, gatewayClass, serviceAccounts, trafficType)
}, krt.WithName("Waypoints"))
}, krt.WithName("Waypoints"), withDebug)
}

func makeInboundBinding(gateway *v1beta1.Gateway, gatewayClass *v1beta1.GatewayClass) *InboundBinding {
Expand Down
11 changes: 6 additions & 5 deletions pilot/pkg/serviceregistry/kube/controller/ambient/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (a *index) WorkloadsCollection(
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
endpointSlices krt.Collection[*discovery.EndpointSlice],
namespaces krt.Collection[*v1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.WorkloadInfo] {
WorkloadServicesNamespaceIndex := krt.NewNamespaceIndex(workloadServices)
EndpointSlicesByIPIndex := endpointSliceAddressIndex(endpointSlices)
Expand All @@ -82,20 +83,20 @@ func (a *index) WorkloadsCollection(
namespaces,
nodes,
),
krt.WithName("PodWorkloads"),
krt.WithName("PodWorkloads"), withDebug,
)
// Workloads coming from workloadEntries. These are 1:1 with WorkloadEntry.
WorkloadEntryWorkloads := krt.NewCollection(
workloadEntries,
a.workloadEntryWorkloadBuilder(meshConfig, authorizationPolicies, peerAuths, waypoints, workloadServices, WorkloadServicesNamespaceIndex, namespaces),
krt.WithName("WorkloadEntryWorkloads"),
krt.WithName("WorkloadEntryWorkloads"), withDebug,
)
// Workloads coming from serviceEntries. These are inlined workloadEntries (under `spec.endpoints`); these serviceEntries will
// also be generating `workloadapi.Service` definitions in the `ServicesCollection` logic.
ServiceEntryWorkloads := krt.NewManyCollection(
serviceEntries,
a.serviceEntryWorkloadBuilder(meshConfig, authorizationPolicies, peerAuths, waypoints, namespaces),
krt.WithName("ServiceEntryWorkloads"),
krt.WithName("ServiceEntryWorkloads"), withDebug,
)
// Workloads coming from endpointSlices. These are for *manually added* endpoints. Typically, Kubernetes will insert each pod
// into the EndpointSlice. This is because Kubernetes has 3 APIs in its model: Service, Pod, and EndpointSlice.
Expand All @@ -105,12 +106,12 @@ func (a *index) WorkloadsCollection(
EndpointSliceWorkloads := krt.NewManyCollection(
endpointSlices,
a.endpointSlicesBuilder(meshConfig, workloadServices),
krt.WithName("EndpointSliceWorkloads"))
krt.WithName("EndpointSliceWorkloads"), withDebug)

NetworkGatewayWorkloads := krt.NewManyFromNothing[model.WorkloadInfo](func(ctx krt.HandlerContext) []model.WorkloadInfo {
a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
return slices.Map(a.LookupNetworkGateways(), convertGateway)
}, krt.WithName("NetworkGatewayWorkloads"))
}, krt.WithName("NetworkGatewayWorkloads"), withDebug)

Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{
PodWorkloads,
Expand Down
2 changes: 2 additions & 0 deletions pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
kubelib "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
istiolog "istio.io/istio/pkg/log"
"istio.io/istio/pkg/maps"
"istio.io/istio/pkg/monitoring"
Expand Down Expand Up @@ -290,6 +291,7 @@ func NewController(kubeClient kubelib.Client, options Options) *Controller {
LookupNetwork: c.Network,
LookupNetworkGateways: c.NetworkGateways,
StatusNotifier: options.StatusWritingEnabled,
Debugger: krt.GlobalDebugHandler,
})
}
c.exports = newServiceExportCache(c)
Expand Down
Loading

0 comments on commit 9b81de0

Please sign in to comment.