Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support auto generation of SinkBindings identity service account #7327

Merged
merged 11 commits into from
Oct 11, 2023
6 changes: 6 additions & 0 deletions config/core/roles/webhook-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ rules:
- "create"
- "patch"

rahulii marked this conversation as resolved.
Show resolved Hide resolved
# For the SinkBinding reconciler adding the OIDC identity service accounts
- apiGroups:
- ""
resources:
- "serviceaccounts"
verbs: *everything
# Necessary for conversion webhook. These are copied from the serving
# TODO: Do we really need all these permissions?
- apiGroups: ["apiextensions.k8s.io"]
Expand Down
17 changes: 17 additions & 0 deletions pkg/apis/sources/v1/sinkbinding_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

var sbCondSet = apis.NewLivingConditionSet(
SinkBindingConditionSinkProvided,
SinkBindingConditionOIDCIdentityCreated,
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -95,6 +96,22 @@ func (sbs *SinkBindingStatus) MarkSink(addr *duckv1.Addressable) {
}
}

func (sbs *SinkBindingStatus) MarkOIDCIdentityCreatedSucceeded() {
sbCondSet.Manage(sbs).MarkTrue(SinkBindingConditionOIDCIdentityCreated)
}

func (sbs *SinkBindingStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
sbCondSet.Manage(sbs).MarkTrueWithReason(SinkBindingConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (sbs *SinkBindingStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
sbCondSet.Manage(sbs).MarkFalse(SinkBindingConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (sbs *SinkBindingStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
sbCondSet.Manage(sbs).MarkUnknown(SinkBindingConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

// Do implements psbinding.Bindable
func (sb *SinkBinding) Do(ctx context.Context, ps *duckv1.WithPod) {
// First undo so that we can just unconditionally append below.
Expand Down
34 changes: 34 additions & 0 deletions pkg/apis/sources/v1/sinkbinding_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,43 @@ func TestSinkBindingStatusIsReady(t *testing.T) {
s.InitializeConditions()
s.MarkSink(sink)
s.MarkBindingAvailable()
s.MarkOIDCIdentityCreatedSucceeded()
return s
}(),
want: true,
}, {
name: "mark OIDC identity created",
s: func() *SinkBindingStatus {
s := &SinkBindingStatus{}
s.InitializeConditions()
s.MarkSink(sink)
s.MarkBindingAvailable()
s.MarkOIDCIdentityCreatedSucceeded()
return s
}(),
want: true,
}, {
name: "mark OIDC identity created with reason",
s: func() *SinkBindingStatus {
s := &SinkBindingStatus{}
s.InitializeConditions()
s.MarkSink(sink)
s.MarkBindingAvailable()
s.MarkOIDCIdentityCreatedSucceededWithReason("TheReason", "feature is disabled")
return s
}(),
want: true,
}, {
name: "mark OIDC identity created failed",
s: func() *SinkBindingStatus {
s := &SinkBindingStatus{}
s.InitializeConditions()
s.MarkSink(sink)
s.MarkBindingAvailable()
s.MarkOIDCIdentityCreatedFailed("TheReason", "this is a message")
return s
}(),
want: false,
}}

for _, test := range tests {
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/sources/v1/sinkbinding_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ const (
// SinkBindingConditionSinkProvided is configured to indicate whether the
// sink has been properly extracted from the resolver.
SinkBindingConditionSinkProvided apis.ConditionType = "SinkProvided"

// SinkBindingConditionOIDCIdentityCreated is configured to indicate whether
// the OIDC identity has been created for the sink.
SinkBindingConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

// SinkBindingStatus communicates the observed state of the SinkBinding (from the controller).
Expand Down
64 changes: 60 additions & 4 deletions pkg/reconciler/sinkbinding/controller.go
rahulii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@ package sinkbinding
import (
"context"
"errors"
"fmt"

"knative.dev/eventing/pkg/auth"
sbinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1/sinkbinding"
"knative.dev/pkg/client/injection/ducks/duck/v1/podspecable"
"knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
"knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/system"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
Expand All @@ -50,8 +59,11 @@ const (
)

type SinkBindingSubResourcesReconciler struct {
res *resolver.URIResolver
tracker tracker.Interface
res *resolver.URIResolver
tracker tracker.Interface
serviceAccountLister corev1listers.ServiceAccountLister
kubeclient kubernetes.Interface
featureStore *feature.Store
}

// NewController returns a new SinkBinding reconciler.
Expand All @@ -65,6 +77,9 @@ func NewController(
dc := dynamicclient.Get(ctx)
psInformerFactory := podspecable.Get(ctx)
namespaceInformer := namespace.Get(ctx)
serviceaccountInformer := serviceaccountinformer.Get(ctx)
configmapInformer := configmapinformer.Get(ctx)

c := &psbinding.BaseReconciler{
LeaderAwareFuncs: reconciler.LeaderAwareFuncs{
PromoteFunc: func(bkt reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
Expand Down Expand Up @@ -94,13 +109,22 @@ func NewController(
Logger: logger,
})

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
impl.GlobalResync(sbInformer.Informer())
})

featureStore.WatchConfigs(cmw)

sbInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
namespaceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

sbResolver := resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
c.SubResourcesReconciler = &SinkBindingSubResourcesReconciler{
res: sbResolver,
tracker: impl.Tracker,
res: sbResolver,
tracker: impl.Tracker,
kubeclient: kubeclient.Get(ctx),
serviceAccountLister: serviceaccountInformer.Lister(),
featureStore: featureStore,
}

c.WithContext = func(ctx context.Context, b psbinding.Bindable) (context.Context, error) {
Expand All @@ -114,6 +138,20 @@ func NewController(
},
}

// Reconcile SinkBinding when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.SinkBinding{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

Comment on lines +141 to +146
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@creydr it's a general common not specific to this PR, but we should evaluate if it makes sense to label all of our service accounts and use a "filtered informer" so that we're not watching all service accounts into a cluster which could be quite large but only "ours"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense to label the service account 👍🏼

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point @pierDipi. I think we can do this in a separate PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created #7341

// reconcile sinkindings on changes on the features configmap
configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(system.Namespace(), feature.FlagsConfigName),
Handler: controller.HandleAll(func(i interface{}) {
impl.GlobalResync(sbInformer.Informer())
}),
})
Comment on lines +147 to +153
Copy link
Member

@pierDipi pierDipi Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a cluster-wide configmap informer, we can configure a callback function when calling NewStore, that saves quite a bit of memory since that watcher is watching only configmaps in the system namespace as opposed to watching every configmap in the cluster /cc @creydr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see example in pkg/reconciler/broker/controller.go

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like featureStore.WatchConfigs(cmw) , right ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. featureStore.WatchConfigs(cmw) takes additional callback functions which get called on configmap updates. E.g.:

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
	if globalResync != nil {
		globalResync(nil)
	}
})

with globalResync being something like:

globalResync = func(obj interface{}) {
	impl.GlobalResync(sbInformer.Informer())
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @creydr


return impl
}

Expand Down Expand Up @@ -161,6 +199,24 @@ func (s *SinkBindingSubResourcesReconciler) Reconcile(ctx context.Context, b psb
Name: sb.Spec.Sink.Ref.Name,
}, b)
}

featureFlags := s.featureStore.Load()
if featureFlags.IsOIDCAuthentication() {
saName := auth.GetOIDCServiceAccountNameForResource(v1.SchemeGroupVersion.WithKind("SinkBinding"), sb.ObjectMeta)
sb.Status.Auth = &duckv1.AuthStatus{
ServiceAccountName: &saName,
}

if err := auth.EnsureOIDCServiceAccountExistsForResource(ctx, s.serviceAccountLister, s.kubeclient, v1.SchemeGroupVersion.WithKind("SinkBinding"), sb.ObjectMeta); err != nil {
sb.Status.MarkOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", "%v", err)
return err
}
sb.Status.MarkOIDCIdentityCreatedSucceeded()
} else {
sb.Status.Auth = nil
sb.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a constant string we can use across the board for this reason? /cc @creydr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet. But makes sense to introduce one, as we have this at multiple places 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that can be a part of seperate PR!

}

addr, err := s.res.AddressableFromDestinationV1(ctx, sb.Spec.Sink, sb)
if err != nil {
logging.FromContext(ctx).Errorf("Failed to get Addressable from Destination: %w", err)
Expand Down