Skip to content

Commit

Permalink
Move pkg/kubelet/cri/remote to cri-client
Browse files Browse the repository at this point in the history
Signed-off-by: Sascha Grunert <sgrunert@redhat.com>
  • Loading branch information
saschagrunert committed May 14, 2024
1 parent 4f04dff commit 2aa9e76
Show file tree
Hide file tree
Showing 39 changed files with 891 additions and 473 deletions.
4 changes: 2 additions & 2 deletions cmd/kubemark/app/hollow_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ import (
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/component-base/version"
"k8s.io/component-base/version/verflag"
remote "k8s.io/cri-client/pkg"
fakeremote "k8s.io/cri-client/pkg/fake"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/cluster/ports"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/certificate/bootstrap"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"
"k8s.io/kubernetes/pkg/kubemark"
kubemarkproxy "k8s.io/kubernetes/pkg/proxy/kubemark"
utilflag "k8s.io/kubernetes/pkg/util/flag"
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.13
go.etcd.io/etcd/client/v3 v3.5.13
go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.42.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0
go.opentelemetry.io/otel v1.20.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.20.0
go.opentelemetry.io/otel/sdk v1.20.0
Expand Down Expand Up @@ -100,6 +99,7 @@ require (
k8s.io/component-helpers v0.0.0
k8s.io/controller-manager v0.0.0
k8s.io/cri-api v0.0.0
k8s.io/cri-client v0.0.0
k8s.io/csi-translation-lib v0.0.0
k8s.io/dynamic-resource-allocation v0.0.0
k8s.io/endpointslice v0.0.0
Expand Down Expand Up @@ -203,6 +203,7 @@ require (
go.etcd.io/etcd/raft/v3 v3.5.13 // indirect
go.etcd.io/etcd/server/v3 v3.5.13 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.44.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.20.0 // indirect
go.opentelemetry.io/otel/metric v1.20.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/apis/podresources/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"k8s.io/cri-client/pkg/util"
"k8s.io/kubelet/pkg/apis/podresources/v1"
"k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/util"
)

// Note: Consumers of the pod resources API should not be importing this package.
Expand Down
6 changes: 0 additions & 6 deletions pkg/kubelet/cri/remote/OWNERS

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"k8s.io/component-helpers/apimachinery/lease"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
remote "k8s.io/cri-client/pkg"
"k8s.io/klog/v2"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
Expand All @@ -80,7 +81,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
remote "k8s.io/cri-client/pkg"
fakeremote "k8s.io/cri-client/pkg/fake"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/features"
Expand All @@ -68,8 +70,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/configmap"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kuberuntime/kuberuntime_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
remote "k8s.io/cri-client/pkg"
kubelettypes "k8s.io/kubelet/pkg/types"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
"k8s.io/kubernetes/pkg/kubelet/events"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/types"
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/kuberuntime/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/kubernetes/pkg/kubelet/cri/remote"
remote "k8s.io/cri-client/pkg"
"k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util/tail"
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/prometheus/slis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/cri-client/pkg/util"
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
podresourcesapiv1alpha1 "k8s.io/kubelet/pkg/apis/podresources/v1alpha1"
"k8s.io/kubelet/pkg/cri/streaming"
Expand All @@ -84,7 +85,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/prober"
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/util"
)

func init() {
Expand Down
98 changes: 0 additions & 98 deletions pkg/kubelet/util/util_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,113 +20,15 @@ limitations under the License.
package util

import (
"context"
"fmt"
"net"
"net/url"
"os"
"path/filepath"

"golang.org/x/sys/unix"
"k8s.io/klog/v2"
)

const (
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)

// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return nil, err
}
if protocol != unixProtocol {
return nil, fmt.Errorf("only support unix socket endpoint")
}

// Unlink to cleanup the previous socket file.
err = unix.Unlink(addr)
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("failed to unlink socket file %q: %v", addr, err)
}

if err := os.MkdirAll(filepath.Dir(addr), 0750); err != nil {
return nil, fmt.Errorf("error creating socket directory %q: %v", filepath.Dir(addr), err)
}

// Create the socket on a tempfile and move it to the destination socket to handle improper cleanup
file, err := os.CreateTemp(filepath.Dir(addr), "")
if err != nil {
return nil, fmt.Errorf("failed to create temporary file: %v", err)
}

if err := os.Remove(file.Name()); err != nil {
return nil, fmt.Errorf("failed to remove temporary file: %v", err)
}

l, err := net.Listen(protocol, file.Name())
if err != nil {
return nil, err
}

if err = os.Rename(file.Name(), addr); err != nil {
return nil, fmt.Errorf("failed to move temporary file to addr %q: %v", addr, err)
}

return l, nil
}

// GetAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
func GetAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
protocol, addr, err := parseEndpointWithFallbackProtocol(endpoint, unixProtocol)
if err != nil {
return "", nil, err
}
if protocol != unixProtocol {
return "", nil, fmt.Errorf("only support unix socket endpoint")
}

return addr, dial, nil
}

func dial(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, unixProtocol, addr)
}

func parseEndpointWithFallbackProtocol(endpoint string, fallbackProtocol string) (protocol string, addr string, err error) {
if protocol, addr, err = parseEndpoint(endpoint); err != nil && protocol == "" {
fallbackEndpoint := fallbackProtocol + "://" + endpoint
protocol, addr, err = parseEndpoint(fallbackEndpoint)
if err == nil {
klog.InfoS("Using this endpoint is deprecated, please consider using full URL format", "endpoint", endpoint, "URL", fallbackEndpoint)
}
}
return
}

func parseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", err
}

switch u.Scheme {
case "tcp":
return "tcp", u.Host, nil

case "unix":
return "unix", u.Path, nil

case "":
return "", "", fmt.Errorf("using %q as endpoint is deprecated, please consider using full url format", endpoint)

default:
return u.Scheme, "", fmt.Errorf("protocol %q not supported", u.Scheme)
}
}

// LocalEndpoint returns the full path to a unix socket at the given endpoint
func LocalEndpoint(path, file string) (string, error) {
u := url.URL{
Expand Down
92 changes: 0 additions & 92 deletions pkg/kubelet/util/util_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,98 +25,6 @@ import (
"github.com/stretchr/testify/assert"
)

func TestParseEndpoint(t *testing.T) {
tests := []struct {
endpoint string
expectError bool
expectedProtocol string
expectedAddr string
}{
{
endpoint: "unix:///tmp/s1.sock",
expectedProtocol: "unix",
expectedAddr: "/tmp/s1.sock",
},
{
endpoint: "tcp://localhost:15880",
expectedProtocol: "tcp",
expectedAddr: "localhost:15880",
},
{
endpoint: "npipe://./pipe/mypipe",
expectedProtocol: "npipe",
expectError: true,
},
{
endpoint: "tcp1://abc",
expectedProtocol: "tcp1",
expectError: true,
},
{
endpoint: "a b c",
expectError: true,
},
}

for _, test := range tests {
protocol, addr, err := parseEndpoint(test.endpoint)
assert.Equal(t, test.expectedProtocol, protocol)
if test.expectError {
assert.NotNil(t, err, "Expect error during parsing %q", test.endpoint)
continue
}
assert.Nil(t, err, "Expect no error during parsing %q", test.endpoint)
assert.Equal(t, test.expectedAddr, addr)
}

}

func TestGetAddressAndDialer(t *testing.T) {
tests := []struct {
endpoint string
expectError bool
expectedAddr string
}{
{
endpoint: "unix:///tmp/s1.sock",
expectError: false,
expectedAddr: "/tmp/s1.sock",
},
{
endpoint: "unix:///tmp/f6.sock",
expectError: false,
expectedAddr: "/tmp/f6.sock",
},
{
endpoint: "tcp://localhost:9090",
expectError: true,
},
{
// The misspelling is intentional to make it error
endpoint: "htta://free-test.com",
expectError: true,
},
{
endpoint: "https://www.youtube.com/",
expectError: true,
},
{
endpoint: "http://www.baidu.com/",
expectError: true,
},
}
for _, test := range tests {
// just test addr and err
addr, _, err := GetAddressAndDialer(test.endpoint)
if test.expectError {
assert.NotNil(t, err, "expected error during parsing %s", test.endpoint)
continue
}
assert.Nil(t, err, "expected no error during parsing %s", test.endpoint)
assert.Equal(t, test.expectedAddr, addr)
}
}

func TestLocalEndpoint(t *testing.T) {
tests := []struct {
path string
Expand Down
12 changes: 0 additions & 12 deletions pkg/kubelet/util/util_unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,10 @@ limitations under the License.
package util

import (
"context"
"fmt"
"net"
"time"
)

// CreateListener creates a listener on the specified endpoint.
func CreateListener(endpoint string) (net.Listener, error) {
return nil, fmt.Errorf("CreateListener is unsupported in this build")
}

// GetAddressAndDialer returns the address parsed from the given endpoint and a context dialer.
func GetAddressAndDialer(endpoint string) (string, func(ctx context.Context, addr string) (net.Conn, error), error) {
return "", nil, fmt.Errorf("GetAddressAndDialer is unsupported in this build")
}

// LockAndCheckSubPath empty implementation
func LockAndCheckSubPath(volumePath, subPath string) ([]uintptr, error) {
return []uintptr{}, nil
Expand Down
Loading

0 comments on commit 2aa9e76

Please sign in to comment.