Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

krt: initial debug interface #53597

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
make it more explicit
  • Loading branch information
howardjohn committed Nov 5, 2024
commit e319633bc7cb9099361d80b1113f3acd734d451c
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,5 @@ require (
sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)

replace istio.io/api => ../api
39 changes: 22 additions & 17 deletions pilot/pkg/serviceregistry/kube/controller/ambient/ambientindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type Options struct {

func New(options Options) Index {
a := &index{
networkUpdateTrigger: krt.NewRecomputeTrigger(false),
networkUpdateTrigger: krt.NewRecomputeTrigger(false, krt.WithName("NetworkTrigger")),

SystemNamespace: options.SystemNamespace,
DomainSuffix: options.DomainSuffix,
Expand All @@ -140,53 +140,55 @@ func New(options Options) Index {
filter := kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}
ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"))
debugger := &krt.DebugHandler{}
withDebug := krt.WithDebugging(debugger)
ConfigMaps := krt.NewInformerFiltered[*v1.ConfigMap](options.Client, filter, krt.WithName("ConfigMaps"), withDebug)

authzPolicies := kclient.NewDelayedInformer[*securityclient.AuthorizationPolicy](options.Client,
gvr.AuthorizationPolicy, kubetypes.StandardInformer, filter)
AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"))
AuthzPolicies := krt.WrapClient[*securityclient.AuthorizationPolicy](authzPolicies, krt.WithName("AuthorizationPolicies"), withDebug)

peerAuths := kclient.NewDelayedInformer[*securityclient.PeerAuthentication](options.Client,
gvr.PeerAuthentication, kubetypes.StandardInformer, filter)
PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"))
PeerAuths := krt.WrapClient[*securityclient.PeerAuthentication](peerAuths, krt.WithName("PeerAuthentications"), withDebug)

serviceEntries := kclient.NewDelayedInformer[*networkingclient.ServiceEntry](options.Client,
gvr.ServiceEntry, kubetypes.StandardInformer, filter)
ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"))
ServiceEntries := krt.WrapClient[*networkingclient.ServiceEntry](serviceEntries, krt.WithName("ServiceEntries"), withDebug)

workloadEntries := kclient.NewDelayedInformer[*networkingclient.WorkloadEntry](options.Client,
gvr.WorkloadEntry, kubetypes.StandardInformer, filter)
WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"))
WorkloadEntries := krt.WrapClient[*networkingclient.WorkloadEntry](workloadEntries, krt.WithName("WorkloadEntries"), withDebug)

gatewayClient := kclient.NewDelayedInformer[*v1beta1.Gateway](options.Client, gvr.KubernetesGateway, kubetypes.StandardInformer, filter)
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"))
Gateways := krt.WrapClient[*v1beta1.Gateway](gatewayClient, krt.WithName("Gateways"), withDebug)

gatewayClassClient := kclient.NewDelayedInformer[*v1beta1.GatewayClass](options.Client, gvr.GatewayClass, kubetypes.StandardInformer, filter)
GatewayClasses := krt.WrapClient[*v1beta1.GatewayClass](gatewayClassClient, krt.WithName("GatewayClasses"))
GatewayClasses := krt.WrapClient[*v1beta1.GatewayClass](gatewayClassClient, krt.WithName("GatewayClasses"), withDebug)

servicesClient := kclient.NewFiltered[*v1.Service](options.Client, filter)
Services := krt.WrapClient[*v1.Service](servicesClient, krt.WithName("Services"))
Services := krt.WrapClient[*v1.Service](servicesClient, krt.WithName("Services"), withDebug)
Nodes := krt.NewInformerFiltered[*v1.Node](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
ObjectTransform: kubeclient.StripNodeUnusedFields,
}, krt.WithName("Nodes"))
}, krt.WithName("Nodes"), withDebug)
Pods := krt.NewInformerFiltered[*v1.Pod](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
ObjectTransform: kubeclient.StripPodUnusedFields,
}, krt.WithName("Pods"))
}, krt.WithName("Pods"), withDebug)

// TODO: Should this go ahead and transform the full ns into some intermediary with just the details we care about?
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"))
Namespaces := krt.NewInformer[*v1.Namespace](options.Client, krt.WithName("Namespaces"), withDebug)

EndpointSlices := krt.NewInformerFiltered[*discovery.EndpointSlice](options.Client, kclient.Filter{
ObjectFilter: options.Client.ObjectFilter(),
}, krt.WithName("EndpointSlices"))
}, krt.WithName("EndpointSlices"), withDebug)

MeshConfig := MeshConfigCollection(ConfigMaps, options)
Waypoints := a.WaypointsCollection(Gateways, GatewayClasses, Pods)
MeshConfig := MeshConfigCollection(ConfigMaps, options, withDebug)
Waypoints := a.WaypointsCollection(Gateways, GatewayClasses, Pods, withDebug)

