From 13399c133083b0823b888aadd7b1bfaf29b464f5 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Wed, 14 Dec 2016 09:15:27 -0800 Subject: [PATCH 1/5] ctr: add events command This simply print all events generated by containerd Signed-off-by: Kenfe-Mickael Laventure --- cmd/ctr/events.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++ cmd/ctr/exec.go | 5 ++++ cmd/ctr/main.go | 1 + 3 files changed, 67 insertions(+) create mode 100644 cmd/ctr/events.go diff --git a/cmd/ctr/events.go b/cmd/ctr/events.go new file mode 100644 index 000000000..218b6d5fb --- /dev/null +++ b/cmd/ctr/events.go @@ -0,0 +1,61 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + + "github.com/nats-io/go-nats" + "github.com/urfave/cli" +) + +var eventsCommand = cli.Command{ + Name: "events", + Usage: "display containerd events", + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "subject, s", + Usage: "subjects filter", + Value: "containerd.>", + }, + }, + Action: func(context *cli.Context) error { + nc, err := nats.Connect(nats.DefaultURL) + if err != nil { + return err + } + nec, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER) + if err != nil { + nc.Close() + return err + } + defer nec.Close() + + evCh := make(chan *nats.Msg, 64) + sub, err := nec.Subscribe(context.String("subject"), func(e *nats.Msg) { + evCh <- e + }) + if err != nil { + return err + } + defer sub.Unsubscribe() + + for { + e, more := <-evCh + if !more { + break + } + + var prettyJSON bytes.Buffer + + err := json.Indent(&prettyJSON, e.Data, "", "\t") + if err != nil { + fmt.Println(string(e.Data)) + } else { + fmt.Println(prettyJSON.String()) + } + } + + return nil + }, +} diff --git a/cmd/ctr/exec.go b/cmd/ctr/exec.go index be694ac04..62833e894 100644 --- a/cmd/ctr/exec.go +++ b/cmd/ctr/exec.go @@ -18,6 +18,10 @@ var execCommand = cli.Command{ Name: "id, i", Usage: "target container id", }, + cli.StringFlag{ + Name: "pid, p", + Usage: "new process id", + }, cli.StringFlag{ Name: "cwd, c", Usage: "current working directory for the process", @@ -48,6 +52,7 @@ var execCommand = cli.Command{ sOpts := &execution.StartProcessRequest{ ContainerID: id, Process: &execution.Process{ + ID: context.String("pid"), Cwd: context.String("cwd"), Terminal: context.Bool("tty"), Args: context.Args(), diff --git a/cmd/ctr/main.go b/cmd/ctr/main.go index a880c72d4..a067932aa 100644 --- a/cmd/ctr/main.go +++ b/cmd/ctr/main.go @@ -36,6 +36,7 @@ containerd client app.Commands = []cli.Command{ runCommand, execCommand, + eventsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { From 7dd69a8597d485afb4f9075d77387e68b5a196d7 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Wed, 14 Dec 2016 09:16:23 -0800 Subject: [PATCH 2/5] execution: use provided process ID for state Signed-off-by: Kenfe-Mickael Laventure --- execution/executor.go | 1 + execution/executors/oci/oci.go | 16 ++++++++++------ execution/executors/oci/process.go | 8 +++++++- execution/process.go | 1 + execution/service.go | 1 + execution/statedir.go | 10 +++++----- 6 files changed, 25 insertions(+), 12 deletions(-) diff --git a/execution/executor.go b/execution/executor.go index 3af38e6e2..8ee68f489 100644 --- a/execution/executor.go +++ b/execution/executor.go @@ -16,6 +16,7 @@ type CreateOpts struct { } type StartProcessOpts struct { + ID string Spec specs.Process Console bool Stdin string diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 685b9d5d6..c389034d8 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -12,6 +12,10 @@ import ( "github.com/docker/containerd/execution" ) +const ( + initProcessID = "init" +) + var ( ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") ) @@ -56,7 +60,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp } }(container) - initProcID, initStateDir, err := container.StateDir().NewProcess() + initStateDir, err := container.StateDir().NewProcess(initProcessID) if err != nil { return nil, err } @@ -80,7 +84,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp if err != nil { return nil, err } - process, err := newProcess(initProcID, pid) + process, err := newProcess(container, initProcessID, pid) if err != nil { return nil, err } @@ -126,7 +130,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { } return nil, err } - process, err := newProcess(filepath.Base(d), pid) + process, err := newProcess(container, filepath.Base(d), pid) if err != nil { return nil, err } @@ -198,13 +202,13 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o } }() - procID, procStateDir, err := c.StateDir().NewProcess() + procStateDir, err := c.StateDir().NewProcess(o.ID) if err != nil { return nil, err } defer func() { if err != nil { - c.StateDir().DeleteProcess(procID) + c.StateDir().DeleteProcess(o.ID) } }() @@ -223,7 +227,7 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o return nil, err } - process, err := newProcess(procID, pid) + process, err := newProcess(c, o.ID, pid) if err != nil { return nil, err } diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go index d01b902ef..3ae61a7b5 100644 --- a/execution/executors/oci/process.go +++ b/execution/executors/oci/process.go @@ -7,22 +7,28 @@ import ( "github.com/docker/containerd/execution" ) -func newProcess(id string, pid int) (execution.Process, error) { +func newProcess(c *execution.Container, id string, pid int) (execution.Process, error) { proc, err := os.FindProcess(pid) if err != nil { return nil, err } return &process{ + c: c, id: id, proc: proc, }, nil } type process struct { + c *execution.Container id string proc *os.Process } +func (p *process) Container() *execution.Container { + return p.c +} + func (p *process) ID() string { return p.id } diff --git a/execution/process.go b/execution/process.go index 470560ff4..2f0b1f879 100644 --- a/execution/process.go +++ b/execution/process.go @@ -3,6 +3,7 @@ package execution import "os" type Process interface { + Container() *Container ID() string Pid() int64 //Spec() *specs.Process diff --git a/execution/service.go b/execution/service.go index 9a0837284..c9a6e1922 100644 --- a/execution/service.go +++ b/execution/service.go @@ -134,6 +134,7 @@ func (s *Service) StartProcess(ctx context.Context, r *api.StartProcessRequest) } process, err := s.executor.StartProcess(ctx, container, StartProcessOpts{ + ID: r.Process.ID, Spec: spec, Console: r.Console, Stdin: r.Stdin, diff --git a/execution/statedir.go b/execution/statedir.go index eb72fcb8a..985600848 100644 --- a/execution/statedir.go +++ b/execution/statedir.go @@ -26,13 +26,13 @@ func (s StateDir) Delete() error { return os.RemoveAll(string(s)) } -func (s StateDir) NewProcess() (id, dir string, err error) { - dir, err = ioutil.TempDir(s.processesDir(), "") - if err != nil { - return "", "", err +func (s StateDir) NewProcess(id string) (dir string, err error) { + dir = filepath.Join(s.processesDir(), id) + if err = os.Mkdir(dir, 0700); err != nil { + return "", err } - return filepath.Base(dir), dir, err + return dir, nil } func (s StateDir) ProcessDir(id string) string { From abaa421141b44a9f34e26e0f9279bbb105838c81 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Wed, 14 Dec 2016 19:10:35 -0800 Subject: [PATCH 3/5] execution/oci: use Status type instead of string Signed-off-by: Kenfe-Mickael Laventure --- execution/container.go | 8 ++--- execution/error.go | 7 ++++ execution/executors/oci/oci.go | 2 +- execution/executors/oci/process.go | 56 +++++++++++++++++++++--------- execution/log.go | 19 ---------- execution/process.go | 1 + execution/service.go | 6 ++-- execution/status.go | 1 + 8 files changed, 56 insertions(+), 44 deletions(-) create mode 100644 execution/error.go delete mode 100644 execution/log.go diff --git a/execution/container.go b/execution/container.go index cf2677d7a..68b17defa 100644 --- a/execution/container.go +++ b/execution/container.go @@ -11,12 +11,12 @@ func NewContainer(stateRoot, id, bundle string) (*Container, error) { id: id, bundle: bundle, stateDir: stateDir, - status: "created", + status: Created, processes: make(map[string]Process), }, nil } -func LoadContainer(dir StateDir, id, bundle, status string, initPid int64) *Container { +func LoadContainer(dir StateDir, id, bundle string, status Status, initPid int64) *Container { return &Container{ id: id, stateDir: dir, @@ -32,7 +32,7 @@ type Container struct { bundle string stateDir StateDir initPid int64 - status string + status Status processes map[string]Process } @@ -41,7 +41,7 @@ func (c *Container) ID() string { return c.id } -func (c *Container) Status() string { +func (c *Container) Status() Status { return c.status } diff --git a/execution/error.go b/execution/error.go new file mode 100644 index 000000000..5d06fc581 --- /dev/null +++ b/execution/error.go @@ -0,0 +1,7 @@ +package execution + +import "fmt" + +var ( + ErrProcessNotFound = fmt.Errorf("process not found") +) diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index c389034d8..98c78e714 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -113,7 +113,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { execution.StateDir(filepath.Join(r.root, runcC.ID)), runcC.ID, runcC.Bundle, - runcC.Status, + execution.Status(runcC.Status), int64(runcC.Pid), ) diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go index 3ae61a7b5..53a4a5330 100644 --- a/execution/executors/oci/process.go +++ b/execution/executors/oci/process.go @@ -1,6 +1,7 @@ package oci import ( + "fmt" "os" "syscall" @@ -8,21 +9,27 @@ import ( ) func newProcess(c *execution.Container, id string, pid int) (execution.Process, error) { - proc, err := os.FindProcess(pid) - if err != nil { - return nil, err + status := execution.Running + if err := syscall.Kill(pid, 0); err != nil { + if err == syscall.ESRCH { + status = execution.Stopped + } else { + return nil, err + } } return &process{ - c: c, - id: id, - proc: proc, + c: c, + id: id, + pid: pid, + status: status, }, nil } type process struct { - c *execution.Container - id string - proc *os.Process + c *execution.Container + id string + pid int + status execution.Status } func (p *process) Container() *execution.Container { @@ -34,18 +41,35 @@ func (p *process) ID() string { } func (p *process) Pid() int64 { - return int64(p.proc.Pid) + return int64(p.pid) } func (p *process) Wait() (uint32, error) { - state, err := p.proc.Wait() - if err != nil { - return 0, nil + if p.status == execution.Running { + var wstatus syscall.WaitStatus + _, err := syscall.Wait4(p.pid, &wstatus, 0, nil) + if err != nil { + return 255, nil + } + // TODO: implement kill-all if we are the init pid + p.status = execution.Stopped + return uint32(wstatus.ExitStatus()), nil } - // TODO: implement kill-all if we are the init pid - return uint32(state.Sys().(syscall.WaitStatus).ExitStatus()), nil + + return 255, execution.ErrProcessNotFound } func (p *process) Signal(s os.Signal) error { - return p.proc.Signal(s) + if p.status == execution.Running { + sig, ok := s.(syscall.Signal) + if !ok { + return fmt.Errorf("invalid signal %v", s) + } + return syscall.Kill(p.pid, sig) + } + return execution.ErrProcessNotFound +} + +func (p *process) Status() execution.Status { + return p.status } diff --git a/execution/log.go b/execution/log.go deleted file mode 100644 index 21f55e9d9..000000000 --- a/execution/log.go +++ /dev/null @@ -1,19 +0,0 @@ -package execution - -import ( - "context" - - "github.com/docker/containerd/log" - "github.com/sirupsen/logrus" -) - -var ctx context.Context - -func GetLogger(module string) *logrus.Entry { - if ctx == nil { - ctx = log.WithModule(context.Background(), "execution") - } - - subCtx := log.WithModule(ctx, module) - return log.GetLogger(subCtx) -} diff --git a/execution/process.go b/execution/process.go index 2f0b1f879..9f365a6c8 100644 --- a/execution/process.go +++ b/execution/process.go @@ -9,4 +9,5 @@ type Process interface { //Spec() *specs.Process Wait() (uint32, error) Signal(os.Signal) error + Status() Status } diff --git a/execution/service.go b/execution/service.go index c9a6e1922..f8121832e 100644 --- a/execution/service.go +++ b/execution/service.go @@ -13,8 +13,7 @@ import ( ) var ( - emptyResponse = &google_protobuf.Empty{} - ErrProcessNotFound = fmt.Errorf("Process not found") + emptyResponse = &google_protobuf.Empty{} ) func New(executor Executor) (*Service, error) { @@ -24,8 +23,7 @@ func New(executor Executor) (*Service, error) { } type Service struct { - executor Executor - supervisor *Supervisor + executor Executor } func (s *Service) Create(ctx context.Context, r *api.CreateContainerRequest) (*api.CreateContainerResponse, error) { diff --git a/execution/status.go b/execution/status.go index ad8cf68b7..a6be45a75 100644 --- a/execution/status.go +++ b/execution/status.go @@ -3,6 +3,7 @@ package execution type Status string const ( + Created Status = "created" Paused Status = "paused" Running Status = "running" Stopped Status = "stopped" From 73cb78fae39f3ff4922dea58ebcc696a98fbe17d Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 15 Dec 2016 08:21:04 -0800 Subject: [PATCH 4/5] execution/oci: Add check when loading processes This should ensure that we don't kill a different process after a restore (once supported) Signed-off-by: Kenfe-Mickael Laventure --- execution/executors/oci/oci.go | 33 ++++++++++-------------------- execution/executors/oci/process.go | 32 ++++++++++++++++++++++++++++- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 98c78e714..07d449059 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -16,6 +16,11 @@ const ( initProcessID = "init" ) +const ( + PidFilename = "pid" + StartTimeFilename = "starttime" +) + var ( ErrRootEmpty = errors.New("oci: runtime root cannot be an empty string") ) @@ -64,7 +69,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp if err != nil { return nil, err } - pidFile := filepath.Join(initStateDir, "pid") + pidFile := filepath.Join(initStateDir, PidFilename) err = r.runc.Create(ctx, id, o.Bundle, &runc.CreateOpts{ PidFile: pidFile, Console: oio.console, @@ -80,11 +85,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp } }() - pid, err := runc.ReadPidFile(pidFile) - if err != nil { - return nil, err - } - process, err := newProcess(container, initProcessID, pid) + process, err := newProcess(container, initProcessID, initStateDir) if err != nil { return nil, err } @@ -122,19 +123,11 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { return nil, err } for _, d := range dirs { - pid, err := runc.ReadPidFile(filepath.Join(d, "pid")) - if err != nil { - if os.IsNotExist(err) { - // Process died in between - continue - } - return nil, err - } - process, err := newProcess(container, filepath.Base(d), pid) + process, err := newProcess(container, filepath.Base(d), d) if err != nil { return nil, err } - container.AddProcess(process, pid == runcC.Pid) + container.AddProcess(process, process.Pid() == int64(runcC.Pid)) } return container, nil @@ -212,7 +205,7 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o } }() - pidFile := filepath.Join(procStateDir, "pid") + pidFile := filepath.Join(procStateDir, PidFilename) if err := r.runc.Exec(ctx, c.ID(), o.Spec, &runc.ExecOpts{ PidFile: pidFile, Detach: false, @@ -222,12 +215,8 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o }); err != nil { return nil, err } - pid, err := runc.ReadPidFile(pidFile) - if err != nil { - return nil, err - } - process, err := newProcess(c, o.ID, pid) + process, err := newProcess(c, o.ID, procStateDir) if err != nil { return nil, err } diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go index 53a4a5330..b3f73f881 100644 --- a/execution/executors/oci/process.go +++ b/execution/executors/oci/process.go @@ -2,13 +2,21 @@ package oci import ( "fmt" + "io/ioutil" "os" + "path/filepath" "syscall" + "github.com/crosbymichael/go-runc" "github.com/docker/containerd/execution" + starttime "github.com/opencontainers/runc/libcontainer/system" ) -func newProcess(c *execution.Container, id string, pid int) (execution.Process, error) { +func newProcess(c *execution.Container, id, stateDir string) (execution.Process, error) { + pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename)) + if err != nil { + return nil, err + } status := execution.Running if err := syscall.Kill(pid, 0); err != nil { if err == syscall.ESRCH { @@ -17,6 +25,28 @@ func newProcess(c *execution.Container, id string, pid int) (execution.Process, return nil, err } } + if status == execution.Running { + stime, err := starttime.GetProcessStartTime(pid) + switch { + case os.IsNotExist(err): + status = execution.Stopped + case err != nil: + return nil, err + default: + b, err := ioutil.ReadFile(filepath.Join(stateDir, StartTimeFilename)) + switch { + case os.IsNotExist(err): + err = ioutil.WriteFile(filepath.Join(stateDir, StartTimeFilename), []byte(stime), 0600) + if err != nil { + return nil, err + } + case err != nil: + return nil, err + case string(b) != stime: + status = execution.Stopped + } + } + } return &process{ c: c, id: id, From 0fdd2469f66d6a1a324c30a8e130152e4c8acdf7 Mon Sep 17 00:00:00 2001 From: Kenfe-Mickael Laventure Date: Thu, 15 Dec 2016 10:40:24 -0800 Subject: [PATCH 5/5] execution: "restore" container on service creation Signed-off-by: Kenfe-Mickael Laventure --- cmd/containerd/main.go | 5 +++- cmd/ctr/run.go | 23 +++++++++++----- execution/container.go | 6 ++++ execution/executors/oci/oci.go | 6 ++-- execution/executors/oci/process.go | 39 ++++++++++++-------------- execution/process.go | 1 - execution/service.go | 44 ++++++++++++++++++++++++++++-- execution/status.go | 2 ++ 8 files changed, 90 insertions(+), 36 deletions(-) diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 3e2b2fb10..0cb954fa3 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -119,7 +119,10 @@ high performance container runtime } defer nec.Close() - execService, err := execution.New(executor) + ctx := log.WithModule(gocontext.Background(), "containerd") + ctx = log.WithModule(ctx, "execution") + ctx = events.WithPoster(ctx, events.GetNATSPoster(nec)) + execService, err := execution.New(ctx, executor) if err != nil { return err } diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index c6b57fada..16ec61445 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "time" gocontext "context" @@ -110,15 +111,23 @@ var runCommand = cli.Command{ } var ec uint32 + eventLoop: for { - e, more := <-evCh - if !more { - break - } + select { + case e, more := <-evCh: + if !more { + fmt.Println("No More!") + break eventLoop + } - if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { - ec = e.StatusCode - break + if e.ID == cr.Container.ID && e.PID == cr.InitProcess.ID { + ec = e.StatusCode + break eventLoop + } + case <-time.After(1 * time.Second): + if nec.Conn.Status() != nats.CONNECTED { + break eventLoop + } } } diff --git a/execution/container.go b/execution/container.go index 68b17defa..c9c2db079 100644 --- a/execution/container.go +++ b/execution/container.go @@ -42,6 +42,12 @@ func (c *Container) ID() string { } func (c *Container) Status() Status { + for _, p := range c.processes { + if p.Pid() == c.initPid { + c.status = p.Status() + break + } + } return c.status } diff --git a/execution/executors/oci/oci.go b/execution/executors/oci/oci.go index 07d449059..5a6dc5f41 100644 --- a/execution/executors/oci/oci.go +++ b/execution/executors/oci/oci.go @@ -85,7 +85,7 @@ func (r *OCIRuntime) Create(ctx context.Context, id string, o execution.CreateOp } }() - process, err := newProcess(container, initProcessID, initStateDir) + process, err := newProcess(initProcessID, initStateDir, execution.Created) if err != nil { return nil, err } @@ -123,7 +123,7 @@ func (r *OCIRuntime) load(runcC *runc.Container) (*execution.Container, error) { return nil, err } for _, d := range dirs { - process, err := newProcess(container, filepath.Base(d), d) + process, err := newProcess(filepath.Base(d), d, execution.Running) if err != nil { return nil, err } @@ -216,7 +216,7 @@ func (r *OCIRuntime) StartProcess(ctx context.Context, c *execution.Container, o return nil, err } - process, err := newProcess(c, o.ID, procStateDir) + process, err := newProcess(o.ID, procStateDir, execution.Running) if err != nil { return nil, err } diff --git a/execution/executors/oci/process.go b/execution/executors/oci/process.go index b3f73f881..6e6095d4c 100644 --- a/execution/executors/oci/process.go +++ b/execution/executors/oci/process.go @@ -12,12 +12,11 @@ import ( starttime "github.com/opencontainers/runc/libcontainer/system" ) -func newProcess(c *execution.Container, id, stateDir string) (execution.Process, error) { +func newProcess(id, stateDir string, status execution.Status) (execution.Process, error) { pid, err := runc.ReadPidFile(filepath.Join(stateDir, PidFilename)) if err != nil { return nil, err } - status := execution.Running if err := syscall.Kill(pid, 0); err != nil { if err == syscall.ESRCH { status = execution.Stopped @@ -25,7 +24,7 @@ func newProcess(c *execution.Container, id, stateDir string) (execution.Process, return nil, err } } - if status == execution.Running { + if status != execution.Stopped { stime, err := starttime.GetProcessStartTime(pid) switch { case os.IsNotExist(err): @@ -48,22 +47,18 @@ func newProcess(c *execution.Container, id, stateDir string) (execution.Process, } } return &process{ - c: c, - id: id, - pid: pid, - status: status, + id: id, + pid: pid, + status: status, + exitCode: execution.UnknownStatusCode, }, nil } type process struct { - c *execution.Container - id string - pid int - status execution.Status -} - -func (p *process) Container() *execution.Container { - return p.c + id string + pid int + status execution.Status + exitCode uint32 } func (p *process) ID() string { @@ -75,22 +70,24 @@ func (p *process) Pid() int64 { } func (p *process) Wait() (uint32, error) { - if p.status == execution.Running { + if p.status != execution.Stopped { var wstatus syscall.WaitStatus _, err := syscall.Wait4(p.pid, &wstatus, 0, nil) if err != nil { - return 255, nil + // This process doesn't belong to us + p.exitCode = execution.UnknownStatusCode + return p.exitCode, nil } - // TODO: implement kill-all if we are the init pid + // TODO: implement kill-all if we are the init pid? p.status = execution.Stopped - return uint32(wstatus.ExitStatus()), nil + p.exitCode = uint32(wstatus.ExitStatus()) } + return p.exitCode, nil - return 255, execution.ErrProcessNotFound } func (p *process) Signal(s os.Signal) error { - if p.status == execution.Running { + if p.status != execution.Stopped { sig, ok := s.(syscall.Signal) if !ok { return fmt.Errorf("invalid signal %v", s) diff --git a/execution/process.go b/execution/process.go index 9f365a6c8..f23de9c8c 100644 --- a/execution/process.go +++ b/execution/process.go @@ -3,7 +3,6 @@ package execution import "os" type Process interface { - Container() *Container ID() string Pid() int64 //Spec() *specs.Process diff --git a/execution/service.go b/execution/service.go index f8121832e..cb3303b2b 100644 --- a/execution/service.go +++ b/execution/service.go @@ -2,6 +2,7 @@ package execution import ( "fmt" + "os" "syscall" "time" @@ -16,10 +17,47 @@ var ( emptyResponse = &google_protobuf.Empty{} ) -func New(executor Executor) (*Service, error) { - return &Service{ +func New(ctx context.Context, executor Executor) (*Service, error) { + svc := &Service{ executor: executor, - }, nil + } + + // List existing container, some of them may have died away if + // we've been restarted + containers, err := executor.List(ctx) + if err != nil { + return nil, err + } + + for _, c := range containers { + status := c.Status() + if status == Stopped || status == Deleted { + // generate exit event for all processes, (generate event for init last) + processes := c.Processes() + processes = append(processes[1:], processes[0]) + for _, p := range c.Processes() { + if p.Status() != Stopped { + p.Signal(os.Kill) + } + sc, err := p.Wait() + if err != nil { + sc = UnknownStatusCode + } + topic := GetContainerProcessEventTopic(c.ID(), p.ID()) + svc.publishEvent(ctx, topic, &ContainerExitEvent{ + ContainerEvent: ContainerEvent{ + Timestamp: time.Now(), + ID: c.ID(), + Action: "exit", + }, + PID: p.ID(), + StatusCode: sc, + }) + } + } + } + + return svc, nil } type Service struct { diff --git a/execution/status.go b/execution/status.go index a6be45a75..f3bc83b1f 100644 --- a/execution/status.go +++ b/execution/status.go @@ -8,4 +8,6 @@ const ( Running Status = "running" Stopped Status = "stopped" Deleted Status = "deleted" + + UnknownStatusCode = 255 )