Skip to content

Commit

Permalink
Merge pull request #24566 from freehan/automated-cherry-pick-of-#2257…
Browse files Browse the repository at this point in the history
…3-upstream-release-1.2

Automated cherry pick of #22573
  • Loading branch information
zmerlynn committed Apr 21, 2016
2 parents 867a249 + 0df8c2e commit 6652423
Show file tree
Hide file tree
Showing 8 changed files with 355 additions and 20 deletions.
12 changes: 6 additions & 6 deletions build/common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -104,28 +104,28 @@ kube::build::get_docker_wrapped_binaries() {
kube-apiserver,busybox
kube-controller-manager,busybox
kube-scheduler,busybox
kube-proxy,gcr.io/google_containers/debian-iptables:v1
kube-proxy,gcr.io/google_containers/debian-iptables-amd64:v3
);;
"arm") # TODO: Use image with iptables installed for kube-proxy for arm, arm64 and ppc64le
"arm")
local targets=(
kube-apiserver,hypriot/armhf-busybox
kube-controller-manager,hypriot/armhf-busybox
kube-scheduler,hypriot/armhf-busybox
kube-proxy,hypriot/armhf-busybox
kube-proxy,gcr.io/google_containers/debian-iptables-arm:v3
);;
"arm64")
local targets=(
kube-apiserver,aarch64/busybox
kube-controller-manager,aarch64/busybox
kube-scheduler,aarch64/busybox
kube-proxy,aarch64/busybox
kube-proxy,gcr.io/google_containers/debian-iptables-arm64:v3
);;
"ppc64le")
local targets=(
kube-apiserver,ppc64le/busybox
kube-controller-manager,ppc64le/busybox
kube-scheduler,ppc64le/busybox
kube-proxy,ppc64le/busybox
kube-proxy,gcr.io/google_containers/debian-iptables-ppc64le:v3
);;
esac

Expand Down Expand Up @@ -1573,4 +1573,4 @@ function kube::release::has_gcloud_account() {
else
return 1
fi
}
}
13 changes: 9 additions & 4 deletions build/debian-iptables/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM debian:jessie
FROM BASEIMAGE

# If we're building for another architecture than amd64, the CROSS_BUILD_ placeholder is removed so e.g. CROSS_BUILD_COPY turns into COPY
# If we're building normally, for amd64, CROSS_BUILD lines are removed
CROSS_BUILD_COPY qemu-ARCH-static /usr/bin/

