Skip to content

Commit

Permalink
Reduce the number of IO constructors
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Nephin <dnephin@gmail.com>
  • Loading branch information
dnephin committed Dec 11, 2017
1 parent a901091 commit 7d4337e
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 135 deletions.
102 changes: 61 additions & 41 deletions cio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"
)

// Config holds the io configurations.
// Config holds the IO configurations.
type Config struct {
// Terminal is true if one has been allocated
Terminal bool
Expand Down Expand Up @@ -49,6 +49,7 @@ type FIFOSet struct {
close func() error
}

// Close the FIFOSet
func (f *FIFOSet) Close() error {
if f.close != nil {
return f.close()
Expand All @@ -61,67 +62,72 @@ func NewFIFOSet(config Config, close func() error) *FIFOSet {
return &FIFOSet{Config: config, close: close}
}

// NewIO returns an Creator that will provide IO sets without a terminal
func NewIO(stdin io.Reader, stdout, stderr io.Writer) Creator {
return NewIOWithTerminal(stdin, stdout, stderr, false)
// Streams used to configure a Creator or Attach
type Streams struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Terminal bool
}

// Opt customize options for creating a Creator or Attach
type Opt func(*Streams)

// WithStdio sets stream options to the standard input/output streams
func WithStdio(opt *Streams) {
WithStreams(os.Stdin, os.Stdout, os.Stderr)(opt)
}

// WithTerminal sets the terminal option
func WithTerminal(opt *Streams) {
opt.Terminal = true
}

// WithStreams sets the stream options to the specified Reader and Writers
func WithStreams(stdin io.Reader, stdout, stderr io.Writer) Opt {
return func(opt *Streams) {
opt.Stdin = stdin
opt.Stdout = stdout
opt.Stderr = stderr
}
}

// NewIOWithTerminal creates a new io set with the provided io.Reader/Writers for use with a terminal
func NewIOWithTerminal(stdin io.Reader, stdout, stderr io.Writer, terminal bool) Creator {
// NewCreator returns an IO creator from the options
func NewCreator(opts ...Opt) Creator {
streams := &Streams{}
for _, opt := range opts {
opt(streams)
}
return func(id string) (IO, error) {
fifos, err := newFIFOSetInTempDir(id)
// TODO: accept root as a param
root := "/run/containerd/fifo"
fifos, err := NewFIFOSetInDir(root, id, streams.Terminal)
if err != nil {
return nil, err
}

fifos.Terminal = terminal
set := &ioSet{in: stdin, out: stdout, err: stderr}
return copyIO(fifos, set)
return copyIO(fifos, streams)
}
}

// WithAttach attaches the existing io for a task to the provided io.Reader/Writers
func WithAttach(stdin io.Reader, stdout, stderr io.Writer) Attach {
// NewAttach attaches the existing io for a task to the provided io.Reader/Writers
func NewAttach(opts ...Opt) Attach {
streams := &Streams{}
for _, opt := range opts {
opt(streams)
}
return func(fifos *FIFOSet) (IO, error) {
if fifos == nil {
return nil, fmt.Errorf("cannot attach, missing fifos")
}
set := &ioSet{in: stdin, out: stdout, err: stderr}
return copyIO(fifos, set)
return copyIO(fifos, streams)
}
}

// Stdio returns an IO set to be used for a task
// that outputs the container's IO as the current processes Stdio
func Stdio(id string) (IO, error) {
return NewIO(os.Stdin, os.Stdout, os.Stderr)(id)
}

// StdioTerminal will setup the IO for the task to use a terminal
func StdioTerminal(id string) (IO, error) {
return NewIOWithTerminal(os.Stdin, os.Stdout, os.Stderr, true)(id)
}

// NullIO redirects the container's IO into /dev/null
func NullIO(_ string) (IO, error) {
return &cio{}, nil
}

type ioSet struct {
in io.Reader
out, err io.Writer
}

type pipes struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}

func (p *pipes) closers() []io.Closer {
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
}

// cio is a basic container IO implementation.
type cio struct {
config Config
Expand Down Expand Up @@ -158,3 +164,17 @@ func (c *cio) Cancel() {
c.cancel()
}
}

type pipes struct {
Stdin io.WriteCloser
Stdout io.ReadCloser
Stderr io.ReadCloser
}

// DirectIO allows task IO to be handled externally by the caller
type DirectIO struct {
pipes
cio
}

var _ IO = &DirectIO{}
79 changes: 36 additions & 43 deletions cio/io_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ import (
"github.com/pkg/errors"
)

// newFIFOSetInTempDir returns a new set of fifos for the task
func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
root := "/run/containerd/fifo"
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
// NewFIFOSetInDir returns a new FIFOSet with paths in a temporary directory under root
func NewFIFOSetInDir(root, id string, terminal bool) (*FIFOSet, error) {
if root != "" {
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
}
dir, err := ioutil.TempDir(root, "")
if err != nil {
Expand All @@ -29,18 +30,15 @@ func newFIFOSetInTempDir(id string) (*FIFOSet, error) {
return os.RemoveAll(dir)
}
return NewFIFOSet(Config{
Stdin: filepath.Join(dir, id+"-stdin"),
Stdout: filepath.Join(dir, id+"-stdout"),
Stderr: filepath.Join(dir, id+"-stderr"),
Stdin: filepath.Join(dir, id+"-stdin"),
Stdout: filepath.Join(dir, id+"-stdout"),
Stderr: filepath.Join(dir, id+"-stderr"),
Terminal: terminal,
}, closer), nil
}

func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {
var (
ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{}
)

func copyIO(fifos *FIFOSet, ioset *Streams) (*cio, error) {
var ctx, cancel = context.WithCancel(context.Background())
pipes, err := openFifos(ctx, fifos)
if err != nil {
cancel()
Expand All @@ -49,27 +47,29 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {

if fifos.Stdin != "" {
go func() {
io.Copy(pipes.Stdin, ioset.in)
io.Copy(pipes.Stdin, ioset.Stdin)
pipes.Stdin.Close()
}()
}

var wg = &sync.WaitGroup{}
wg.Add(1)
go func() {
io.Copy(ioset.out, pipes.Stdout)
io.Copy(ioset.Stdout, pipes.Stdout)
pipes.Stdout.Close()
wg.Done()
}()

if !fifos.Terminal {
wg.Add(1)
go func() {
io.Copy(ioset.err, pipes.Stderr)
io.Copy(ioset.Stderr, pipes.Stderr)
pipes.Stderr.Close()
wg.Done()
}()
}
return &cio{
config: fifos.Config,
wg: wg,
closers: append(pipes.closers(), fifos),
cancel: cancel,
Expand All @@ -78,41 +78,38 @@ func copyIO(fifos *FIFOSet, ioset *ioSet) (*cio, error) {

func openFifos(ctx context.Context, fifos *FIFOSet) (pipes, error) {
var err error
f := new(pipes)

defer func() {
if err != nil {
fifos.Close()
}
}()

var f pipes
if fifos.Stdin != "" {
if f.Stdin, err = fifo.OpenFifo(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return pipes{}, errors.Wrapf(err, "failed to open stdin fifo")
return f, errors.Wrapf(err, "failed to open stdin fifo")
}
}
if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
return pipes{}, errors.Wrapf(err, "failed to open stdout fifo")
if fifos.Stdout != "" {
if f.Stdout, err = fifo.OpenFifo(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
return f, errors.Wrapf(err, "failed to open stdout fifo")
}
}
if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
f.Stdout.Close()
return pipes{}, errors.Wrapf(err, "failed to open stderr fifo")
if fifos.Stderr != "" {
if f.Stderr, err = fifo.OpenFifo(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
f.Stdin.Close()
f.Stdout.Close()
return f, errors.Wrapf(err, "failed to open stderr fifo")
}
}
return pipes{}, nil
return f, nil
}

// NewDirectIO returns an IO implementation that exposes the IO streams as io.ReadCloser
// and io.WriteCloser. FIFOs are created in /run/containerd/fifo.
func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
fifos, err := newFIFOSetInTempDir("")
if err != nil {
return nil, err
}
fifos.Terminal = terminal

ctx, cancel := context.WithCancel(context.Background())
// and io.WriteCloser.
func NewDirectIO(ctx context.Context, fifos *FIFOSet) (*DirectIO, error) {
ctx, cancel := context.WithCancel(ctx)
pipes, err := openFifos(ctx, fifos)
return &DirectIO{
pipes: pipes,
Expand All @@ -124,10 +121,6 @@ func NewDirectIO(ctx context.Context, terminal bool) (*DirectIO, error) {
}, err
}

// DirectIO allows task IO to be handled externally by the caller
type DirectIO struct {
pipes
cio
func (p *pipes) closers() []io.Closer {
return []io.Closer{p.Stdin, p.Stdout, p.Stderr}
}

var _ IO = &DirectIO{}
Loading

0 comments on commit 7d4337e

Please sign in to comment.