Skip to content

Commit

Permalink
feat(kubelet): only returns logs that match the given stream
Browse files Browse the repository at this point in the history
Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>
  • Loading branch information
knight42 committed Nov 7, 2024
1 parent 0793f65 commit 94cd0a0
Show file tree
Hide file tree
Showing 5 changed files with 361 additions and 31 deletions.
13 changes: 8 additions & 5 deletions pkg/kubelet/kubelet_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -1566,11 +1566,14 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
return err
}

// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
// Since v1.32, stdout may be nil if the stream is not requested.
if stdout != nil {
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
}

return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)
Expand Down
45 changes: 42 additions & 3 deletions pkg/kubelet/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ import (
oteltrace "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/utils/clock"
netutils "k8s.io/utils/net"
"k8s.io/utils/ptr"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -83,6 +83,7 @@ import (
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
"k8s.io/kubernetes/pkg/kubelet/prober"
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
Expand Down Expand Up @@ -723,6 +724,13 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
// Even with defaulters, logOptions.Stream can be nil if no arguments are provided at all.
if logOptions.Stream == nil {
// Default to "All" to maintain backward compatibility.
logOptions.Stream = ptr.To(v1.LogStreamAll)
}
}
logOptions.TypeMeta = metav1.TypeMeta{}
if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
Expand All @@ -744,9 +752,40 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
return
}
fw := flushwriter.Wrap(response.ResponseWriter)

var (
stdout io.Writer
stderr io.Writer
fw = flushwriter.Wrap(response.ResponseWriter)
)
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
wantedStream := logOptions.Stream
// No stream type specified, default to All
if wantedStream == nil {
allStream := v1.LogStreamAll
wantedStream = &allStream
}
switch *wantedStream {
case v1.LogStreamStdout:
stdout, stderr = fw, nil
case v1.LogStreamStderr:
stdout, stderr = nil, fw
case v1.LogStreamAll:
stdout, stderr = fw, fw
default:
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("invalid stream type %q", *logOptions.Stream))
return
}
} else {
if logOptions.Stream != nil && *logOptions.Stream != v1.LogStreamAll {
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("unable to return the given log stream: %q. Please enable PodLogsQuerySplitStreams feature gate in kubelet", *logOptions.Stream))
return
}
stdout, stderr = fw, fw
}

response.Header().Set("Transfer-Encoding", "chunked")
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, stdout, stderr); err != nil {
response.WriteError(http.StatusBadRequest, err)
return
}
Expand Down
273 changes: 250 additions & 23 deletions pkg/kubelet/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/utils/ptr"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -50,7 +52,6 @@ import (
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/utils/ptr"

// Do some initialization to decode the query parameters correctly.
"k8s.io/apiserver/pkg/server/healthz"
Expand Down Expand Up @@ -820,29 +821,41 @@ func TestContainerLogs(t *testing.T) {
}

for desc, test := range tests {
t.Run(desc, func(t *testing.T) {
output := "foo bar"
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer resp.Body.Close()
// To make sure the original behavior doesn't change no matter the feature PodLogsQuerySplitStreams is enabled or not.
for _, enablePodLogsQuerySplitStreams := range []bool{true, false} {
t.Run(fmt.Sprintf("%s (enablePodLogsQuerySplitStreams=%v)", desc, enablePodLogsQuerySplitStreams), func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, enablePodLogsQuerySplitStreams)
expectedLogOptions := test.podLogOption.DeepCopy()
if enablePodLogsQuerySplitStreams && expectedLogOptions.Stream == nil {
// The HTTP handler will internally set the default stream value.
expectedLogOptions.Stream = ptr.To(v1.LogStreamAll)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != output {
t.Errorf("Expected: '%v', got: '%v'", output, result)
}
})
output := "foo bar"
podNamespace := "other"
podName := "foo"
expectedPodName := getPodName(podName, podNamespace)
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, expectedLogOptions, output)
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer func() {
_ = resp.Body.Close()
}()

body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != output {
t.Errorf("Expected: '%v', got: '%v'", output, result)
}
})
}
}
}

Expand All @@ -866,6 +879,220 @@ func TestContainerLogsWithInvalidTail(t *testing.T) {
}
}

