Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup trust domain to not use globals #51205

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cni/pkg/nodeagent/ztunnelserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
v1 "k8s.io/api/core/v1"

"istio.io/istio/pkg/monitoring"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/zdsapi"
)

Expand Down Expand Up @@ -278,7 +277,7 @@ func podToWorkload(pod *v1.Pod) *zdsapi.WorkloadInfo {
namespace := pod.ObjectMeta.Namespace
name := pod.ObjectMeta.Name
svcAccount := pod.Spec.ServiceAccountName
trustDomain := spiffe.GetTrustDomain()
trustDomain := ""
return &zdsapi.WorkloadInfo{
Namespace: namespace,
Name: name,
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/bootstrap/istio_ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *Server) RunCA(grpc *grpc.Server) {
// Add a custom authenticator using standard JWT validation, if not running in K8S
// When running inside K8S - we can use the built-in validator, which also check pod removal (invalidation).
jwtRule := v1beta1.JWTRule{Issuer: iss, Audiences: []string{aud}}
oidcAuth, err := authenticate.NewJwtAuthenticator(&jwtRule)
oidcAuth, err := authenticate.NewJwtAuthenticator(&jwtRule, nil)
if err == nil {
s.caServer.Authenticators = append(s.caServer.Authenticators, oidcAuth)
log.Info("Using out-of-cluster JWT authentication")
Expand Down
21 changes: 10 additions & 11 deletions pilot/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,13 +233,13 @@ func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
monitoringMux: http.NewServeMux(),
readinessProbes: make(map[string]readinessProbe),
readinessFlags: &readinessFlags{},
workloadTrustBundle: tb.NewTrustBundle(nil),
server: server.New(),
shutdownDuration: args.ShutdownDuration,
internalStop: make(chan struct{}),
istiodCertBundleWatcher: keycertbundle.NewWatcher(),
webhookInfo: &webhookInfo{},
}
s.workloadTrustBundle = tb.NewTrustBundle(nil, e.Watcher)

// Apply custom initialization functions.
for _, fn := range initFuncs {
Expand Down Expand Up @@ -269,7 +269,6 @@ func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
}

s.initMeshConfiguration(args, s.fileWatcher)
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
// Setup Kubernetes watch filters
// Because this relies on meshconfig, it needs to be outside initKubeClient
if s.kubeClient != nil {
Expand Down Expand Up @@ -326,7 +325,7 @@ func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
}

// Secure gRPC Server must be initialized after CA is created as may use a Citadel generated cert.
if err := s.initSecureDiscoveryService(args); err != nil {
if err := s.initSecureDiscoveryService(args, s.environment.Mesh().GetTrustDomain()); err != nil {
return nil, fmt.Errorf("error initializing secure gRPC Listener: %v", err)
}

Expand Down Expand Up @@ -358,7 +357,7 @@ func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
&authenticate.ClientCertAuthenticator{},
}
if args.JwtRule != "" {
jwtAuthn, err := initOIDC(args)
jwtAuthn, err := initOIDC(args, s.environment.Watcher)
if err != nil {
return nil, fmt.Errorf("error initializing OIDC: %v", err)
}
Expand Down Expand Up @@ -400,7 +399,7 @@ func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
return s, nil
}

func initOIDC(args *PilotArgs) (security.Authenticator, error) {
func initOIDC(args *PilotArgs, meshWatcher mesh.Watcher) (security.Authenticator, error) {
// JWTRule is from the JWT_RULE environment variable.
// An example of json string for JWTRule is:
// `{"issuer": "foo", "jwks_uri": "baz", "audiences": ["aud1", "aud2"]}`.
Expand All @@ -410,7 +409,7 @@ func initOIDC(args *PilotArgs) (security.Authenticator, error) {
return nil, fmt.Errorf("failed to unmarshal JWT rule: %v", err)
}
log.Infof("Istiod authenticating using JWTRule: %v", jwtRule)
jwtAuthn, err := authenticate.NewJwtAuthenticator(jwtRule)
jwtAuthn, err := authenticate.NewJwtAuthenticator(jwtRule, meshWatcher)
if err != nil {
return nil, fmt.Errorf("failed to create the JWT authenticator: %v", err)
}
Expand Down Expand Up @@ -739,13 +738,13 @@ func (s *Server) initGrpcServer(options *istiokeepalive.Options) {
}

// initialize secureGRPCServer.
func (s *Server) initSecureDiscoveryService(args *PilotArgs) error {
func (s *Server) initSecureDiscoveryService(args *PilotArgs, trustDomain string) error {
if args.ServerOptions.SecureGRPCAddr == "" {
log.Info("The secure discovery port is disabled, multiplexing on httpAddr ")
return nil
}

peerCertVerifier, err := s.createPeerCertVerifier(args.ServerOptions.TLSOptions)
peerCertVerifier, err := s.createPeerCertVerifier(args.ServerOptions.TLSOptions, trustDomain)
if err != nil {
return err
}
Expand Down Expand Up @@ -1018,7 +1017,7 @@ func getDNSNames(args *PilotArgs, host string) []string {
}

// createPeerCertVerifier creates a SPIFFE certificate verifier with the current istiod configuration.
func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions) (*spiffe.PeerCertVerifier, error) {
func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions, trustDomain string) (*spiffe.PeerCertVerifier, error) {
customTLSCertsExists, _, _, caCertPath := hasCustomTLSCerts(tlsOptions)
if !customTLSCertsExists && s.CA == nil && !s.isCADisabled() {
// Running locally without configured certs - no TLS mode
Expand Down Expand Up @@ -1047,7 +1046,8 @@ func (s *Server) createPeerCertVerifier(tlsOptions TLSOptions) (*spiffe.PeerCert
}

if len(rootCertBytes) != 0 {
err := peerCertVerifier.AddMappingFromPEM(spiffe.GetTrustDomain(), rootCertBytes)
// TODO: trustDomain here is static and will not update if it dynamically changes in mesh config
err := peerCertVerifier.AddMappingFromPEM(trustDomain, rootCertBytes)
if err != nil {
return nil, fmt.Errorf("add root CAs into peerCertVerifier failed: %v", err)
}
Expand Down Expand Up @@ -1228,7 +1228,6 @@ func (s *Server) initMeshHandlers(changeHandler func(_ *meshconfig.MeshConfig))
log.Info("initializing mesh handlers")
// When the mesh config or networks change, do a full push.
s.environment.AddMeshHandler(func() {
spiffe.SetTrustDomain(s.environment.Mesh().GetTrustDomain())
changeHandler(s.environment.Mesh())
s.XDSServer.ConfigUpdate(&model.PushRequest{
Full: true,
Expand Down
2 changes: 1 addition & 1 deletion pilot/pkg/bootstrap/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func TestInitOIDC(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
args := &PilotArgs{JwtRule: tt.jwtRule}

_, err := initOIDC(args)
_, err := initOIDC(args, nil)
gotErr := err != nil
if gotErr != tt.expectErr {
t.Errorf("expect error is %v while actual error is %v", tt.expectErr, gotErr)
Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/bootstrap/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (s *Server) initServiceControllers(args *PilotArgs) error {

s.serviceEntryController = serviceentry.NewController(
s.configController, s.XDSServer,
s.environment.Watcher,
serviceentry.WithClusterID(s.clusterID),
)
serviceControllers.AddRegistry(s.serviceEntryController)
Expand Down
3 changes: 2 additions & 1 deletion pilot/pkg/networking/core/cluster_waypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ func (cb *ClusterBuilder) buildWaypointInboundVIP(proxy *model.Proxy, svcs map[h

func (cb *ClusterBuilder) buildWaypointConnectOriginate(proxy *model.Proxy, push *model.PushContext) *cluster.Cluster {
m := &matcher.StringMatcher{}

m.MatchPattern = &matcher.StringMatcher_Prefix{
Prefix: spiffe.URIPrefix + spiffe.GetTrustDomain() + "/ns/" + proxy.Metadata.Namespace + "/sa/",
Prefix: spiffe.URIPrefix + push.Mesh.GetTrustDomain() + "/ns/" + proxy.Metadata.Namespace + "/sa/",
}
return cb.buildConnectOriginate(proxy, push, m)
}
Expand Down
3 changes: 2 additions & 1 deletion pilot/pkg/networking/core/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func NewConfigGenTest(t test.Failer, opts TestOptions) *ConfigGenTest {
}

env := model.NewEnvironment()
env.Watcher = mesh.NewFixedWatcher(m)

xdsUpdater := opts.XDSUpdater
if xdsUpdater == nil {
Expand All @@ -146,6 +147,7 @@ func NewConfigGenTest(t test.Failer, opts TestOptions) *ConfigGenTest {
se := serviceentry.NewController(
configController,
xdsUpdater,
env.Watcher,
serviceentry.WithClusterID(opts.ClusterID))
// TODO allow passing in registry, for k8s, mem reigstry
serviceDiscovery.AddRegistry(se)
Expand All @@ -165,7 +167,6 @@ func NewConfigGenTest(t test.Failer, opts TestOptions) *ConfigGenTest {
for _, reg := range opts.ServiceRegistries {
serviceDiscovery.AddRegistry(reg)
}
env.Watcher = mesh.NewFixedWatcher(m)
if opts.NetworksWatcher == nil {
opts.NetworksWatcher = mesh.NewFixedNetworksWatcher(nil)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ func PolicyCollections(
LabelSelector: model.NewSelector(i.Spec.GetSelector().GetMatchLabels()),
}
}, krt.WithName("PeerAuthDerivedPolicies"))
ImplicitWaypointPolicies := krt.NewCollection(Waypoints, implicitWaypointPolicy, krt.WithName("DefaultAllowFromWaypointPolicies"))
ImplicitWaypointPolicies := krt.NewCollection(Waypoints, func(ctx krt.HandlerContext, waypoint Waypoint) *model.WorkloadAuthorization {
return implicitWaypointPolicy(ctx, MeshConfig, waypoint)
}, krt.WithName("DefaultAllowFromWaypointPolicies"))
DefaultPolicy := krt.NewSingleton[model.WorkloadAuthorization](func(ctx krt.HandlerContext) *model.WorkloadAuthorization {
if len(krt.Fetch(ctx, PeerAuths)) == 0 {
return nil
Expand Down Expand Up @@ -107,10 +109,11 @@ func implicitWaypointPolicyName(waypoint *Waypoint) string {
return "istio_allow_waypoint_" + waypoint.Namespace + "_" + waypoint.Name
}

func implicitWaypointPolicy(ctx krt.HandlerContext, waypoint Waypoint) *model.WorkloadAuthorization {
func implicitWaypointPolicy(ctx krt.HandlerContext, MeshConfig krt.Singleton[MeshConfig], waypoint Waypoint) *model.WorkloadAuthorization {
if !features.DefaultAllowFromWaypoint || len(waypoint.ServiceAccounts) == 0 {
return nil
}
meshCfg := krt.FetchOne(ctx, MeshConfig.AsCollection())
return &model.WorkloadAuthorization{
Authorization: &security.Authorization{
Name: implicitWaypointPolicyName(&waypoint),
Expand All @@ -126,7 +129,7 @@ func implicitWaypointPolicy(ctx krt.HandlerContext, waypoint Waypoint) *model.Wo
{
Principals: slices.Map(waypoint.ServiceAccounts, func(sa string) *security.StringMatch {
return &security.StringMatch{MatchType: &security.StringMatch_Exact{
Exact: strings.TrimPrefix(spiffe.MustGenSpiffeURI(waypoint.Namespace, sa), spiffe.URIPrefix),
Exact: strings.TrimPrefix(spiffe.MustGenSpiffeURI(meshCfg.MeshConfig, waypoint.Namespace, sa), spiffe.URIPrefix),
}}
}),
},
Expand Down
11 changes: 5 additions & 6 deletions pilot/pkg/serviceregistry/kube/controller/ambient/workloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/slices"
"istio.io/istio/pkg/spiffe"
"istio.io/istio/pkg/workloadapi"
)

Expand Down Expand Up @@ -132,7 +131,7 @@ func (a *index) workloadEntryWorkloadBuilder(
AuthorizationPolicies: policies,
Status: workloadapi.WorkloadStatus_HEALTHY, // TODO: WE can be unhealthy
Waypoint: waypointAddress,
TrustDomain: pickTrustDomain(),
TrustDomain: pickTrustDomain(meshCfg),
Locality: getWorkloadEntryLocality(&wle.Spec),
}

Expand Down Expand Up @@ -229,7 +228,7 @@ func (a *index) podWorkloadBuilder(
Services: constructServices(p, services),
AuthorizationPolicies: policies,
Status: status,
TrustDomain: pickTrustDomain(),
TrustDomain: pickTrustDomain(meshCfg),
Locality: getPodLocality(ctx, Nodes, p),
}

Expand Down Expand Up @@ -323,7 +322,7 @@ func (a *index) serviceEntryWorkloadBuilder(
AuthorizationPolicies: policies,
Status: workloadapi.WorkloadStatus_HEALTHY,
Waypoint: waypointAddress,
TrustDomain: pickTrustDomain(),
TrustDomain: pickTrustDomain(meshCfg),
Locality: getWorkloadEntryLocality(wle),
}

Expand Down Expand Up @@ -355,8 +354,8 @@ func setTunnelProtocol(labels, annotations map[string]string, w *workloadapi.Wor
}
}

func pickTrustDomain() string {
if td := spiffe.GetTrustDomain(); td != "cluster.local" {
func pickTrustDomain(mesh *MeshConfig) string {
if td := mesh.GetTrustDomain(); td != "cluster.local" {
return td
}
return ""
Expand Down
10 changes: 1 addition & 9 deletions pilot/pkg/serviceregistry/kube/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ type kubernetesNode struct {

// controllerInterface is a simplified interface for the Controller used for testing.
type controllerInterface interface {
getPodLocality(pod *v1.Pod) string
Network(endpointIP string, labels labels.Instance) network.ID
Cluster() cluster.ID
}

var (
Expand Down Expand Up @@ -219,7 +217,6 @@ type Controller struct {
configCluster bool

networksHandlerRegistration *mesh.WatcherHandlerRegistration
meshHandlerRegistration *mesh.WatcherHandlerRegistration
}

// NewController creates a new Kubernetes controller
Expand Down Expand Up @@ -370,19 +367,14 @@ func (c *Controller) Cleanup() error {
c.opts.MeshNetworksWatcher.DeleteNetworksHandler(c.networksHandlerRegistration)
}

// Unregister mesh handler
if c.meshHandlerRegistration != nil {
c.opts.MeshWatcher.DeleteMeshHandler(c.meshHandlerRegistration)
}

return nil
}

func (c *Controller) onServiceEvent(pre, curr *v1.Service, event model.Event) error {
log.Debugf("Handle event %s for service %s in namespace %s", event, curr.Name, curr.Namespace)

// Create the standard (cluster.local) service.
svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster())
svcConv := kube.ConvertService(*curr, c.opts.DomainSuffix, c.Cluster(), c.meshWatcher.Mesh())

switch event {
case model.EventDelete:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2883,16 +2883,16 @@ func TestServiceUpdateNeedsPush(t *testing.T) {
name: "target ports changed",
prev: &svc,
curr: &updatedSvc,
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
currConv: kube.ConvertService(updatedSvc, constants.DefaultClusterLocalDomain, ""),
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", nil),
currConv: kube.ConvertService(updatedSvc, constants.DefaultClusterLocalDomain, "", nil),
expect: true,
},
testcase{
name: "target ports unchanged",
prev: &svc,
curr: &svc,
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
currConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, ""),
prevConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", nil),
currConv: kube.ConvertService(svc, constants.DefaultClusterLocalDomain, "", nil),
expect: false,
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ type EndpointBuilder struct {
nodeName string
}

func NewEndpointBuilder(c controllerInterface, pod *v1.Pod) *EndpointBuilder {
func (c *Controller) NewEndpointBuilder(pod *v1.Pod) *EndpointBuilder {
var locality, sa, namespace, hostname, subdomain, ip, node string
var podLabels labels.Instance
if pod != nil {
locality = c.getPodLocality(pod)
sa = kube.SecureNamingSAN(pod)
sa = kube.SecureNamingSAN(pod, c.meshWatcher.Mesh())
podLabels = pod.Labels
namespace = pod.Namespace
subdomain = pod.Spec.Subdomain
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewEndpointBuilder(c controllerInterface, pod *v1.Pod) *EndpointBuilder {
return out
}

func NewEndpointBuilderFromMetadata(c controllerInterface, proxy *model.Proxy) *EndpointBuilder {
func (c *Controller) NewEndpointBuilderFromMetadata(proxy *model.Proxy) *EndpointBuilder {
locality := util.LocalityToString(proxy.Locality)
out := &EndpointBuilder{
controller: c,
Expand Down
Loading