From 5ee9bb5ff34cb2b487bebb3e9ce519bb8f71a936 Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Tue, 27 Dec 2022 17:43:25 +0000 Subject: [PATCH 1/2] Implement prototype of EncodeStreaming --- go.mod | 2 +- staging/src/k8s.io/apimachinery/go.mod | 1 + staging/src/k8s.io/apimachinery/go.sum | 2 + .../apimachinery/pkg/runtime/interfaces.go | 2 + .../runtime/serializer/protobuf/streaming.go | 103 ++++++++++++++++++ .../serializer/protobuf/streaming_test.go | 77 +++++++++++++ .../pkg/registry/generic/registry/store.go | 2 +- .../apiserver/pkg/storage/cacher/cacher.go | 68 ++++++++++++ 8 files changed, 255 insertions(+), 2 deletions(-) create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go create mode 100644 staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go diff --git a/go.mod b/go.mod index d07d1f764c9bf..2e6ff9808c30d 100644 --- a/go.mod +++ b/go.mod @@ -92,7 +92,7 @@ require ( gopkg.in/square/go-jose.v2 v2.2.2 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 - k8s.io/api v0.0.0 + k8s.io/api v0.26.0 k8s.io/apiextensions-apiserver v0.0.0 k8s.io/apimachinery v0.0.0 k8s.io/apiserver v0.0.0 diff --git a/staging/src/k8s.io/apimachinery/go.mod b/staging/src/k8s.io/apimachinery/go.mod index eeed4159f8c0b..a4ef6ec7f4ddf 100644 --- a/staging/src/k8s.io/apimachinery/go.mod +++ b/staging/src/k8s.io/apimachinery/go.mod @@ -45,6 +45,7 @@ require ( gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/api v0.26.0 // indirect ) replace k8s.io/apimachinery => ../apimachinery diff --git a/staging/src/k8s.io/apimachinery/go.sum b/staging/src/k8s.io/apimachinery/go.sum index d387337fc4342..da186601ef6a2 100644 --- a/staging/src/k8s.io/apimachinery/go.sum +++ b/staging/src/k8s.io/apimachinery/go.sum @@ -172,6 +172,8 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/api v0.26.0 h1:IpPlZnxBpV1xl7TGk/X6lFtpgjgntCg8PJ+qrPHAC7I= +k8s.io/api v0.26.0/go.mod h1:k6HDTaIFC8yn1i6pSClSqIwLABIcLV9l5Q4EcngKnQg= k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4= k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 h1:+70TFaan3hfJzs+7VK2o+OGxg8HsuBr/5f6tVAjDu6E= diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go index 710a977952f29..7ba09988142d2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go @@ -53,6 +53,8 @@ type Encoder interface { // Encode writes an object to a stream. Implementations may return errors if the versions are // incompatible, or if no conversion is defined. Encode(obj Object, w io.Writer) error + + EncodeStreaming(obj Object, items <-chan Object, w io.Writer) error // Identifier returns an identifier of the encoder. // Identifiers of two different encoders should be equal if and only if for every input // object it will be encoded to the same representation by both of them. diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go new file mode 100644 index 0000000000000..f8828435caea2 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go @@ -0,0 +1,103 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protobuf + +import ( + "fmt" + "io" + + "github.com/gogo/protobuf/proto" + "k8s.io/apimachinery/pkg/api/meta" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/runtime" +) + +func getListMeta(obj runtime.Object) (v1.ListMeta, error) { + v, err := conversion.EnforcePtr(obj) + if err != nil { + return v1.ListMeta{}, err + } + listMeta := v.FieldByName("ListMeta") + if !listMeta.IsValid() { + return v1.ListMeta{}, fmt.Errorf("expected ListMeta") + } + return listMeta.Interface().(v1.ListMeta), nil +} + +// MarshalToWriter marshals object referenceList +func MarshalToWriter(refObj runtime.Object, items <-chan runtime.Object, w io.Writer) error { + if items, err := meta.ExtractList(refObj); err != nil { + return fmt.Errorf("failed to extract items: %v", err) + } else if len(items) != 0 { + return fmt.Errorf("got obj with nonzero items: %v", len(items)) + } + if _, err := w.Write([]byte{0xa}); err != nil { + return err + } + listMeta, err := getListMeta(refObj) + if err != nil { + return fmt.Errorf("failed to get listMeta: %v", err) + } + data, err := listMeta.Marshal() + if err != nil { + return err + } + if err := writeVarintGenerated(w, uint64(len(data))); err != nil { + return err + } + if _, err := w.Write(data); err != nil { + return err + } + + for item := range items { + marshaler, ok := item.(proto.Marshaler) + if !ok { + return fmt.Errorf("item doesn't implement proto.Marshaler: %v", item) + } + data, err := marshaler.Marshal() + if err != nil { + return err + } + + // TODO(mborsz): Make 0x12 depend on the actual proto tag of items. + if _, err := w.Write([]byte{0x12}); err != nil { + return err + } + + if err := writeVarintGenerated(w, uint64(len(data))); err != nil { + return err + } + + if _, err := w.Write(data); err != nil { + return err + } + } + + return nil +} + +func writeVarintGenerated(w io.Writer, v uint64) error { + for v >= 1<<7 { + if _, err := w.Write([]byte{uint8(v&0x7f | 0x80)}); err != nil { + return err + } + v >>= 7 + } + _, err := w.Write([]byte{uint8(v)}) + return err +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go new file mode 100644 index 0000000000000..0c5b541ff2a97 --- /dev/null +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go @@ -0,0 +1,77 @@ +package protobuf + +import ( + "bytes" + "fmt" + "testing" + + "github.com/gogo/protobuf/proto" + fuzz "github.com/google/gofuzz" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestSimplePodListStreaming(t *testing.T) { + a := &corev1.PodList{ + ListMeta: v1.ListMeta{ + ResourceVersion: "123456", + SelfLink: "self/link", + Continue: "some-base64-stuff", + RemainingItemCount: proto.Int64(1234), + }, + Items: []corev1.Pod{ + {ObjectMeta: v1.ObjectMeta{Name: "pod-1"}}, + {ObjectMeta: v1.ObjectMeta{Name: "pod-2"}}, + {ObjectMeta: v1.ObjectMeta{Name: "pod-3"}}, + }, + } + + ch := make(chan runtime.Object, len(a.Items)) + for _, pod := range a.Items { + pod := pod + ch <- &pod + } + close(ch) + refObj := &corev1.PodList{ + ListMeta: a.ListMeta, + } + data, err := a.Marshal() + require.NoError(t, err, "unexpected error while serializing PodList") + buf := &bytes.Buffer{} + err = MarshalToWriter(refObj, ch, buf) + require.NoError(t, err, "unexpected error while serializing PodListStreaming") + + assert.Equal(t, data, buf.Bytes()) +} + +func TestFuzzPodListStreaming(t *testing.T) { + f := fuzz.New() + for i := 0; i < 100; i++ { + t.Run(fmt.Sprintf("Run %d/100", i), func(t *testing.T) { + a := &corev1.PodList{} + f.Fuzz(a) + + t.Logf("PodList: %+v", a) + + ch := make(chan runtime.Object, len(a.Items)) + for _, pod := range a.Items { + pod := pod + ch <- &pod + } + close(ch) + refObj := &corev1.PodList{ + ListMeta: a.ListMeta, + } + data, err := a.Marshal() + require.NoError(t, err, "unexpected error while serializing PodList") + buf := &bytes.Buffer{} + err = MarshalToWriter(refObj, ch, buf) + require.NoError(t, err, "unexpected error while serializing PodListStreaming") + assert.Equal(t, data, buf.Bytes()) + + }) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 40bca49665f68..13a5c579dffe1 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1,4 +1,4 @@ -/* +p/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index bc6500909c3ba..d692213e8f2cd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -650,6 +650,74 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex()) } +// listObj here is an object with Items set to '<-chan runtime.Object'. +func (c *Cacher) GetListStreaming(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + recursive := opts.Recursive + resourceVersion := opts.ResourceVersion + pred := opts.Predicate + if shouldDelegateList(opts) { + return c.storage.GetListStreaming(ctx, key, opts, listObj) + } + + // If resourceVersion is specified, serve it from cache. + // It's guaranteed that the returned value is at least that + // fresh as the given resourceVersion. + listRV, err := c.versioner.ParseResourceVersion(resourceVersion) + if err != nil { + return err + } + + if listRV == 0 && !c.ready.check() { + // If Cacher is not yet initialized and we don't require any specific + // minimal resource version, simply forward the request to storage. + return c.storage.GetListStreaming(ctx, key, opts, listObj) + } + + ctx, span := tracing.Start(ctx, "cacher list", + attribute.String("audit-id", audit.GetAuditIDTruncated(ctx)), + attribute.Stringer("type", c.groupResource)) + defer span.End(500 * time.Millisecond) + + if err := c.ready.wait(); err != nil { + return errors.NewServiceUnavailable(err.Error()) + } + span.AddEvent("Ready") + + filter := filterWithAttrsFunction(key, pred) + + // objs is a slice of pointers. + objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, key, pred, recursive) + if err != nil { + return err + } + if c.versioner != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { + return err + } + } + + var count int + span.AddEvent("Listed items from cache", attribute.Int("count", len(objs))) + + resultChan := make(chan runtime.Object) + defer close(resultChan) + + for _, obj := range objs { + elem, ok := obj.(*storeElement) + if !ok { + return nil, fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + if filter(elem.Key, elem.Labels, elem.Fields) { + resultChan <- elem.Object + count++ + } + } + span.AddEvent("Filtered items", attribute.Int("count", count)) + + metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), count) + return nil +} + // GetList implements storage.Interface func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { recursive := opts.Recursive From b49532753b4077a6edb6b5e6407ae8738e41ac90 Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Tue, 27 Dec 2022 19:49:01 +0000 Subject: [PATCH 2/2] next iter --- .../pkg/apis/meta/v1/unstructured/helpers.go | 9 +++++++ .../k8s.io/apimachinery/pkg/runtime/codec.go | 5 ++++ .../apimachinery/pkg/runtime/interfaces.go | 8 +++++- .../pkg/runtime/serializer/json/json.go | 6 +++++ .../runtime/serializer/protobuf/protobuf.go | 9 +++++++ .../runtime/serializer/protobuf/streaming.go | 8 ++++-- .../serializer/protobuf/streaming_test.go | 8 +++--- .../serializer/versioning/versioning.go | 6 +++++ staging/src/k8s.io/apiserver/go.mod | 2 +- .../apiserver/pkg/endpoints/discovery/util.go | 5 ++++ .../apiserver/pkg/endpoints/handlers/get.go | 27 +++++++++++++++++++ 11 files changed, 85 insertions(+), 8 deletions(-) diff --git a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/helpers.go b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/helpers.go index 2e33283ef221c..a95ef10990c32 100644 --- a/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/helpers.go +++ b/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured/helpers.go @@ -17,6 +17,7 @@ limitations under the License. package unstructured import ( + "context" gojson "encoding/json" "fmt" "io" @@ -352,6 +353,10 @@ func (s unstructuredJSONScheme) Encode(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w) } +func (s unstructuredJSONScheme) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + func (unstructuredJSONScheme) doEncode(obj runtime.Object, w io.Writer) error { switch t := obj.(type) { case *Unstructured: @@ -495,6 +500,10 @@ func (c *jsonFallbackEncoder) Encode(obj runtime.Object, w io.Writer) error { return err } +func (c *jsonFallbackEncoder) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + // Identifier implements runtime.Encoder interface. func (c *jsonFallbackEncoder) Identifier() runtime.Identifier { return c.identifier diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go b/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go index 7fc513dd0e7f4..dd7dfcd0157ef 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/codec.go @@ -18,6 +18,7 @@ package runtime import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -112,6 +113,10 @@ func (n NoopEncoder) Encode(obj Object, w io.Writer) error { return fmt.Errorf("encoding is not allowed for this codec: %v", reflect.TypeOf(n.Decoder)) } +func (n NoopEncoder) EncodeStreaming(ctx context.Context, obj Object, items <-chan ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + // Identifier implements runtime.Encoder interface. func (n NoopEncoder) Identifier() Identifier { return noopEncoderIdentifier diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go index 7ba09988142d2..ea8be21df0050 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/interfaces.go @@ -17,6 +17,7 @@ limitations under the License. package runtime import ( + "context" "io" "net/url" @@ -48,13 +49,18 @@ type GroupVersioner interface { // input the output they produce is exactly the same. type Identifier string +type ObjectOrError struct { + Object Object + Err error +} + // Encoder writes objects to a serialized form type Encoder interface { // Encode writes an object to a stream. Implementations may return errors if the versions are // incompatible, or if no conversion is defined. Encode(obj Object, w io.Writer) error - EncodeStreaming(obj Object, items <-chan Object, w io.Writer) error + EncodeStreaming(ctx context.Context, obj Object, items <-chan ObjectOrError, w io.Writer) error // Identifier returns an identifier of the encoder. // Identifiers of two different encoders should be equal if and only if for every input // object it will be encoded to the same representation by both of them. diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go index 1ae4a32eb720c..b60f386f967a3 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go @@ -17,7 +17,9 @@ limitations under the License. package json import ( + "context" "encoding/json" + "fmt" "io" "strconv" @@ -220,6 +222,10 @@ func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w) } +func (s *Serializer) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error { if s.options.Yaml { json, err := json.Marshal(obj) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go index c63e6dc63f6bb..b602b4b1ce189 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go @@ -18,6 +18,7 @@ package protobuf import ( "bytes" + "context" "fmt" "io" "net/http" @@ -174,6 +175,10 @@ func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error { return s.encode(obj, w, &runtime.SimpleAllocator{}) } +func (s *Serializer) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + func (s *Serializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { if co, ok := obj.(runtime.CacheableObject); ok { return co.CacheEncode(s.Identifier(), func(obj runtime.Object, w io.Writer) error { return s.doEncode(obj, w, memAlloc) }, w) @@ -427,6 +432,10 @@ func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime return s.doEncode(obj, w, memAlloc) } +func (s *RawSerializer) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { if memAlloc == nil { klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator") diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go index f8828435caea2..514539880216a 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming.go @@ -40,7 +40,7 @@ func getListMeta(obj runtime.Object) (v1.ListMeta, error) { } // MarshalToWriter marshals object referenceList -func MarshalToWriter(refObj runtime.Object, items <-chan runtime.Object, w io.Writer) error { +func MarshalToWriter(refObj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { if items, err := meta.ExtractList(refObj); err != nil { return fmt.Errorf("failed to extract items: %v", err) } else if len(items) != 0 { @@ -64,7 +64,11 @@ func MarshalToWriter(refObj runtime.Object, items <-chan runtime.Object, w io.Wr return err } - for item := range items { + for objectOrError := range items { + if objectOrError.Err != nil { + return objectOrError.Err + } + item := objectOrError.Object marshaler, ok := item.(proto.Marshaler) if !ok { return fmt.Errorf("item doesn't implement proto.Marshaler: %v", item) diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go index 0c5b541ff2a97..0b1e68cfb8520 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/streaming_test.go @@ -29,10 +29,10 @@ func TestSimplePodListStreaming(t *testing.T) { }, } - ch := make(chan runtime.Object, len(a.Items)) + ch := make(chan runtime.ObjectOrError, len(a.Items)) for _, pod := range a.Items { pod := pod - ch <- &pod + ch <- runtime.ObjectOrError{Object: &pod} } close(ch) refObj := &corev1.PodList{ @@ -56,10 +56,10 @@ func TestFuzzPodListStreaming(t *testing.T) { t.Logf("PodList: %+v", a) - ch := make(chan runtime.Object, len(a.Items)) + ch := make(chan runtime.ObjectOrError, len(a.Items)) for _, pod := range a.Items { pod := pod - ch <- &pod + ch <- runtime.ObjectOrError{Object: &pod} } close(ch) refObj := &corev1.PodList{ diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go index 4466331829ebe..a1b959d893de5 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning/versioning.go @@ -17,7 +17,9 @@ limitations under the License. package versioning import ( + "context" "encoding/json" + "fmt" "io" "reflect" "sync" @@ -207,6 +209,10 @@ func (c *codec) Encode(obj runtime.Object, w io.Writer) error { return c.encode(obj, w, nil) } +func (c *codec) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + func (c *codec) encode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error { if co, ok := obj.(runtime.CacheableObject); ok { return co.CacheEncode(c.Identifier(), func(obj runtime.Object, w io.Writer) error { return c.doEncode(obj, w, memAlloc) }, w) diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod index 22d36eb25f428..ef387edc6921c 100644 --- a/staging/src/k8s.io/apiserver/go.mod +++ b/staging/src/k8s.io/apiserver/go.mod @@ -42,7 +42,7 @@ require ( google.golang.org/protobuf v1.28.1 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/square/go-jose.v2 v2.2.2 - k8s.io/api v0.0.0 + k8s.io/api v0.26.0 k8s.io/apimachinery v0.0.0 k8s.io/client-go v0.0.0 k8s.io/component-base v0.0.0 diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/util.go b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/util.go index 7487ffc18a4f6..34781a939ab06 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/util.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/discovery/util.go @@ -18,6 +18,7 @@ package discovery import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -70,6 +71,10 @@ func (c stripVersionEncoder) Encode(obj runtime.Object, w io.Writer) error { return c.doEncode(obj, w) } +func (c stripVersionEncoder) EncodeStreaming(ctx context.Context, obj runtime.Object, items <-chan runtime.ObjectOrError, w io.Writer) error { + return fmt.Errorf("not implemented") +} + func (c stripVersionEncoder) doEncode(obj runtime.Object, w io.Writer) error { buf := bytes.NewBuffer([]byte{}) err := c.encoder.Encode(obj, buf) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go index 2f8c6fbc24421..2e9251e168907 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go @@ -33,6 +33,7 @@ import ( metainternalversionscheme "k8s.io/apimachinery/pkg/apis/meta/internalversion/scheme" metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -277,8 +278,34 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc scope.err(err, w, req) return } + obj, items, err := toStreaming(result) + if err != nil { + scope.err(err, w, req) + return + } + span.AddEvent("Listing from storage done") defer span.AddEvent("Writing http response done", attribute.Int("count", meta.LenList(result))) transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result) } } + +func toStreaming(obj runtime.Object) (runtime.Object, <-chan runtime.ObjectOrError, error) { + copy := obj.DeepCopyObject() + itemsPtr, err := meta.GetItemsPtr(copy) + if err != nil { + return nil, nil, err + } + items, err := conversion.EnforcePtr(itemsPtr) + if err != nil { + return nil, nil, err + } + + ch := make(chan runtime.ObjectOrError, items.Len()) + for i := 0; i < items.Len(); i++ { + ch <- runtime.ObjectOrError{Object: items.Index(i).Interface().(runtime.Object)} + } + items.SetLen(0) + + return copy, ch, nil +}