diff --git a/add_process.go b/add_process.go index 85573be6b..862f289b6 100644 --- a/add_process.go +++ b/add_process.go @@ -1,21 +1,36 @@ package containerd +import "github.com/Sirupsen/logrus" + type AddProcessEvent struct { s *Supervisor } +// TODO: add this to worker for concurrent starts??? maybe not because of races where the container +// could be stopped and removed... func (h *AddProcessEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + ci, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - p, err := h.s.runtime.StartProcess(container, *e.Process, e.Stdio) + p, io, err := h.s.runtime.StartProcess(ci.container, *e.Process) if err != nil { return err } + l, err := h.s.log(ci.container.Path(), io) + if err != nil { + // log the error but continue with the other commands + logrus.WithFields(logrus.Fields{ + "error": err, + "id": e.ID, + }).Error("log stdio") + } if e.Pid, err = p.Pid(); err != nil { return err } - h.s.processes[e.Pid] = container + h.s.processes[e.Pid] = &containerInfo{ + container: ci.container, + logger: l, + } return nil } diff --git a/api/grpc/server/server.go b/api/grpc/server/server.go index 5a345b1b0..0eae0c245 100644 --- a/api/grpc/server/server.go +++ b/api/grpc/server/server.go @@ -38,10 +38,6 @@ func (s *apiServer) CreateContainer(ctx context.Context, c *types.CreateContaine Name: c.Checkpoint, } } - e.Stdio = &runtime.Stdio{ - Stderr: c.Stderr, - Stdout: c.Stdout, - } s.sv.SendEvent(e) if err := <-e.Err; err != nil { return nil, err diff --git a/checkpoint.go b/checkpoint.go index 799b748d8..3fb7f0d39 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -5,11 +5,11 @@ type CreateCheckpointEvent struct { } func (h *CreateCheckpointEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - return container.Checkpoint(*e.Checkpoint) + return i.container.Checkpoint(*e.Checkpoint) } type DeleteCheckpointEvent struct { @@ -17,9 +17,9 @@ type DeleteCheckpointEvent struct { } func (h *DeleteCheckpointEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - return container.DeleteCheckpoint(e.Checkpoint.Name) + return i.container.DeleteCheckpoint(e.Checkpoint.Name) } diff --git a/ctr/logs.go b/ctr/logs.go new file mode 100644 index 000000000..65e04bde5 --- /dev/null +++ b/ctr/logs.go @@ -0,0 +1,60 @@ +package main + +import ( + "encoding/json" + "io" + "os" + "time" + + "github.com/codegangsta/cli" + "github.com/docker/containerd" +) + +var LogsCommand = cli.Command{ + Name: "logs", + Usage: "view binary container logs generated by containerd", + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "follow,f", + Usage: "follow/tail the logs", + }, + }, + Action: func(context *cli.Context) { + path := context.Args().First() + if path == "" { + fatal("path to the log cannot be empty", 1) + } + if err := readLogs(path, context.Bool("follow")); err != nil { + fatal(err.Error(), 1) + } + }, +} + +func readLogs(path string, follow bool) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + dec := json.NewDecoder(f) + for { + var msg *containerd.Message + if err := dec.Decode(&msg); err != nil { + if err == io.EOF { + if follow { + time.Sleep(100 * time.Millisecond) + continue + } + return nil + } + return err + } + switch msg.Stream { + case "stdout": + os.Stdout.Write(msg.Data) + case "stderr": + os.Stderr.Write(msg.Data) + } + } + return nil +} diff --git a/ctr/main.go b/ctr/main.go index 3575f1724..e966e0407 100644 --- a/ctr/main.go +++ b/ctr/main.go @@ -34,9 +34,10 @@ func main() { }, } app.Commands = []cli.Command{ - ContainersCommand, CheckpointCommand, + ContainersCommand, EventsCommand, + LogsCommand, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { diff --git a/delete.go b/delete.go index 12d2595cd..722da8a25 100644 --- a/delete.go +++ b/delete.go @@ -10,10 +10,15 @@ type DeleteEvent struct { } func (h *DeleteEvent) Handle(e *Event) error { - if container, ok := h.s.containers[e.ID]; ok { - if err := h.deleteContainer(container); err != nil { + if i, ok := h.s.containers[e.ID]; ok { + if err := h.deleteContainer(i.container); err != nil { logrus.WithField("error", err).Error("containerd: deleting container") } + if i.logger != nil { + if err := i.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close container logger") + } + } h.s.notifySubscribers(&Event{ Type: ExitEventType, ID: e.ID, diff --git a/event.go b/event.go index 2680955d6..7239b153f 100644 --- a/event.go +++ b/event.go @@ -36,7 +36,6 @@ type Event struct { Timestamp time.Time ID string BundlePath string - Stdio *runtime.Stdio Pid int Status int Signal os.Signal diff --git a/exit.go b/exit.go index 6bb758fa7..a997f7f51 100644 --- a/exit.go +++ b/exit.go @@ -10,9 +10,9 @@ func (h *ExitEvent) Handle(e *Event) error { logrus.WithFields(logrus.Fields{"pid": e.Pid, "status": e.Status}). Debug("containerd: process exited") // is it the child process of a container - if container, ok := h.s.processes[e.Pid]; ok { + if info, ok := h.s.processes[e.Pid]; ok { ne := NewEvent(ExecExitEventType) - ne.ID = container.ID() + ne.ID = info.container.ID() ne.Pid = e.Pid ne.Status = e.Status h.s.SendEvent(ne) @@ -42,10 +42,13 @@ type ExecExitEvent struct { func (h *ExecExitEvent) Handle(e *Event) error { // exec process: we remove this process without notifying the main event loop - container := h.s.processes[e.Pid] - if err := container.RemoveProcess(e.Pid); err != nil { + info := h.s.processes[e.Pid] + if err := info.container.RemoveProcess(e.Pid); err != nil { logrus.WithField("error", err).Error("containerd: find container for pid") } + if err := info.logger.Close(); err != nil { + logrus.WithField("error", err).Error("containerd: close process IO") + } delete(h.s.processes, e.Pid) h.s.notifySubscribers(e) return nil diff --git a/get_containers.go b/get_containers.go index 23bd44974..f8d898e22 100644 --- a/get_containers.go +++ b/get_containers.go @@ -5,8 +5,8 @@ type GetContainersEvent struct { } func (h *GetContainersEvent) Handle(e *Event) error { - for _, c := range h.s.containers { - e.Containers = append(e.Containers, c) + for _, i := range h.s.containers { + e.Containers = append(e.Containers, i.container) } return nil } diff --git a/linux/linux.go b/linux/linux.go index 8a86db568..d5f5c4e0d 100644 --- a/linux/linux.go +++ b/linux/linux.go @@ -185,6 +185,15 @@ func (p *libcontainerProcess) Signal(s os.Signal) error { return p.process.Signal(s) } +func (p *libcontainerProcess) Close() error { + // in close we always need to call wait to close/flush any pipes + _, err := p.process.Wait() + p.process.Stdin.(io.Closer).Close() + p.process.Stdout.(io.Closer).Close() + p.process.Stderr.(io.Closer).Close() + return err +} + type libcontainerContainer struct { c libcontainer.Container initProcess *libcontainerProcess @@ -306,6 +315,7 @@ func (c *libcontainerContainer) SetExited(status int) { c.exitStatus = status // meh c.exited = true + c.initProcess.Close() } func (c *libcontainerContainer) Stats() (*runtime.Stat, error) { @@ -335,11 +345,13 @@ func (c *libcontainerContainer) Processes() ([]runtime.Process, error) { } func (c *libcontainerContainer) RemoveProcess(pid int) error { - if _, ok := c.additionalProcesses[pid]; !ok { + proc, ok := c.additionalProcesses[pid] + if !ok { return runtime.ErrNotChildProcess } + err := proc.Close() delete(c.additionalProcesses, pid) - return nil + return err } func NewRuntime(stateDir string) (runtime.Runtime, error) { @@ -363,25 +375,29 @@ func (r *libcontainerRuntime) Type() string { return "libcontainer" } -func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio) (runtime.Container, error) { +func (r *libcontainerRuntime) Create(id, bundlePath string) (runtime.Container, *runtime.IO, error) { spec, rspec, err := r.loadSpec( filepath.Join(bundlePath, "config.json"), filepath.Join(bundlePath, "runtime.json"), ) if err != nil { - return nil, err + return nil, nil, err } config, err := r.createLibcontainerConfig(id, bundlePath, spec, rspec) if err != nil { - return nil, err + return nil, nil, err } container, err := r.factory.Create(id, config) if err != nil { - return nil, fmt.Errorf("create container: %v", err) + return nil, nil, fmt.Errorf("create container: %v", err) } - process, err := r.newProcess(spec.Process, stdio) + process, err := r.newProcess(spec.Process) if err != nil { - return nil, err + return nil, nil, err + } + i, err := process.InitializeIO(int(spec.Process.User.UID)) + if err != nil { + return nil, nil, err } c := &libcontainerContainer{ c: container, @@ -392,20 +408,28 @@ func (r *libcontainerRuntime) Create(id, bundlePath string, stdio *runtime.Stdio }, path: bundlePath, } - return c, nil + return c, &runtime.IO{ + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + }, nil } -func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process, stdio *runtime.Stdio) (runtime.Process, error) { +func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process) (runtime.Process, *runtime.IO, error) { c, ok := ci.(*libcontainerContainer) if !ok { - return nil, runtime.ErrInvalidContainerType + return nil, nil, runtime.ErrInvalidContainerType } - process, err := r.newProcess(p, stdio) + process, err := r.newProcess(p) if err != nil { - return nil, err + return nil, nil, err + } + i, err := process.InitializeIO(int(p.User.UID)) + if err != nil { + return nil, nil, err } if err := c.c.Start(process); err != nil { - return nil, err + return nil, nil, err } lp := &libcontainerProcess{ process: process, @@ -413,42 +437,29 @@ func (r *libcontainerRuntime) StartProcess(ci runtime.Container, p specs.Process } pid, err := process.Pid() if err != nil { - return nil, err + return nil, nil, err } c.additionalProcesses[pid] = lp - return lp, nil + return lp, &runtime.IO{ + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + }, nil } // newProcess returns a new libcontainer Process with the arguments from the // spec and stdio from the current process. -func (r *libcontainerRuntime) newProcess(p specs.Process, stdio *runtime.Stdio) (*libcontainer.Process, error) { - var ( - stderr, stdout io.Writer - ) - if stdio != nil { - if stdio.Stdout != "" { - f, err := os.OpenFile(stdio.Stdout, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return nil, fmt.Errorf("open stdout: %v", err) - } - stdout = f - } - if stdio.Stderr != "" { - f, err := os.OpenFile(stdio.Stderr, os.O_CREATE|os.O_WRONLY, 0755) - if err != nil { - return nil, fmt.Errorf("open stderr: %v", err) - } - stderr = f - } +func (r *libcontainerRuntime) newProcess(p specs.Process) (*libcontainer.Process, error) { + // TODO: support terminals + if p.Terminal { + return nil, runtime.ErrTerminalsNotSupported } return &libcontainer.Process{ Args: p.Args, Env: p.Env, // TODO: fix libcontainer's API to better support uid/gid in a typesafe way. - User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), - Cwd: p.Cwd, - Stderr: stderr, - Stdout: stdout, + User: fmt.Sprintf("%d:%d", p.User.UID, p.User.GID), + Cwd: p.Cwd, }, nil } diff --git a/log.go b/log.go new file mode 100644 index 000000000..2dc96e74b --- /dev/null +++ b/log.go @@ -0,0 +1,110 @@ +package containerd + +import ( + "encoding/json" + "io" + "os" + "path/filepath" + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +type logConfig struct { + BundlePath string + LogSize int64 // in bytes + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func newLogger(i *logConfig) (*logger, error) { + l := &logger{ + config: i, + messages: make(chan *Message, DefaultBufferSize), + } + f, err := os.OpenFile( + filepath.Join(l.config.BundlePath, "logs.json"), + os.O_CREATE|os.O_WRONLY|os.O_APPEND, + 0655, + ) + if err != nil { + return nil, err + } + l.f = f + hout := &logHandler{ + stream: "stdout", + messages: l.messages, + } + herr := &logHandler{ + stream: "stderr", + messages: l.messages, + } + go func() { + io.Copy(hout, i.Stdout) + }() + go func() { + io.Copy(herr, i.Stderr) + }() + l.start() + return l, nil +} + +type Message struct { + Stream string `json:"stream"` + Timestamp time.Time `json:"timestamp"` + Data []byte `json:"data"` +} + +type logger struct { + config *logConfig + f *os.File + wg sync.WaitGroup + messages chan *Message +} + +type logHandler struct { + stream string + messages chan *Message +} + +func (h *logHandler) Write(b []byte) (int, error) { + h.messages <- &Message{ + Stream: h.stream, + Timestamp: time.Now(), + Data: b, + } + return len(b), nil +} + +func (l *logger) start() { + l.wg.Add(1) + go func() { + l.wg.Done() + enc := json.NewEncoder(l.f) + for m := range l.messages { + if err := enc.Encode(m); err != nil { + logrus.WithField("error", err).Error("write log message") + } + } + }() +} + +func (l *logger) Close() (err error) { + for _, c := range []io.Closer{ + l.config.Stdin, + l.config.Stdout, + l.config.Stderr, + } { + if cerr := c.Close(); err == nil { + err = cerr + } + } + close(l.messages) + l.wg.Wait() + if ferr := l.f.Close(); err == nil { + err = ferr + } + return err +} diff --git a/runtime/container.go b/runtime/container.go index e7c20dee0..7069f9d12 100644 --- a/runtime/container.go +++ b/runtime/container.go @@ -1,6 +1,7 @@ package runtime import ( + "io" "os" "time" @@ -8,6 +9,7 @@ import ( ) type Process interface { + io.Closer Pid() (int, error) Spec() specs.Process Signal(os.Signal) error @@ -24,9 +26,24 @@ type State struct { Status Status } -type Stdio struct { - Stderr string - Stdout string +type IO struct { + Stdin io.WriteCloser + Stdout io.ReadCloser + Stderr io.ReadCloser +} + +func (i *IO) Close() error { + var oerr error + for _, c := range []io.Closer{ + i.Stdin, + i.Stdout, + i.Stderr, + } { + if err := c.Close(); oerr == nil { + oerr = err + } + } + return oerr } type Stat struct { diff --git a/runtime/runtime.go b/runtime/runtime.go index 883d7c464..aabe055bf 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -7,18 +7,20 @@ import ( ) var ( - ErrNotChildProcess = errors.New("containerd: not a child process for container") - ErrInvalidContainerType = errors.New("containerd: invalid container type for runtime") - ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container") - ErrCheckpointExists = errors.New("containerd: checkpoint already exists") - ErrContainerExited = errors.New("containerd: container has exited") + ErrNotChildProcess = errors.New("containerd: not a child process for container") + ErrInvalidContainerType = errors.New("containerd: invalid container type for runtime") + ErrCheckpointNotExists = errors.New("containerd: checkpoint does not exist for container") + ErrCheckpointExists = errors.New("containerd: checkpoint already exists") + ErrContainerExited = errors.New("containerd: container has exited") + ErrTerminalsNotSupported = errors.New("containerd: terminals are not supported for runtime") ) // Runtime handles containers, containers handle their own actions type Runtime interface { - // Create creates a new container initialized but without it starting it - Create(id, bundlePath string, stdio *Stdio) (Container, error) - // StartProcess adds a new process to the container - StartProcess(Container, specs.Process, *Stdio) (Process, error) + // Type of the runtime Type() string + // Create creates a new container initialized but without it starting it + Create(id, bundlePath string) (Container, *IO, error) + // StartProcess adds a new process to the container + StartProcess(Container, specs.Process) (Process, *IO, error) } diff --git a/signal.go b/signal.go index 5cdd1c42e..77abc300c 100644 --- a/signal.go +++ b/signal.go @@ -5,11 +5,11 @@ type SignalEvent struct { } func (h *SignalEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } - processes, err := container.Processes() + processes, err := i.container.Processes() if err != nil { return err } diff --git a/start.go b/start.go index 9400e089b..f84542ad2 100644 --- a/start.go +++ b/start.go @@ -5,15 +5,18 @@ type StartEvent struct { } func (h *StartEvent) Handle(e *Event) error { - container, err := h.s.runtime.Create(e.ID, e.BundlePath, e.Stdio) + container, io, err := h.s.runtime.Create(e.ID, e.BundlePath) if err != nil { return err } h.s.containerGroup.Add(1) - h.s.containers[e.ID] = container + h.s.containers[e.ID] = &containerInfo{ + container: container, + } ContainersCounter.Inc(1) task := &StartTask{ Err: e.Err, + IO: io, Container: container, } if e.Checkpoint != nil { diff --git a/supervisor.go b/supervisor.go index bbbbfa5d4..b73bc02a8 100644 --- a/supervisor.go +++ b/supervisor.go @@ -29,8 +29,8 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err } s := &Supervisor{ stateDir: stateDir, - containers: make(map[string]runtime.Container), - processes: make(map[int]runtime.Container), + containers: make(map[string]*containerInfo), + processes: make(map[int]*containerInfo), runtime: r, tasks: tasks, events: make(chan *Event, DefaultBufferSize), @@ -54,11 +54,16 @@ func NewSupervisor(id, stateDir string, tasks chan *StartTask) (*Supervisor, err return s, nil } +type containerInfo struct { + container runtime.Container + logger *logger +} + type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. stateDir string - containers map[string]runtime.Container - processes map[int]runtime.Container + containers map[string]*containerInfo + processes map[int]*containerInfo handlers map[EventType]Handler runtime runtime.Runtime events chan *Event @@ -78,7 +83,8 @@ func (s *Supervisor) Stop(sig chan os.Signal) { // Close the tasks channel so that no new containers get started close(s.tasks) // send a SIGTERM to all containers - for id, c := range s.containers { + for id, i := range s.containers { + c := i.container logrus.WithField("id", id).Debug("sending TERM to container processes") procs, err := c.Processes() if err != nil { @@ -193,7 +199,8 @@ func (s *Supervisor) Machine() Machine { // getContainerForPid returns the container where the provided pid is the pid1 or main // process in the container func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { - for _, container := range s.containers { + for _, i := range s.containers { + container := i.container cpid, err := container.Pid() if err != nil { if lerr, ok := err.(libcontainer.Error); ok { @@ -214,3 +221,17 @@ func (s *Supervisor) getContainerForPid(pid int) (runtime.Container, error) { func (s *Supervisor) SendEvent(evt *Event) { s.events <- evt } + +func (s *Supervisor) log(path string, i *runtime.IO) (*logger, error) { + config := &logConfig{ + BundlePath: path, + Stdin: i.Stdin, + Stdout: i.Stdout, + Stderr: i.Stderr, + } + l, err := newLogger(config) + if err != nil { + return nil, err + } + return l, nil +} diff --git a/update.go b/update.go index f27cbc319..925715da3 100644 --- a/update.go +++ b/update.go @@ -7,10 +7,11 @@ type UpdateEvent struct { } func (h *UpdateEvent) Handle(e *Event) error { - container, ok := h.s.containers[e.ID] + i, ok := h.s.containers[e.ID] if !ok { return ErrContainerNotFound } + container := i.container if e.State.Status != "" { switch e.State.Status { case runtime.Running: diff --git a/worker.go b/worker.go index d77ebef1a..9f14d33fe 100644 --- a/worker.go +++ b/worker.go @@ -14,6 +14,7 @@ type Worker interface { type StartTask struct { Container runtime.Container Checkpoint string + IO *runtime.IO Err chan error } @@ -33,6 +34,16 @@ func (w *worker) Start() { defer w.wg.Done() for t := range w.s.tasks { started := time.Now() + // start logging the container's stdio + l, err := w.s.log(t.Container.Path(), t.IO) + if err != nil { + evt := NewEvent(DeleteEventType) + evt.ID = t.Container.ID() + w.s.SendEvent(evt) + t.Err <- err + continue + } + w.s.containers[t.Container.ID()].logger = l if t.Checkpoint != "" { if err := t.Container.Restore(t.Checkpoint); err != nil { evt := NewEvent(DeleteEventType)