// AllPolicies includes peer-authentication converted policies
AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig, Waypoints)
AuthorizationPolicies, AllPolicies := PolicyCollections(AuthzPolicies, PeerAuths, MeshConfig, Waypoints, withDebug)
AllPolicies.RegisterBatch(PushXds(a.XDSUpdater,
func(i model.WorkloadAuthorization) model.ConfigKey {
if i.Authorization == nil {
Expand All @@ -199,14 +201,15 @@ func New(options Options) Index {
servicesWriter := kclient.NewWriteClient[*v1.Service](options.Client)

// these are workloadapi-style services combined from kube services and service entries
WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces)
WorkloadServices := a.ServicesCollection(Services, ServiceEntries, Waypoints, Namespaces, withDebug)

WaypointPolicyStatus := WaypointPolicyStatusCollection(
AuthzPolicies,
Waypoints,
Services,
ServiceEntries,
Namespaces,
withDebug,
)

authorizationPoliciesWriter := kclient.NewWriteClient[*securityclient.AuthorizationPolicy](options.Client)
Expand Down Expand Up @@ -270,6 +273,7 @@ func New(options Options) Index {
ServiceEntries,
EndpointSlices,
Namespaces,
withDebug,
)

WorkloadAddressIndex := krt.NewIndex[networkAddress, model.WorkloadInfo](Workloads, networkAddressFromWorkload)
Expand Down Expand Up @@ -311,6 +315,7 @@ func New(options Options) Index {
WorkloadServiceIndex,
WorkloadServices,
ServiceAddressIndex,
withDebug,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestWaypointPolicyStatusCollection(t *testing.T) {
}
})

wpsCollection := WaypointPolicyStatusCollection(authzPolCol, waypointCol, svcCol, seCol, nsCol)
wpsCollection := WaypointPolicyStatusCollection(authzPolCol, waypointCol, svcCol, seCol, nsCol, nil)
c.RunAndWait(ctx.Done())

_, err := clientNs.Create(&v1.Namespace{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type MeshConfig struct {
*meshapi.MeshConfig
}

func (m MeshConfig) ResourceName() string { return " " }
func (m MeshConfig) ResourceName() string { return "MeshConfig" }

func (m MeshConfig) Equals(other MeshConfig) bool { return proto.Equal(m.MeshConfig, other.MeshConfig) }

func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Options) krt.Singleton[MeshConfig] {
func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Options, withDebug krt.CollectionOption) krt.Singleton[MeshConfig] {
cmName := "istio"
if options.Revision != "" && options.Revision != "default" {
cmName = cmName + "-" + options.Revision
Expand All @@ -60,7 +60,7 @@ func MeshConfigCollection(configMaps krt.Collection[*v1.ConfigMap], options Opti
meshCfg = n
}
return &MeshConfig{meshCfg}
}, krt.WithName("MeshConfig"),
}, krt.WithName("MeshConfig"), withDebug,
)
}

Expand Down
16 changes: 10 additions & 6 deletions pilot/pkg/serviceregistry/kube/controller/ambient/policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ import (
"istio.io/istio/pkg/workloadapi/security"
)

func WaypointPolicyStatusCollection(authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
func WaypointPolicyStatusCollection(
authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
waypoints krt.Collection[Waypoint],
services krt.Collection[*corev1.Service],
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
namespaces krt.Collection[*corev1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.WaypointPolicyStatus] {
return krt.NewCollection(authzPolicies,
func(ctx krt.HandlerContext, i *securityclient.AuthorizationPolicy) *model.WaypointPolicyStatus {
Expand Down Expand Up @@ -118,14 +120,15 @@ func WaypointPolicyStatusCollection(authzPolicies krt.Collection[*securityclient
Source: MakeSource(i),
Conditions: conditions,
}
}, krt.WithName("WaypointPolicyStatuses"))
}, krt.WithName("WaypointPolicyStatuses"), withDebug)
}

func PolicyCollections(
authzPolicies krt.Collection[*securityclient.AuthorizationPolicy],
peerAuths krt.Collection[*securityclient.PeerAuthentication],
meshConfig krt.Singleton[MeshConfig],
waypoints krt.Collection[Waypoint],
withDebug krt.CollectionOption,
) (krt.Collection[model.WorkloadAuthorization], krt.Collection[model.WorkloadAuthorization]) {
AuthzDerivedPolicies := krt.NewCollection(authzPolicies, func(ctx krt.HandlerContext, i *securityclient.AuthorizationPolicy) *model.WorkloadAuthorization {
meshCfg := krt.FetchOne(ctx, meshConfig.AsCollection())
Expand All @@ -145,7 +148,7 @@ func PolicyCollections(
Bound: pol != nil,
},
}
}, krt.WithName("AuthzDerivedPolicies"))
}, krt.WithName("AuthzDerivedPolicies"), withDebug)

PeerAuthDerivedPolicies := krt.NewCollection(peerAuths, func(ctx krt.HandlerContext, i *securityclient.PeerAuthentication) *model.WorkloadAuthorization {
meshCfg := krt.FetchOne(ctx, meshConfig.AsCollection())
Expand All @@ -157,11 +160,11 @@ func PolicyCollections(
Authorization: pol,
LabelSelector: model.NewSelector(i.Spec.GetSelector().GetMatchLabels()),
}
}, krt.WithName("PeerAuthDerivedPolicies"))
}, krt.WithName("PeerAuthDerivedPolicies"), withDebug)

