Skip to content

Commit

Permalink
Support terminal resizing for exec/attach/run
Browse files Browse the repository at this point in the history
Add support for terminal resizing for exec, attach, and run. Note that for Docker, exec sessions
inherit the environment from the primary process, so if the container was created with tty=false,
that means the exec session's TERM variable will default to "dumb". Users can override this by
setting TERM=xterm (or whatever is appropriate) to get the correct "smart" terminal behavior.
  • Loading branch information
Andy Goldstein committed Jul 13, 2016
1 parent ec6181d commit 3b21a99
Show file tree
Hide file tree
Showing 41 changed files with 1,241 additions and 241 deletions.
7 changes: 4 additions & 3 deletions cmd/hyperkube/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package main

import (
"os"

"github.com/docker/docker/pkg/term"
"k8s.io/kubernetes/pkg/kubectl/cmd"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)

func NewKubectlServer() *Server {
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
// need to use term.StdStreams to get the right IO refs on Windows
stdin, stdout, stderr := term.StdStreams()
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr)
localFlags := cmd.LocalFlags()
localFlags.SetInterspersed(false)

Expand Down
6 changes: 4 additions & 2 deletions cmd/kubectl/app/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package app

import (
"os"
"github.com/docker/docker/pkg/term"

"k8s.io/kubernetes/pkg/kubectl/cmd"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand All @@ -28,6 +28,8 @@ WARNING: this logic is duplicated, with minor changes, in cmd/hyperkube/kubectl.
Any salient changes here will need to be manually reflected in that file.
*/
func Run() error {
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
// need to use term.StdStreams to get the right IO refs on Windows
stdin, stdout, stderr := term.StdStreams()
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr)
return cmd.Execute()
}
2 changes: 2 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2751,6 +2751,8 @@ const (
StreamTypeData = "data"
// Value for streamType header for error stream
StreamTypeError = "error"
// Value for streamType header for terminal resize stream
StreamTypeResize = "resize"

// Name of header that specifies the port being forwarded
PortHeader = "port"
Expand Down
41 changes: 25 additions & 16 deletions pkg/client/unversioned/remotecommand/remotecommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,28 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/term"
)

// StreamOptions holds information pertaining to the current streaming session: supported stream
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
SupportedProtocols []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Tty bool
TerminalSizeQueue term.TerminalSizeQueue
}

// Executor is an interface for transporting shell-style streams.
type Executor interface {
// Stream initiates the transport of the standard shell streams. It will transport any
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
// stdout stream).
Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Stream(options StreamOptions) error
}

// StreamExecutor supports the ability to dial an httpstream connection and the ability to
Expand Down Expand Up @@ -129,14 +142,18 @@ func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, strin
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}

type streamCreator interface {
CreateStream(headers http.Header) (httpstream.Stream, error)
}

type streamProtocolHandler interface {
stream(httpstream.Connection) error
stream(conn streamCreator) error
}

// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
conn, protocol, err := e.Dial(supportedProtocols...)
func (e *streamExecutor) Stream(options StreamOptions) error {
conn, protocol, err := e.Dial(options.SupportedProtocols...)
if err != nil {
return err
}
Expand All @@ -145,23 +162,15 @@ func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, st
var streamer streamProtocolHandler

switch protocol {
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = &streamProtocolV2{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
streamer = newStreamProtocolV2(options)
case "":
glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = &streamProtocolV1{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
streamer = newStreamProtocolV1(options)
}

return streamer.stream(conn)
Expand Down
17 changes: 12 additions & 5 deletions pkg/client/unversioned/remotecommand/remotecommand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/term"
)

type fakeExecutor struct {
Expand All @@ -52,11 +53,11 @@ type fakeExecutor struct {
exec bool
}

func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ex.run(name, uid, container, cmd, in, out, err, tty)
}

func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ex.run(name, uid, container, nil, in, out, err, tty)
}

Expand Down Expand Up @@ -253,7 +254,13 @@ func TestStream(t *testing.T) {
t.Errorf("%s: unexpected error: %v", name, err)
continue
}
err = e.Stream(testCase.ClientProtocols, streamIn, streamOut, streamErr, testCase.Tty)
err = e.Stream(StreamOptions{
SupportedProtocols: testCase.ClientProtocols,
Stdin: streamIn,
Stdout: streamOut,
Stderr: streamErr,
Tty: testCase.Tty,
})
hasErr := err != nil

if len(testCase.Error) > 0 {
Expand All @@ -277,13 +284,13 @@ func TestStream(t *testing.T) {

if len(testCase.Stdout) > 0 {
if e, a := strings.Repeat(testCase.Stdout, testCase.MessageCount), localOut; e != a.String() {
t.Errorf("%s: expected stdout data '%s', got '%s'", name, e, a)
t.Errorf("%s: expected stdout data %q, got %q", name, e, a)
}
}

if testCase.Stderr != "" {
if e, a := strings.Repeat(testCase.Stderr, testCase.MessageCount), localErr; e != a.String() {
t.Errorf("%s: expected stderr data '%s', got '%s'", name, e, a)
t.Errorf("%s: expected stderr data %q, got %q", name, e, a)
}
}

Expand Down
62 changes: 33 additions & 29 deletions pkg/client/unversioned/remotecommand/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,27 @@ import (
)

// streamProtocolV1 implements the first version of the streaming exec & attach
// protocol. This version has some bugs, such as not being able to detecte when
// protocol. This version has some bugs, such as not being able to detect when
// non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
// http://issues.k8s.io/13395 for more details.
type streamProtocolV1 struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
StreamOptions

errorStream httpstream.Stream
remoteStdin httpstream.Stream
remoteStdout httpstream.Stream
remoteStderr httpstream.Stream
}

var _ streamProtocolHandler = &streamProtocolV1{}

func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
return &streamProtocolV1{
StreamOptions: options,
}
}

func (p *streamProtocolV1) stream(conn streamCreator) error {
doneChan := make(chan struct{}, 2)
errorChan := make(chan error)

Expand All @@ -55,19 +63,15 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
}
}

var (
err error
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
)

// set up all the streams first
var err error
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError)
errorStream, err = conn.CreateStream(headers)
p.errorStream, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer errorStream.Reset()
defer p.errorStream.Reset()

// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
// goroutines until it's received all of the streams. If the client creates the stdin stream and
Expand All @@ -76,38 +80,38 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
// getting processed because the server hasn't started its copying, and it won't do that until it
// gets all the streams. By creating all the streams first, we ensure that the server is ready to
// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
if e.stdin != nil {
if p.Stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err = conn.CreateStream(headers)
p.remoteStdin, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdin.Reset()
defer p.remoteStdin.Reset()
}

if e.stdout != nil {
if p.Stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err = conn.CreateStream(headers)
p.remoteStdout, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdout.Reset()
defer p.remoteStdout.Reset()
}

if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err = conn.CreateStream(headers)
p.remoteStderr, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStderr.Reset()
defer p.remoteStderr.Reset()
}

// now that all the streams have been created, proceed with reading & copying

// always read from errorStream
go func() {
message, err := ioutil.ReadAll(errorStream)
message, err := ioutil.ReadAll(p.errorStream)
if err != nil && err != io.EOF {
errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
return
Expand All @@ -118,25 +122,25 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
}
}()

if e.stdin != nil {
if p.Stdin != nil {
// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
// because stdin is not closed until the process exits. If we try to call
// stdin.Close(), it returns no error but doesn't unblock the copy. It will
// exit when the process exits, instead.
go cp(api.StreamTypeStdin, remoteStdin, e.stdin)
go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
}

waitCount := 0
completedStreams := 0

if e.stdout != nil {
if p.Stdout != nil {
waitCount++
go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
}

if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
waitCount++
go cp(api.StreamTypeStderr, e.stderr, remoteStderr)
go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
}

Loop:
Expand Down
Loading

0 comments on commit 3b21a99

Please sign in to comment.