Skip to content

Commit

Permalink
Emit event and retry when fail to start healthz server on kube-proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
MrHohn committed Jul 28, 2017
1 parent e4551d5 commit db379de
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 22 deletions.
16 changes: 8 additions & 8 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,10 +462,17 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}

var healthzServer *healthcheck.HealthzServer
var healthzUpdater healthcheck.HealthzUpdater
if len(config.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration)
healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
healthzUpdater = healthzServer
}

Expand Down Expand Up @@ -572,13 +579,6 @@ func NewProxyServer(config *componentconfig.KubeProxyConfiguration, cleanupAndEx
iptInterface.AddReloadFunc(proxier.Sync)
}

nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: hostname,
UID: types.UID(hostname),
Namespace: "",
}

return &ProxyServer{
Client: client,
EventClient: eventClient,
Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/healthcheck/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
Expand Down
40 changes: 27 additions & 13 deletions pkg/proxy/healthcheck/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
)

var nodeHealthzRetryInterval = 60 * time.Second

// Server serves HTTP endpoints for each service name, with results
// based on the endpoints. If there are 0 endpoints for a service, it returns a
// 503 "Service Unavailable" error (telling LBs not to use this node). If there
Expand Down Expand Up @@ -161,7 +164,7 @@ func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) err
Namespace: nsn.Namespace,
Name: nsn.Name,
UID: types.UID(nsn.String()),
}, api.EventTypeWarning, "FailedToStartHealthcheck", msg)
}, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg)
}
glog.Error(msg)
continue
Expand Down Expand Up @@ -259,16 +262,18 @@ type HealthzServer struct {
addr string
port int32
healthTimeout time.Duration
recorder record.EventRecorder
nodeRef *v1.ObjectReference

lastUpdated atomic.Value
}

// NewDefaultHealthzServer returns a default healthz http server.
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration) *HealthzServer {
return newHealthzServer(nil, nil, nil, addr, healthTimeout)
func NewDefaultHealthzServer(addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
return newHealthzServer(nil, nil, nil, addr, healthTimeout, recorder, nodeRef)
}

func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration) *HealthzServer {
func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c clock.Clock, addr string, healthTimeout time.Duration, recorder record.EventRecorder, nodeRef *v1.ObjectReference) *HealthzServer {
if listener == nil {
listener = stdNetListener{}
}
Expand All @@ -284,6 +289,8 @@ func newHealthzServer(listener Listener, httpServerFactory HTTPServerFactory, c
clock: c,
addr: addr,
healthTimeout: healthTimeout,
recorder: recorder,
nodeRef: nodeRef,
}
}

Expand All @@ -297,19 +304,26 @@ func (hs *HealthzServer) Run() {
serveMux := http.NewServeMux()
serveMux.Handle("/healthz", healthzHandler{hs: hs})
server := hs.httpFactory.New(hs.addr, serveMux)
listener, err := hs.listener.Listen(hs.addr)
if err != nil {
glog.Errorf("Failed to start healthz on %s: %v", hs.addr, err)
return
}
go func() {

go wait.Until(func() {
glog.V(3).Infof("Starting goroutine for healthz on %s", hs.addr)

listener, err := hs.listener.Listen(hs.addr)
if err != nil {
msg := fmt.Sprintf("Failed to start node healthz on %s: %v", hs.addr, err)
if hs.recorder != nil {
hs.recorder.Eventf(hs.nodeRef, api.EventTypeWarning, "FailedToStartNodeHealthcheck", msg)
}
glog.Error(msg)
return
}

if err := server.Serve(listener); err != nil {
glog.Errorf("Healhz closed: %v", err)
glog.Errorf("Healthz closed with error: %v", err)
return
}
glog.Errorf("Unexpected healhz closed.")
}()
glog.Errorf("Unexpected healthz closed.")
}, nodeHealthzRetryInterval, wait.NeverStop)
}

type healthzHandler struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/proxy/healthcheck/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func TestHealthzServer(t *testing.T) {
httpFactory := newFakeHTTPServerFactory()
fakeClock := clock.NewFakeClock(time.Now())

hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second)
hs := newHealthzServer(listener, httpFactory, fakeClock, "127.0.0.1:10256", 10*time.Second, nil, nil)
server := hs.httpFactory.New(hs.addr, healthzHandler{hs: hs})

// Should return 200 "OK" by default.
Expand Down

0 comments on commit db379de

Please sign in to comment.