Skip to content

Commit

Permalink
Merge pull request kubernetes#25123 from sttts/sttts-termination-logg…
Browse files Browse the repository at this point in the history
…ing-listener

UPSTREAM: <carry>: apiserver: log new connections during termination

Origin-commit: bbf6417825ee2ddf5c44253d5df863192f71a1fa
  • Loading branch information
k8s-publishing-bot committed Jun 18, 2020
2 parents e7d6075 + 76be6d7 commit 57aec40
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 41 deletions.
7 changes: 0 additions & 7 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,6 @@ type Config struct {
// Default to 0, means never send GOAWAY. Max is 0.02 to prevent break the apiserver.
GoawayChance float64

// TerminationGoaway indicates that on start of termination GOAWAY frames are sent
// immediately (before ShutdownDelayDuration has passed) in order to speed up draining.
TerminationGoaway bool

// MergedResourceConfig indicates which groupVersion enabled and its resources enabled/disabled.
// This is composed of genericapiserver defaultAPIResourceConfig and those parsed from flags.
// If not specify any in flags, then genericapiserver will only enable defaultAPIResourceConfig.
Expand Down Expand Up @@ -762,9 +758,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
if c.TerminationGoaway {
handler = genericfilters.WithTerminationGoaway(handler, c.IsTerminating)
}
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithPanicRecovery(handler, c.IsTerminating)
return handler
Expand Down
30 changes: 7 additions & 23 deletions staging/src/k8s.io/apiserver/pkg/server/filters/goaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,25 +56,13 @@ func WithProbabilisticGoaway(inner http.Handler, chance float64) http.Handler {
}
}

// WithTerminationGoaway sends a GOAWAY when isTerminating is returning true.
func WithTerminationGoaway(inner http.Handler, isTerminating func() bool) http.Handler {
return &goaway{
handler: inner,
decider: GoawayDeciderFunc(
func(r *http.Request) bool {
return isTerminating()
},
),
}
}

// goaway send a GOAWAY to client according to decider for HTTP2 requests
type goaway struct {
handler http.Handler
decider GoawayDecider
}

// ServeHTTP implements HTTP handler
// ServeHTTP implement HTTP handler
func (h *goaway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Proto == "HTTP/2.0" && h.decider.Goaway(r) {
// Send a GOAWAY and tear down the TCP connection when idle.
Expand All @@ -84,21 +72,17 @@ func (h *goaway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.handler.ServeHTTP(w, r)
}

// probabilisticGoawayDecider sends GOAWAY probabilistically according to chance
// probabilisticGoawayDecider send GOAWAY probabilistically according to chance
type probabilisticGoawayDecider struct {
chance float64
next func() float64
}

// Goaway implements GoawayDecider
// Goaway implement GoawayDecider
func (p *probabilisticGoawayDecider) Goaway(r *http.Request) bool {
return p.next() < p.chance
}

// GoawayDeciderFunc wraps a decider func as decider interface.
type GoawayDeciderFunc func(r *http.Request) bool
if p.next() < p.chance {
return true
}

// GoawayDeciderFunc implements GoawayDecider
func (f GoawayDeciderFunc) Goaway(r *http.Request) bool {
return f(r)
return false
}
45 changes: 45 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package server

import (
"fmt"
"net"
"net/http"
"os"
gpath "path"
Expand Down Expand Up @@ -339,6 +340,23 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
s.Eventf(corev1.EventTypeNormal, "TerminationMinimalShutdownDurationFinished", "The minimal shutdown duration of %v finished", s.ShutdownDelayDuration)
}()

lateStopCh := make(chan struct{})
if s.ShutdownDelayDuration > 0 {
go func() {
defer close(lateStopCh)

<-stopCh

time.Sleep(s.ShutdownDelayDuration * 8 / 10)
}()
}

s.SecureServingInfo.Listener = &terminationLoggingListener{
Listener: s.SecureServingInfo.Listener,
stopCh: stopCh,
lateStopCh: lateStopCh,
}

// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
if err != nil {
Expand Down Expand Up @@ -654,3 +672,30 @@ func (s *GenericAPIServer) Eventf(eventType, reason, messageFmt string, args ...
klog.Warningf("failed to create event %s/%s: %v", e.Namespace, e.Name, err)
}
}

// terminationLoggingListener wraps the given listener and logs new connections
// after the stopCh has been closed, i.e. when termination has begun.
type terminationLoggingListener struct {
net.Listener
stopCh, lateStopCh <-chan struct{}
}

func (l *terminationLoggingListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}

select {
case <-l.lateStopCh:
klog.Warningf("Accepted new connection from %s very late in the graceful termination process (more than 80%% has passed), possibly a hint for a broken load balancer setup.", c.RemoteAddr().String())
default:
select {
case <-l.stopCh:
klog.V(2).Infof("Accepted new connection from %s during graceful termination.", c.RemoteAddr())
default:
}
}

return c, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ type ServerRunOptions struct {
MaxMutatingRequestsInFlight int
RequestTimeout time.Duration
GoawayChance float64
TerminationGoaway bool
LivezGracePeriod time.Duration
MinRequestTimeout int
ShutdownDelayDuration time.Duration
Expand Down Expand Up @@ -79,7 +78,6 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.LivezGracePeriod = s.LivezGracePeriod
c.RequestTimeout = s.RequestTimeout
c.GoawayChance = s.GoawayChance
c.TerminationGoaway = s.TerminationGoaway
c.MinRequestTimeout = s.MinRequestTimeout
c.ShutdownDelayDuration = s.ShutdownDelayDuration
c.JSONPatchMaxCopyBytes = s.JSONPatchMaxCopyBytes
Expand Down Expand Up @@ -196,9 +194,6 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"This argument sets the fraction of requests that will be sent a GOAWAY. Clusters with single apiservers, or which don't use a load balancer, should NOT enable this. "+
"Min is 0 (off), Max is .02 (1/50 requests); .001 (1/1000) is a recommended starting point.")

fs.BoolVar(&s.TerminationGoaway, "termination-goaway", s.TerminationGoaway, ""+
"This option speeds up draining of a terminating apiserver by sending GOAWAY frames immediately after termination has been initiated (i.e. before shutdown-delay-duration has passed).")

fs.DurationVar(&s.LivezGracePeriod, "livez-grace-period", s.LivezGracePeriod, ""+
"This option represents the maximum amount of time it should take for apiserver to complete its startup sequence "+
"and become live. From apiserver's start time to when this amount of time has elapsed, /livez will assume "+
Expand Down
14 changes: 8 additions & 6 deletions staging/src/k8s.io/apiserver/pkg/server/secure_serving.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func RunServer(
defer utilruntime.HandleCrash()

var listener net.Listener
listener = tcpKeepAliveListener{ln.(*net.TCPListener)}
listener = tcpKeepAliveListener{ln}
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
Expand All @@ -235,15 +235,17 @@ func RunServer(
//
// Copied from Go 1.7.2 net/http/server.go
type tcpKeepAliveListener struct {
*net.TCPListener
net.Listener
}

func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
c, err := ln.Listener.Accept()
if err != nil {
return nil, err
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(defaultKeepAlivePeriod)
return tc, nil
if tc, ok := c.(*net.TCPConn); ok {
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(defaultKeepAlivePeriod)
}
return c, nil
}

0 comments on commit 57aec40

Please sign in to comment.