ImplicitWaypointPolicies := krt.NewCollection(waypoints, func(ctx krt.HandlerContext, waypoint Waypoint) *model.WorkloadAuthorization {
return implicitWaypointPolicy(ctx, meshConfig, waypoint)
}, krt.WithName("DefaultAllowFromWaypointPolicies"))
}, krt.WithName("DefaultAllowFromWaypointPolicies"), withDebug)

DefaultPolicy := krt.NewSingleton[model.WorkloadAuthorization](func(ctx krt.HandlerContext) *model.WorkloadAuthorization {
if len(krt.Fetch(ctx, peerAuths)) == 0 {
Expand Down Expand Up @@ -195,9 +198,10 @@ func PolicyCollections(
},
},
}
}, krt.WithName("DefaultPolicy"))
}, krt.WithName("DefaultPolicy"), withDebug)

// Policies contains all of the policies we will send down to clients
// No need to add withDebug on join since it is trivial
Policies := krt.JoinCollection([]krt.Collection[model.WorkloadAuthorization]{
AuthzDerivedPolicies,
PeerAuthDerivedPolicies,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ func (a *index) ServicesCollection(
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
waypoints krt.Collection[Waypoint],
namespaces krt.Collection[*v1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.ServiceInfo] {
ServicesInfo := krt.NewCollection(services, a.serviceServiceBuilder(waypoints, namespaces), krt.WithName("ServicesInfo"))
ServiceEntriesInfo := krt.NewManyCollection(serviceEntries, a.serviceEntryServiceBuilder(waypoints, namespaces), krt.WithName("ServiceEntriesInfo"))
ServicesInfo := krt.NewCollection(services, a.serviceServiceBuilder(waypoints, namespaces),
krt.WithName("ServicesInfo"), withDebug)
ServiceEntriesInfo := krt.NewManyCollection(serviceEntries, a.serviceEntryServiceBuilder(waypoints, namespaces),
krt.WithName("ServiceEntriesInfo"), withDebug)
WorkloadServices := krt.JoinCollection([]krt.Collection[model.ServiceInfo]{ServicesInfo, ServiceEntriesInfo}, krt.WithName("WorkloadServices"))
return WorkloadServices
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func RegisterEdsShim(
WorkloadsByServiceKey krt.Index[string, model.WorkloadInfo],
Services krt.Collection[model.ServiceInfo],
ServicesByAddress krt.Index[networkAddress, model.ServiceInfo],
withDebug krt.CollectionOption,
) {
ServiceEds := krt.NewCollection(
Services,
Expand Down Expand Up @@ -91,7 +92,7 @@ func RegisterEdsShim(
}),
}
},
krt.WithName("ServiceEds"))
krt.WithName("ServiceEds"), withDebug)
ServiceEds.RegisterBatch(
PushXds(xdsUpdater, func(svc serviceEDS) model.ConfigKey {
ns, hostname, _ := strings.Cut(svc.ServiceKey, "/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func (a *index) WaypointsCollection(
gateways krt.Collection[*v1beta1.Gateway],
gatewayClasses krt.Collection[*v1beta1.GatewayClass],
pods krt.Collection[*v1.Pod],
withDebug krt.CollectionOption,
) krt.Collection[Waypoint] {
podsByNamespace := krt.NewNamespaceIndex(pods)
return krt.NewCollection(gateways, func(ctx krt.HandlerContext, gateway *v1beta1.Gateway) *Waypoint {
Expand Down Expand Up @@ -241,7 +242,7 @@ func (a *index) WaypointsCollection(
}

return a.makeWaypoint(gateway, gatewayClass, serviceAccounts, trafficType)
}, krt.WithName("Waypoints"))
}, krt.WithName("Waypoints"), withDebug)
}

func makeInboundBinding(gateway *v1beta1.Gateway, gatewayClass *v1beta1.GatewayClass) *InboundBinding {
Expand Down
11 changes: 6 additions & 5 deletions pilot/pkg/serviceregistry/kube/controller/ambient/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (a *index) WorkloadsCollection(
serviceEntries krt.Collection[*networkingclient.ServiceEntry],
endpointSlices krt.Collection[*discovery.EndpointSlice],
namespaces krt.Collection[*v1.Namespace],
withDebug krt.CollectionOption,
) krt.Collection[model.WorkloadInfo] {
WorkloadServicesNamespaceIndex := krt.NewNamespaceIndex(workloadServices)
EndpointSlicesByIPIndex := endpointSliceAddressIndex(endpointSlices)
Expand All @@ -82,20 +83,20 @@ func (a *index) WorkloadsCollection(
namespaces,
nodes,
),
krt.WithName("PodWorkloads"),
krt.WithName("PodWorkloads"), withDebug,
)
// Workloads coming from workloadEntries. These are 1:1 with WorkloadEntry.
WorkloadEntryWorkloads := krt.NewCollection(
workloadEntries,
a.workloadEntryWorkloadBuilder(meshConfig, authorizationPolicies, peerAuths, waypoints, workloadServices, WorkloadServicesNamespaceIndex, namespaces),
krt.WithName("WorkloadEntryWorkloads"),
krt.WithName("WorkloadEntryWorkloads"), withDebug,
)
// Workloads coming from serviceEntries. These are inlined workloadEntries (under `spec.endpoints`); these serviceEntries will
// also be generating `workloadapi.Service` definitions in the `ServicesCollection` logic.
ServiceEntryWorkloads := krt.NewManyCollection(
serviceEntries,
a.serviceEntryWorkloadBuilder(meshConfig, authorizationPolicies, peerAuths, waypoints, namespaces),
krt.WithName("ServiceEntryWorkloads"),
krt.WithName("ServiceEntryWorkloads"), withDebug,
)
// Workloads coming from endpointSlices. These are for *manually added* endpoints. Typically, Kubernetes will insert each pod
// into the EndpointSlice. This is because Kubernetes has 3 APIs in its model: Service, Pod, and EndpointSlice.
Expand All @@ -105,12 +106,12 @@ func (a *index) WorkloadsCollection(
EndpointSliceWorkloads := krt.NewManyCollection(
endpointSlices,
a.endpointSlicesBuilder(meshConfig, workloadServices),
krt.WithName("EndpointSliceWorkloads"))
krt.WithName("EndpointSliceWorkloads"), withDebug)

NetworkGatewayWorkloads := krt.NewManyFromNothing[model.WorkloadInfo](func(ctx krt.HandlerContext) []model.WorkloadInfo {
a.networkUpdateTrigger.MarkDependant(ctx) // Mark we depend on out of band a.Network
return slices.Map(a.LookupNetworkGateways(), convertGateway)
}, krt.WithName("NetworkGatewayWorkloads"))
}, krt.WithName("NetworkGatewayWorkloads"), withDebug)

Workloads := krt.JoinCollection([]krt.Collection[model.WorkloadInfo]{
PodWorkloads,
Expand Down
16 changes: 12 additions & 4 deletions pkg/kube/krt/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (h *manyCollection[I, O]) dump() CollectionDump {
for k, deps := range h.objectDependencies {
depss := make([]string, 0, len(deps))
for _, dep := range deps {
depss = append(depss, fmt.Sprintf("%v with filter %v", dep.collectionName, dep.filter))
depss = append(depss, dep.collectionName)
}
slices.Sort(depss)
cur := inputs[string(k)]
Expand All @@ -191,8 +191,9 @@ func (h *manyCollection[I, O]) dump() CollectionDump {
// x := h.objectDependencies[xx][0].

return CollectionDump{
Outputs: eraseMap(h.collectionState.outputs),
Inputs: inputs,
Outputs: eraseMap(h.collectionState.outputs),
Inputs: inputs,
InputCollection: h.parent.(internalCollection[I]).name(),
}
//h.log.Errorf(">>> BEGIN DUMP")
//for k, deps := range h.objectDependencies {
Expand Down Expand Up @@ -404,6 +405,13 @@ func WithStop(stop <-chan struct{}) CollectionOption {
}
}

// WithDebugging enables debugging of the collection
func WithDebugging(handler *DebugHandler) CollectionOption {
return func(c *collectionOptions) {
c.debugger = handler
}
}

// NewCollection transforms a Collection[I] to a Collection[O] by applying the provided transformation function.
// This applies for one-to-one relationships between I and O.
// For zero-to-one, use NewSingleton. For one-to-many, use NewManyCollection.
Expand Down Expand Up @@ -454,7 +462,7 @@ func newManyCollection[I, O any](cc Collection[I], hf TransformationMulti[I, O],
synced: make(chan struct{}),
stop: opts.stop,
}
RegisterCollectionForDebugging(h)
maybeRegisterCollectionForDebugging(h, opts.debugger)
go func() {
// Wait for primary dependency to be ready
if !c.Synced().WaitUntilSynced(h.stop) {
Expand Down
Loading