Skip to content

Commit

Permalink
Add optional throttling to the proxy/exec/attach methods
Browse files Browse the repository at this point in the history
  • Loading branch information
brendandburns committed Jul 29, 2015
1 parent dfe3e80 commit 99b02bf
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 10 deletions.
5 changes: 4 additions & 1 deletion cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type APIServer struct {
LongRunningRequestRE string
SSHUser string
SSHKeyfile string
MaxConnectionBytesPerSec int64
}

// NewAPIServer creates a new APIServer object with default parameters
Expand Down Expand Up @@ -205,6 +206,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", defaultLongRunningRequestRE, "A regular expression matching long running requests which should be excluded from maximum inflight request handling.")
fs.StringVar(&s.SSHUser, "ssh-user", "", "If non-empty, use secure SSH proxy to the nodes, using this user name")
fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", "", "If non-empty, use secure SSH proxy to the nodes, using this user keyfile")
fs.Int64Var(&s.MaxConnectionBytesPerSec, "max-connection-bytes-per-sec", 0, "If non-zero, throttle each user connection to this number of bytes/sec. Currently only applies to long-running requests")
}

// TODO: Longer term we should read this from some config store, rather than a flag.
Expand Down Expand Up @@ -255,7 +257,8 @@ func (s *APIServer) Run(_ []string) error {
capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: s.AllowPrivileged,
// TODO(vmarmol): Implement support for HostNetworkSources.
HostNetworkSources: []string{},
HostNetworkSources: []string{},
PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec,
})

cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
} else {
glog.Warning("No api server defined - no events will be sent to API server.")
}
capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources)
capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources, 0)

credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)

Expand Down
10 changes: 7 additions & 3 deletions pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Capabilities struct {

// List of pod sources for which using host network is allowed.
HostNetworkSources []string

// PerConnectionBandwidthLimitBytesPerSec limits the throughput of each connection (currently only used for proxy, exec, attach)
PerConnectionBandwidthLimitBytesPerSec int64
}

// TODO: Clean these up into a singleton
Expand All @@ -43,10 +46,11 @@ func Initialize(c Capabilities) {
}

// Setup the capability set. It wraps Initialize for improving usibility.
func Setup(allowPrivileged bool, hostNetworkSources []string) {
func Setup(allowPrivileged bool, hostNetworkSources []string, perConnectionBytesPerSec int64) {
Initialize(Capabilities{
AllowPrivileged: allowPrivileged,
HostNetworkSources: hostNetworkSources,
AllowPrivileged: allowPrivileged,
HostNetworkSources: hostNetworkSources,
PerConnectionBandwidthLimitBytesPerSec: perConnectionBytesPerSec,
})
}

Expand Down
18 changes: 16 additions & 2 deletions pkg/registry/generic/rest/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/third_party/golang/netutil"
"github.com/golang/glog"
"github.com/mxk/go-flowrate/flowrate"
)

// UpgradeAwareProxyHandler is a handler for proxy requests that may require an upgrade
Expand All @@ -42,6 +43,7 @@ type UpgradeAwareProxyHandler struct {
Location *url.URL
Transport http.RoundTripper
FlushInterval time.Duration
MaxBytesPerSec int64
err error
}

Expand Down Expand Up @@ -152,15 +154,27 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R
wg.Add(2)

go func() {
_, err := io.Copy(backendConn, requestHijackedConn)
var writer io.WriteCloser
if h.MaxBytesPerSec > 0 {
writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec)
} else {
writer = backendConn
}
_, err := io.Copy(writer, requestHijackedConn)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error proxying data from client to backend: %v", err)
}
wg.Done()
}()

go func() {
_, err := io.Copy(requestHijackedConn, backendConn)
var reader io.ReadCloser
if h.MaxBytesPerSec > 0 {
reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec)
} else {
reader = backendConn
}
_, err := io.Copy(requestHijackedConn, reader)
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error proxying data from backend to client: %v", err)
}
Expand Down
13 changes: 10 additions & 3 deletions pkg/registry/pod/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
Expand Down Expand Up @@ -277,7 +278,7 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re
return nil, err
}
location.Path = path.Join(location.Path, proxyOpts.Path)
return genericrest.NewUpgradeAwareProxyHandler(location, nil, false), nil
return newUpgradeAwareProxyHandler(location, nil, false), nil
}

// Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET.
Expand Down Expand Up @@ -307,7 +308,7 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (r
if err != nil {
return nil, err
}
return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil
return newUpgradeAwareProxyHandler(location, transport, true), nil
}

// NewConnectOptions returns the versioned object that represents exec parameters
Expand Down Expand Up @@ -350,5 +351,11 @@ func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Obj
if err != nil {
return nil, err
}
return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil
return newUpgradeAwareProxyHandler(location, transport, true), nil
}

func newUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler {
handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, upgradeRequired)
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}

0 comments on commit 99b02bf

Please sign in to comment.