Skip to content

Commit

Permalink
Fix data race in multicluster case (istio#41329)
Browse files Browse the repository at this point in the history
  • Loading branch information
SpecialYang authored Oct 11, 2022
1 parent 20e135f commit 00e93bb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 14 deletions.
4 changes: 4 additions & 0 deletions pilot/pkg/credentials/kube/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func (m *Multicluster) deleteCluster(key cluster.ID) {
}

func (m *Multicluster) ForCluster(clusterID cluster.ID) (credentials.Controller, error) {
m.m.Lock()
defer m.m.Unlock()
if _, f := m.remoteKubeControllers[clusterID]; !f {
return nil, fmt.Errorf("cluster %v is not configured", clusterID)
}
Expand All @@ -100,6 +102,8 @@ func (m *Multicluster) ForCluster(clusterID cluster.ID) (credentials.Controller,

func (m *Multicluster) AddSecretHandler(h secretHandler) {
m.secretHandlers = append(m.secretHandlers, h)
m.m.Lock()
defer m.m.Unlock()
for _, c := range m.remoteKubeControllers {
c.AddEventHandler(h)
}
Expand Down
29 changes: 15 additions & 14 deletions pilot/pkg/serviceregistry/kube/controller/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,29 +151,29 @@ func (m *Multicluster) close() (err error) {
// to watch for resources being added, deleted or changed on remote clusters.
func (m *Multicluster) ClusterAdded(cluster *multicluster.Cluster, clusterStopCh <-chan struct{}) error {
m.m.Lock()
kubeRegistry, options, configCluster, err := m.addCluster(cluster)
kubeController, kubeRegistry, options, configCluster, err := m.addCluster(cluster)
if err != nil {
m.m.Unlock()
return err
}
m.m.Unlock()
// clusterStopCh is a channel that will be closed when this cluster removed.
return m.initializeCluster(cluster, kubeRegistry, *options, configCluster, clusterStopCh)
return m.initializeCluster(cluster, kubeController, kubeRegistry, *options, configCluster, clusterStopCh)
}

// ClusterUpdated is passed to the secret controller as a callback to be called
// when a remote cluster is updated.
func (m *Multicluster) ClusterUpdated(cluster *multicluster.Cluster, stop <-chan struct{}) error {
m.m.Lock()
m.deleteCluster(cluster.ID)
kubeRegistry, options, configCluster, err := m.addCluster(cluster)
kubeController, kubeRegistry, options, configCluster, err := m.addCluster(cluster)
if err != nil {
m.m.Unlock()
return err
}
m.m.Unlock()
// clusterStopCh is a channel that will be closed when this cluster removed.
return m.initializeCluster(cluster, kubeRegistry, *options, configCluster, stop)
return m.initializeCluster(cluster, kubeController, kubeRegistry, *options, configCluster, stop)
}

// ClusterDeleted is passed to the secret controller as a callback to be called
Expand All @@ -191,9 +191,9 @@ func (m *Multicluster) ClusterDeleted(clusterID cluster.ID) error {

// addCluster adds cluster related resources and updates internal structures.
// This is not thread safe.
func (m *Multicluster) addCluster(cluster *multicluster.Cluster) (*Controller, *Options, bool, error) {
func (m *Multicluster) addCluster(cluster *multicluster.Cluster) (*kubeController, *Controller, *Options, bool, error) {
if m.closing {
return nil, nil, false, fmt.Errorf("failed adding member cluster %s: server shutting down", cluster.ID)
return nil, nil, nil, false, fmt.Errorf("failed adding member cluster %s: server shutting down", cluster.ID)
}

client := cluster.Client
Expand All @@ -214,15 +214,16 @@ func (m *Multicluster) addCluster(cluster *multicluster.Cluster) (*Controller, *
}
log.Infof("Initializing Kubernetes service registry %q", options.ClusterID)
kubeRegistry := NewController(client, options)
m.remoteKubeControllers[cluster.ID] = &kubeController{
kubeController := &kubeController{
Controller: kubeRegistry,
}
return kubeRegistry, &options, configCluster, nil
m.remoteKubeControllers[cluster.ID] = kubeController
return kubeController, kubeRegistry, &options, configCluster, nil
}

// initializeCluster initializes the cluster by setting various handlers.
func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeRegistry *Controller, options Options,
configCluster bool, clusterStopCh <-chan struct{},
func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeController *kubeController, kubeRegistry *Controller,
options Options, configCluster bool, clusterStopCh <-chan struct{},
) error {
client := cluster.Client

Expand All @@ -239,15 +240,15 @@ func (m *Multicluster) initializeCluster(cluster *multicluster.Cluster, kubeRegi
} else if features.WorkloadEntryCrossCluster {
// TODO only do this for non-remotes, can't guarantee CRDs in remotes (depends on https://github.com/istio/istio/pull/29824)
if configStore, err := createWleConfigStore(client, m.revision, options); err == nil {
m.remoteKubeControllers[cluster.ID].workloadEntryController = serviceentry.NewWorkloadEntryController(
kubeController.workloadEntryController = serviceentry.NewWorkloadEntryController(
configStore, options.XDSUpdater,
serviceentry.WithClusterID(cluster.ID),
serviceentry.WithNetworkIDCb(kubeRegistry.Network))
// Services can select WorkloadEntry from the same cluster. We only duplicate the Service to configure kube-dns.
m.remoteKubeControllers[cluster.ID].workloadEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
kubeController.workloadEntryController.AppendWorkloadHandler(kubeRegistry.WorkloadInstanceHandler)
// ServiceEntry selects WorkloadEntry from remote cluster
m.remoteKubeControllers[cluster.ID].workloadEntryController.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
m.opts.MeshServiceController.AddRegistryAndRun(m.remoteKubeControllers[cluster.ID].workloadEntryController, clusterStopCh)
kubeController.workloadEntryController.AppendWorkloadHandler(m.serviceEntryController.WorkloadInstanceHandler)
m.opts.MeshServiceController.AddRegistryAndRun(kubeController.workloadEntryController, clusterStopCh)
go configStore.Run(clusterStopCh)
} else {
return fmt.Errorf("failed creating config configStore for cluster %s: %v", cluster.ID, err)
Expand Down

0 comments on commit 00e93bb

Please sign in to comment.