Skip to content

Commit

Permalink
kindnetd: don't wait for kube-proxy
Browse files Browse the repository at this point in the history
kindnetd allows the API server address from the in-cluster configuration
to be overridden with an address supplied through an environment variable.
The address we configure in that environment variable may change
after a node reboots.

If the new address is not reachable, kindnetd falls back to the
in-cluster configuration.

This removes the dependency on kube-proxy: with the in-cluster
configuration, kindnetd has to wait for kube-proxy to install the
rules that forward traffic to the apiserver Service.
  • Loading branch information
Antonio Ojea committed Feb 20, 2021
1 parent 37613ef commit 66f0a2d
Showing 1 changed file with 59 additions and 2 deletions.
61 changes: 59 additions & 2 deletions images/kindnetd/cmd/kindnetd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ import (
"context"
"flag"
"fmt"
"net"
"os"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/net"
utilsnet "k8s.io/utils/net"
)

// probeTCPtimeout is the timeout, one second, applied to each TCP
// reachability probe.
const probeTCPtimeout = time.Second

// kindnetd is a simple networking daemon to complete kind's CNI implementation
Expand All @@ -40,6 +46,7 @@ import (
// - HOST_IP: should be populated by downward API
// - POD_IP: should be populated by downward API
// - CNI_CONFIG_TEMPLATE: the cni .conflist template, run with {{ .PodCIDR }}
// - CONTROL_PLANE_ENDPOINT: control-plane endpoint, in host:port format

// TODO: improve logging & error handling

Expand All @@ -54,10 +61,31 @@ func main() {
if err != nil {
panic(err.Error())
}

// override the internal apiserver endpoint to avoid
// waiting for kube-proxy to install the services rules.
// If the endpoint is not reachable, fallback the internal endpoint
controlPlaneEndpoint := os.Getenv("CONTROL_PLANE_ENDPOINT")
if controlPlaneEndpoint != "" {
// check that the apiserver is reachable before continue
// to fail fast and avoid waiting until the client operations timeout
var ok bool
for i := 0; i < 5; i++ {
ok = probeTCP(controlPlaneEndpoint, probeTCPtimeout)
if ok {
config.Host = "https://" + controlPlaneEndpoint
break
}
klog.Infof("apiserver not reachable, attempt %d ... retrying", i)
time.Sleep(time.Second * time.Duration(i))
}
}
// create the clientset to connect the apiserver
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
klog.Infof("connected to apiserver: %s", config.Host)

// obtain the host and pod ip addresses
// if both ips are different we are not using the host network
Expand All @@ -83,7 +111,7 @@ func main() {

// enforce ip masquerade rules
// TODO: dual stack...?
masqAgent, err := NewIPMasqAgent(net.IsIPv6String(hostIP), []string{os.Getenv("POD_SUBNET")})
masqAgent, err := NewIPMasqAgent(utilsnet.IsIPv6String(hostIP), []string{os.Getenv("POD_SUBNET")})
if err != nil {
panic(err.Error())
}
Expand Down Expand Up @@ -194,3 +222,32 @@ func internalIP(node corev1.Node) string {
}
return ""
}

// probeTCP reports whether a TCP connection to address (host:port) can be
// established within timeout.
//
// The address is resolved before dialing, so resolution failures are logged
// separately from connection failures. Every failure is logged with a short
// classification (DNS problem, TIMEOUT, REFUSED or OTHER) and yields false.
// On success the probe connection is closed immediately and true is returned.
//
// Modified from agnhost connect command in k/k
// https://github.com/kubernetes/kubernetes/blob/c241a237f9a635286c76c20d07b103a663b1cfa4/test/images/agnhost/connect/connect.go#L66
func probeTCP(address string, timeout time.Duration) bool {
	klog.Infof("probe TCP address %s", address)
	if _, err := net.ResolveTCPAddr("tcp", address); err != nil {
		klog.Warningf("DNS problem %s: %v", address, err)
		return false
	}

	conn, err := net.DialTimeout("tcp", address, timeout)
	if err == nil {
		// The connection was opened only to test reachability; nothing
		// was written, so the Close error is not interesting.
		conn.Close()
		return true
	}

	if opErr, ok := err.(*net.OpError); ok {
		if opErr.Timeout() {
			klog.Warningf("TIMEOUT %s", address)
			return false
		}
		if sysErr, ok := opErr.Err.(*os.SyscallError); ok && sysErr.Err == syscall.ECONNREFUSED {
			klog.Warningf("REFUSED %s", address)
			return false
		}
	}

	// Anything not classified above (including OpErrors such as host or
	// network unreachable) is still logged, so that no probe failure is silent.
	klog.Warningf("OTHER %s: %v", address, err)
	return false
}

0 comments on commit 66f0a2d

Please sign in to comment.