diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index c8af6b968..49f16e6ca 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -41,6 +41,7 @@ import ( shimlog "github.com/containerd/containerd/runtime/v1" "github.com/containerd/containerd/runtime/v1/shim" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/ttrpc" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" @@ -233,7 +234,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.S case s := <-signals: switch s { case unix.SIGCHLD: - if err := shim.Reap(); err != nil { + if err := reaper.Reap(); err != nil { logger.WithError(err).Error("reap exit status") } case unix.SIGTERM, unix.SIGINT: @@ -291,11 +292,11 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event defer bufPool.Put(b) cmd.Stdout = b cmd.Stderr = b - c, err := shim.Default.Start(cmd) + c, err := reaper.Default.Start(cmd) if err != nil { return err } - status, err := shim.Default.Wait(cmd, c) + status, err := reaper.Default.Wait(cmd, c) if err != nil { return errors.Wrapf(err, "failed to publish event: %s", b.String()) } diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index d55646977..4d81aa6d5 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -20,7 +20,7 @@ import ( "os" "os/signal" - "github.com/containerd/containerd/runtime/v1/shim" + "github.com/containerd/containerd/sys/reaper" runc "github.com/containerd/go-runc" "github.com/containerd/ttrpc" "github.com/opencontainers/runc/libcontainer/system" @@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) { signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE) // make sure runc is setup to use the monitor // for waiting on processes - runc.Monitor = shim.Default + runc.Monitor = reaper.Default // set the shim as the subreaper for all orphaned processes created by the container if err := system.SetSubreaper(1); err != nil { return nil, err diff --git a/runtime/v1/shim/reaper.go b/runtime/v1/shim/reaper.go deleted file mode 100644 index 4e002f5dc..000000000 --- a/runtime/v1/shim/reaper.go +++ /dev/null @@ -1,131 +0,0 @@ -// +build !windows - -/* - Copyright The containerd Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package shim - -import ( - "os/exec" - "sync" - "time" - - "github.com/containerd/containerd/sys" - "github.com/containerd/go-runc" - "github.com/pkg/errors" -) - -// ErrNoSuchProcess is returned when the process no longer exists -var ErrNoSuchProcess = errors.New("no such process") - -const bufferSize = 2048 - -// Reap should be called when the process receives an SIGCHLD. Reap will reap -// all exited processes and close their wait channels -func Reap() error { - now := time.Now() - exits, err := sys.Reap(false) - for _, e := range exits { - Default.Notify(runc.Exit{ - Timestamp: now, - Pid: e.Pid, - Status: e.Status, - }) - } - return err -} - -// Default is the default monitor initialized for the package -var Default = &Monitor{ - subscribers: make(map[chan runc.Exit]bool), -} - -// Monitor monitors the underlying system for process status changes -type Monitor struct { - sync.Mutex - - subscribers map[chan runc.Exit]bool -} - -// Start starts the command a registers the process with the reaper -func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) { - ec := m.Subscribe() - if err := c.Start(); err != nil { - m.Unsubscribe(ec) - return nil, err - } - return ec, nil -} - -// Wait blocks until a process is signal as dead. -// User should rely on the value of the exit status to determine if the -// command was successful or not. -func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) { - for e := range ec { - if e.Pid == c.Process.Pid { - // make sure we flush all IO - c.Wait() - m.Unsubscribe(ec) - return e.Status, nil - } - } - // return no such process if the ec channel is closed and no more exit - // events will be sent - return -1, ErrNoSuchProcess -} - -// Subscribe to process exit changes -func (m *Monitor) Subscribe() chan runc.Exit { - c := make(chan runc.Exit, bufferSize) - m.Lock() - m.subscribers[c] = false - m.Unlock() - return c -} - -// Unsubscribe to process exit changes -func (m *Monitor) Unsubscribe(c chan runc.Exit) { - m.Lock() - delete(m.subscribers, c) - close(c) - m.Unlock() -} - -// Notify to subscribers exit changes -func (m *Monitor) Notify(e runc.Exit) { -retry: - m.Lock() - for c, v := range m.subscribers { - // subscriber has receive this exit - if v == true { - continue - } - - select { - case c <- e: - m.subscribers[c] = true - case <-time.After(time.Millisecond): - m.Unlock() - goto retry - } - } - - // reset subscriber's state - for c := range m.subscribers { - m.subscribers[c] = false - } - m.Unlock() -} diff --git a/runtime/v1/shim/service.go b/runtime/v1/shim/service.go index f55700135..37eae7f8b 100644 --- a/runtime/v1/shim/service.go +++ b/runtime/v1/shim/service.go @@ -40,6 +40,7 @@ import ( "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime/linux/runctypes" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" + "github.com/containerd/containerd/sys/reaper" runc "github.com/containerd/go-runc" "github.com/containerd/typeurl" ptypes "github.com/gogo/protobuf/types" @@ -86,7 +87,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) { context: ctx, processes: make(map[string]process.Process), events: make(chan interface{}, 128), - ec: Default.Subscribe(), + ec: reaper.Default.Subscribe(), } go s.processExits() if err := s.initPlatform(); err != nil { diff --git a/runtime/v2/runc/v1/service.go b/runtime/v2/runc/v1/service.go index c4f5328bf..516a25f0d 100644 --- a/runtime/v2/runc/v1/service.go +++ b/runtime/v2/runc/v1/service.go @@ -43,6 +43,7 @@ import ( "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" runcC "github.com/containerd/go-runc" "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" @@ -70,12 +71,12 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func id: id, context: ctx, events: make(chan interface{}, 128), - ec: shim.Default.Subscribe(), + ec: reaper.Default.Subscribe(), ep: ep, cancel: shutdown, } go s.processExits() - runcC.Monitor = shim.Default + runcC.Monitor = reaper.Default if err := s.initPlatform(); err != nil { shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") diff --git a/runtime/v2/runc/v2/service.go b/runtime/v2/runc/v2/service.go index 54e82dba9..9b31b3547 100644 --- a/runtime/v2/runc/v2/service.go +++ b/runtime/v2/runc/v2/service.go @@ -44,6 +44,7 @@ import ( "github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/shim" taskAPI "github.com/containerd/containerd/runtime/v2/task" + "github.com/containerd/containerd/sys/reaper" runcC "github.com/containerd/go-runc" "github.com/containerd/typeurl" "github.com/gogo/protobuf/proto" @@ -83,13 +84,13 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func id: id, context: ctx, events: make(chan interface{}, 128), - ec: shim.Default.Subscribe(), + ec: reaper.Default.Subscribe(), ep: ep, cancel: shutdown, containers: make(map[string]*runc.Container), } go s.processExits() - runcC.Monitor = shim.Default + runcC.Monitor = reaper.Default if err := s.initPlatform(); err != nil { shutdown() return nil, errors.Wrap(err, "failed to initialized platform behavior") diff --git a/runtime/v2/shim/shim_unix.go b/runtime/v2/shim/shim_unix.go index dc3e6a891..e6dc3e02f 100644 --- a/runtime/v2/shim/shim_unix.go +++ b/runtime/v2/shim/shim_unix.go @@ -26,6 +26,7 @@ import ( "os/signal" "syscall" + "github.com/containerd/containerd/sys/reaper" "github.com/containerd/fifo" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -79,7 +80,7 @@ func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Si case s := <-signals: switch s { case unix.SIGCHLD: - if err := Reap(); err != nil { + if err := reaper.Reap(); err != nil { logger.WithError(err).Error("reap exit status") } case unix.SIGPIPE: diff --git a/runtime/v2/shim/reaper_unix.go b/sys/reaper/reaper_unix.go similarity index 99% rename from runtime/v2/shim/reaper_unix.go rename to sys/reaper/reaper_unix.go index 45a88db12..4a150a8f9 100644 --- a/runtime/v2/shim/reaper_unix.go +++ b/sys/reaper/reaper_unix.go @@ -16,7 +16,7 @@ limitations under the License. */ -package shim +package reaper import ( "os/exec"