From e5545a14612c73adbf010254828dbc12ec606ece Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Thu, 10 Dec 2015 17:07:21 -0800 Subject: [PATCH 1/3] Add basic logging to file support This currently logs to a json file with the stream type. This is slow and hard on the cpu and memory so we need to swich this over to something like protobufs for the binary logs but this is just a start. Signed-off-by: Michael Crosby --- add_process.go | 13 ++++- api/grpc/server/server.go | 4 -- ctr/logs.go | 60 ++++++++++++++++++++ ctr/main.go | 3 +- event.go | 1 - linux/linux.go | 74 ++++++++++++------------- log.go | 113 ++++++++++++++++++++++++++++++++++++++ runtime/container.go | 22 +++++++- runtime/runtime.go | 20 ++++--- start.go | 3 +- supervisor.go | 14 +++++ worker.go | 9 +++ 12 files changed, 278 insertions(+), 58 deletions(-) create mode 100644 ctr/logs.go create mode 100644 log.go diff --git a/add_process.go b/add_process.go index 85573be6b..253eab0be 100644 --- a/add_process.go +++ b/add_process.go @@ -1,18 +1,29 @@ package containerd +import "github.com/Sirupsen/logrus" + type AddProcessEvent struct { s *Supervisor } +// TODO: add this to worker for concurrent starts??? maybe not because of races where the container +// could be stopped and removed... func (h *AddProcessEvent) Handle(e *Event) error { container, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - p, err := h.s.runtime.StartProcess(container, *e.Process, e.Stdio) + p, io, err := h.s.runtime.StartProcess(container, *e.Process) if err != nil { return err } + if err := h.s.log(container.Path(), io); err != nil { + // log the error but continue with the other commands + logrus.WithFields(logrus.Fields{ + "error": err, + "id": e.ID, + }).Error("log stdio") + } if e.Pid, err = p.Pid(); err != nil { return err } diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 5a345b1b0..0eae0c245 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -38,10 +38,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine Name: c.Checkpoint, } } - e.Stdio = &runtime.Stdio{ - Stderr: c.Stderr, - Stdout: c.Stdout, - } s.sv.SendEvent(e) if err := <-e.Err; err != nil { return nil, err diff --git a/ctr/logs.go b/ctr/logs.go new file mode 100644 index 000000000..65e04bde5 --- /dev/null +++ b/ctr/logs.go @@ -0,0 +1,60 @@ +package main + +import ( + "encoding/json" + "io" + "os" + "time" + + "github.com/codegangsta/cli" + "github.com/docker/containerd" +) + +var LogsCommand = cli.Command{ + Name: "logs", + Usage: "view binary container logs generated by containerd", + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "follow,f", + Usage: "follow/tail the logs", + }, + }, + Action: func(context *cli.Context) { + path := context.Args().First() + if path == "" { + fatal("path to the log cannot be empty", 1) + } + if err := readLogs(path, context.Bool("follow")); err != nil { + fatal(err.Error(), 1) + } + }, +} + +func readLogs(path string, follow bool) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + dec := json.NewDecoder(f) + for { + var msg *containerd.Message + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + if follow { + time.Sleep(100 * time.Millisecond) + continue + } + return nil + } + return err + } + switch msg.Stream { + case "stdout": + os.Stdout.Write(msg.Data) + case "stderr": + os.Stderr.Write(msg.Data) + } + } + return nil +} diff --git a/ctr/main.go b/ctr/main.go index 3575f1724..e966e0407 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -34,9 +34,10 @@ func main() { }, } app.Commands = []cli.Command{ - ContainersCommand, CheckpointCommand, + ContainersCommand, EventsCommand, + LogsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/event.go b/event.go index 2680955d6..7239b153f 100644 --- a/event.go +++ b/event.go @@ -36,7 +36,6 @@ type Event struct { Timestamp time.Time ID string BundlePath string - Stdio *runtime.Stdio Pid int Status int Signal os.Signal diff --git a/linux/linux.go b/linux/linux.go index 8a86db568..5809f433a 100644 --- a/linux/linux.go +++ b/linux/linux.go @@ -5,7 +5,6 @@ package linux import ( "encoding/json" "fmt" - "io" "io/ioutil" "os" "path/filepath" @@ -363,25 +362,29 @@ func (r *libcontainerRuntime) Type() string { return "libcontainer" } -func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio) (runtime.Container, error) { +func (r *libcontainerRuntime) Create(id, bundlePath string) (runtime.Container, *runtime.IO, error) { spec, rspec, err := r.loadSpec( filepath.Join(bundlePath, "config.json"), filepath.Join(bundlePath, "runtime.json"), ) if err != nil { - return nil, err + return nil, nil, err } config, err := r.createLibcontainerConfig(id, bundlePath, spec, rspec) if err != nil { - return nil, err + return nil, nil, err } container, err := r.factory.Create(id, config) if err != nil { - return nil, fmt.Errorf("create container: %v", err) + return nil, nil, fmt.Errorf("create container: %v", err) } - process, err := r.newProcess(spec.Process, stdio) + process, err := r.newProcess(spec.Process) if err != nil { - return nil, err + return nil, nil, err + } + i, err := process.InitializeIO(int(spec.Process.User.UID)) + if err != nil { + return nil, nil, err } c := &libcontainerContainer{ c: container, @@ -392,20 +395,28 @@ func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio }, path: bundlePath, } - return c, nil + return c, &runtime.IO{ + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + }, nil } -func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process, stdio *runtime.Stdio) (runtime.Process, error) { +func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process) (runtime.Process, *runtime.IO, error) { c, ok := ci.(*libcontainerContainer) if !ok { - return nil, runtime.ErrInvalidContainerType + return nil, nil, runtime.ErrInvalidContainerType } - process, err := r.newProcess(p, stdio) + process, err := r.newProcess(p) if err != nil { - return nil, err + return nil, nil, err + } + i, err := process.InitializeIO(int(p.User.UID)) + if err != nil { + return nil, nil, err } if err := c.c.Start(process); err != nil { - return nil, err + return nil, nil, err } lp := &libcontainerProcess{ process: process, @@ -413,42 +424,29 @@ func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process } pid, err := process.Pid() if err != nil { - return nil, err + return nil, nil, err } c.additionalProcesses[pid] = lp - return lp, nil + return lp, &runtime.IO{ + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + }, nil } // newProcess returns a new libcontainer Process with the arguments from the // spec and stdio from the current process. -func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *runtime.Stdio) (*libcontainer.Process, error) { - var ( - stderr, stdout io.Writer - ) - if stdio != nil { - if stdio.Stdout != "" { - f, err := os.OpenFile(stdio.Stdout, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return nil, fmt.Errorf("open stdout: %v", err) - } - stdout = f - } - if stdio.Stderr != "" { - f, err := os.OpenFile(stdio.Stderr, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return nil, fmt.Errorf("open stderr: %v", err) - } - stderr = f - } +func (r *libcontainerRuntime) newProcess(p specs.Process) (*libcontainer.Process, error) { + // TODO: support terminals + if p.Terminal { + return nil, runtime.ErrTerminalsNotSupported } return &libcontainer.Process{ Args: p.Args, Env: p.Env, // TODO: fix libcontainer's API to better support uid/gid in a typesafe way. - User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), - Cwd: p.Cwd, - Stderr: stderr, - Stdout: stdout, + User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), + Cwd: p.Cwd, }, nil } diff --git a/log.go b/log.go new file mode 100644 index 000000000..1a84871a2 --- /dev/null +++ b/log.go @@ -0,0 +1,113 @@ +package containerd + +import ( + "encoding/json" + "io" + "os" + "path/filepath" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +type logConfig struct { + BundlePath string + LogSize int64 // in bytes + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func newLogger(i *logConfig) (*logger, error) { + l := &logger{ + config: i, + messages: make(chan *Message, DefaultBufferSize), + } + hout := &logHandler{ + stream: "stdout", + messages: l.messages, + } + herr := &logHandler{ + stream: "stderr", + messages: l.messages, + } + l.wg.Add(2) + go func() { + defer l.wg.Done() + io.Copy(hout, i.Stdout) + }() + go func() { + defer l.wg.Done() + io.Copy(herr, i.Stderr) + }() + return l, l.start() +} + +type Message struct { + Stream string `json:"stream"` + Timestamp time.Time `json:"timestamp"` + Data []byte `json:"data"` +} + +type logger struct { + config *logConfig + f *os.File + wg sync.WaitGroup + messages chan *Message +} + +type logHandler struct { + stream string + messages chan *Message +} + +func (h *logHandler) Write(b []byte) (int, error) { + h.messages <- &Message{ + Stream: h.stream, + Timestamp: time.Now(), + Data: b, + } + return len(b), nil +} + +func (l *logger) start() error { + f, err := os.OpenFile( + filepath.Join(l.config.BundlePath, "logs.json"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0655, + ) + if err != nil { + return err + } + l.f = f + l.wg.Add(1) + go func() { + l.wg.Done() + enc := json.NewEncoder(f) + for m := range l.messages { + if err := enc.Encode(m); err != nil { + logrus.WithField("error", err).Error("write log message") + } + } + }() + return nil +} + +func (l *logger) Close() (err error) { + for _, c := range []io.Closer{ + l.config.Stdin, + l.config.Stdout, + l.config.Stderr, + } { + if cerr := c.Close(); err == nil { + err = cerr + } + } + close(l.messages) + l.wg.Wait() + if ferr := l.f.Close(); err == nil { + err = ferr + } + return err +} diff --git a/runtime/container.go b/runtime/container.go index e7c20dee0..863ffe4ab 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -1,6 +1,7 @@ package runtime import ( + "io" "os" "time" @@ -24,9 +25,24 @@ type State struct { Status Status } -type Stdio struct { - Stderr string - Stdout string +type IO struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func (i *IO) Close() error { + var oerr error + for _, c := range []io.Closer{ + i.Stdin, + i.Stdout, + i.Stderr, + } { + if err := c.Close(); oerr == nil { + oerr = err + } + } + return oerr } type Stat struct { diff --git a/runtime/runtime.go b/runtime/runtime.go index 883d7c464..aabe055bf 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -7,18 +7,20 @@ import ( ) var ( - ErrNotChildProcess = errors.New("containerd: not a child process for container") - ErrInvalidContainerType = errors.New("containerd: invalid container type for runtime") - ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container") - ErrCheckpointExists = errors.New("containerd: checkpoint already exists") - ErrContainerExited = errors.New("containerd: container has exited") + ErrNotChildProcess = errors.New("containerd: not a child process for container") + ErrInvalidContainerType = errors.New("containerd: invalid container type for runtime") + ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container") + ErrCheckpointExists = errors.New("containerd: checkpoint already exists") + ErrContainerExited = errors.New("containerd: container has exited") + ErrTerminalsNotSupported = errors.New("containerd: terminals are not supported for runtime") ) // Runtime handles containers, containers handle their own actions type Runtime interface { - // Create creates a new container initialized but without it starting it - Create(id, bundlePath string, stdio *Stdio) (Container, error) - // StartProcess adds a new process to the container - StartProcess(Container, specs.Process, *Stdio) (Process, error) + // Type of the runtime Type() string + // Create creates a new container initialized but without it starting it + Create(id, bundlePath string) (Container, *IO, error) + // StartProcess adds a new process to the container + StartProcess(Container, specs.Process) (Process, *IO, error) } diff --git a/start.go b/start.go index 9400e089b..4c725ad06 100644 --- a/start.go +++ b/start.go @@ -5,7 +5,7 @@ type StartEvent struct { } func (h *StartEvent) Handle(e *Event) error { - container, err := h.s.runtime.Create(e.ID, e.BundlePath, e.Stdio) + container, io, err := h.s.runtime.Create(e.ID, e.BundlePath) if err != nil { return err } @@ -14,6 +14,7 @@ func (h *StartEvent) Handle(e *Event) error { ContainersCounter.Inc(1) task := &StartTask{ Err: e.Err, + IO: io, Container: container, } if e.Checkpoint != nil { diff --git a/supervisor.go b/supervisor.go index bbbbfa5d4..b2761df43 100644 --- a/supervisor.go +++ b/supervisor.go @@ -214,3 +214,17 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } + +func (s *Supervisor) log(path string, i *runtime.IO) error { + config := &logConfig{ + BundlePath: path, + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + } + // TODO: save logger to call close after its all done + if _, err := newLogger(config); err != nil { + return err + } + return nil +} diff --git a/worker.go b/worker.go index d77ebef1a..c3ca8be69 100644 --- a/worker.go +++ b/worker.go @@ -14,6 +14,7 @@ type Worker interface { type StartTask struct { Container runtime.Container Checkpoint string + IO *runtime.IO Err chan error } @@ -33,6 +34,14 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() + // start logging the container's stdio + if err := w.s.log(t.Container.Path(), t.IO); err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue + } if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType) From 3010f209fff6d1e0eaec32a7c29dbfcf229837c6 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 11 Dec 2015 11:27:33 -0800 Subject: [PATCH 2/3] Refactor container info in supervisor Signed-off-by: Michael Crosby --- add_process.go | 12 ++++++++---- checkpoint.go | 8 ++++---- delete.go | 7 +++++-- exit.go | 11 +++++++---- get_containers.go | 4 ++-- log.go | 29 +++++++++++++---------------- signal.go | 4 ++-- start.go | 4 +++- supervisor.go | 29 ++++++++++++++++++----------- update.go | 3 ++- worker.go | 4 +++- 11 files changed, 67 insertions(+), 48 deletions(-) diff --git a/add_process.go b/add_process.go index 253eab0be..862f289b6 100644 --- a/add_process.go +++ b/add_process.go @@ -9,15 +9,16 @@ type AddProcessEvent struct { // TODO: add this to worker for concurrent starts??? maybe not because of races where the container // could be stopped and removed... func (h *AddProcessEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + ci, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - p, io, err := h.s.runtime.StartProcess(container, *e.Process) + p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process) if err != nil { return err } - if err := h.s.log(container.Path(), io); err != nil { + l, err := h.s.log(ci.container.Path(), io) + if err != nil { // log the error but continue with the other commands logrus.WithFields(logrus.Fields{ "error": err, @@ -27,6 +28,9 @@ func (h *AddProcessEvent) Handle(e *Event) error { if e.Pid, err = p.Pid(); err != nil { return err } - h.s.processes[e.Pid] = container + h.s.processes[e.Pid] = &containerInfo{ + container: ci.container, + logger: l, + } return nil } diff --git a/checkpoint.go b/checkpoint.go index 799b748d8..3fb7f0d39 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -5,11 +5,11 @@ type CreateCheckpointEvent struct { } func (h *CreateCheckpointEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - return container.Checkpoint(*e.Checkpoint) + return i.container.Checkpoint(*e.Checkpoint) } type DeleteCheckpointEvent struct { @@ -17,9 +17,9 @@ type DeleteCheckpointEvent struct { } func (h *DeleteCheckpointEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - return container.DeleteCheckpoint(e.Checkpoint.Name) + return i.container.DeleteCheckpoint(e.Checkpoint.Name) } diff --git a/delete.go b/delete.go index 12d2595cd..075b5d45e 100644 --- a/delete.go +++ b/delete.go @@ -10,10 +10,13 @@ type DeleteEvent struct { } func (h *DeleteEvent) Handle(e *Event) error { - if container, ok := h.s.containers[e.ID]; ok { - if err := h.deleteContainer(container); err != nil { + if i, ok := h.s.containers[e.ID]; ok { + if err := h.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } + if err := i.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close container logger") + } h.s.notifySubscribers(&Event{ Type: ExitEventType, ID: e.ID, diff --git a/exit.go b/exit.go index 6bb758fa7..a997f7f51 100644 --- a/exit.go +++ b/exit.go @@ -10,9 +10,9 @@ func (h *ExitEvent) Handle(e *Event) error { logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). Debug("containerd: process exited") // is it the child process of a container - if container, ok := h.s.processes[e.Pid]; ok { + if info, ok := h.s.processes[e.Pid]; ok { ne := NewEvent(ExecExitEventType) - ne.ID = container.ID() + ne.ID = info.container.ID() ne.Pid = e.Pid ne.Status = e.Status h.s.SendEvent(ne) @@ -42,10 +42,13 @@ type ExecExitEvent struct { func (h *ExecExitEvent) Handle(e *Event) error { // exec process: we remove this process without notifying the main event loop - container := h.s.processes[e.Pid] - if err := container.RemoveProcess(e.Pid); err != nil { + info := h.s.processes[e.Pid] + if err := info.container.RemoveProcess(e.Pid); err != nil { logrus.WithField("error", err).Error("containerd: find container for pid") } + if err := info.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close process IO") + } delete(h.s.processes, e.Pid) h.s.notifySubscribers(e) return nil diff --git a/get_containers.go b/get_containers.go index 23bd44974..f8d898e22 100644 --- a/get_containers.go +++ b/get_containers.go @@ -5,8 +5,8 @@ type GetContainersEvent struct { } func (h *GetContainersEvent) Handle(e *Event) error { - for _, c := range h.s.containers { - e.Containers = append(e.Containers, c) + for _, i := range h.s.containers { + e.Containers = append(e.Containers, i.container) } return nil } diff --git a/log.go b/log.go index 1a84871a2..2dc96e74b 100644 --- a/log.go +++ b/log.go @@ -24,6 +24,15 @@ func newLogger(i *logConfig) (*logger, error) { config: i, messages: make(chan *Message, DefaultBufferSize), } + f, err := os.OpenFile( + filepath.Join(l.config.BundlePath, "logs.json"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0655, + ) + if err != nil { + return nil, err + } + l.f = f hout := &logHandler{ stream: "stdout", messages: l.messages, @@ -32,16 +41,14 @@ func newLogger(i *logConfig) (*logger, error) { stream: "stderr", messages: l.messages, } - l.wg.Add(2) go func() { - defer l.wg.Done() io.Copy(hout, i.Stdout) }() go func() { - defer l.wg.Done() io.Copy(herr, i.Stderr) }() - return l, l.start() + l.start() + return l, nil } type Message struct { @@ -71,27 +78,17 @@ func (h *logHandler) Write(b []byte) (int, error) { return len(b), nil } -func (l *logger) start() error { - f, err := os.OpenFile( - filepath.Join(l.config.BundlePath, "logs.json"), - os.O_CREATE|os.O_WRONLY|os.O_APPEND, - 0655, - ) - if err != nil { - return err - } - l.f = f +func (l *logger) start() { l.wg.Add(1) go func() { l.wg.Done() - enc := json.NewEncoder(f) + enc := json.NewEncoder(l.f) for m := range l.messages { if err := enc.Encode(m); err != nil { logrus.WithField("error", err).Error("write log message") } } }() - return nil } func (l *logger) Close() (err error) { diff --git a/signal.go b/signal.go index 5cdd1c42e..77abc300c 100644 --- a/signal.go +++ b/signal.go @@ -5,11 +5,11 @@ type SignalEvent struct { } func (h *SignalEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - processes, err := container.Processes() + processes, err := i.container.Processes() if err != nil { return err } diff --git a/start.go b/start.go index 4c725ad06..f84542ad2 100644 --- a/start.go +++ b/start.go @@ -10,7 +10,9 @@ func (h *StartEvent) Handle(e *Event) error { return err } h.s.containerGroup.Add(1) - h.s.containers[e.ID] = container + h.s.containers[e.ID] = &containerInfo{ + container: container, + } ContainersCounter.Inc(1) task := &StartTask{ Err: e.Err, diff --git a/supervisor.go b/supervisor.go index b2761df43..b73bc02a8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -29,8 +29,8 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err } s := &Supervisor{ stateDir: stateDir, - containers: make(map[string]runtime.Container), - processes: make(map[int]runtime.Container), + containers: make(map[string]*containerInfo), + processes: make(map[int]*containerInfo), runtime: r, tasks: tasks, events: make(chan *Event, DefaultBufferSize), @@ -54,11 +54,16 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err return s, nil } +type containerInfo struct { + container runtime.Container + logger *logger +} + type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. stateDir string - containers map[string]runtime.Container - processes map[int]runtime.Container + containers map[string]*containerInfo + processes map[int]*containerInfo handlers map[EventType]Handler runtime runtime.Runtime events chan *Event @@ -78,7 +83,8 @@ func (s *Supervisor) Stop(sig chan os.Signal) { // Close the tasks channel so that no new containers get started close(s.tasks) // send a SIGTERM to all containers - for id, c := range s.containers { + for id, i := range s.containers { + c := i.container logrus.WithField("id", id).Debug("sending TERM to container processes") procs, err := c.Processes() if err != nil { @@ -193,7 +199,8 @@ func (s *Supervisor) Machine() Machine { // getContainerForPid returns the container where the provided pid is the pid1 or main // process in the container func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { - for _, container := range s.containers { + for _, i := range s.containers { + container := i.container cpid, err := container.Pid() if err != nil { if lerr, ok := err.(libcontainer.Error); ok { @@ -215,16 +222,16 @@ func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } -func (s *Supervisor) log(path string, i *runtime.IO) error { +func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) { config := &logConfig{ BundlePath: path, Stdin: i.Stdin, Stdout: i.Stdout, Stderr: i.Stderr, } - // TODO: save logger to call close after its all done - if _, err := newLogger(config); err != nil { - return err + l, err := newLogger(config) + if err != nil { + return nil, err } - return nil + return l, nil } diff --git a/update.go b/update.go index f27cbc319..925715da3 100644 --- a/update.go +++ b/update.go @@ -7,10 +7,11 @@ type UpdateEvent struct { } func (h *UpdateEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } + container := i.container if e.State.Status != "" { switch e.State.Status { case runtime.Running: diff --git a/worker.go b/worker.go index c3ca8be69..9f14d33fe 100644 --- a/worker.go +++ b/worker.go @@ -35,13 +35,15 @@ func (w *worker) Start() { for t := range w.s.tasks { started := time.Now() // start logging the container's stdio - if err := w.s.log(t.Container.Path(), t.IO); err != nil { + l, err := w.s.log(t.Container.Path(), t.IO) + if err != nil { evt := NewEvent(DeleteEventType) evt.ID = t.Container.ID() w.s.SendEvent(evt) t.Err <- err continue } + w.s.containers[t.Container.ID()].logger = l if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType) From 9052c886f029c9bb4ebcc5c404c9e240aa72674b Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 11 Dec 2015 11:56:01 -0800 Subject: [PATCH 3/3] Fix leak in logging and proc pipes Signed-off-by: Michael Crosby --- delete.go | 6 ++++-- linux/linux.go | 17 +++++++++++++++-- runtime/container.go | 1 + 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/delete.go b/delete.go index 075b5d45e..722da8a25 100644 --- a/delete.go +++ b/delete.go @@ -14,8 +14,10 @@ func (h *DeleteEvent) Handle(e *Event) error { if err := h.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } - if err := i.logger.Close(); err != nil { - logrus.WithField("error", err).Error("containerd: close container logger") + if i.logger != nil { + if err := i.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close container logger") + } } h.s.notifySubscribers(&Event{ Type: ExitEventType, diff --git a/linux/linux.go b/linux/linux.go index 5809f433a..d5f5c4e0d 100644 --- a/linux/linux.go +++ b/linux/linux.go @@ -5,6 +5,7 @@ package linux import ( "encoding/json" "fmt" + "io" "io/ioutil" "os" "path/filepath" @@ -184,6 +185,15 @@ func (p *libcontainerProcess) Signal(s os.Signal) error { return p.process.Signal(s) } +func (p *libcontainerProcess) Close() error { + // in close we always need to call wait to close/flush any pipes + _, err := p.process.Wait() + p.process.Stdin.(io.Closer).Close() + p.process.Stdout.(io.Closer).Close() + p.process.Stderr.(io.Closer).Close() + return err +} + type libcontainerContainer struct { c libcontainer.Container initProcess *libcontainerProcess @@ -305,6 +315,7 @@ func (c *libcontainerContainer) SetExited(status int) { c.exitStatus = status // meh c.exited = true + c.initProcess.Close() } func (c *libcontainerContainer) Stats() (*runtime.Stat, error) { @@ -334,11 +345,13 @@ func (c *libcontainerContainer) Processes() ([]runtime.Process, error) { } func (c *libcontainerContainer) RemoveProcess(pid int) error { - if _, ok := c.additionalProcesses[pid]; !ok { + proc, ok := c.additionalProcesses[pid] + if !ok { return runtime.ErrNotChildProcess } + err := proc.Close() delete(c.additionalProcesses, pid) - return nil + return err } func NewRuntime(stateDir string) (runtime.Runtime, error) { diff --git a/runtime/container.go b/runtime/container.go index 863ffe4ab..7069f9d12 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -9,6 +9,7 @@ import ( ) type Process interface { + io.Closer Pid() (int, error) Spec() specs.Process Signal(os.Signal) error