Skip to content

Commit

Permalink
meshconfig: move over to krt (istio#54629)
Browse files Browse the repository at this point in the history
* wip

* wip

* wip

* wip

* wip

* seems to be goodish

* make it compile

* wonky

* fix tests

* fixup all issues

* more docs

* wip

* clean

* working

* more docs

* lint

* wip

* new options builder

* new global opts

* lint

* lint

* nit

* cleanup

* address comments

* address comments
  • Loading branch information
howardjohn authored Jan 21, 2025
1 parent 8fbd5e9 commit 9ac925c
Show file tree
Hide file tree
Showing 81 changed files with 1,160 additions and 1,045 deletions.
134 changes: 72 additions & 62 deletions pilot/pkg/bootstrap/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,24 @@
package bootstrap

import (
"encoding/json"
"os"

"sigs.k8s.io/yaml"

"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/kubemesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
"istio.io/istio/pkg/filewatcher"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/log"
"istio.io/istio/pkg/ptr"
"istio.io/istio/pkg/version"
)

const (
// defaultMeshConfigMapName is the default name of the ConfigMap with the mesh config
// The actual name can be different - use getMeshConfigMapName
defaultMeshConfigMapName = "istio"
// configMapKey should match the expected MeshConfig file name
configMapKey = "mesh"
)

// initMeshConfiguration creates the mesh in the pilotConfig from the input arguments.
Expand All @@ -46,75 +47,47 @@ const (
// - the SHARED_MESH_CONFIG config map will also be loaded and merged.
func (s *Server) initMeshConfiguration(args *PilotArgs, fileWatcher filewatcher.FileWatcher) {
log.Infof("initializing mesh configuration %v", args.MeshConfigFile)
defer func() {
if s.environment.Watcher != nil {
log.Infof("mesh configuration: %s", mesh.PrettyFormatOfMeshConfig(s.environment.Mesh()))
log.Infof("version: %s", version.Info.String())
argsdump, _ := json.MarshalIndent(args, "", " ")
log.Infof("flags: %s", argsdump)
}
}()

// Watcher will be merging more than one mesh config source?
multiWatch := features.SharedMeshConfig != ""
col := s.getMeshConfiguration(args, fileWatcher)
col.AsCollection().WaitUntilSynced(s.internalStop)
s.environment.Watcher = meshwatcher.ConfigAdapter(col)

var err error
if _, err = os.Stat(args.MeshConfigFile); !os.IsNotExist(err) {
s.environment.Watcher, err = mesh.NewFileWatcher(fileWatcher, args.MeshConfigFile, multiWatch)
if err == nil {
if multiWatch && s.kubeClient != nil {
kubemesh.AddUserMeshConfig(
s.kubeClient, s.environment.Watcher, args.Namespace, configMapKey, features.SharedMeshConfig, s.internalStop)
} else {
// Normal install no longer uses this mode - testing and special installs still use this.
log.Warnf("Using local mesh config file %s, in cluster configs ignored", args.MeshConfigFile)
}
return
}
}
log.Infof("mesh configuration: %s", meshwatcher.PrettyFormatOfMeshConfig(s.environment.Mesh()))
log.Infof("version: %s", version.Info.String())
argsdump, _ := yaml.Marshal(args)
log.Infof("flags: %s", argsdump)
}

// Config file either didn't exist or failed to load.
if s.kubeClient == nil {
// Use a default mesh.
meshConfig := mesh.DefaultMeshConfig()
s.environment.Watcher = mesh.NewFixedWatcher(meshConfig)
// getMeshConfiguration builds up MeshConfig.
func (s *Server) getMeshConfiguration(args *PilotArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshConfigResource] {
// We need to get mesh configuration up-front, before we start anything, so we use internalStop rather than scheduling a task to run
// later.
opts := krt.NewOptionsBuilder(s.internalStop, args.KrtDebugger)
sources := s.getConfigurationSources(args, fileWatcher, args.MeshConfigFile, kubemesh.MeshConfigKey)
if len(sources) == 0 {
log.Warnf("Using default mesh - missing file %s and no k8s client", args.MeshConfigFile)
return
}

// Watch the istio ConfigMap for mesh config changes.
// This may be necessary for external Istiod.
configMapName := getMeshConfigMapName(args.Revision)
multiWatcher := kubemesh.NewConfigMapWatcher(
s.kubeClient, args.Namespace, configMapName, configMapKey, multiWatch, s.internalStop)
s.environment.Watcher = multiWatcher
s.environment.NetworksWatcher = multiWatcher
log.Infof("initializing mesh networks from mesh config watcher")

if multiWatch {
kubemesh.AddUserMeshConfig(s.kubeClient, s.environment.Watcher, args.Namespace, configMapKey, features.SharedMeshConfig, s.internalStop)
}
return meshwatcher.NewCollection(opts, sources...)
}

// initMeshNetworks loads the mesh networks configuration from the file provided
// in the args and add a watcher for changes in this file.
func (s *Server) initMeshNetworks(args *PilotArgs, fileWatcher filewatcher.FileWatcher) {
if s.environment.NetworksWatcher != nil {
return
}
log.Info("initializing mesh networks")
if args.NetworksConfigFile != "" {
var err error
s.environment.NetworksWatcher, err = mesh.NewNetworksWatcher(fileWatcher, args.NetworksConfigFile)
if err != nil {
log.Info(err)
}
}
log.Infof("initializing mesh networks configuration %v", args.NetworksConfigFile)
col := s.getMeshNetworks(args, fileWatcher)
col.AsCollection().WaitUntilSynced(s.internalStop)
s.environment.NetworksWatcher = meshwatcher.NetworksAdapter(col)
log.Infof("mesh networks configuration: %s", meshwatcher.PrettyFormatOfMeshNetworks(s.environment.MeshNetworks()))
}

if s.environment.NetworksWatcher == nil {
log.Info("mesh networks configuration not provided")
s.environment.NetworksWatcher = mesh.NewFixedNetworksWatcher(nil)
func (s *Server) getMeshNetworks(args *PilotArgs, fileWatcher filewatcher.FileWatcher) krt.Singleton[meshwatcher.MeshNetworksResource] {
// We need to get mesh networks up-front, before we start anything, so we use internalStop rather than scheduling a task to run
// later.
opts := krt.NewOptionsBuilder(s.internalStop, args.KrtDebugger)
sources := s.getConfigurationSources(args, fileWatcher, args.NetworksConfigFile, kubemesh.MeshNetworksKey)
if len(sources) == 0 {
log.Warnf("Using default mesh networks - missing file %s and no k8s client", args.NetworksConfigFile)
}
return meshwatcher.NewNetworksCollection(opts, sources...)
}

func getMeshConfigMapName(revision string) string {
Expand All @@ -124,3 +97,40 @@ func getMeshConfigMapName(revision string) string {
}
return name + "-" + revision
}

// getConfigurationSources builds the mesh sources. This can pull MeshConfig and Meshnetworks (based on file/configmap key)
// There are a variety of possible states:
// * default + file
// * default + file + configmap
// * default + configmap
// * default + configmap + configmap
// * default
func (s *Server) getConfigurationSources(args *PilotArgs, fileWatcher filewatcher.FileWatcher, file string, cmKey string) []meshwatcher.MeshConfigSource {
opts := krt.NewOptionsBuilder(s.internalStop, args.KrtDebugger)
// Watcher will be merging more than one mesh config source?
var userMeshConfig *meshwatcher.MeshConfigSource
if features.SharedMeshConfig != "" && s.kubeClient != nil {
userMeshConfig = ptr.Of(kubemesh.NewConfigMapSource(s.kubeClient, args.Namespace, features.SharedMeshConfig, cmKey, opts))
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
fileSource, err := meshwatcher.NewFileSource(fileWatcher, file, opts)
if err == nil {
return toSources(fileSource, userMeshConfig)
}
}

if s.kubeClient == nil {
return nil
}
configMapName := getMeshConfigMapName(args.Revision)
primary := kubemesh.NewConfigMapSource(s.kubeClient, args.Namespace, configMapName, cmKey, opts)
return toSources(primary, userMeshConfig)
}

func toSources(base meshwatcher.MeshConfigSource, user *meshwatcher.MeshConfigSource) []meshwatcher.MeshConfigSource {
if user != nil {
// User configuration is applied first
return []meshwatcher.MeshConfigSource{*user, base}
}
return []meshwatcher.MeshConfigSource{base}
}
3 changes: 3 additions & 0 deletions pilot/pkg/bootstrap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"istio.io/istio/pkg/ctrlz"
"istio.io/istio/pkg/env"
"istio.io/istio/pkg/keepalive"
"istio.io/istio/pkg/kube/krt"
)

// RegistryOptions provide configuration options for the configuration controller. If FileDir is set, that directory will
Expand Down Expand Up @@ -54,6 +55,7 @@ type PilotArgs struct {
NetworksConfigFile string
RegistryOptions RegistryOptions
CtrlZOptions *ctrlz.Options
KrtDebugger *krt.DebugHandler `json:"-"`
KeepaliveOptions *keepalive.Options
ShutdownDuration time.Duration
JwtRule string
Expand Down Expand Up @@ -140,6 +142,7 @@ func (p *PilotArgs) applyDefaults() {
p.JwtRule = JwtRule
p.KeepaliveOptions = keepalive.DefaultOption()
p.RegistryOptions.ClusterRegistriesNamespace = p.Namespace
p.KrtDebugger = new(krt.DebugHandler)
}

func (p *PilotArgs) Complete() error {
Expand Down
6 changes: 5 additions & 1 deletion pilot/pkg/bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ import (
kubelib "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/inject"
"istio.io/istio/pkg/kube/kclient"
"istio.io/istio/pkg/kube/krt"
"istio.io/istio/pkg/kube/multicluster"
"istio.io/istio/pkg/kube/namespace"
"istio.io/istio/pkg/log"
Expand Down Expand Up @@ -180,6 +181,8 @@ type Server struct {
statusManager *status.Manager
// RWConfigStore is the configstore which allows updates, particularly for status.
RWConfigStore model.ConfigStoreController

krtDebugger *krt.DebugHandler
}

type readinessFlags struct {
Expand Down Expand Up @@ -249,14 +252,15 @@ func NewServer(args *PilotArgs, initFuncs ...func(*Server)) (*Server, error) {
istiodCertBundleWatcher: keycertbundle.NewWatcher(),
webhookInfo: &webhookInfo{},
metricsExporter: exporter,
krtDebugger: args.KrtDebugger,
}

// Apply custom initialization functions.
for _, fn := range initFuncs {
fn(s)
}
// Initialize workload Trust Bundle before XDS Server
s.XDSServer = xds.NewDiscoveryServer(e, args.RegistryOptions.KubeOptions.ClusterAliases)
s.XDSServer = xds.NewDiscoveryServer(e, args.RegistryOptions.KubeOptions.ClusterAliases, args.KrtDebugger)
configGen := core.NewConfigGenerator(s.XDSServer.Cache)

grpcprom.EnableHandlingTimeHistogram()
Expand Down
1 change: 1 addition & 0 deletions pilot/pkg/bootstrap/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (s *Server) initServiceControllers(args *PilotArgs) error {
func (s *Server) initKubeRegistry(args *PilotArgs) (err error) {
args.RegistryOptions.KubeOptions.ClusterID = s.clusterID
args.RegistryOptions.KubeOptions.Revision = args.Revision
args.RegistryOptions.KubeOptions.KrtDebugger = args.KrtDebugger
args.RegistryOptions.KubeOptions.Metrics = s.environment
args.RegistryOptions.KubeOptions.XDSUpdater = s.XDSServer
args.RegistryOptions.KubeOptions.MeshNetworksWatcher = s.environment.NetworksWatcher
Expand Down
4 changes: 2 additions & 2 deletions pilot/pkg/config/kube/ingress/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ import (
"istio.io/istio/pilot/pkg/model"
kubecontroller "istio.io/istio/pilot/pkg/serviceregistry/kube/controller"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/kclient/clienttest"
"istio.io/istio/pkg/util/sets"
)

func newFakeController() (model.ConfigStoreController, kube.Client) {
meshHolder := mesh.NewTestWatcher(&meshconfig.MeshConfig{
meshHolder := meshwatcher.NewTestWatcher(&meshconfig.MeshConfig{
IngressControllerMode: meshconfig.MeshConfig_DEFAULT,
})
fakeClient := kube.NewFakeClient()
Expand Down
3 changes: 2 additions & 1 deletion pilot/pkg/config/kube/ingress/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"istio.io/api/annotation"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
kubelib "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/kclient/clienttest"
istiolog "istio.io/istio/pkg/log"
Expand Down Expand Up @@ -99,7 +100,7 @@ var testObjects = []runtime.Object{
func fakeMeshHolder(ingressService string) mesh.Watcher {
config := mesh.DefaultMeshConfig()
config.IngressService = ingressService
return mesh.NewFixedWatcher(config)
return meshwatcher.NewTestWatcher(config)
}

func makeStatusSyncer(t *testing.T, name string) *StatusSyncer {
Expand Down
4 changes: 2 additions & 2 deletions pilot/pkg/model/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"istio.io/istio/pilot/pkg/serviceregistry/provider"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/labels"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
"istio.io/istio/pkg/config/schema/gvk"
pkgtest "istio.io/istio/pkg/test"
"istio.io/istio/pkg/test/util/assert"
Expand Down Expand Up @@ -1501,7 +1501,7 @@ func getTestAuthenticationPolicies(configs []*config.Config, t *testing.T) *Auth
}
environment := &Environment{
ConfigStore: configStore,
Watcher: mesh.NewFixedWatcher(&meshconfig.MeshConfig{RootNamespace: rootNamespace}),
Watcher: meshwatcher.NewTestWatcher(&meshconfig.MeshConfig{RootNamespace: rootNamespace}),
}

return initAuthenticationPolicies(environment)
Expand Down
4 changes: 2 additions & 2 deletions pilot/pkg/model/authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"istio.io/istio/pilot/pkg/serviceregistry/provider"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/labels"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
"istio.io/istio/pkg/config/schema/collection"
"istio.io/istio/pkg/config/schema/gvk"
"istio.io/istio/pkg/util/protomarshal"
Expand Down Expand Up @@ -458,7 +458,7 @@ func createFakeAuthorizationPolicies(configs []config.Config) *AuthorizationPoli
}
environment := &Environment{
ConfigStore: store,
Watcher: mesh.NewFixedWatcher(&meshconfig.MeshConfig{RootNamespace: "istio-config"}),
Watcher: meshwatcher.NewTestWatcher(&meshconfig.MeshConfig{RootNamespace: "istio-config"}),
}
authzPolicies := GetAuthorizationPolicies(environment)
return authzPolicies
Expand Down
3 changes: 2 additions & 1 deletion pilot/pkg/model/cluster_local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/config/host"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
)

func TestIsClusterLocal(t *testing.T) {
Expand Down Expand Up @@ -265,7 +266,7 @@ func TestIsClusterLocal(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
g := NewWithT(t)

env := &model.Environment{Watcher: mesh.NewFixedWatcher(c.m)}
env := &model.Environment{Watcher: meshwatcher.NewTestWatcher(c.m)}
env.Init()

clusterLocal := env.ClusterLocal().GetClusterLocalHosts().IsClusterLocal(host.Name(c.host))
Expand Down
6 changes: 5 additions & 1 deletion pilot/pkg/model/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/config/host"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
"istio.io/istio/pkg/config/protocol"
"istio.io/istio/pkg/maps"
pm "istio.io/istio/pkg/model"
Expand Down Expand Up @@ -93,6 +94,9 @@ func NewEnvironment() *Environment {
}
}

// Watcher is a type alias to keep the embedded type name stable.
type Watcher = meshwatcher.WatcherCollection

// Environment provides an aggregate environmental API for Pilot
type Environment struct {
// Discovery interface for listing services and instances.
Expand All @@ -102,7 +106,7 @@ type Environment struct {
ConfigStore

// Watcher is the watcher for the mesh config (to be merged into the config store)
mesh.Watcher
Watcher

// NetworksWatcher (loaded from a config map) provides information about the
// set of networks inside a mesh and how to route to endpoints in each
Expand Down
4 changes: 2 additions & 2 deletions pilot/pkg/model/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/serviceregistry/memory"
"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
"istio.io/istio/pkg/config/mesh"
"istio.io/istio/pkg/config/mesh/meshwatcher"
"istio.io/istio/pkg/test"
"istio.io/istio/pkg/test/scopes"
"istio.io/istio/pkg/test/util/assert"
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestGatewayHostnames(t *testing.T) {
}
})

meshNetworks := mesh.NewFixedNetworksWatcher(nil)
meshNetworks := meshwatcher.NewFixedNetworksWatcher(nil)
xdsUpdater := xdsfake.NewFakeXDS()
env := &model.Environment{NetworksWatcher: meshNetworks, ServiceDiscovery: memory.NewServiceDiscovery()}
if err := env.InitNetworksManager(xdsUpdater); err != nil {
Expand Down
Loading

0 comments on commit 9ac925c

Please sign in to comment.