Skip to content

Commit

Permalink
kubelet tracing
Browse files Browse the repository at this point in the history
Signed-off-by: Sally O'Malley <somalley@redhat.com>
Co-authored-by: David Ashpole <dashpole@google.com>
  • Loading branch information
sallyom and dashpole committed Aug 1, 2022
1 parent bddb4ec commit 47e7d80
Showing 39 changed files with 683 additions and 473 deletions.
10 changes: 6 additions & 4 deletions cmd/kube-apiserver/app/options/options_test.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/spf13/pflag"
oteltrace "go.opentelemetry.io/otel/trace"
"k8s.io/apiserver/pkg/admission"
apiserveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/etcd3"
@@ -153,10 +154,11 @@ func TestAddFlags(t *testing.T) {
StorageConfig: storagebackend.Config{
Type: "etcd3",
Transport: storagebackend.TransportConfig{
ServerList: nil,
KeyFile: "/var/run/kubernetes/etcd.key",
TrustedCAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
ServerList: nil,
KeyFile: "/var/run/kubernetes/etcd.key",
TrustedCAFile: "/var/run/kubernetes/etcdca.crt",
CertFile: "/var/run/kubernetes/etcdce.crt",
TracerProvider: oteltrace.NewNoopTracerProvider(),
},
Paging: true,
Prefix: "/registry",
6 changes: 5 additions & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
@@ -31,6 +31,8 @@ import (

"github.com/spf13/cobra"

oteltrace "go.opentelemetry.io/otel/trace"

extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
@@ -417,8 +419,10 @@ func buildGenericConfig(
if genericConfig.EgressSelector != nil {
storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && genericConfig.TracerProvider != nil {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
storageFactory.StorageConfig.Transport.TracerProvider = genericConfig.TracerProvider
} else {
storageFactory.StorageConfig.Transport.TracerProvider = oteltrace.NewNoopTracerProvider()
}
if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
return
43 changes: 40 additions & 3 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
@@ -39,6 +39,10 @@ import (
"k8s.io/mount-utils"

cadvisorapi "github.com/google/cadvisor/info/v1"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
otelsdkresource "go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/semconv"
oteltrace "go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
@@ -68,6 +72,7 @@ import (
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
tracing "k8s.io/component-base/tracing"
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
@@ -373,6 +378,13 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
if err != nil {
return nil, err
}
tp := oteltrace.NewNoopTracerProvider()
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
tp, err = newTracerProvider(s)
if err != nil {
return nil, err
}
}
return &kubelet.Dependencies{
Auth: nil, // default does not enforce auth[nz]
CAdvisorInterface: nil, // cadvisor.New launches background processes (bg http.ListenAndServe, and some bg cleaners), not set here
@@ -381,6 +393,7 @@ func UnsecuredDependencies(s *options.KubeletServer, featureGate featuregate.Fea
KubeClient: nil,
HeartbeatClient: nil,
EventClient: nil,
TracerProvider: tp,
HostUtil: hu,
Mounter: mounter,
Subpather: subpather,
@@ -563,7 +576,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
klog.InfoS("Standalone mode, no API client")

case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, nodeName)
clientConfig, onHeartbeatFailure, err := buildKubeletClientConfig(ctx, s, kubeDeps.TracerProvider, nodeName)
if err != nil {
return err
}
@@ -790,7 +803,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend

// buildKubeletClientConfig constructs the appropriate client config for the kubelet depending on whether
// bootstrapping is enabled or client certificate rotation is enabled.
func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nodeName types.NodeName) (*restclient.Config, func(), error) {
func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, tp oteltrace.TracerProvider, nodeName types.NodeName) (*restclient.Config, func(), error) {
if s.RotateCertificates {
// Rules for client rotation and the handling of kube config files:
//
@@ -905,6 +918,9 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nod
utilnet.CloseIdleConnectionsFor(clientConfig.Transport)
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
clientConfig.Wrap(tracing.WrapperFor(tp))
}
return clientConfig, onHeartbeatFailure, nil
}

@@ -1167,7 +1183,7 @@ func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubele

// start the kubelet server
if enableServer {
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth)
go k.ListenAndServe(kubeCfg, kubeDeps.TLSOptions, kubeDeps.Auth, kubeDeps.TracerProvider)
}
if kubeCfg.ReadOnlyPort > 0 {
go k.ListenAndServeReadOnly(netutils.ParseIPSloppy(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
@@ -1250,3 +1266,24 @@ func parseResourceList(m map[string]string) (v1.ResourceList, error) {
}
return rl, nil
}

func newTracerProvider(s *options.KubeletServer) (oteltrace.TracerProvider, error) {
if s.KubeletConfiguration.Tracing == nil {
return oteltrace.NewNoopTracerProvider(), nil
}
hostname, err := nodeutil.GetHostname(s.HostnameOverride)
if err != nil {
return nil, fmt.Errorf("could not determine hostname for tracer provider: %v", err)
}
resourceOpts := []otelsdkresource.Option{
otelsdkresource.WithAttributes(
semconv.ServiceNameKey.String(componentKubelet),
semconv.HostNameKey.String(hostname),
),
}
tp, err := tracing.NewProvider(context.Background(), s.KubeletConfiguration.Tracing, []otlpgrpc.Option{}, resourceOpts)
if err != nil {
return nil, fmt.Errorf("could not configure tracer provider: %v", err)
}
return tp, nil
}
3 changes: 2 additions & 1 deletion cmd/kubemark/hollow-node.go
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/pflag"
oteltrace "go.opentelemetry.io/otel/trace"
internalapi "k8s.io/cri-api/pkg/apis"
"k8s.io/klog/v2"

@@ -246,7 +247,7 @@ func run(cmd *cobra.Command, config *hollowNodeConfig) error {
return fmt.Errorf("Failed to start fake runtime, error: %w", err)
}
defer fakeRemoteRuntime.Stop()
runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second)
runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, oteltrace.NewNoopTracerProvider())
if err != nil {
return fmt.Errorf("Failed to init runtime service, error: %w", err)
}
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
@@ -498,6 +498,13 @@ const (
// Enable POD resources API to return allocatable resources
KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable"

// owner: @sallyom
// kep: http://kep.k8s.io/2832
// alpha: v1.25
//
// Add support for distributed tracing in the kubelet
KubeletTracing featuregate.Feature = "KubeletTracing"

// owner: @zshihang
// kep: http://kep.k8s.io/2800
// beta: v1.24
@@ -961,6 +968,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

KubeletPodResourcesGetAllocatable: {Default: true, PreRelease: featuregate.Beta},

KubeletTracing: {Default: false, PreRelease: featuregate.Alpha},

LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta},

LocalStorageCapacityIsolation: {Default: true, PreRelease: featuregate.Beta},
2 changes: 1 addition & 1 deletion pkg/kubeapiserver/admission/config.go
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ type Config struct {
}

// New sets up the plugins and admission start hooks needed for admission
func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp *trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
func (c *Config) New(proxyTransport *http.Transport, egressSelector *egressselector.EgressSelector, serviceResolver webhook.ServiceResolver, tp trace.TracerProvider) ([]admission.PluginInitializer, genericapiserver.PostStartHookFunc, error) {
webhookAuthResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, egressSelector, c.LoopbackClientConfig, tp)
webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthResolverWrapper, serviceResolver)

2 changes: 2 additions & 0 deletions pkg/kubelet/apis/config/helpers_test.go
Original file line number Diff line number Diff line change
@@ -280,5 +280,7 @@ var (
"ShutdownGracePeriod.Duration",
"ShutdownGracePeriodCriticalPods.Duration",
"MemoryThrottlingFactor",
"Tracing.Endpoint",
"Tracing.SamplingRatePerMillion",
)
)
12 changes: 7 additions & 5 deletions pkg/kubelet/apis/config/register_test.go
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
componentconfigtesting "k8s.io/component-base/config/testing"
logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
)

func TestComponentConfigSetup(t *testing.T) {
@@ -33,11 +34,12 @@ func TestComponentConfigSetup(t *testing.T) {
SchemeGroupVersion: SchemeGroupVersion,
AddToScheme: AddToScheme,
AllowedTags: map[reflect.Type]bool{
reflect.TypeOf(logsapi.LoggingConfiguration{}): true,
reflect.TypeOf(metav1.Duration{}): true,
reflect.TypeOf(metav1.TypeMeta{}): true,
reflect.TypeOf(v1.NodeConfigSource{}): true,
reflect.TypeOf(v1.Taint{}): true,
reflect.TypeOf(logsapi.LoggingConfiguration{}): true,
reflect.TypeOf(tracingapi.TracingConfiguration{}): true,
reflect.TypeOf(metav1.Duration{}): true,
reflect.TypeOf(metav1.TypeMeta{}): true,
reflect.TypeOf(v1.NodeConfigSource{}): true,
reflect.TypeOf(v1.Taint{}): true,
},
}

7 changes: 6 additions & 1 deletion pkg/kubelet/apis/config/types.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
)

// HairpinMode denotes how the kubelet should configure networking to handle
@@ -441,10 +442,14 @@ type KubeletConfiguration struct {
// is true and upon the initial registration of the node.
// +optional
RegisterWithTaints []v1.Taint

// registerNode enables automatic registration with the apiserver.
// +optional
RegisterNode bool
// Tracing specifies the versioned configuration for OpenTelemetry tracing clients.
// See http://kep.k8s.io/2832 for more details.
// +featureGate=KubeletTracing
// +optional
Tracing *tracingapi.TracingConfiguration
}

// KubeletAuthorizationMode denotes the authorization mode for the kubelet
9 changes: 9 additions & 0 deletions pkg/kubelet/apis/config/validation/validation.go
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import (
"k8s.io/component-base/featuregate"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/component-base/metrics"
tracingapi "k8s.io/component-base/tracing/api/v1"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@@ -241,6 +242,14 @@ func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration, featur
allErrors = append(allErrors, errs.ToAggregate().Errors()...)
}

if localFeatureGate.Enabled(features.KubeletTracing) {
if errs := tracingapi.ValidateTracingConfiguration(kc.Tracing, localFeatureGate, field.NewPath("tracing")); len(errs) > 0 {
allErrors = append(allErrors, errs.ToAggregate().Errors()...)
}
} else if kc.Tracing != nil {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled."))
}

if localFeatureGate.Enabled(features.MemoryQoS) && kc.MemoryThrottlingFactor == nil {
allErrors = append(allErrors, fmt.Errorf("invalid configuration: memoryThrottlingFactor is required when MemoryQoS feature flag is enabled"))
}
31 changes: 31 additions & 0 deletions pkg/kubelet/apis/config/validation/validation_test.go
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
logsapi "k8s.io/component-base/logs/api/v1"
tracingapi "k8s.io/component-base/tracing/api/v1"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/validation"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -497,6 +498,36 @@ func TestValidateKubeletConfiguration(t *testing.T) {
},
errMsg: "invalid configuration: taint.TimeAdded is not nil",
},
{
name: "specify tracing with KubeletTracing disabled",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
samplingRate := int32(99999)
conf.FeatureGates = map[string]bool{"KubeletTracing": false}
conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate}
return conf
},
errMsg: "invalid configuration: tracing should not be configured if KubeletTracing feature flag is disabled.",
},
{
name: "specify tracing invalid sampling rate",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
samplingRate := int32(-1)
conf.FeatureGates = map[string]bool{"KubeletTracing": true}
conf.Tracing = &tracingapi.TracingConfiguration{SamplingRatePerMillion: &samplingRate}
return conf
},
errMsg: "tracing.samplingRatePerMillion: Invalid value: -1: sampling rate must be positive",
},
{
name: "specify tracing invalid endpoint",
configure: func(conf *kubeletconfig.KubeletConfiguration) *kubeletconfig.KubeletConfiguration {
ep := "dn%2s://localhost:4317"
conf.FeatureGates = map[string]bool{"KubeletTracing": true}
conf.Tracing = &tracingapi.TracingConfiguration{Endpoint: &ep}
return conf
},
errMsg: "tracing.endpoint: Invalid value: \"dn%2s://localhost:4317\": parse \"dn%2s://localhost:4317\": first path segment in URL cannot contain colon",
},
}

