diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 3e2b2fb10..0cb954fa3 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -119,7 +119,10 @@ high performance container runtime } defer nec.Close() - execService, err := execution.New(executor) + ctx := log.WithModule(gocontext.Background(), "containerd") + ctx = log.WithModule(ctx, "execution") + ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) + execService, err := execution.New(ctx, executor) if err != nil { return err } diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go new file mode 100644 index 000000000..218b6d5fb --- /dev/null +++ b/cmd/ctr/events.go @@ -0,0 +1,61 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + + "github.com/nats-io/go-nats" + "github.com/urfave/cli" +) + +var eventsCommand = cli.Command{ + Name: "events", + Usage: "display containerd events", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "subject, s", + Usage: "subjects filter", + Value: "containerd.>", + }, + }, + Action: func(context *cli.Context) error { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + return err + } + nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + nc.Close() + return err + } + defer nec.Close() + + evCh := make(chan *nats.Msg, 64) + sub, err := nec.Subscribe(context.String("subject"), func(e *nats.Msg) { + evCh <- e + }) + if err != nil { + return err + } + defer sub.Unsubscribe() + + for { + e, more := <-evCh + if !more { + break + } + + var prettyJSON bytes.Buffer + + err := json.Indent(&prettyJSON, e.Data, "", "\t") + if err != nil { + fmt.Println(string(e.Data)) + } else { + fmt.Println(prettyJSON.String()) + } + } + + return nil + }, +} diff --git a/cmd/ctr/exec.go b/cmd/ctr/exec.go index be694ac04..62833e894 100644 --- a/cmd/ctr/exec.go +++ b/cmd/ctr/exec.go @@ -18,6 +18,10 @@ var execCommand = cli.Command{ Name: "id, i", Usage: "target container id", }, + cli.StringFlag{ + Name: "pid, p", + Usage: "new process id", + }, cli.StringFlag{ Name: "cwd, c", Usage: "current working directory for the process", @@ -48,6 +52,7 @@ var execCommand = cli.Command{ sOpts := &execution.StartProcessRequest{ ContainerID: id, Process: &execution.Process{ + ID: context.String("pid"), Cwd: context.String("cwd"), Terminal: context.Bool("tty"), Args: context.Args(), diff --git a/cmd/ctr/main.go b/cmd/ctr/main.go index a880c72d4..a067932aa 100644 --- a/cmd/ctr/main.go +++ b/cmd/ctr/main.go @@ -36,6 +36,7 @@ containerd client app.Commands = []cli.Command{ runCommand, execCommand, + eventsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index c6b57fada..16ec61445 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "time" gocontext "context" @@ -110,15 +111,23 @@ var runCommand = cli.Command{ } var ec uint32 + eventLoop: for { - e, more := <-evCh - if !more { - break - } + select { + case e, more := <-evCh: + if !more { + fmt.Println("No More!") + break eventLoop + } - if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { - ec = e.StatusCode - break + if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { + ec = e.StatusCode + break eventLoop + } + case <-time.After(1 * time.Second): + if nec.Conn.Status() != nats.CONNECTED { + break eventLoop + } } } diff --git a/execution/container.go b/execution/container.go index cf2677d7a..c9c2db079 100644 --- a/execution/container.go +++ b/execution/container.go @@ -11,12 +11,12 @@ func NewContainer(stateRoot, id, bundle string) (*Container, error) { id: id, bundle: bundle, stateDir: stateDir, - status: "created", + status: Created, processes: make(map[string]Process), }, nil } -func LoadContainer(dir StateDir, id, bundle, status string, initPid int64) *Container { +func LoadContainer(dir StateDir, id, bundle string, status Status, initPid int64) *Container { return &Container{ id: id, stateDir: dir, @@ -32,7 +32,7 @@ type Container struct { bundle string stateDir StateDir initPid int64 - status string + status Status processes map[string]Process } @@ -41,7 +41,13 @@ func (c *Container) ID() string { return c.id } -func (c *Container) Status() string { +func (c *Container) Status() Status { + for _, p := range c.processes { + if p.Pid() == c.initPid { + c.status = p.Status() + break + } + } return c.status } diff --git a/execution/error.go b/execution/error.go new file mode 100644 index 000000000..5d06fc581 --- /dev/null +++ b/execution/error.go @@ -0,0 +1,7 @@ +package execution + +import "fmt" + +var ( + ErrProcessNotFound = fmt.Errorf("process not found") +) diff --git a/execution/executor.go b/execution/executor.go index 3af38e6e2..8ee68f489 100644 --- a/execution/executor.go +++ b/execution/executor.go @@ -16,6 +16,7 @@ type CreateOpts struct { } type StartProcessOpts struct { + ID string Spec specs.Process Console bool Stdin string diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 1a01e64ea..c32f95cc5 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -12,6 +12,15 @@ import ( "github.com/docker/containerd/execution" ) +const ( + initProcessID = "init" +) + +const ( + PidFilename = "pid" + StartTimeFilename = "starttime" +) + var ( ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") ) @@ -59,11 +68,11 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp } }(container) - initProcID, initStateDir, err := container.StateDir().NewProcess() + initStateDir, err := container.StateDir().NewProcess(initProcessID) if err != nil { return nil, err } - pidFile := filepath.Join(initStateDir, "pid") + pidFile := filepath.Join(initStateDir, PidFilename) err = r.runc.Create(ctx, id, o.Bundle, &runc.CreateOpts{ PidFile: pidFile, Console: oio.console, @@ -79,11 +88,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp } }() - pid, err := runc.ReadPidFile(pidFile) - if err != nil { - return nil, err - } - process, err := newProcess(initProcID, pid) + process, err := newProcess(initProcessID, initStateDir, execution.Created) if err != nil { return nil, err } @@ -112,7 +117,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { execution.StateDir(filepath.Join(r.root, runcC.ID)), runcC.ID, runcC.Bundle, - runcC.Status, + execution.Status(runcC.Status), int64(runcC.Pid), ) @@ -121,19 +126,11 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { return nil, err } for _, d := range dirs { - pid, err := runc.ReadPidFile(filepath.Join(d, "pid")) - if err != nil { - if os.IsNotExist(err) { - // Process died in between - continue - } - return nil, err - } - process, err := newProcess(filepath.Base(d), pid) + process, err := newProcess(filepath.Base(d), d, execution.Running) if err != nil { return nil, err } - container.AddProcess(process, pid == runcC.Pid) + container.AddProcess(process, process.Pid() == int64(runcC.Pid)) } return container, nil @@ -201,17 +198,17 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o } }() - procID, procStateDir, err := c.StateDir().NewProcess() + procStateDir, err := c.StateDir().NewProcess(o.ID) if err != nil { return nil, err } defer func() { if err != nil { - c.StateDir().DeleteProcess(procID) + c.StateDir().DeleteProcess(o.ID) } }() - pidFile := filepath.Join(procStateDir, "pid") + pidFile := filepath.Join(procStateDir, PidFilename) if err := r.runc.Exec(ctx, c.ID(), o.Spec, &runc.ExecOpts{ PidFile: pidFile, Detach: false, @@ -221,12 +218,8 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o }); err != nil { return nil, err } - pid, err := runc.ReadPidFile(pidFile) - if err != nil { - return nil, err - } - process, err := newProcess(procID, pid) + process, err := newProcess(o.ID, procStateDir, execution.Running) if err != nil { return nil, err } diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go index d01b902ef..6e6095d4c 100644 --- a/execution/executors/oci/process.go +++ b/execution/executors/oci/process.go @@ -1,26 +1,64 @@ package oci import ( + "fmt" + "io/ioutil" "os" + "path/filepath" "syscall" + "github.com/crosbymichael/go-runc" "github.com/docker/containerd/execution" + starttime "github.com/opencontainers/runc/libcontainer/system" ) -func newProcess(id string, pid int) (execution.Process, error) { - proc, err := os.FindProcess(pid) +func newProcess(id, stateDir string, status execution.Status) (execution.Process, error) { + pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename)) if err != nil { return nil, err } + if err := syscall.Kill(pid, 0); err != nil { + if err == syscall.ESRCH { + status = execution.Stopped + } else { + return nil, err + } + } + if status != execution.Stopped { + stime, err := starttime.GetProcessStartTime(pid) + switch { + case os.IsNotExist(err): + status = execution.Stopped + case err != nil: + return nil, err + default: + b, err := ioutil.ReadFile(filepath.Join(stateDir, StartTimeFilename)) + switch { + case os.IsNotExist(err): + err = ioutil.WriteFile(filepath.Join(stateDir, StartTimeFilename), []byte(stime), 0600) + if err != nil { + return nil, err + } + case err != nil: + return nil, err + case string(b) != stime: + status = execution.Stopped + } + } + } return &process{ - id: id, - proc: proc, + id: id, + pid: pid, + status: status, + exitCode: execution.UnknownStatusCode, }, nil } type process struct { - id string - proc *os.Process + id string + pid int + status execution.Status + exitCode uint32 } func (p *process) ID() string { @@ -28,18 +66,37 @@ func (p *process) ID() string { } func (p *process) Pid() int64 { - return int64(p.proc.Pid) + return int64(p.pid) } func (p *process) Wait() (uint32, error) { - state, err := p.proc.Wait() - if err != nil { - return 0, nil + if p.status != execution.Stopped { + var wstatus syscall.WaitStatus + _, err := syscall.Wait4(p.pid, &wstatus, 0, nil) + if err != nil { + // This process doesn't belong to us + p.exitCode = execution.UnknownStatusCode + return p.exitCode, nil + } + // TODO: implement kill-all if we are the init pid? + p.status = execution.Stopped + p.exitCode = uint32(wstatus.ExitStatus()) } - // TODO: implement kill-all if we are the init pid - return uint32(state.Sys().(syscall.WaitStatus).ExitStatus()), nil + return p.exitCode, nil + } func (p *process) Signal(s os.Signal) error { - return p.proc.Signal(s) + if p.status != execution.Stopped { + sig, ok := s.(syscall.Signal) + if !ok { + return fmt.Errorf("invalid signal %v", s) + } + return syscall.Kill(p.pid, sig) + } + return execution.ErrProcessNotFound +} + +func (p *process) Status() execution.Status { + return p.status } diff --git a/execution/log.go b/execution/log.go deleted file mode 100644 index 21f55e9d9..000000000 --- a/execution/log.go +++ /dev/null @@ -1,19 +0,0 @@ -package execution - -import ( - "context" - - "github.com/docker/containerd/log" - "github.com/sirupsen/logrus" -) - -var ctx context.Context - -func GetLogger(module string) *logrus.Entry { - if ctx == nil { - ctx = log.WithModule(context.Background(), "execution") - } - - subCtx := log.WithModule(ctx, module) - return log.GetLogger(subCtx) -} diff --git a/execution/process.go b/execution/process.go index 470560ff4..f23de9c8c 100644 --- a/execution/process.go +++ b/execution/process.go @@ -8,4 +8,5 @@ type Process interface { //Spec() *specs.Process Wait() (uint32, error) Signal(os.Signal) error + Status() Status } diff --git a/execution/service.go b/execution/service.go index 9a0837284..cb3303b2b 100644 --- a/execution/service.go +++ b/execution/service.go @@ -2,6 +2,7 @@ package execution import ( "fmt" + "os" "syscall" "time" @@ -13,19 +14,54 @@ import ( ) var ( - emptyResponse = &google_protobuf.Empty{} - ErrProcessNotFound = fmt.Errorf("Process not found") + emptyResponse = &google_protobuf.Empty{} ) -func New(executor Executor) (*Service, error) { - return &Service{ +func New(ctx context.Context, executor Executor) (*Service, error) { + svc := &Service{ executor: executor, - }, nil + } + + // List existing container, some of them may have died away if + // we've been restarted + containers, err := executor.List(ctx) + if err != nil { + return nil, err + } + + for _, c := range containers { + status := c.Status() + if status == Stopped || status == Deleted { + // generate exit event for all processes, (generate event for init last) + processes := c.Processes() + processes = append(processes[1:], processes[0]) + for _, p := range c.Processes() { + if p.Status() != Stopped { + p.Signal(os.Kill) + } + sc, err := p.Wait() + if err != nil { + sc = UnknownStatusCode + } + topic := GetContainerProcessEventTopic(c.ID(), p.ID()) + svc.publishEvent(ctx, topic, &ContainerExitEvent{ + ContainerEvent: ContainerEvent{ + Timestamp: time.Now(), + ID: c.ID(), + Action: "exit", + }, + PID: p.ID(), + StatusCode: sc, + }) + } + } + } + + return svc, nil } type Service struct { - executor Executor - supervisor *Supervisor + executor Executor } func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { @@ -134,6 +170,7 @@ func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest) } process, err := s.executor.StartProcess(ctx, container, StartProcessOpts{ + ID: r.Process.ID, Spec: spec, Console: r.Console, Stdin: r.Stdin, diff --git a/execution/statedir.go b/execution/statedir.go index eb72fcb8a..985600848 100644 --- a/execution/statedir.go +++ b/execution/statedir.go @@ -26,13 +26,13 @@ func (s StateDir) Delete() error { return os.RemoveAll(string(s)) } -func (s StateDir) NewProcess() (id, dir string, err error) { - dir, err = ioutil.TempDir(s.processesDir(), "") - if err != nil { - return "", "", err +func (s StateDir) NewProcess(id string) (dir string, err error) { + dir = filepath.Join(s.processesDir(), id) + if err = os.Mkdir(dir, 0700); err != nil { + return "", err } - return filepath.Base(dir), dir, err + return dir, nil } func (s StateDir) ProcessDir(id string) string { diff --git a/execution/status.go b/execution/status.go index ad8cf68b7..f3bc83b1f 100644 --- a/execution/status.go +++ b/execution/status.go @@ -3,8 +3,11 @@ package execution type Status string const ( + Created Status = "created" Paused Status = "paused" Running Status = "running" Stopped Status = "stopped" Deleted Status = "deleted" + + UnknownStatusCode = 255 )