diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index a9bec718601bf..b9c811d82cafc 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -101,6 +101,7 @@ type APIServer struct { LongRunningRequestRE string SSHUser string SSHKeyfile string + MaxConnectionBytesPerSec int64 } // NewAPIServer creates a new APIServer object with default parameters @@ -205,6 +206,7 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.LongRunningRequestRE, "long-running-request-regexp", defaultLongRunningRequestRE, "A regular expression matching long running requests which should be excluded from maximum inflight request handling.") fs.StringVar(&s.SSHUser, "ssh-user", "", "If non-empty, use secure SSH proxy to the nodes, using this user name") fs.StringVar(&s.SSHKeyfile, "ssh-keyfile", "", "If non-empty, use secure SSH proxy to the nodes, using this user keyfile") + fs.Int64Var(&s.MaxConnectionBytesPerSec, "max-connection-bytes-per-sec", 0, "If non-zero, throttle each user connection to this number of bytes/sec. Currently only applies to long-running requests") } // TODO: Longer term we should read this from some config store, rather than a flag. @@ -255,7 +257,8 @@ func (s *APIServer) Run(_ []string) error { capabilities.Initialize(capabilities.Capabilities{ AllowPrivileged: s.AllowPrivileged, // TODO(vmarmol): Implement support for HostNetworkSources. - HostNetworkSources: []string{}, + HostNetworkSources: []string{}, + PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 71a48c91c5239..16dcf4d28fc5d 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -604,7 +604,7 @@ func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error { } else { glog.Warning("No api server defined - no events will be sent to API server.") } - capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources) + capabilities.Setup(kcfg.AllowPrivileged, kcfg.HostNetworkSources, 0) credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory) diff --git a/pkg/capabilities/capabilities.go b/pkg/capabilities/capabilities.go index d105f1d660877..7a1281447c5c9 100644 --- a/pkg/capabilities/capabilities.go +++ b/pkg/capabilities/capabilities.go @@ -27,6 +27,9 @@ type Capabilities struct { // List of pod sources for which using host network is allowed. HostNetworkSources []string + + // PerConnectionBandwidthLimitBytesPerSec limits the throughput of each connection (currently only used for proxy, exec, attach) + PerConnectionBandwidthLimitBytesPerSec int64 } // TODO: Clean these up into a singleton @@ -43,10 +46,11 @@ func Initialize(c Capabilities) { } // Setup the capability set. It wraps Initialize for improving usibility. -func Setup(allowPrivileged bool, hostNetworkSources []string) { +func Setup(allowPrivileged bool, hostNetworkSources []string, perConnectionBytesPerSec int64) { Initialize(Capabilities{ - AllowPrivileged: allowPrivileged, - HostNetworkSources: hostNetworkSources, + AllowPrivileged: allowPrivileged, + HostNetworkSources: hostNetworkSources, + PerConnectionBandwidthLimitBytesPerSec: perConnectionBytesPerSec, }) } diff --git a/pkg/registry/generic/rest/proxy.go b/pkg/registry/generic/rest/proxy.go index 5339c596f4696..12d34b250c7fd 100644 --- a/pkg/registry/generic/rest/proxy.go +++ b/pkg/registry/generic/rest/proxy.go @@ -34,6 +34,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/third_party/golang/netutil" "github.com/golang/glog" + "github.com/mxk/go-flowrate/flowrate" ) // UpgradeAwareProxyHandler is a handler for proxy requests that may require an upgrade @@ -42,6 +43,7 @@ type UpgradeAwareProxyHandler struct { Location *url.URL Transport http.RoundTripper FlushInterval time.Duration + MaxBytesPerSec int64 err error } @@ -152,7 +154,13 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R wg.Add(2) go func() { - _, err := io.Copy(backendConn, requestHijackedConn) + var writer io.WriteCloser + if h.MaxBytesPerSec > 0 { + writer = flowrate.NewWriter(backendConn, h.MaxBytesPerSec) + } else { + writer = backendConn + } + _, err := io.Copy(writer, requestHijackedConn) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { glog.Errorf("Error proxying data from client to backend: %v", err) } @@ -160,7 +168,13 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R }() go func() { - _, err := io.Copy(requestHijackedConn, backendConn) + var reader io.ReadCloser + if h.MaxBytesPerSec > 0 { + reader = flowrate.NewReader(backendConn, h.MaxBytesPerSec) + } else { + reader = backendConn + } + _, err := io.Copy(requestHijackedConn, reader) if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { glog.Errorf("Error proxying data from backend to client: %v", err) } diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 74694f543af6d..bb7b4e2f716b6 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" @@ -277,7 +278,7 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re return nil, err } location.Path = path.Join(location.Path, proxyOpts.Path) - return genericrest.NewUpgradeAwareProxyHandler(location, nil, false), nil + return newUpgradeAwareProxyHandler(location, nil, false), nil } // Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET. @@ -307,7 +308,7 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object) (r if err != nil { return nil, err } - return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil + return newUpgradeAwareProxyHandler(location, transport, true), nil } // NewConnectOptions returns the versioned object that represents exec parameters @@ -350,5 +351,11 @@ func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Obj if err != nil { return nil, err } - return genericrest.NewUpgradeAwareProxyHandler(location, transport, true), nil + return newUpgradeAwareProxyHandler(location, transport, true), nil +} + +func newUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, upgradeRequired bool) *genericrest.UpgradeAwareProxyHandler { + handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, upgradeRequired) + handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec + return handler }