Skip to content

Commit

Permalink
Updated comments
Browse files Browse the repository at this point in the history
Updated documentation
Fixed e2e test
  • Loading branch information
Kenneth Shelton committed Jan 5, 2016
1 parent d399a8f commit 9e6c45c
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 88 deletions.
5 changes: 0 additions & 5 deletions docs/user-guide/services.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,6 @@ Valid values for the `ServiceType` field are:
which forwards to the `Service` exposed as a `<NodeIP>:NodePort`
for each Node.

Note that while `NodePort`s can be TCP or UDP, `LoadBalancer`s only support TCP
as of Kubernetes 1.0.

### Type NodePort

If you set the `type` field to `"NodePort"`, the Kubernetes master will
Expand Down Expand Up @@ -537,8 +534,6 @@ This makes some kinds of firewalling impossible. The iptables proxier does not
obscure in-cluster source IPs, but it does still impact clients coming through
a load-balancer or node-port.

LoadBalancers only support TCP, not UDP.

The `Type` field is designed as nested functionality - each level adds to the
previous. This is not strictly required on all cloud providers (e.g. Google Compute Engine does
not need to allocate a `NodePort` to make `LoadBalancer` work, but AWS does)
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -1458,7 +1458,7 @@ func ValidateService(service *api.Service) field.ErrorList {
for i := range service.Spec.Ports {
portPath := portsPath.Index(i)
if !supportedPortProtocols.Has(string(service.Spec.Ports[i].Protocol)) {
allErrs = append(allErrs, validation.NewInvalidError(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports"))
allErrs = append(allErrs, field.Invalid(portPath.Child("protocol"), service.Spec.Ports[i].Protocol, "cannot create an external load balancer with non-TCP/UDP ports"))
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/cloudprovider/providers/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP n
}

// The service controller verified all the protocols match on the ports, just check and use the first one
// TODO: Convert all error messages to use an event recorder
if ports[0].Protocol != api.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for openstack load balancers")
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/controller/service/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,9 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
return err
}
name := s.loadBalancerName(service)
// getPortsForLB already verified that the protocol matches for all ports.
// The cloud provider will verify the protocol is supported
// - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
if err != nil {
Expand Down Expand Up @@ -494,6 +495,7 @@ func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
if protocol == "" {
protocol = sp.Protocol
} else if protocol != sp.Protocol && wantsLoadBalancer(service) {
// TODO: Convert error messages to use event recorder
return nil, fmt.Errorf("mixed protocol external load balancers are not supported.")
}
}
Expand Down
32 changes: 0 additions & 32 deletions test/e2e/cluster_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,38 +200,6 @@ var _ = Describe("Cluster Upgrade [Skipped]", func() {
BeforeEach(func() {
SkipUnlessProviderIs("gce")
})
// The version is determined once at the beginning of the test so that
// the master and nodes won't be skewed if the value changes during the
// test.
By(fmt.Sprintf("Getting real version for %q", testContext.UpgradeTarget))
var err error
v, err = realVersion(testContext.UpgradeTarget)
expectNoError(err)
Logf("Version for %q is %q", testContext.UpgradeTarget, v)

By("Setting up the service, RC, and pods")
f.beforeEach()
w = NewServerTest(f.Client, f.Namespace.Name, svcName)
rc := w.CreateWebserverRC(replicas)
rcName = rc.ObjectMeta.Name
svc := w.BuildServiceSpec()
svc.Spec.Type = api.ServiceTypeLoadBalancer
w.CreateService(svc)

By("Waiting for the service to become reachable")
result, err := waitForLoadBalancerIngress(f.Client, svcName, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
ingresses := result.Status.LoadBalancer.Ingress
if len(ingresses) != 1 {
Failf("Was expecting only 1 ingress IP but got %d (%v): %v", len(ingresses), ingresses, result)
}
ingress = ingresses[0]
Logf("Got load balancer ingress point %v", ingress)
ip = ingress.IP
if ip == "" {
ip = ingress.Hostname
}
testLoadBalancerReachable(ingress, 80)

It("of master should maintain responsive services", func() {
By("Validating cluster before master upgrade")
Expand Down
118 changes: 70 additions & 48 deletions test/e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,7 @@ var _ = Describe("Services", func() {
svc2 := t2.BuildServiceSpec()
svc2.Spec.Type = api.ServiceTypeLoadBalancer
svc2.Spec.Ports[0].Port = servicePort
// Let this one be UDP so that we can test that as well without an additional test
// UDP loadbalancing is tested via test NetcatTest
svc2.Spec.Ports[0].Protocol = api.ProtocolUDP
svc2.Spec.Ports[0].TargetPort = intstr.FromInt(80)
svc2.Spec.LoadBalancerIP = loadBalancerIP
Expand Down Expand Up @@ -1171,22 +1171,33 @@ func testLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool {
return testLoadBalancerReachableInTime(ingress, port, podStartTimeout)
}

func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) {
func testNetcatLoadBalancerReachable(ingress api.LoadBalancerIngress, port int) bool {
return testNetcatLoadBalancerReachableInTime(ingress, port, podStartTimeout)
}

func conditionFuncDecorator(ip string, port int, fn func(string, int) (bool, error)) wait.ConditionFunc {
return func() (bool, error) {
return fn(ip, port)
}
}

func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}

testNetcatReachable(ip, port)
return testReachableInTime(conditionFuncDecorator(ip, port, testReachable), timeout)

}

func testLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
func testNetcatLoadBalancerReachableInTime(ingress api.LoadBalancerIngress, port int, timeout time.Duration) bool {
ip := ingress.IP
if ip == "" {
ip = ingress.Hostname
}

return testReachableInTime(ip, port, timeout)
return testReachableInTime(conditionFuncDecorator(ip, port, testNetcatReachable), timeout)
}

func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) {
Expand All @@ -1198,72 +1209,83 @@ func testLoadBalancerNotReachable(ingress api.LoadBalancerIngress, port int) {
testNotReachable(ip, port)
}

func testReachable(ip string, port int) bool {
return testReachableInTime(ip, port, podStartTimeout)
func testReachable(ip string, port int) (bool, error) {
url := fmt.Sprintf("http://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", url)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", url)
return false, nil
}

Logf("Testing reachability of %v", url)

resp, err := httpGetNoConnectionPool(url)
if err != nil {
Logf("Got error waiting for reachability of %s: %v", url, err)
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logf("Got error reading response from %s: %v", url, err)
return false, nil
}
if resp.StatusCode != 200 {
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body))
}
if !strings.Contains(string(body), "test-webserver") {
return false, fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body))
}
Logf("Successfully reached %v", url)
return true, nil
}

func testNetcatReachable(ip string, port int) {
func testNetcatReachable(ip string, port int) (bool, error) {
uri := fmt.Sprintf("udp://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
}

Logf("Testing reachability of %v", uri)

con, err := net.Dial("udp", ip+":"+string(port))
if err != nil {
Failf("Failed to connect to: %s:%d (%s)", ip, port, err.Error())
return false, fmt.Errorf("Failed to connect to: %s:%d (%s)", ip, port, err.Error())
}

_, err = con.Write([]byte("\n"))
if err != nil {
Failf("Failed to send newline: %s", err.Error())
return false, fmt.Errorf("Failed to send newline: %s", err.Error())
}

var buf []byte = make([]byte, len("SUCCESS")+1)

_, err = con.Read(buf)
if err != nil {
Failf("Failed to read result: %s", err.Error())
return false, fmt.Errorf("Failed to read result: %s", err.Error())
}

if !strings.HasPrefix(string(buf), "SUCCESS") {
Failf("Failed to retrieve: \"SUCCESS\"")
return false, fmt.Errorf("Failed to retrieve: \"SUCCESS\"")
}

Logf("Successfully retrieved \"SUCCESS\"")
return true, nil
}

func testReachableInTime(ip string, port int, timeout time.Duration) bool {
url := fmt.Sprintf("http://%s:%d", ip, port)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", url)
return false
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", url)
return false
}

desc := fmt.Sprintf("the url %s to be reachable", url)
By(fmt.Sprintf("Waiting up to %v for %s", timeout, desc))
start := time.Now()
err := wait.PollImmediate(poll, timeout, func() (bool, error) {
resp, err := httpGetNoConnectionPool(url)
if err != nil {
Logf("Got error waiting for reachability of %s: %v (%v)", url, err, time.Since(start))
return false, nil
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
Logf("Got error reading response from %s: %v", url, err)
return false, nil
}
if resp.StatusCode != 200 {
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", resp.Status, url, string(body))
}
if !strings.Contains(string(body), "test-webserver") {
return false, fmt.Errorf("received response body without expected substring 'test-webserver': %s", string(body))
}
Logf("Successfully reached %v", url)
return true, nil
})
func testReachableInTime(testFunc wait.ConditionFunc, timeout time.Duration) bool {
By(fmt.Sprintf("Waiting up to %v", timeout))
err := wait.PollImmediate(poll, timeout, testFunc)
if err != nil {
Expect(err).NotTo(HaveOccurred(), "Error waiting for %s", desc)
Expect(err).NotTo(HaveOccurred(), "Error waiting")
return false
}
return true
Expand Down

0 comments on commit 9e6c45c

Please sign in to comment.