diff --git a/api/v1/server.go b/api/v1/server.go index a105f85a5..4a66c2ffe 100644 --- a/api/v1/server.go +++ b/api/v1/server.go @@ -325,9 +325,13 @@ func (s *server) createCheckpoint(w http.ResponseWriter, r *http.Request) { name = vars["name"] ) var cp Checkpoint - if err := json.NewDecoder(r.Body).Decode(&cp); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + // most options to the checkpoint action can be left out so don't + // decode unless the client passed anything in the body. + if r.ContentLength > 0 { + if err := json.NewDecoder(r.Body).Decode(&cp); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } } e := containerd.NewEvent(containerd.CreateCheckpointEventType) e.ID = id diff --git a/containerd/main.go b/containerd/main.go index 2a1c7682f..918255ca4 100644 --- a/containerd/main.go +++ b/containerd/main.go @@ -86,8 +86,11 @@ func daemon(stateDir string, concurrency, bufferSize int) error { w := containerd.NewWorker(supervisor, wg) go w.Start() } - if err := setSubReaper(); err != nil { - return err + // only set containerd as the subreaper if it is not an init process + if pid := os.Getpid(); pid != 1 { + if err := setSubReaper(); err != nil { + return err + } } // start the signal handler in the background. go startSignalHandler(supervisor, bufferSize) diff --git a/containerd/reap_linux.go b/containerd/reap_linux.go index 40dc7068a..bbc7f58d8 100644 --- a/containerd/reap_linux.go +++ b/containerd/reap_linux.go @@ -34,8 +34,7 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { for s := range signals { switch s { case syscall.SIGTERM, syscall.SIGINT: - supervisor.Close() - os.Exit(0) + supervisor.Stop(signals) case syscall.SIGCHLD: exits, err := reap() if err != nil { @@ -46,6 +45,8 @@ func startSignalHandler(supervisor *containerd.Supervisor, bufferSize int) { } } } + supervisor.Close() + os.Exit(0) } func reap() (exits []*containerd.Event, err error) { diff --git a/delete.go b/delete.go index 8695ea0f9..c36a2792e 100644 --- a/delete.go +++ b/delete.go @@ -15,6 +15,7 @@ func (h *DeleteEvent) Handle(e *Event) error { logrus.WithField("error", err).Error("containerd: deleting container") } ContainersCounter.Dec(1) + h.s.containerGroup.Done() } return nil } diff --git a/supervisor.go b/supervisor.go index a6d470e60..c2a269ecd 100644 --- a/supervisor.go +++ b/supervisor.go @@ -2,8 +2,11 @@ package containerd import ( "os" + "os/signal" "path/filepath" goruntime "runtime" + "sync" + "syscall" "github.com/Sirupsen/logrus" "github.com/docker/containerd/runtime" @@ -56,24 +59,58 @@ func NewSupervisor(stateDir string, tasks chan *StartTask) (*Supervisor, error) type Supervisor struct { // stateDir is the directory on the system to store container runtime state information. - stateDir string - containers map[string]runtime.Container - processes map[int]runtime.Container - handlers map[EventType]Handler - runtime runtime.Runtime - journal *journal - events chan *Event - tasks chan *StartTask - subscribers map[subscriber]bool - machine Machine + stateDir string + containers map[string]runtime.Container + processes map[int]runtime.Container + handlers map[EventType]Handler + runtime runtime.Runtime + journal *journal + events chan *Event + tasks chan *StartTask + subscribers map[subscriber]bool + machine Machine + containerGroup sync.WaitGroup } type subscriber chan *Event -// need proper close logic for jobs and stuff so that sending to the channels dont panic -// but can complete jobs +func (s *Supervisor) Stop(sig chan os.Signal) { + // Close the tasks channel so that no new containers get started + close(s.tasks) + // send a SIGTERM to all containers + for id, c := range s.containers { + logrus.WithField("id", id).Debug("sending TERM to container processes") + procs, err := c.Processes() + if err != nil { + logrus.WithField("id", id).Warn("get container processes") + continue + } + if len(procs) == 0 { + continue + } + mainProc := procs[0] + if err := mainProc.Signal(syscall.SIGTERM); err != nil { + pid, _ := mainProc.Pid() + logrus.WithFields(logrus.Fields{ + "id": id, + "pid": pid, + "error": err, + }).Error("send SIGTERM to process") + } + } + go func() { + logrus.Debug("waiting for containers to exit") + s.containerGroup.Wait() + logrus.Debug("all containers exited") + // stop receiving signals and close the channel + signal.Stop(sig) + close(sig) + }() +} + +// Close closes any open files in the supervisor but expects that Stop has been +// callsed so that no more containers are started. func (s *Supervisor) Close() error { - //TODO: unsubscribe all channels return s.journal.Close() } diff --git a/worker.go b/worker.go index 9256b7862..8a4c87201 100644 --- a/worker.go +++ b/worker.go @@ -55,6 +55,7 @@ func (w *worker) Start() { continue } } + w.s.containerGroup.Add(1) ContainerStartTimer.UpdateSince(started) t.Err <- nil }