Skip to content

Commit

Permalink
runv cmd: change protocal between it and its namespace service
Browse files Browse the repository at this point in the history
the protocal is changed to gRPC and the container management
is also changed to supervisor package.

Signed-off-by: Lai Jiangshan <jiangshanlai@gmail.com>
  • Loading branch information
laijs committed May 6, 2016
1 parent 17192aa commit 3ec3194
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 889 deletions.
48 changes: 43 additions & 5 deletions exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/codegangsta/cli"
"github.com/docker/containerd/api/grpc/types"
"github.com/hyperhq/runv/lib/term"
"github.com/opencontainers/runtime-spec/specs-go"
netcontext "golang.org/x/net/context"
)

var execCommand = cli.Command{
Expand Down Expand Up @@ -116,11 +120,8 @@ following will output a list of processes running in the container:
fmt.Printf("get process config failed %v\n", err)
os.Exit(-1)
}
conn, err := runvRequest(root, container, RUNV_EXECCMD, config.Args)
if err != nil {
fmt.Printf("exec failed: %v", err)
}
code, err := containerTtySplice(root, container, conn, false)

code := runProcess(root, container, config)
os.Exit(code)
},
}
Expand Down Expand Up @@ -183,3 +184,40 @@ func getProcess(context *cli.Context, bundle string) (*specs.Process, error) {
}
return &p, nil
}

func runProcess(root, container string, config *specs.Process) int {
pid := os.Getpid()
process := fmt.Sprintf("p-%x", pid+0xabcdef) // uniq name

p := &types.AddProcessRequest{
Id: container,
Pid: process,
Args: config.Args,
Cwd: config.Cwd,
Terminal: config.Terminal,
Env: config.Env,
User: &types.User{
Uid: config.User.UID,
Gid: config.User.GID,
},
Stdin: fmt.Sprintf("/proc/%d/fd/0", pid),
Stdout: fmt.Sprintf("/proc/%d/fd/1", pid),
Stderr: fmt.Sprintf("/proc/%d/fd/2", pid),
}
c := getClient(filepath.Join(root, container, "namespace/namespaced.sock"))
timestamp := uint64(time.Now().Unix())
if _, err := c.AddProcess(netcontext.Background(), p); err != nil {
fmt.Printf("error %v\n", err)
return -1
}
if config.Terminal {
s, err := term.SetRawTerminal(os.Stdin.Fd())
if err != nil {
fmt.Printf("error %v\n", err)
return -1
}
defer term.RestoreTerminal(os.Stdin.Fd(), s)
monitorTtySize(c, container, process)
}
return waitForExit(c, timestamp, container, process)
}
24 changes: 16 additions & 8 deletions kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package main
import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"syscall"

"github.com/codegangsta/cli"
"github.com/docker/containerd/api/grpc/types"
"github.com/hyperhq/runv/lib/linuxsignal"
netcontext "golang.org/x/net/context"
)

var linuxSignalMap = map[string]syscall.Signal{
Expand Down Expand Up @@ -68,26 +71,31 @@ signal to the init process of the "ubuntu01" container:
# runv kill ubuntu01 KILL`,
Action: func(context *cli.Context) {
root := context.GlobalString("root")
container := context.Args().First()
if container == "" {
fmt.Printf("container id cannot be empty")
os.Exit(-1)
}

sigstr := context.Args().Get(1)
if sigstr == "" {
sigstr = "SIGTERM"
}

signal, err := parseSignal(sigstr)
if err != nil {
fmt.Printf("kill container failed %v\n", err)
fmt.Printf("parse signal failed %v, signal string:%s\n", err, sigstr)
os.Exit(-1)
}

killCmd := &killContainerCmd{Name: container, Root: root, Signal: signal}
conn, err := runvRequest(root, container, RUNV_KILLCONTAINER, killCmd)
if err != nil {
fmt.Printf("kill container failed %v\n", err)
c := getClient(filepath.Join(context.GlobalString("root"), container, "namespace/namespaced.sock"))
if _, err := c.Signal(netcontext.Background(), &types.SignalRequest{
Id: container,
Pid: "init",
Signal: uint32(signal),
}); err != nil {
fmt.Printf("kill signal failed, %v", err)
os.Exit(-1)
}
conn.Close()
},
}

Expand Down
49 changes: 48 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@ package main

import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"runtime"
"time"

"github.com/codegangsta/cli"
"github.com/docker/containerd/api/grpc/types"
netcontext "golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
)

const (
Expand Down Expand Up @@ -40,7 +48,7 @@ If not specified, the default value for the 'bundle' is the current directory.
)

