Skip to content

Commit

Permalink
Shim pluggable logging
Browse files Browse the repository at this point in the history
Closes #603

This adds logging facilities at the shim level to provide minimal I/O
overhead and pluggable logging options.  Log handling is done within the
shim so that all I/O, cpu, and memory can be charged to the container.

A sample logging driver setting up logging for a container the systemd
journal looks like this:

```go
package main

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"sync"

	"github.com/containerd/containerd/runtime/v2/logging"
	"github.com/coreos/go-systemd/journal"
)

func main() {
	logging.Run(log)
}

func log(ctx context.Context, config *logging.Config, ready func() error) error {
	// construct any log metadata for the container
	vars := map[string]string{
		"SYSLOG_IDENTIFIER": fmt.Sprintf("%s:%s", config.Namespace, config.ID),
	}
	var wg sync.WaitGroup
	wg.Add(2)
	// forward both stdout and stderr to the journal
	go copy(&wg, config.Stdout, journal.PriInfo, vars)
	go copy(&wg, config.Stderr, journal.PriErr, vars)

	// signal that we are ready and setup for the container to be started
	if err := ready(); err != nil {
		return err
	}
	wg.Wait()
	return nil
}

func copy(wg *sync.WaitGroup, r io.Reader, pri journal.Priority, vars map[string]string) {
	defer wg.Done()
	s := bufio.NewScanner(r)
	for s.Scan() {
		if s.Err() != nil {
			return
		}
		journal.Send(s.Text(), pri, vars)
	}
}
```

A `logging` package has been created to assist log developers create
logging plugins for containerd.

This uses a URI based approach for logging drivers that can be expanded
in the future.

Supported URI scheme's are:

* binary
* fifo
* file

You can pass the log url via ctr on the command line:

```bash
> ctr run --rm --runtime io.containerd.runc.v2 --log-uri binary://shim-journald docker.io/library/redis:alpine redis
```

```bash
> journalctl -f -t default:redis

-- Logs begin at Tue 2018-12-11 16:29:51 EST. --
Mar 08 16:08:22 deathstar default:redis[120760]: 1:C 08 Mar 2019 21:08:22.703 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # You requested maxclients of 10000 requiring at least 10032 max file descriptors.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # Server can't set maximum open files to 10032 because of OS error: Operation not permitted.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.704 # Current maximum open files is 1024. maxclients has been reduced to 992 to compensate for low ulimit. If you need higher maxclients increase 'ulimit -n'.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 * Running mode=standalone, port=6379.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # Server initialized
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
Mar 08 16:08:22 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:22.705 * Ready to accept connections
Mar 08 16:08:50 deathstar default:redis[120760]: 1:signal-handler (1552079330) Received SIGINT scheduling shutdown...
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.405 # User requested shutdown...
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.406 * Saving the final RDB snapshot before exiting.
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.452 * DB saved on disk
Mar 08 16:08:50 deathstar default:redis[120760]: 1:M 08 Mar 2019 21:08:50.453 # Redis is now ready to exit, bye bye...
```

The following client side Opts are added:

```go
// LogURI provides the raw logging URI
func LogURI(uri *url.URL) Creator { }
// BinaryIO forwards contianer STDOUT|STDERR directly to a logging binary
func BinaryIO(binary string, args map[string]string) Creator {}
```

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
  • Loading branch information
crosbymichael committed Mar 12, 2019
1 parent aa328df commit e6ae9cc
Show file tree
Hide file tree
Showing 16 changed files with 538 additions and 105 deletions.
64 changes: 47 additions & 17 deletions cio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"sync"

"github.com/containerd/containerd/defaults"
Expand Down Expand Up @@ -222,46 +222,76 @@ type DirectIO struct {
cio
}

var _ IO = &DirectIO{}
var (
_ IO = &DirectIO{}
_ IO = &logURI{}
)

// LogURI provides the raw logging URI
func LogURI(uri *url.URL) Creator {
return func(_ string) (IO, error) {
return &logURI{
config: Config{
Stdout: uri.String(),
Stderr: uri.String(),
},
}, nil
}
}

