diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index e671c7af337b7..9d070e3153a6c 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1566,11 +1566,14 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con return err } - // Do a zero-byte write to stdout before handing off to the container runtime. - // This ensures at least one Write call is made to the writer when copying starts, - // even if we then block waiting for log output from the container. - if _, err := stdout.Write([]byte{}); err != nil { - return err + // Since v1.32, stdout may be nil if the stream is not requested. + if stdout != nil { + // Do a zero-byte write to stdout before handing off to the container runtime. + // This ensures at least one Write call is made to the writer when copying starts, + // even if we then block waiting for log output from the container. + if _, err := stdout.Write([]byte{}); err != nil { + return err + } } return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr) diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 94b282f7a0184..46ac2403d1d60 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -41,9 +41,9 @@ import ( oteltrace "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/kubelet/metrics/collectors" "k8s.io/utils/clock" netutils "k8s.io/utils/net" + "k8s.io/utils/ptr" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -83,6 +83,7 @@ import ( apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc" "k8s.io/kubernetes/pkg/kubelet/apis/podresources" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/metrics/collectors" "k8s.io/kubernetes/pkg/kubelet/prober" servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics" "k8s.io/kubernetes/pkg/kubelet/server/stats" @@ -723,6 +724,13 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`)) return } + if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) { + // Even with defaulters, logOptions.Stream can be nil if no arguments are provided at all. + if logOptions.Stream == nil { + // Default to "All" to maintain backward compatibility. + logOptions.Stream = ptr.To(v1.LogStreamAll) + } + } logOptions.TypeMeta = metav1.TypeMeta{} if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 { response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`)) @@ -744,9 +752,40 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response))) return } - fw := flushwriter.Wrap(response.ResponseWriter) + + var ( + stdout io.Writer + stderr io.Writer + fw = flushwriter.Wrap(response.ResponseWriter) + ) + if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) { + wantedStream := logOptions.Stream + // No stream type specified, default to All + if wantedStream == nil { + allStream := v1.LogStreamAll + wantedStream = &allStream + } + switch *wantedStream { + case v1.LogStreamStdout: + stdout, stderr = fw, nil + case v1.LogStreamStderr: + stdout, stderr = nil, fw + case v1.LogStreamAll: + stdout, stderr = fw, fw + default: + _ = response.WriteError(http.StatusBadRequest, fmt.Errorf("invalid stream type %q", *logOptions.Stream)) + return + } + } else { + if logOptions.Stream != nil && *logOptions.Stream != v1.LogStreamAll { + _ = response.WriteError(http.StatusBadRequest, fmt.Errorf("unable to return the given log stream: %q. Please enable PodLogsQuerySplitStreams feature gate in kubelet", *logOptions.Stream)) + return + } + stdout, stderr = fw, fw + } + response.Header().Set("Transfer-Encoding", "chunked") - if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil { + if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, stdout, stderr); err != nil { response.WriteError(http.StatusBadRequest, err) return } diff --git a/pkg/kubelet/server/server_test.go b/pkg/kubelet/server/server_test.go index be1d9e922d477..a79e71742bfea 100644 --- a/pkg/kubelet/server/server_test.go +++ b/pkg/kubelet/server/server_test.go @@ -37,6 +37,8 @@ import ( cadvisorapiv2 "github.com/google/cadvisor/info/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "k8s.io/utils/ptr" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -50,7 +52,6 @@ import ( runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/utils/ptr" // Do some initialization to decode the query parameters correctly. "k8s.io/apiserver/pkg/server/healthz" @@ -820,29 +821,41 @@ func TestContainerLogs(t *testing.T) { } for desc, test := range tests { - t.Run(desc, func(t *testing.T) { - output := "foo bar" - podNamespace := "other" - podName := "foo" - expectedPodName := getPodName(podName, podNamespace) - expectedContainerName := "baz" - setPodByNameFunc(fw, podNamespace, podName, expectedContainerName) - setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output) - resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query) - if err != nil { - t.Errorf("Got error GETing: %v", err) - } - defer resp.Body.Close() + // To make sure the original behavior doesn't change no matter the feature PodLogsQuerySplitStreams is enabled or not. + for _, enablePodLogsQuerySplitStreams := range []bool{true, false} { + t.Run(fmt.Sprintf("%s (enablePodLogsQuerySplitStreams=%v)", desc, enablePodLogsQuerySplitStreams), func(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, enablePodLogsQuerySplitStreams) + expectedLogOptions := test.podLogOption.DeepCopy() + if enablePodLogsQuerySplitStreams && expectedLogOptions.Stream == nil { + // The HTTP handler will internally set the default stream value. + expectedLogOptions.Stream = ptr.To(v1.LogStreamAll) + } - body, err := io.ReadAll(resp.Body) - if err != nil { - t.Errorf("Error reading container logs: %v", err) - } - result := string(body) - if result != output { - t.Errorf("Expected: '%v', got: '%v'", output, result) - } - }) + output := "foo bar" + podNamespace := "other" + podName := "foo" + expectedPodName := getPodName(podName, podNamespace) + expectedContainerName := "baz" + setPodByNameFunc(fw, podNamespace, podName, expectedContainerName) + setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, expectedLogOptions, output) + resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query) + if err != nil { + t.Errorf("Got error GETing: %v", err) + } + defer func() { + _ = resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Errorf("Error reading container logs: %v", err) + } + result := string(body) + if result != output { + t.Errorf("Expected: '%v', got: '%v'", output, result) + } + }) + } } } @@ -866,6 +879,220 @@ func TestContainerLogsWithInvalidTail(t *testing.T) { } } +func TestContainerLogsWithSeparateStream(t *testing.T) { + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, true) + + type logEntry struct { + stream string + msg string + } + + fw := newServerTest() + defer fw.testHTTPServer.Close() + + var ( + streamStdout = v1.LogStreamStdout + streamStderr = v1.LogStreamStderr + streamAll = v1.LogStreamAll + ) + + testCases := []struct { + name string + query string + logs []logEntry + expectedOutput string + expectedLogOptions *v1.PodLogOptions + }{ + { + // Defaulters don't work if the query is empty. + // See also https://github.com/kubernetes/kubernetes/issues/128589 + name: "empty query should return all logs", + logs: []logEntry{ + {stream: v1.LogStreamStdout, msg: "foo\n"}, + {stream: v1.LogStreamStderr, msg: "bar\n"}, + }, + query: "", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamAll, + }, + expectedOutput: "foo\nbar\n", + }, + { + name: "missing stream param should return all logs", + logs: []logEntry{ + {stream: v1.LogStreamStdout, msg: "foo\n"}, + {stream: v1.LogStreamStderr, msg: "bar\n"}, + }, + query: "?limitBytes=100", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamAll, + LimitBytes: ptr.To[int64](100), + }, + expectedOutput: "foo\nbar\n", + }, + { + name: "only stdout logs", + logs: []logEntry{ + {stream: v1.LogStreamStdout, msg: "out1\n"}, + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStdout, msg: "out2\n"}, + }, + query: "?stream=Stdout", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamStdout, + }, + expectedOutput: "out1\nout2\n", + }, + { + name: "only stderr logs", + logs: []logEntry{ + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStderr, msg: "err2\n"}, + {stream: v1.LogStreamStdout, msg: "out1\n"}, + }, + query: "?stream=Stderr", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamStderr, + }, + expectedOutput: "err1\nerr2\n", + }, + { + name: "return all logs", + logs: []logEntry{ + {stream: v1.LogStreamStdout, msg: "out1\n"}, + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStdout, msg: "out2\n"}, + }, + query: "?stream=All", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamAll, + }, + expectedOutput: "out1\nerr1\nout2\n", + }, + { + name: "stdout logs with legacy tail", + logs: []logEntry{ + {stream: v1.LogStreamStdout, msg: "out1\n"}, + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStdout, msg: "out2\n"}, + }, + query: "?stream=All&tail=1", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamAll, + TailLines: ptr.To[int64](1), + }, + expectedOutput: "out2\n", + }, + { + name: "return the last 2 lines of logs", + logs: []logEntry{ + {stream: v1.LogStreamStdout, msg: "out1\n"}, + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStdout, msg: "out2\n"}, + }, + query: "?stream=All&tailLines=2", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamAll, + TailLines: ptr.To[int64](2), + }, + expectedOutput: "err1\nout2\n", + }, + { + name: "return the first 6 bytes of the stdout log stream", + logs: []logEntry{ + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStdout, msg: "out1\n"}, + {stream: v1.LogStreamStderr, msg: "err2\n"}, + {stream: v1.LogStreamStdout, msg: "out2\n"}, + }, + query: "?stream=Stdout&limitBytes=6", + expectedLogOptions: &v1.PodLogOptions{ + Stream: &streamStdout, + LimitBytes: ptr.To[int64](6), + }, + expectedOutput: "out1\no", + }, + { + name: "invalid stream", + logs: []logEntry{ + {stream: v1.LogStreamStderr, msg: "err1\n"}, + {stream: v1.LogStreamStdout, msg: "out1\n"}, + }, + query: "?stream=invalid", + expectedLogOptions: nil, + expectedOutput: `{"message": "Invalid request."}`, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + podNamespace := "other" + podName := "foo" + expectedContainerName := "baz" + setPodByNameFunc(fw, podNamespace, podName, expectedContainerName) + fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error { + if !reflect.DeepEqual(tc.expectedLogOptions, logOptions) { + t.Errorf("expected %#v, got %#v", tc.expectedLogOptions, logOptions) + } + + var dst io.Writer + tailLines := len(tc.logs) + if logOptions.TailLines != nil { + tailLines = int(*logOptions.TailLines) + } + + remain := 0 + if logOptions.LimitBytes != nil { + remain = int(*logOptions.LimitBytes) + } else { + for _, log := range tc.logs { + remain += len(log.msg) + } + } + + logs := tc.logs[len(tc.logs)-tailLines:] + for _, log := range logs { + switch log.stream { + case v1.LogStreamStdout: + dst = stdout + case v1.LogStreamStderr: + dst = stderr + } + // Skip if the stream is not requested + if dst == nil { + continue + } + line := log.msg + if len(line) > remain { + line = line[:remain] + } + _, _ = io.WriteString(dst, line) + remain -= len(line) + if remain <= 0 { + return nil + } + } + return nil + } + resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + tc.query) + if err != nil { + t.Errorf("Got error GETing: %v", err) + } + defer func() { + _ = resp.Body.Close() + }() + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Errorf("Error reading container logs: %v", err) + } + result := string(body) + if result != tc.expectedOutput { + t.Errorf("Expected: %q, got: %q", tc.expectedOutput, result) + } + }) + } +} + func TestCheckpointContainer(t *testing.T) { podNamespace := "other" podName := "foo" diff --git a/staging/src/k8s.io/cri-client/pkg/logs/logs.go b/staging/src/k8s.io/cri-client/pkg/logs/logs.go index f086bf4c3abfb..bbfc2b603302e 100644 --- a/staging/src/k8s.io/cri-client/pkg/logs/logs.go +++ b/staging/src/k8s.io/cri-client/pkg/logs/logs.go @@ -267,6 +267,11 @@ func (w *logWriter) write(msg *logMessage, addPrefix bool) error { default: return fmt.Errorf("unexpected stream type %q", msg.stream) } + // Since v1.32, either w.stdout or w.stderr may be nil if the stream is not requested. + // In such case, we should neither count the bytes nor write to the stream. + if stream == nil { + return nil + } n, err := stream.Write(line) w.remain -= int64(n) if err != nil { diff --git a/staging/src/k8s.io/cri-client/pkg/logs/logs_test.go b/staging/src/k8s.io/cri-client/pkg/logs/logs_test.go index 692a645f649ed..8f4b13c2aa07e 100644 --- a/staging/src/k8s.io/cri-client/pkg/logs/logs_test.go +++ b/staging/src/k8s.io/cri-client/pkg/logs/logs_test.go @@ -20,6 +20,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "os" @@ -28,6 +29,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -540,3 +542,57 @@ func TestReadLogsLimitsWithTimestamps(t *testing.T) { assert.Equal(t, 2, lineCount, "should have two lines") } + +func TestOnlyStdoutStream(t *testing.T) { + timestamp := time.Unix(1234, 43210) + msgs := []*logMessage{ + { + timestamp: timestamp, + stream: runtimeapi.Stdout, + log: []byte("out1\n"), + }, + { + timestamp: timestamp, + stream: runtimeapi.Stderr, + log: []byte("err1\n"), + }, + { + timestamp: timestamp, + stream: runtimeapi.Stdout, + log: []byte("out2\n"), + }, + } + + testCases := map[string]struct { + limitBytes int64 + + expectedStdout string + }{ + "all stdout logs": { + limitBytes: -1, + + expectedStdout: "out1\nout2\n", + }, + "the first 7 bytes from stdout": { + limitBytes: 7, + + expectedStdout: "out1\nou", + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + stdoutBuf := bytes.NewBuffer(nil) + w := newLogWriter(stdoutBuf, nil, &LogOptions{ + bytes: tc.limitBytes, + }) + for _, msg := range msgs { + err := w.write(msg, false) + if errors.Is(err, errMaximumWrite) { + continue + } + require.NoError(t, err) + } + assert.EqualValues(t, tc.expectedStdout, stdoutBuf.String()) + }) + } +}