Skip to content

Commit

Permalink
Fix bug in kube-proxy of not updating iptables rules if a service's
Browse files Browse the repository at this point in the history
public IPs change, and add tests to catch the bug.
  • Loading branch information
a-robinson committed Mar 30, 2015
1 parent e91b77c commit 151b871
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
4 changes: 2 additions & 2 deletions pkg/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,10 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
info, exists := proxier.getServiceInfo(serviceName)
serviceIP := net.ParseIP(service.Spec.PortalIP)
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) {
if exists && info.portalPort == service.Spec.Port && info.portalIP.Equal(serviceIP) && ipsEqual(service.Spec.PublicIPs, info.publicIP) {
continue
}
if exists && (info.portalPort != service.Spec.Port || !info.portalIP.Equal(serviceIP) || !ipsEqual(service.Spec.PublicIPs, info.publicIP)) {
if exists {
glog.V(4).Infof("Something changed for service %q: stopping it", serviceName.String())
err := proxier.closePortal(serviceName, info)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions pkg/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,46 @@ func TestUDPProxyUpdatePort(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}

func TestProxyUpdatePublicIPs(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})

p := CreateProxier(lb, net.ParseIP("0.0.0.0"), &fakeIptables{}, net.ParseIP("127.0.0.1"))
waitForNumProxyLoops(t, p, 0)

svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
waitForNumProxyLoops(t, p, 1)

p.OnUpdate([]api.Service{
{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{Port: svcInfo.portalPort, Protocol: "TCP", PortalIP: svcInfo.portalIP.String(), PublicIPs: []string{"4.3.2.1"}}, Status: api.ServiceStatus{}},
})
// Wait for the socket to actually get free.
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
testEchoTCP(t, "127.0.0.1", svcInfo.proxyPort)
// This is a bit async, but this should be sufficient.
time.Sleep(500 * time.Millisecond)
waitForNumProxyLoops(t, p, 1)
}

func TestProxyUpdatePortal(t *testing.T) {
lb := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "echo")
Expand Down

0 comments on commit 151b871

Please sign in to comment.