// BinaryIO forwards container STDOUT|STDERR directly to a logging binary
func BinaryIO(binary string, args map[string]string) Creator {
return func(_ string) (IO, error) {
uri := &url.URL{
Scheme: "binary",
Host: binary,
}
for k, v := range args {
uri.Query().Set(k, v)
}
return &logURI{
config: Config{
Stdout: uri.String(),
Stderr: uri.String(),
},
}, nil
}
}

// LogFile creates a file on disk that logs the task's STDOUT,STDERR.
// If the log file already exists, the logs will be appended to the file.
func LogFile(path string) Creator {
return func(_ string) (IO, error) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil {
return nil, err
}
f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return nil, err
uri := &url.URL{
Scheme: "file",
Host: path,
}
f.Close()
return &logIO{
return &logURI{
config: Config{
Stdout: path,
Stderr: path,
Stdout: uri.String(),
Stderr: uri.String(),
},
}, nil
}
}

type logIO struct {
type logURI struct {
config Config
}

func (l *logIO) Config() Config {
func (l *logURI) Config() Config {
return l.config
}

func (l *logIO) Cancel() {
func (l *logURI) Cancel() {

}

func (l *logIO) Wait() {
func (l *logURI) Wait() {

}

func (l *logIO) Close() error {
func (l *logURI) Close() error {
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/ctr/commands/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ var Command = cli.Command{
Name: "null-io",
Usage: "send all IO to /dev/null",
},
cli.StringFlag{
Name: "log-uri",
Usage: "log uri",
},
cli.BoolFlag{
Name: "detach,d",
Usage: "detach from the task after it has started execution",
Expand Down Expand Up @@ -161,7 +165,7 @@ var Command = cli.Command{
}
opts := getNewTaskOpts(context)
ioOpts := []cio.Opt{cio.WithFIFODir(context.String("fifo-dir"))}
task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), ioOpts, opts...)
task, err := tasks.NewTask(ctx, client, container, context.String("checkpoint"), con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion cmd/ctr/commands/tasks/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ var startCommand = cli.Command{
Name: "null-io",
Usage: "send all IO to /dev/null",
},
cli.StringFlag{
Name: "log-uri",
Usage: "log uri",
},
cli.StringFlag{
Name: "fifo-dir",
Usage: "directory used for storing IO FIFOs",
Expand Down Expand Up @@ -85,7 +89,7 @@ var startCommand = cli.Command{
}
}

task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), ioOpts, opts...)
task, err := NewTask(ctx, client, container, "", con, context.Bool("null-io"), context.String("log-uri"), ioOpts, opts...)
if err != nil {
return err
}
Expand Down
10 changes: 9 additions & 1 deletion cmd/ctr/commands/tasks/tasks_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package tasks

import (
gocontext "context"
"net/url"
"os"
"os/signal"

Expand Down Expand Up @@ -67,7 +68,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
}

// NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, checkpoint string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
stdio := cio.NewCreator(append([]cio.Opt{cio.WithStdio}, ioOpts...)...)
if checkpoint != "" {
im, err := client.GetImage(ctx, checkpoint)
Expand All @@ -86,6 +87,13 @@ func NewTask(ctx gocontext.Context, client *containerd.Client, container contain
}
ioCreator = cio.NullIO
}
if logURI != "" {
u, err := url.Parse(logURI)
if err != nil {
return nil, err
}
ioCreator = cio.LogURI(u)
}
return container.NewTask(ctx, ioCreator, opts...)
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/ctr/commands/tasks/tasks_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func HandleConsoleResize(ctx gocontext.Context, task resizer, con console.Consol
}

// NewTask creates a new task
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
func NewTask(ctx gocontext.Context, client *containerd.Client, container containerd.Container, _ string, con console.Console, nullIO bool, logURI string, ioOpts []cio.Opt, opts ...containerd.NewTaskOpts) (containerd.Task, error) {
var ioCreator cio.Creator
if con != nil {
if nullIO {
Expand Down
2 changes: 1 addition & 1 deletion runtime/proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type Process interface {
// platform implementations
type Platform interface {
CopyConsole(ctx context.Context, console console.Console, stdin, stdout, stderr string,
wg, cwg *sync.WaitGroup) (console.Console, error)
wg *sync.WaitGroup) (console.Console, error)
ShutdownConsole(ctx context.Context, console console.Console) error
Close() error
}
48 changes: 27 additions & 21 deletions runtime/v1/linux/proc/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type execProcess struct {
mu sync.Mutex
id string
console console.Console
io runc.IO
io *processIO
status int
exited time.Time
pid *safePid
Expand Down Expand Up @@ -172,29 +172,30 @@ func (e *execProcess) start(ctx context.Context) (err error) {
// access e.pid until it is updated.
e.pid.Lock()
defer e.pid.Unlock()

var (
socket *runc.Socket
pidfile = filepath.Join(e.path, fmt.Sprintf("%s.pid", e.id))
pio *processIO
pidFile = newExecPidFile(e.path, e.id)
)
if e.stdio.Terminal {
if socket, err = runc.NewTempConsoleSocket(); err != nil {
return errors.Wrap(err, "failed to create runc console socket")
}
defer socket.Close()
} else if e.stdio.IsNull() {
if e.io, err = runc.NewNullIO(); err != nil {
return errors.Wrap(err, "creating new NULL IO")
}
} else {
if e.io, err = runc.NewPipeIO(e.parent.IoUID, e.parent.IoGID, withConditionalIO(e.stdio)); err != nil {
return errors.Wrap(err, "failed to create runc io pipes")
if pio, err = createIO(ctx, e.id, e.parent.IoUID, e.parent.IoGID, e.stdio); err != nil {
return errors.Wrap(err, "failed to create init process I/O")
}
e.io = pio
}
opts := &runc.ExecOpts{
PidFile: pidfile,
IO: e.io,
PidFile: pidFile.Path(),
Detach: true,
}
if pio != nil {
opts.IO = pio.IO()
}
if socket != nil {
opts.ConsoleSocket = socket
}
Expand All @@ -203,38 +204,43 @@ func (e *execProcess) start(ctx context.Context) (err error) {
return e.parent.runtimeError(err, "OCI runtime exec failed")
}
if e.stdio.Stdin != "" {
sc, err := fifo.OpenFifo(context.Background(), e.stdio.Stdin, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", e.stdio.Stdin)
if err := e.openStdin(e.stdio.Stdin); err != nil {
return err
}
e.closers = append(e.closers, sc)
e.stdin = sc
}
var copyWaitGroup sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if socket != nil {
console, err := socket.ReceiveMaster()
if err != nil {
return errors.Wrap(err, "failed to retrieve console master")
}
if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
if e.console, err = e.parent.Platform.CopyConsole(ctx, console, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg); err != nil {
return errors.Wrap(err, "failed to start console copy")
}
} else if !e.stdio.IsNull() {
if err := copyPipes(ctx, e.io, e.stdio.Stdin, e.stdio.Stdout, e.stdio.Stderr, &e.wg, &copyWaitGroup); err != nil {
} else {
if err := pio.Copy(ctx, &e.wg); err != nil {
return errors.Wrap(err, "failed to start io pipe copy")
}
}
copyWaitGroup.Wait()
pid, err := runc.ReadPidFile(opts.PidFile)
pid, err := pidFile.Read()
if err != nil {
return errors.Wrap(err, "failed to retrieve OCI runtime exec pid")
}
e.pid.pid = pid
return nil
}

func (e *execProcess) openStdin(path string) error {
sc, err := fifo.OpenFifo(context.Background(), path, syscall.O_WRONLY|syscall.O_NONBLOCK, 0)
if err != nil {
return errors.Wrapf(err, "failed to open stdin fifo %s", path)
}
e.stdin = sc
e.closers = append(e.closers, sc)
return nil
}

func (e *execProcess) Status(ctx context.Context) (string, error) {
s, err := e.parent.Status(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit e6ae9cc

Please sign in to comment.