func main() {
if os.Args[0] == "runv-ns-daemon" {
if os.Args[0] == "runv-namespaced" {
runvNamespaceDaemon()
os.Exit(0)
}
Expand Down Expand Up @@ -112,3 +120,42 @@ func getDefaultDriver() string {
}
return ""
}

func getClient(address string) types.APIClient {
// reset the logger for grpc to log to dev/null so that it does not mess with our stdio
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
dialOpts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithTimeout(5 * time.Second)}
dialOpts = append(dialOpts,
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
},
))
conn, err := grpc.Dial(address, dialOpts...)
if err != nil {
fmt.Printf("grpc.Dial error: %v", err)
os.Exit(-1)
}
return types.NewAPIClient(conn)
}

func waitForExit(c types.APIClient, timestamp uint64, container, process string) int {
for {
events, err := c.Events(netcontext.Background(), &types.EventsRequest{Timestamp: timestamp})
if err != nil {
fmt.Printf("c.Events error: %v", err)
// TODO try to find a way to kill the process ?
return -1
}
for {
e, err := events.Recv()
if err != nil {
time.Sleep(1 * time.Second)
break
}
timestamp = e.Timestamp
if e.Id == container && e.Type == "exit" && e.Pid == process {
return int(e.Status)
}
}
}
}
120 changes: 120 additions & 0 deletions namespaced.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package main

import (
"flag"
"net"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"

"github.com/docker/containerd/api/grpc/types"
"github.com/docker/containerd/osutils"
"github.com/golang/glog"
"github.com/hyperhq/runv/containerd/api/grpc/server"
"github.com/hyperhq/runv/driverloader"
"github.com/hyperhq/runv/factory"
"github.com/hyperhq/runv/hypervisor"
"github.com/hyperhq/runv/supervisor"
"google.golang.org/grpc"
)

func runvNamespaceDaemon() {
var (
namespace string
state string
driver string
kernel string
initrd string
)
flag.StringVar(&namespace, "namespace", "", "")
flag.StringVar(&state, "state", "", "")
flag.StringVar(&driver, "driver", "", "")
flag.StringVar(&kernel, "kernel", "", "")
flag.StringVar(&initrd, "initrd", "", "")
flag.Parse()

hypervisor.InterfaceCount = 0
var err error
if hypervisor.HDriver, err = driverloader.Probe(driver); err != nil {
glog.V(1).Infof("%s\n", err.Error())
os.Exit(1)
}

daemon(namespace, state, kernel, initrd)
}

func daemon(namespace, state, kernel, initrd string) error {
// setup a standard reaper so that we don't leave any zombies if we are still alive
// this is just good practice because we are spawning new processes
s := make(chan os.Signal, 2048)
signal.Notify(s, syscall.SIGCHLD, syscall.SIGTERM, syscall.SIGINT)

// TODO: make the factory create only one vm atmost
f := factory.NewFromConfigs(kernel, initrd, nil)
sv, err := supervisor.New(state, namespace, f)
if err != nil {
return err
}

address := filepath.Join(namespace, "namespaced.sock")
server, err := startServer(address, sv)
if err != nil {
return err
}
go namespaceShare(sv, namespace, state, server)

for ss := range s {
switch ss {
case syscall.SIGCHLD:
if _, err := osutils.Reap(); err != nil {
glog.Infof("containerd: reap child processes")
}
default:
glog.Infof("stopping containerd after receiving %s", ss)
server.Stop()
os.RemoveAll(namespace)
os.Exit(0)
}
}
return nil
}

func namespaceShare(sv *supervisor.Supervisor, namespace, state string, server *grpc.Server) {
events := sv.Events.Events(time.Time{})
containerCount := 0
for e := range events {
if e.Type == supervisor.EventContainerStart {
os.Symlink(namespace, filepath.Join(state, e.ID, "namespace"))
containerCount++
} else if e.Type == supervisor.EventExit && e.PID == "init" {
containerCount--
if containerCount == 0 {
server.Stop()
os.RemoveAll(namespace)
time.Sleep(time.Second)
os.Exit(0)
}
}
}
}

func startServer(address string, sv *supervisor.Supervisor) (*grpc.Server, error) {
if err := os.RemoveAll(address); err != nil {
return nil, err
}
l, err := net.Listen("unix", address)
if err != nil {
return nil, err
}
s := grpc.NewServer()
types.RegisterAPIServer(s, server.NewServer(sv))
go func() {
glog.Infof("containerd: grpc api on %s", address)
if err := s.Serve(l); err != nil {
glog.Infof("containerd: serve grpc error")
}
}()
return s, nil
}
47 changes: 0 additions & 47 deletions ns-daemon.go

This file was deleted.

Loading

0 comments on commit 3ec3194

Please sign in to comment.