diff --git a/pilot/pkg/model/service.go b/pilot/pkg/model/service.go index c99b33c67cb5..137112ff317b 100644 --- a/pilot/pkg/model/service.go +++ b/pilot/pkg/model/service.go @@ -514,6 +514,52 @@ func (ep *IstioEndpoint) IsDiscoverableFromProxy(p *Proxy) bool { return ep.DiscoverabilityPolicy.IsDiscoverableFromProxy(ep, p) } +// MetadataClone returns the cloned endpoint metadata used for telemetry purposes. +// This should be used when the endpoint labels should be updated. +func (ep *IstioEndpoint) MetadataClone() *EndpointMetadata { + return &EndpointMetadata{ + Network: ep.Network, + TLSMode: ep.TLSMode, + WorkloadName: ep.WorkloadName, + Namespace: ep.Namespace, + Labels: maps.Clone(ep.Labels), + ClusterID: ep.Locality.ClusterID, + } +} + +// Metadata returns the endpoint metadata used for telemetry purposes. +func (ep *IstioEndpoint) Metadata() *EndpointMetadata { + return &EndpointMetadata{ + Network: ep.Network, + TLSMode: ep.TLSMode, + WorkloadName: ep.WorkloadName, + Namespace: ep.Namespace, + Labels: ep.Labels, + ClusterID: ep.Locality.ClusterID, + } +} + +// EndpointMetadata represents metadata set on Envoy LbEndpoint used for telemetry purposes. +type EndpointMetadata struct { + // Network holds the network where this endpoint is present + Network network.ID + + // TLSMode endpoint is injected with istio sidecar and ready to configure Istio mTLS + TLSMode string + + // Name of the workload that this endpoint belongs to. This is for telemetry purpose. + WorkloadName string + + // Namespace that this endpoint belongs to. This is for telemetry purpose. + Namespace string + + // Labels points to the workload or deployment labels. + Labels labels.Instance + + // ClusterID where the endpoint is located + ClusterID cluster.ID +} + // EndpointDiscoverabilityPolicy determines the discoverability of an endpoint throughout the mesh. type EndpointDiscoverabilityPolicy interface { // IsDiscoverableFromProxy indicates whether an endpoint is discoverable from the given Proxy. diff --git a/pilot/pkg/networking/core/v1alpha3/cluster_builder.go b/pilot/pkg/networking/core/v1alpha3/cluster_builder.go index ccbf2412488e..f9a15bbf2710 100644 --- a/pilot/pkg/networking/core/v1alpha3/cluster_builder.go +++ b/pilot/pkg/networking/core/v1alpha3/cluster_builder.go @@ -569,25 +569,23 @@ func (cb *ClusterBuilder) buildLocalityLbEndpoints(proxyView model.ProxyView, se }, Metadata: &core.Metadata{}, } + var metadata *model.EndpointMetadata - labels := instance.Endpoint.Labels - ns := instance.Endpoint.Namespace if features.CanonicalServiceForMeshExternalServiceEntry && service.MeshExternal { - ns = service.Attributes.Namespace svcLabels := service.Attributes.Labels if _, ok := svcLabels[model.IstioCanonicalServiceLabelName]; ok { - labels = map[string]string{ - model.IstioCanonicalServiceLabelName: svcLabels[model.IstioCanonicalServiceLabelName], - model.IstioCanonicalServiceRevisionLabelName: svcLabels[model.IstioCanonicalServiceRevisionLabelName], - } - for k, v := range instance.Endpoint.Labels { - labels[k] = v - } + metadata = instance.Endpoint.MetadataClone() + metadata.Labels[model.IstioCanonicalServiceLabelName] = svcLabels[model.IstioCanonicalServiceLabelName] + metadata.Labels[model.IstioCanonicalServiceRevisionLabelName] = svcLabels[model.IstioCanonicalServiceRevisionLabelName] + } else { + metadata = instance.Endpoint.Metadata() } + metadata.Namespace = service.Attributes.Namespace + } else { + metadata = instance.Endpoint.Metadata() } - util.AppendLbEndpointMetadata(instance.Endpoint.Network, instance.Endpoint.TLSMode, instance.Endpoint.WorkloadName, - ns, instance.Endpoint.Locality.ClusterID, labels, ep.Metadata) + util.AppendLbEndpointMetadata(metadata, ep.Metadata) locality := instance.Endpoint.Locality.Label lbEndpoints[locality] = append(lbEndpoints[locality], ep) diff --git a/pilot/pkg/networking/core/v1alpha3/cluster_builder_test.go b/pilot/pkg/networking/core/v1alpha3/cluster_builder_test.go index a8364ffe3d1d..c9d4c1a0e407 100644 --- a/pilot/pkg/networking/core/v1alpha3/cluster_builder_test.go +++ b/pilot/pkg/networking/core/v1alpha3/cluster_builder_test.go @@ -20,6 +20,7 @@ import ( "reflect" "sort" "strings" + "sync" "testing" "time" @@ -1108,7 +1109,14 @@ func TestBuildLocalityLbEndpoints(t *testing.T) { clusterID istiocluster.ID, lbls labels.Instance, ) *core.Metadata { newmeta := &core.Metadata{} - util.AppendLbEndpointMetadata(networkID, tlsMode, workloadname, namespace, clusterID, lbls, newmeta) + util.AppendLbEndpointMetadata(&model.EndpointMetadata{ + Network: networkID, + TLSMode: tlsMode, + WorkloadName: workloadname, + Namespace: namespace, + ClusterID: clusterID, + Labels: lbls, + }, newmeta) return newmeta } @@ -1505,6 +1513,213 @@ func TestBuildLocalityLbEndpoints(t *testing.T) { } } +func TestConcurrentBuildLocalityLbEndpoints(t *testing.T) { + test.SetForTest(t, &features.CanonicalServiceForMeshExternalServiceEntry, true) + proxy := &model.Proxy{ + Metadata: &model.NodeMetadata{ + ClusterID: "cluster-1", + }, + } + servicePort := &model.Port{ + Name: "default", + Port: 8080, + Protocol: protocol.HTTP, + } + service := &model.Service{ + Hostname: host.Name("*.example.org"), + Ports: model.PortList{servicePort}, + Attributes: model.ServiceAttributes{ + Name: "TestService", + Namespace: "test-ns", + Labels: map[string]string{"service.istio.io/canonical-name": "example-service"}, + }, + MeshExternal: true, + Resolution: model.DNSLB, + } + + buildMetadata := func(networkID network.ID, tlsMode, workloadname, namespace string, + clusterID istiocluster.ID, lbls labels.Instance, + ) *core.Metadata { + newmeta := &core.Metadata{} + util.AppendLbEndpointMetadata(&model.EndpointMetadata{ + Network: networkID, + TLSMode: tlsMode, + WorkloadName: workloadname, + Namespace: namespace, + ClusterID: clusterID, + Labels: lbls, + }, newmeta) + return newmeta + } + + lbls := labels.Instance{"version": "v1"} + + instances := []*model.ServiceInstance{ + { + Service: service, + ServicePort: servicePort, + Endpoint: &model.IstioEndpoint{ + Address: "192.168.1.1", + EndpointPort: 10001, + WorkloadName: "workload-1", + Namespace: "namespace-1", + Labels: map[string]string{ + "version": "v1", + "app": "example", + }, + Locality: model.Locality{ + ClusterID: "cluster-1", + Label: "region1/zone1/subzone1", + }, + LbWeight: 30, + Network: "nw-0", + }, + }, + { + Service: service, + ServicePort: servicePort, + Endpoint: &model.IstioEndpoint{ + Address: "192.168.1.2", + EndpointPort: 10001, + WorkloadName: "workload-2", + Namespace: "namespace-2", + Labels: map[string]string{ + "version": "v2", + "app": "example", + }, + Locality: model.Locality{ + ClusterID: "cluster-2", + Label: "region1/zone1/subzone1", + }, + LbWeight: 30, + Network: "nw-1", + }, + }, + { + Service: service, + ServicePort: servicePort, + Endpoint: &model.IstioEndpoint{ + Address: "192.168.1.3", + EndpointPort: 10001, + WorkloadName: "workload-3", + Namespace: "namespace-3", + Labels: map[string]string{ + "version": "v3", + "app": "example", + }, + Locality: model.Locality{ + ClusterID: "cluster-3", + Label: "region2/zone1/subzone1", + }, + LbWeight: 40, + Network: "", + }, + }, + { + Service: service, + ServicePort: servicePort, + Endpoint: &model.IstioEndpoint{ + Address: "192.168.1.4", + EndpointPort: 10001, + WorkloadName: "workload-1", + Namespace: "namespace-1", + Labels: map[string]string{ + "version": "v4", + "app": "example", + }, + Locality: model.Locality{ + ClusterID: "cluster-1", + Label: "region1/zone1/subzone1", + }, + LbWeight: 30, + Network: "filtered-out", + }, + }, + } + + updatedLbls := labels.Instance{ + "app": "example", + model.IstioCanonicalServiceLabelName: "example-service", + } + expected := []*endpoint.LocalityLbEndpoints{ + { + Locality: &core.Locality{ + Region: "region1", + Zone: "zone1", + SubZone: "subzone1", + }, + LoadBalancingWeight: &wrappers.UInt32Value{ + Value: 30, + }, + LbEndpoints: []*endpoint.LbEndpoint{ + { + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: "192.168.1.1", + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: 10001, + }, + }, + }, + }, + }, + }, + Metadata: buildMetadata("nw-0", "", "workload-1", "test-ns", "cluster-1", updatedLbls), + LoadBalancingWeight: &wrappers.UInt32Value{ + Value: 30, + }, + }, + }, + }, + } + + sortEndpoints := func(endpoints []*endpoint.LocalityLbEndpoints) { + sort.SliceStable(endpoints, func(i, j int) bool { + if strings.Compare(endpoints[i].Locality.Region, endpoints[j].Locality.Region) < 0 { + return true + } + if strings.Compare(endpoints[i].Locality.Zone, endpoints[j].Locality.Zone) < 0 { + return true + } + return strings.Compare(endpoints[i].Locality.SubZone, endpoints[j].Locality.SubZone) < 0 + }) + } + + cg := NewConfigGenTest(t, TestOptions{ + MeshConfig: testMesh(), + Services: []*model.Service{service}, + Instances: instances, + }) + + cb := NewClusterBuilder(cg.SetupProxy(proxy), &model.PushRequest{Push: cg.PushContext()}, nil) + view := (&model.Proxy{ + Metadata: &model.NodeMetadata{ + RequestedNetworkView: []string{"nw-0", "nw-1"}, + }, + }).GetView() + wg := sync.WaitGroup{} + wg.Add(5) + var actual []*endpoint.LocalityLbEndpoints + mu := sync.Mutex{} + for i := 0; i < 5; i++ { + go func() { + eps := cb.buildLocalityLbEndpoints(view, service, 8080, lbls) + mu.Lock() + actual = eps + mu.Unlock() + wg.Done() + }() + } + wg.Wait() + sortEndpoints(actual) + if v := cmp.Diff(expected, actual, protocmp.Transform()); v != "" { + t.Fatalf("Expected (-) != actual (+):\n%s", v) + } +} + func TestBuildPassthroughClusters(t *testing.T) { cases := []struct { name string diff --git a/pilot/pkg/networking/util/util.go b/pilot/pkg/networking/util/util.go index 98e032e4a384..12fb6f3a926b 100644 --- a/pilot/pkg/networking/util/util.go +++ b/pilot/pkg/networking/util/util.go @@ -47,11 +47,8 @@ import ( istionetworking "istio.io/istio/pilot/pkg/networking" "istio.io/istio/pilot/pkg/serviceregistry/util/label" "istio.io/istio/pilot/pkg/util/protoconv" - "istio.io/istio/pkg/cluster" "istio.io/istio/pkg/config" - "istio.io/istio/pkg/config/labels" kubelabels "istio.io/istio/pkg/kube/labels" - "istio.io/istio/pkg/network" "istio.io/istio/pkg/proto/merge" "istio.io/istio/pkg/util/strcase" "istio.io/pkg/log" @@ -476,22 +473,20 @@ func MergeAnyWithAny(dst *anypb.Any, src *anypb.Any) (*anypb.Any, error) { } // AppendLbEndpointMetadata adds metadata values to a lb endpoint using the passed in metadata as base. -func AppendLbEndpointMetadata(networkID network.ID, tlsMode, workloadname, namespace string, - clusterID cluster.ID, lbls labels.Instance, metadata *core.Metadata, +func AppendLbEndpointMetadata(istioMetadata *model.EndpointMetadata, envoyMetadata *core.Metadata, ) { - if networkID == "" && (tlsMode == "" || tlsMode == model.DisabledTLSModeLabel) && - (!features.EndpointTelemetryLabel || !features.EnableTelemetryLabel) { + if !features.EndpointTelemetryLabel || !features.EnableTelemetryLabel { return } - if metadata.FilterMetadata == nil { - metadata.FilterMetadata = map[string]*structpb.Struct{} + if envoyMetadata.FilterMetadata == nil { + envoyMetadata.FilterMetadata = map[string]*structpb.Struct{} } - if tlsMode != "" && tlsMode != model.DisabledTLSModeLabel { - metadata.FilterMetadata[EnvoyTransportSocketMetadataKey] = &structpb.Struct{ + if istioMetadata.TLSMode != "" && istioMetadata.TLSMode != model.DisabledTLSModeLabel { + envoyMetadata.FilterMetadata[EnvoyTransportSocketMetadataKey] = &structpb.Struct{ Fields: map[string]*structpb.Value{ - model.TLSModeLabelShortname: {Kind: &structpb.Value_StringValue{StringValue: tlsMode}}, + model.TLSModeLabelShortname: {Kind: &structpb.Value_StringValue{StringValue: istioMetadata.TLSMode}}, }, } } @@ -503,7 +498,7 @@ func AppendLbEndpointMetadata(networkID network.ID, tlsMode, workloadname, names // workload-name;namespace;canonical-service-name;canonical-service-revision;cluster-id. if features.EndpointTelemetryLabel { // allow defaulting for non-injected cases - canonicalName, canonicalRevision := kubelabels.CanonicalService(lbls, workloadname) + canonicalName, canonicalRevision := kubelabels.CanonicalService(istioMetadata.Labels, istioMetadata.WorkloadName) // don't bother sending the default value in config if canonicalRevision == "latest" { @@ -511,16 +506,16 @@ func AppendLbEndpointMetadata(networkID network.ID, tlsMode, workloadname, names } var sb strings.Builder - sb.WriteString(workloadname) + sb.WriteString(istioMetadata.WorkloadName) sb.WriteString(";") - sb.WriteString(namespace) + sb.WriteString(istioMetadata.Namespace) sb.WriteString(";") sb.WriteString(canonicalName) sb.WriteString(";") sb.WriteString(canonicalRevision) sb.WriteString(";") - sb.WriteString(clusterID.String()) - addIstioEndpointLabel(metadata, "workload", &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: sb.String()}}) + sb.WriteString(istioMetadata.ClusterID.String()) + addIstioEndpointLabel(envoyMetadata, "workload", &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: sb.String()}}) } } diff --git a/pilot/pkg/networking/util/util_test.go b/pilot/pkg/networking/util/util_test.go index 323f5f70b08a..7fc281b6b62e 100644 --- a/pilot/pkg/networking/util/util_test.go +++ b/pilot/pkg/networking/util/util_test.go @@ -36,11 +36,9 @@ import ( "istio.io/istio/pilot/pkg/features" "istio.io/istio/pilot/pkg/model" "istio.io/istio/pilot/pkg/util/protoconv" - "istio.io/istio/pkg/cluster" "istio.io/istio/pkg/config" "istio.io/istio/pkg/config/labels" "istio.io/istio/pkg/config/schema/gvk" - "istio.io/istio/pkg/network" "istio.io/istio/pkg/test" ) @@ -911,21 +909,18 @@ func TestCidrRangeSliceEqual(t *testing.T) { func TestEndpointMetadata(t *testing.T) { test.SetForTest(t, &features.EndpointTelemetryLabel, true) cases := []struct { - name string - network network.ID - tlsMode string - workloadName string - clusterID cluster.ID - namespace string - labels labels.Instance - want *core.Metadata + name string + metadata *model.EndpointMetadata + want *core.Metadata }{ { - name: "all empty", - tlsMode: model.DisabledTLSModeLabel, - network: "", - workloadName: "", - clusterID: "", + name: "all empty", + metadata: &model.EndpointMetadata{ + TLSMode: model.DisabledTLSModeLabel, + Network: "", + WorkloadName: "", + ClusterID: "", + }, want: &core.Metadata{ FilterMetadata: map[string]*structpb.Struct{ IstioMetadataKey: { @@ -941,11 +936,13 @@ func TestEndpointMetadata(t *testing.T) { }, }, { - name: "tls mode", - tlsMode: model.IstioMutualTLSModeLabel, - network: "", - workloadName: "", - clusterID: "", + name: "tls mode", + metadata: &model.EndpointMetadata{ + TLSMode: model.IstioMutualTLSModeLabel, + Network: "", + WorkloadName: "", + ClusterID: "", + }, want: &core.Metadata{ FilterMetadata: map[string]*structpb.Struct{ EnvoyTransportSocketMetadataKey: { @@ -970,11 +967,13 @@ func TestEndpointMetadata(t *testing.T) { }, }, { - name: "network and tls mode", - tlsMode: model.IstioMutualTLSModeLabel, - network: "network", - workloadName: "", - clusterID: "", + name: "network and tls mode", + metadata: &model.EndpointMetadata{ + TLSMode: model.IstioMutualTLSModeLabel, + Network: "network", + WorkloadName: "", + ClusterID: "", + }, want: &core.Metadata{ FilterMetadata: map[string]*structpb.Struct{ EnvoyTransportSocketMetadataKey: { @@ -999,15 +998,17 @@ func TestEndpointMetadata(t *testing.T) { }, }, { - name: "all label", - tlsMode: model.IstioMutualTLSModeLabel, - network: "network", - workloadName: "workload", - clusterID: "cluster", - namespace: "default", - labels: labels.Instance{ - model.IstioCanonicalServiceLabelName: "service", - model.IstioCanonicalServiceRevisionLabelName: "v1", + name: "all label", + metadata: &model.EndpointMetadata{ + TLSMode: model.IstioMutualTLSModeLabel, + Network: "network", + WorkloadName: "workload", + ClusterID: "cluster", + Namespace: "default", + Labels: labels.Instance{ + model.IstioCanonicalServiceLabelName: "service", + model.IstioCanonicalServiceRevisionLabelName: "v1", + }, }, want: &core.Metadata{ FilterMetadata: map[string]*structpb.Struct{ @@ -1033,12 +1034,14 @@ func TestEndpointMetadata(t *testing.T) { }, }, { - name: "miss pod label", - tlsMode: model.IstioMutualTLSModeLabel, - network: "network", - workloadName: "workload", - clusterID: "cluster", - namespace: "default", + name: "miss pod label", + metadata: &model.EndpointMetadata{ + TLSMode: model.IstioMutualTLSModeLabel, + Network: "network", + WorkloadName: "workload", + ClusterID: "cluster", + Namespace: "default", + }, want: &core.Metadata{ FilterMetadata: map[string]*structpb.Struct{ EnvoyTransportSocketMetadataKey: { @@ -1063,12 +1066,14 @@ func TestEndpointMetadata(t *testing.T) { }, }, { - name: "miss workload name", - tlsMode: model.IstioMutualTLSModeLabel, - network: "network", - workloadName: "", - clusterID: "cluster", - namespace: "", + name: "miss workload name", + metadata: &model.EndpointMetadata{ + TLSMode: model.IstioMutualTLSModeLabel, + Network: "network", + WorkloadName: "", + ClusterID: "cluster", + Namespace: "", + }, want: &core.Metadata{ FilterMetadata: map[string]*structpb.Struct{ EnvoyTransportSocketMetadataKey: { @@ -1096,7 +1101,7 @@ func TestEndpointMetadata(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { input := &core.Metadata{} - AppendLbEndpointMetadata(tt.network, tt.tlsMode, tt.workloadName, tt.namespace, tt.clusterID, tt.labels, input) + AppendLbEndpointMetadata(tt.metadata, input) if !reflect.DeepEqual(input, tt.want) { t.Errorf("Unexpected Endpoint metadata got %v, want %v", input, tt.want) } diff --git a/pilot/pkg/xds/endpoint_builder.go b/pilot/pkg/xds/endpoint_builder.go index 1ba7b5623ebf..5fe7697139ab 100644 --- a/pilot/pkg/xds/endpoint_builder.go +++ b/pilot/pkg/xds/endpoint_builder.go @@ -402,7 +402,7 @@ func buildEnvoyLbEndpoint(b *EndpointBuilder, e *model.IstioEndpoint) *endpoint. // Istio telemetry depends on the metadata value being set for endpoints in the mesh. // Istio endpoint level tls transport socket configuration depends on this logic // Do not remove pilot/pkg/xds/fake.go - util.AppendLbEndpointMetadata(e.Network, e.TLSMode, e.WorkloadName, e.Namespace, e.Locality.ClusterID, e.Labels, ep.Metadata) + util.AppendLbEndpointMetadata(e.Metadata(), ep.Metadata) address, port := e.Address, e.EndpointPort tunnelAddress, tunnelPort := address, model.HBoneInboundListenPort diff --git a/pilot/pkg/xds/ep_filters.go b/pilot/pkg/xds/ep_filters.go index 1b96743abf53..03e6984846ae 100644 --- a/pilot/pkg/xds/ep_filters.go +++ b/pilot/pkg/xds/ep_filters.go @@ -146,8 +146,12 @@ func (b *EndpointBuilder) EndpointsByNetworkFilter(endpoints []*LocalityEndpoint Metadata: &core.Metadata{}, } // TODO: figure out a way to extract locality data from the gateway public endpoints in meshNetworks - util.AppendLbEndpointMetadata(gw.Network, model.IstioMutualTLSModeLabel, - "", "", b.clusterID, labels.Instance{}, gwEp.Metadata) + util.AppendLbEndpointMetadata(&model.EndpointMetadata{ + Network: gw.Network, + TLSMode: model.IstioMutualTLSModeLabel, + ClusterID: b.clusterID, + Labels: labels.Instance{}, + }, gwEp.Metadata) // Currently gateway endpoint does not support tunnel. lbEndpoints.append(gwIstioEp, gwEp) }