Merge pull request #3540 from crosbymichael/shim-hang

Use non-blocking send and retry for exit events
This commit is contained in:
Michael Crosby 2019-08-20 09:31:21 -04:00 committed by GitHub
commit 08061c7c3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 125 additions and 136 deletions

View File

@ -41,6 +41,7 @@ import (
shimlog "github.com/containerd/containerd/runtime/v1" shimlog "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/shim" "github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys/reaper"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
@ -233,7 +234,7 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.S
case s := <-signals: case s := <-signals:
switch s { switch s {
case unix.SIGCHLD: case unix.SIGCHLD:
if err := shim.Reap(); err != nil { if err := reaper.Reap(); err != nil {
logger.WithError(err).Error("reap exit status") logger.WithError(err).Error("reap exit status")
} }
case unix.SIGTERM, unix.SIGINT: case unix.SIGTERM, unix.SIGINT:
@ -291,11 +292,11 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
defer bufPool.Put(b) defer bufPool.Put(b)
cmd.Stdout = b cmd.Stdout = b
cmd.Stderr = b cmd.Stderr = b
c, err := shim.Default.Start(cmd) c, err := reaper.Default.Start(cmd)
if err != nil { if err != nil {
return err return err
} }
status, err := shim.Default.Wait(cmd, c) status, err := reaper.Default.Wait(cmd, c)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to publish event: %s", b.String()) return errors.Wrapf(err, "failed to publish event: %s", b.String())
} }

View File

@ -22,7 +22,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/containerd/containerd/runtime/v1/shim" "github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
) )
@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) {
signal.Notify(signals) signal.Notify(signals)
// make sure runc is setup to use the monitor // make sure runc is setup to use the monitor
// for waiting on processes // for waiting on processes
runc.Monitor = shim.Default runc.Monitor = reaper.Default
return signals, nil return signals, nil
} }

View File

@ -20,7 +20,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"github.com/containerd/containerd/runtime/v1/shim" "github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/containerd/ttrpc" "github.com/containerd/ttrpc"
"github.com/opencontainers/runc/libcontainer/system" "github.com/opencontainers/runc/libcontainer/system"
@ -34,7 +34,7 @@ func setupSignals() (chan os.Signal, error) {
signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE) signal.Notify(signals, unix.SIGTERM, unix.SIGINT, unix.SIGCHLD, unix.SIGPIPE)
// make sure runc is setup to use the monitor // make sure runc is setup to use the monitor
// for waiting on processes // for waiting on processes
runc.Monitor = shim.Default runc.Monitor = reaper.Default
// set the shim as the subreaper for all orphaned processes created by the container // set the shim as the subreaper for all orphaned processes created by the container
if err := system.SetSubreaper(1); err != nil { if err := system.SetSubreaper(1); err != nil {
return nil, err return nil, err

View File

@ -1,109 +0,0 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package shim
import (
"os/exec"
"sync"
"time"
"github.com/containerd/containerd/sys"
runc "github.com/containerd/go-runc"
"github.com/pkg/errors"
)
// ErrNoSuchProcess is returned when the process no longer exists
var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 2048
// Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels
func Reap() error {
now := time.Now()
exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits {
c <- runc.Exit{
Timestamp: now,
Pid: e.Pid,
Status: e.Status,
}
}
}
Default.Unlock()
return err
}
// Default is the default monitor initialized for the package
var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}),
}
// Monitor monitors the underlying system for process status changes
type Monitor struct {
sync.Mutex
subscribers map[chan runc.Exit]struct{}
}
// Start starts the command a registers the process with the reaper
func (m *Monitor) Start(c *exec.Cmd) (chan runc.Exit, error) {
ec := m.Subscribe()
if err := c.Start(); err != nil {
m.Unsubscribe(ec)
return nil, err
}
return ec, nil
}
// Wait blocks until a process is signal as dead.
// User should rely on the value of the exit status to determine if the
// command was successful or not.
func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
for e := range ec {
if e.Pid == c.Process.Pid {
// make sure we flush all IO
c.Wait()
m.Unsubscribe(ec)
return e.Status, nil
}
}
// return no such process if the ec channel is closed and no more exit
// events will be sent
return -1, ErrNoSuchProcess
}
// Subscribe to process exit changes
func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize)
m.Lock()
m.subscribers[c] = struct{}{}
m.Unlock()
return c
}
// Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock()
delete(m.subscribers, c)
close(c)
m.Unlock()
}

