Skip to content

Commit

Permalink
Expose exec and logs via WebSockets
Browse files Browse the repository at this point in the history
Not all clients and systems can support SPDY protocols. This commit adds
support for two new websocket protocols, one to handle streaming of pod
logs from a pod, and the other to allow exec to be tunneled over
websocket.

Browser support for chunked encoding is still poor, and web consoles
that wish to show pod logs may need to make compromises to display the
output. The /pods/<name>/log endpoint now supports websocket upgrade to
the 'binary.k8s.io' subprotocol, which sends chunks of logs as binary to
the client. Messages are written as logs are streamed from the container
daemon, so flushing should be unaffected.

Browser support for raw communication over SDPY is not possible, and
some languages lack libraries for it and HTTP/2. The Kubelet supports
upgrade to WebSocket instead of SPDY, and will multiplex STDOUT/IN/ERR
over websockets by prepending each binary message with a single byte
representing the channel (0 for IN, 1 for OUT, and 2 for ERR). Because
framing on WebSockets suffers from head-of-line blocking, clients and
other server code should ensure that no particular stream blocks. An
alternative subprotocol 'base64.channel.k8s.io' base64 encodes the body
and uses '0'-'9' to represent the channel for ease of use in browsers.
  • Loading branch information
smarterclayton committed Oct 9, 2015
1 parent 2f90f66 commit 363b616
Show file tree
Hide file tree
Showing 11 changed files with 1,177 additions and 7 deletions.
17 changes: 17 additions & 0 deletions docs/devel/api-conventions.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ using resources with kubectl can be found in [Working with resources](../user-gu
- [Events](#events)
- [Naming conventions](#naming-conventions)
- [Label, selector, and annotation conventions](#label-selector-and-annotation-conventions)
- [WebSockets and SPDY](#websockets-and-spdy)

<!-- END MUNGE: GENERATED_TOC -->

Expand Down Expand Up @@ -721,6 +722,22 @@ Other advice regarding use of labels, annotations, and other generic map keys by
- Use annotations to store API extensions that the controller responsible for the resource doesn't need to know about, experimental fields that aren't intended to be generally used API fields, etc. Beware that annotations aren't automatically handled by the API conversion machinery.


## WebSockets and SPDY

Some of the API operations exposed by Kubernetes involve transfer of binary streams between the client and a container, including attach, exec, portforward, and logging. The API therefore exposes certain operations over upgradeable HTTP connections ([described in RFC 2817](https://tools.ietf.org/html/rfc2817)) via the WebSocket and SPDY protocols. These actions are exposed as subresources with their associated verbs (exec, log, attach, and portforward) and are requested via a GET (to support JavaScript in a browser) and POST (semantically accurate).

There are two primary protocols in use today:

1. Streamed channels

When dealing with multiple independent binary streams of data such as the remote execution of a shell command (writing to STDIN, reading from STDOUT and STDERR) or forwarding multiple ports the streams can be multiplexed onto a single TCP connection. Kubernetes supports a SPDY based framing protocol that leverages SPDY channels and a WebSocket framing protocol that multiplexes multiple channels onto the same stream by prefixing each binary chunk with a byte indicating its channel. The WebSocket protocol supports an optional subprotocol that handles base64-encoded bytes from the client and returns base64-encoded bytes from the server and character based channel prefixes ('0', '1', '2') for ease of use from JavaScript in a browser.

2. Streaming response

The default log output for a channel of streaming data is an HTTP Chunked Transfer-Encoding, which can return an arbitrary stream of binary data from the server. Browser-based JavaScript is limited in its ability to access the raw data from a chunked response, especially when very large amounts of logs are returned, and in future API calls it may be desirable to transfer large files. The streaming API endpoints support an optional WebSocket upgrade that provides a unidirectional channel from the server to the client and chunks data as binary WebSocket frames. An optional WebSocket subprotocol is exposed that base64 encodes the stream before returning it to the client.

Clients should use the SPDY protocols if their clients have native support, or WebSockets as a fallback. Note that WebSockets is susceptible to Head-of-Line blocking and so clients must read and process each message sequentionally. In the future, an HTTP/2 implementation will be exposed that deprecates SPDY.


<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/devel/api-conventions.md?pixel)]()
Expand Down
10 changes: 10 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flushwriter"
"k8s.io/kubernetes/pkg/util/wsstream"
"k8s.io/kubernetes/pkg/version"

"github.com/emicklei/go-restful"
Expand Down Expand Up @@ -353,6 +354,15 @@ func write(statusCode int, apiVersion string, codec runtime.Codec, object runtim
return
}
defer out.Close()

if wsstream.IsWebSocketRequest(req) {
r := wsstream.NewReader(out, true)
if err := r.Copy(w, req); err != nil {
util.HandleError(fmt.Errorf("error encountered while streaming results via websocket: %v", err))
}
return
}

if len(contentType) == 0 {
contentType = "application/octet-stream"
}
Expand Down
78 changes: 72 additions & 6 deletions pkg/kubelet/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/limitwriter"
"k8s.io/kubernetes/pkg/util/wsstream"
)

// Server is a http.Handler which exposes kubelet functionality over HTTP.
Expand Down Expand Up @@ -255,9 +256,15 @@ func (s *Server) InstallDebuggingHandlers() {
ws = new(restful.WebService)
ws.
Path("/exec")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getExec).
Operation("getExec"))
Expand All @@ -266,9 +273,15 @@ func (s *Server) InstallDebuggingHandlers() {
ws = new(restful.WebService)
ws.
Path("/attach")
ws.Route(ws.GET("/{podNamespace}/{podID}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.GET("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
ws.Route(ws.POST("/{podNamespace}/{podID}/{uid}/{containerName}").
To(s.getAttach).
Operation("getAttach"))
Expand Down Expand Up @@ -533,6 +546,10 @@ func getContainerCoordinates(request *restful.Request) (namespace, pod string, u

const defaultStreamCreationTimeout = 30 * time.Second

type Closer interface {
Close() error
}

func (s *Server) getAttach(request *restful.Request, response *restful.Response) {
podNamespace, podID, uid, container := getContainerCoordinates(request)
pod, ok := s.host.GetPodByName(podNamespace, podID)
Expand Down Expand Up @@ -600,17 +617,43 @@ func (s *Server) getExec(request *restful.Request, response *restful.Response) {
}
}

func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, httpstream.Connection, bool, bool) {
// start at 1 for error stream
// standardShellChannels returns the standard channel types for a shell connection (STDIN 0, STDOUT 1, STDERR 2)
// along with the approprxate duplex value
func standardShellChannels(stdin, stdout, stderr bool) []wsstream.ChannelType {
// open three half-duplex channels
channels := []wsstream.ChannelType{wsstream.ReadChannel, wsstream.WriteChannel, wsstream.WriteChannel}
if !stdin {
channels[0] = wsstream.IgnoreChannel
}
if !stdout {
channels[1] = wsstream.IgnoreChannel
}
if !stderr {
channels[2] = wsstream.IgnoreChannel
}
return channels
}

func (s *Server) createStreams(request *restful.Request, response *restful.Response) (io.Reader, io.WriteCloser, io.WriteCloser, io.WriteCloser, Closer, bool, bool) {
tty := request.QueryParameter(api.ExecTTYParam) == "1"
stdin := request.QueryParameter(api.ExecStdinParam) == "1"
stdout := request.QueryParameter(api.ExecStdoutParam) == "1"
stderr := request.QueryParameter(api.ExecStderrParam) == "1"
if tty && stderr {
// TODO: make this an error before we reach this method
glog.V(4).Infof("Access to exec with tty and stderr is not supported, bypassing stderr")
stderr = false
}

// count the streams client asked for, starting with 1
expectedStreams := 1
if request.QueryParameter(api.ExecStdinParam) == "1" {
if stdin {
expectedStreams++
}
if request.QueryParameter(api.ExecStdoutParam) == "1" {
if stdout {
expectedStreams++
}
tty := request.QueryParameter(api.ExecTTYParam) == "1"
if !tty && request.QueryParameter(api.ExecStderrParam) == "1" {
if stderr {
expectedStreams++
}

Expand All @@ -619,6 +662,29 @@ func (s *Server) createStreams(request *restful.Request, response *restful.Respo
return nil, nil, nil, nil, nil, false, false
}

if wsstream.IsWebSocketRequest(request.Request) {
// open the requested channels, and always open the error channel
channels := append(standardShellChannels(stdin, stdout, stderr), wsstream.WriteChannel)
conn := wsstream.NewConn(channels...)
conn.SetIdleTimeout(s.host.StreamingConnectionIdleTimeout())
streams, err := conn.Open(httplog.Unlogged(response.ResponseWriter), request.Request)
if err != nil {
glog.Errorf("Unable to upgrade websocket connection: %v", err)
return nil, nil, nil, nil, nil, false, false
}
// Send an empty message to the lowest writable channel to notify the client the connection is established
// TODO: make generic to SDPY and WebSockets and do it outside of this method?
switch {
case stdout:
streams[1].Write([]byte{})
case stderr:
streams[2].Write([]byte{})
default:
streams[3].Write([]byte{})
}
return streams[0], streams[1], streams[2], streams[3], conn, tty, true
}

streamCh := make(chan httpstream.Stream)

upgrader := spdy.NewResponseUpgrader()
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/pod/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object) (re
return newUpgradeAwareProxyHandler(location, nil, false), nil
}

// Support both GET and POST methods. Over time, we want to move all clients to start using POST and then stop supporting GET.
// Support both GET and POST methods. We must support GET for browsers that want to use WebSockets.
var upgradeableMethods = []string{"GET", "POST"}

// AttachREST implements the attach subresource for a Pod
Expand Down
Loading

0 comments on commit 363b616

Please sign in to comment.