Skip to content

Commit

Permalink
add test for udp connection flush
Browse files Browse the repository at this point in the history
  • Loading branch information
freehan committed Apr 18, 2016
1 parent 4fa6f38 commit ad8c677
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 14 deletions.
28 changes: 14 additions & 14 deletions pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ const kubeMarkMasqChain utiliptables.Chain = "KUBE-MARK-MASQ"
// TODO(thockin): Remove this for v1.3 or v1.4.
const oldIptablesMasqueradeMark = "0x4d415351"

const noConnectionToDelete = "0 flow entries have been deleted"

// IptablesVersioner can query the current iptables version.
type IptablesVersioner interface {
// returns "X.Y.Z"
Expand Down Expand Up @@ -439,20 +437,20 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
}
}

staleUDPService := sets.NewString()
staleUDPServices := sets.NewString()
// Remove services missing from the update.
for name := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Removing service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPService.Insert(proxier.serviceMap[name].clusterIP.String())
staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String())
}
delete(proxier.serviceMap, name)
}
}

proxier.syncProxyRules()
proxier.deleteServiceConnection(staleUDPService.List())
proxier.deleteServiceConnections(staleUDPServices.List())

}

Expand Down Expand Up @@ -519,7 +517,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
}

proxier.syncProxyRules()
proxier.deleteEndpointConnection(staleConnections)
proxier.deleteEndpointConnections(staleConnections)
}

// used in OnEndpointsUpdate
Expand Down Expand Up @@ -585,17 +583,19 @@ type endpointServicePair struct {
servicePortName proxy.ServicePortName
}

const noConnectionToDelete = "0 flow entries have been deleted"

// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServicePair]bool) {
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
for epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
glog.V(2).Infof("Deleting connection to service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
Expand All @@ -604,13 +604,13 @@ func (proxier *Proxier) deleteEndpointConnection(connectionMap map[endpointServi
}
}

// deleteServiceConnection use conntrack-tool to delete udp connection specified by service ip
func (proxier *Proxier) deleteServiceConnection(svcIPs []string) {
// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip
func (proxier *Proxier) deleteServiceConnections(svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting udp connection to service IP %s", ip)
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TO DO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
Expand All @@ -626,7 +626,7 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error {
}
output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("Conntrack command returns: %s, error message: %s", string(output), err)
return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}
Expand Down
218 changes: 218 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,14 @@ package iptables
import (
"testing"

"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"net"
"strings"
)

func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
Expand Down Expand Up @@ -156,4 +163,215 @@ func TestGetChainLinesMultipleTables(t *testing.T) {
checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected)
}

func TestGetRemovedEndpoints(t *testing.T) {
testCases := []struct {
currentEndpoints []string
newEndpoints []string
removedEndpoints []string
}{
{
currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
removedEndpoints: []string{},
},
{
currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"},
newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
removedEndpoints: []string{"10.0.2.3:80"},
},
{
currentEndpoints: []string{},
newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
removedEndpoints: []string{},
},
{
currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
newEndpoints: []string{},
removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
},
{
currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"},
newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"},
removedEndpoints: []string{"10.0.2.2:443"},
},
}

for i := range testCases {
res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints)
if !slicesEquiv(res, testCases[i].removedEndpoints) {
t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res)
}
}
}

func TestExecConntrackTool(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}

fakeProxier := Proxier{exec: &fexec}

testCases := [][]string{
{"-L", "-p", "udp"},
{"-D", "-p", "udp", "-d", "10.0.240.1"},
{"-D", "-p", "udp", "--orig-dst", "10.240.0.2", "--dst-nat", "10.0.10.2"},
}

expectErr := []bool{false, false, true}

for i := range testCases {
err := fakeProxier.execConntrackTool(testCases[i]...)

if expectErr[i] {
if err == nil {
t.Errorf("expected err, got %v", err)
}
} else {
if err != nil {
t.Errorf("expected success, got %v", err)
}
}

execCmd := strings.Join(fcmd.CombinedOutputLog[i], " ")
expectCmd := fmt.Sprintf("%s %s", "conntrack", strings.Join(testCases[i], " "))

if execCmd != expectCmd {
t.Errorf("expect execute command: %s, but got: %s", expectCmd, execCmd)
}
}
}

func newFakeServiceInfo(service proxy.ServicePortName, ip net.IP, protocol api.Protocol) *serviceInfo {
return &serviceInfo{
sessionAffinityType: api.ServiceAffinityNone, // default
stickyMaxAgeSeconds: 180, // TODO: paramaterize this in the API.
clusterIP: ip,
protocol: protocol,
}
}

func TestDeleteEndpointConnections(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}

serviceMap := make(map[proxy.ServicePortName]*serviceInfo)
svc1 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, ""}
svc2 := proxy.ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc2"}, ""}
serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), api.ProtocolUDP)
serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), api.ProtocolTCP)

fakeProxier := Proxier{exec: &fexec, serviceMap: serviceMap}

testCases := []endpointServicePair{
{
endpoint: "10.240.0.3:80",
servicePortName: svc1,
},
{
endpoint: "10.240.0.4:80",
servicePortName: svc1,
},
{
endpoint: "10.240.0.5:80",
servicePortName: svc2,
},
}

expectCommandExecCount := 0
for i := range testCases {
input := map[endpointServicePair]bool{testCases[i]: true}
fakeProxier.deleteEndpointConnections(input)
svcInfo := fakeProxier.serviceMap[testCases[i].servicePortName]
if svcInfo.protocol == api.ProtocolUDP {
svcIp := svcInfo.clusterIP.String()
endpointIp := strings.Split(testCases[i].endpoint, ":")[0]
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", svcIp, endpointIp)
execCommand := strings.Join(fcmd.CombinedOutputLog[expectCommandExecCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
expectCommandExecCount += 1
}

if expectCommandExecCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", expectCommandExecCount, fexec.CommandCalls)
}
}
}

func TestDeleteServiceConnections(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
func() ([]byte, error) {
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted.")
},
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}

fakeProxier := Proxier{exec: &fexec}

testCases := [][]string{
{
"10.240.0.3",
"10.240.0.5",
},
{
"10.240.0.4",
},
}

svcCount := 0
for i := range testCases {
fakeProxier.deleteServiceConnections(testCases[i])
for _, ip := range testCases[i] {
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", ip)
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand {
t.Errorf("Exepect comand: %s, but executed %s", expectCommand, execCommand)
}
svcCount += 1
}
if svcCount != fexec.CommandCalls {
t.Errorf("Exepect comand executed %d times, but got %d", svcCount, fexec.CommandCalls)
}
}
}

// TODO(thockin): add a test for syncProxyRules() or break it down further and test the pieces.

1 comment on commit ad8c677

@k8s-teamcity-mesosphere

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TeamCity OSS :: Kubernetes Mesos :: 4 - Smoke Tests Build 21729 outcome was SUCCESS
Summary: Tests passed: 1, ignored: 273 Build time: 00:12:20

Please sign in to comment.