View File

@ -40,6 +40,7 @@ import (
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes" "github.com/containerd/containerd/runtime/linux/runctypes"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1" shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys/reaper"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
@ -86,7 +87,7 @@ func NewService(config Config, publisher events.Publisher) (*Service, error) {
context: ctx, context: ctx,
processes: make(map[string]process.Process), processes: make(map[string]process.Process),
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: Default.Subscribe(), ec: reaper.Default.Subscribe(),
} }
go s.processExits() go s.processExits()
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {

View File

@ -43,6 +43,7 @@ import (
"github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task" taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/sys/reaper"
runcC "github.com/containerd/go-runc" runcC "github.com/containerd/go-runc"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
@ -70,12 +71,12 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
id: id, id: id,
context: ctx, context: ctx,
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(), ec: reaper.Default.Subscribe(),
ep: ep, ep: ep,
cancel: shutdown, cancel: shutdown,
} }
go s.processExits() go s.processExits()
runcC.Monitor = shim.Default runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
shutdown() shutdown()
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")

View File

@ -44,6 +44,7 @@ import (
"github.com/containerd/containerd/runtime/v2/runc/options" "github.com/containerd/containerd/runtime/v2/runc/options"
"github.com/containerd/containerd/runtime/v2/shim" "github.com/containerd/containerd/runtime/v2/shim"
taskAPI "github.com/containerd/containerd/runtime/v2/task" taskAPI "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/sys/reaper"
runcC "github.com/containerd/go-runc" runcC "github.com/containerd/go-runc"
"github.com/containerd/typeurl" "github.com/containerd/typeurl"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
@ -83,13 +84,13 @@ func New(ctx context.Context, id string, publisher shim.Publisher, shutdown func
id: id, id: id,
context: ctx, context: ctx,
events: make(chan interface{}, 128), events: make(chan interface{}, 128),
ec: shim.Default.Subscribe(), ec: reaper.Default.Subscribe(),
ep: ep, ep: ep,
cancel: shutdown, cancel: shutdown,
containers: make(map[string]*runc.Container), containers: make(map[string]*runc.Container),
} }
go s.processExits() go s.processExits()
runcC.Monitor = shim.Default runcC.Monitor = reaper.Default
if err := s.initPlatform(); err != nil { if err := s.initPlatform(); err != nil {
shutdown() shutdown()
return nil, errors.Wrap(err, "failed to initialized platform behavior") return nil, errors.Wrap(err, "failed to initialized platform behavior")

View File

@ -26,6 +26,7 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/containerd/containerd/sys/reaper"
"github.com/containerd/fifo" "github.com/containerd/fifo"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -79,7 +80,7 @@ func handleSignals(ctx context.Context, logger *logrus.Entry, signals chan os.Si
case s := <-signals: case s := <-signals:
switch s { switch s {
case unix.SIGCHLD: case unix.SIGCHLD:
if err := Reap(); err != nil { if err := reaper.Reap(); err != nil {
logger.WithError(err).Error("reap exit status") logger.WithError(err).Error("reap exit status")
} }
case unix.SIGPIPE: case unix.SIGPIPE:

View File

@ -16,7 +16,7 @@
limitations under the License. limitations under the License.
*/ */
package shim package reaper
import ( import (
"os/exec" "os/exec"
@ -33,35 +33,59 @@ var ErrNoSuchProcess = errors.New("no such process")
const bufferSize = 2048 const bufferSize = 2048
type subscriber struct {
sync.Mutex
c chan runc.Exit
closed bool
}
func (s *subscriber) close() {
s.Lock()
if s.closed {
s.Unlock()
return
}
close(s.c)
s.closed = true
s.Unlock()
}
func (s *subscriber) do(fn func()) {
s.Lock()
fn()
s.Unlock()
}
// Reap should be called when the process receives an SIGCHLD. Reap will reap // Reap should be called when the process receives an SIGCHLD. Reap will reap
// all exited processes and close their wait channels // all exited processes and close their wait channels
func Reap() error { func Reap() error {
now := time.Now() now := time.Now()
exits, err := sys.Reap(false) exits, err := sys.Reap(false)
Default.Lock()
for c := range Default.subscribers {
for _, e := range exits { for _, e := range exits {
c <- runc.Exit{ done := Default.notify(runc.Exit{
Timestamp: now, Timestamp: now,
Pid: e.Pid, Pid: e.Pid,
Status: e.Status, Status: e.Status,
})
select {
case <-done:
case <-time.After(1 * time.Second):
} }
} }
}
Default.Unlock()
return err return err
} }
// Default is the default monitor initialized for the package // Default is the default monitor initialized for the package
var Default = &Monitor{ var Default = &Monitor{
subscribers: make(map[chan runc.Exit]struct{}), subscribers: make(map[chan runc.Exit]*subscriber),
} }
// Monitor monitors the underlying system for process status changes // Monitor monitors the underlying system for process status changes
type Monitor struct { type Monitor struct {
sync.Mutex sync.Mutex
subscribers map[chan runc.Exit]struct{} subscribers map[chan runc.Exit]*subscriber
} }
// Start starts the command a registers the process with the reaper // Start starts the command a registers the process with the reaper
@ -95,7 +119,9 @@ func (m *Monitor) Wait(c *exec.Cmd, ec chan runc.Exit) (int, error) {
func (m *Monitor) Subscribe() chan runc.Exit { func (m *Monitor) Subscribe() chan runc.Exit {
c := make(chan runc.Exit, bufferSize) c := make(chan runc.Exit, bufferSize)
m.Lock() m.Lock()
m.subscribers[c] = struct{}{} m.subscribers[c] = &subscriber{
c: c,
}
m.Unlock() m.Unlock()
return c return c
} }
@ -103,7 +129,74 @@ func (m *Monitor) Subscribe() chan runc.Exit {
// Unsubscribe to process exit changes // Unsubscribe to process exit changes
func (m *Monitor) Unsubscribe(c chan runc.Exit) { func (m *Monitor) Unsubscribe(c chan runc.Exit) {
m.Lock() m.Lock()
s, ok := m.subscribers[c]
if !ok {
m.Unlock()
return
}
s.close()
delete(m.subscribers, c) delete(m.subscribers, c)
close(c)
m.Unlock() m.Unlock()
} }
func (m *Monitor) getSubscribers() map[chan runc.Exit]*subscriber {
out := make(map[chan runc.Exit]*subscriber)
m.Lock()
for k, v := range m.subscribers {
out[k] = v
}
m.Unlock()
return out
}
func (m *Monitor) notify(e runc.Exit) chan struct{} {
const timeout = 1 * time.Millisecond
var (
done = make(chan struct{}, 1)
timer = time.NewTimer(timeout)
success = make(map[chan runc.Exit]struct{})
)
stop(timer, true)
go func() {
defer close(done)
for {
var (
failed int
subscribers = m.getSubscribers()
)
for _, s := range subscribers {
s.do(func() {
if s.closed {
return
}
if _, ok := success[s.c]; ok {
return
}
timer.Reset(timeout)
recv := true
select {
case s.c <- e:
success[s.c] = struct{}{}
case <-timer.C:
recv = false
failed++
}
stop(timer, recv)
})
}
// all subscribers received the message
if failed == 0 {
return
}
}
}()
return done
}
func stop(timer *time.Timer, recv bool) {
if !timer.Stop() && recv {
<-timer.C
}
}