# All apt-get's must be in one run command or the
# cleanup has no effect.
RUN apt-get update && \
apt-get install -y iptables && \
ls /var/lib/apt/lists/*debian* | xargs rm
RUN DEBIAN_FRONTEND=noninteractive apt-get update \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y iptables \
&& DEBIAN_FRONTEND=noninteractive apt-get install -y conntrack \
&& rm -rf /var/lib/apt/lists/*
46 changes: 40 additions & 6 deletions build/debian-iptables/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,47 @@

.PHONY: build push

IMAGE = debian-iptables
TAG = v1
REGISTRY?="gcr.io/google_containers"
IMAGE=debian-iptables
TAG=v3
ARCH?=amd64
TEMP_DIR:=$(shell mktemp -d)

ifeq ($(ARCH),amd64)
BASEIMAGE?=debian:jessie
endif
ifeq ($(ARCH),arm)
BASEIMAGE?=armel/debian:jessie
QEMUARCH=arm
endif
ifeq ($(ARCH),arm64)
BASEIMAGE?=aarch64/debian:jessie
QEMUARCH=aarch64
endif
ifeq ($(ARCH),ppc64le)
BASEIMAGE?=ppc64le/debian:jessie
QEMUARCH=ppc64le
endif

build:
docker build -t gcr.io/google_containers/$(IMAGE):$(TAG) .
cp ./* $(TEMP_DIR)
cd $(TEMP_DIR) && sed -i "s|BASEIMAGE|$(BASEIMAGE)|g" Dockerfile
cd $(TEMP_DIR) && sed -i "s|ARCH|$(QEMUARCH)|g" Dockerfile

ifeq ($(ARCH),amd64)
# When building "normally" for amd64, remove the whole line, it has no part in the amd64 image
cd $(TEMP_DIR) && sed -i "/CROSS_BUILD_/d" Dockerfile
else
# When cross-building, only the placeholder "CROSS_BUILD_" should be removed
# Register /usr/bin/qemu-ARCH-static as the handler for ARM binaries in the kernel
docker run --rm --privileged multiarch/qemu-user-static:register --reset
curl -sSL https://github.com/multiarch/qemu-user-static/releases/download/v2.5.0/x86_64_qemu-$(QEMUARCH)-static.tar.xz | tar -xJ -C $(TEMP_DIR)
cd $(TEMP_DIR) && sed -i "s/CROSS_BUILD_//g" Dockerfile
endif

docker build -t $(REGISTRY)/$(IMAGE)-$(ARCH):$(TAG) $(TEMP_DIR)

push: build
gcloud docker --server=gcr.io push gcr.io/google_containers/$(IMAGE):$(TAG)
push: build
gcloud docker push $(REGISTRY)/$(IMAGE)-$(ARCH):$(TAG)

all: push
all: push
2 changes: 1 addition & 1 deletion cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config")
}

proxierIptables, err := iptables.NewProxier(iptInterface, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, *config.IPTablesMasqueradeBit, config.ClusterCIDR)
proxierIptables, err := iptables.NewProxier(iptInterface, execer, config.IPTablesSyncPeriod.Duration, config.MasqueradeAll, *config.IPTablesMasqueradeBit, config.ClusterCIDR)
if err != nil {
glog.Fatalf("Unable to create proxier: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion cmd/libs/go2idl/import-boss/generators/import_restrict.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"k8s.io/kubernetes/cmd/libs/go2idl/generator"
"k8s.io/kubernetes/cmd/libs/go2idl/namer"
"k8s.io/kubernetes/cmd/libs/go2idl/types"

//"github.com/golang/glog"
)

Expand Down
1 change: 0 additions & 1 deletion pkg/kubelet/config/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"

//"github.com/ghodss/yaml"
)

Expand Down
82 changes: 81 additions & 1 deletion pkg/proxy/iptables/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/types"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/slice"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)
Expand Down Expand Up @@ -159,6 +161,7 @@ type Proxier struct {
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
exec utilexec.Interface
clusterCIDR string
}

Expand All @@ -185,7 +188,7 @@ var _ proxy.ProxyProvider = &Proxier{}
// An error will be returned if iptables fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables up to date in the background and
// will not terminate if a particular iptables call fails.
func NewProxier(ipt utiliptables.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string) (*Proxier, error) {
func NewProxier(ipt utiliptables.Interface, exec utilexec.Interface, syncPeriod time.Duration, masqueradeAll bool, masqueradeBit int, clusterCIDR string) (*Proxier, error) {
// Set the route_localnet sysctl we need for
if err := utilsysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
Expand Down Expand Up @@ -220,6 +223,7 @@ func NewProxier(ipt utiliptables.Interface, syncPeriod time.Duration, masquerade
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
exec: exec,
clusterCIDR: clusterCIDR,
}, nil
}
Expand Down Expand Up @@ -435,15 +439,21 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
}
}

staleUDPServices := sets.NewString()
// Remove services missing from the update.
for name := range proxier.serviceMap {
if !activeServices[name] {
glog.V(1).Infof("Removing service %q", name)
if proxier.serviceMap[name].protocol == api.ProtocolUDP {
staleUDPServices.Insert(proxier.serviceMap[name].clusterIP.String())
}
delete(proxier.serviceMap, name)
}
}

proxier.syncProxyRules()
proxier.deleteServiceConnections(staleUDPServices.List())

}

// OnEndpointsUpdate takes in a slice of updated endpoints.
Expand All @@ -458,6 +468,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.haveReceivedEndpointsUpdate = true

activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
staleConnections := make(map[endpointServicePair]bool)

// Update endpoints for services.
for i := range allEndpoints {
Expand All @@ -481,7 +492,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
curEndpoints := proxier.endpointsMap[svcPort]
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])

if len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
removedEndpoints := getRemovedEndpoints(curEndpoints, newEndpoints)
for _, ep := range removedEndpoints {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
}
glog.V(1).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
proxier.endpointsMap[svcPort] = newEndpoints
}
Expand All @@ -492,12 +508,18 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
// Remove endpoints missing from the update.
for name := range proxier.endpointsMap {
if !activeEndpoints[name] {
// record endpoints of unactive service to stale connections
for _, ep := range proxier.endpointsMap[name] {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: name}] = true
}

glog.V(2).Infof("Removing endpoints for %q", name)
delete(proxier.endpointsMap, name)
}
}

proxier.syncProxyRules()
proxier.deleteEndpointConnections(staleConnections)
}

// used in OnEndpointsUpdate
Expand Down Expand Up @@ -553,6 +575,64 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
}

// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints
func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string {
return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List()
}

type endpointServicePair struct {
endpoint string
servicePortName proxy.ServicePortName
}

const noConnectionToDelete = "0 flow entries have been deleted"

// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
for epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0]
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := proxier.execConntrackTool("-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
}
}
}
}

// deleteServiceConnection use conntrack-tool to delete UDP connection specified by service ip
func (proxier *Proxier) deleteServiceConnections(svcIPs []string) {
for _, ip := range svcIPs {
glog.V(2).Infof("Deleting connection tracking state for service IP %s", ip)
err := proxier.execConntrackTool("-D", "--orig-dst", ip, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services.
glog.Errorf("conntrack return with error: %v", err)
}
}
}

//execConntrackTool executes conntrack tool using given paramters
func (proxier *Proxier) execConntrackTool(parameters ...string) error {
conntrackPath, err := proxier.exec.LookPath("conntrack")
if err != nil {
return fmt.Errorf("Error looking for path of conntrack: %v", err)
}
output, err := proxier.exec.Command(conntrackPath, parameters...).CombinedOutput()
if err != nil {
return fmt.Errorf("Conntrack command returned: %q, error message: %s", string(output), err)
}
return nil
}

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
Expand Down
Loading

0 comments on commit 6652423

Please sign in to comment.