func TestContainerLogsWithSeparateStream(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, true)

type logEntry struct {
stream string
msg string
}

fw := newServerTest()
defer fw.testHTTPServer.Close()

var (
streamStdout = v1.LogStreamStdout
streamStderr = v1.LogStreamStderr
streamAll = v1.LogStreamAll
)

testCases := []struct {
name string
query string
logs []logEntry
expectedOutput string
expectedLogOptions *v1.PodLogOptions
}{
{
// Defaulters don't work if the query is empty.
// See also https://github.com/kubernetes/kubernetes/issues/128589
name: "empty query should return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "foo\n"},
{stream: v1.LogStreamStderr, msg: "bar\n"},
},
query: "",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
},
expectedOutput: "foo\nbar\n",
},
{
name: "missing stream param should return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "foo\n"},
{stream: v1.LogStreamStderr, msg: "bar\n"},
},
query: "?limitBytes=100",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
LimitBytes: ptr.To[int64](100),
},
expectedOutput: "foo\nbar\n",
},
{
name: "only stdout logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=Stdout",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStdout,
},
expectedOutput: "out1\nout2\n",
},
{
name: "only stderr logs",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStderr, msg: "err2\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
},
query: "?stream=Stderr",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStderr,
},
expectedOutput: "err1\nerr2\n",
},
{
name: "return all logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
},
expectedOutput: "out1\nerr1\nout2\n",
},
{
name: "stdout logs with legacy tail",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All&tail=1",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
TailLines: ptr.To[int64](1),
},
expectedOutput: "out2\n",
},
{
name: "return the last 2 lines of logs",
logs: []logEntry{
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=All&tailLines=2",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamAll,
TailLines: ptr.To[int64](2),
},
expectedOutput: "err1\nout2\n",
},
{
name: "return the first 6 bytes of the stdout log stream",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
{stream: v1.LogStreamStderr, msg: "err2\n"},
{stream: v1.LogStreamStdout, msg: "out2\n"},
},
query: "?stream=Stdout&limitBytes=6",
expectedLogOptions: &v1.PodLogOptions{
Stream: &streamStdout,
LimitBytes: ptr.To[int64](6),
},
expectedOutput: "out1\no",
},
{
name: "invalid stream",
logs: []logEntry{
{stream: v1.LogStreamStderr, msg: "err1\n"},
{stream: v1.LogStreamStdout, msg: "out1\n"},
},
query: "?stream=invalid",
expectedLogOptions: nil,
expectedOutput: `{"message": "Invalid request."}`,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
podNamespace := "other"
podName := "foo"
expectedContainerName := "baz"
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
if !reflect.DeepEqual(tc.expectedLogOptions, logOptions) {
t.Errorf("expected %#v, got %#v", tc.expectedLogOptions, logOptions)
}

var dst io.Writer
tailLines := len(tc.logs)
if logOptions.TailLines != nil {
tailLines = int(*logOptions.TailLines)
}

remain := 0
if logOptions.LimitBytes != nil {
remain = int(*logOptions.LimitBytes)
} else {
for _, log := range tc.logs {
remain += len(log.msg)
}
}

logs := tc.logs[len(tc.logs)-tailLines:]
for _, log := range logs {
switch log.stream {
case v1.LogStreamStdout:
dst = stdout
case v1.LogStreamStderr:
dst = stderr
}
// Skip if the stream is not requested
if dst == nil {
continue
}
line := log.msg
if len(line) > remain {
line = line[:remain]
}
_, _ = io.WriteString(dst, line)
remain -= len(line)
if remain <= 0 {
return nil
}
}
return nil
}
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + tc.query)
if err != nil {
t.Errorf("Got error GETing: %v", err)
}
defer func() {
_ = resp.Body.Close()
}()

body, err := io.ReadAll(resp.Body)
if err != nil {
t.Errorf("Error reading container logs: %v", err)
}
result := string(body)
if result != tc.expectedOutput {
t.Errorf("Expected: %q, got: %q", tc.expectedOutput, result)
}
})
}
}

func TestCheckpointContainer(t *testing.T) {
podNamespace := "other"
podName := "foo"
Expand Down
Loading

0 comments on commit 94cd0a0

Please sign in to comment.