for _, tc := range cases {
25 changes: 21 additions & 4 deletions pkg/kubelet/cri/remote/remote_runtime.go
Original file line number Diff line number Diff line change
@@ -23,16 +23,20 @@ import (
"strings"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"k8s.io/klog/v2"

utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/logs/logreduction"
tracing "k8s.io/component-base/tracing"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
runtimeapiV1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/probe/exec"
utilexec "k8s.io/utils/exec"
@@ -68,7 +72,7 @@ const (
)

// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
addr, dialer, err := util.GetAddressAndDialer(endpoint)
if err != nil {
@@ -77,10 +81,23 @@ func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
defer cancel()

conn, err := grpc.DialContext(ctx, addr,
dialOpts := []grpc.DialOption{}
dialOpts = append(dialOpts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(dialer),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
tracingOpts := []otelgrpc.Option{
otelgrpc.WithPropagators(tracing.Propagators()),
otelgrpc.WithTracerProvider(tp),
}
// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
dialOpts = append(dialOpts,
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
}
conn, err := grpc.DialContext(ctx, addr, dialOpts...)
if err != nil {
klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
return nil, err
4 changes: 3 additions & 1 deletion pkg/kubelet/cri/remote/remote_runtime_test.go
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@ import (
"testing"
"time"

oteltrace "go.opentelemetry.io/otel/trace"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
internalapi "k8s.io/cri-api/pkg/apis"
@@ -47,7 +49,7 @@ func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, s
}

func createRemoteRuntimeService(endpoint string, t *testing.T) internalapi.RuntimeService {
runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout)
runtimeService, err := NewRemoteRuntimeService(endpoint, defaultConnectionTimeout, oteltrace.NewNoopTracerProvider())
require.NoError(t, err)

return runtimeService
Loading

0 comments on commit 47e7d80

Please sign in to comment.