diff --git a/pkg/util/BUILD b/pkg/util/BUILD index a2916a9a1376c..3e0984bdbb60a 100644 --- a/pkg/util/BUILD +++ b/pkg/util/BUILD @@ -30,6 +30,7 @@ filegroup( "//pkg/util/io:all-srcs", "//pkg/util/ipconfig:all-srcs", "//pkg/util/iptables:all-srcs", + "//pkg/util/ipvs:all-srcs", "//pkg/util/keymutex:all-srcs", "//pkg/util/labels:all-srcs", "//pkg/util/limitwriter:all-srcs", diff --git a/pkg/util/ipvs/BUILD b/pkg/util/ipvs/BUILD new file mode 100644 index 0000000000000..2505f84783c03 --- /dev/null +++ b/pkg/util/ipvs/BUILD @@ -0,0 +1,56 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", + "go_test", +) + +go_test( + name = "go_default_test", + srcs = [ + "ipvs_linux_test.go", + "ipvs_test.go", + ], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//vendor/github.com/docker/libnetwork/ipvs:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", + "//vendor/k8s.io/utils/exec/testing:go_default_library", + ], +) + +go_library( + name = "go_default_library", + srcs = [ + "ipvs.go", + "ipvs_linux.go", + ], + tags = ["automanaged"], + deps = [ + "//pkg/util/sysctl:go_default_library", + "//vendor/github.com/docker/libnetwork/ipvs:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/util/ipvs/testing:all-srcs", + ], + tags = ["automanaged"], +) diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go new file mode 100644 index 0000000000000..5288297938d89 --- /dev/null +++ b/pkg/util/ipvs/ipvs.go @@ -0,0 +1,92 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "net" + "strconv" +) + +// Interface is an injectable interface for running ipvs commands. Implementations must be goroutine-safe. +type Interface interface { + // Flush clears all services in system. return occurred error immediately. + Flush() error + // EnsureDummyDevice checks if the specified dummy interface is present and, if not, creates it. If the dummy interface existed, return true. + EnsureDummyDevice(dummyDev string) (exist bool, err error) + // DeleteDummyDevice deletes the specified dummy interface. If the dummy interface existed, return error. + DeleteDummyDevice(dummyDev string) error + // EnsureServiceAddressBind checks if service's address is bound to dummy interface and, if not, binds it. If the address is already bound, return true. + EnsureServiceAddressBind(serv *InternalService, dev string) (exist bool, err error) + // UnBindServiceAddress checks if service's address is bound to dummy interface and, if so, unbinds it. + UnBindServiceAddress(serv *InternalService, dev string) error + // AddService creates the specified service. + AddService(*InternalService) error + // DeleteService deletes the specified service. If the service does not exist, return error. + DeleteService(*InternalService) error + // GetService returns the specified service information in the system. + GetService(*InternalService) (*InternalService, error) + // GetServices lists all services in the system. + GetServices() ([]*InternalService, error) + // AddDestination creates the specified destination for the specified service. + AddDestination(*InternalService, *InternalDestination) error + // GetDestinations returns all destinations for the specified service. + GetDestinations(*InternalService) ([]*InternalDestination, error) + // DeleteDestination deletes the specified destination from the specified service. + DeleteDestination(*InternalService, *InternalDestination) error +} + +// InternalService is an internal definition of an IPVS service in its entirety. +type InternalService struct { + Address net.IP + Protocol string + Port uint16 + Scheduler string + Flags ServiceFlags + Timeout uint32 +} + +// ServiceFlags is used to specify session affinity, ip hash etc. +type ServiceFlags uint32 + +const ( + // FlagPersistent specify IPVS service session affinity + FlagPersistent = 0x1 +) + +// Equal check the equality of internal service +func (svc *InternalService) Equal(other *InternalService) bool { + return svc.Address.Equal(other.Address) && + svc.Protocol == other.Protocol && + svc.Port == other.Port && + svc.Scheduler == other.Scheduler && + svc.Timeout == other.Timeout +} + +func (svc *InternalService) String() string { + return net.JoinHostPort(svc.Address.String(), strconv.Itoa(int(svc.Port))) + "/" + svc.Protocol +} + +// InternalDestination is an internal definition of an IPVS destination in its entirety. +type InternalDestination struct { + Address net.IP + Port uint16 + Weight int +} + +func (dest *InternalDestination) String() string { + return net.JoinHostPort(dest.Address.String(), strconv.Itoa(int(dest.Port))) +} diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go new file mode 100644 index 0000000000000..03028e6064784 --- /dev/null +++ b/pkg/util/ipvs/ipvs_linux.go @@ -0,0 +1,322 @@ +// +build linux + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "errors" + "fmt" + "net" + "strings" + "syscall" + + "github.com/docker/libnetwork/ipvs" + "github.com/golang/glog" + utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" + utilexec "k8s.io/utils/exec" +) + +const cmdIP = "ip" + +// runner implements Interface in terms of exec("ipvs") +type runner struct { + exec utilexec.Interface + sysctl utilsysctl.Interface + ipvsHandle *ipvs.Handle +} + +// New returns a new Interface which will call ipvs APIs +func New(exec utilexec.Interface) Interface { + ihandle, err := ipvs.New("") + if err != nil { + glog.Errorf("IPVS interface can't be initialized, error: %v", err) + return nil + } + return &runner{ + exec: exec, + sysctl: utilsysctl.New(), + ipvsHandle: ihandle, + } +} + +// EnsureDummyDevice is part of Interface. +func (runner *runner) EnsureDummyDevice(dummyDev string) (exist bool, err error) { + args := []string{"link", "add", dummyDev, "type", "dummy"} + out, err := runner.exec.Command(cmdIP, args...).CombinedOutput() + if err != nil { + // "exit status code 2" will be returned if the device already exists + if ee, ok := err.(utilexec.ExitError); ok { + if ee.Exited() && ee.ExitStatus() == 2 { + return true, nil + } + } + return false, fmt.Errorf("error creating dummy interface %q: %v: %s", dummyDev, err, out) + } + return false, nil +} + +// DeleteDummyDevice is part of Interface. +func (runner *runner) DeleteDummyDevice(dummyDev string) error { + args := []string{"link", "del", dummyDev} + out, err := runner.exec.Command(cmdIP, args...).CombinedOutput() + if err != nil { + return fmt.Errorf("error deleting dummy interface %q: %v: %s", dummyDev, err, out) + } + return nil +} + +// EnsureServiceAddressBind is part of Interface. +func (runner *runner) EnsureServiceAddressBind(serv *InternalService, dummyDev string) (exist bool, err error) { + addr := serv.Address.String() + "/32" + args := []string{"addr", "add", addr, "dev", dummyDev} + out, err := runner.exec.Command(cmdIP, args...).CombinedOutput() + if err != nil { + // "exit status 2" will be returned if the address is already bound to dummy device + if ee, ok := err.(utilexec.ExitError); ok { + if ee.Exited() && ee.ExitStatus() == 2 { + return true, nil + } + } + return false, fmt.Errorf("error bind address: %s to dummy interface: %s, err: %v: %s", serv.Address.String(), dummyDev, err, out) + } + return false, nil +} + +// UnBindServiceAddress is part of Interface. +func (runner *runner) UnBindServiceAddress(serv *InternalService, dummyDev string) error { + addr := serv.Address.String() + "/32" + args := []string{"addr", "del", addr, "dev", dummyDev} + out, err := runner.exec.Command(cmdIP, args...).CombinedOutput() + if err != nil { + return fmt.Errorf("error unbind address: %s from dummy interface: %s, err: %v: %s", serv.Address.String(), dummyDev, err, out) + } + return nil +} + +// AddService is part of Interface. +func (runner *runner) AddService(svc *InternalService) error { + eSvc, err := toExternalService(svc) + if err != nil { + return err + } + return runner.ipvsHandle.NewService(eSvc) +} + +// DeleteService is part of Interface. +func (runner *runner) DeleteService(svc *InternalService) error { + eSvc, err := toExternalService(svc) + if err != nil { + return err + } + return runner.ipvsHandle.DelService(eSvc) +} + +// GetService is part of Interface. +func (runner *runner) GetService(svc *InternalService) (*InternalService, error) { + eSvc, err := toExternalService(svc) + if err != nil { + return nil, err + } + ipvsService, err := runner.ipvsHandle.GetService(eSvc) + if err != nil { + return nil, err + } + intSvc, err := toInternalService(ipvsService) + if err != nil { + return nil, err + } + return intSvc, nil +} + +// GetServices is part of Interface. +func (runner *runner) GetServices() ([]*InternalService, error) { + ipvsServices, err := runner.ipvsHandle.GetServices() + if err != nil { + return nil, err + } + svcs := make([]*InternalService, 0) + for _, ipvsService := range ipvsServices { + svc, err := toInternalService(ipvsService) + if err != nil { + return nil, err + } + svcs = append(svcs, svc) + } + return svcs, nil +} + +// Flush is part of Interface. Currently we delete IPVS services one by one +func (runner *runner) Flush() error { + Services, err := runner.GetServices() + if err != nil { + return err + } + for _, service := range Services { + err := runner.DeleteService(service) + // TODO: aggregate errors? + if err != nil { + return err + } + } + return nil +} + +// AddDestination is part of Interface. +func (runner *runner) AddDestination(svc *InternalService, dst *InternalDestination) error { + eSvc, err := toExternalService(svc) + if err != nil { + return err + } + eDst, err := toExternalDestination(dst) + if err != nil { + return err + } + return runner.ipvsHandle.NewDestination(eSvc, eDst) +} + +// DeleteDestination is part of Interface. +func (runner *runner) DeleteDestination(svc *InternalService, dst *InternalDestination) error { + eSvc, err := toExternalService(svc) + if err != nil { + return err + } + eDst, err := toExternalDestination(dst) + if err != nil { + return err + } + return runner.ipvsHandle.DelDestination(eSvc, eDst) +} + +// GetDestinations is part of Interface. +func (runner *runner) GetDestinations(svc *InternalService) ([]*InternalDestination, error) { + eSvc, err := toExternalService(svc) + if err != nil { + return nil, err + } + eDestinations, err := runner.ipvsHandle.GetDestinations(eSvc) + if err != nil { + return nil, err + } + iDestinations := make([]*InternalDestination, 0) + for _, dest := range eDestinations { + dst, err := toInternalDestination(dest) + // TODO: aggregate errors? + if err != nil { + return nil, err + } + iDestinations = append(iDestinations, dst) + } + return iDestinations, nil +} + +// toInternalService converts an "external" IPVS service representation to the equivalent "internal" Service structure. +func toInternalService(svc *ipvs.Service) (*InternalService, error) { + if svc == nil { + return nil, errors.New("ipvs svc should not be empty") + } + interSvc := &InternalService{ + Address: svc.Address, + Port: svc.Port, + Scheduler: svc.SchedName, + Protocol: protocolNumbeToString(ProtoType(svc.Protocol)), + Flags: ServiceFlags(svc.Flags), + Timeout: svc.Timeout, + } + + if interSvc.Address == nil { + if svc.AddressFamily == syscall.AF_INET { + interSvc.Address = net.IPv4zero + } else { + interSvc.Address = net.IPv6zero + } + } + return interSvc, nil +} + +// toInternalService converts an "external" IPVS destination representation to the equivalent "internal" destination structure. +func toInternalDestination(dst *ipvs.Destination) (*InternalDestination, error) { + if dst == nil { + return nil, errors.New("ipvs destination should not be empty") + } + return &InternalDestination{ + Address: dst.Address, + Port: dst.Port, + Weight: dst.Weight, + }, nil +} + +// toInternalService converts an "internal" IPVS service representation to the equivalent "external" service structure. +func toExternalService(intSvc *InternalService) (*ipvs.Service, error) { + if intSvc == nil { + return nil, errors.New("service should not be empty") + } + extSvc := &ipvs.Service{ + Address: intSvc.Address, + Protocol: stringToProtocolNumber(intSvc.Protocol), + Port: intSvc.Port, + SchedName: intSvc.Scheduler, + Flags: uint32(intSvc.Flags), + Timeout: intSvc.Timeout, + } + + if ip4 := intSvc.Address.To4(); ip4 != nil { + extSvc.AddressFamily = syscall.AF_INET + extSvc.Netmask = 0xffffffff + } else { + extSvc.AddressFamily = syscall.AF_INET6 + extSvc.Netmask = 128 + } + return extSvc, nil +} + +// toExternalDestination converts an "internal" IPVS destination representation to the equivalent "external" destination structure. +func toExternalDestination(dst *InternalDestination) (*ipvs.Destination, error) { + if dst == nil { + return nil, errors.New("destination should not be empty") + } + return &ipvs.Destination{ + Address: dst.Address, + Port: dst.Port, + Weight: dst.Weight, + }, nil +} + +// stringToProtocolNumber returns the protocol value for the given name +func stringToProtocolNumber(protocol string) uint16 { + switch strings.ToLower(protocol) { + case "tcp": + return uint16(syscall.IPPROTO_TCP) + case "udp": + return uint16(syscall.IPPROTO_UDP) + } + return uint16(0) +} + +// protocolNumbeToString returns the name for the given protocol value. +func protocolNumbeToString(proto ProtoType) string { + switch proto { + case syscall.IPPROTO_TCP: + return "TCP" + case syscall.IPPROTO_UDP: + return "UDP" + } + return "" +} + +// ProtoType is IPVS service protocol type +type ProtoType uint16 diff --git a/pkg/util/ipvs/ipvs_linux_test.go b/pkg/util/ipvs/ipvs_linux_test.go new file mode 100644 index 0000000000000..75bdfd2874f4e --- /dev/null +++ b/pkg/util/ipvs/ipvs_linux_test.go @@ -0,0 +1,522 @@ +// +build linux + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "net" + "reflect" + "syscall" + "testing" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/exec" + fakeexec "k8s.io/utils/exec/testing" + + "github.com/docker/libnetwork/ipvs" +) + +const dummyDevice = "kube-ipvs0" + +func TestEnsureDummyDevice(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success. + func() ([]byte, error) { return []byte{}, nil }, + // Exists. + func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 2} }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + // Success. + exists, err := runner.EnsureDummyDevice(dummyDevice) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if exists { + t.Errorf("expected exists = false") + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "link", "add", "kube-ipvs0", "type", "dummy") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Exists. + exists, err = runner.EnsureDummyDevice(dummyDevice) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if !exists { + t.Errorf("expected exists = true") + } +} + +func TestDeleteDummyDevice(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success. + func() ([]byte, error) { return []byte{}, nil }, + // Failure. + func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 1} }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + // Success. + err := runner.DeleteDummyDevice(dummyDevice) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "link", "del", "kube-ipvs0") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Failure. + err = runner.DeleteDummyDevice(dummyDevice) + if err == nil { + t.Errorf("expected failure") + } +} + +func TestEnsureServiceAddressBind(t *testing.T) { + svc := &InternalService{ + Address: net.ParseIP("10.20.30.40"), + Port: uint16(1234), + Protocol: string("TCP"), + } + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success. + func() ([]byte, error) { return []byte{}, nil }, + // Exists. + func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 2} }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + // Success. + exists, err := runner.EnsureServiceAddressBind(svc, dummyDevice) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if exists { + t.Errorf("expected exists = false") + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "add", "10.20.30.40/32", "dev", "kube-ipvs0") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Exists. + exists, err = runner.EnsureServiceAddressBind(svc, dummyDevice) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if !exists { + t.Errorf("expected exists = true") + } +} + +func TestUnBindServiceAddress(t *testing.T) { + svc := &InternalService{ + Address: net.ParseIP("10.20.30.41"), + Port: uint16(80), + Protocol: string("TCP"), + } + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success. + func() ([]byte, error) { return []byte{}, nil }, + // Failure. + func() ([]byte, error) { return nil, &fakeexec.FakeExitError{Status: 2} }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + // Success. + err := runner.UnBindServiceAddress(svc, dummyDevice) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ip", "addr", "del", "10.20.30.41/32", "dev", "kube-ipvs0") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Failure. + err = runner.UnBindServiceAddress(svc, dummyDevice) + if err == nil { + t.Errorf("expected failure") + } +} + +func Test_toInternalService(t *testing.T) { + Tests := []struct { + ipvsService ipvs.Service + internalService InternalService + }{ + { + ipvs.Service{ + Protocol: syscall.IPPROTO_TCP, + Port: 80, + FWMark: 0, + SchedName: "", + Flags: 0, + Timeout: 0, + Netmask: 0xffffffff, + AddressFamily: syscall.AF_INET, + Address: nil, + PEName: "", + }, + InternalService{ + Address: net.ParseIP("0.0.0.0"), + Protocol: "TCP", + Port: 80, + Scheduler: "", + Flags: 0, + Timeout: 0, + }, + }, + { + ipvs.Service{ + Protocol: syscall.IPPROTO_UDP, + Port: 33434, + FWMark: 0, + SchedName: "wlc", + Flags: 1234, + Timeout: 100, + Netmask: 128, + AddressFamily: syscall.AF_INET6, + Address: net.ParseIP("2012::beef"), + PEName: "", + }, + InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "UDP", + Port: 33434, + Scheduler: "wlc", + Flags: 1234, + Timeout: 100, + }, + }, + { + ipvs.Service{ + Protocol: 0, + Port: 0, + FWMark: 0, + SchedName: "lc", + Flags: 0, + Timeout: 0, + Netmask: 0xffffffff, + AddressFamily: syscall.AF_INET, + Address: net.ParseIP("1.2.3.4"), + PEName: "", + }, + InternalService{ + Address: net.ParseIP("1.2.3.4"), + Protocol: "", + Port: 0, + Scheduler: "lc", + Flags: 0, + Timeout: 0, + }, + }, + { + ipvs.Service{ + Protocol: 0, + Port: 0, + FWMark: 0, + SchedName: "wrr", + Flags: 0, + Timeout: 0, + Netmask: 128, + AddressFamily: syscall.AF_INET6, + Address: nil, + PEName: "", + }, + InternalService{ + Address: net.ParseIP("::0"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + }, + } + + for i := range Tests { + got, err := toInternalService(&Tests[i].ipvsService) + if err != nil { + t.Errorf("case: %d, unexpected error: %v", i, err) + } + if !reflect.DeepEqual(*got, Tests[i].internalService) { + t.Errorf("case: %d, got %#v, want %#v", i, *got, Tests[i].internalService) + } + } +} + +func Test_toExternalService(t *testing.T) { + Tests := []struct { + ipvsService ipvs.Service + internalService InternalService + }{ + { + ipvs.Service{ + Protocol: syscall.IPPROTO_TCP, + Port: 80, + FWMark: 0, + SchedName: "", + Flags: 0, + Timeout: 0, + Netmask: 0xffffffff, + AddressFamily: syscall.AF_INET, + Address: net.ParseIP("0.0.0.0"), + PEName: "", + }, + InternalService{ + Address: net.ParseIP("0.0.0.0"), + Protocol: "TCP", + Port: 80, + Scheduler: "", + Flags: 0, + Timeout: 0, + }, + }, + { + ipvs.Service{ + Protocol: syscall.IPPROTO_UDP, + Port: 33434, + FWMark: 0, + SchedName: "wlc", + Flags: 1234, + Timeout: 100, + Netmask: 128, + AddressFamily: syscall.AF_INET6, + Address: net.ParseIP("2012::beef"), + PEName: "", + }, + InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "UDP", + Port: 33434, + Scheduler: "wlc", + Flags: 1234, + Timeout: 100, + }, + }, + { + ipvs.Service{ + Protocol: 0, + Port: 0, + FWMark: 0, + SchedName: "lc", + Flags: 0, + Timeout: 0, + Netmask: 0xffffffff, + AddressFamily: syscall.AF_INET, + Address: net.ParseIP("1.2.3.4"), + PEName: "", + }, + InternalService{ + Address: net.ParseIP("1.2.3.4"), + Protocol: "", + Port: 0, + Scheduler: "lc", + Flags: 0, + Timeout: 0, + }, + }, + { + ipvs.Service{ + Protocol: 0, + Port: 0, + FWMark: 0, + SchedName: "wrr", + Flags: 0, + Timeout: 0, + Netmask: 128, + AddressFamily: syscall.AF_INET6, + Address: net.ParseIP("::0"), + PEName: "", + }, + InternalService{ + Address: net.ParseIP("::0"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + }, + } + + for i := range Tests { + got, err := toExternalService(&Tests[i].internalService) + if err != nil { + t.Errorf("case: %d, unexpected error: %v", i, err) + } + if !reflect.DeepEqual(*got, Tests[i].ipvsService) { + t.Errorf("case: %d - got %#v, want %#v", i, *got, Tests[i].ipvsService) + } + } +} + +func Test_toInternalDestination(t *testing.T) { + Tests := []struct { + ipvsDestination ipvs.Destination + internalDestination InternalDestination + }{ + { + ipvs.Destination{ + Port: 54321, + ConnectionFlags: 0, + Weight: 1, + Address: net.ParseIP("1.2.3.4"), + }, + InternalDestination{ + Address: net.ParseIP("1.2.3.4"), + Port: 54321, + Weight: 1, + }, + }, + { + ipvs.Destination{ + Port: 53, + ConnectionFlags: 0, + Weight: 1, + Address: net.ParseIP("2002::cafe"), + }, + InternalDestination{ + Address: net.ParseIP("2002::cafe"), + Port: 53, + Weight: 1, + }, + }, + } + for i := range Tests { + got, err := toInternalDestination(&Tests[i].ipvsDestination) + if err != nil { + t.Errorf("case %d unexpected error: %d", i, err) + } + if !reflect.DeepEqual(*got, Tests[i].internalDestination) { + t.Errorf("case %d Failed to translate Destination - got %#v, want %#v", i, *got, Tests[i].internalDestination) + } + } +} + +func Test_toExternalDestination(t *testing.T) { + Tests := []struct { + internalDestination InternalDestination + ipvsDestination ipvs.Destination + }{ + { + InternalDestination{ + Address: net.ParseIP("1.2.3.4"), + Port: 54321, + Weight: 1, + }, + ipvs.Destination{ + Port: 54321, + ConnectionFlags: 0, + Weight: 1, + Address: net.ParseIP("1.2.3.4"), + }, + }, + { + InternalDestination{ + Address: net.ParseIP("2002::cafe"), + Port: 53, + Weight: 1, + }, + ipvs.Destination{ + Port: 53, + ConnectionFlags: 0, + Weight: 1, + Address: net.ParseIP("2002::cafe"), + }, + }, + } + for i := range Tests { + got, err := toExternalDestination(&Tests[i].internalDestination) + if err != nil { + t.Errorf("case %d unexpected error: %d", i, err) + } + if !reflect.DeepEqual(*got, Tests[i].ipvsDestination) { + t.Errorf("case %d Failed to translate Destination - got %#v, want %#v", i, *got, Tests[i].ipvsDestination) + } + } +} + +func Test_stringToProtocolNumber(t *testing.T) { + tests := []string{ + "TCP", "UDP", "ICMP", + } + expecteds := []uint16{ + uint16(syscall.IPPROTO_TCP), uint16(syscall.IPPROTO_UDP), uint16(0), + } + for i := range tests { + got := stringToProtocolNumber(tests[i]) + if got != expecteds[i] { + t.Errorf("stringToProtocolNumber() failed - got %#v, want %#v", + got, expecteds[i]) + } + } +} + +func Test_protocolNumberToString(t *testing.T) { + tests := []ProtoType{ + syscall.IPPROTO_TCP, syscall.IPPROTO_UDP, ProtoType(0), + } + expecteds := []string{ + "TCP", "UDP", "", + } + for i := range tests { + got := protocolNumbeToString(tests[i]) + if got != expecteds[i] { + t.Errorf("protocolNumbeToString() failed - got %#v, want %#v", + got, expecteds[i]) + } + } +} diff --git a/pkg/util/ipvs/ipvs_test.go b/pkg/util/ipvs/ipvs_test.go new file mode 100644 index 0000000000000..0b3e1d9da6702 --- /dev/null +++ b/pkg/util/ipvs/ipvs_test.go @@ -0,0 +1,245 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "net" + "testing" +) + +func TestInternalServiceEqual(t *testing.T) { + Tests := []struct { + svcA *InternalService + svcB *InternalService + equal bool + reason string + }{ + { + svcA: &InternalService{ + Address: net.ParseIP("10.20.30.40"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + svcB: &InternalService{ + Address: net.ParseIP("10.20.30.41"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + equal: false, + reason: "IPv4 address not equal", + }, + { + svcA: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + svcB: &InternalService{ + Address: net.ParseIP("2017::beef"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + equal: false, + reason: "IPv6 address not equal", + }, + { + svcA: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "TCP", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + svcB: &InternalService{ + Address: net.ParseIP("2012::beeef"), + Protocol: "UDP", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + equal: false, + reason: "Protocol not equal", + }, + { + svcA: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "TCP", + Port: 80, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + svcB: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "TCP", + Port: 8080, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + equal: false, + reason: "Port not equal", + }, + { + svcA: &InternalService{ + Address: net.ParseIP("1.2.3.4"), + Protocol: "TCP", + Port: 80, + Scheduler: "rr", + Flags: 0, + Timeout: 0, + }, + svcB: &InternalService{ + Address: net.ParseIP("1.2.3.4"), + Protocol: "TCP", + Port: 80, + Scheduler: "wlc", + Flags: 0, + Timeout: 0, + }, + equal: false, + reason: "Scheduler not equal", + }, + { + svcA: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 0, + }, + svcB: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "", + Port: 0, + Scheduler: "wrr", + Flags: 0, + Timeout: 10800, + }, + equal: false, + reason: "Timeout not equal", + }, + { + svcA: &InternalService{ + Address: net.ParseIP("1.2.3.4"), + Protocol: "TCP", + Port: 80, + Scheduler: "rr", + Flags: 0x1, + Timeout: 10800, + }, + svcB: &InternalService{ + Address: net.ParseIP("1.2.3.4"), + Protocol: "TCP", + Port: 80, + Scheduler: "rr", + Flags: 0x1, + Timeout: 10800, + }, + equal: true, + reason: "All fields equal", + }, + } + + for i := range Tests { + equal := Tests[i].svcA.Equal(Tests[i].svcB) + if equal != Tests[i].equal { + t.Errorf("case: %d got %v, expected %v, reason: %s", i, equal, Tests[i].equal, Tests[i].reason) + } + } +} + +func TestInternalServiceString(t *testing.T) { + Tests := []struct { + svc *InternalService + expected string + }{ + { + svc: &InternalService{ + Address: net.ParseIP("10.20.30.40"), + Protocol: "TCP", + Port: 80, + }, + expected: "10.20.30.40:80/TCP", + }, + { + svc: &InternalService{ + Address: net.ParseIP("2012::beef"), + Protocol: "UDP", + Port: 8080, + }, + expected: "[2012::beef]:8080/UDP", + }, + { + svc: &InternalService{ + Address: net.ParseIP("10.20.30.41"), + Protocol: "ESP", + Port: 1234, + }, + expected: "10.20.30.41:1234/ESP", + }, + } + + for i := range Tests { + if Tests[i].expected != Tests[i].svc.String() { + t.Errorf("case: %d got %v, expected %v", i, Tests[i].svc.String(), Tests[i].expected) + } + } +} + +func TestInternalDestinationString(t *testing.T) { + Tests := []struct { + svc *InternalDestination + expected string + }{ + { + svc: &InternalDestination{ + Address: net.ParseIP("10.20.30.40"), + Port: 80, + }, + expected: "10.20.30.40:80", + }, + { + svc: &InternalDestination{ + Address: net.ParseIP("2012::beef"), + Port: 8080, + }, + expected: "[2012::beef]:8080", + }, + } + + for i := range Tests { + if Tests[i].expected != Tests[i].svc.String() { + t.Errorf("case: %d got %v, expected %v", i, Tests[i].svc.String(), Tests[i].expected) + } + } +} diff --git a/pkg/util/ipvs/ipvs_unsupported.go b/pkg/util/ipvs/ipvs_unsupported.go new file mode 100644 index 0000000000000..89b035b7596e1 --- /dev/null +++ b/pkg/util/ipvs/ipvs_unsupported.go @@ -0,0 +1,83 @@ +// +build !linux + +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "fmt" + + utilexec "k8s.io/utils/exec" +) + +// New returns a dummy Interface for unsupported platform. +func New(utilexec.Interface) Interface { + return &runner{} +} + +type runner struct { +} + +func (runner *runner) Flush() error { + return fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) EnsureDummyDevice(string) (bool, error) { + return false, fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) DeleteDummyDevice(string) error { + return fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) EnsureServiceAddressBind(*InternalService, string) (bool, error) { + return false, fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) UnBindServiceAddress(*InternalService, string) error { + return fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) AddService(*InternalService) error { + return fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) DeleteService(*InternalService) error { + return fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) GetService(*InternalService) (*InternalService, error) { + return nil, fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) GetServices() ([]*InternalService, error) { + return nil, fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) AddDestination(*InternalService, *InternalDestination) error { + return fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) GetDestinations(*InternalService) ([]*InternalDestination, error) { + return nil, fmt.Errorf("IPVS not supported for this platform") +} + +func (runner *runner) DeleteDestination(*InternalService, *InternalDestination) error { + return fmt.Errorf("IPVS not supported for this platform") +} + +var _ = Interface(&runner{}) diff --git a/pkg/util/ipvs/testing/BUILD b/pkg/util/ipvs/testing/BUILD new file mode 100644 index 0000000000000..199a1068780e1 --- /dev/null +++ b/pkg/util/ipvs/testing/BUILD @@ -0,0 +1,28 @@ +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +load( + "@io_bazel_rules_go//go:def.bzl", + "go_library", +) + +go_library( + name = "go_default_library", + srcs = ["fake.go"], + tags = ["automanaged"], + deps = ["//pkg/util/ipvs:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], +) diff --git a/pkg/util/ipvs/testing/fake.go b/pkg/util/ipvs/testing/fake.go new file mode 100644 index 0000000000000..0df74f0fc937f --- /dev/null +++ b/pkg/util/ipvs/testing/fake.go @@ -0,0 +1,186 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + + utilipvs "k8s.io/kubernetes/pkg/util/ipvs" +) + +//FakeIPVS no-op implementation of ipvs Interface +type FakeIPVS struct { + Scheduler string + Services map[serviceKey]*utilipvs.InternalService + Destinations map[serviceKey][]*utilipvs.InternalDestination +} + +type serviceKey struct { + IP string + Port uint16 + Protocol string +} + +func (s *serviceKey) String() string { + return fmt.Sprintf("%s:%d/%s", s.IP, s.Port, s.Protocol) +} + +//NewFake creates a fake ipvs strucuter +func NewFake() *FakeIPVS { + return &FakeIPVS{ + Services: make(map[serviceKey]*utilipvs.InternalService), + Destinations: make(map[serviceKey][]*utilipvs.InternalDestination), + } +} + +func toServiceKey(serv *utilipvs.InternalService) serviceKey { + return serviceKey{ + IP: serv.Address.To4().String(), + Port: serv.Port, + Protocol: serv.Protocol, + } +} + +//EnsureDummyDevice creates dummy device +func (*FakeIPVS) EnsureDummyDevice(dev string) (exist bool, err error) { + return true, nil +} + +//DeleteDummyDevice deletes a dummy device +func (*FakeIPVS) DeleteDummyDevice(dev string) error { + return nil +} + +//EnsureServiceAddressBind is a fake implementation +func (*FakeIPVS) EnsureServiceAddressBind(serv *utilipvs.InternalService, dev string) (exist bool, err error) { + return true, nil +} + +//UnBindServiceAddress is a fake implementation +func (*FakeIPVS) UnBindServiceAddress(serv *utilipvs.InternalService, dev string) error { + return nil +} + +//AddService is a fake implementation +func (f *FakeIPVS) AddService(serv *utilipvs.InternalService) error { + if serv == nil { + return fmt.Errorf("Failed to add service: service can't be nil") + } + key := toServiceKey(serv) + f.Services[key] = serv + // make sure no destination present when creating new service + f.Destinations = make(map[serviceKey][]*utilipvs.InternalDestination) + return nil +} + +//UpdateService is a fake implementation +func (f *FakeIPVS) UpdateService(serv *utilipvs.InternalService) error { + if serv == nil { + return fmt.Errorf("Failed to update service, service can't be nil") + } + return nil +} + +//DeleteService is a fake implementation +func (f *FakeIPVS) DeleteService(serv *utilipvs.InternalService) error { + if serv == nil { + return fmt.Errorf("Failed to delete service: service can't be nil") + } + key := toServiceKey(serv) + delete(f.Services, key) + // clear specific destinations as well + f.Destinations[key] = nil + return nil +} + +//GetService is a fake implementation +func (f *FakeIPVS) GetService(serv *utilipvs.InternalService) (*utilipvs.InternalService, error) { + if serv == nil { + return nil, fmt.Errorf("Failed to get service: service can't be nil") + } + key := toServiceKey(serv) + svc, found := f.Services[key] + if found { + return svc, nil + } + return nil, fmt.Errorf("Not found serv: %v", key.String()) +} + +//GetServices is a fake implementation +func (f *FakeIPVS) GetServices() ([]*utilipvs.InternalService, error) { + res := make([]*utilipvs.InternalService, 0) + for _, svc := range f.Services { + res = append(res, svc) + } + return res, nil +} + +//Flush is a fake implementation +func (f *FakeIPVS) Flush() error { + // directly drop old data + f.Services = nil + f.Destinations = nil + return nil +} + +//AddDestination is a fake implementation +func (f *FakeIPVS) AddDestination(serv *utilipvs.InternalService, dest *utilipvs.InternalDestination) error { + if serv == nil || dest == nil { + return fmt.Errorf("Failed to add destination for service, neither service nor destination shouldn't be nil") + } + key := toServiceKey(serv) + if _, ok := f.Services[key]; !ok { + return fmt.Errorf("Failed to add destination for service %v, service not found", key.String()) + } + dests := f.Destinations[key] + if dests == nil { + dests = make([]*utilipvs.InternalDestination, 0) + f.Destinations[key] = dests + } + f.Destinations[key] = append(f.Destinations[key], dest) + return nil +} + +//GetDestinations is a fake implementation +func (f *FakeIPVS) GetDestinations(serv *utilipvs.InternalService) ([]*utilipvs.InternalDestination, error) { + if serv == nil { + return nil, fmt.Errorf("Failed to get destination for nil service") + } + key := toServiceKey(serv) + if _, ok := f.Services[key]; !ok { + return nil, fmt.Errorf("Failed to get destinations for service %v, service not found", key.String()) + } + return f.Destinations[key], nil +} + +//UpdateDestination is a fake implementation +func (*FakeIPVS) UpdateDestination(serv *utilipvs.InternalService, dest *utilipvs.InternalDestination) error { + if serv == nil || dest == nil { + return fmt.Errorf("Failed to update destination, neither service nor destination can't be nil") + } + return nil +} + +//DeleteDestination is a fake implementation +func (*FakeIPVS) DeleteDestination(serv *utilipvs.InternalService, dest *utilipvs.InternalDestination) error { + if serv == nil || dest == nil { + return fmt.Errorf("Failed to delete destination, neither service nor destination can't be nil") + } + return nil +} + +var _ = utilipvs.Interface(&FakeIPVS{})